我在代码里produce,在Shell里consumer,为什么consumer不完整?只到767,不到999?
代码如下:
package com.lzm.demo.demo_kafka;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
/*
@author @date 2016 1 11 4:18:56
*/
public class kafkaConsumer extends Thread {
private String[] topics;
public kafkaConsumer(String[] topics) {
super();
this.topics = topics;
}
@Override
public void run() {
Consumer consumer = createConsumer();
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("key = %s, value = %s, offset = %d\n", record.key(), record.value(), record.offset());
}
}
}
private Consumer createConsumer() {
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.14.237:9091,192.168.14.237:9092,192.168.14.237:9093");
props.put("group.id", "d");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("test", "foo"));
return consumer;
}
public static void main(String[] args) {
new kafkaConsumer(new String[] { "testt", "bar" }).start();
}
}
就是拿不到消息