返回到文章

采纳

编辑于

kafka消费问题

kafka

提问说明

kafka 消费端 设置了 enable.auto.commit=true 自动提交间隔是 1秒钟, 如果一秒钟 我进行多次poll 会不会出现重复消费 ,目前出现了大量重复消费的数据

kafka-clients-0.9.0.0.jar

 public BmsKafkaConsumer(String kafkaUrl, String userName) {
        props = new Properties();
        props.put("group.id", UUID.nameUUIDFromBytes(userName.getBytes()).toString().replaceAll("-", ""));
        props.put("auto.commit.interval.ms", "1000");
        props.put("bootstrap.servers", kafkaUrl);
        props.put("enable.auto.commit", "true");
        props.put("session.timeout.ms", "30000");
        props.put("heartbeat.interval.ms", "29000");
        props.put("auto.offset.reset", "latest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        topic.add("BAYONET_VEHICLEPASS");

        lisenter = new BmsKafkaLisenter();
    }


    /**
     * 鎺ユ敹鏁版嵁
     */
    public void receive() {
        close();
        consumer = new KafkaConsumer<String, String>(props);
        consumer.subscribe(topic);
        isReceive = true;
        try {
            while (isReceive) {
                log.info("start get data--------------------------");
                ConsumerRecords<String, String> records = consumer.poll(100);
                log.info("records  size  is =======[{}]",records.count());
                for (ConsumerRecord<String, String> record : records) {

                    lisenter.receive(new String(record.value().getBytes(), Variable.KAFKA_CHARSET));
                }

            }
        } catch (Exception e) {
            log.error("鎺ユ敹杩囪溅鏁版嵁鏃跺嚭鐜板紓甯革紝鏈兘姝e父鎺ユ敹鏁版嵁", e);
            close();
        }
    }