kafka 没有提交偏移量为什么还消费不到消息
我设置了手动提交偏移量,并且没有提交,但是为什么拉取不到消息,只能重启应用,或者重新加入group的时候才消费的到。
 Consumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
        consumer.subscribe(Collections.singleton(topic));
        ConsumerRecords<String, String> records = null;
        while (true) {
            records = consumer.poll(Duration.ofMillis(8100));
            logger.info("size {}",records.count());
            records.forEach(record -> {
                boolean success = true;
                try {
                    OrderAction orderAction = processor.processor(new MQContent(record.value()));
                    if (orderAction == OrderAction.Suspend) {
                        success = false;
                    }
                } catch (Exception e) {
                    success = false;
                }
                if (success) {
                    //手动提交offset
                    TopicPartition topicPartition = new TopicPartition(record.topic(), record.partition());
                    consumer.commitSync();
                }
            });
        }
    }
 
        
是呀,kafka客户端已经将消息拉下来了,也丢给你了,你不提交offset,那客户端也不会重新去拉取了,它认为你在处理中。
哪有什么好点的重试策略吗
kafka客户端负责把消息丢给你的业务层,这个时候,该消息必须被提交,无论你业务层是否成功,而且kafka是基于offset下标的,如果后面的提交也会跳过之前的消息。另外设计第一原则是不要侵入到业务层。
如果业务处理这条消息失败,重新丢会队列或者其他的处理就可以了,简单不复杂。
感谢
你的答案