Messages in Apache Kafka are appended to (partitions of) a topic. Topics have a partition count, a replication factor and various other configuration values. Why do those matter and what could possibly go wrong?
Why does Kafka topic configuration matter?
There are three main parts that define the configuration of a Kafka topic:
- Partition count
- Replication factor
- Technical configuration
The partition count defines the level of parallelism of the topic. For example, a partition count of 50 means that up to 50 consumer instances in a consumer group can process messages in parallel. The replication factor specifes how many copies of a partition are held in the cluster to enable failover in case of broker failure. And in the technical configuration, one can define the cleanup policy (deletion or log compaction), flushing of data to disk, maximum message size, permitting unclean leader elections and so on. For a complete list, see https://kafka.apache.org/documentation/#topicconfigs. Some of these properties are quite easy to change at runtime. For others this is a lot harder, though.
Let’s take the partition count. Increasing it upwards is easy – just run
bin/kafka-topics.sh --alter --zookeeper zk:2181 --topic mytopic --partitions 42
This might be sufficient for you. Or it might open the fiery gates of hell and break your application. The latter is the case if you depend on all messages for a given key landing on the same partition (to be handled by the same consumer in a group) or for example if you run a Kafka Streams application. If that application uses joins, the involved topics need to be copartitioned, meaning that they need to have the same partition count (and producers using the same partitioner, but that is hard to enforce). Even without joins, you don’t want messages with the same key end up in different KTables.
Changing the replication factor is serious business. It is not a case of simply saying “please increase the replication factor to x” as it is with the partition count. You need to completely reassign partitions to brokers, specifying the preferred leader and n replicas for each partition. It is your task to distribute those well across your cluster. This is no fun for anyone involved. Practical experience with this has actually led to this blog post.
The technical configuration has an impact as well. It could be for example quite essential that a topic is using compaction instead of deletion if an application depends on that. You also might find the retention time too small or too big.
The Evils of Automatic Topic Creation
In a recent project, a central team managed the Kafka cluster. This team kept a lot of default values in the broker configuration. This is mostly sensible as Kafka comes with pretty good defaults. However, one thing they kept was
auto.create.topics.enable=true. This property means that whenever a client tries to write to or read from a non-existing topic, Kafka will automatically create it. Defaults for partition count and replication factor were kept at 1.
This led to the situation where the team forgot to set up a new topic manually before running producers and consumers. Kafka created that topic with default configuration. Once this was noticed, all applications were stopped and the topic deleted – only to be created again automatically seconds later, presumably because the team didn’t find all clients. “Ok”, they thought, “let’s fix it manually”. They increased the partition count to 32, only to realize that they had to provide the complete partition assignment map to fix the replication factor. Even with tool support from Kafka Manager, this didn’t give the team members a great feeling. Luckily, this was only a development cluster, so nothing really bad happened. But it was easy to conceive that this could also happen in production as there are no safeguards.
Another danger of automatic topic creation is the sensitivity to typos. Let’s face it – sometimes we all suffer from butterfingers. Even if you took all necessary care to correctly create a topic called “parameters”, you might end up with something like
Automatic topic creating means that your producer thinks everything is fine, and you’ll scratch your head as to why your consumers don’t receive any data.
Another conceivable issue is that a developer that maybe is not yet that familiar with the Producer API might confuse the String parameters in the send method
So while our developer meant to assign a random value to the message key, he accidentally set a random topic name. Every time a message is produced, Kafka creates a new topic.
So why don’t we just switch automatic topic creation off? Well, if you can: do it. Do it now! Sadly, the team didn’t have that option. But an idea was born – what would be the easiest way to at least fail fast at application startup when something is different than expected?
How to automatically check your topic configuration
In older versions of Kafka, we basically used the code called by the
kafka-topics.sh script to programmatically work with topics. To create a topic for example we looked at how to use
kafka.admin.CreateTopicCommand. This was definitely better than writing straight to Zookeeper because there is no need to replicate the logic of “which ZNode goes where”, but it always felt like a hack. And of course we got a dependency on the Kafka broker in our code – definitely not great.
Kafka 0.11 implemented KIP-117, thus providing a new type of Kafka client –
org.apache.kafka.clients.admin.AdminClient. This client enables users to programmatically execute admin tasks without relying on those old internal classes or even Zookeeper – all Zookeeper tasks are executed by brokers.
With AdminClient, it’s fairly easy to query the cluster for the current configuration of a topic. For example, this is the code to find out if a topic exists and what its partition count and replication factor is:
DescribeTopicsResult contains all the info required to find out if the topic exists and how partition count and replication factor are set. It’s asynchronous, so be prepared to work with Futures to get your info.
Getting configs like
cleanup.policy works similarly, but uses a different method:
Under the hood there is the same Future-based mechanism.
A first implementation attempt
If you are in a situation where your application depends on a certain configuration for the Kafka topics you use, it might make sense to fail early when something is not right. You get instant feedback and have a chance to fix the problem. Or you might at least want to emit a warning in your log. In any case, as nice as the AdminClient is, this check is not something you should have to implement yourself in every project.
Thus, the idea for a small library was born. And since naming things is hard, it’s called “Club Topicana”.
With Club Topicana, you can check your topic configuration every time you create a Kafka Producer, Consumer or Streams client.
Expectations can be expressed programmatically or configuratively. Programmatically, it uses a builder:
This basically says “I expect the topic test_topic to exist. It should also have 32 partitions and a replication factor of 3. I also expect the cleanup policy to be delete. Kafka should retain messages for at least 30 seconds.”
Another option to specify an expected configuration is YAML (parser is included):
What do you do with those expectations? The library provides factories for all Kafka clients that mirror their public constructors and additionally expects a collection of expected topic configurations. For example, creating a producer can look like this:
The last line throws a MismatchedTopicConfigException if the actual configuration does not meet expectations. The message of that exception lists the differences. It also provides access to the computed result so users can react to it in any way they want.
The code for consumers and streams clients looks similar. Examples are available on GitHub. If all standard clients are created using Club Topicana, an exception will prevent creation of a client and thus auto creation of a topic. Even if auto creation is disabled, it might be valuable to ensure that topics have the correct configuration.
There is also a Spring client. The
@EnableClubTopicana annotation triggers Club Topicana to read YAML configuration and execute the checks. You can configure if you want to just log any mismatches or if you want to let the creation of the application context fail.
Club Topicana will not notice when someone changes the configuration of a topic after your application has successfully started. It also of course cannot guard against other clients doing whatever on Kafka.
The configuration of your Kafka topics is an essential part of running your Kafka applications. Wrong partition count? You might not get the parallelism you need or your streams application might not even start. Wrong replication factor? Data loss is a real possibility. Wrong cleanup policy? You might lose messages that you depend on later. Sometimes, your topics might be auto-generated and come with bad defaults that you have to fix manually. With the AdminClient introduced in Kafka 0.11, it’s simple to write a library that compares actual and desired topic configurations at application startup.