Why is my Kafka consumer not getting all the messages?

kafka


I produce the messages in code and consume them in the shell. Why does the consumer not get everything? It only reaches 767, not 999.



The code is as follows:

package com.lzm.demo.demo_kafka;

import java.util.Arrays;
import java.util.Properties;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

/*
 * @author
 * @date 2016-01-11 04:18:56
 */
public class kafkaConsumer extends Thread {

    private final String[] topics;

    public kafkaConsumer(String[] topics) {
        super();
        this.topics = topics;
    }

    @Override
    public void run() {
        Consumer<String, String> consumer = createConsumer();

        while (true) {
            // Poll with a 100 ms timeout and print every record that comes back.
            ConsumerRecords<String, String> records = consumer.poll(100);
            for (ConsumerRecord<String, String> record : records) {
                System.out.printf("key = %s, value = %s, offset = %d\n",
                        record.key(), record.value(), record.offset());
            }
        }
    }

    private Consumer<String, String> createConsumer() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.14.237:9091,192.168.14.237:9092,192.168.14.237:9093");
        props.put("group.id", "d");
        props.put("enable.auto.commit", "true");
        props.put("auto.commit.interval.ms", "1000");
        props.put("session.timeout.ms", "30000");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        // Subscribe to the topics handed to the constructor; the original hard-coded
        // Arrays.asList("test", "foo") here and silently ignored this.topics.
        consumer.subscribe(Arrays.asList(topics));
        return consumer;
    }

    public static void main(String[] args) {
        // The original passed "testt" and "bar" here while createConsumer() subscribed
        // to the hard-coded "test" and "foo"; keep the topic names that were actually
        // subscribed so the constructor argument and the subscription agree.
        new kafkaConsumer(new String[] { "test", "foo" }).start();
    }

}


I just can't get the messages.
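
For reference, the producing side looks roughly like the sketch below (the topic name "test", the 0..999 loop, and the class name KafkaProducerSketch are placeholders, not the real code). send() is asynchronous, so if the producer exits without flush()/close(), the tail of the messages (e.g. 768 to 999) may never reach the broker:

package com.lzm.demo.demo_kafka;

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

/*
 * Minimal producer sketch (placeholder, not the actual producer code).
 */
public class KafkaProducerSketch {

    public static void main(String[] args) {
        Properties props = new Properties();
        // Same broker list as the consumer above.
        props.put("bootstrap.servers", "192.168.14.237:9091,192.168.14.237:9092,192.168.14.237:9093");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i <= 999; i++) {
            // send() only buffers the record; delivery happens asynchronously.
            producer.send(new ProducerRecord<String, String>("test", Integer.toString(i), "message-" + i));
        }
        // Without flush()/close() the JVM can exit while the last records
        // are still sitting in the client-side buffer.
        producer.flush();
        producer.close();
    }
}

The shell side uses kafka-console-consumer.sh; note that without --from-beginning it only shows messages produced after it starts.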