//

Neues in Apache Kafka 0.10 und Confluent Platform 3.0.0

7.6.2016 | 10 Minuten Lesezeit

Die im Mai erschienenen neuen Versionen von Apache Kafka und Confluent Platform enthalten einige spannende Neuerungen. Diese werden in diesem Artikel vorgestellt.

Was ist Apache Kafka?

Kafka ist ein verteilter Message Broker, der nach dem Publish-Subscribe-Prinzip arbeitet. Durch seine spezielle Architektur kann ein Kafka-Cluster immensen Durchsatz bei sehr niedriger Latenz erreichen – und obendrein noch hervorragend horizontal skalieren. Die offizielle Dokumentation führt die Grundprinzipien hier sehr gut aus – eine Erläuterung an dieser Stelle würde den Rahmen dieses Artikels sprengen. Seine Eigenschaften machen Kafka zu einem elementaren Bestandteil des SMACK-Stack , über den ich schon an anderer Stelle geschrieben habe.

Die Kafka-Distribution von Apache enthält im Wesentlichen drei Hauptkomponenten:

  • Kafka-Broker
  • Kafka-Java-Client-Bibliotheken
    • Producer-API (seit 0.8.1)
    • Consumer-API (seit 0.9)
    • Kafka Streams (seit 0.10)
  • Kafka Connect (seit 0.9)
    • Verteiltes Werkzeug zur Integration von Kafka und anderen Systemen
    • enthält Dateisystem-Connector
    • Eine ganze Reihe von Third-Party Connectors sind verfügbar (http://www.confluent.io/product/connectors)

Confluent Platform

Die Firma Confluent wurde von den Entwicklern von Kafka gegründet und ist – innerhalb der Regeln der Apache Software Foundation – die treibende Kraft der Weiterentwicklung Kafkas. Die “Confluent Platform” ist eine Streaming-Plattform, die auf der Basis von Apache Kafka aufsetzt und diese um zusätzliche Features erweitert.

Elemente der Plattform in der Version 3.0.0 sind:

    • Kafka-Distribution
      • Enthält ggf. schon Bugfixes, die in der aktuellsten Apache-Kafka-Distribution noch nicht enthalten sind.
    • Zusätzliche Kafka Connectors
      • JDBC
      • HDFS
    • Kafka-Clients
      • Python
      • C/C++
    • REST-Proxy
      • Kafka-REST-API für Sprachen, für die es keine nativen Clients gibt
    • Schema-Registry
      • Auf Apache Avro basierendes Tool zur Verwaltung von Avro-Schemata für die Serialisierung und Deserialisierung von Kafka-Nachrichten
    • Control Center
      • Monitoring-Lösung für ein Kafka-Cluster
      • Im Gegensatz zu den anderen Teilen der Plattform nicht Open Source und lizenzpflichtig!
      • 30 Tage kostenfrei evaluierbar

Somit ergibt sich folgendes Bild:

Die Confluent Platform besteht aus der Basisdistribution, Erweiterungen wie der Schema Registry sowie dem kommerziellen Confluent Control Center

Wesentliche Neuerungen

Dieser Absatz stellt die wichtigsten Neuerungen an den Brokern, Connect und Client-Bibliotheken in Apache Kafka 0.10 sowie das neue Confluent Control Center vor:

Kafka-Broker

      • Zeitstempel

        Mit Version 0.10 wurde das Kafka-Protokoll erweitert. Jede Nachricht enthält nun im Header einen Zeitstempel zusätzlich zum Offset der Nachricht (fortlaufende ID einer Nachricht innerhalb eines Topics und einer Partition). Der Zeitstempel zeigt an, wann die Nachricht produziert wurde.

      • Rack-Awareness

        Kafka Topics sind in der Regel verteilt. Ein neues Feature stellt sicher, dass Replicas eines Topics über verschiedene Racks in einem Rechenzentrum verteilt werden können, so dass bei einem Ausfall eines Racks nicht sämtliche Replikationen einer Topic-Partition verloren gehen. Ein Beispiel hierfür ist die Verteilung von Brokern über Availability Zones in AWS.

      • Neue Security Features

        Die SASL -Unterstützung wurde ausgebaut.

Kafka-Clients

Die wichtigsten funktionalen Neuerungen in Kafka 0.10 liegen im Client-Bereich:

      • Kafka Streams

        Kafka Streams ist eine vollkommen neue Kafka-Client-API. Die API enhält eine Streaming-DSL, mit der leichtgewichtig Streaming-Anwendungen implementiert werden können. Sie enthält die aus anderen APIs und Frameworks bekannten Operationen wie map, filter, groupBy etc. Die Verarbeitung von Nachrichten erfolgt pro Event, durch die neuen Zeitstempel in Kafka-Nachrichten können sogar zeitliche Abweichungen in der Verarbeitung von Nachrichten behandelt werden. Zusätzlich skalieren Streaming-Anwendungen „out of the box“ und können sowohl eigenständig, in Docker-Containern als auch über Resource-Scheduler wie Mesos laufen. Ein konkretes Beispiel einer einfachen Kafka-Streaming-Anwendung folgt weiter unten im Text. Bei Kafka Streams handelt es sich um eine Bibliothek für Kafka-zu-Kafka-Verarbeitung. Für den Austausch von Daten zwischen Kafka und anderen Systemen wie Datenbanken soll Kafka Connect verwendet werden.

      • Producer- und Consumer-API

        Es gibt zwei große Änderungen in den APIs. Zum einen kann ein Kafka-Consumer nun definieren, wie viele Nachrichten er pro Polling-Operation maximal erhalten will. In früheren Versionen war dies nicht möglich und konnte dazu führen, dass ein Consumer so viele Nachrichten erhielt, dass die Verarbeitung länger dauerte als das konfigurierte Heartbeat-Intervall. In diesen Fällen wird ein Consumer am Cluster deregistriert, da dieses annimmt, dass er nicht mehr existiert. Der Entwickler bekommt somit mehr Kontrolle.

        Die zweite Neuerung sind Interceptors an Producern und Consumern. Hierbei handelt es sich um Implementierungen der Interfaces ProducerInterceptor und ConsumerInteceptor .
        Diese Interceptoren sind mächtige Tools. Sie können verwendet werden, um Nachrichten vor dem Schreiben oder Lesen durch die Anwendung zu verändern, zum Beispiel zum Zweck einer Verschlüsselung oder Signatur. Anderseits ermöglichen sie ein Ende-zu-Ende-Monitoring, das in Kafka bislang so nicht möglich war. Zum Beispiel ist es somit möglich, Interceptoren zu schreiben, die Kafka-Metadaten an Monitoring-Lösungen wie den ELK Stack schicken. Confluent nutzt Interceptoren für das Confluent Control Center.

Kafka Connect

Die wichtigste Änderung an Kafka Connect ist die Erweiterung der REST-API um Monitoring-Endpunkte. Damit muss man nicht wie bislang Log-Dateien auswerten, um den Zustand eines Kafka Connectors zu beobachten.

Confluent Control Center

Auch wenn es sich hierbei um die einzige proprietäre Komponente der Confluent Platform handelt, muss es an dieser Stelle erwähnt werden. Auf Basis der neuen Interceptoren in den Kafka-Client-APIs ist das Control Center ein Schritt zur besseren Monitorbarkeit eines Kafka-Clusters. Da das Control Center auch nur Gebrauch von den Standardmöglichkeiten Kafkas macht, sind freie Tools mit ähnlichen Fähigkeiten durchaus möglich.

Ein Beispiel

Das folgende Beispiel demonstriert einige der bereits genannten neuen Features. Wir bauen eine Datenpipeline auf, die über Kafka Connect eine Datei ausliest und den Inhalt zeilenweise in ein Kafka Topic schreibt. Dieses Topic dient als Datenquelle für mehrere Kafka Streams, die die Nachrichten auf verschiedene Arten verarbeiten und in weitere Kafka Topics schreiben. Über eine Kafka-Connect-Datensenke werden die Inhalte dieses Topics wieder auf die Festplatte geschrieben. Wir nutzen das Confluent Control Center zur Administration der Pipeline.

Aufsetzen der Kafka-Umgebung

Ein normal ausgestatteter PC reicht zum Aufsetzen einer Testumgebung.
Zunächst checken wir ein GitHub-Projekt aus und laden die Confluent Platform herunter.

1git clone https://github.com/ftrossbach/kafka010intro
2cd kafka010intro
3wget http://packages.confluent.io/archive/3.0/confluent-3.0.0-2.11.zip
4unzip confluent-3.0.0-2.11.zip
5

Die folgenden Schritte starten die einzelnen Infrastrukturbestandteile im Hintergrund und leiten das Output an Logdateien weiter:

1# Start Zookeeper
2./confluent-3.0.0/bin/zookeeper-server-start ./confluent-3.0.0/etc/kafka/zookeeper.properties > zk.log &
3 
4# Give Zookeeper some time to start
5sleep 5s
6 
7# Start a single Kafka Broker
8./confluent-3.0.0/bin/kafka-server-start ./confluent-3.0.0/etc/kafka/server.properties > broker.log &
9# Give Broker some time to start
10sleep 5s
11 
12# Start Schema Registry
13./confluent-3.0.0/bin/schema-registry-start ./confluent-3.0.0/etc/schema-registry/schema-registry.properties > registry.log &
14# Give Schema Registry some time to start
15sleep 5s
16 
17# Start Kafka Connect
18./confluent-3.0.0/bin/connect-distributed connect-distributed.properties > connect.log &
19 
20# Give Connect some time to start
21sleep 5s
22 
23# Start Confluent Control Center (dauert eine Weile)
24./confluent-3.0.0/bin/control-center-start control-center.properties > ccc.log &
25

Nach dem Start können wir uns unter http://localhost:9021 das Control-Center ansehen:

Die Übersichtsseite des Confluent Control Centers

Da wir noch keinen Kafka-Producer oder -Consumer gestartet haben, sehen wir auch noch keine Informationen. Dies ändern wir nun aber, indem wir eine Kafka-Connect-Datei-Quelle anlegen, die den Text von Goethes Faust aus einer Datei einliest. Zunächst legen wir ein Topic namens „faust“ an:

1./confluent-3.0.0/bin/kafka-topics --create --topic faust --zookeeper localhost:2181 \
2--partitions 1 --replication-factor 1
3

Im Anschluss konfigurieren wir den Connector. Dies kann per REST-API oder per Control Center geschehen. Wir nutzen das Control Center:

Die Konfiguration einer Dateiquelle

Der funktional äquivalente REST-Aufruf sähe so aus:

1curl -X POST -H "Content-Type: application/json" --data '{"name": "faust-quelle", "config": {"connector.class":"org.apache.kafka.connect.file.FileStreamSourceConnector", "tasks.max":"1", "topic":"faust", "file":"/Users/ftr/Projects/kafka010intro/faust.txt" }}' http://localhost:8083/connectors
2

Im Anschluss prüfen wir per Terminal, ob das Topic nun alle Zeilen des Faust als einzelne Nachrichten enthält:

1./confluent-3.0.0/bin/kafka-console-consumer --from-beginning \
2--zookeeper localhost:2181 --topic faust
3

Den Status des Connectors können wir seit Version 0.10 nun auch per REST prüfen:

1curl localhost:8083/connectors/faust-quelle/status
2 
3{
4 "name": "faust-quelle",
5 "connector": {
6   "state": "RUNNING",
7   "worker_id": "192.168.178.23:8083"
8 },
9 "tasks": [
10   {
11     "state": "RUNNING",
12     "id": 0,
13     "worker_id": "192.168.178.23:8083"
14   }
15 ]
16}
17

Als nächstes starten wir einen Connector, der die Zeilen aus dem Topic “faust” liest und in eine andere Datei schreibt:

Ein Blick auf das Stream-Monitoring zeigt mittlerweile Bewegung:

Wir können uns nun auf der Festplatte eine Kopie des Faust ansehen, aber das ist natürlich noch ein ziemlich langweiliger Anwendungsfall. Im nächsten Schritt starten wir eine Kafka-Streaming-Anwendung, die die Daten zwischen zwei Kafka Connectors transformiert. Der simpelste Fall ist die Faust-Shout-Anwendung – jede Zeile wird in Großbuchstaben transformiert. Dazu legen wir zunächst ein Ziel-Topic namens „faust-shout“ an:

1./confluent-3.0.0/bin/kafka-topics --create --topic faust-shout \
2--zookeeper localhost:2181 --partitions 1 --replication-factor 1
3

Die Streaming-Anwendung selbst ist nun ziemlich einfach:

1public class Shout {
2 
3   public static void main(String[] args) {
4 
5       Properties streamsConfiguration = new Properties();
6       streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "shout");
7       streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
8       streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
9       streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
10       streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
11       streamsConfiguration.put("auto.offset.reset", "earliest");
12 
13 
14       final Serde stringSerde = Serdes.String();
15 
16       KStreamBuilder builder = new KStreamBuilder();
17       //lese die Key-Value Paare aus dem Topic faust
18       KStream<String, String> textLines = builder.stream(stringSerde, stringSerde, "faust");
19       //transformiere die Paare, in dem der Value (line) in Großbuchstaben formatiert wird
20       KStream<String, String> upperCaseLines = textLines.map((key, line) -> new KeyValue<>(key, line.toUpperCase()));
21       //schreibe den transformierten Stream in das Topic faust-shout
22       upperCaseLines.to(stringSerde, stringSerde, "faust-shout");
23 
24 
25       KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
26       //hier startet die Transformations-Pipeline
27       streams.start();
28   }
29}
30

Nachdem wir eine weitere Kafka-Connect-Dateisenke angelegt haben, die die Daten aus dem Topic “faust-shout” liest und in eine Datei schreibt, sehen wir den Faust in seiner ganzen schreienden Pracht:

ZUEIGNUNG.

IHR NAHT EUCH WIEDER, SCHWANKENDE GESTALTEN,
DIE FRÜH SICH EINST DEM TRÜBEN BLICK GEZEIGT.
VERSUCH ICH WOHL, EUCH DIESMAL FESTZUHALTEN?
FÜHL ICH MEIN HERZ NOCH JENEM WAHN GENEIGT?
IHR DRÄNGT EUCH ZU! NUN GUT, SO MÖGT IHR WALTEN,
WIE IHR AUS DUNST UND NEBEL UM MICH STEIGT;
MEIN BUSEN FÜHLT SICH JUGENDLICH ERSCHÜTTERT
VOM ZAUBERHAUCH, DER EUREN ZUG UMWITTERT.

Im Prinzip haben wir somit eine erste ETL-Pipeline umgesetzt – wir haben Daten aus einer Dateiquelle extrahiert, sie mit Kafka Streams transformiert und sie in ein weiteres System geladen, auch wenn es sich bei letzterem nur um das Dateisystem handelt. Kafka Streams kann aber noch mehr! Im letzten Beispiel werden wir zählen, wie oft ein Protagonist im Faust spricht.
Zunächst legen wir hierfür ein weiteres Topic an:

1./confluent-3.0.0/bin/kafka-topics --create --topic faust-count \
2--zookeeper localhost:2181 --partitions 1 --replication-factor 1
3

Für die Berechnung kommt uns die Struktur der Quelldatei zu Gute, da man die benötigte Information sehr leicht daraus gewinnen kann:

MEPHISTOPHELES:
Wozu der Lärm? was steht dem Herrn zu Diensten?

FAUST:
Das also war des Pudels Kern!
Ein fahrender Skolast? Der Kasus macht mich lachen.

Wir betrachten also alle Zeilen, die mit mindestens zwei Großbuchstaben beginnen und mit einem Doppelpunkt enden. Die Namen, die wir daraus mit einem regulären Ausdruck extrahieren, benutzen wir als Schlüssel, deren Vorkommen wir im Anschluss zählen.

In Quellcode sieht dies so aus:

1final static Pattern pattern = Pattern.compile("([A-Z]{2,}\\s?([A-Z]*)).*:");
2public static void main(String[] args) {
3 
4   Properties streamsConfiguration = new Properties();
5   streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "count");
6   streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
7   streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
8   streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
9   streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
10   streamsConfiguration.put("auto.offset.reset", "earliest");
11 
12 
13   final Serde stringSerde = Serdes.String();
14   final Serde longSerde = Serdes.Long();
15 
16   KStreamBuilder builder = new KStreamBuilder();
17   //lese die Key-Value Paare aus dem Topic faust
18   KStream<String, String> textLines = builder.stream(stringSerde, stringSerde, "faust");
19 
20 
21   //filtere nur die Zeilen heraus, die dem Schema "NAME IN GROSSBUCHSTABEN:" folgen
22   KStream<String, String> filteredLines = textLines.filter((key, value) -> pattern.matcher(value).matches());
23 
24   //extrahiere den Characternamen und schreibe ihn als Schlüssel
25   KStream<String, String> characterNameAsKey = filteredLines.map((key, value) -> {
26       Matcher matcher = pattern.matcher(value);
27       matcher.find();
28       return new KeyValue<>(matcher.group(1).trim(), value);
29   });
30 
31   //Zähle die Vorkommen der Schlüssel (Charakternamen)
32   KTable<String, Long> countTable = characterNameAsKey.countByKey("CountTable");
33 
34   //Kombiniere Charaktername (key) und Wert im Wert
35   KStream<String, String> countStream = countTable.toStream().map((key,value) -> new KeyValue<>(key, key + ": " + value));
36 
37 
38   //schreibe den transformierten Stream in das Topic faust-count
39   countStream.to(stringSerde, stringSerde, "faust-count");
40 
41 
42   KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
43   //hier startet die Transformations-Pipeline
44   streams.start();
45}
46

Wir können uns den Inhalt des Topics “faust-count” nun über Kafka Connect auf die Festplatte schreiben. Da es sich um Stream-Processing handelt, sehen wir in dieser Datei nicht nur die endgültige Anzahl an Auftritten pro Protagonist, sondern die Entwicklung. So finden wir heraus, dass Mephistopheles ca 250 mal zur Sprache kommt:

DIREKTOR: 1
DICHTER: 1
LUSTIGE PERSON: 1
DIREKTOR: 2
DICHTER: 2
DIREKTOR: 3
DICHTER: 3
LUSTIGE PERSON: 2
DICHTER: 4
LUSTIGE PERSON: 3
DIREKTOR: 4
RAPHAEL: 1
GABRIEL: 1
MICHAEL: 1
ZU DREI: 1
MEPHISTOPHELES: 1
[..]
MARGARETE: 79
FAUST: 224
MARGARETE: 80
FAUST: 225
MARGARETE: 81
FAUST: 226
MARGARETE: 82
FAUST: 227
MEPHISTOPHELES: 253
MARGARETE: 83
FAUST: 228
MARGARETE: 84
MEPHISTOPHELES: 254
MARGARETE: 85
MEPHISTOPHELES: 255
STIMME: 12
MEPHISTOPHELES: 256
STIMME: 13
MEPHISTOPHELES: 257
MEPHISTOPHELES: 258

Fazit

Hiermit beenden wir unseren ersten Einblick in die Neuerungen in Kafka 0.10 und Confluent Platform 3.0.0. Kafka Streams treten an, um Frameworks wie Spark den Rang abzulaufen. Das ist vor allem da möglich, wo es um Transformationen zwischen Kafka-Topics handelt. Die Anbindung an Umsysteme über Connect wurde vor allem um Monitoring-Fähigkeiten erweitert – ob dies genügt, um etabliertere Technologien wie Spark ablösen zu können, hängt vom Anwendungsfall ab. Das Konzept der Interceptors ermöglicht verbessertes Monitoring von Kafka-Clustern. Das Confluent Control Center zeigt hier einen vielversprechenden Ansatz, ist aber kommerziell.

Anhänge und Literatur

Beitrag teilen

Gefällt mir

0

//

Gemeinsam bessere Projekte umsetzen

Wir helfen Deinem Unternehmen

Du stehst vor einer großen IT-Herausforderung? Wir sorgen für eine maßgeschneiderte Unterstützung. Informiere dich jetzt.

Hilf uns, noch besser zu werden.

Wir sind immer auf der Suche nach neuen Talenten. Auch für dich ist die passende Stelle dabei.