
3 Reasons counting records on a Kafka topic is hard

Counting records on a Kafka topic is difficult due to log compaction removing messages, aborted transactions leaving ignored offsets, and unreliable timestamp ordering misaligning messages with their offsets.

September 4, 2024
7 Minutes

Intro

It seems like such a simple task: you want to know how many individual messages are contained in a given Kafka topic. It should be simple but, in reality, it is a much more nuanced problem than it first appears. In this article I will describe (with demonstrations) 3 complications that must be considered when counting the number of messages on a Kafka topic.

Before we start, let's set the scene. Kafka is famous for being an immutable commit log with a monotonically increasing offset that references every record. As I produce messages to a topic I see them appear in order, each with an assigned offset:

 100: B:Z

 101: A:X

 102: C:Z

 103: A:Y

 104: D:Z

 105: A:Z

 106: E:Z

Counting the number of messages should simply be a case of calculating the difference between the first and the last offset (plus one, since both ends are included). However, as anyone who has tried to reconcile data copied from one Kafka cluster to another will know, this only applies in the simplest of circumstances and often produces a very inaccurate result.
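
As a minimal sketch of that naive calculation, the Python below models a partition as a list of (offset, key, value) tuples using the example records above. No Kafka client is involved, and the names log and naive_count are illustrative assumptions only. Note that Kafka reports the end offset as the offset of the next record to be written, which is one past the last record.

```python
# Toy model of a partition: (offset, key, value) tuples.
# Hypothetical data taken from the example above; no Kafka client is used.
log = [
    (100, "B", "Z"),
    (101, "A", "X"),
    (102, "C", "Z"),
    (103, "A", "Y"),
    (104, "D", "Z"),
    (105, "A", "Z"),
    (106, "E", "Z"),
]


def naive_count(records):
    """Estimate the record count from the first and last offsets alone."""
    if not records:
        return 0
    beginning_offset = records[0][0]
    end_offset = records[-1][0] + 1  # one past the last record
    return end_offset - beginning_offset


print(naive_count(log))  # 7
print(len(log))          # 7
```

Here the two numbers agree because every offset in the range still holds a visible record. The rest of this article is about the cases where they do not.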


1. Compaction

Let’s start with the best documented reason that offsets may disappear from Kafka: log compaction. Compaction is the process of cleaning a topic of old values for a given key. Let’s say you produce a message with key: A and value: X, and later you produce a new message with key: A and value: Y. Compaction treats only the latest message on the topic (value: Y) as relevant and may clean older messages (value: X) to save disk space (this is a super brief explanation and a full write up can be found here: https://kafka.apache.org/documentation/#compaction).

The problem this causes is that all messages produced (regardless of whether they are deleted later) are assigned a unique and increasing offset. We may produce message A:X at offset 101, message A:Y at offset 103 and message A:Z at offset 105. When A:X and A:Y are removed by compaction, those offsets are deleted and will never be reused. Offsets may be removed at the beginning or in the middle of the topic, leading to gaps in the offsets as shown below (extra messages added for clarity):

 100: B:Z

 101: A:X ← offset 101 is deleted

 102: C:Z

 103: A:Y ← offset 103 is deleted

 104: D:Z

 105: A:Z

 106: E:Z
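
To make the gap concrete, here is a rough simulation of the effect of compaction on the example log. It is a sketch under simplifying assumptions: the compact function is hypothetical, and it keeps only the latest record per key in a single pass, whereas the real log cleaner runs in the background, on its own schedule, and leaves the newest data untouched.

```python
# Toy model of a partition: (offset, key, value) tuples (hypothetical data).
log = [
    (100, "B", "Z"),
    (101, "A", "X"),
    (102, "C", "Z"),
    (103, "A", "Y"),
    (104, "D", "Z"),
    (105, "A", "Z"),
    (106, "E", "Z"),
]


def compact(records):
    """Keep only the latest record for each key, preserving original offsets."""
    latest_offset = {}
    for offset, key, _value in records:
        latest_offset[key] = offset  # later records overwrite earlier ones
    return [r for r in records if latest_offset[r[1]] == r[0]]


compacted = compact(log)
offset_span = compacted[-1][0] - compacted[0][0] + 1

print([r[0] for r in compacted])  # [100, 102, 104, 105, 106]
print(offset_span)                # 7
print(len(compacted))             # 5
```

The offset arithmetic still reports 7 because the first and last offsets have not moved, but only 5 records remain to be read.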

2. Aborted Transactions

Beginning with Kafka 0.11.0.0, users gained the ability to use transaction-like semantics with Kafka. With this feature, the messages within a transaction are guaranteed to be either all successfully written to Kafka or not written at all.

Internally, Kafka achieves this by writing any messages that are part of the transaction to the topic as they are produced, flagged as belonging to that transaction. When the transaction completes, a marker is written to the log recording whether it was committed or aborted. Consumers running with isolation.level=read_committed only receive the transaction’s messages once a commit marker has been written, and skip the messages of an aborted transaction entirely. Consumers using the default read_uncommitted setting receive everything, including aborted messages.

The problem comes when a transaction is aborted. Messages (with offsets) are written to the topic but skipped by read_committed consumers, so the log looks like this:

 100: B:Z

 101: A:X

<— transaction starts here —> 

 102: C:Z 

 103: A:Y 

 104: D:Z ← the consumer ignores these offsets

<— transaction is aborted here —>

 105: A:Z

 106: E:Z

A count of messages on the topic by offset would yield 7 (106 - 100 + 1) but a count of messages from a consumer would yield 4. Offsets 102, 103 and 104 are not intended to be included in a count of messages but would be if we were just doing the start/end offset calculation.
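
The mismatch can be reproduced with a small simulation. This is a sketch only: the fourth tuple field, the txn-1 identifier and the read_committed function are hypothetical stand-ins for Kafka's real transaction metadata, and the marker records that Kafka itself writes to the log are left out to keep the model small.

```python
# (offset, key, value, transaction_id); transaction_id is None for
# non-transactional records. Hypothetical model, not Kafka's on-disk format.
log = [
    (100, "B", "Z", None),
    (101, "A", "X", None),
    (102, "C", "Z", "txn-1"),
    (103, "A", "Y", "txn-1"),
    (104, "D", "Z", "txn-1"),
    (105, "A", "Z", None),
    (106, "E", "Z", None),
]
aborted_transactions = {"txn-1"}


def read_committed(records, aborted):
    """Return the records a consumer that skips aborted transactions would see."""
    return [r for r in records if r[3] not in aborted]


visible = read_committed(log, aborted_transactions)
offset_span = log[-1][0] - log[0][0] + 1  # both end offsets are included

print(offset_span)                # 7
print([r[0] for r in visible])    # [100, 101, 105, 106]
print(len(visible))               # 4
```

Nothing has been deleted from the log in this case. The three aborted records still occupy their offsets and are simply never handed to the application.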

3. Timestamp ordering

This one is a little more complicated (but still very common): determining the number of messages produced within a given time period. The Kafka consumer API gives you the ability to fetch offsets for a given timestamp via the offsetsForTimes method (https://javadoc.io/doc/org.apache.kafka/kafka-clients/latest/org/apache/kafka/clients/consumer/KafkaConsumer.html#offsetsForTimes-java.util.Map-), which returns the earliest offset whose timestamp is greater than or equal to the one requested. Using this we can estimate the count of records for a given time range by fetching the offsets for the timestamps at the beginning and end of the range.

Unfortunately these timestamps can be very unreliable. Kafka allows (by default) the producer to set the timestamp of the message and does nothing to ensure that timestamps increase along with offsets, so a message with a later offset can carry an earlier timestamp. When you have multiple producers writing to a topic, a situation like the one below is very possible:

 100: B:Z - 2024-07-17 10:01:05

 101: A:X - 2024-07-17 10:01:07

 102: C:Z - 2024-07-17 10:01:09

 103: A:Y - 2024-07-17 10:01:02

 104: D:Z - 2024-07-17 10:01:04

 105: A:Z - 2024-07-17 10:01:11

 106: E:Z - 2024-07-17 10:01:13

A simple count by offset for the time range 2024-07-17 10:01:05 to 2024-07-17 10:01:13 would yield 6 messages (106 - 100). However, messages A:Y and D:Z have timestamps outside this range, so they should not be included.
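
The sketch below imitates this lookup on the example data. It rests on a few assumptions: offset_for_time is a hypothetical linear scan that mimics the documented behaviour of offsetsForTimes (earliest offset whose timestamp is greater than or equal to the target) rather than Kafka's index-based search, and the time range is treated as half-open, so a record stamped exactly 10:01:13 is excluded.

```python
from datetime import datetime


def ts(hms):
    """Build a timestamp on the example date from an HH:MM:SS string."""
    return datetime.fromisoformat(f"2024-07-17 {hms}")


# (offset, timestamp) pairs from the example above (hypothetical data).
log = [
    (100, ts("10:01:05")),
    (101, ts("10:01:07")),
    (102, ts("10:01:09")),
    (103, ts("10:01:02")),
    (104, ts("10:01:04")),
    (105, ts("10:01:11")),
    (106, ts("10:01:13")),
]


def offset_for_time(records, target):
    """Earliest offset whose timestamp is >= target, or None if there is none."""
    for offset, timestamp in records:
        if timestamp >= target:
            return offset
    return None


start, end = ts("10:01:05"), ts("10:01:13")

# Count implied by the two offset lookups.
by_offset = offset_for_time(log, end) - offset_for_time(log, start)
# Count obtained by checking every record's timestamp.
by_timestamp = sum(1 for _, t in log if start <= t < end)

print(by_offset)     # 6
print(by_timestamp)  # 4
```

The offset lookup counts everything between offsets 100 and 105, while only four of those records actually carry a timestamp inside the requested range.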

Conclusion

I hope this article helps you identify a few gotchas that can crop up when we attempt to count records on a topic. All of the above are the result of design decisions made for performance or ease of use, but they can result in behaviour that is hard to reason about.

It’s worth noting that this is far from an authoritative list. Unfortunately, where Kafka is concerned, the only way to be sure of what is on a topic is to consume it. This in itself raises further problems, as the tooling that comes with Kafka makes it difficult to express questions such as “give me the count of messages between time X and Y”. Recent advances in SQL overlays for Kafka (I love Streambased.io because I run the company) give an easy way to express these questions via simple SQL and allow you to connect industry standard tools like Tableau and Superset directly to Kafka data to visualise the results.


Copyright 2024 Streambased Platform Limited. Company Number 14709247.