你看到的是缓存,因为kakfa并不知道消费者组MECH_VERIFY_LEGAL_group_2
已经不在订阅MERCHANT_REPORT
主题了,所以一直计数,因为如果你再次消费的话,只消费积压的消息。而不需要重头消费。
bin/kafka-consumer-groups.sh --bootstrap-server localhost:9092 --describe --group my-group
用该命令查看kafka指定消费组的消费情况。
//设置提交偏移量的方式, MANUAL_IMMEDIATE 表示消费一条提交一次;MANUAL表示批量提交一次
factory.getContainerProperties().setAckMode((ContainerProperties.AckMode.MANUAL_IMMEDIATE));
MANUAL_IMMEDIATE 这个提交的方式虽然是消费一次提交一次 但是kafka每一批都会拉取max.poll.records(默认500条数据) 比如说在消费第300条数据是手动提交的时候 (1-300条数据)消费的总时长超过了max.poll.interval.ms 就会被踢出消费者组
//设置提交偏移量的方式, MANUAL_IMMEDIATE 表示消费一条提交一次;MANUAL表示批量提交一次
factory.getContainerProperties().setAckMode((ContainerProperties.AckMode.MANUAL_IMMEDIATE));
MANUAL_IMMEDIATE 这个提交的方式虽然是消费一次提交一次 但是kafka每一批都会拉取max.poll.records(默认500条数据) 比如说在消费第300条数据是手动提交的时候 (1-300条数据)消费的总时长超过了max.poll.interval.ms 就会被踢出消费者组
//设置提交偏移量的方式, MANUAL_IMMEDIATE 表示消费一条提交一次;MANUAL表示批量提交一次
factory.getContainerProperties().setAckMode((ContainerProperties.AckMode.MANUAL_IMMEDIATE));
MANUAL_IMMEDIATE 这个提交的方式虽然是消费一次提交一次 但是kafka每一批都会拉取max.poll.records(默认500条数据) 比如说在消费第300条数据是手动提交的时候 (1-300条数据)消费的总时长超过了max.poll.interval.ms 就会被踢出消费者组
消费者组列表里,显示的名为ConcurrencyConsumer1
的消费者组有2台,
1台ip是192.168.4.11
和192.168.15.2
,所以每个占2个分区,是正常的。
看你的问题,你是想消费全部分区,
所以必须消费者组名不同,否则就会认定为同一个消费者组,消息就会被平均分配。