Kafka: abnormally disconnecting one consumer server causes duplicate consumption

kafka

I run three servers consuming the same topic; when I disconnect one of them, some messages end up being consumed more than once.

I created a topic with 3 partitions and sent messages in a loop 1000 times (a sketch of the producer loop follows the consumer code below), yet the consumer output counts 1002 messages:

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public static void main(String[] args) {
    String topic = "dyq";
    if(args.length >0) {
        topic = args[0];
    }

    Properties props = new Properties();
    props.put("bootstrap.servers", "xxxxxxx:9092");
    props.put("auto.commit.interval.ms", "false");
    props.put("auto.commit.interval.ms", "1000");
    props.put("session.timeout.ms", "30000");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("group.id", "test");


    KafkaConsumer<String, String> consumer = new org.apache.kafka.clients.consumer.KafkaConsumer<String, String>(props);
    List<String> subscribedTopics =  new ArrayList<String>();
    //如果需要订阅多个Topic,则在这里add进去即可
    //subscribedTopics.add(topic1);
    subscribedTopics.add(topic);
    consumer.subscribe(subscribedTopics);
    try {
        int index = 0;
        while(true) {
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (TopicPartition partition : records.partitions()) {
                List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
                for (ConsumerRecord<String, String> record : partitionRecords) {
                    index++;
                }
                // offset of the last record returned for this partition in this poll
                long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
                //database operations
                //....................
                // manual commit for this partition; note that commitSync expects the offset of the
                // next record to be consumed
                consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset - 1)));
                System.out.println("About to write to the database || total records stored so far [" + index + "]");
            }
        }
    } finally {
      consumer.close();
    }
}
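
For reference, the producer side is not shown above. Here is a minimal sketch of what the 1000-message loop might look like; the class name, message payload, and serializer settings are assumptions, and only the loop count, topic, and broker address mirror what is described in the question:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

// hypothetical class name; the real producer code was not included in the question
public class ProducerSketch {
    public static void main(String[] args) {
        String topic = "dyq";

        Properties props = new Properties();
        props.put("bootstrap.servers", "xxxxxxx:9092");   // same broker address as the consumer
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        // KafkaProducer is Closeable; close() waits for outstanding sends to complete
        try (KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props)) {
            for (int i = 0; i < 1000; i++) {
                // records have no key, so the producer spreads them over the topic's 3 partitions
                producer.send(new ProducerRecord<String, String>(topic, "message-" + i));
            }
        }
    }
}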