第一次接触kafka,根据用例,有时候消费得到数据,并且消费到的数据offset为:900-1000,902-1002;之后的数据或之前的数据就消费不到了

Logan 发表于: 2020-11-03   最后更新时间: 2020-11-03 19:59:49   1,220 游览

kafka服务有九个节点

版本是1.1.0

执行kafka消费数据方法,刚开始是消费不到数据的;能够消费到数据时,只能消费到类似这种数据:offset为:900-1000;902-1002;0-900,1002往后的数据都消费不到。实际上的数据是大于1002条的

想问下出现这种情况是我消费代码写的有问题嘛?

执行消费方法如下:

Properties consumeProp = Config.getConsumeConfig();
consumeProp.put("ssl.truststore.location", Config.getTrustStorePath());
consumeProp.put("enable.auto.commit","false");
System.setProperty("java.security.auth.login.config", Config.getSaslConfig());
Consumer<String, String> consumer = new KafkaConsumer<String, String>(consumeProp);
consumer.subscribe(Arrays.asList(consumeProp.getProperty("topic")));
log.info("create consumer succ:"+consumeProp.getProperty("topic"));
ConsumerRecords<String, String> records = (ConsumerRecords<String, String>) consumer.poll(1000);
for (ConsumerRecord<String, String> record : records) {
  //业务逻辑处理
}
发表于 2020-11-03
添加评论

kafka消费数据是已消费组对应分区进行的,主要看对应主题有多少个分区和消费端有多少个消费者,看你描述应该是消费组里多个消费者

我现在是有一个topic。kafka服务端有九个节点,应该是对应九个分区,可以这样理解嘛?groupid也是一个,有多个调用方使用(不同公司),对应的节点ip有九个;topic不同,bootstrap.servers相同,groupid相同;这样去消费数据是互不影响的吧。我现在还碰到一种情况是kafka服务器上有个测试topic,我用同样的代码可以消费到测试topic里面的数据,正式环境下就消费不到了

你给的代码还不够充分,offset怎么提交的我没看到,建议你先参考下官方代码:
https://www.orchome.com/451

Logan -> 半兽人 4年前

好的,谢谢。我之前写的代码没有提交offset...。看到官方代码,是可以指定offset进行消费的,用seek(TopicPartition, long) 方法,我现在是有九个分区,是要九个分区分别重置下offset嘛?重置之后,已经被我消费的数据是可以再次被消费到嘛?

半兽人 -> Logan 4年前

业务这么复杂嘛,要自己提交offset。
1、offset是决定消费者从哪个位置进行消费的,9个分区,里面的数据都拥有自己的offset,所以你有几个,你就要自己控制几个。
2、offset重置之后,如果数据还在(默认保留7天),是可以重新消费的。

Logan -> 半兽人 4年前

那一般情况(简单的使用kafka消费功能)是不用自己提交offset嘛?对接方给了九个dms服务节点,我看打印出来的信息应该是9个分区,类似:topic_0,topic_1这种的。我这边看到这种警告:the configuration "topic" was supplied but isn't a known config 这种警告会影响程序消费数据嘛?

你的答案

查看kafka相关的其他问题或提一个您自己的问题