Guillaume Hérail

Ramblings in Frenglish

Kafka crashed __consumer_offsets compaction

Posted on — Jan 11, 2021

Disclaimer: this post is about an ancient (0.10.0.1) version of Kafka and the bugs described have since been fixed.


I’ve got a (legacy) Kafka cluster that is still in use, pending an upgrade that is taking a bit longer than expected. It still serves many workloads, we have plenty of experience with it, and it runs flawlessly. Or so we thought.

At some point, we lost a broker. This caused the cluster to reassign partition leadership and consumer groups to reconnect. Most of them were fine, but a small set of groups took hours to reconnect without us knowing why. These groups all happened to belong to the same product, so we deemed it to be a bug in their Kafka client.

Example of a problematic consumer:

INFO [afka-consumer-1] o.a.k.c.c.internals.ConsumerCoordinator  : Revoking previously assigned partitions [] for group cgA
INFO [afka-consumer-1] o.s.k.l.KafkaMessageListenerContainer    : partitions revoked:[]
INFO [afka-consumer-1] o.a.k.c.c.internals.AbstractCoordinator  : (Re-)joining group cgA
INFO [afka-consumer-1] o.a.k.c.c.internals.AbstractCoordinator  : Marking the coordinator kafka1.xxx:9092 (id: 2130938402 rack: null) dead for group cgA

These log lines would be written constantly to the application logs.

Fast forward a month and a half, and we lost another broker. Same issue, except this time the consumer groups took even longer to reconnect, and the team in charge of the impacted product couldn’t find anything wrong with their implementation. To understand the issue, we need to review how compaction, and more specifically the __consumer_offsets topic, works.

How the __consumer_offsets topic works

The __consumer_offsets topic is a compacted topic. This means that Kafka only keeps the latest version of a given key across the segments it has compacted. Let’s imagine we have 3 consumer groups: cgA, cgB and cgC. To keep it simple, each of these consumer groups commits offsets for 100 partitions.

Every time a consumer group commits its offsets, 100 offsets are sent to Kafka and written to the current segment of __consumer_offsets. Kafka only needs the latest 100 offsets and can discard the previous ones; this is what compaction is in charge of. Periodically, the LogCleaner thread scans compacted topics for eligible segments and compacts them, so that the topic holds as little data as possible.
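To make the idea concrete, here is a minimal sketch of what compaction achieves. It is not Kafka's actual cleaner, only an illustration of "keep the latest value per key". The key strings and the `compact` function are hypothetical names made up for this example.

```python
from typing import Dict, Iterable, List, Tuple

# (key, committed offset). The "group/topic/partition" key layout is an
# illustrative assumption, not the real on-disk format.
Record = Tuple[str, int]


def compact(log: Iterable[Record]) -> List[Record]:
    """Keep only the most recent record for each key, in log order."""
    latest: Dict[str, Tuple[int, int]] = {}
    for position, (key, value) in enumerate(log):
        # A later record for the same key replaces the earlier one.
        latest[key] = (position, value)
    survivors = sorted(latest.items(), key=lambda item: item[1][0])
    return [(key, value) for key, (_, value) in survivors]


log = [
    ("cgA/topic/0", 10),
    ("cgA/topic/1", 7),
    ("cgA/topic/0", 25),
    ("cgB/topic/0", 3),
    ("cgA/topic/0", 40),
]
compacted = compact(log)
# Five records go in, three come out: one per distinct key.
```

If the cleaner stops running, nothing plays the role of `compact` anymore and every commit simply piles up in the log.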

Because of that compaction, all offsets for a consumer group need to end up in the same partition. Kafka does that by hashing the consumer group name and using the result to pick the partition, so a given group always lands in the same one. This means that offsets for cgA might end up in partition 8 of __consumer_offsets while cgB is in partition 12 and cgC in 25.
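The mapping can be sketched as follows. This assumes the broker computes the absolute value of the Java `String.hashCode()` of the group name, modulo the number of partitions of __consumer_offsets. The function names are hypothetical, and the default of 50 partitions is an assumption (it is a broker setting, and our cluster clearly had more).

```python
def java_string_hash(s: str) -> int:
    """Reimplementation of Java's String.hashCode() as a signed 32-bit int."""
    data = s.encode("utf-16-be")  # Java hashes UTF-16 code units
    h = 0
    for i in range(0, len(data), 2):
        unit = (data[i] << 8) | data[i + 1]
        h = (31 * h + unit) & 0xFFFFFFFF  # wrap around like a Java int
    return h - (1 << 32) if h >= (1 << 31) else h


def partition_for(group_id: str, num_partitions: int = 50) -> int:
    """Assumed mapping from a group name to a __consumer_offsets partition."""
    h = java_string_hash(group_id)
    # Assumption: the minimum int value maps to 0 instead of overflowing.
    non_negative = 0 if h == -(1 << 31) else abs(h)
    return non_negative % num_partitions


for group in ("cgA", "cgB", "cgC"):
    print(group, partition_for(group))
```

The partition numbers printed here will not match the 8, 12 and 25 used in the text, which were picked for illustration only. The point is that the result depends only on the group name and the partition count, so it is stable across commits.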

What if LogCleaner doesn’t work anymore?

And this is exactly the issue we had. If LogCleaner doesn’t work anymore, our __consumer_offsets topic will grow indefinitely until we fix it. The more this topic grows, the longer it will take a consumer group to rejoin the cluster when it needs to, as Kafka needs to replay the corresponding partition to get its current state.

In 0.10.0.1, there was a bug (KAFKA-5413, fixed in 0.10.2.2) that made the LogCleaner thread die when an offset larger than the last offset in a partition was marked as the last compacted offset. In our case, it manifested like this:

java.lang.IllegalArgumentException: requirement failed: Last clean offset is xxx but segment base offset is yyy for log __consumer_offsets-168

With that, the LogCleaner thread died but Kafka kept running. This is how we ended up with multiple TB of __consumer_offsets logs to compact.
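Since the broker stays up, the exception is easy to miss. A minimal sketch for spotting it in broker logs is shown below. It only relies on the wording of the message quoted above. The function name is hypothetical and where the log lines come from is left to the caller.

```python
import re
from typing import Iterable, Set, Tuple

# Pattern built from the exception message quoted above.
CLEANER_FAILURE = re.compile(
    r"Last clean offset is \S+ but segment base offset is \S+ "
    r"for log (\S+)-(\d+)"
)


def crashed_partitions(lines: Iterable[str]) -> Set[Tuple[str, int]]:
    """Return the (topic, partition) pairs named in cleaner failures."""
    found: Set[Tuple[str, int]] = set()
    for line in lines:
        match = CLEANER_FAILURE.search(line)
        if match:
            found.add((match.group(1), int(match.group(2))))
    return found
```

Feeding it the lines of a broker log, for instance `crashed_partitions(open(path))` with `path` pointing at that log, gives the set of partitions on which the cleaner gave up.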

What metrics to use to spot the issue?

If you are using jmx_exporter, chances are that you have the following metric: kafka_log_size. If so, you can do:

sum by (topic, instance) (kafka_log_size{topic="__consumer_offsets"})

If the trend is increasing steadily, you are affected.
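As a rough sketch of what "increasing steadily" could mean in practice: with a healthy cleaner the size goes up and down as segments get compacted, while with a dead cleaner almost every sample is larger than the previous one. The function name and the 0.9 threshold below are arbitrary assumptions, not values taken from our alerting.

```python
from typing import Sequence


def grows_steadily(sizes: Sequence[float], min_ratio: float = 0.9) -> bool:
    """True if at least min_ratio of consecutive samples increase.

    sizes is a list of log sizes in bytes, ordered by time.
    """
    if len(sizes) < 2:
        return False  # not enough data to talk about a trend
    steps = list(zip(sizes, sizes[1:]))
    increases = sum(1 for before, after in steps if after > before)
    return increases / len(steps) >= min_ratio


healthy = [500, 300, 600, 200, 500, 300]    # sawtooth: compaction is running
affected = [500, 620, 750, 880, 990, 1100]  # only ever grows
print(grows_steadily(healthy), grows_steadily(affected))
```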

How does it happen?

This bug apparently happens when Kafka is shut down improperly: in some cases, this causes it to write invalid offsets in its map. For instance, if the broker hit an OutOfMemoryError, that might have led to this behaviour.

The fix

Kafka keeps a small file in its log directory where it records how far each partition has been compacted. The file is named cleaner-offset-checkpoint. This is what we did to recover the brokers, one at a time:

  1. Check that the number of Under Replicated Partitions == 0
  2. Stop Kafka on the broker
  3. rm /path/to/your/log/directory/cleaner-offset-checkpoint
  4. Start Kafka on the broker
  5. Monitor the Kafka log size and wait for it to get back to an acceptable value before moving on to the next broker

Depending on the size, it’ll take a while for your brokers to process their compactions.
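Step 3 can be sketched as a small script that shows what the checkpoint contains before deleting it. This is a sketch under assumptions: it assumes the file holds a version line, an entry count line, then one "topic partition offset" line per partition. The function names are hypothetical, and the script must only run while the broker is stopped (steps 1, 2 and 4 are left to the operator).

```python
import os
from typing import Dict, Tuple

CHECKPOINT_NAME = "cleaner-offset-checkpoint"


def read_checkpoint(path: str) -> Dict[Tuple[str, int], int]:
    """Parse the checkpoint file (assumed layout, see the note above)."""
    with open(path, encoding="utf-8") as handle:
        lines = [line.strip() for line in handle if line.strip()]
    entries: Dict[Tuple[str, int], int] = {}
    for line in lines[2:]:  # skip the version and entry count lines
        topic, partition, offset = line.split()
        entries[(topic, int(partition))] = int(offset)
    return entries


def remove_checkpoint(log_dir: str, dry_run: bool = True) -> Dict[Tuple[str, int], int]:
    """Return the recorded clean offsets, deleting the file unless dry_run."""
    path = os.path.join(log_dir, CHECKPOINT_NAME)
    entries = read_checkpoint(path)
    if not dry_run:
        os.remove(path)  # equivalent of the rm in step 3
    return entries
```

Calling `remove_checkpoint("/path/to/your/log/directory")` only reports what would be discarded. Passing `dry_run=False` actually deletes the file, after which the cleaner starts over from the beginning of each partition on the next broker start.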

Learnings

Monitoring the size of the __consumer_offsets logs is important: we now have an alert that fires when kafka_log_size is much bigger than we expect it to be.

This is day 9/100 of #100DaysToOffLoad!