After seeking a Kafka consumer to offsets looked up by timestamp, every offset commit fails with an error. As a result, when I start other consumers afterwards, they all begin from the very first message instead of continuing from where the previous run left off. Could someone take a look? Many thanks!
// Auto-commit is already enabled: spring.kafka.consumer.enable-auto-commit=true
@RequestMapping(value = "/incrementConsumer")
public void incrementConsumer() {
    ConsumerFactory consumerFactory = containerFactory.getConsumerFactory();
    Map<String, Object> properties = consumerFactory.getConfigurationProperties();
    for (Map.Entry<String, Object> entry : properties.entrySet()) {
        System.out.println("key:" + entry.getKey() + "---value:" + entry.getValue());
    }
    Consumer<String, String> consumer = consumerFactory.createConsumer();
    // Map each partition of the topic to the timestamp we want to start from
    Map<TopicPartition, Long> startMap = new HashMap<>();
    List<PartitionInfo> partitionInfoList = consumer.partitionsFor("testIncrement");
    List<TopicPartition> topicPartitions = new ArrayList<>();
    for (PartitionInfo par : partitionInfoList) {
        topicPartitions.add(new TopicPartition(par.topic(), par.partition()));
        startMap.put(new TopicPartition("testIncrement", par.partition()), TableMessage.all_get_time.getTime());
    }
    DateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    consumer.assign(topicPartitions);
    // For each partition, look up the earliest offset whose record timestamp is >= the requested time
    Map<TopicPartition, OffsetAndTimestamp> startOffsetMap = consumer.offsetsForTimes(startMap);
    Map<TopicPartition, Long> endOffsets = consumer.endOffsets(topicPartitions);
    System.out.println("startMap====" + startMap);
    System.out.println("startOffsetMap====" + startOffsetMap);
    for (Map.Entry<TopicPartition, OffsetAndTimestamp> entry : startOffsetMap.entrySet()) {
        // If the requested timestamp is later than the newest record in the partition, the value is null
        OffsetAndTimestamp offsetTimestamp = entry.getValue();
        if (offsetTimestamp != null) {
            int partition = entry.getKey().partition();
            long timestamp = offsetTimestamp.timestamp();
            long offset = offsetTimestamp.offset();
            System.out.println("partition = " + partition +
                    ", time = " + df.format(new Date(timestamp)) +
                    ", offset = " + offset);
            // Position the consumer at the offset resolved from the timestamp
            consumer.seek(entry.getKey(), offset);
        } else {
            // No matching offset: start from the end of the partition
            consumer.seek(entry.getKey(), endOffsets.get(entry.getKey()));
        }
    }
    ObjectMapper objectMapper = new ObjectMapper(); // created once outside the loop; currently unused
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(100));
        System.out.println(records.count());
        for (ConsumerRecord<String, String> record : records) {
            System.out.println(record.key() + record.value());
        }
    }
}
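For comparison, here is how I understand the same seek-by-timestamp flow looks with a bare KafkaConsumer that has its own dedicated group.id, so it cannot collide with the Spring listener containers already running in group "test". This is only a sketch: the broker address, group id, and start timestamp are placeholders I made up, and it assumes the usual org.apache.kafka.clients.consumer imports and manual commits (enable.auto.commit=false).

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
props.put(ConsumerConfig.GROUP_ID_CONFIG, "increment-reader");        // placeholder, deliberately not "test"
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");         // commit manually instead
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
    List<TopicPartition> partitions = new ArrayList<>();
    for (PartitionInfo par : consumer.partitionsFor("testIncrement")) {
        partitions.add(new TopicPartition(par.topic(), par.partition()));
    }
    consumer.assign(partitions);
    Map<TopicPartition, Long> query = new HashMap<>();
    long startTs = System.currentTimeMillis() - 3_600_000L; // placeholder: one hour ago
    for (TopicPartition tp : partitions) {
        query.put(tp, startTs);
    }
    Map<TopicPartition, Long> ends = consumer.endOffsets(partitions);
    for (Map.Entry<TopicPartition, OffsetAndTimestamp> e : consumer.offsetsForTimes(query).entrySet()) {
        // null value: no record at or after the timestamp, so fall back to the end offset
        consumer.seek(e.getKey(), e.getValue() != null ? e.getValue().offset() : ends.get(e.getKey()));
    }
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(5));
    for (ConsumerRecord<String, String> record : records) {
        System.out.println(record.offset() + ": " + record.value());
    }
    consumer.commitSync(); // commits the positions reached by poll()
}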
Here is the error output:
ERROR [Consumer clientId=consumer-1, groupId=test] Offset commit failed on partition topic01-0 at offset 205: The coordinator is not aware of this member. (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
WARN [Consumer clientId=consumer-1, groupId=console-consumer-56648] Asynchronous auto-commit of offsets {topic01-0=OffsetAndMetadata{offset=205, metadata=''}} failed: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records. (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator)
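If I read that warning correctly, these are the settings it is pointing at; a sketch of how they could be overridden in the consumer properties (the values are arbitrary examples, not tuned):

props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "600000"); // allow up to 10 minutes between poll() calls
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "100");        // smaller batches, faster poll-loop turnaround
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "30000");    // the "session timeout" the message mentions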
I also tried committing manually, but then the commit call itself throws:
consumer.commitSync(); // this line throws
Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
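For reference, the per-partition manual-commit pattern from the KafkaConsumer javadoc looks roughly like this (it assumes enable.auto.commit=false; consumer and the record handling match my code above):

while (true) {
    ConsumerRecords<String, String> records = consumer.poll(Duration.ofSeconds(1));
    for (TopicPartition tp : records.partitions()) {
        List<ConsumerRecord<String, String>> partRecords = records.records(tp);
        for (ConsumerRecord<String, String> record : partRecords) {
            System.out.println(record.key() + record.value());
        }
        // Commit the offset AFTER the last record processed for this partition
        long lastOffset = partRecords.get(partRecords.size() - 1).offset();
        consumer.commitSync(Collections.singletonMap(tp, new OffsetAndMetadata(lastOffset + 1)));
    }
}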