代码如下:
Properties props = new Properties();
props.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test");
props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
props.setProperty(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100000");
props.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG
, "org.apache.kafka.common.serialization.StringDeserializer");
props.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG
, "org.apache.kafka.common.serialization.StringDeserializer");
Consumer consumer = new KafkaConsumer(props);
consumer.subscribe(Arrays.asList("test"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
for (ConsumerRecord<String, String> record : records) {
System.out.println("consumer信息:" + "value:" + record.value() + "-" + "partition:" + record.partition() + "-" + "offset:" + record.offset());
}
}
consumer已启动,并且也已经接收到消息了
可以看见broker节点的信息了,但是没有group信息的