2

I have a Kafka topic with 3 partitions.
I am trying to create a test Consumer to fetch last N messages from each partition.

For that I manually assign to each partition, shift the offset and poll like the following:

val topicPartition = TopicPartition(topic, 1) // where 1 is the number of a partition
consumer.assign(listOf(topicPartition))
consumer.seekToEnd(listOf(topicPartition))
val lastOffset = consumer.position(topicPartition)
consumer.seek(topicPartition, lastOffset - N) // lastOffset is known to be > N
val consumerRecords = consumer.poll(Duration.ofMillis(10000))

I repeat this for all 3 partitions.
This works fine for 2 of 3 partitions.
Surprisingly, this never works for the one (always the same) partition, so that poll() always goes waiting for the given timeout and returns nothing.
Notes:

  • It does not depend on the partitions polling order.
  • I tried polling more than once as suggested here - no luck.
  • The consumer has its own unique group_id
  • Consumer has the following props:
consumerProps["auto.offset.reset"] = "earliest"
consumerProps["max.poll.records"] = 500
consumerProps["fetch.max.bytes"] = 50000000
consumerProps["max.partition.fetch.bytes"] = 50000000

What could be the reason for this behavior?

fyrkov
  • 2,245
  • 16
  • 41
  • Describe the consumer group, and show the offsets of the topic, please – OneCricketeer May 18 '21 at 13:00
  • The consumer group consists only of this test consumer. The offsets for the topics are something like [160, 40, 40] (its because messages are produced with a message key and this a little bit unbalanced at in test scenario) – fyrkov May 18 '21 at 19:07
  • Is the broker hosting this third partition healthy? And all its min ISRs are up to date? – OneCricketeer May 18 '21 at 23:32
  • Why can't you manually assign all three partitions (with required offsets) to a single consumer at once? – Inego May 26 '21 at 03:28

0 Answers0