From what I can see, the offsets for the corresponding partitions are committed correctly, and the success log prints as expected.

public void run() {
    // set the thread name
    threadName = Thread.currentThread().getName() + BuildUtils.buildUniqueNo();
    Thread.currentThread().setName(threadName);
    LOGGER.info("UserComponentConsumer init threadName:{}!", threadName);
    try {
        // brief startup delay
        Thread.sleep(50);
        // subscribe to the topic
        consumer.subscribe(Collections.singletonList(componentGroupConfig.getTopic()), new UserComponentRebalanceHandler());
        // initialize offsets (note: assignment() is empty until the first poll() after subscribe())
        consumerInitOffset(new ArrayList<>(consumer.assignment()), consumer);
        // poll loop
        while (consumerOpenFlag && !Thread.currentThread().isInterrupted()) {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
            for (TopicPartition partition : records.partitions()) {
                // process each partition's batch in turn
                List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                long firstOffset = partitionRecords.get(0).offset(); // first offset of the batch
                long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset(); // last offset of the batch
                try {
                    // parse and handle the messages
                    messageDispose(partitionRecords);
                } catch (Exception exception) {
                    LOGGER.error("UserComponentConsumer messageDispose threadName:{} topic:{}, partition:{} firstOffset:{} lastOffset:{} error", threadName, partition.topic(), partition.partition(), firstOffset, lastOffset, exception);
                }
                // synchronous commit; Kafka expects the offset of the NEXT record to consume, hence lastOffset + 1
                consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
                LOGGER.info("UserComponentConsumer threadName:{} topic:{}, partition:{} firstOffset:{} lastOffset:{}, GroupId:{} !!", threadName, partition.topic(), partition.partition(), firstOffset, lastOffset, componentGroupConfig.getGroupId());
            }
        }
    } catch (WakeupException ex) {
        LOGGER.error("UserComponentConsumer threadName:{} WakeupException", threadName, ex);
    } catch (InterruptedException ex) {
        LOGGER.error("UserComponentConsumer threadName:{} thread interrupted !!", threadName, ex);
        // restore the thread's interrupt status
        Thread.currentThread().interrupt();
    } catch (Exception ex) {
        LOGGER.error("UserComponentConsumer threadName:{} Exception", threadName, ex);
    } finally {
        // shut down the consumer
        consumer.close();
    }
}
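One detail worth checking in the commit call: Kafka defines the committed offset as the offset of the *next* record to consume, so the value to commit after processing a batch is `lastOffset + 1`. Committing `lastOffset` instead makes the group re-read the batch's final record after a restart or rebalance, which looks exactly like "duplicate data". A broker-free simulation of that resume semantics (plain Java, no Kafka dependency; class and method names here are made up for illustration):

```java
import java.util.ArrayList;
import java.util.List;

// Simulates how a consumer group resumes from a committed offset.
// Kafka resumes reading AT the committed offset, so the correct value
// to commit after processing records [first..last] is last + 1.
public class OffsetSemanticsDemo {

    // Resume from `committed` and return the offsets that get (re)read.
    static List<Long> resume(long committed, long logEndOffset) {
        List<Long> read = new ArrayList<>();
        for (long off = committed; off < logEndOffset; off++) {
            read.add(off);
        }
        return read;
    }

    public static void main(String[] args) {
        long lastProcessed = 4; // we processed offsets 0..4
        long logEndOffset = 5;  // no new messages have arrived since

        // Committing lastOffset: offset 4 is consumed again after restart.
        System.out.println(resume(lastProcessed, logEndOffset));     // [4]

        // Committing lastOffset + 1: nothing is re-consumed.
        System.out.println(resume(lastProcessed + 1, logEndOffset)); // []
    }
}
```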
A Kafka consumer pulls messages in batches, say 2,000 records per poll.
With that, you've got the basic consumer logic; the core of the problem is when you commit the offset.
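The commit-timing choice boils down to two orderings: commit *after* processing gives at-least-once delivery (a crash between processing and commit replays part of the batch), while commit *before* processing gives at-most-once (a crash after the commit silently skips the unprocessed tail). A rough counting sketch of the trade-off (pure Java, no Kafka; the names are invented for this illustration):

```java
// Contrast "commit before processing" with "commit after processing"
// when the consumer crashes after handling only part of a batch.
public class CommitTimingDemo {

    // Records that are never processed (lost) if we crash mid-batch.
    static long lost(boolean commitFirst, long batchSize, long processedBeforeCrash) {
        // Commit-first moved the committed offset past the whole batch,
        // so the unprocessed tail is skipped on restart (at-most-once).
        return commitFirst ? batchSize - processedBeforeCrash : 0;
    }

    // Records that are processed twice (replayed) if we crash mid-batch.
    static long replayed(boolean commitFirst, long batchSize, long processedBeforeCrash) {
        // Commit-after leaves the committed offset at the batch start,
        // so everything already handled is handled again (at-least-once).
        return commitFirst ? 0 : processedBeforeCrash;
    }

    public static void main(String[] args) {
        // Batch of 5 records; crash after processing 3 of them.
        System.out.println("commit-first : lost=" + lost(true, 5, 3)
                + " replayed=" + replayed(true, 5, 3));  // lost=2 replayed=0
        System.out.println("commit-after : lost=" + lost(false, 5, 3)
                + " replayed=" + replayed(false, 5, 3)); // lost=0 replayed=3
    }
}
```

Commit-after-processing is the usual choice for business data: duplicates can be handled by making the processing idempotent, while lost records usually cannot be recovered.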
There's nothing mystical going on here. First, make sure the offset you commit belongs to the correct partition and that the commit actually succeeds.
I wrap each consumer in its own thread. The program shuts down gracefully: the loop checks an interrupt/stop flag on each iteration and then closes the consumer, so normally the current batch finishes processing, its offset is committed, and the loop exits on the next iteration. In theory, no duplicates should appear.
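The shutdown sequence described above (flag checked once per loop, in-flight batch finished and committed before exit) can be sketched without Kafka as a worker loop driven by a volatile flag. Note that with a real consumer, the canonical way to break out of a blocked `poll()` from another thread is `consumer.wakeup()`, which is what the `WakeupException` catch in the code is for. The class and method names below are invented for the sketch:

```java
import java.util.concurrent.CountDownLatch;

// A worker thread that, like the consumer thread described above, checks a
// stop flag once per loop iteration and finishes cleanly before exiting.
public class GracefulStopDemo {
    private volatile boolean running = true;
    private final CountDownLatch stopped = new CountDownLatch(1);

    void runLoop() {
        try {
            while (running && !Thread.currentThread().isInterrupted()) {
                // stand-in for: poll, process the batch, commit offsets
            }
        } finally {
            stopped.countDown(); // analogous to consumer.close() in finally
        }
    }

    // Flip the flag, then wait until the worker has fully exited its loop.
    void shutdown() throws InterruptedException {
        running = false;
        stopped.await();
    }

    public static void main(String[] args) throws InterruptedException {
        GracefulStopDemo demo = new GracefulStopDemo();
        Thread worker = new Thread(demo::runLoop);
        worker.start();
        demo.shutdown();
        worker.join();
        System.out.println("worker stopped cleanly: " + !worker.isAlive());
    }
}
```

The latch matters: setting the flag alone only *requests* the stop; waiting on the latch guarantees the last batch's commit has run before the caller proceeds to tear anything down.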