为什么kafka消费消息不完整?
	
我在代码里生产(produce)消息,在 Shell 里用 consumer 消费,为什么消费到的消息不完整?只收到第 767 条,没有到第 999 条。
代码如下:
	
package com.lzm.demo.demo_kafka;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
/**
 * Background thread that subscribes to a set of Kafka topics and prints every
 * record it receives (key, value and offset) to standard output.
 *
 * <p>The thread polls forever; it only stops when {@code poll} throws, in which
 * case the underlying consumer is closed before the exception propagates.
 *
 * @date 2016-01-11 4:18:56
 */
public class kafkaConsumer extends Thread {
	/** Topics this thread subscribes to; private copy of the constructor argument. */
	private final String[] topics;

	/**
	 * Creates a consumer thread for the given topics.
	 *
	 * @param topics names of the topics to subscribe to; must not be {@code null}
	 * @throws NullPointerException if {@code topics} is {@code null}
	 */
	public kafkaConsumer(String[] topics) {
		super();
		if (topics == null) {
			throw new NullPointerException("topics");
		}
		// Defensive copy so later changes to the caller's array do not affect us.
		this.topics = topics.clone();
	}

	/**
	 * Polls the subscribed topics in an endless loop and prints each record.
	 * The consumer is closed if the loop is left because of an exception.
	 */
	@Override
	public void run() {
		Consumer<String, String> consumer = createConsumer();
		try {
			while (true) {
				ConsumerRecords<String, String> records = consumer.poll(100);
				for (ConsumerRecord<String, String> record : records) {
					System.out.printf("key = %s, value = %s, offset = %d\n", record.key(), record.value(), record.offset());
				}
			}
		} finally {
			// Release sockets and leave the consumer group cleanly.
			consumer.close();
		}
	}

	/**
	 * Builds a String/String consumer with auto-commit enabled and subscribes it
	 * to the topics supplied to the constructor.
	 *
	 * @return a consumer already subscribed to {@link #topics}
	 */
	private Consumer<String, String> createConsumer() {
		Properties props = new Properties();
		props.put("bootstrap.servers", "192.168.14.237:9091,192.168.14.237:9092,192.168.14.237:9093");
		props.put("group.id", "d");
		props.put("enable.auto.commit", "true");
		props.put("auto.commit.interval.ms", "1000");
		props.put("session.timeout.ms", "30000");
		props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
		props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
		KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
		// Previously hard-coded to "test"/"foo", which silently ignored the
		// topics handed to the constructor.
		consumer.subscribe(Arrays.asList(topics));
		return consumer;
	}

	/**
	 * Starts one consumer thread.
	 *
	 * @param args unused
	 */
	public static void main(String[] args) {
		// These are the topics the program effectively subscribed to before the
		// constructor argument was honored; kept so runtime behavior is unchanged.
		new kafkaConsumer(new String[] { "test", "foo" }).start();
	}
}
就是拿不到消息

 
        
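在生产者程序的 main 方法末尾增加休眠,例如:
Thread.sleep(2000);
因为 Kafka 生产者发送是异步的,调用 send 之后消息还在客户端缓冲区中,尚未真正发出。
如果此时主线程已经跑完、进程退出,发送线程随之终结,缓冲区里的消息就丢失了。更稳妥的做法是在退出前调用 producer.flush() 或 producer.close(),等待缓冲的消息全部发送完成。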
楼主集群中kafka的版本是多少?我的代码跟你一样,但是老报错
你的答案