当我开三台服务器去消费消费一个topic的时候,当我断开某一台服务器消费,那么会导致重复消费。
我建立了一个topic,并且建立了3个分区,我循环了1000次发送消息,但是输出却统计了1002条消息:
public static void main(String args[]) {
String topic = "dyq";
if(args.length >0) {
topic = args[0];
}
Properties props = new Properties();
props.put("bootstrap.servers", "xxxxxxx:9092");
props.put("auto.commit.interval.ms", "false");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("group.id", "test");
KafkaConsumer<String, String> consumer = new org.apache.kafka.clients.consumer.KafkaConsumer<String, String>(props);
List<String> subscribedTopics = new ArrayList<String>();
//如果需要订阅多个Topic,则在这里add进去即可
//subscribedTopics.add(topic1);
subscribedTopics.add(topic);
consumer.subscribe(subscribedTopics);
try {
int index = 0;
while(true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
for (ConsumerRecord<String, String> record : partitionRecords) {
index++;
}
long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
//数据库操作操作
//....................
consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset - 1)));
System.out.println("我现在准备入库操作了 || 累计入库数 【" + index +"】" );
}
}
} finally {
consumer.close();
}
}