返回到文章

采纳

编辑于 2年前

疑似kafka broker不稳定导致提交offset失败,导致重复消费

kafka

背景:有一个py定时任务自动化测试程序,每隔5分钟产生几条数据到指定topic:gzyq。 有个转换分流程序:一直运行,将gzyq的数据处理后分发到其他指定topic。py任务会检查,如果如果生产的数据没有在规定时间内到分流后指定topic(大概要求半分钟的样子),则通过钉钉机器人提醒我。

问题:最近几天才收到提醒,查看后发现数据确实分流到了,但是时间超过了半分钟,且重复了,代码一直跑了一个月没问题,而且出现的频率是一天大概5、6回,有时间隔一个小时左右。

求救:暂不知道是什么原因导致?难道是broker不稳定啥的?

这是报错提示,有点意思,上一秒还好好的,突然就断了:

9:50分给我发了提醒:“扈忠花”附近这几条数据没在规定时间到达topic,查看后发现重复了

2022-05-19 09:45:22.265 INFO 8 --- [ceiveExecutor-1] c.t.n.k.x.serivce.KafkaService : 解析后数据 : {"tgkkdm":"208767367384","sfzh":"530521198806021572","xm":"宿祥峰","tgkkmc":"东台站十五口(验检自助)","tgsj":"20220518151815","mz":"傣族"}
2022-05-19 09:45:22.265 INFO 8 --- [ceiveExecutor-1] c.t.n.k.x.serivce.KafkaService : 发送到topic: xds-TL-5C2DE02081DD4D05A49357D7A402548F
2022-05-19 09:45:23.267 INFO 8 --- [ad | producer-1] c.t.n.k.x.serivce.KafkaService : 开始发送: 时间 :2022-05-19 09:45:22topic :xds-TL-5C2DE02081DD4D05A49357D7A402548F
2022-05-19 09:49:24.092 INFO 8 --- [-thread | demo2] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=1, groupId=demo2] Group coordinator 192.168.200.13:9092 (id: 2147483647 rack: null) is unavailable or invalid, will attempt rediscovery
2022-05-19 09:49:24.192 ERROR 8 --- [ceiveExecutor-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=1, groupId=demo2] Offset commit with offsets {gzyq-0=OffsetAndMetadata{offset=468637, leaderEpoch=0, metadata=''}} failed
org.apache.kafka.clients.consumer.RetriableCommitFailedException: Offset commit failed with a retriable exception. You should retry committing the latest consumed offsets.
Caused by: org.apache.kafka.common.errors.DisconnectException: null
2022-05-19 09:49:44.818 INFO 8 --- [ceiveExecutor-1] o.a.kafka.clients.FetchSessionHandler : [Consumer clientId=1, groupId=demo2] Error sending fetch request (sessionId=746331934, epoch=2428) to node 0: {}.
org.apache.kafka.common.errors.DisconnectException: null
2022-05-19 09:50:48.843 INFO 8 --- [ceiveExecutor-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=1, groupId=demo2] Discovered group coordinator 192.168.200.13:9092 (id: 2147483647 rack: null)
开始解析数据:{'area': '', 'code': '1C8F345445084E43B4BFA1ED546FD9C2', 'data': {'cfd': '', 'jpzt': 'RR', 'qfsj': '20220520004255', 'ddsj': '20220520054856', 'zjhm': '430382200407028548', 'mdd': 'HRB', 'hbh': '5687'}, 'district': '5100', 'rowkey': 'f1872cb66bc75744d3c0aa62ad3d6d94', 'id': '430382200407028548', 'time': '20220520054856', 'type': '1', 'event': 'MH'}
2022-05-19 09:50:48.844 INFO 8 --- [ceiveExecutor-1] c.t.n.k.x.serivce.KafkaService : 解析后数据 : {"cfd":"","jpzt":"RR","qfsj":"20220520004255","ddsj":"20220520054856","zjhm":"430382200407028548","mdd":"HRB","hbh":"5687"}
2022-05-19 09:50:48.844 INFO 8 --- [ceiveExecutor-1] c.t.n.k.x.serivce.KafkaService : 发送到topic: xds-MH-1C8F345445084E43B4BFA1ED546FD9C2
开始解析数据:{'area': '', 'code': '083883B9FAA44C00BDEA8A9353B0A045', 'data': {'seg_dest_code': 'MFM', 'psr_chnname': '印华海', 'cert_no': '520303199701089502', 'seg_dept_code': 'SJW', 'sta_depttm': '20220516195831', 'flt_airlcode': 'CDC', 'flt_number': '1698', 'sta_arvetm': '20220516223540'}, 'district': '5100', 'rowkey': '2e4f5983022960fc4971ff59955e6273', 'id': '520303199701089502', 'time': '20220516223540', 'type': '1', 'event': 'MH'}
2022-05-19 09:50:48.844 INFO 8 --- [ceiveExecutor-1] c.t.n.k.x.serivce.KafkaService : 解析后数据 : {"seg_dest_code":"MFM","psr_chnname":"印华海","cert_no":"520303199701089502","seg_dept_code":"SJW","sta_depttm":"20220516195831","flt_airlcode":"CDC","flt_number":"1698","sta_arvetm":"20220516223540"}
2022-05-19 09:50:48.844 INFO 8 --- [ceiveExecutor-1] c.t.n.k.x.serivce.KafkaService : 发送到topic: xds-MH-083883B9FAA44C00BDEA8A9353B0A045
开始解析数据:{'area': '', 'code': '083883B9FAA44C00BDEA8A9353B0A045', 'data': {'seg_dest_code': 'XMN', 'psr_chnname': '扈忠花', 'cert_no': '653227197506160578', 'seg_dept_code': 'WDS', 'sta_depttm': '20220518054348', 'flt_airlcode': 'DKH', 'flt_number': '5032', 'sta_arvetm': '20220518092523'}, 'district': '5100', 'rowkey': '7a4cf8d61c2701bd68b87697a5aea4b3', 'id': '653227197506160578', 'time': '20220518092523', 'type': '1', 'event': 'MH'}
2022-05-19 09:50:48.844 INFO 8 --- [ceiveExecutor-1] c.t.n.k.x.serivce.KafkaService : 解析后数据 : {"seg_dest_code":"XMN","psr_chnname":"扈忠花","cert_no":"653227197506160578","seg_dept_code":"WDS","sta_depttm":"20220518054348","flt_airlcode":"DKH","flt_number":"5032","sta_arvetm":"20220518092523"}
2022-05-19 09:50:48.844 INFO 8 --- [ceiveExecutor-1] c.t.n.k.x.serivce.KafkaService : 发送到topic: xds-MH-083883B9FAA44C00BDEA8A9353B0A045
开始解析数据:{'area': '', 'code': '083883B9FAA44C00BDEA8A9353B0A045', 'data': {'seg_dest_code': 'YSQ', 'psr_chnname': '言少佳', 'cert_no': '422823201807096533', 'seg_dept_code': 'KWE', 'sta_depttm': '20220518181015', 'flt_airlcode': 'CZ', 'flt_number': '8387', 'sta_arvetm': '20220518220523'}, 'district': '5100', 'rowkey': '3a9dfc201fb7313a3e54640047998dda', 'id': '422823201807096533', 'time': '20220518220523', 'type': '1', 'event': 'MH'}
2022-05-19 09:50:48.844 INFO 8 --- [ceiveExecutor-1] c.t.n.k.x.serivce.KafkaService : 解析后数据 : {"seg_dest_code":"YSQ","psr_chnname":"言少佳","cert_no":"422823201807096533","seg_dept_code":"KWE","sta_depttm":"20220518181015","flt_airlcode":"CZ","flt_number":"8387","sta_arvetm":"20220518220523"}
2022-05-19 09:50:48.844 INFO 8 --- [ceiveExecutor-1] c.t.n.k.x.serivce.KafkaService : 发送到topic: xds-MH-083883B9FAA44C00BDEA8A9353B0A045
开始解析数据:{'area': '', 'code': '86E69ACE27AA428DB8C7ED4DEAC746BA', 'data': {'cc': 'D1289', 'ccrq': '20220520', 'spsj': '20220517125414', 'cxh': '04', 'fz': '哈密', 'xm': '柯超美', 'dz': '三水南', 'zwh': '04C'}, 'district': '5100', 'rowkey': 'fd6118d244c5368969f88e88775d4aa3', 'id': '411503199007077380', 'time': '20220517125414', 'type': '1', 'event': 'TL'}
2022-05-19 09:50:48.844 INFO 8 --- [ceiveExecutor-1] c.t.n.k.x.serivce.KafkaService : 解析后数据 : {"cc":"D1289","ccrq":"20220520","spsj":"20220517125414","cxh":"04","fz":"哈密","xm":"柯超美","dz":"三水南","zwh":"04C"}
2022-05-19 09:50:48.844 INFO 8 --- [ceiveExecutor-1] c.t.n.k.x.serivce.KafkaService : 发送到topic: xds-TL-86E69ACE27AA428DB8C7ED4DEAC746BA
开始解析数据:{'area': '', 'code': '5C2DE02081DD4D05A49357D7A402548F', 'data': {'tgkkdm': '598542631634', 'sfzh': '341202199611210562', 'xm': '富维焕', 'tgkkmc': '盖州西站四口(验检半自助)', 'tgsj': '20220517035354', 'mz': '哈萨克族'}, 'district': '5100', 'rowkey': 'cc2a1cf44087dcf94ccbbaed120050aa', 'id': '341202199611210562', 'time': '20220517035354', 'type': '1', 'event': 'TL'}
2022-05-19 09:50:48.844 INFO 8 --- [ceiveExecutor-1] c.t.n.k.x.serivce.KafkaService : 解析后数据 : {"tgkkdm":"598542631634","sfzh":"341202199611210562","xm":"富维焕","tgkkmc":"盖州西站四口(验检半自助)","tgsj":"20220517035354","mz":"哈萨克族"}
2022-05-19 09:50:48.844 INFO 8 --- [ceiveExecutor-1] c.t.n.k.x.serivce.KafkaService : 发送到topic: xds-TL-5C2DE02081DD4D05A49357D7A402548F
开始解析数据:{'area': '', 'code': '5C2DE02081DD4D05A49357D7A402548F', 'data': {'tgkkdm': '711236198700', 'sfzh': '130104199312167602', 'xm': '黎上游', 'tgkkmc': '吉文站三口(验检半自助)', 'tgsj': '20220516113903', 'mz': '裕固族'}, 'district': '5100', 'rowkey': 'c3e77da649aacc02b795447b8db11f45', 'id': '130104199312167602', 'time': '20220516113903', 'type': '1', 'event': 'TL'}
2022-05-19 09:50:48.844 INFO 8 --- [ceiveExecutor-1] c.t.n.k.x.serivce.KafkaService : 解析后数据 : {"tgkkdm":"711236198700","sfzh":"130104199312167602","xm":"黎上游","tgkkmc":"吉文站三口(验检半自助)","tgsj":"20220516113903","mz":"裕固族"}
2022-05-19 09:50:48.844 INFO 8 --- [ceiveExecutor-1] c.t.n.k.x.serivce.KafkaService : 发送到topic: xds-TL-5C2DE02081DD4D05A49357D7A402548F
开始解析数据:{'area': '', 'code': '5C2DE02081DD4D05A49357D7A402548F', 'data': {'tgkkdm': '484264587151', 'sfzh': '230421198201147221', 'xm': '司徒民英', 'tgkkmc': '哈尔滨北站三口(验检半自助)', 'tgsj': '20220516184843', 'mz': '满族'}, 'district': '5100', 'rowkey': '3c16e5e717170b1cd47fd095a934b6aa', 'id': '230421198201147221', 'time': '20220516184843', 'type': '1', 'event': 'TL'}
2022-05-19 09:50:48.844 INFO 8 --- [ceiveExecutor-1] c.t.n.k.x.serivce.KafkaService : 解析后数据 : {"tgkkdm":"484264587151","sfzh":"230421198201147221","xm":"司徒民英","tgkkmc":"哈尔滨北站三口(验检半自助)","tgsj":"20220516184843","mz":"满族"}
2022-05-19 09:50:48.844 INFO 8 --- [ceiveExecutor-1] c.t.n.k.x.serivce.KafkaService : 发送到topic: xds-TL-5C2DE02081DD4D05A49357D7A402548F
开始解析数据:{'area': '', 'code': '5C2DE02081DD4D05A49357D7A402548F', 'data': {'tgkkdm': '804167097290', 'sfzh': '140921197705152455', 'xm': '居宏烈', 'tgkkmc': '双丰站九口(自助)', 'tgsj': '20220517094244', 'mz': '布依族'}, 'district': '5100', 'rowkey': '7019b59cdb6d6a7ac11739db70124ee6', 'id': '140921197705152455', 'time': '20220517094244', 'type': '1', 'event': 'TL'}
2022-05-19 09:50:48.844 INFO 8 --- [ceiveExecutor-1] c.t.n.k.x.serivce.KafkaService : 解析后数据 : {"tgkkdm":"804167097290","sfzh":"140921197705152455","xm":"居宏烈","tgkkmc":"双丰站九口(自助)","tgsj":"20220517094244","mz":"布依族"}
2022-05-19 09:50:48.844 INFO 8 --- [ceiveExecutor-1] c.t.n.k.x.serivce.KafkaService : 发送到topic: xds-TL-5C2DE02081DD4D05A49357D7A402548F
2022-05-19 09:50:48.844 INFO 8 --- [ceiveExecutor-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=1, groupId=demo2] Attempt to heartbeat failed for since member id 1-4a849c1a-5b7f-4d89-96b4-fa239dab7901 is not valid.
2022-05-19 09:50:48.844 INFO 8 --- [ceiveExecutor-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=1, groupId=demo2] Giving away all assigned partitions as lost since generation has been reset,indicating that consumer is no longer part of the group
2022-05-19 09:50:48.844 INFO 8 --- [ceiveExecutor-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=1, groupId=demo2] Lost previously assigned partitions gzyq-0
2022-05-19 09:50:48.844 INFO 8 --- [ceiveExecutor-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=1, groupId=demo2] (Re-)joining group
2022-05-19 09:50:48.845 ERROR 8 --- [ceiveExecutor-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=1, groupId=demo2] Offset commit failed on partition gzyq-0 at offset 468646: The coordinator is not aware of this member.
2022-05-19 09:50:48.845 ERROR 8 --- [ceiveExecutor-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=1, groupId=demo2] Offset commit with offsets {gzyq-0=OffsetAndMetadata{offset=468646, leaderEpoch=0, metadata=''}} failed
org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:1206) ~[kafka-clients-2.5.0.jar!/:na]
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:1133) ~[kafka-clients-2.5.0.jar!/:na]
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1096) ~[kafka-clients-2.5.0.jar!/:na]
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1076) ~[kafka-clients-2.5.0.jar!/:na]
at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204) ~[kafka-clients-2.5.0.jar!/:na]
at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167) ~[kafka-clients-2.5.0.jar!/:na]
at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127) ~[kafka-clients-2.5.0.jar!/:na]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:599) ~[kafka-clients-2.5.0.jar!/:na]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:409) ~[kafka-clients-2.5.0.jar!/:na]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:294) ~[kafka-clients-2.5.0.jar!/:na]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233) ~[kafka-clients-2.5.0.jar!/:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1308) [kafka-clients-2.5.0.jar!/:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1248) [kafka-clients-2.5.0.jar!/:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1216) [kafka-clients-2.5.0.jar!/:na]
at com.trs.nj.kafka.xdsconverter.serivce.impl.KafkaServiceImpl.lambda$consumer$3(KafkaServiceImpl.java:113) [classes!/:0.0.1-SNAPSHOT]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142) ~[na:1.8.0_102]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617) ~[na:1.8.0_102]
at java.lang.Thread.run(Thread.java:745) ~[na:1.8.0_102]
2022-05-19 09:50:48.845 INFO 8 --- [ceiveExecutor-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=1, groupId=demo2] Join group failed with org.apache.kafka.common.errors.MemberIdRequiredException: The group member needs to have a valid member id before actually entering a consumer group
2022-05-19 09:50:48.846 INFO 8 --- [ceiveExecutor-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=1, groupId=demo2] (Re-)joining group
2022-05-19 09:50:49.345 INFO 8 --- [ceiveExecutor-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=1, groupId=demo2] Finished assignment for group at generation 636: {1-37010865-f09f-4735-a516-964d71d4b85b=Assignment(partitions=[gzyq-0])}
2022-05-19 09:50:49.346 INFO 8 --- [ceiveExecutor-1] o.a.k.c.c.internals.AbstractCoordinator : [Consumer clientId=1, groupId=demo2] Successfully joined group with generation 636
2022-05-19 09:50:49.346 INFO 8 --- [ceiveExecutor-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=1, groupId=demo2] Adding newly assigned partitions: gzyq-0
2022-05-19 09:50:49.347 INFO 8 --- [ceiveExecutor-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=1, groupId=demo2] Setting offset for partition gzyq-0 to the committed offset FetchPosition{offset=468637, offsetEpoch=Optional[0], currentLeader=LeaderAndEpoch{leader=Optional.empty, epoch=0}}
开始解析数据:{'area': '', 'code': '1C8F345445084E43B4BFA1ED546FD9C2', 'data': {'cfd': '', 'jpzt': 'RR', 'qfsj': '20220520004255', 'ddsj': '20220520054856', 'zjhm': '430382200407028548', 'mdd': 'HRB', 'hbh': '5687'}, 'district': '5100', 'rowkey': 'f1872cb66bc75744d3c0aa62ad3d6d94', 'id': '430382200407028548', 'time': '20220520054856', 'type': '1', 'event': 'MH'}
2022-05-19 09:50:49.447 INFO 8 --- [ceiveExecutor-1] c.t.n.k.x.serivce.KafkaService : 解析后数据 : {"cfd":"","jpzt":"RR","qfsj":"20220520004255","ddsj":"20220520054856","zjhm":"430382200407028548","mdd":"HRB","hbh":"5687"}
2022-05-19 09:50:49.447 INFO 8 --- [ceiveExecutor-1] c.t.n.k.x.serivce.KafkaService : 发送到topic: xds-MH-1C8F345445084E43B4BFA1ED546FD9C2
开始解析数据:{'area': '', 'code': '083883B9FAA44C00BDEA8A9353B0A045', 'data': {'seg_dest_code': 'MFM', 'psr_chnname': '印华海', 'cert_no': '520303199701089502', 'seg_dept_code': 'SJW', 'sta_depttm': '20220516195831', 'flt_airlcode': 'CDC', 'flt_number': '1698', 'sta_arvetm': '20220516223540'}, 'district': '5100', 'rowkey': '2e4f5983022960fc4971ff59955e6273', 'id': '520303199701089502', 'time': '20220516223540', 'type': '1', 'event': 'MH'}
2022-05-19 09:50:49.447 INFO 8 --- [ceiveExecutor-1] c.t.n.k.x.serivce.KafkaService : 解析后数据 : {"seg_dest_code":"MFM","psr_chnname":"印华海","cert_no":"520303199701089502","seg_dept_code":"SJW","sta_depttm":"20220516195831","flt_airlcode":"CDC","flt_number":"1698","sta_arvetm":"20220516223540"}
2022-05-19 09:50:49.447 INFO 8 --- [ceiveExecutor-1] c.t.n.k.x.serivce.KafkaService : 发送到topic: xds-MH-083883B9FAA44C00BDEA8A9353B0A045
开始解析数据:{'area': '', 'code': '083883B9FAA44C00BDEA8A9353B0A045', 'data': {'seg_dest_code': 'XMN', 'psr_chnname': '扈忠花', 'cert_no': '653227197506160578', 'seg_dept_code': 'WDS', 'sta_depttm': '20220518054348', 'flt_airlcode': 'DKH', 'flt_number': '5032', 'sta_arvetm': '20220518092523'}, 'district': '5100', 'rowkey': '7a4cf8d61c2701bd68b87697a5aea4b3', 'id': '653227197506160578', 'time': '20220518092523', 'type': '1', 'event': 'MH'}
2022-05-19 09:50:49.447 INFO 8 --- [ceiveExecutor-1] c.t.n.k.x.serivce.KafkaService : 解析后数据 : {"seg_dest_code":"XMN","psr_chnname":"扈忠花","cert_no":"653227197506160578","seg_dept_code":"WDS","sta_depttm":"20220518054348","flt_airlcode":"DKH","flt_number":"5032","sta_arvetm":"20220518092523"}
2022-05-19 09:50:49.448 INFO 8 --- [ceiveExecutor-1] c.t.n.k.x.serivce.KafkaService : 发送到topic: xds-MH-083883B9FAA44C00BDEA8A9353B0A045
开始解析数据:{'area': '', 'code': '083883B9FAA44C00BDEA8A9353B0A045', 'data': {'seg_dest_code': 'YSQ', 'psr_chnname': '言少佳', 'cert_no': '422823201807096533', 'seg_dept_code': 'KWE', 'sta_depttm': '20220518181015', 'flt_airlcode': 'CZ', 'flt_number': '8387', 'sta_arvetm': '20220518220523'}, 'district': '5100', 'rowkey': '3a9dfc201fb7313a3e54640047998dda', 'id': '422823201807096533', 'time': '20220518220523', 'type': '1', 'event': 'MH'}
2022-05-19 09:50:49.448 INFO 8 --- [ceiveExecutor-1] c.t.n.k.x.serivce.KafkaService : 解析后数据 : {"seg_dest_code":"YSQ","psr_chnname":"言少佳","cert_no":"422823201807096533","seg_dept_code":"KWE","sta_depttm":"20220518181015","flt_airlcode":"CZ","flt_number":"8387","sta_arvetm":"20220518220523"}
2022-05-19 09:50:49.448 INFO 8 --- [ceiveExecutor-1] c.t.n.k.x.serivce.KafkaService : 发送到topic: xds-MH-083883B9FAA44C00BDEA8A9353B0A045
开始解析数据:{'area': '', 'code': '86E69ACE27AA428DB8C7ED4DEAC746BA', 'data': {'cc': 'D1289', 'ccrq': '20220520', 'spsj': '20220517125414', 'cxh': '04', 'fz': '哈密', 'xm': '柯超美', 'dz': '三水南', 'zwh': '04C'}, 'district': '5100', 'rowkey': 'fd6118d244c5368969f88e88775d4aa3', 'id': '411503199007077380', 'time': '20220517125414', 'type': '1', 'event': 'TL'}
2022-05-19 09:50:49.448 INFO 8 --- [ceiveExecutor-1] c.t.n.k.x.serivce.KafkaService : 解析后数据 : {"cc":"D1289","ccrq":"20220520","spsj":"20220517125414","cxh":"04","fz":"哈密","xm":"柯超美","dz":"三水南","zwh":"04C"}
2022-05-19 09:50:49.448 INFO 8 --- [ceiveExecutor-1] c.t.n.k.x.serivce.KafkaService : 发送到topic: xds-TL-86E69ACE27AA428DB8C7ED4DEAC746BA
开始解析数据:{'area': '', 'code': '5C2DE02081DD4D05A49357D7A402548F', 'data': {'tgkkdm': '598542631634', 'sfzh': '341202199611210562', 'xm': '富维焕', 'tgkkmc': '盖州西站四口(验检半自助)', 'tgsj': '20220517035354', 'mz': '哈萨克族'}, 'district': '5100', 'rowkey': 'cc2a1cf44087dcf94ccbbaed120050aa', 'id': '341202199611210562', 'time': '20220517035354', 'type': '1', 'event': 'TL'}
2022-05-19 09:50:49.448 INFO 8 --- [ceiveExecutor-1] c.t.n.k.x.serivce.KafkaService : 解析后数据 : {"tgkkdm":"598542631634","sfzh":"341202199611210562","xm":"富维焕","tgkkmc":"盖州西站四口(验检半自助)","tgsj":"20220517035354","mz":"哈萨克族"}
2022-05-19 09:50:49.448 INFO 8 --- [ceiveExecutor-1] c.t.n.k.x.serivce.KafkaService : 发送到topic: xds-TL-5C2DE02081DD4D05A49357D7A402548F
开始解析数据:{'area': '', 'code': '5C2DE02081DD4D05A49357D7A402548F', 'data': {'tgkkdm': '711236198700', 'sfzh': '130104199312167602', 'xm': '黎上游', 'tgkkmc': '吉文站三口(验检半自助)', 'tgsj': '20220516113903', 'mz': '裕固族'}, 'district': '5100', 'rowkey': 'c3e77da649aacc02b795447b8db11f45', 'id': '130104199312167602', 'time': '20220516113903', 'type': '1', 'event': 'TL'}
2022-05-19 09:50:49.448 INFO 8 --- [ceiveExecutor-1] c.t.n.k.x.serivce.KafkaService : 解析后数据 : {"tgkkdm":"711236198700","sfzh":"130104199312167602","xm":"黎上游","tgkkmc":"吉文站三口(验检半自助)","tgsj":"20220516113903","mz":"裕固族"}
2022-05-19 09:50:49.448 INFO 8 --- [ceiveExecutor-1] c.t.n.k.x.serivce.KafkaService : 发送到topic: xds-TL-5C2DE02081DD4D05A49357D7A402548F
开始解析数据:{'area': '', 'code': '5C2DE02081DD4D05A49357D7A402548F', 'data': {'tgkkdm': '484264587151', 'sfzh': '230421198201147221', 'xm': '司徒民英', 'tgkkmc': '哈尔滨北站三口(验检半自助)', 'tgsj': '20220516184843', 'mz': '满族'}, 'district': '5100', 'rowkey': '3c16e5e717170b1cd47fd095a934b6aa', 'id': '230421198201147221', 'time': '20220516184843', 'type': '1', 'event': 'TL'}
2022-05-19 09:50:49.448 INFO 8 --- [ceiveExecutor-1] c.t.n.k.x.serivce.KafkaService : 解析后数据 : {"tgkkdm":"484264587151","sfzh":"230421198201147221","xm":"司徒民英","tgkkmc":"哈尔滨北站三口(验检半自助)","tgsj":"20220516184843","mz":"满族"}
2022-05-19 09:50:49.448 INFO 8 --- [ceiveExecutor-1] c.t.n.k.x.serivce.KafkaService : 发送到topic: xds-TL-5C2DE02081DD4D05A49357D7A402548F
开始解析数据:{'area': '', 'code': '5C2DE02081DD4D05A49357D7A402548F', 'data': {'tgkkdm': '804167097290', 'sfzh': '140921197705152455', 'xm': '居宏烈', 'tgkkmc': '双丰站九口(自助)', 'tgsj': '20220517094244', 'mz': '布依族'}, 'district': '5100', 'rowkey': '7019b59cdb6d6a7ac11739db70124ee6', 'id': '140921197705152455', 'time': '20220517094244', 'type': '1', 'event': 'TL'}
2022-05-19 09:50:49.448 INFO 8 --- [ceiveExecutor-1] c.t.n.k.x.serivce.KafkaService : 解析后数据 : {"tgkkdm":"804167097290","sfzh":"140921197705152455","xm":"居宏烈","tgkkmc":"双丰站九口(自助)","tgsj":"20220517094244","mz":"布依族"}
2022-05-19 09:50:49.448 INFO 8 --- [ceiveExecutor-1] c.t.n.k.x.serivce.KafkaService : 发送到topic: xds-TL-5C2DE02081DD4D05A49357D7A402548F
2022-05-19 09:50:49.847 INFO 8 --- [ad | producer-1] c.t.n.k.x.serivce.KafkaService : 开始发送: 时间 :2022-05-19 09:50:48topic :xds-MH-083883B9FAA44C00BDEA8A9353B0A045
2022-05-19 09:50:49.847 INFO 8 --- [ad | producer-1] c.t.n.k.x.serivce.KafkaService : 开始发送: 时间 :2022-05-19 09:50:48topic :xds-MH-083883B9FAA44C00BDEA8A9353B0A045
2022-05-19 09:50:49.847 INFO 8 --- [ad | producer-1] c.t.n.k.x.serivce.KafkaService : 开始发送: 时间 :2022-05-19 09:50:48topic :xds-MH-083883B9FAA44C00BDEA8A9353B0A045
2022-05-19 09:50:49.847 INFO 8 --- [ad | producer-1] c.t.n.k.x.serivce.KafkaService : 开始发送: 时间 :2022-05-19 09:50:49topic :xds-MH-083883B9FAA44C00BDEA8A9353B0A045
2022-05-19 09:50:49.847 INFO 8 --- [ad | producer-1] c.t.n.k.x.serivce.KafkaService : 开始发送: 时间 :2022-05-19 09:50:49topic :xds-MH-083883B9FAA44C00BDEA8A9353B0A045

尝试:代码我尝试改成手动提交了,还是有同样问题:


    @PostConstruct
    private void init() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, config);
        props.put(ProducerConfig.RETRIES_CONFIG, 3); // 发送失败时,重新发送消息次数
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1000); // 批量发送消息的间隔时间
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 1000000); // 生产者缓存消息的内存字节数
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        // props.put(ProducerConfig.PARTITIONER_CLASS_CONFIG)
        producer = new KafkaProducer<>(props);
    }

    public void producer(String data, String topic) {
        sendExecutor.execute(
                () -> {
                    try {
                        producer.send(
                                new ProducerRecord<>(topic, data),
                                ((metadata, exception) -> {
                                    SimpleDateFormat simpleDateFormat =
                                            new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                                    logger.info(
                                            "开始发送: 时间 :"
                                                    + simpleDateFormat.format(
                                                            new Date(metadata.timestamp()))
                                                    + "  topic :"
                                                    + topic);
                                }));
                        Thread.sleep(3000);
                    } catch (Exception e) {
                        logger.error("error : mq producer thread exception", e);
                    }
                });
    }

    @Override
    public void consumer() {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, config);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, groupIdConfig);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(
                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.CLIENT_ID_CONFIG, "1");
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 100);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.subscribe(Stream.of(consumerTopic).collect(Collectors.toList()));

        receiveExecutor.execute(
                () -> {
                    try {
                        while (true) {
                            ConsumerRecords<String, String> records =
                                    consumer.poll(Duration.ofSeconds(1L));
                            records.forEach(
                                    record -> {
                                        String data =
                                                convertDataService.convertData(record.value());
                                        logger.info("解析后数据 : " + data);
                                        String topic = convertDataService.getTopic();
                                        logger.info("发送到topic: " + topic);
                                        producer(data, topic);
                                    });
                            consumer.commitAsync();
                        }
                    } catch (Exception e) {
                        e.printStackTrace();
                    } finally {
                        try {
                            consumer.commitSync();
                        } finally {
                            consumer.close();
                        }
                    }
                });
    }

而且处理过程时间很短,就是根据内容生成topic直接发出去:

@Override
    public String convertData(String value) {
        System.out.println("开始解析数据:" + value);
        JSONObject jsonObject = JSON.parseObject(value);
        String eventCode = jsonObject.getString("code");
        topic = "xds-" + map.get(eventCode) + "-" + eventCode;
        return jsonObject.getJSONObject("data").toJSONString();
    }

screenshot

screenshot

不知道啥原因了,而且还有一个程序去消费这个分流后的数据:也是重复了:

screenshot