How to write a Kotlin DSL – e.g. for Apache Kafka


The Kotlin language is gaining more and more attention and is being used in an increasing number of projects. One thing Kotlin is well suited for is implementing domain-specific languages (DSLs). The Wikipedia entry on DSLs states:

A domain-specific language (DSL) is a computer language specialized to a particular application domain.

In this post I will implement a minimal DSL for accessing Apache Kafka that uses keywords like kafka, producer, and consumer. It also defines a nesting structure that models the relationship between these concepts.

The purpose of this blog post is to show how to create a custom DSL with Kotlin. The DSL created will only have minimal functionality. The messages that are written to the Kafka topics are just plain strings. The focus is on creating the DSL, not using Apache Kafka.

The complete code for this sample project is available on GitHub.

The target DSL

The following code pieces show the DSL in action. The first example shows a producer application that writes strings into Kafka. The second example is a consumer application that reads them from Kafka.

I first show the final code that uses the DSL before explaining how to get there:

The producer

fun main(args: Array<String>) {
    val log: Logger = LoggerFactory.getLogger("de.codecentric.ProducerRunnerDSL")
    
    kafka("localhost:9092") {
        producer("kt-topic") {
            (1..10).forEach {
                val msg = "test message $it ${LocalDateTime.now()}"
                log.info("sending $msg")
                send(msg)
            }
            flush()
        }
    }
}

In this code I first use the kafka("localhost:9092") function to define the connection to the Kafka broker that is running on the local machine. Within this context I then create a producer for the topic with producer("kt-topic").
In the context of the producer there is a loop that produces 10 messages. These messages are sent to Kafka by calling the producer’s send(msg) method. After the loop, the flush() method of the producer is called to make sure that the messages are delivered to Kafka before the program terminates.

The consumer

fun main(args: Array<String>) {
  val log: Logger = LoggerFactory.getLogger("de.codecentric.ConsumerRunnerDSL")

  kafka("localhost:9092") {
    consumer("kt-topic") {
      Runtime.getRuntime().addShutdownHook(Thread(Runnable {
        stop()
      }))
      consume {
        log.info("got $it")
      }
    }
  }
}

Like the producer, the consumer application first defines the connection to the Kafka broker. It then creates a consumer for the topic with consumer("kt-topic"). In this consumer, a shutdown hook is added which calls the consumer’s stop() method to ensure that the underlying KafkaConsumer is shut down properly on program termination.
Then the consumer’s consume method is called with a lambda function that processes the String which is retrieved from the Kafka topic by logging its value. This consume method blocks on reading the data from Kafka and will only return on an error or when the stop() method has been called from a different thread, for example on program shutdown.

Prerequisites

Apache Kafka

To run the programs, a running Apache Kafka instance is necessary. I used the Confluent Platform OpenSource to test my implementation; any other Apache Kafka setup is fine as well, as long as you have the address of a Kafka broker.

Kotlin concepts

There are two Kotlin language features that are essential for writing a DSL like the one in this article:

lambda arguments to functions:
When the last parameter of a function func is itself a function, and the call site passes that argument as a lambda, the lambda can be moved outside the parentheses (documentation):

// func takes a String as its first argument and a function from String to String as its second.
// It calls the second argument with the first and prints the result
fun func(arg1: String, calledWithArg1: (String) -> String) {
  println(calledWithArg1(arg1))
}

// this might be called like this (don't do it this way!):
func("foo", { s: String -> s + s })
// prints foofoo

// the Kotlin way: the lambda moved out of the parentheses, using the implicit it parameter:
func("foo") {
  it + it
}
// prints foofoo as well

extension functions:
Extension functions make it possible to add functionality to existing classes without changing the classes themselves (documentation). The example here is a bit off-topic, but it shows the mechanics:

class PaymentInfo(var amount: Double, var creditCard: String)

val p = PaymentInfo(42.0, "1234-5678-0123-1234")

// extension function to double the amount, not defined within the class
fun PaymentInfo.twice() {
  // here 'this' is the PaymentInfo object
  amount *= 2
}

// call the extension function for an instance of class PaymentInfo
p.twice()
// now the amount is 84.0

// a function to modify a PaymentInfo with an extension passed in as a lambda
fun modifyPayment(payment: PaymentInfo, modifier: PaymentInfo.() -> Unit) {
  payment.modifier()
}

// mask the credit card
modifyPayment(p) {
  // here 'this' is the PaymentInfo object
  creditCard = "XXX"
}
// now the credit card value in the object is masked

Extension functions serve two purposes for us. First, they let us add functionality to classes that cannot be modified (such as String or List).
Second, an extension function type lets a lambda be passed to a function and then be executed in the context of an object, as in the example where the credit card number is masked. The modifyPayment() call in the example again shows the Kotlin way of moving the lambda out of the parentheses.
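To illustrate the first purpose, here is a minimal sketch (the shout() name is mine, not from the examples above) of an extension function on String, a class we cannot modify:

```kotlin
// Extension function on String - defined outside the class,
// but callable as if it were a member.
fun String.shout() = uppercase() + "!"

fun main() {
    println("hello".shout()) // prints HELLO!
}
```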

Step 1: internal classes used to access Apache Kafka

In a first step, we create some Kotlin classes to write to and read from Kafka. These classes use the official Kafka client library written in Java. The classes make up a small Kotlin layer around the Kafka client library.

Kafka configuration class

data class Kafka(val bootstrapServers: String)

This first very simple class just stores the configuration for the connection to the Kafka broker. I kept the configurable properties to the absolute minimum to keep the example small.
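Because Kafka is a data class, the compiler generates equals(), toString(), and copy() for it. A quick sketch (the broker.example.com address below is made up):

```kotlin
data class Kafka(val bootstrapServers: String)

fun main() {
    val local = Kafka("localhost:9092")
    // structural equality and a readable toString() come for free
    println(local == Kafka("localhost:9092")) // true
    println(local) // Kafka(bootstrapServers=localhost:9092)
    // copy() creates a modified instance without touching the original
    val remote = local.copy(bootstrapServers = "broker.example.com:9092")
    println(remote.bootstrapServers)
}
```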

The Producer class

class Producer(kafka: Kafka, private val topic: String) {
  private val kafkaProducer: KafkaProducer<String, String>

  init {
    val config = Properties()
    config[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = kafka.bootstrapServers
    config[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
    config[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
    kafkaProducer = KafkaProducer(config)
  }

  fun send(msg: String) {
    kafkaProducer.send(ProducerRecord(topic, msg))
  }

  fun flush() = kafkaProducer.flush()
}

The Producer class is initialized with a Kafka configuration object and the topic name. In the init block, a KafkaProducer from the official client library is instantiated with the necessary values.
The send(msg: String) method creates a ProducerRecord and sends it to Kafka; the flush() call is simply delegated to the client library.

The Consumer class

class Consumer(kafka: Kafka, topic: String) {
  private val kafkaConsumer: KafkaConsumer<String, String>

  @Volatile
  var keepGoing = true

  init {
    val config = Properties()
    config[ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG] = kafka.bootstrapServers
    config[ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
    config[ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG] = StringDeserializer::class.java
    config[ConsumerConfig.GROUP_ID_CONFIG] = UUID.randomUUID().toString()
    kafkaConsumer = KafkaConsumer<String, String>(config).apply {
      subscribe(listOf(topic))
    }
  }

  fun consume(handler: (value: String) -> Unit) = Thread(Runnable {
    keepGoing = true
    kafkaConsumer.use { kc ->
      while (keepGoing) {
        kc.poll(500)?.forEach {
          handler(it?.value() ?: "???")
        }
      }
    }
  }).start()

  fun stop() {
    keepGoing = false
  }
}

This class, too, is initialized with the Kafka configuration and topic; it creates a KafkaConsumer instance that subscribes to the requested topic. The class has a property named keepGoing which is set to true when its consume(handler: (value: String) -> Unit) method is called. This method starts a new thread that polls the Kafka topic and calls the handler function for the retrieved values.
The stop() method sets the keepGoing flag to false, which stops the polling loop and ends the thread.
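The same cooperative-shutdown pattern can be sketched without Kafka; the Worker class below is a made-up stand-in for the Consumer:

```kotlin
// A @Volatile flag lets one thread stop another thread's loop cooperatively:
// the stopping thread flips the flag, the polling loop observes it and exits.
class Worker {
    @Volatile
    private var keepGoing = true

    fun run(): Thread = Thread {
        keepGoing = true
        while (keepGoing) {
            Thread.sleep(10) // stand-in for kafkaConsumer.poll(...)
        }
    }.apply { start() }

    fun stop() {
        keepGoing = false
    }
}

fun main() {
    val worker = Worker()
    val thread = worker.run()
    Thread.sleep(50)
    worker.stop()       // loop sees the flag and terminates
    thread.join(1000)
    println("worker alive: ${thread.isAlive}") // worker alive: false
}
```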

Applications using these classes

Here are two applications that use these classes to produce and consume data without using the custom DSL; this is just plain Kotlin code:

fun main(args: Array<String>) {
  val log: Logger = LoggerFactory.getLogger("de.codecentric.ProducerRunner")

  val kafka = Kafka("localhost:9092")
  val topic = "kt-topic"

  val producer = Producer(kafka, topic)
  (1..10).forEach {
    val msg = "test message $it ${LocalDateTime.now()}"
    log.info("sending $msg")
    producer.send(msg)
  }
  producer.flush()
}


fun main(args: Array<String>) {
  val log: Logger = LoggerFactory.getLogger("de.codecentric.ConsumerRunner")

  val kafka = Kafka("localhost:9092")
  val topic = "kt-topic"

  val consumer = Consumer(kafka, topic)
  Runtime.getRuntime().addShutdownHook(Thread(Runnable {
    consumer.stop()
  }))
  consumer.consume {
    log.info("got $it")
  }
}

Step 2: The code defining the DSL

In order to create the DSL using these classes the following code is required:

class KafkaDSL(bootstrapServers: String) {
  private val kafka = Kafka(bootstrapServers)

  fun producer(topic: String, doProduce: Producer.() -> Unit) =
    Producer(kafka, topic).doProduce()

  fun consumer(topic: String, doConsume: Consumer.() -> Unit) =
    Consumer(kafka, topic).doConsume()
}

fun kafka(bootstrapServers: String, init: KafkaDSL.() -> Unit) =
  KafkaDSL(bootstrapServers).init()

This is all that is necessary to create our DSL. I will explain this code step by step for both the producer and the consumer example.

The producer

kafka("localhost:9092") {
  // this is the body of the KafkaDSL.init() function.
  // In this block, 'this' is the KafkaDSL object.
}

This call to the global kafka function creates a KafkaDSL object, passing the bootstrap servers to the constructor. The KafkaDSL object uses this information to create a Kafka configuration object. The second argument to the kafka() function, init, is a Kotlin extension function for the KafkaDSL class which, in Kotlin style, is written after the closing parenthesis of the kafka() call. This function is invoked on the newly created KafkaDSL object.

kafka("localhost:9092") {
  // this is the body of the KafkaDSL.init() function.
  // In this block, 'this' is the KafkaDSL object.
  producer("kt-topic") {
    // this is the body of the Producer.doProduce() function.
    // In this block, 'this' is the Producer object
  }
}

The KafkaDSL.init() function, which has been passed to the KafkaDSL object, calls the method producer("kt-topic"){}. This is a call to the corresponding method of the KafkaDSL class, the first parameter being the topic name and the second being an extension function for the Producer class. This call creates a Producer object with the necessary arguments and then calls the extension function on this Producer object.

kafka("localhost:9092") {
  // this is the body of the KafkaDSL.init() function.
  // In this block, 'this' is the KafkaDSL object.
  producer("kt-topic") {
    // this is the body of the Producer.doProduce() function.
    // In this block, 'this' is the Producer object
    (1..10).forEach {
      send("test message $it ${LocalDateTime.now()}") // <- Producer.send()
    }
    flush() // <- Producer.flush()
  }
}

In the body of the Producer.doProduce() extension function, the loop creating the messages is implemented and the messages are sent with the send() and flush() methods of the Producer object.
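To make the mechanics explicit: the nested blocks above are ordinary function calls with trailing lambdas. A desugared sketch with stubbed classes (no real Kafka; the stub records messages so the result can be inspected):

```kotlin
// Stubbed versions of the article's classes so the sketch runs without Kafka.
class Producer(private val topic: String) {
    val sent = mutableListOf<String>() // records messages for inspection
    fun send(msg: String) { sent += msg }
}

class KafkaDSL {
    // same shape as the article's method; returns the Producer so we can look at it
    fun producer(topic: String, doProduce: Producer.() -> Unit): Producer =
        Producer(topic).apply(doProduce)
}

fun main() {
    val dsl = KafkaDSL()
    // DSL form: trailing lambda, 'this' inside the block is the Producer
    val a = dsl.producer("kt-topic") { send("sugared") }
    // fully desugared form: the same lambda passed inside the parentheses
    val b = dsl.producer("kt-topic", { send("desugared") })
    println(a.sent + b.sent) // [sugared, desugared]
}
```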

The consumer

The code for the consumer is similar to the producer code, so I will just explain the differences. First a KafkaDSL object is created. The difference lies in the implementation of the KafkaDSL.init() function:

kafka("localhost:9092") {
  // this is the body of the KafkaDSL.init() function.
  // In this block, 'this' is the KafkaDSL object.
  consumer("kt-topic") {
    // this is the body of the Consumer.doConsume() function.
    // In this block, 'this' is the Consumer object
  }
}

Here the consumer("kt-topic") method is called which creates a Consumer object and then calls the Consumer.doConsume() extension function which is passed in as second argument.

kafka("localhost:9092") {
  // this is the body of the KafkaDSL.init() function.
  // In this block, 'this' is the KafkaDSL object.
  consumer("kt-topic") {
    // this is the body of the Consumer.doConsume() function.
    // In this block, 'this' is the Consumer object
    Runtime.getRuntime().addShutdownHook(Thread(Runnable {
      stop() // <- Consumer.stop()
    }))
    consume { // <- Consumer.consume(handler)
      log.info("got $it")
    }
  }
}

The body of the Consumer.doConsume() function first installs the shutdown hook and then calls the Consumer’s consume() method, passing in a lambda as the handler to process the values read from Kafka.

And this is all that we need to create our small custom DSL.
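One refinement our small DSL does not strictly need, but which larger Kotlin DSLs commonly use, is the @DslMarker annotation. Without it, the outer KafkaDSL receiver stays in scope inside a producer block, so a producer(...) call could accidentally be nested in another. A sketch with stubbed classes (names are mine, no real Kafka):

```kotlin
// Both scope classes carry the same marker annotation, so inside a
// producer block the compiler hides the outer KafkaScope receiver.
@DslMarker
annotation class KafkaDslMarker

@KafkaDslMarker
class KafkaScope {
    fun producer(topic: String, block: ProducerScope.() -> Unit) =
        ProducerScope(topic).block()
}

@KafkaDslMarker
class ProducerScope(private val topic: String) {
    fun send(msg: String): String {
        val line = "[$topic] $msg"
        println(line)
        return line
    }
}

fun kafka(block: KafkaScope.() -> Unit) = KafkaScope().block()

fun main() {
    kafka {
        producer("kt-topic") {
            send("hi")
            // producer("oops") { }  // would not compile: outer receiver is hidden
        }
    }
}
```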

Conclusion

First we wrote some helper classes in Kotlin providing the desired functionality to write to and read from Apache Kafka. Then, with only one global function and a helper class with two methods, we implemented our DSL on top of them. This example shows the potential that lies in Kotlin’s language constructs like extension functions and lambda arguments to functions. Even more possibilities open up with operator overloading or infix functions; I refrained from using them here to keep the example small.
Coming from Java, these language features may seem strange at first, but it is definitely worth working through them.
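As a taste of what infix functions could add, here is a hypothetical sketch (the sends name is mine and not part of the DSL built above; the Producer is a stub that only records messages):

```kotlin
// Stub producer; a real one would wrap KafkaProducer as shown earlier.
class Producer {
    val sent = mutableListOf<String>()
    fun send(msg: String) { sent += msg }
}

// An infix extension lets a send read almost like prose: producer sends "hello"
infix fun Producer.sends(msg: String) = send(msg)

fun main() {
    val producer = Producer()
    producer sends "test message"
    println(producer.sent) // [test message]
}
```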

Hope you enjoyed this excursion into the world of Kotlin.

Peter-Josef Meisch

P.J. has been writing software since he first got his hands on a computer in 1980. He mostly develops in Java and Kotlin, but is always open to new languages and technologies.
