
Crossing the Streams – Joins in Apache Kafka


Version 0.10.0 of the popular message broker Apache Kafka saw the introduction of Kafka Streams. In its initial release, the Streams API enabled stateful and stateless Kafka-to-Kafka message processing using concepts such as map, flatMap, filter or groupBy that many developers are familiar with these days. Kafka 0.10.1 added support for “Interactive Queries”, an API that allows querying the state of stateful stream transformations without going through another Kafka topic.

In this article, we will talk about a specific kind of streaming operation – the joining of streams. We will begin with a brief walkthrough of some core concepts. Then we will take a look at the kinds of joins that the Streams API permits. Following that, we’ll walk through each possible join by looking at the output of an established example. At the end, you should be aware of what kinds of joins are possible in Kafka Streams and where the caveats lie.

A brief introduction to some core concepts

Kafka is a distributed pub-sub message broker where producers send messages – key-value pairs – to topics which in turn are polled and read by consumers. Each topic is partitioned and the partitions are distributed among brokers. The excellent Kafka documentation explains it best.

There are two main abstractions in the Streams API. A KStream is a stream of key-value pairs – basically a close model of a Kafka topic. The records in a KStream either come directly from a topic or have gone through some kind of transformation. For example, it has a filter method that takes a predicate and returns another KStream that only contains those elements that satisfy the predicate. KStreams are stateless, but they allow for aggregation by turning them into the other core abstraction – the KTable – which is often described as a “changelog stream”.

A KTable statefully holds the latest value for a given message key and reacts automatically to newly incoming messages.
A nice example is perhaps counting visits to a website by unique IP addresses. Let’s assume we have a Kafka topic containing messages of the following type: (key=IP, value=timestamp). A KStream contains all visits by all IPs, even if the IP is returning. A count on such a KStream sums up all visits to a site including duplicates. A KTable on the other hand only contains the latest message for each key, so a count on the KTable represents the number of distinct IP addresses that visited the site.
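The difference between the two counts can be sketched in a few lines of plain Python. This is only an illustration of the semantics, not Kafka Streams code, and the IP addresses and timestamps are made up for the example.

```python
from collections import Counter

# Hypothetical (key=IP, value=timestamp) records in arrival order.
visits = [
    ("10.0.0.1", 1000),
    ("10.0.0.2", 1500),
    ("10.0.0.1", 2000),  # returning visitor
    ("10.0.0.3", 2500),
    ("10.0.0.1", 3000),  # returning visitor
]

# Stream view: every record counts, duplicates included.
stream_count = len(visits)
visits_per_ip = Counter(ip for ip, _ in visits)

# Table view: a later record for a key overwrites the earlier one,
# so the table ends up with one row per distinct IP.
table = {}
for ip, timestamp in visits:
    table[ip] = timestamp
table_count = len(table)

print(stream_count)       # 5 visits in total
print(table_count)        # 3 distinct IPs
print(table["10.0.0.1"])  # 3000, the latest timestamp for that key
```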

KTables and KStreams can also be windowed. Regarding the example, this means we could add a time dimension to our stateful operations. To enable windowing, Kafka 0.10 changed the Kafka message format to include a timestamp. This timestamp can either be CreateTime or LogAppendTime. CreateTime is set by the producer and can be set manually or automatically. LogAppendTime is the time a message is appended to the log by the broker. Next up: joins.
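To make the time dimension concrete, here is a small sketch that counts visits per IP and per time window. It assumes fixed, non-overlapping windows of one minute that are aligned to timestamp 0; the window size, the helper name `window_start` and the sample records are assumptions for illustration, and the code does not use the Streams API.

```python
from collections import defaultdict

WINDOW_MS = 60_000  # hypothetical window size: one minute


def window_start(timestamp_ms, size_ms=WINDOW_MS):
    """Return the start of the fixed window that contains the timestamp."""
    return timestamp_ms - (timestamp_ms % size_ms)


# Hypothetical (IP, timestamp in ms) records.
visits = [
    ("10.0.0.1", 5_000),
    ("10.0.0.1", 59_000),
    ("10.0.0.1", 61_000),
    ("10.0.0.2", 62_000),
]

# The state is now keyed by (IP, window start) instead of by IP alone.
windowed_counts = defaultdict(int)
for ip, timestamp in visits:
    windowed_counts[(ip, window_start(timestamp))] += 1

for (ip, start), count in sorted(windowed_counts.items()):
    print(ip, start, count)
```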

Joins

Taking a leaf out of SQL’s book, Kafka Streams supports three kinds of joins:

  • Inner Joins
    • Emits an output when both input sources have records with the same key.
  • Outer Joins
    • Emits an output for each record in either input source. If only one source contains a key, the value from the other source is null.
  • Left Joins
    • Emits an output for each record in the left or primary input source. If the other source does not have a value for a given key, it is set to null.

    Another important aspect to consider is the type of each input. The following table shows which operations are permitted between KStreams and KTables:

    Primary Type   Secondary Type   Inner Join                    Outer Join    Left Join
    KStream        KStream          Allowed                       Allowed       Allowed
    KTable         KTable           Allowed                       Allowed       Allowed
    KStream        KTable           Not Allowed (Coming 0.10.2)   Not Allowed   Allowed
    KTable         KStream          Not Allowed                   Not Allowed   Not Allowed

    As the table shows, all joins are permitted between equal types. The only permitted inter-type join is a left join between a KStream and a KTable. A possible use case for this join is the matching of incoming streaming data to a KTable that contains some kind of master data, like matching an incoming user id to a more detailed profile.
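As a reminder of what the three join types mean, the following sketch applies them to two plain key-value maps. It ignores everything stream-specific (time, windows, arrival order), and the user events and profile names are invented for the master-data use case mentioned above.

```python
def inner_join(left, right):
    """Emit a pair only for keys present on both sides."""
    return {k: (left[k], right[k]) for k in left if k in right}


def left_join(left, right):
    """Emit a pair for every left key; a missing right value becomes None."""
    return {k: (v, right.get(k)) for k, v in left.items()}


def outer_join(left, right):
    """Emit a pair for every key on either side; a missing value becomes None."""
    keys = list(left) + [k for k in right if k not in left]
    return {k: (left.get(k), right.get(k)) for k in keys}


# Hypothetical data: user events keyed by user id, and profile master data.
events = {"u1": "login", "u2": "search"}
profiles = {"u1": "Alice", "u3": "Carol"}

print(inner_join(events, profiles))  # {'u1': ('login', 'Alice')}
print(left_join(events, profiles))   # adds 'u2': ('search', None)
print(outer_join(events, profiles))  # additionally adds 'u3': (None, 'Carol')
```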

    In total there are seven possible join types. Let’s look at them in detail. Disclaimer: the table of possible joins and the join semantics are only valid for Kafka 0.10.0 and 0.10.1. They will be improved in Kafka 0.10.2 (see KIP-77).

    Example

    We are going to use a consistent example to demonstrate the differences in the joins. It is based on the online advertising domain. There is one Kafka topic that contains view events of particular ads and another one that contains click events based on those ads. Views and clicks share an ID that serves as the key in both topics.

    In the examples, custom-set event times provide a convenient way to simulate the timing within the streams. We will look at the following 7 scenarios with ids 0 to 6:

    0. a click event arrives 1000 ms after the view
    1. a click event arrives 10,000 ms after the view
    2. a view event arrives 1000 ms after the click
    3. there is a view event but no click
    4. there is a click event but no view event
    5. there are two view events at distinct times, and a click event 1000 ms after the first view. The view events are denoted as 5.1 and 5.2
    6. there is a view event followed by a click event after 500 ms and another one after 1200 ms. The clicks are denoted as 6.1 and 6.2 in the following.
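The scenarios can be written down as two small lists of records, which makes it easier to reason about the joins. The timestamps are offsets in milliseconds relative to the first event of each scenario. The text does not say when view 5.2 occurs, so placing it 500 ms after view 5.1 is an assumption, as are the labels.

```python
# (key, timestamp offset in ms, label)
VIEWS = [
    (0, 0, "view0"),
    (1, 0, "view1"),
    (2, 1000, "view2"),    # the click arrives first in this scenario
    (3, 0, "view3"),       # no click
    (5, 0, "view5.1"),
    (5, 500, "view5.2"),   # assumed offset, not given in the text
    (6, 0, "view6"),
]

CLICKS = [
    (0, 1000, "click0"),
    (1, 10000, "click1"),  # far outside a 5000 ms window
    (2, 0, "click2"),
    (4, 0, "click4"),      # no view
    (5, 1000, "click5"),
    (6, 500, "click6.1"),
    (6, 1200, "click6.2"),
]

# Merge both topics into one timeline, ordered by timestamp within each key.
timeline = sorted(
    [(key, ts, label) for key, ts, label in VIEWS + CLICKS],
    key=lambda record: (record[0], record[1]),
)
for key, ts, label in timeline:
    print(f"scenario {key}: t={ts:>5} ms  {label}")
```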

    This visualization shows these streams:

    Inner Join KStream-KStream

    All KStream-KStream joins are windowed, so the developer has to specify how long that window should be and if the order of the elements in the streams matters. The rationale behind that forced windowing is this: a KStream is stateless. To execute a join with acceptable performance, some internal state needs to be kept – otherwise both streams would need to be scanned in full each time a new element arrives. That state contains all elements of the stream within the time window (possible duplicates included). We will use a window of 5000 milliseconds in the following examples.

    An inner join on two streams yields a result if a key appears in both streams within the window. Applied to the example, this produces the following results:

    Scenarios 0 and 2 appear as expected as the key appears in both streams within 5000 ms, even though they come in different order. Scenario 1 is missing as view and click did not appear within the window. Scenarios 3 and 4 are missing because they do not appear in both streams. Scenarios 5 and 6 appear duplicated as the keys appear twice in the view stream for scenario 5 and in the click stream for scenario 6.

    A variation of this is the enforcement of ordering. The developer can specify that a click event can only be joined if it occurs after a view event. This setting would lead to the elimination of scenario 2 in the example.
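The windowed inner join and its ordered variation can be simulated without Kafka. The sketch below is a plain-Python model of the behaviour described above, not the Streams API; the flag name `click_must_follow_view` is made up, and the offset of view 5.2 (500 ms) is an assumption.

```python
WINDOW_MS = 5000

# (key, timestamp offset in ms, label); view 5.2 at 500 ms is an assumption.
VIEWS = [(0, 0, "view0"), (1, 0, "view1"), (2, 1000, "view2"), (3, 0, "view3"),
         (5, 0, "view5.1"), (5, 500, "view5.2"), (6, 0, "view6")]
CLICKS = [(0, 1000, "click0"), (1, 10000, "click1"), (2, 0, "click2"),
          (4, 0, "click4"), (5, 1000, "click5"),
          (6, 500, "click6.1"), (6, 1200, "click6.2")]


def windowed_inner_join(views, clicks, window_ms, click_must_follow_view=False):
    """Pair every view with every click of the same key inside the window."""
    results = []
    for v_key, v_ts, v_label in views:
        for c_key, c_ts, c_label in clicks:
            if v_key != c_key:
                continue
            delta = c_ts - v_ts  # positive if the click comes after the view
            if abs(delta) > window_ms:
                continue
            if click_must_follow_view and delta < 0:
                continue
            results.append((v_label, c_label))
    return results


unordered = windowed_inner_join(VIEWS, CLICKS, WINDOW_MS)
ordered = windowed_inner_join(VIEWS, CLICKS, WINDOW_MS, click_must_follow_view=True)

for pair in unordered:
    print(pair)
print(len(unordered), len(ordered))  # 6 5
```

Scenarios 5 and 6 each contribute two pairs, scenarios 1, 3 and 4 contribute none, and the ordered variant drops the pair for scenario 2.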

    Outer Join KStream-KStream

    The following will assume the idealistic ordering that each event is processed in the order of its timestamp. In practice, this is not always guaranteed.

    An outer join will emit an output each time an event is processed in either stream. If the window state already contains an element with the same key in the other stream, it will apply the join method to both elements. If not, it will only apply the incoming element.

    The following shows the idealistic result:

    For scenario 0, an event is emitted once the view is processed. There is no click yet. When the click arrives, the joined event on view and click is emitted. For case 1, we also get two output events. However, since the events do not occur within the window, neither of these events contains both view and click. Scenario 3 appears in the output without a click, and the equivalent output (a click without a view) is emitted for scenario 4. Scenario 5 produces 4 output events as there are two views that are emitted immediately and once again when they are joined against a click. Case 6 only produces 3 events as both clicks can be immediately joined against a view that arrived earlier.
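The idealistic behaviour described above can be reproduced with a small simulation that processes all events strictly in timestamp order. It is a plain-Python model of the 0.10.0/0.10.1 semantics as described in this article, not Streams code, and it assumes that view 5.2 occurs 500 ms after view 5.1.

```python
from collections import Counter

WINDOW_MS = 5000

# (key, timestamp offset in ms, label); view 5.2 at 500 ms is an assumption.
VIEWS = [(0, 0, "view0"), (1, 0, "view1"), (2, 1000, "view2"), (3, 0, "view3"),
         (5, 0, "view5.1"), (5, 500, "view5.2"), (6, 0, "view6")]
CLICKS = [(0, 1000, "click0"), (1, 10000, "click1"), (2, 0, "click2"),
          (4, 0, "click4"), (5, 1000, "click5"),
          (6, 500, "click6.1"), (6, 1200, "click6.2")]


def outer_join(views, clicks, window_ms):
    """Emit (key, (view, click)) for every processed event, in timestamp order."""
    events = [(ts, "view", key, label) for key, ts, label in views]
    events += [(ts, "click", key, label) for key, ts, label in clicks]
    seen = {"view": [], "click": []}  # window state per input stream
    output = []
    for ts, side, key, label in sorted(events):
        other = "click" if side == "view" else "view"
        matches = [m_label for m_ts, m_key, m_label in seen[other]
                   if m_key == key and abs(ts - m_ts) <= window_ms]
        # Without a match the event is emitted on its own, padded with None.
        for match in matches or [None]:
            pair = (label, match) if side == "view" else (match, label)
            output.append((key, pair))
        seen[side].append((ts, key, label))
    return output


results = outer_join(VIEWS, CLICKS, WINDOW_MS)
events_per_scenario = Counter(key for key, _ in results)
for key in sorted(events_per_scenario):
    print(key, events_per_scenario[key])
```

Under these assumptions the number of output events per scenario is 2, 2, 2, 1, 1, 4 and 3, which matches the description above.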

    What happens in real life? Hard to say because the timing is important. In tests, the clicks for scenario 0 were always processed earlier than the views by the streaming application, despite using CreateTime for the message timestamps. This is probably what you should take away from this paragraph: as of Kafka 0.10.1.x, outer joins work on processing time, not message time.

    Left Join KStream-KStream

    This type of join is processing-time dependent as well and has some behaviour that you might not expect.
    The left join emits an output event every time an event arrives in the left stream. If an event with the same key has previously arrived in the right stream, it is joined with the one in the primary stream. Otherwise the right-hand value is set to null. With our examples, that is going to result in a bit of a sad picture as we only have one scenario where the click arrives before the view and thus has already been processed. This leads to the following result:

    Only scenario 2 yields a complete result in the idealistic version, but as expected from a left join, all elements from the left stream show up. This is one of the semantics that will change with Kafka 0.10.2 where “right” messages arriving within the window will cause the emission of a joined event.
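The same kind of simulation illustrates the left join as it behaves up to Kafka 0.10.1: only events on the left (view) side trigger output, and they only see clicks that have already been processed. Again this is a plain-Python model that processes events in timestamp order, with view 5.2 assumed to occur 500 ms after view 5.1.

```python
WINDOW_MS = 5000

# (key, timestamp offset in ms, label); view 5.2 at 500 ms is an assumption.
VIEWS = [(0, 0, "view0"), (1, 0, "view1"), (2, 1000, "view2"), (3, 0, "view3"),
         (5, 0, "view5.1"), (5, 500, "view5.2"), (6, 0, "view6")]
CLICKS = [(0, 1000, "click0"), (1, 10000, "click1"), (2, 0, "click2"),
          (4, 0, "click4"), (5, 1000, "click5"),
          (6, 500, "click6.1"), (6, 1200, "click6.2")]


def left_join(views, clicks, window_ms):
    """Emit (view, click or None) whenever a view is processed."""
    events = [(ts, "view", key, label) for key, ts, label in views]
    events += [(ts, "click", key, label) for key, ts, label in clicks]
    seen_clicks = []
    output = []
    for ts, side, key, label in sorted(events):
        if side == "click":
            # Clicks only update the state; they never trigger output.
            seen_clicks.append((ts, key, label))
            continue
        matches = [c_label for c_ts, c_key, c_label in seen_clicks
                   if c_key == key and abs(ts - c_ts) <= window_ms]
        for match in matches or [None]:
            output.append((label, match))
    return output


results = left_join(VIEWS, CLICKS, WINDOW_MS)
for pair in results:
    print(pair)
```

Every view shows up exactly once, and only `view2` is paired with a click because `click2` is the only click that was processed before its view.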

    Inner Join KTable-KTable

    Now we’re switching from KStreams to KTables. KTables are represented by materializing the incoming data into a local state store, building a table that always contains the latest update for a key. When a new record arrives, it is joined with the other table’s state store. Joins on KTables are not windowed as KTables describe a state, not a stream. The emitted events show state changes within the table.

    It is not easy to figure out what happens here exactly. The following chart represents observations – please correct me if I’m wrong.

    The good news is that all the inner join pairs are emitted. Since we’re no longer windowed, even scenario 1 is represented. The empty brackets represent scenarios 3 and 4 – the output is more or less a notification that the state for those cases has been updated, but as they could not be joined, the new state is empty/null. The KTables take some time to materialize, and though the chart suggests that the results appear immediately after the last event, the delay was significant in the tests. This is due to the fact that the output changelog of a KTable is updated in batches. In our case, all data has been processed before such an update happens, so we’re missing events 5.1 and 6.2 from the join output. Another curious thing is that events are duplicated. According to KAFKA-4609 this is caused by the fact that both source tables are flushed individually and therefore both trigger the join. This seems to fit as the first set of emissions contains the empty update for scenario 3 whereas the second set contains scenario 4.

    The lesson to be learned seems to be that some batching and duplication needs to be accounted for.

    Outer Join KTable-KTable

    Apart from the duplication that occurs in the outer join as well, it behaves pretty much as expected:

    The results are the same as with the inner join with the addition of proper data for scenarios 3 and 4 which for scenario 3 contains the lone view and for scenario 4 the lone click.

    Left Join KTable-KTable

    Left joins exhibit the same duplication behaviour. Apart from that, they also work the way you’d expect by now:

    So especially for our scenarios 3 and 4, we get a view element for case 3 and an empty/null event for scenario 4. The rest of the data is joined completely.
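The end state of the three KTable-KTable joins can be approximated by materializing both topics into dictionaries and joining those. The sketch deliberately models only the final table contents under a simple last-write-wins rule by timestamp; it does not reproduce the batching or the duplicate emissions described above, and which record survives for the duplicated keys 5 and 6 in a real run depends on processing order. The offset of view 5.2 is again an assumption.

```python
# (key, timestamp offset in ms, label); view 5.2 at 500 ms is an assumption.
VIEWS = [(0, 0, "view0"), (1, 0, "view1"), (2, 1000, "view2"), (3, 0, "view3"),
         (5, 0, "view5.1"), (5, 500, "view5.2"), (6, 0, "view6")]
CLICKS = [(0, 1000, "click0"), (1, 10000, "click1"), (2, 0, "click2"),
          (4, 0, "click4"), (5, 1000, "click5"),
          (6, 500, "click6.1"), (6, 1200, "click6.2")]


def materialize(records):
    """Build a table that keeps the latest record (by timestamp) per key."""
    table = {}
    for key, _, label in sorted(records, key=lambda record: record[1]):
        table[key] = label
    return table


view_table = materialize(VIEWS)
click_table = materialize(CLICKS)

# No window is involved: the joins only look at the current table state.
inner = {k: (view_table[k], click_table[k])
         for k in sorted(view_table.keys() & click_table.keys())}
outer = {k: (view_table.get(k), click_table.get(k))
         for k in sorted(view_table.keys() | click_table.keys())}
left = {k: (v, click_table.get(k)) for k, v in view_table.items()}

print(sorted(inner))       # [0, 1, 2, 5, 6]
print(outer[3], outer[4])  # ('view3', None) (None, 'click4')
print(left[3], 4 in left)  # ('view3', None) False
```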

    Left Join KStream-KTable

    Up to and including Kafka 0.10.1.x, this is the only type of stream/table join. Its semantics imply that an incoming event in a stream can be joined against a table. The output of this operation is another stream (and not a table!). The output of the join is highly timing dependent – the example with the views and clicks does not really work well as you’d probably not use a stream-table join for this – a click in adserving usually arrives well after the view. But in any case, please consider this contrived example:

    We’re simulating the output of an incoming stream joined against a table of clicks that contains events 0, 2, 4, 5 and 6.1. Each incoming stream event is joined against that table and the result either contains both view and click or if there is no click only the view. There is no issue with duplication as the result is a stream and not a table.
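This contrived setup is easy to model: the click table is a fixed dictionary with the entries listed above, and every view in the stream performs a lookup against it. The function name and labels are made up for the sketch, which is plain Python rather than Streams code.

```python
# Table of clicks as described above: events 0, 2, 4, 5 and 6.1.
CLICK_TABLE = {0: "click0", 2: "click2", 4: "click4", 5: "click5", 6: "click6.1"}

# Incoming stream of views as (key, label), in arrival order.
VIEW_STREAM = [(0, "view0"), (1, "view1"), (2, "view2"), (3, "view3"),
               (5, "view5.1"), (5, "view5.2"), (6, "view6")]


def stream_table_left_join(stream, table):
    """Emit one output record per stream record, enriched by a table lookup."""
    return [(key, (value, table.get(key))) for key, value in stream]


results = stream_table_left_join(VIEW_STREAM, CLICK_TABLE)
for key, pair in results:
    print(key, pair)
```

The output has exactly one record per view. Views 1 and 3 come out with `None` for the click, both views of scenario 5 are joined against the same click, and the click for key 4 never appears because no view triggers a lookup for it.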

    Partitioning and Parallelization

    If you are familiar with Kafka consumers, you are probably aware of the concept of consumer groups – Kafka consumption is parallelized by assigning each partition to exactly one consumer in a group of consumers that share the same group id. If you’re using defaults, Kafka itself will handle the distribution and assign the partitions to consumers. This happens in a way that the consumers have little influence over.
    With simple consumers, this is quite straightforward. However, what does it mean for a Kafka Streams application with state and joins? Can partitions still be randomly distributed? No, they cannot. While you can run multiple instances of your streaming application and partitions will be distributed among them, there are requirements that will be checked at startup. Topics that are joined need to be co-partitioned. That means that they need to have the same number of partitions. The streaming application will fail if this is not the case. Producers to the topics also need to use the same partitioner, although that is something that cannot be verified by the streaming application as the partitioner is a property of the producer. For example, you will not get any join results if you send view event 0 to partition 0 and the corresponding click event to partition 1, even if both partitions are handled by the same instance of the streaming application.
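Why both conditions matter can be seen with a toy partitioner. The sketch uses CRC32 modulo the partition count as a stand-in hash; Kafka’s default partitioner uses a different hash function, and the function name and partition counts are assumptions for illustration.

```python
import zlib


def partition_for(key, num_partitions):
    """Toy partitioner: stand-in hash of the key modulo the partition count."""
    return zlib.crc32(str(key).encode("utf-8")) % num_partitions


KEYS = range(100)

# Same partitioner and same partition count on both topics:
# a view and its click always end up in the same partition number.
co_partitioned = all(partition_for(k, 4) == partition_for(k, 4) for k in KEYS)

# Same partitioner but different partition counts (4 for views, 6 for clicks):
# for many keys the view and the click land in different partition numbers.
mismatched = [k for k in KEYS if partition_for(k, 4) != partition_for(k, 6)]

print(co_partitioned)
print(f"{len(mismatched)} of {len(KEYS)} keys would not line up")
```

A different hash function on one of the producers has the same effect as a different partition count: matching keys no longer meet in the same partition, so the join silently produces no output for them.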

    Summary and Outlook

    Kafka Streams is a very interesting API that can handle quite a few use cases in a scalable way. However, some join semantics are a bit weird and might be surprising to developers. An example of this is that left and outer joins on streams depend on the processing time of the events instead of the event time. Some of those issues are addressed by KIP-77 and are scheduled to be released with Kafka 0.10.2.
    The duplicates produced by KTable joins look weird, but they can provide a lot of benefit when used with the “Interactive Query” feature introduced with Kafka 0.10.1 where state stores can be directly queried.

    Kafka’s journey from Pub/Sub broker to distributed streaming platform is well underway and times are very exciting.

Florian Troßbach

Florian Troßbach has his roots in classic Java Enterprise development. After a brief detour in the world of classic SAP, he joined codecentric as an IT Consultant and focusses on Fast Data and the SMACK stack.
