返回到文章

采纳

编辑于 5年前

kafka 死循环无限重置offset

kafka

这是spark submit程序显示的日志

20/06/28 15:39:40 INFO Executor: Running task 0.0 in stage 41.0 (TID 41) [Executor task launch worker for task 41]
20/06/28 15:39:40 INFO ShuffleBlockFetcherIterator: Getting 0 non-empty blocks out of 1 blocks [Executor task launch worker for task 41]
20/06/28 15:39:40 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 0 ms [Executor task launch worker for task 41]
20/06/28 15:39:50 INFO Fetcher: [Consumer clientId=consumer-1, groupId=headlinesRank] Resetting offset for partition nginx01-0 to offset 75. [JobGenerator]
20/06/28 15:39:50 INFO JobScheduler: Added jobs for time 1593329990000 ms [JobGenerator]
20/06/28 15:40:00 INFO Fetcher: [Consumer clientId=consumer-1, groupId=headlinesRank] Resetting offset for partition nginx01-0 to offset 75. [JobGenerator]
20/06/28 15:40:00 INFO JobScheduler: Added jobs for time 1593330000000 ms [JobGenerator]
20/06/28 15:40:10 INFO Fetcher: [Consumer clientId=consumer-1, groupId=headlinesRank] Resetting offset for partition nginx01-0 to offset 75. [JobGenerator]
20/06/28 15:40:10 INFO JobScheduler: Added jobs for time 1593330010000 ms [JobGenerator]
20/06/28 15:40:20 INFO Fetcher: [Consumer clientId=consumer-1, groupId=headlinesRank] Resetting offset for partition nginx01-0 to offset 75. [JobGenerator]
20/06/28 15:40:20 INFO JobScheduler: Added jobs for time 1593330020000 ms [JobGenerator]

kafka报的日志

[2020-06-28 14:54:46,288] INFO [GroupCoordinator 0]: Preparing to rebalance group test2 with old generation 1 (__consumer_offsets-38) (kafka.coordinator.group.GroupCoordinator)
[2020-06-28 14:54:46,290] INFO [GroupCoordinator 0]: Group test2 with generation 2 is now empty (__consumer_offsets-38) (kafka.coordinator.group.GroupCoordinator)
[2020-06-28 14:57:17,527] INFO [GroupMetadataManager brokerId=0] Group test2 transitioned to Dead in generation 2 (kafka.coordinator.group.GroupMetadataManager)

为了防止出现rebalance起了 一个consumer 一个partition 和无备份 但是还是会出现死循环,kafka输出此日志后便没有多余之日输出 留下spark程序循环重置offset且无法消费

auto.offset.reset是 latest enable.auto.commit是 false group.id 是headlinesRank bootstrap.servers是host:port(测试是通的)

现象
是spark submit消费kafka无限重置offset 且查看consumer_group中无信息显示