返回到文章

采纳

编辑于

kafka重复消费了主题的所有分区的所有偏移量

kafka

消费端部署了大概七八台服务,设置了10个分区,每个消费端只启动了一个线程在循环消费,所有消费者服务器代码是一样的,也就是说消费者组是相同的。

观察了日志消费端从新消费了以前消费过的偏移量。如果是生产者重复发送了数据,消费端应该不会消费到到相同分区的相同偏移量。

public class Consumer implements  Runnable{
    private static Logger logger = LoggerFactory.getLogger(Consumer.class);
    private final KafkaConsumer<String, String> consumer;
    private final String topic;
    private static final String GROUPID = "groupA";
    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    private boolean flag;

    public boolean isFlag() {
        return flag;
    }

    public void setFlag(boolean flag) {
        this.flag = flag;
    }

    public Consumer(String servers, String topicName, boolean flag) {
        Properties props = new Properties();
        props.put("bootstrap.servers", servers);
        props.put("group.id", GROUPID);
        props.put("enable.auto.commit", "false");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("auto.offset.reset", "earliest");
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1");//获取记录数
        props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "300000");//处理消息最大间隔时间
        props.put("key.deserializer", StringDeserializer.class.getName());
        props.put("value.deserializer", StringDeserializer.class.getName());
        this.consumer = new KafkaConsumer<String, String>(props);
        this.topic = topicName;
        this.flag=flag;
        this.consumer.subscribe(Arrays.asList(topic));
    }

    @Override
    public void run() {
        logger.info("---------kafka消费端开始消费---------");
        Gson gson = new Gson();
        try {
            while(flag){
                ConsumerRecords<String, String> msgList = consumer.poll(1000);
                if(null!=msgList&&msgList.count()>0){
                    for (TopicPartition partition : msgList.partitions()) {
                        List<ConsumerRecord<String, String>> partitionRecords = msgList.records(partition);
                        for (ConsumerRecord<String, String> record : partitionRecords) {
                            long start = System.currentTimeMillis();
                            try {
                                consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(record.offset() + 1)));
                            } catch (Exception e) {
                                logger.error("kafka is timeout since maybe business code processing to low,topicName:{},currentName:{},commit time:{},error:{}", topic, Thread.currentThread().getName(), (System.currentTimeMillis() - start), record.value(), e);
                                break;
                            } catch (Throwable e) {
                                logger.error("fatal Error:kafka is timeout since maybe business code processing to low,topicName:{},currentName:{},message:{},error:{}", topic, Thread.currentThread().getName(), (System.currentTimeMillis() - start), record.value(), e);
                                break;
                            }
                            String reqJson = record.value();
                            ProposalSaveFor3105RequestDto baseRequestDto = null;
                            String proposalNo = "";
                            try {
                                baseRequestDto = gson.fromJson(reqJson,ProposalSaveFor3105RequestDto.class);
                                proposalNo = baseRequestDto.getPolicyDataDto().getInsuredItemProcutDtoList().get(0).getInsuredDto().getProposalNo();
                                MDC.put("uuid", proposalNo);
                                logger.info("kafka拉取了投保单号:"+proposalNo+",分区编码:"+record.partition()+",偏移量:"+record.offset());
                            }catch (Exception e){
                                e.printStackTrace();
                                logger.error("kafka消费端异常---转换报文报错---报文内容:"+reqJson+",异常信息:",e);
                            }
                            if(baseRequestDto!=null){
                                try {
                                    logger.info("kafka开始调用业务,投保单号"+proposalNo);
                                    KafkaProposalSaveFor3105ServiceImpl service = new KafkaProposalSaveFor3105ServiceImpl();
                                    service.saveProposal(baseRequestDto);
                                    logger.info("kafka结束调用业务,投保单号"+proposalNo);
                                }catch (Exception e){
                                    e.printStackTrace();
                                    logger.error("kafka消费端业务逻辑执行异常---异常信息:",e);
                                }
                            }
                        }
                    }
                }
            }
        } catch (Exception e) {
            logger.error("kafka消费端消费信息时异常---异常信息:",e);
            e.printStackTrace();
        } finally {
            consumer.close();
        }
    }
}

日志:

 ts:2019-11-01 21:12:11.496 msg:kafka拉取了投保单号:9127264100190000072000,分区编码:6,偏移量:6 
 ts:2019-11-04 14:25:17.075 msg:kafka拉取了投保单号:9127264100190000072000,分区编码:6,偏移量:6 
 ts:2019-11-05 14:40:11.835 msg:kafka拉取了投保单号:9127264100190000072000,分区编码:6,偏移量:6 
观察日志,过了几天竟然重复去拉去了以前的消息,实在搞不懂是什么情况。我只写了消费端,所以只贴了消费端代码。