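Two consumers in the same group consume from a third-party Kafka cluster, process each message, and forward the result to our own single-node Kafka. Daily volume is about 400,000 messages, and roughly 70,000 to 80,000 of them are lost: the messages can be found in the source Kafka topic with Kafka Tool, but the consumer never receives them and logs nothing for them.
Source Kafka: 0.10.0.0, cluster; the source topic has 12 partitions
Target Kafka: 0.10.0.1, single node; the target topic has 10 partitions
Consumer + producer application: Spring Boot 2.3.12.RELEASE + Spring Kafka 2.5.14.RELEASE (kafka-clients 2.5.1)
Configuration: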
## consumer
spring.kafka.consumer.bootstrap-servers=
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.auto-commit-interval=1000
spring.kafka.consumer.auto-offset-reset=latest
spring.kafka.consumer.group-id=kafka-consumer-group-20230606
spring.kafka.consumer.max-poll-records=50
spring.kafka.consumer.properties.max.poll.interval.ms=300000
spring.kafka.consumer.properties.session.timeout.ms=60000
spring.kafka.consumer.properties.request.timeout.ms=18500000
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
## producer
spring.kafka.producer.bootstrap-servers=
spring.kafka.producer.retries=3
spring.kafka.producer.acks=1
spring.kafka.producer.batch-size=4096
spring.kafka.producer.buffer-memory=40960
spring.kafka.producer.properties.linger.ms=0
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
## listener
spring.kafka.listener.type=BATCH
spring.kafka.listener.ack-mode=MANUAL
spring.kafka.listener.concurrency=12
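As a sanity check on the listener settings above, the number of consumer threads can be compared with the number of source partitions. The sketch below assumes both deployed instances really run with concurrency=12 (that is, the custom batchFactory container factory applies this property); the class and method names are hypothetical. Under that assumption the group has 24 consumer threads for 12 partitions, so half of them cannot own a partition at any given time.

```java
class ConsumerThreadCount {
    /** Consumer threads in the group that cannot be assigned any partition. */
    static int idleConsumers(int instances, int concurrency, int partitions) {
        int totalThreads = instances * concurrency; // every listener thread is one group member
        return Math.max(0, totalThreads - partitions);
    }

    public static void main(String[] args) {
        // Values from the post: 2 instances, concurrency=12, 12 source partitions.
        System.out.println("idle consumer threads: " + idleConsumers(2, 12, 12));
    }
}
```

A larger group is not wrong in itself, but it means more members take part in every rebalance.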
Code:
@KafkaListener(containerFactory = "batchFactory", topics = "ORIGIN_TOPIC")
public void doMessage(ConsumerRecords<String, String> consumerRecords, Acknowledgment ack) {
    for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
        if (consumerRecord != null) {
            try {
                log.info("message:{}", consumerRecord.value());
                // Process the received data and send it to the target topic
                String data = consumerRecord.value() + "sssss";
                // send() is asynchronous: a failure reported later through the returned future is not caught below
                kafkaTemplate.send("target_topic", data);
            } catch (Exception e) {
                log.error("Failed to send data to targetTopic: {}", e.getMessage(), e);
            }
        }
    }
    // The whole batch is acknowledged even if some sends failed
    ack.acknowledge();
}
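One thing that may be worth ruling out is a send that fails after the listener has already acknowledged the batch. In Spring Kafka 2.5, kafkaTemplate.send(...) returns a future, so a failure can arrive after the try/catch has finished. The following is only a minimal sketch of the "acknowledge after all sends are confirmed" pattern, not the actual listener: the sender function stands in for kafkaTemplate.send, the Runnable stands in for ack.acknowledge(), and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;

class AckAfterSend {
    /**
     * Sends every value, waits for each send result, and runs ack only if all succeeded.
     * Returns true when the batch was acknowledged.
     */
    static boolean forwardBatch(List<String> values,
                                Function<String, Future<?>> sender, // stand-in for kafkaTemplate.send
                                Runnable ack,                       // stand-in for ack.acknowledge()
                                long timeoutMs) {
        try {
            List<Future<?>> pending = new ArrayList<>();
            for (String value : values) {
                pending.add(sender.apply(value + "sssss")); // same transformation as in the post
            }
            for (Future<?> result : pending) {
                // Blocks until the send is confirmed; an asynchronous failure is thrown here.
                result.get(timeoutMs, TimeUnit.MILLISECONDS);
            }
        } catch (Exception e) {
            if (e instanceof InterruptedException) {
                Thread.currentThread().interrupt();
            }
            return false; // at least one send is unconfirmed, so the batch is not acknowledged
        }
        ack.run();
        return true;
    }

    public static void main(String[] args) {
        CompletableFuture<String> failed = new CompletableFuture<>();
        failed.completeExceptionally(new RuntimeException("simulated send failure"));

        boolean ok = forwardBatch(Arrays.asList("a", "b"),
                v -> CompletableFuture.completedFuture(v), () -> System.out.println("acked"), 1000);
        boolean bad = forwardBatch(Arrays.asList("c"),
                v -> failed, () -> System.out.println("acked"), 1000);
        System.out.println("all sends confirmed: " + ok + ", with a failed send: " + bad);
    }
}
```

In a real listener, rethrowing the failure instead of returning false would hand the batch to the container's error handler; whether that is the right recovery here is a separate decision.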
Error 1:
2023-06-11T00:51:56.178+08:00 ERROR ctm01dt.ctm01dt [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] [o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer:149] - Consumer exception
org.springframework.kafka.KafkaException: Seek to current after exception; nested exception is java.lang.IllegalStateException: Correlation id for response (922079) does not match request (922069), request header: RequestHeader(apiKey=OFFSET_COMMIT, apiVersion=2, clientId=consumer-quanzhougaozhi-ivehcletransfer-group-0516-1, correlationId=922069)
at org.springframework.kafka.listener.SeekToCurrentBatchErrorHandler.handle(SeekToCurrentBatchErrorHandler.java:72)
at org.springframework.kafka.listener.RecoveringBatchErrorHandler.handle(RecoveringBatchErrorHandler.java:124)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1405)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1108)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalStateException: Correlation id for response (922079) does not match request (922069), request header: RequestHeader(apiKey=OFFSET_COMMIT, apiVersion=2, clientId=consumer-quanzhougaozhi-ivehcletransfer-group-0516-1, correlationId=922069)
at org.apache.kafka.clients.NetworkClient.correlate(NetworkClient.java:949)
at org.apache.kafka.clients.NetworkClient.parseStructMaybeUpdateThrottleTimeMetrics(NetworkClient.java:727)
at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:840)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:559)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:212)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:984)
at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1510)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doCommitSync(KafkaMessageListenerContainer.java:2378)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitSync(KafkaMessageListenerContainer.java:2373)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitIfNecessary(KafkaMessageListenerContainer.java:2359)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.processCommits(KafkaMessageListenerContainer.java:2173)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1133)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1059)
... 3 common frames omitted
Error 2:
2023-06-12T10:10:04.542+08:00 WARN ctm01dt.ctm01dt [org.springframework.kafka.KafkaListenerEndpointContainer#0-3-C-1] [o.a.k.c.consumer.internals.ConsumerCoordinator:1165] - [Consumer clientId=consumer-quanzhougaozhi-ivehcletransfer-group-0516-4, groupId=quanzhougaozhi-ivehcletransfer-group-0516] Offset commit failed on partition deploycontrol_high_speed-3 at offset 11409204: The request timed out.
Error 3:
2023-05-17T20:33:01.077+08:00 ERROR ctm01dt.ctm01dt [org.springframework.kafka.KafkaListenerEndpointContainer#1-9-C-1] [o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer:149] - Consumer exception
org.springframework.kafka.KafkaException: Seek to current after exception; nested exception is org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'responses': Error reading field 'partition_responses': Error reading array of size 9092, only 641 bytes available
at org.springframework.kafka.listener.SeekToCurrentBatchErrorHandler.handle(SeekToCurrentBatchErrorHandler.java:72)
at org.springframework.kafka.listener.RecoveringBatchErrorHandler.handle(RecoveringBatchErrorHandler.java:124)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1405)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1108)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.protocol.types.SchemaException: Error reading field 'responses': Error reading field 'partition_responses': Error reading array of size 9092, only 641 bytes available
at org.apache.kafka.common.protocol.types.Schema.read(Schema.java:110)
at org.apache.kafka.common.protocol.ApiKeys.parseResponse(ApiKeys.java:313)
at org.apache.kafka.clients.NetworkClient.parseStructMaybeUpdateThrottleTimeMetrics(NetworkClient.java:726)
at org.apache.kafka.clients.NetworkClient.handleCompletedReceives(NetworkClient.java:840)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:559)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:262)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1307)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1243)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doPoll(KafkaMessageListenerContainer.java:1246)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1146)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1059)
... 3 common frames omitted
Error 4:
2023-05-18T00:58:33.372+08:00 ERROR ctm01dt.ctm01dt [org.springframework.kafka.KafkaListenerEndpointContainer#0-10-C-1] [o.a.k.c.consumer.internals.ConsumerCoordinator:1167] - [Consumer clientId=consumer-quanzhougaozhi-ivehcletransfer-group-0516-11, groupId=quanzhougaozhi-ivehcletransfer-group-0516] Offset commit failed on partition deploycontrol_high_speed-4 at offset 10469635: The coordinator is not aware of this member.
2023-05-18T00:58:33.373+08:00 ERROR ctm01dt.ctm01dt [org.springframework.kafka.KafkaListenerEndpointContainer#0-10-C-1] [o.s.k.l.KafkaMessageListenerContainer$ListenerConsumer:149] - Consumer exception
org.springframework.kafka.KafkaException: Seek to current after exception; nested exception is org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
at org.springframework.kafka.listener.SeekToCurrentBatchErrorHandler.handle(SeekToCurrentBatchErrorHandler.java:72)
at org.springframework.kafka.listener.RecoveringBatchErrorHandler.handle(RecoveringBatchErrorHandler.java:124)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1405)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1108)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:1213)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:1140)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1096)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1076)
at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204)
at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:599)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:409)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:294)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:212)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:984)
at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1510)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doCommitSync(KafkaMessageListenerContainer.java:2378)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitSync(KafkaMessageListenerContainer.java:2373)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitIfNecessary(KafkaMessageListenerContainer.java:2359)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.processCommits(KafkaMessageListenerContainer.java:2173)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1133)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1059)
... 3 common frames omitted
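The CommitFailedException above names max.poll.interval.ms and max.poll.records, so it may help to work out the time budget those two settings imply. The sketch below uses the posted values (300000 ms and 50 records). It also assumes the producer's max.block.ms is left at its default of 60000 ms, since the posted configuration does not set it; the class and method names are hypothetical.

```java
class PollBudget {
    /** Average time one record may take before the poll interval is exceeded. */
    static long perRecordBudgetMs(long maxPollIntervalMs, int maxPollRecords) {
        return maxPollIntervalMs / maxPollRecords;
    }

    /** Upper bound on time spent in send() if every call blocks for the full max.block.ms. */
    static long worstCaseSendBlockMs(int maxPollRecords, long maxBlockMs) {
        return maxPollRecords * maxBlockMs;
    }

    public static void main(String[] args) {
        long maxPollIntervalMs = 300_000L; // spring.kafka.consumer.properties.max.poll.interval.ms
        int maxPollRecords = 50;           // spring.kafka.consumer.max-poll-records
        long maxBlockMs = 60_000L;         // assumption: producer default, not set in the post

        long budget = perRecordBudgetMs(maxPollIntervalMs, maxPollRecords);
        long worstCase = worstCaseSendBlockMs(maxPollRecords, maxBlockMs);
        System.out.println("per-record budget (ms): " + budget);
        System.out.println("worst-case send blocking (ms): " + worstCase);
        System.out.println("worst case exceeds poll interval: " + (worstCase > maxPollIntervalMs));
    }
}
```

With these numbers, each record has about 6 seconds on average, which is generous for a string append and an asynchronous send. The budget would only be at risk if send() itself blocks, for example when the small producer buffer (buffer-memory=40960) is full or the target broker is unreachable. That is a hypothesis to check against the logs, not a confirmed cause.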
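Error 5:
When checking consumption in Kafka Tool, the consumer group normally shows all 12 partitions (0 to 11), but sometimes a few partitions are missing (3, 4, 6).
I have already tried the following, but cannot tell whether either made a difference:
(1) Downgrading Spring Boot and Spring Kafka to Spring Boot 2.1.8.RELEASE + Spring Kafka 2.2.8.RELEASE
(2) Using kafka-clients 0.10.0.0 on the client side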