客户端版本0.10.2.1-2.11(本地代码)
服务端版本0.8.2.1-2.10(192.168.137.131)
package com.yu.test;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.ConsumerTimeoutException;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
public class HighLevelConsumerTest extends Thread {
public int sendPersize = 1;
public String consumerName = "test_consumer03051";
public String topic = "ywl_test";
public Map<String, Integer> topicCountMap;
public ConsumerConfig config;
public ConsumerConnector consumer;
public Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = null;
public KafkaStream<byte[], byte[]> stream = null;
public ConsumerIterator<byte[], byte[]> it = null;
public List<byte[]> bytes;
public Properties properties;
private AtomicInteger sendCount;
public HighLevelConsumerTest() {
initProperties();
topicCountMap = new HashMap<String, Integer>();
topicCountMap.put(topic, new Integer(1));
config = new ConsumerConfig(properties);
sendCount = new AtomicInteger(0);
}
public void initProperties() {
properties = new Properties();
properties.put("zookeeper.connect", "192.168.137.131:2181");
properties.put("group.id", "tes11t1");
properties.put("auto.commit.enable", "false");
properties.put("zookeeper.session.timeout.ms", "4000");
properties.put("zookeeper.sync.time.ms", "200");
properties.put("consumer.timeout.ms", "10000");
}
public static void main(String[] args) throws Exception {
new HighLevelConsumerTest().start();
}
@Override
public void run() {
while (true) {
System.out.println(consumerName + " start to consumer Messages.");
bytes = new ArrayList<byte[]>();
try {
consumer = Consumer.createJavaConsumerConnector(config);
consumerMap = consumer.createMessageStreams(topicCountMap);
stream = consumerMap.get(topic).get(0);
it = stream.iterator();
while (true) {
try {
while (it.hasNext()) {
bytes.add(it.next().message());
if (bytes.size() >= 8) {
submitMessage(bytes);
}
}
} catch (ConsumerTimeoutException e) {
System.out.println("------------------------ConsumerTimeoutException--------------------------------------------");
if (bytes.size() > 0) {
submitMessage(bytes);
}
System.out.println(consumerName + " consumer Messages time out for 10 seconds, and will re-connect after 1 minute.");
}
}
} catch (Exception e) {
e.printStackTrace();
close();
sleepSeconds(60);
}
}
}
public void submitMessage(List<byte[]> bytes) {
int size = bytes.size();
if (exceute(bytes)) {
consumer.commitOffsets();
sendCount.addAndGet(size);
System.out.println(sendCount.get());
bytes.clear();
}
}
public boolean exceute(List<byte[]> bytes) {
for (byte[] str : bytes) {
System.out.println(str);
}
return true;
}
public void sleepSeconds(int seconds) {
try {
sleep(seconds * 1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public void close() {
if (consumer != null) {
consumer.shutdown();
consumer = null;
}
if (it != null && it.hasNext()) {
it.clearCurrentChunk();
it = null;
}
if (stream != null && stream.size() > 0) {
stream.clear();
stream = null;
}
if (consumerMap != null) {
consumerMap = null;
}
}
}