Wie schreibt man eine Kotlin-DSL – z.B. für Apache Kafka?

Keine Kommentare

Das Interesse an der Programmiersprache Kotlin wächst, und auch die Verwendung von Kotlin in Projekten nimmt zu. Ein Bereich, in dem Kotlin hervorragend verwendet werden kann, ist die Implementierung von speziellen Domänen-spezifischen Sprachen, den DSLs.
Der Wikipedia-Artikel definiert:

Eine domänenspezifische Sprache … oder anwendungsspezifische Sprache ist eine formale Sprache, die zur Interaktion zwischen Menschen und digital arbeitenden Computern („Computersprache“) für ein bestimmtes Problemfeld (die sogenannte Domäne) entworfen und implementiert wird.

In diesem Artikel werde ich eine minimale DSL für den Zugriff auf Apache Kafka implementieren. Diese Sprache verwendet Schlüsselwörter wie kafka, producer, consumer. Sie definiert ebenso eine hierarchische Struktur, welche die Beziehungen dieser Bergiffe modelliert.

Der Zweck dieses Blog-Posts ist es, zu zeigen, wie eine DSL mit Kotlin erstellt werden kann. Die erzeugte DSL wird nur eine minimale Funktionalität haben. Die Nachrichten, die in Kafka Topics geschrieben werden, sind reine Strings. Der Fokus des Artikels liegt auf dem Schreiben der DSL, nicht auf der Benutzung von Apache Kafka.

Der komplette Code für dieses Beispiel ist auf Github verfügbar.

Die DSL

Die beiden folgenden Code-Stücke zeigen die Verwendung der DSL. Das erste Beispiel zeigt einen Producer, der Strings in ein Kafka Topic schreibt. Das zweite Beispiel ist eine Consumer-Anwendung, welche aus Kafka liest.

Ich zeige zuerst den fertigen Code bevor ich erkläre, wie man diese DSL erzeugt:

Der Producer

fun main(args: Array) {
    val log: Logger = LoggerFactory.getLogger("de.codecentric.ProducerRunnerDSL")
    
    kafka("localhost:9092") {
        producer("kt-topic") {
            (1..10).forEach {
                val msg = "test message $it ${LocalDateTime.now()}"
                log.info("sending $msg")
                send(msg)
            }
            flush()
        }
    }
}

Ich verwende als erstes die kafka("localhost:9092")-Methode, um die Verbindungsdetails zum Kafka Broker zu definieren, der auf meinem lokalen Rechner läuft. Innerhalb dieses Kontextes erzeuge ich dann einen Producer mit producer("kt-topic").

Im Kontext des Producers ist eine Schleife, welche 10 Nachrichten erzeugt. Diese Nachrichten werden durch den Aufruf der Producermethode send(msg) an Kafka geschickt. Nach der Schleife sorgt der Aufruf von flush() dafür, dass alle Nachrichten an Kafka geschickt wurden, bevor das Programm terminiert.

Der Consumer

fun main(args: Array) {
  val log: Logger = LoggerFactory.getLogger("de.codecentric.ConsumerRunnerDSL")

  kafka("localhost:9092") {
    consumer("kt-topic") {
      Runtime.getRuntime().addShutdownHook(Thread(Runnable {
        stop()
      }))
      consume {
        log.info("got $it")
      }
    }
  }
}

Wie der Producer definiert auch die Consumer-Anwendung zuerst die Verbindung zum Kafka Broker. Sie erzeugt dann einen Consumer für das topic mit consumer("kt-topic"). In diesem Consumer wird ein Shutdownhook installiert, der sicherstellt, dass die Methode stop() des Consumers am Programmende aufgerufen wird, damit der intern verwendete KafkaConsumerkorrekt beendet wird.

Danach wird die Methode consume des Consumers aufgerufen mit einer Lambda-Funktion als Parameter, welche einfach den String protokolliert, der aus dem Kafka Topic gelesen wurde. Die consume-Methode blockiert, während sie Daten aus Kafka liest und kehrt nur bei einem Fehler zurück oder wenn die Methode stop() von einem anderen Thread aus aufgerufen wird, zum Beispiel beim Programmende.

Voraussetzungen

Apache Kafka

Um die Programme laufen zu lassen benötigt man Apache Kafka. Ich verwende Confluent Platform OpenSource, um meine Implementierung zu testen, aber jedes andere Apache Kafka Setup ist auch in Ordnung.

Kotlin-Konzepte

Zwei Konzepte der Sprache Kotlin sind essentiell, um eine DSL in der Form zu schreiben, wie ich es in diesem Artikel zeige:

Lambdas als Funktionsparameter
Wenn eine Funktion als letzten Aufrufparameter eine andere Funktion erwartet, und dieser Parameter wird in Form eines Lambda übergeben, dann schreibt man das Aufrufargument hinter hinter die Klammern mit den anderen Parametern (Dokumentation):

// func hat einen String als erstes und eine function von String nach String als zweites Argument.
// func ruft die als Argument übergebene Funktion mit dem ersten Parameter auf und gibt den Rückgabewert aus
fun func(arg1: String, calledWithArg1: (String) -> String) {
  println(calledWithArg1(arg1))
}

// man kann den Aufruf so schreiben:
func("foo", { s: String -> s + s })
// Ausgabe ist foofoo

// in Kotlin besser so:
func("foo") {
  it + it
}
//Ausgabe ist foofoo

extension functions:
Extension Functions erlauben es, Funktionalität zu einer vorhandenen Klasse hinzuzufügen. Dabei werden sie nicht in der Klasse selber definiert (Documentation) – das Beispiel hier ist etwas off-topic:

class PaymentInfo(var amount: Double, var creditCard: String)

val p = PaymentInfo(42.0, "1234-5678-0123-1234")

// extension function um den Preis zu verdoppeln, nicht in der Klasse definiert
fun PaymentInfo.twice() {
  // hier ist 'this' das PaymentInfo Objekt
  amount *= 2
}

// die Funktion wird wie eine Methode der Klasse selbst aufgerufen.
p.twice()
// der Betrag ist jetzt 84.0

// eine Function um ein PaymentInfo zu modifizieren, dabei ist der modifier eine extension function
// die als Parameter übergeben wird
fun modifyPayment(payment: PaymentInfo, modifier: PaymentInfo.() -> Unit) {
  payment.modifier()
}

// maskieren der Kreditkartennummer
modifyPayment(p) {
  // hier ist 'this' das PaymentInfo Objekt
  creditCard = "XXX"
}
// im PaymentInfo Objekt ist jetzt die Kreditkarte maskiert

Wir können extension functions zum einen verwenden, um Funktionalität zu Klassen hinzuzufügen, die nicht selber modifiziert werden können, wie zum Beispiel String oder List.
Zum anderen bieten extension functions die Möglichkeit, einer Funktion ein Lambda mitzugeben, welches dann im Kontext des erweiterten Objektes ausgeführt wird. Im Beispielcode wird der modifyPayment()-Methode eine extension function als Parameter übergeben. Diese wird im Kotlin Stil hinter den Methodenaufruf geschrieben.

Schritt 1: interne Klassen um auf Apache Kafka zuzugreifen

In einem ersten Schritt schreiben wir drei Klassen, um in Kafka zu schreiben und von Kafka zu lesen. Diese Klassen verwenden die offizielle Kafka Client Library, die in Java geschrieben ist. Sie bilden eine Kapselung der Kafka-Bibliothek in Kotlin.

Kafka-Konfigurationsklasse

data class Kafka(val bootstrapServers: String)

Die erste Klasse dient der Speicherung der Konfiguration für die Verbindung zu Kafka. Die konfigurierbaren Eigenschaften sind hier auf ein Minimum reduziert.

Die Producer-Klasse

class Producer(kafka: Kafka, private val topic: String) {
  private val kafkaProducer: KafkaProducer<String, String>

  init {
    val config = Properties()
    config[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = kafka.bootstrapServers
    config[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
    config[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
    kafkaProducer = KafkaProducer(config)
  }

  fun send(msg: String) {
    kafkaProducer.send(ProducerRecord(topic, msg))
  }

  fun flush() = kafkaProducer.flush()
}

Die Producer-Klasse wird mit einem Kafka-Konfigurationsobjekt und dem Namen des Topic initialisiert. Im init-Block wird ein KafkaProducer aus der offiziellen Client-Bibliothek mit den notwendigen Konfigurationswerten angelegt. Die send(msg: String)-Methode erzeugt einen ProducerRecord und sendet ihn zu Kafka, und die flush() Methode wird an die Client-Bibliothek durchgereicht.

Die Consumer-Klasse

class Consumer(kafka: Kafka, topic: String) {
  private val kafkaConsumer: KafkaConsumer<String, String>

  @Volatile
  var keepGoing = true

  init {
    val config = Properties()
    config[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = kafka.bootstrapServers
    config[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
    config[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
    config[ConsumerConfig.GROUP_ID_CONFIG] = UUID.randomUUID().toString()
    kafkaConsumer = KafkaConsumer<String, String>(config).apply {
      subscribe(listOf(topic))
    }
  }

  fun consume(handler: (value: String) -> Unit) = Thread(Runnable {
    keepGoing = true
    kafkaConsumer.use { kc ->
      while (keepGoing) {
        kc.poll(500)?.forEach {
          handler(it?.value() ?: "???")
        }
      }
    }
  }).start()

  fun stop() {
  keepGoing = false
  }
}

Auch diese Klasse wird mit der Kafka-Konfiguration und dem Topic initialisiert und erzeugt eine KafkaConsumer-Instanz, die sich beim gewünschten Topic registriert. Die Klasse hat eine Property keepGoing, welche bei Aufruf der Methode consume(handler: (value: String) -> Unit) auf true gesetzt wird. Diese Methode startet einen Thread, in dem das Kafka Topic gepolled wird und in dem für jeden empfangenen Wert die Handler-Methode aufgerufen wird.

Die stop()-Methode wird verwendet, um das keepRunning-Flag auf false zu setzen, was dazu führt, dass das Polling und damit auch der Thread beendet wird.

Beispielanwendungen für diese Klassen

Als Beispiel hier zwei Klassen, welche diesen Code verwenden. Hier wird noch keine DSL verwendet, es handelt sich um einfachen Kotlin Code:

fun main(args: Array) {
  val log: Logger = LoggerFactory.getLogger("de.codecentric.ProducerRunner")

  val kafka = Kafka("localhost:9092")
  val topic = "kt-topic"

  val producer = Producer(kafka, topic)
  (1..10).forEach {
    val msg = "test message $it ${LocalDateTime.now()}"
    log.info("sending $msg")
    producer.send(msg)
  }
  producer.flush()
}

 

fun main(args: Array) {
  val log: Logger = LoggerFactory.getLogger("de.codecentric.ConsumerRunner")

  val kafka = Kafka("localhost:9092")
  val topic = "kt-topic"

  val consumer = Consumer(kafka, topic)
  Runtime.getRuntime().addShutdownHook(Thread(Runnable {
    consumer.stop()
  }))
  consumer.consume {
    log.info("got $it")
  }
}

Schritt 2: der Code für die DSL

Um die DSL zu schreiben, welche die gerade definierten Klassen verwendet, wird der folgende Code benötigt:

class KafkaDSL(bootstrapServers: String) {
  private val kafka = Kafka(bootstrapServers)

  fun producer(topic: String, doProduce: Producer.() -> Unit) =
    Producer(kafka, topic).doProduce()

  fun consumer(topic: String, doConsume: Consumer.() -> Unit) =
    Consumer(kafka, topic).doConsume()
}

fun kafka(bootstrapServers: String, init: KafkaDSL.() -> Unit) =
  KafkaDSL(bootstrapServers).init()

Mehr ist nicht notwendig. Ich werde die Klassen und Funktionen Schritt für Schritt erläutern an Hand des Producer und Consumer Beispiels vom Anfang des Artikels:

Der Producer

kafka("localhost:9092") {
  // Dies ist der Rumpf der KafkaDSL.init()-Funktion.
  // In diesem Block ist 'this' das KafkaDSL-Objekt.
}

Dieser Aufruf der globalen kafka-Funktion erzeugt ein KafkaDSL-Objekt und übergibt die bootstrap Server im Konstruktor. Das KafkaDSL-Objekt verwendet diese Information, um ein Kafka-Konfigurationsobjekt zu erstellen. Das zweite Argument – init – der kafka()-Methode ist eine Kotlin Extension für die KafkaDSL-Klasse, die – im Kotlin-Stil – hinter die kafka()-Klammern geschrieben wird. Diese Funktion wird für das neu erzeugte KafkaDSL-Objekt aufgerufen.

kafka("localhost:9092") {
  // Dies ist der Rumpf der KafkaDSL.init()-Funktion.
  // In diesem Block ist 'this' das KafkaDSL-Objekt.
  producer("kt-topic") {
    // Dies ist der Rumpf der Producer.doProduce()-Funktion.
    // In diesem Block ist 'this' das Producer-Objekt.
  }
}

Die KafkaDSL.init()-Funktion, die für das KafkaDSL-Objekt übergeben wurde, ruft die Methode producer("kt-topic"){} auf. Diese Methode ist in der KafkaDSL-Klasse spezifizert, der erste Parameter ist der Name des Topic und der zweite ist eine Extension-Funktion für die Producer-Klasse. Dieser Aufruf erzeugt ein Producer-Objekt mit den notwendigen Argumenten und ruft dann die Extension-Funktion für dieses Objekt auf.

kafka("localhost:9092") {
  // Dies ist der Rumpf der KafkaDSL.init()-Funktion.
  // In diesem Block ist 'this' das KafkaDSL-Objekt.
  producer("kt-topic") {
    // Dies ist der Rumpf der Producer.doProduce()-Funktion.
    // In diesem Block ist 'this' das Producer-Objekt.
    (1..10).forEach {
      send("test message $it ${LocalDateTime.now()}") // <- Producer.send()
    }
    flush() // <- Producer.flush()
  }
}

Im Rumpf der Producer.doProduce()-Extension-Funktion ist die Schleife implementiert, welche die Nachrichten erzeugt. Diese Nachrichten werden mit den send()– und flush()-Methoden des Producer-Objekts versandt.

Der Consumer

Der Code für den Consumer entspricht dem Code für den producer, ich werde nur die Unterschiede erläutern. Zuerst wird ein KafkaDSL-Objekt erzeugt. Der Unterschied liegt in der Implementierung der KafkaDSL.init()-Funktion:

kafka("localhost:9092") {
  // Dies ist der Rumpf der KafkaDSL.init()-Funktion.
  // In diesem Block ist 'this' das KafkaDSL-Objekt.
  consumer("kt-topic") {
    // Dies ist der Rumpf der Consumer.doConsume()-Funktion.
    // In diesem Block ist 'this' das Consumer-Objekt.
  }
}

Hier wird die consumer("kt-topic")-Methode aufgerufen, die ein Consumer-Objekt erzeugt und dann die Consumer.doConsume()-Extension-Funktion aufruft, welche als zweiter Parameter übergeben wurde.

kafka("localhost:9092") {
  // Dies ist der Rumpf der KafkaDSL.init()-Funktion.
  // In diesem Block ist 'this' das KafkaDSL-Objekt.
  consumer("kt-topic") {
    // Dies ist der Rumpf der Consumer.doConsume()-Funktion.
    // In diesem Block ist 'this' das Consumer-Objekt.
    Runtime.getRuntime().addShutdownHook(Thread(Runnable {
      stop() // <- Consumer.stop()
    }))
    consume { // <- Consumer.consume(handler)
      log.info("got $it")
    }
  }
}

In der Consumer.doConsume()-Funktion wird zuerst der Shutdownhook installiert und dann die Methode consume() des Consumer ausfgerufen. Dieser Methode wird ein Lambda übergeben als Handler, um die aus Kafka gelesenen Wert zur verarbeiten.

Mehr benötigen wir nicht, umn unsere kleine DSL zu schreiben.

Fazit

Zuerst haben wir ein paar Hilfsklassen geschrieben für die gewünschte Funktionalität zum Lesen und Schreiben in und aus Kafka. Und mit nur einer globalen Funktion und einer Hilfsklasse mit zwei Methoden haben wir unsere DSL definiert. Das Beispiel zeigt das Potential, das in Kotlins Sprachkonzepten wie Extension-Funktionen oder Lambda-Argumenten steckt. Weitere Möglichkeiten sind noch Operator-Overloading und Infix-Funktionen, die ich hier nicht verwendet habe, um das Beispiel nicht zu komplex zu machen.
Für Java-Entwickler sind diese Spracheigenschaften zuerst ungewohnt, aber es lohnt sich, sich diese genauer anzuschauen.

Ich hoffe, euch hat dieser Ausflug in die Welt von Kotlin gefallen.

Peter-Josef Meisch

P.J. schreibt Software seit er 1980 zum ersten Mal einen Computer in die Hände bekommen hat. Er entwickelt hauptsächlich in Java, ist aber immer offen für neue Sprachen und Technologien.

Weitere Inhalte zu Kotlin

Kommentieren

Deine E-Mail-Adresse wird nicht veröffentlicht. Erforderliche Felder sind mit * markiert.