观察了日志消费端从新消费了以前消费过的偏移量。如果是生产者重复发送了数据,消费端应该不会消费到到相同分区的相同偏移量。
public class Consumer implements Runnable{
private static Logger logger = LoggerFactory.getLogger(Consumer.class);
private final KafkaConsumer<String, String> consumer;
private final String topic;
private static final String GROUPID = "groupA";
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
private boolean flag;
public boolean isFlag() {
return flag;
}
public void setFlag(boolean flag) {
this.flag = flag;
}
public Consumer(String servers, String topicName, boolean flag) {
Properties props = new Properties();
props.put("bootstrap.servers", servers);
props.put("group.id", GROUPID);
props.put("enable.auto.commit", "false");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("auto.offset.reset", "earliest");
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "1");//获取记录数
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "300000");//处理消息最大间隔时间
props.put("key.deserializer", StringDeserializer.class.getName());
props.put("value.deserializer", StringDeserializer.class.getName());
this.consumer = new KafkaConsumer<String, String>(props);
this.topic = topicName;
this.flag=flag;
this.consumer.subscribe(Arrays.asList(topic));
}
@Override
public void run() {
logger.info("---------kafka消费端开始消费---------");
Gson gson = new Gson();
try {
while(flag){
ConsumerRecords<String, String> msgList = consumer.poll(1000);
if(null!=msgList&&msgList.count()>0){
for (TopicPartition partition : msgList.partitions()) {
List<ConsumerRecord<String, String>> partitionRecords = msgList.records(partition);
for (ConsumerRecord<String, String> record : partitionRecords) {
long start = System.currentTimeMillis();
try {
consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(record.offset() + 1)));
} catch (Exception e) {
logger.error("kafka is timeout since maybe business code processing to low,topicName:{},currentName:{},commit time:{},error:{}", topic, Thread.currentThread().getName(), (System.currentTimeMillis() - start), record.value(), e);
break;
} catch (Throwable e) {
logger.error("fatal Error:kafka is timeout since maybe business code processing to low,topicName:{},currentName:{},message:{},error:{}", topic, Thread.currentThread().getName(), (System.currentTimeMillis() - start), record.value(), e);
break;
}
String reqJson = record.value();
ProposalSaveFor3105RequestDto baseRequestDto = null;
String proposalNo = "";
try {
baseRequestDto = gson.fromJson(reqJson,ProposalSaveFor3105RequestDto.class);
proposalNo = baseRequestDto.getPolicyDataDto().getInsuredItemProcutDtoList().get(0).getInsuredDto().getProposalNo();
MDC.put("uuid", proposalNo);
logger.info("kafka拉取了投保单号:"+proposalNo+",分区编码:"+record.partition()+",偏移量:"+record.offset());
}catch (Exception e){
e.printStackTrace();
logger.error("kafka消费端异常---转换报文报错---报文内容:"+reqJson+",异常信息:",e);
}
if(baseRequestDto!=null){
try {
logger.info("kafka开始调用业务,投保单号"+proposalNo);
KafkaProposalSaveFor3105ServiceImpl service = new KafkaProposalSaveFor3105ServiceImpl();
service.saveProposal(baseRequestDto);
logger.info("kafka结束调用业务,投保单号"+proposalNo);
}catch (Exception e){
e.printStackTrace();
logger.error("kafka消费端业务逻辑执行异常---异常信息:",e);
}
}
}
}
}
}
} catch (Exception e) {
logger.error("kafka消费端消费信息时异常---异常信息:",e);
e.printStackTrace();
} finally {
consumer.close();
}
}
}
日志:
ts:2019-11-01 21:12:11.496 msg:kafka拉取了投保单号:9127264100190000072000,分区编码:6,偏移量:6
ts:2019-11-04 14:25:17.075 msg:kafka拉取了投保单号:9127264100190000072000,分区编码:6,偏移量:6
ts:2019-11-05 14:40:11.835 msg:kafka拉取了投保单号:9127264100190000072000,分区编码:6,偏移量:6
观察日志,过了几天竟然重复去拉去了以前的消息,实在搞不懂是什么情况。我只写了消费端,所以只贴了消费端代码。