返回到文章

采纳

编辑于 4年前

第一次接触kafka,根据用例,有时候消费得到数据,并且消费到的数据offset为:900-1000,902-1002;之后的数据或之前的数据就消费不到了

kafka

kafka服务有九个节点

版本是1.1.0

执行kafka消费数据方法,刚开始是消费不到数据的;能够消费到数据时,只能消费到类似这种数据:offset为:900-1000;902-1002;0-900,1002往后的数据都消费不到。实际上的数据是大于1002条的

想问下出现这种情况是我消费代码写的有问题嘛?

执行消费方法如下:

Properties consumeProp = Config.getConsumeConfig();
consumeProp.put("ssl.truststore.location", Config.getTrustStorePath());
consumeProp.put("enable.auto.commit","false");
System.setProperty("java.security.auth.login.config", Config.getSaslConfig());
Consumer<String, String> consumer = new KafkaConsumer<String, String>(consumeProp);
consumer.subscribe(Arrays.asList(consumeProp.getProperty("topic")));
log.info("create consumer succ:"+consumeProp.getProperty("topic"));
ConsumerRecords<String, String> records = (ConsumerRecords<String, String>) consumer.poll(1000);
for (ConsumerRecord<String, String> record : records) {
  //业务逻辑处理
}