Version 0.10.0 of the popular distributed streaming platform Apache Kafka saw the introduction of Kafka Streams. In its initial release, the Streams-API enabled stateful and stateless Kafka-to-Kafka message processing using concepts such as map, flatMap, filter or groupBy that many developers are familiar with these days. In Kafka 0.10.1, it started to support “Interactive Queries”, an API that allows querying stateful stream transformations without going through another Kafka topic.
In this article, we will talk about a specific kind of streaming operation – the joining of streams. We will begin with a brief walkthrough of some core concepts. Then we will take a look at the kinds of joins that the Streams API permits. Following that, we’ll walk through each possible join by looking at the output of an established example. At the end, you should be aware of what kinds of joins are possible in Kafka Streams and where the caveats lie.
A brief introduction to some core concepts
The central component of Kafka is a distributed pub-sub message broker where producers send messages – key-value pairs – to topics which in turn are polled and read by consumers. Each topic is partitioned and the partitions are distributed among brokers. The excellent Kafka documentation explains it best.
There are two main abstractions in the Streams API. A KStream is a stream of key-value pairs – basically a close model of a Kafka topic. The records in a KStream either come directly from a topic or have gone through some kind of transformation – it has for example a filter-method that takes a predicate and returns another KStream that only contains those elements that satisfy the predicate. KStreams are stateless, but they allow for aggregation by turning them into the other core abstraction – KTable – which is often describe as “changelog stream”.
A KTable statefully holds the latest value for a given message key and reacts automatically to newly incoming messages.
A nice example is perhaps counting visits to a website by unique IP addresses. Let’s assume we have a Kafka topic containing messages of the following type: (key=IP, value=timestamp). A KStream contains all visits by all IPs, even if the IP is recurring. A count on such a KStream sums up all visits to a site including duplicates. A KTable on the other hand only contains the latest message and a count on the KTable represents the number of distinct IP addresses that visited the site.
KTables and KStreams can also be windowed. Regarding the example, this means we could add a time dimensions to our stateful operations. To enable windowing, Kafka 0.10 changed the Kafka message format to include a timestamp. This timestamp can either be CreateTime or AppendTime. CreateTime is set by the producer and can be be set manually or automatically. AppendTime is the time a message is appended to the log by the broker. Next up: joins.
Taking a leaf out of SQLs book, Kafka Streams supports three kinds of joins:
- Inner Joins
- Emits an output when both input sources have records with the same key.
- Outer Joins
- Emits an output for each record in either input source. If only one source contains a key, the other is null
- Left Joins
- Emits an output for each record in the left or primary input source. If the other source does not have a value for a given key, it is set to null
Another important aspect to consider are the input types. The following table shows which operations are permitted between KStreams and KTables:
|Primary Type||Secondary Type||Inner Join||Outer Join||Left Join|
|KStream||KTable||Not Allowed (Coming 0.10.2)||Allowed||Not Allowed|