疑似kafka broker不稳定导致提交offset失败,导致重复消费

✎ @#* 发表于: 2022-05-19   最后更新时间: 2022-05-19 15:17:14   3,458 游览

背景:有一个py定时任务自动化测试程序,每隔5分钟产生几条数据到指定topic:gzyq。 有个转换分流程序:一直运行,将gzyq的数据处理后分发到其他指定topic。py任务会检查,如果如果生产的数据没有在规定时间内到分流后指定topic(大概要求半分钟的样子),则通过钉钉机器人提醒我。

问题:最近几天才收到提醒,查看后发现数据确实分流到了,但是时间超过了半分钟,且重复了,代码一直跑了一个月没问题,而且出现的频率是一天大概5、6回,有时间隔一个小时左右。

求救:暂不知道是什么原因导致?难道是broker不稳定啥的?

这是报错提示,有点意思,上一秒还好好的,突然就断了:

9:50分给我发了提醒:“扈忠花”附近这几条数据没在规定时间到达topic,查看后发现重复了

2022-05-19 09:45:22.265 INFO 8 --- [ceiveExecutor-1] c.t.n.k.x.serivce.KafkaService : 解析后数据 : {"tgkkdm":"208767367384","sfzh":"530521198806021572","xm":"宿祥峰","tgkkmc":"东台站十五口(验检自助)","tgsj":"20220518151815","mz":"傣族"}
2022-05-19 09:45:22.265 INFO 8 --- [ceiveExecutor-1] c.t.n.k.x.serivce.KafkaService : 发送到topic: xds-TL-5C2DE02081DD4D05A49357D7A402548F
2022-05-19 09:45:23.267 INFO 8 --- [ad | producer-1] c.t.n.k.x.serivce.KafkaService : 开始发送: 时间 :2022-05-19 09:45:22topic :xds-TL-5C2DE02081DD4D05A49357D7A402548F
2022-05-19 09:49:24.092 INFO 8 --- [-thread | demo2] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=1, groupId=demo2] Group coordinator 192.168.200.13:9092 (id: 2147483647 rack: null) is unavailable or invalid, will attempt rediscovery
2022-05-19 09:49:24.192 ERROR 8 --- [ceiveExecutor-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=1, groupId=demo2] Offset commit with offsets {gzyq-0=OffsetAndMetadata{offset=468637, leaderEpoch=0, metadata=''}} failed
org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset commit failed with a retriable exception. You should retry committing the latest consumed offsets.
Caused by: org.apache.kafka.common.errors.DisconnectException: null
2022-05-19 09:49:44.818 INFO 8 --- [ceiveExecutor-1] o.a.kafka.clients.FetchSessionHandler : [Consumer clientId=1, groupId=demo2] Error sending fetch request (sessionId=746331934, epoch=2428) to node 0: {}.
org.apache.kafka.common.errors.DisconnectException: null
2022-05-19 09:50:48.843 INFO 8 --- [ceiveExecutor-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=1, groupId=demo2] Discovered group coordinator 192.168.200.13:9092 (id: 2147483647 rack: null)
开始解析数据:{'area': '', 'code': '1C8F345445084E43B4BFA1ED546FD9C2', 'data': {'cfd': '', 'jpzt': 'RR', 'qfsj': '20220520004255', 'ddsj': '20220520054856', 'zjhm': '430382200407028548', 'mdd': 'HRB', 'hbh': '5687'}, 'district': '5100', 'rowkey': 'f1872cb66bc75744d3c0aa62ad3d6d94', 'id': '430382200407028548', 'time': '20220520054856', 'type': '1', 'event': 'MH'}
2022-05-19 09:50:48.844 INFO 8 --- [ceiveExecutor-1] c.t.n.k.x.serivce.KafkaService : 解析后数据 : {"cfd":"","jpzt":"RR","qfsj":"20220520004255","ddsj":"20220520054856","zjhm":"430382200407028548","mdd":"HRB","hbh":"5687"}
2022-05-19 09:50:48.844 INFO 8 --- [ceiveExecutor-1] c.t.n.k.x.serivce.KafkaService : 发送到topic: xds-MH-1C8F345445084E43B4BFA1ED546FD9C2
开始解析数据:{'area': '', 'code': '083883B9FAA44C00BDEA8A9353B0A045', 'data': {'seg_dest_code': 'MFM', 'psr_chnname': '印华海', 'cert_no': '520303199701089502', 'seg_dept_code': 'SJW', 'sta_depttm': '20220516195831', 'flt_airlcode': 'CDC', 'flt_number': '1698', 'sta_arvetm': '20220516223540'}, 'district': '5100', 'rowkey': '2e4f5983022960fc4971ff59955e6273', 'id': '520303199701089502', 'time': '20220516223540', 'type': '1', 'event': 'MH'}
2022-05-19 09:50:48.844 INFO 8 --- [ceiveExecutor-1] c.t.n.k.x.serivce.KafkaService : 解析后数据 : {"seg_dest_code":"MFM","psr_chnname":"印华海","cert_no":"520303199701089502","seg_dept_code":"SJW","sta_depttm":"20220516195831","flt_airlcode":"CDC","flt_number":"1698","sta_arvetm":"20220516223540"}
2022-05-19 09:50:48.844 INFO 8 --- [ceiveExecutor-1] c.t.n.k.x.serivce.KafkaService : 发送到topic: xds-MH-083883B9FAA44C00BDEA8A9353B0A045
开始解析数据:{'area': '', 'code': '083883B9FAA44C00BDEA8A9353B0A045', 'data': {'seg_dest_code': 'XMN', 'psr_chnname': '扈忠花', 'cert_no': '653227197506160578', 'seg_dept_code': 'WDS', 'sta_depttm': '20220518054348', 'flt_airlcode': 'DKH', 'flt_number': '5032', 'sta_arvetm': '20220518092523'}, 'district': '5100', 'rowkey': '7a4cf8d61c2701bd68b87697a5aea4b3', 'id': '653227197506160578', 'time': '20220518092523', 'type': '1', 'event': 'MH'}
2022-05-19 09:50:48.844 INFO 8 --- [ceiveExecutor-1] c.t.n.k.x.serivce.KafkaService : 解析后数据 : {"seg_dest_code":"XMN","psr_chnname":"扈忠花","cert_no":"653227197506160578","seg_dept_code":"WDS","sta_depttm":"20220518054348","flt_airlcode":"DKH","flt_number":"5032","sta_arvetm":"20220518092523"}
2022-05-19 09:50:48.844 INFO 8 --- [ceiveExecutor-1] c.t.n.k.x.serivce.KafkaService : 发送到topic: xds-MH-083883B9FAA44C00BDEA8A9353B0A045
开始解析数据:{'area': '', 'code': '083883B9FAA44C00BDEA8A9353B0A045', 'data': {'seg_dest_code': 'YSQ', 'psr_chnname': '言少佳', 'cert_no': '422823201807096533', 'seg_dept_code': 'KWE', 'sta_depttm': '20220518181015', 'flt_airlcode': 'CZ', 'flt_number': '8387', 'sta_arvetm': '20220518220523'}, 'district': '5100', 'rowkey': '3a9dfc201fb7313a3e54640047998dda', 'id': '422823201807096533', 'time': '20220518220523', 'type': '1', 'event': 'MH'}
2022-05-19 09:50:48.844 INFO 8 --- [ceiveExecutor-1] c.t.n.k.x.serivce.KafkaService : 解析后数据 : {"seg_dest_code":"YSQ","psr_chnname":"言少佳","cert_no":"422823201807096533","seg_dept_code":"KWE","sta_depttm":"20220518181015","flt_airlcode":"CZ","flt_number":"8387","sta_arvetm":"20220518220523"}
2022-05-19 09:50:48.844 INFO 8 --- [ceiveExecutor-1] c.t.n.k.x.serivce.KafkaService : 发送到topic: xds-MH-083883B9FAA44C00BDEA8A9353B0A045
开始解析数据:{'area': '', 'code': '86E69ACE27AA428DB8C7ED4DEAC746BA', 'data': {'cc': 'D1289', 'ccrq': '20220520', 'spsj': '20220517125414', 'cxh': '04', 'fz': '哈密', 'xm': '柯超美', 'dz': '三水南', 'zwh': '04C'}, 'district': '5100', 'rowkey': 'fd6118d244c5368969f88e88775d4aa3', 'id': '411503199007077380', 'time': '20220517125414', 'type': '1', 'event': 'TL'}
2022-05-19 09:50:48.844 INFO 8 --- [ceiveExecutor-1] c.t.n.k.x.serivce.KafkaService : 解析后数据 : {"cc":"D1289","ccrq":"20220520","spsj":"20220517125414","cxh":"04","fz":"哈密","xm":"柯超美","dz":"三水南","zwh":"04C"}
2022-05-19 09:50:48.844 INFO 8 --- [ceiveExecutor-1] c.t.n.k.x.serivce.KafkaService : 发送到topic: xds-TL-86E69ACE27AA428DB8C7ED4DEAC746BA
开始解析数据:{'area': '', 'code': '5C2DE02081DD4D05A49357D7A402548F', 'data': {'tgkkdm': '598542631634', 'sfzh': '341202199611210562', 'xm': '富维焕', 'tgkkmc': '盖州西站四口(验检半自助)', 'tgsj': '20220517035354', 'mz': '哈萨克族'}, 'district': '5100', 'rowkey': 'cc2a1cf44087dcf94ccbbaed120050aa', 'id': '341202199611210562', 'time': '20220517035354', 'type': '1', 'event': 'TL'}
2022-05-19 09:50:48.844 INFO 8 --- [ceiveExecutor-1] c.t.n.k.x.serivce.KafkaService : 解析后数据 : {"tgkkdm":"598542631634","sfzh":"341202199611210562","xm":"富维焕","tgkkmc":"盖州西站四口(验检半自助)","tgsj":"20220517035354","mz":"哈萨克族"}
2022-05-19 09:50:48.844 INFO 8 --- [ceiveExecutor-1] c.t.n.k.x.serivce.KafkaService : 发送到topic: xds-TL-5C2DE02081DD4D05A49357D7A402548F
开始解析数据:{'area': '', 'code': '5C2DE02081DD4D05A49357D7A402548F', 'data': {'tgkkdm': '711236198700', 'sfzh': '130104199312167602', 'xm': '黎上游', 'tgkkmc': '吉文站三口(验检半自助)', 'tgsj': '20220516113903', 'mz': '裕固族'}, 'district': '5100', 'rowkey': 'c3e77da649aacc02b795447b8db11f45', 'id': '130104199312167602', 'time': '20220516113903', 'type': '1', 'event': 'TL'}
2022-05-19 09:50:48.844 INFO 8 --- [ceiveExecutor-1] c.t.n.k.x.serivce.KafkaService : 解析后数据 : {"tgkkdm":"711236198700","sfzh":"130104199312167602","xm":"黎上游","tgkkmc":"吉文站三口(验检半自助)","tgsj":"20220516113903","mz":"裕固族"}
2022-05-19 09:50:48.844 INFO 8 --- [ceiveExecutor-1] c.t.n.k.x.serivce.KafkaService : 发送到topic: xds-TL-5C2DE02081DD4D05A49357D7A402548F
开始解析数据:{'area': '', 'code': '5C2DE02081DD4D05A49357D7A402548F', 'data': {'tgkkdm': '484264587151', 'sfzh': '230421198201147221', 'xm': '司徒民英', 'tgkkmc': '哈尔滨北站三口(验检半自助)', 'tgsj': '20220516184843', 'mz': '满族'}, 'district': '5100', 'rowkey': '3c16e5e717170b1cd47fd095a934b6aa', 'id': '230421198201147221', 'time': '20220516184843', 'type': '1', 'event': 'TL'}
2022-05-19 09:50:48.844 INFO 8 --- [ceiveExecutor-1] c.t.n.k.x.serivce.KafkaService : 解析后数据 : {"tgkkdm":"484264587151","sfzh":"230421198201147221","xm":"司徒民英","tgkkmc":"哈尔滨北站三口(验检半自助)","tgsj":"20220516184843","mz":"满族"}
2022-05-19 09:50:48.844 INFO 8 --- [ceiveExecutor-1] c.t.n.k.x.serivce.KafkaService : 发送到topic: xds-TL-5C2DE02081DD4D05A49357D7A402548F
开始解析数据:{'area': '', 'code': '5C2DE02081DD4D05A49357D7A402548F', 'data': {'tgkkdm': '804167097290', 'sfzh': '140921197705152455', 'xm': '居宏烈', 'tgkkmc': '双丰站九口(自助)', 'tgsj': '20220517094244', 'mz': '布依族'}, 'district': '5100', 'rowkey': '7019b59cdb6d6a7ac11739db70124ee6', 'id': '140921197705152455', 'time': '20220517094244', 'type': '1', 'event': 'TL'}
2022-05-19 09:50:48.844 INFO 8 --- [ceiveExecutor-1] c.t.n.k.x.serivce.KafkaService : 解析后数据 : {"tgkkdm":"804167097290","sfzh":"140921197705152455","xm":"居宏烈","tgkkmc":"双丰站九口(自助)","tgsj":"20220517094244","mz":"布依族"}
2022-05-19 09:50:48.844 INFO 8 --- [ceiveExecutor-1] c.t.n.k.x.serivce.KafkaService : 发送到topic: xds-TL-5C2DE02081DD4D05A49357D7A402548F
2022-05-19 09:50:48.844 INFO 8 --- [ceiveExecutor-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=1, groupId=demo2] Attempt to heartbeat failed for since member id 1-4a849c1a-5b7f-4d89-96b4-fa239dab7901 is not valid.
2022-05-19 09:50:48.844 INFO 8 --- [ceiveExecutor-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=1, groupId=demo2] Giving away all assigned partitions as lost since generation has been reset,indicating that consumer is no longer part of the group
2022-05-19 09:50:48.844 INFO 8 --- [ceiveExecutor-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=1, groupId=demo2] Lost previously assigned partitions gzyq-0
2022-05-19 09:50:48.844 INFO 8 --- [ceiveExecutor-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=1, groupId=demo2] (Re-)joining group
2022-05-19 09:50:48.845 ERROR 8 --- [ceiveExecutor-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=1, groupId=demo2] Offset commit failed on partition gzyq-0 at offset 468646: The coordinator is not aware of this member.
2022-05-19 09:50:48.845 ERROR 8 --- [ceiveExecutor-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=1, groupId=demo2] Offset commit with offsets {gzyq-0=OffsetAndMetadata{offset=468646, leaderEpoch=0, metadata=''}} failed
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:1206) ~[kafka-clients-2.5.0.jar!/:na]
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:1133) ~[kafka-clients-2.5.0.jar!/:na]
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1096) ~[kafka-clients-2.5.0.jar!/:na]
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1076) ~[kafka-clients-2.5.0.jar!/:na]
at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204) ~[kafka-clients-2.5.0.jar!/:na]
at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167) ~[kafka-clients-2.5.0.jar!/:na]
at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127) ~[kafka-clients-2.5.0.jar!/:na]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:599) ~[kafka-clients-2.5.0.jar!/:na]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:409) ~[kafka-clients-2.5.0.jar!/:na]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:294) ~[kafka-clients-2.5.0.jar!/:na]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233) ~[kafka-clients-2.5.0.jar!/:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1308) [kafka-clients-2.5.0.jar!/:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1248) [kafka-clients-2.5.0.jar!/:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216) [kafka-clients-2.5.0.jar!/:na]
at com.trs.nj.kafka.xdsconverter.serivce.impl.KafkaServiceImpl.lambda$consumer$3(KafkaServiceImpl.java:113) [classes!/:0.0.1-SNAPSHOT]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) ~[na:1.8.0_102]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) ~[na:1.8.0_102]
at java.lang.Thread.run(Thread.java:745) ~[na:1.8.0_102]
2022-05-19 09:50:48.845 INFO 8 --- [ceiveExecutor-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=1, groupId=demo2] Join group failed with org.apache.kafka.common.errors.MemberIdRequiredException: The group member needs to have a valid member id before actually entering a consumer group
2022-05-19 09:50:48.846 INFO 8 --- [ceiveExecutor-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=1, groupId=demo2] (Re-)joining group
2022-05-19 09:50:49.345 INFO 8 --- [ceiveExecutor-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=1, groupId=demo2] Finished assignment for group at generation 636: {1-37010865-f09f-4735-a516-964d71d4b85b=Assignment(partitions=[gzyq-0])}
2022-05-19 09:50:49.346 INFO 8 --- [ceiveExecutor-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=1, groupId=demo2] Successfully joined group with generation 636
2022-05-19 09:50:49.346 INFO 8 --- [ceiveExecutor-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=1, groupId=demo2] Adding newly assigned partitions: gzyq-0
2022-05-19 09:50:49.347 INFO 8 --- [ceiveExecutor-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=1, groupId=demo2] Setting offset for partition gzyq-0 to the committed offset FetchPosition{offset=468637, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional.empty, epoch=0}}
开始解析数据:{'area': '', 'code': '1C8F345445084E43B4BFA1ED546FD9C2', 'data': {'cfd': '', 'jpzt': 'RR', 'qfsj': '20220520004255', 'ddsj': '20220520054856', 'zjhm': '430382200407028548', 'mdd': 'HRB', 'hbh': '5687'}, 'district': '5100', 'rowkey': 'f1872cb66bc75744d3c0aa62ad3d6d94', 'id': '430382200407028548', 'time': '20220520054856', 'type': '1', 'event': 'MH'}
2022-05-19 09:50:49.447 INFO 8 --- [ceiveExecutor-1] c.t.n.k.x.serivce.KafkaService : 解析后数据 : {"cfd":"","jpzt":"RR","qfsj":"20220520004255","ddsj":"20220520054856","zjhm":"430382200407028548","mdd":"HRB","hbh":"5687"}
2022-05-19 09:50:49.447 INFO 8 --- [ceiveExecutor-1] c.t.n.k.x.serivce.KafkaService : 发送到topic: xds-MH-1C8F345445084E43B4BFA1ED546FD9C2
开始解析数据:{'area': '', 'code': '083883B9FAA44C00BDEA8A9353B0A045', 'data': {'seg_dest_code': 'MFM', 'psr_chnname': '印华海', 'cert_no': '520303199701089502', 'seg_dept_code': 'SJW', 'sta_depttm': '20220516195831', 'flt_airlcode': 'CDC', 'flt_number': '1698', 'sta_arvetm': '20220516223540'}, 'district': '5100', 'rowkey': '2e4f5983022960fc4971ff59955e6273', 'id': '520303199701089502', 'time': '20220516223540', 'type': '1', 'event': 'MH'}
2022-05-19 09:50:49.447 INFO 8 --- [ceiveExecutor-1] c.t.n.k.x.serivce.KafkaService : 解析后数据 : {"seg_dest_code":"MFM","psr_chnname":"印华海","cert_no":"520303199701089502","seg_dept_code":"SJW","sta_depttm":"20220516195831","flt_airlcode":"CDC","flt_number":"1698","sta_arvetm":"20220516223540"}
2022-05-19 09:50:49.447 INFO 8 --- [ceiveExecutor-1] c.t.n.k.x.serivce.KafkaService : 发送到topic: xds-MH-083883B9FAA44C00BDEA8A9353B0A045
开始解析数据:{'area': '', 'code': '083883B9FAA44C00BDEA8A9353B0A045', 'data': {'seg_dest_code': 'XMN', 'psr_chnname': '扈忠花', 'cert_no': '653227197506160578', 'seg_dept_code': 'WDS', 'sta_depttm': '20220518054348', 'flt_airlcode': 'DKH', 'flt_number': '5032', 'sta_arvetm': '20220518092523'}, 'district': '5100', 'rowkey': '7a4cf8d61c2701bd68b87697a5aea4b3', 'id': '653227197506160578', 'time': '20220518092523', 'type': '1', 'event': 'MH'}
2022-05-19 09:50:49.447 INFO 8 --- [ceiveExecutor-1] c.t.n.k.x.serivce.KafkaService : 解析后数据 : {"seg_dest_code":"XMN","psr_chnname":"扈忠花","cert_no":"653227197506160578","seg_dept_code":"WDS","sta_depttm":"20220518054348","flt_airlcode":"DKH","flt_number":"5032","sta_arvetm":"20220518092523"}
2022-05-19 09:50:49.448 INFO 8 --- [ceiveExecutor-1] c.t.n.k.x.serivce.KafkaService : 发送到topic: xds-MH-083883B9FAA44C00BDEA8A9353B0A045
开始解析数据:{'area': '', 'code': '083883B9FAA44C00BDEA8A9353B0A045', 'data': {'seg_dest_code': 'YSQ', 'psr_chnname': '言少佳', 'cert_no': '422823201807096533', 'seg_dept_code': 'KWE', 'sta_depttm': '20220518181015', 'flt_airlcode': 'CZ', 'flt_number': '8387', 'sta_arvetm': '20220518220523'}, 'district': '5100', 'rowkey': '3a9dfc201fb7313a3e54640047998dda', 'id': '422823201807096533', 'time': '20220518220523', 'type': '1', 'event': 'MH'}
2022-05-19 09:50:49.448 INFO 8 --- [ceiveExecutor-1] c.t.n.k.x.serivce.KafkaService : 解析后数据 : {"seg_dest_code":"YSQ","psr_chnname":"言少佳","cert_no":"422823201807096533","seg_dept_code":"KWE","sta_depttm":"20220518181015","flt_airlcode":"CZ","flt_number":"8387","sta_arvetm":"20220518220523"}
2022-05-19 09:50:49.448 INFO 8 --- [ceiveExecutor-1] c.t.n.k.x.serivce.KafkaService : 发送到topic: xds-MH-083883B9FAA44C00BDEA8A9353B0A045
开始解析数据:{'area': '', 'code': '86E69ACE27AA428DB8C7ED4DEAC746BA', 'data': {'cc': 'D1289', 'ccrq': '20220520', 'spsj': '20220517125414', 'cxh': '04', 'fz': '哈密', 'xm': '柯超美', 'dz': '三水南', 'zwh': '04C'}, 'district': '5100', 'rowkey': 'fd6118d244c5368969f88e88775d4aa3', 'id': '411503199007077380', 'time': '20220517125414', 'type': '1', 'event': 'TL'}
2022-05-19 09:50:49.448 INFO 8 --- [ceiveExecutor-1] c.t.n.k.x.serivce.KafkaService : 解析后数据 : {"cc":"D1289","ccrq":"20220520","spsj":"20220517125414","cxh":"04","fz":"哈密","xm":"柯超美","dz":"三水南","zwh":"04C"}
2022-05-19 09:50:49.448 INFO 8 --- [ceiveExecutor-1] c.t.n.k.x.serivce.KafkaService : 发送到topic: xds-TL-86E69ACE27AA428DB8C7ED4DEAC746BA
开始解析数据:{'area': '', 'code': '5C2DE02081DD4D05A49357D7A402548F', 'data': {'tgkkdm': '598542631634', 'sfzh': '341202199611210562', 'xm': '富维焕', 'tgkkmc': '盖州西站四口(验检半自助)', 'tgsj': '20220517035354', 'mz': '哈萨克族'}, 'district': '5100', 'rowkey': 'cc2a1cf44087dcf94ccbbaed120050aa', 'id': '341202199611210562', 'time': '20220517035354', 'type': '1', 'event': 'TL'}
2022-05-19 09:50:49.448 INFO 8 --- [ceiveExecutor-1] c.t.n.k.x.serivce.KafkaService : 解析后数据 : {"tgkkdm":"598542631634","sfzh":"341202199611210562","xm":"富维焕","tgkkmc":"盖州西站四口(验检半自助)","tgsj":"20220517035354","mz":"哈萨克族"}
2022-05-19 09:50:49.448 INFO 8 --- [ceiveExecutor-1] c.t.n.k.x.serivce.KafkaService : 发送到topic: xds-TL-5C2DE02081DD4D05A49357D7A402548F
开始解析数据:{'area': '', 'code': '5C2DE02081DD4D05A49357D7A402548F', 'data': {'tgkkdm': '711236198700', 'sfzh': '130104199312167602', 'xm': '黎上游', 'tgkkmc': '吉文站三口(验检半自助)', 'tgsj': '20220516113903', 'mz': '裕固族'}, 'district': '5100', 'rowkey': 'c3e77da649aacc02b795447b8db11f45', 'id': '130104199312167602', 'time': '20220516113903', 'type': '1', 'event': 'TL'}
2022-05-19 09:50:49.448 INFO 8 --- [ceiveExecutor-1] c.t.n.k.x.serivce.KafkaService : 解析后数据 : {"tgkkdm":"711236198700","sfzh":"130104199312167602","xm":"黎上游","tgkkmc":"吉文站三口(验检半自助)","tgsj":"20220516113903","mz":"裕固族"}
2022-05-19 09:50:49.448 INFO 8 --- [ceiveExecutor-1] c.t.n.k.x.serivce.KafkaService : 发送到topic: xds-TL-5C2DE02081DD4D05A49357D7A402548F
开始解析数据:{'area': '', 'code': '5C2DE02081DD4D05A49357D7A402548F', 'data': {'tgkkdm': '484264587151', 'sfzh': '230421198201147221', 'xm': '司徒民英', 'tgkkmc': '哈尔滨北站三口(验检半自助)', 'tgsj': '20220516184843', 'mz': '满族'}, 'district': '5100', 'rowkey': '3c16e5e717170b1cd47fd095a934b6aa', 'id': '230421198201147221', 'time': '20220516184843', 'type': '1', 'event': 'TL'}
2022-05-19 09:50:49.448 INFO 8 --- [ceiveExecutor-1] c.t.n.k.x.serivce.KafkaService : 解析后数据 : {"tgkkdm":"484264587151","sfzh":"230421198201147221","xm":"司徒民英","tgkkmc":"哈尔滨北站三口(验检半自助)","tgsj":"20220516184843","mz":"满族"}
2022-05-19 09:50:49.448 INFO 8 --- [ceiveExecutor-1] c.t.n.k.x.serivce.KafkaService : 发送到topic: xds-TL-5C2DE02081DD4D05A49357D7A402548F
开始解析数据:{'area': '', 'code': '5C2DE02081DD4D05A49357D7A402548F', 'data': {'tgkkdm': '804167097290', 'sfzh': '140921197705152455', 'xm': '居宏烈', 'tgkkmc': '双丰站九口(自助)', 'tgsj': '20220517094244', 'mz': '布依族'}, 'district': '5100', 'rowkey': '7019b59cdb6d6a7ac11739db70124ee6', 'id': '140921197705152455', 'time': '20220517094244', 'type': '1', 'event': 'TL'}
2022-05-19 09:50:49.448 INFO 8 --- [ceiveExecutor-1] c.t.n.k.x.serivce.KafkaService : 解析后数据 : {"tgkkdm":"804167097290","sfzh":"140921197705152455","xm":"居宏烈","tgkkmc":"双丰站九口(自助)","tgsj":"20220517094244","mz":"布依族"}
2022-05-19 09:50:49.448 INFO 8 --- [ceiveExecutor-1] c.t.n.k.x.serivce.KafkaService : 发送到topic: xds-TL-5C2DE02081DD4D05A49357D7A402548F
2022-05-19 09:50:49.847 INFO 8 --- [ad | producer-1] c.t.n.k.x.serivce.KafkaService : 开始发送: 时间 :2022-05-19 09:50:48topic :xds-MH-083883B9FAA44C00BDEA8A9353B0A045
2022-05-19 09:50:49.847 INFO 8 --- [ad | producer-1] c.t.n.k.x.serivce.KafkaService : 开始发送: 时间 :2022-05-19 09:50:48topic :xds-MH-083883B9FAA44C00BDEA8A9353B0A045
2022-05-19 09:50:49.847 INFO 8 --- [ad | producer-1] c.t.n.k.x.serivce.KafkaService : 开始发送: 时间 :2022-05-19 09:50:48topic :xds-MH-083883B9FAA44C00BDEA8A9353B0A045
2022-05-19 09:50:49.847 INFO 8 --- [ad | producer-1] c.t.n.k.x.serivce.KafkaService : 开始发送: 时间 :2022-05-19 09:50:49topic :xds-MH-083883B9FAA44C00BDEA8A9353B0A045
2022-05-19 09:50:49.847 INFO 8 --- [ad | producer-1] c.t.n.k.x.serivce.KafkaService : 开始发送: 时间 :2022-05-19 09:50:49topic :xds-MH-083883B9FAA44C00BDEA8A9353B0A045

尝试:代码我尝试改成手动提交了,还是有同样问题:


    @PostConstruct
    private void init() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config);
        props.put(ProducerConfig.RETRIES_CONFIG, 3); // 发送失败时,重新发送消息次数
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1000); // 批量发送消息的间隔时间
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 1000000); // 生产者缓存消息的内存字节数
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG)
        producer = new KafkaProducer<>(props);
    }

    public void producer(String data, String topic) {
        sendExecutor.execute(
                () -> {
                    try {
                        producer.send(
                                new ProducerRecord<>(topic, data),
                                ((metadata, exception) -> {
                                    SimpleDateFormat simpleDateFormat =
                                            new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                                    logger.info(
                                            "开始发送: 时间 :"
                                                    + simpleDateFormat.format(
                                                            new Date(metadata.timestamp()))
                                                    + "  topic :"
                                                    + topic);
                                }));
                        Thread.sleep(3000);
                    } catch (Exception e) {
                        logger.error("error : mq producer thread exception", e);
                    }
                });
    }

    @Override
    public void consumer() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupIdConfig);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(
                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.CLIENT_ID_CONFIG, "1");
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Stream.of(consumerTopic).collect(Collectors.toList()));

        receiveExecutor.execute(
                () -> {
                    try {
                        while (true) {
                            ConsumerRecords<String, String> records =
                                    consumer.poll(Duration.ofSeconds(1L));
                            records.forEach(
                                    record -> {
                                        String data =
                                                convertDataService.convertData(record.value());
                                        logger.info("解析后数据 : " + data);
                                        String topic = convertDataService.getTopic();
                                        logger.info("发送到topic: " + topic);
                                        producer(data, topic);
                                    });
                            consumer.commitAsync();
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                    } finally {
                        try {
                            consumer.commitSync();
                        } finally {
                            consumer.close();
                        }
                    }
                });
    }

而且处理过程时间很短,就是根据内容生成topic直接发出去:

@Override
    public String convertData(String value) {
        System.out.println("开始解析数据:" + value);
        JSONObject jsonObject = JSON.parseObject(value);
        String eventCode = jsonObject.getString("code");
        topic = "xds-" + map.get(eventCode) + "-" + eventCode;
        return jsonObject.getJSONObject("data").toJSONString();
    }

screenshot

screenshot

不知道啥原因了,而且还有一个程序去消费这个分流后的数据:也是重复了:

screenshot

发表于 2022-05-19
添加评论
✎ @#* -> 半兽人 2年前

参数已经改了,还是不行呢?感觉不是处理数据消耗时间的问题,我这几乎没有处理逻辑,老问题。我看报错是:突然获取不到协调者了,offset=469854提交失败,然后解析了几条消息,然后offset=469865提交失败,然后发送消息成功,然后又提交offset=469854失败,然后又第二次解析数据并发送成功。

public void consumer() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupIdConfig);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(
                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.CLIENT_ID_CONFIG, "1");
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
        // 这几个是新加的
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,60000);
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,60000);
        props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG,60000);
        props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG,70000);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Stream.of(consumerTopic).collect(Collectors.toList()));

        receiveExecutor.execute(
                () -> {
                    try {
                        while (true) {
                            ConsumerRecords<String, String> records =
                                    consumer.poll(Duration.ofSeconds(1L));
                            // 这个日志打印新加的,想看是不是一直在循环拉
                            logger.info("消息拉取时间:" + LocalDateTime.now());
                            records.forEach(
                                    record -> {
                                        String data =
                                                convertDataService.convertData(record.value());
                                        logger.info("解析后数据 : " + data);
                                        String topic = convertDataService.getTopic();
                                        logger.info("发送到topic: " + topic);
                                        producer(data, topic);
                                    });
                            consumer.commitAsync();
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                    } finally {
                        try {
                            consumer.commitSync();
                        } finally {
                            consumer.close();
                        }
                    }
                });
    }
2022-05-19 18:35:14.895 INFO 7 --- [ceiveExecutor-1] c.t.n.k.x.serivce.KafkaService : 消息拉取时间:2022-05-19T18:35:14.895
2022-05-19 18:35:15.059 INFO 7 --- [-thread | demo2] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=1, groupId=demo2] Group coordinator 192.168.200.13:9092 (id: 2147483647 rack: null) is unavailable or invalid, will attempt rediscovery
2022-05-19 18:35:15.169 ERROR 7 --- [ceiveExecutor-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=1, groupId=demo2] Offset commit with offsets {gzyq-0=OffsetAndMetadata{offset=469854, leaderEpoch=0, metadata=''}} failed
2022-05-19 18:35:15.896 INFO 7 --- [ceiveExecutor-1] c.t.n.k.x.serivce.KafkaService : 消息拉取时间:2022-05-19T18:35:15.896
·
·
·
2022-05-19 18:35:32.896 INFO 7 --- [ceiveExecutor-1] c.t.n.k.x.serivce.KafkaService : 消息拉取时间:2022-05-19T18:35:32.896
2022-05-19 18:35:33.707 INFO 7 --- [ceiveExecutor-1] o.a.kafka.clients.FetchSessionHandler : [Consumer clientId=1, groupId=demo2] Error sending fetch request (sessionId=1208246929, epoch=117) to node 0: {}.
2022-05-19 18:35:33.896 INFO 7 --- [ceiveExecutor-1] c.t.n.k.x.serivce.KafkaService : 消息拉取时间:2022-05-19T18:35:33.896
2022-05-19 18:35:34.896 INFO 7 --- [ceiveExecutor-1] c.t.n.k.x.serivce.KafkaService : 消息拉取时间:2022-05-19T18:35:34.896
2022-05-19 18:35:35.896 INFO 7 --- [ceiveExecutor-1] c.t.n.k.x.serivce.KafkaService : 消息拉取时间:2022-05-19T18:35:35.896
2022-05-19 18:35:36.896 INFO 7 --- [ceiveExecutor-1] c.t.n.k.x.serivce.KafkaService : 消息拉取时间:2022-05-19T18:35:36.896
2022-05-19 18:35:36.908 INFO 7 --- [ceiveExecutor-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=1, groupId=demo2] Discovered group coordinator 192.168.200.13:9092 (id: 2147483647 rack: null)
2022-05-19 18:35:36.913 INFO 7 --- [ceiveExecutor-1] c.t.n.k.x.serivce.KafkaService : 消息拉取时间:2022-05-19T18:35:36.913
2022-05-19 18:35:36.914 INFO 7 --- [ceiveExecutor-1] c.t.n.k.x.serivce.KafkaService : 解析后数据 : {"cfd":"","jpzt":"RR","qfsj":"20220522160537","ddsj":"20220522215709","zjhm":"36102219750828743X","mdd":"KHN","hbh":"2297"}
2022-05-19 18:35:36.914 INFO 7 --- [ceiveExecutor-1] c.t.n.k.x.serivce.KafkaService : 发送到topic: xds-MH-1C8F345445084E43B4BFA1ED546FD9C2
2022-05-19 18:35:36.918 INFO 7 --- [ceiveExecutor-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=1, groupId=demo2] Attempt to heartbeat failed for since member id 1-482e3534-f142-4c54-a55e-439f5498a971 is not valid.
2022-05-19 18:35:36.918 INFO 7 --- [ceiveExecutor-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=1, groupId=demo2] Giving away all assigned partitions as lost since generation has been reset,indicating that consumer is no longer part of the group
2022-05-19 18:35:36.918 INFO 7 --- [ceiveExecutor-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=1, groupId=demo2] Lost previously assigned partitions gzyq-0
2022-05-19 18:35:36.919 INFO 7 --- [ceiveExecutor-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=1, groupId=demo2] (Re-)joining group
2022-05-19 18:35:36.919 ERROR 7 --- [ceiveExecutor-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=1, groupId=demo2] Offset commit failed on partition gzyq-0 at offset 469865: The coordinator is not aware of this member.
2022-05-19 18:35:36.922 ERROR 7 --- [ceiveExecutor-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=1, groupId=demo2] Offset commit with offsets {gzyq-0=OffsetAndMetadata{offset=469865, leaderEpoch=0, metadata=''}} failed
2022-05-19 18:35:36.923 INFO 7 --- [ceiveExecutor-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=1, groupId=demo2] Join group failed with org.apache.kafka.common.errors.MemberIdRequiredException: The group member needs to have a valid member id before actually entering a consumer group
2022-05-19 18:35:36.923 INFO 7 --- [ceiveExecutor-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=1, groupId=demo2] (Re-)joining group
2022-05-19 18:35:41.450 INFO 7 --- [ad | producer-1] c.t.n.k.x.serivce.KafkaService : 开始发送: 时间 :2022-05-19 18:35:36 topic :xds-MH-083883B9FAA44C00BDEA8A9353B0A045
2022-05-19 18:36:36.916 INFO 7 --- [ceiveExecutor-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=1, groupId=demo2] Finished assignment for group at generation 686: {1-4bfc7969-b15b-4674-840b-00dc2a128894=Assignment(partitions=[gzyq-0])}
2022-05-19 18:36:36.917 INFO 7 --- [ceiveExecutor-1] c.t.n.k.x.serivce.KafkaService : 消息拉取时间:2022-05-19T18:36:36.916
2022-05-19 18:36:36.917 INFO 7 --- [ceiveExecutor-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=1, groupId=demo2] Successfully joined group with generation 686
2022-05-19 18:36:36.918 INFO 7 --- [ceiveExecutor-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=1, groupId=demo2] Adding newly assigned partitions: gzyq-0
2022-05-19 18:36:36.920 INFO 7 --- [ceiveExecutor-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=1, groupId=demo2] Setting offset for partition gzyq-0 to the committed offset FetchPosition{offset=469854, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional.empty, epoch=0}}
2022-05-19 18:36:37.021 INFO 7 --- [ceiveExecutor-1] c.t.n.k.x.serivce.KafkaService : 消息拉取时间:2022-05-19T18:36:37.021
2022-05-19 18:36:37.022 INFO 7 --- [ceiveExecutor-1] c.t.n.k.x.serivce.KafkaService : 解析后数据 : {"cfd":"","jpzt":"RR","qfsj":"20220522160537","ddsj":"20220522215709","zjhm":"36102219750828743X","mdd":"KHN","hbh":"2297"}
2022-05-19 18:36:37.022 INFO 7 --- [ceiveExecutor-1] c.t.n.k.x.serivce.KafkaService : 发送到topic: xds-MH-1C8F345445084E43B4BFA1ED546FD9C2
2022-05-19 18:36:38.027 INFO 7 --- [ad | producer-1] c.t.n.k.x.serivce.KafkaService : 开始发送: 时间 :2022-05-19 18:36:37 topic :xds-MH-083883B9FAA44C00BDEA8A9353B0A045
2022-05-19 18:36:38.027 INFO 7 --- [ceiveExecutor-1] c.t.n.k.x.serivce.KafkaService : 消息拉取时间:2022-05-19T18:36:38.027
2022-05-19 18:36:39.028 INFO 7 --- [ceiveExecutor-1] c.t.n.k.x.serivce.KafkaService : 消息拉取时间:2022-05-19T18:36:39.028
半兽人 -> ✎ @#* 2年前

timeout没了,消费了之后,又调用producer()这个方法,你用了线程池,还休眠3秒,我觉得是这段逻辑写的有问题,如果你把自动提交改成每条提交一次,至少不会在报错了,如:

records.forEach(
record -> {
    String data =
            convertDataService.convertData(record.value());
    logger.info("解析后数据 : " + data);
    String topic = convertDataService.getTopic();
    logger.info("发送到topic: " + topic);
    producer(data, topic);

    consumer.commitAsync();   // 这里
});
✎ @#* -> 半兽人 2年前

收到,感谢大佬,我立马试试,
1、每次发送后提交
2、去除休眠
(当时休眠的原因是正式环境担心数据量太大,比较可能拉一次很多数据,但是发是一条一条的,担心kafka扛不住之类的原因)

✎ @#* -> ✎ @#* 2年前

改了之后:虽然钉钉还会在网络波动的时候报警,因为数据从生产到目标topic大概会用1分钟时间。但是再也没有重复消费情况了。感谢!

你的答案

查看kafka相关的其他问题或提一个您自己的问题