1、场景和问题
部署两个相同group的消费者消费第三方的Kafka集群,收到消息后进行数据处理,然后转发到自己的单机Kafka上,日均约40W数据量,出现了部分数据(约7~8W)消费不到丢失的问题,使用Kafka Tool可以在源Kafka Topic中搜索到消息,但是消费者没有收到消息并打印日志。
2、版本和环境
源Kafka 0.10.0.0 集群,源Topic12个分区
目标Kafka 0.10.0.1 单机,目标Topic10个分区
消费者+生产者程序 Spring Boot 2.3.12.RELEASE + Spring Kafka 2.5.14.RELEASE(kafka-clients 2.5.1)
3、相关代码和配置
配置如下:
## consumer
spring.kafka.consumer.bootstrap-servers=
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.auto-commit-interval=1000
spring.kafka.consumer.auto-offset-reset=latest
spring.kafka.consumer.group-id=kafka-consumer-group-20230606
spring.kafka.consumer.max-poll-records=50
spring.kafka.consumer.properties.max.poll.interval.ms=300000
spring.kafka.consumer.properties.session.timeout.ms=60000
spring.kafka.consumer.properties.request.timeout.ms=18500000
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
## producer
spring.kafka.producer.bootstrap-servers=
spring.kafka.producer.retries=3
spring.kafka.producer.acks=1
spring.kafka.producer.batch-size=4096
spring.kafka.producer.buffer-memory=40960
spring.kafka.producer.properties.linger.ms=0
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
## listener
spring.kafka.listener.type=BATCH
spring.kafka.listener.ack-mode=MANUAL
spring.kafka.listener.concurrency=12
代码如下:
@KafkaListener(containerFactory = "batchFactory", topics = "ORIGIN_TOPIC")
public void doMessage(ConsumerRecords<String, String> consumerRecords, Acknowledgment ack) {
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
if (consumerRecord != nulllic void doMessage)
public void doMessage(ConsumerRecords<String, String> consumerRecords, Acknowledgment ack) {
for (ConsumerRecord<String, String> consumerRecord : consumerRecordsic void)
public void doMessage(ConsumerRecords<String, String> consumerRecords, Acknowledgment ack) {
for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
if (consumerRecord != null) {
try {
log.info("message:{}", consumerRecord.value());
// 处理收到的数据数据并发送目标topic
String data = consumerRecord.value() + "sssss";
kafkaTemplate.send("target_topic", data);
} catch (Exception e) {
log.error("发送数据到targetTopic异常: {}", e.getMessage(), e);
}
}
}
ack.acknowledge();
}
4、报错信息
报错1:
2023-06-11T00:51:56.178+08:00 ERROR ctm01dt.ctm01dt [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] [o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer:149] - Consumer exception
org.springframework.kafka.KafkaException: Seek to current after exception; nested exception is java.lang.IllegalStateException: Correlation id for response (922079) does not match request (922069), request header: RequestHeader(apiKey=OFFSET_COMMIT, apiVersion=2, clientId=consumer-quanzhougaozhi-ivehcletransfer-group-0516-1, correlationId=922069)
at org.springframework.kafka.listener.SeekToCurrentBatchErrorHandler.handle(SeekToCurrentBatchErrorHandler.java:72)
at org.springframework.kafka.listener.RecoveringBatchErrorHandler.handle(RecoveringBatchErrorHandler.java:124)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1405)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1108)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalStateException: Correlation id for response (922079) does not match request (922069), request header: RequestHeader(apiKey=OFFSET_COMMIT, apiVersion=2, clientId=consumer-quanzhougaozhi-ivehcletransfer-group-0516-1, correlationId=922069)
at org.apache.kafka.clients.NetworkClient.correlate(NetworkClient.java:949)
at org.apache.kafka.clients.NetworkClient.parseStructMaybeUpdateThrottleTimeMetrics(NetworkClient.java:727)
at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:840)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:559)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:212)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:984)
at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1510)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doCommitSync(KafkaMessageListenerContainer.java:2378)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitSync(KafkaMessageListenerContainer.java:2373)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitIfNecessary(KafkaMessageListenerContainer.java:2359)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.processCommits(KafkaMessageListenerContainer.java:2173)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1133)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1059)
... 3 common frames omitted
报错2:
2023-06-12T10:10:04.542+08:00 WARN ctm01dt.ctm01dt [org.springframework.kafka.KafkaListenerEndpointContainer#0-3-C-1] [o.a.k.c.consumer.internals.ConsumerCoordinator:1165] - [Consumer clientId=consumer-quanzhougaozhi-ivehcletransfer-group-0516-4, groupId=quanzhougaozhi-ivehcletransfer-group-0516] Offset commit failed on partition deploycontrol_high_speed-3 at offset 11409204: The request timed out.
报错3:
2023-05-17T20:33:01.077+08:00 ERROR ctm01dt.ctm01dt [org.springframework.kafka.KafkaListenerEndpointContainer#1-9-C-1] [o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer:149] - Consumer exception
org.springframework.kafka.KafkaException: Seek to current after exception; nested exception is org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'responses': Error reading field 'partition_responses': Error reading array of size 9092, only 641 bytes available
at org.springframework.kafka.listener.SeekToCurrentBatchErrorHandler.handle(SeekToCurrentBatchErrorHandler.java:72)
at org.springframework.kafka.listener.RecoveringBatchErrorHandler.handle(RecoveringBatchErrorHandler.java:124)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1405)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1108)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'responses': Error reading field 'partition_responses': Error reading array of size 9092, only 641 bytes available
at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:110)
at org.apache.kafka.common.protocol.ApiKeys.parseResponse(ApiKeys.java:313)
at org.apache.kafka.clients.NetworkClient.parseStructMaybeUpdateThrottleTimeMetrics(NetworkClient.java:726)
at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:840)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:559)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1307)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1243)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1246)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1146)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1059)
... 3 common frames omitted
报错4:
2023-05-18T00:58:33.372+08:00 ERROR ctm01dt.ctm01dt [org.springframework.kafka.KafkaListenerEndpointContainer#0-10-C-1] [o.a.k.c.consumer.internals.ConsumerCoordinator:1167] - [Consumer clientId=consumer-quanzhougaozhi-ivehcletransfer-group-0516-11, groupId=quanzhougaozhi-ivehcletransfer-group-0516] Offset commit failed on partition deploycontrol_high_speed-4 at offset 10469635: The coordinator is not aware of this member.
2023-05-18T00:58:33.373+08:00 ERROR ctm01dt.ctm01dt [org.springframework.kafka.KafkaListenerEndpointContainer#0-10-C-1] [o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer:149] - Consumer exception
org.springframework.kafka.KafkaException: Seek to current after exception; nested exception is org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
at org.springframework.kafka.listener.SeekToCurrentBatchErrorHandler.handle(SeekToCurrentBatchErrorHandler.java:72)
at org.springframework.kafka.listener.RecoveringBatchErrorHandler.handle(RecoveringBatchErrorHandler.java:124)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1405)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1108)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:1213)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:1140)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1096)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1076)
at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204)
at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:599)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:409)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:294)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:212)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:984)
at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1510)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doCommitSync(KafkaMessageListenerContainer.java:2378)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitSync(KafkaMessageListenerContainer.java:2373)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitIfNecessary(KafkaMessageListenerContainer.java:2359)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.processCommits(KafkaMessageListenerContainer.java:2173)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1133)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1059)
... 3 common frames omitted
报错5:
使用Kafka Tool查看消费者消费情况,正常能看到12个分区(0~11),但是有时会少掉几个分区(3、4、6)
5、已尝试方法
已经尝试以下方法,但是不明确效果
(1)降低Spring Boot和Spring Kafka版本,Spring Boot 2.1.8.RELEASE + Spring Kafka 2.2.8.RELEASE
(2)客户端使用kafka-clients 0.10.0.0
请求和收到的不匹配,可能是异常太多,多线程springboot错乱导致的。
所以我们聚焦核心异常:
核心异常是:org.apache.kafka.clients.consumer.CommitFailedException
默认情况下,消息处理时间超过了30秒,kafka将该消费者从组中移除了,认为其已经无效。所以将组中移除,所以提交offset失败了。
而失败,会导致重新选举消费者,而你的消费者程序有12个,就是此时导致的错乱。
解决:
CommitFailedException
,那你就需要设置获取消息的数量大小了,就是pull少一点的kafka消息,这样发送到新kafka的量少了,超时的问题就得到了缓解。spring.kafka.listener.concurrency=12
,并发也是导致你异常加重的原因之一,越小的话同时处理的消息少,不容易阻塞在提交新集群那里,异常的概率越低。另外你现在是2个消费者程序,可以通过增加消费者程序来解决并发的问题,而不是通过调这个参数。spring.kafka.listener.concurrency=12
话说回来,你一共12个分区,1个消费者程序12个并发的话,1个消费者程序全占满了,另外一个消费者程序可能分到个别甚至是0个消费者分区。比如你现在2个消费者,concurrency
不应该超过6个,3个话不能超过4个,我建议你默认1个就行了,处理不会慢太多,可以增加消费者程序来提高并发。感谢站长回复。我新开了一个消费者demo,换了一个group消费第三方Kafka集群,参数如正文没做调整,去掉了发送数据到目标Kafka的操作,依然出现正文提到的报错1、2、3、4。我接下来尝试一下站长提出的解决方案1和2,但是目前的配置时间是不是已经很长了,有没有可能是和第三方的Kafka集群间的网络问题造成的呢?
spring.kafka.consumer.max-poll-records=50 spring.kafka.consumer.properties.max.poll.interval.ms=300000 spring.kafka.consumer.properties.session.timeout.ms=60000 spring.kafka.consumer.properties.request.timeout.ms=18500000
3、4也很重要,降低并发消费者先。
你的答案