I have a Kafka topic with 3 partitions.
I am trying to create a test consumer that fetches the last N messages from each partition.
To do that, I manually assign each partition, seek to the desired offset, and poll, as follows:
val topicPartition = TopicPartition(topic, 1) // where 1 is the number of a partition
consumer.assign(listOf(topicPartition))
consumer.seekToEnd(listOf(topicPartition))
val lastOffset = consumer.position(topicPartition)
consumer.seek(topicPartition, lastOffset - N) // lastOffset is known to be > N
val consumerRecords = consumer.poll(Duration.ofMillis(10000))
I repeat this for all 3 partitions.
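The seek target used above (`lastOffset - N`) can be written as a small pure helper. This is a minimal sketch, not part of the original code: `seekTarget`, `logStart` and `logEnd` are hypothetical names, where `logStart` and `logEnd` stand for the values that `consumer.beginningOffsets(...)` and `consumer.endOffsets(...)` report for the partition. It assumes the target should be clamped to the first retained offset, so that the seek never points below the start of the log.

```kotlin
// Hypothetical helper: computes the offset to seek to in order to read
// roughly the last n records of a partition.
// logStart and logEnd are assumed to come from consumer.beginningOffsets(...)
// and consumer.endOffsets(...) respectively.
fun seekTarget(logStart: Long, logEnd: Long, n: Long): Long {
    require(n >= 0) { "n must be non-negative" }
    require(logStart <= logEnd) { "logStart must not exceed logEnd" }
    // Clamp so the target never falls below the first offset still retained.
    return maxOf(logStart, logEnd - n)
}

fun main() {
    println(seekTarget(logStart = 0, logEnd = 1000, n = 10))   // prints 990
    println(seekTarget(logStart = 995, logEnd = 1000, n = 10)) // prints 995
}
```

The result would be passed to `consumer.seek(topicPartition, ...)` in place of `lastOffset - N`. Printing `logStart` and `logEnd` for each partition is also a cheap way to compare the failing partition with the two that work.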
This works fine for 2 of the 3 partitions.
Surprisingly, it never works for one partition (always the same one): poll() always waits for the full timeout and returns nothing.
Notes:
- The behavior does not depend on the order in which the partitions are polled.
- I tried polling more than once, as suggested here, with no luck.
- The consumer has its own unique group.id.
- The consumer has the following properties:
consumerProps["auto.offset.reset"] = "earliest"
consumerProps["max.poll.records"] = 500
consumerProps["fetch.max.bytes"] = 50000000
consumerProps["max.partition.fetch.bytes"] = 50000000
What could be the reason for this behavior?