Neues in Apache Kafka 0.10 und Confluent Platform 3.0.0

Keine Kommentare

Die im Mai erschienenen neuen Versionen von Apache Kafka und Confluent Platform enthalten einige spannende Neuerungen. Diese werden in diesem Artikel vorgestellt.

Was ist Apache Kafka?

Kafka ist ein verteilter Message Broker, der nach dem Publish-Subscribe-Prinzip arbeitet. Durch seine spezielle Architektur kann ein Kafka-Cluster immensen Durchsatz bei sehr niedriger Latenz erreichen – und obendrein noch hervorragend horizontal skalieren. Die offizielle Dokumentation führt die Grundprinzipien hier sehr gut aus – eine Erläuterung an dieser Stelle würde den Rahmen dieses Artikels sprengen. Seine Eigenschaften machen Kafka zu einem elementaren Bestandteil des SMACK-Stack, über den ich schon an anderer Stelle geschrieben habe.

Die Kafka-Distribution von Apache enthält im Wesentlichen drei Hauptkomponenten:

  • Kafka-Broker
  • Kafka-Java-Client-Bibliotheken
    • Producer-API (seit 0.8.1)
    • Consumer-API (seit 0.9)
    • Kafka Streams (seit 0.10)
  • Kafka Connect (seit 0.9)
    • Verteiltes Werkzeug zur Integration von Kafka und anderen Systemen
    • enthält Dateisystem-Connector
    • Eine ganze Reihe von Third-Party Connectors sind verfügbar (http://www.confluent.io/product/connectors)

Confluent Platform

Die Firma Confluent wurde von den Entwicklern von Kafka gegründet und ist – innerhalb der Regeln der Apache Software Foundation – die treibende Kraft der Weiterentwicklung Kafkas. Die “Confluent Platform” ist eine Streaming-Plattform, die auf der Basis von Apache Kafka aufsetzt und diese um zusätzliche Features erweitert.

Elemente der Plattform in der Version 3.0.0 sind:

    • Kafka-Distribution
      • Enthält ggf. schon Bugfixes, die in der aktuellsten Apache-Kafka-Distribution noch nicht enthalten sind.
    • Zusätzliche Kafka Connectors
      • JDBC
      • HDFS
    • Kafka-Clients
      • Python
      • C/C++
    • REST-Proxy
      • Kafka-REST-API für Sprachen, für die es keine nativen Clients gibt
    • Schema-Registry
      • Auf Apache Avro basierendes Tool zur Verwaltung von Avro-Schemata für die Serialisierung und Deserialisierung von Kafka-Nachrichten
    • Control Center
      • Monitoring-Lösung für ein Kafka-Cluster
      • Im Gegensatz zu den anderen Teilen der Plattform nicht Open Source und lizenzpflichtig!
      • 30 Tage kostenfrei evaluierbar

Somit ergibt sich folgendes Bild:

Die Confluent Platform besteht aus der Basisdistribution, Erweiterungen wie der Schema Registry sowie dem kommerziellen Confluent Control Center

Die Confluent Platform besteht aus der Basisdistribution, Erweiterungen wie der Schema Registry sowie dem kommerziellen Confluent Control Center

Wesentliche Neuerungen

Dieser Absatz stellt die wichtigsten Neuerungen an den Brokern, Connect und Client-Bibliotheken in Apache Kafka 0.10 sowie das neue Confluent Control Center vor:

Kafka-Broker

      • Zeitstempel

        Mit Version 0.10 wurde das Kafka-Protokoll erweitert. Jede Nachricht enthält nun im Header einen Zeitstempel zusätzlich zum Offset der Nachricht (fortlaufende ID einer Nachricht innerhalb eines Topics und einer Partition). Der Zeitstempel zeigt an, wann die Nachricht produziert wurde.

      • Rack-Awareness

        Kafka Topics sind in der Regel verteilt. Ein neues Feature stellt sicher, dass Replicas eines Topics über verschiedene Racks in einem Rechenzentrum verteilt werden können, so dass bei einem Ausfall eines Racks nicht sämtliche Replikationen einer Topic-Partition verloren gehen. Ein Beispiel hierfür ist die Verteilung von Brokern über Availability Zones in AWS.

      • Neue Security Features

        Die SASL-Unterstützung wurde ausgebaut.

Kafka-Clients

Die wichtigsten funktionalen Neuerungen in Kafka 0.10 liegen im Client-Bereich:

      • Kafka Streams

        Kafka Streams ist eine vollkommen neue Kafka-Client-API. Die API enhält eine Streaming-DSL, mit der leichtgewichtig Streaming-Anwendungen implementiert werden können. Sie enthält die aus anderen APIs und Frameworks bekannten Operationen wie map, filter, groupBy etc. Die Verarbeitung von Nachrichten erfolgt pro Event, durch die neuen Zeitstempel in Kafka-Nachrichten können sogar zeitliche Abweichungen in der Verarbeitung von Nachrichten behandelt werden. Zusätzlich skalieren Streaming-Anwendungen „out of the box“ und können sowohl eigenständig, in Docker-Containern als auch über Resource-Scheduler wie Mesos laufen. Ein konkretes Beispiel einer einfachen Kafka-Streaming-Anwendung folgt weiter unten im Text. Bei Kafka Streams handelt es sich um eine Bibliothek für Kafka-zu-Kafka-Verarbeitung. Für den Austausch von Daten zwischen Kafka und anderen Systemen wie Datenbanken soll Kafka Connect verwendet werden.

      • Producer- und Consumer-API

        Es gibt zwei große Änderungen in den APIs. Zum einen kann ein Kafka-Consumer nun definieren, wie viele Nachrichten er pro Polling-Operation maximal erhalten will. In früheren Versionen war dies nicht möglich und konnte dazu führen, dass ein Consumer so viele Nachrichten erhielt, dass die Verarbeitung länger dauerte als das konfigurierte Heartbeat-Intervall. In diesen Fällen wird ein Consumer am Cluster deregistriert, da dieses annimmt, dass er nicht mehr existiert. Der Entwickler bekommt somit mehr Kontrolle.

        Die zweite Neuerung sind Interceptors an Producern und Consumern. Hierbei handelt es sich um Implementierungen der Interfaces ProducerInterceptor und ConsumerInteceptor.
        Diese Interceptoren sind mächtige Tools. Sie können verwendet werden, um Nachrichten vor dem Schreiben oder Lesen durch die Anwendung zu verändern, zum Beispiel zum Zweck einer Verschlüsselung oder Signatur. Anderseits ermöglichen sie ein Ende-zu-Ende-Monitoring, das in Kafka bislang so nicht möglich war. Zum Beispiel ist es somit möglich, Interceptoren zu schreiben, die Kafka-Metadaten an Monitoring-Lösungen wie den ELK Stack schicken. Confluent nutzt Interceptoren für das Confluent Control Center.

Kafka Connect

Die wichtigste Änderung an Kafka Connect ist die Erweiterung der REST-API um Monitoring-Endpunkte. Damit muss man nicht wie bislang Log-Dateien auswerten, um den Zustand eines Kafka Connectors zu beobachten.

Confluent Control Center

Auch wenn es sich hierbei um die einzige proprietäre Komponente der Confluent Platform handelt, muss es an dieser Stelle erwähnt werden. Auf Basis der neuen Interceptoren in den Kafka-Client-APIs ist das Control Center ein Schritt zur besseren Monitorbarkeit eines Kafka-Clusters. Da das Control Center auch nur Gebrauch von den Standardmöglichkeiten Kafkas macht, sind freie Tools mit ähnlichen Fähigkeiten durchaus möglich.

Ein Beispiel

Das folgende Beispiel demonstriert einige der bereits genannten neuen Features. Wir bauen eine Datenpipeline auf, die über Kafka Connect eine Datei ausliest und den Inhalt zeilenweise in ein Kafka Topic schreibt. Dieses Topic dient als Datenquelle für mehrere Kafka Streams, die die Nachrichten auf verschiedene Arten verarbeiten und in weitere Kafka Topics schreiben. Über eine Kafka-Connect-Datensenke werden die Inhalte dieses Topics wieder auf die Festplatte geschrieben. Wir nutzen das Confluent Control Center zur Administration der Pipeline.

Aufsetzen der Kafka-Umgebung

Ein normal ausgestatteter PC reicht zum Aufsetzen einer Testumgebung.
Zunächst checken wir ein GitHub-Projekt aus und laden die Confluent Platform herunter.

git clone https://github.com/ftrossbach/kafka010intro
cd kafka010intro
wget http://packages.confluent.io/archive/3.0/confluent-3.0.0-2.11.zip
unzip confluent-3.0.0-2.11.zip

Die folgenden Schritte starten die einzelnen Infrastrukturbestandteile im Hintergrund und leiten das Output an Logdateien weiter:

# Start Zookeeper
./confluent-3.0.0/bin/zookeeper-server-start ./confluent-3.0.0/etc/kafka/zookeeper.properties > zk.log &
 
# Give Zookeeper some time to start
sleep 5s
 
# Start a single Kafka Broker
./confluent-3.0.0/bin/kafka-server-start ./confluent-3.0.0/etc/kafka/server.properties > broker.log &
# Give Broker some time to start
sleep 5s
 
# Start Schema Registry
./confluent-3.0.0/bin/schema-registry-start ./confluent-3.0.0/etc/schema-registry/schema-registry.properties > registry.log &
# Give Schema Registry some time to start
sleep 5s
 
# Start Kafka Connect
./confluent-3.0.0/bin/connect-distributed connect-distributed.properties > connect.log &
 
# Give Connect some time to start
sleep 5s
 
# Start Confluent Control Center (dauert eine Weile)
./confluent-3.0.0/bin/control-center-start control-center.properties > ccc.log &

Nach dem Start können wir uns unter http://localhost:9021 das Control-Center ansehen:

Die Übersichtsseite des Confluent Control Centers

Die Übersichtsseite des Confluent Control Centers

Da wir noch keinen Kafka-Producer oder -Consumer gestartet haben, sehen wir auch noch keine Informationen. Dies ändern wir nun aber, indem wir eine Kafka-Connect-Datei-Quelle anlegen, die den Text von Goethes Faust aus einer Datei einliest. Zunächst legen wir ein Topic namens „faust“ an:

./confluent-3.0.0/bin/kafka-topics --create --topic faust --zookeeper localhost:2181 \
--partitions 1 --replication-factor 1

Im Anschluss konfigurieren wir den Connector. Dies kann per REST-API oder per Control Center geschehen. Wir nutzen das Control Center:

connect-source

Die Konfiguration einer Dateiquelle

Der funktional äquivalente REST-Aufruf sähe so aus:

curl -X POST -H "Content-Type: application/json" --data '{"name": "faust-quelle", "config": {"connector.class":"org.apache.kafka.connect.file.FileStreamSourceConnector", "tasks.max":"1", "topic":"faust", "file":"/Users/ftr/Projects/kafka010intro/faust.txt" }}' http://localhost:8083/connectors

Im Anschluss prüfen wir per Terminal, ob das Topic nun alle Zeilen des Faust als einzelne Nachrichten enthält:

./confluent-3.0.0/bin/kafka-console-consumer --from-beginning \
--zookeeper localhost:2181 --topic faust

Den Status des Connectors können wir seit Version 0.10 nun auch per REST prüfen:

curl localhost:8083/connectors/faust-quelle/status
 
{
 "name": "faust-quelle",
 "connector": {
   "state": "RUNNING",
   "worker_id": "192.168.178.23:8083"
 },
 "tasks": [
   {
     "state": "RUNNING",
     "id": 0,
     "worker_id": "192.168.178.23:8083"
   }
 ]
}

Als nächstes starten wir einen Connector, der die Zeilen aus dem Topic “faust” liest und in eine andere Datei schreibt:

Die Konfiguration einer Dateisenke

Ein Blick auf das Stream-Monitoring zeigt mittlerweile Bewegung:

Das Stream-Monitoring nach dem ersten Nachrichtenfluss

Wir können uns nun auf der Festplatte eine Kopie des Faust ansehen, aber das ist natürlich noch ein ziemlich langweiliger Anwendungsfall. Im nächsten Schritt starten wir eine Kafka-Streaming-Anwendung, die die Daten zwischen zwei Kafka Connectors transformiert. Der simpelste Fall ist die Faust-Shout-Anwendung – jede Zeile wird in Großbuchstaben transformiert. Dazu legen wir zunächst ein Ziel-Topic namens „faust-shout“ an:

./confluent-3.0.0/bin/kafka-topics --create --topic faust-shout \
--zookeeper localhost:2181 --partitions 1 --replication-factor 1

Die Streaming-Anwendung selbst ist nun ziemlich einfach:

public class Shout {
 
   public static void main(String[] args) {
 
       Properties streamsConfiguration = new Properties();
       streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "shout");
       streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
       streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
       streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
       streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
       streamsConfiguration.put("auto.offset.reset", "earliest");
 
 
       final Serde stringSerde = Serdes.String();
 
       KStreamBuilder builder = new KStreamBuilder();
       //lese die Key-Value Paare aus dem Topic faust
       KStream<String, String> textLines = builder.stream(stringSerde, stringSerde, "faust");
       //transformiere die Paare, in dem der Value (line) in Großbuchstaben formatiert wird
       KStream<String, String> upperCaseLines = textLines.map((key, line) -> new KeyValue<>(key, line.toUpperCase()));
       //schreibe den transformierten Stream in das Topic faust-shout
       upperCaseLines.to(stringSerde, stringSerde, "faust-shout");
 
 
       KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
       //hier startet die Transformations-Pipeline
       streams.start();
   }
}

Nachdem wir eine weitere Kafka-Connect-Dateisenke angelegt haben, die die Daten aus dem Topic “faust-shout” liest und in eine Datei schreibt, sehen wir den Faust in seiner ganzen schreienden Pracht:

ZUEIGNUNG.

IHR NAHT EUCH WIEDER, SCHWANKENDE GESTALTEN,
DIE FRÜH SICH EINST DEM TRÜBEN BLICK GEZEIGT.
VERSUCH ICH WOHL, EUCH DIESMAL FESTZUHALTEN?
FÜHL ICH MEIN HERZ NOCH JENEM WAHN GENEIGT?
IHR DRÄNGT EUCH ZU! NUN GUT, SO MÖGT IHR WALTEN,
WIE IHR AUS DUNST UND NEBEL UM MICH STEIGT;
MEIN BUSEN FÜHLT SICH JUGENDLICH ERSCHÜTTERT
VOM ZAUBERHAUCH, DER EUREN ZUG UMWITTERT.

Im Prinzip haben wir somit eine erste ETL-Pipeline umgesetzt – wir haben Daten aus einer Dateiquelle extrahiert, sie mit Kafka Streams transformiert und sie in ein weiteres System geladen, auch wenn es sich bei letzterem nur um das Dateisystem handelt. Kafka Streams kann aber noch mehr! Im letzten Beispiel werden wir zählen, wie oft ein Protagonist im Faust spricht.
Zunächst legen wir hierfür ein weiteres Topic an:

./confluent-3.0.0/bin/kafka-topics --create --topic faust-count \
--zookeeper localhost:2181 --partitions 1 --replication-factor 1

Für die Berechnung kommt uns die Struktur der Quelldatei zu Gute, da man die benötigte Information sehr leicht daraus gewinnen kann:

MEPHISTOPHELES:
Wozu der Lärm? was steht dem Herrn zu Diensten?

FAUST:
Das also war des Pudels Kern!
Ein fahrender Skolast? Der Kasus macht mich lachen.

Wir betrachten also alle Zeilen, die mit mindestens zwei Großbuchstaben beginnen und mit einem Doppelpunkt enden. Die Namen, die wir daraus mit einem regulären Ausdruck extrahieren, benutzen wir als Schlüssel, deren Vorkommen wir im Anschluss zählen.

In Quellcode sieht dies so aus:

final static Pattern pattern = Pattern.compile("([A-Z]{2,}\\s?([A-Z]*)).*:");
public static void main(String[] args) {
 
   Properties streamsConfiguration = new Properties();
   streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, "count");
   streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
   streamsConfiguration.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, "localhost:2181");
   streamsConfiguration.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
   streamsConfiguration.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
   streamsConfiguration.put("auto.offset.reset", "earliest");
 
 
   final Serde stringSerde = Serdes.String();
   final Serde longSerde = Serdes.Long();
 
   KStreamBuilder builder = new KStreamBuilder();
   //lese die Key-Value Paare aus dem Topic faust
   KStream<String, String> textLines = builder.stream(stringSerde, stringSerde, "faust");
 
 
   //filtere nur die Zeilen heraus, die dem Schema "NAME IN GROSSBUCHSTABEN:" folgen
   KStream<String, String> filteredLines = textLines.filter((key, value) -> pattern.matcher(value).matches());
 
   //extrahiere den Characternamen und schreibe ihn als Schlüssel
   KStream<String, String> characterNameAsKey = filteredLines.map((key, value) -> {
       Matcher matcher = pattern.matcher(value);
       matcher.find();
       return new KeyValue<>(matcher.group(1).trim(), value);
   });
 
   //Zähle die Vorkommen der Schlüssel (Charakternamen)
   KTable<String, Long> countTable = characterNameAsKey.countByKey("CountTable");
 
   //Kombiniere Charaktername (key) und Wert im Wert
   KStream<String, String> countStream = countTable.toStream().map((key,value) -> new KeyValue<>(key, key + ": " + value));
 
 
   //schreibe den transformierten Stream in das Topic faust-count
   countStream.to(stringSerde, stringSerde, "faust-count");
 
 
   KafkaStreams streams = new KafkaStreams(builder, streamsConfiguration);
   //hier startet die Transformations-Pipeline
   streams.start();
}

Wir können uns den Inhalt des Topics “faust-count” nun über Kafka Connect auf die Festplatte schreiben. Da es sich um Stream-Processing handelt, sehen wir in dieser Datei nicht nur die endgültige Anzahl an Auftritten pro Protagonist, sondern die Entwicklung. So finden wir heraus, dass Mephistopheles ca 250 mal zur Sprache kommt:

DIREKTOR: 1
DICHTER: 1
LUSTIGE PERSON: 1
DIREKTOR: 2
DICHTER: 2
DIREKTOR: 3
DICHTER: 3
LUSTIGE PERSON: 2
DICHTER: 4
LUSTIGE PERSON: 3
DIREKTOR: 4
RAPHAEL: 1
GABRIEL: 1
MICHAEL: 1
ZU DREI: 1
MEPHISTOPHELES: 1
[..]
MARGARETE: 79
FAUST: 224
MARGARETE: 80
FAUST: 225
MARGARETE: 81
FAUST: 226
MARGARETE: 82
FAUST: 227
MEPHISTOPHELES: 253
MARGARETE: 83
FAUST: 228
MARGARETE: 84
MEPHISTOPHELES: 254
MARGARETE: 85
MEPHISTOPHELES: 255
STIMME: 12
MEPHISTOPHELES: 256
STIMME: 13
MEPHISTOPHELES: 257
MEPHISTOPHELES: 258

Fazit

Hiermit beenden wir unseren ersten Einblick in die Neuerungen in Kafka 0.10 und Confluent Platform 3.0.0. Kafka Streams treten an, um Frameworks wie Spark den Rang abzulaufen. Das ist vor allem da möglich, wo es um Transformationen zwischen Kafka-Topics handelt. Die Anbindung an Umsysteme über Connect wurde vor allem um Monitoring-Fähigkeiten erweitert – ob dies genügt, um etabliertere Technologien wie Spark ablösen zu können, hängt vom Anwendungsfall ab. Das Konzept der Interceptors ermöglicht verbessertes Monitoring von Kafka-Clustern. Das Confluent Control Center zeigt hier einen vielversprechenden Ansatz, ist aber kommerziell.

Anhänge und Literatur

Florian Troßbach

Florian Troßbach arbeitet als IT-Consultant bei der codecentric AG. Seine Wurzeln hat er in der klassischen Java-Enterprise-Entwicklung, mittlerweile gilt sein Hauptaugenmerk jedoch dem Themenbereich “Fast Data” und dem SMACK Stack.

Share on FacebookGoogle+Share on LinkedInTweet about this on TwitterShare on RedditDigg thisShare on StumbleUpon

Kommentieren

Deine E-Mail-Adresse wird nicht veröffentlicht. Erforderliche Felder sind markiert *