用的是0.10.1.1的API中最简单的第一个例子
kafka版本0.10.1.1
生产者可以生产消息,用命令行也是可以消费消息的,但就是在代码中无法消费
运行发现ConsumerRecords都获取到了,但是接下来的
for (ConsumerRecord
进不去了
不知道问题出在哪里了???望大神指点一下
具体的consumer代码,如下:
public class MyConsumer {
public static void main(String[] args) throws IOException {
Consumer<String, String> consumer = KFKUtil.getConsumer();
consumer.subscribe(Arrays.asList("my-topic"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(500);
System.out.println(records);
for (ConsumerRecord<String, String> record : records) {
System.out.println("Message: " + record.value());
System.out.println("Fetche message from partition " + record.partition() + ",offset = " + record.offset());
// consumer.commitSync();//手动提交偏移量
}
}
}
}