Any follow-up on this? I'm running into the same problem as you.
package com.ai.linkgroup.statistics.mq;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.ai.linkgroup.statistics.util.ZipUtils;
import com.common.system.mq.serializer.StringSerializer;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
/**
 * @author blueskyevil->70942 2017-09-21
 */
public class KafkaReceiver implements Watcher
{
 private static final Logger logger = LoggerFactory.getLogger(KafkaReceiver.class);
 private ConsumerConnector consumer=null;
 // param
 private String zkConfig;
 private String group;
 private int consumers;
 private KeeperState stateValue;
 private ConsumerConfig cc;
 // message queue for each topic
 private Map<String, BlockingQueue<String>> topicMap;
 // consumer connector for each topic
 private Map<String, ConsumerConnector> topicConsumers;
 public void init() // the consumer connects through ZooKeeper
 {// see https://blog.csdn.net/u010463032/article/details/78892858
 Properties props = new Properties();
 props.put("zookeeper.connect", zkConfig);// ZooKeeper connect string in hostname:port form; list several nodes, comma separated, so a single node failure does not break the consumer
 props.put("group.id", group);// processes with the same group name belong to the same consumer group
 props.put("consumer.numbers", String.valueOf(consumers));// custom key, not a standard Kafka config; Properties values should be Strings
 props.put("auto.commit.enable", "true");
 props.put("auto.commit.interval.ms", "60000");
 props.put("deserializer.class", "kafka.serializer.DefaultDecoder");
 // additional tuning parameters
 props.put("zookeeper.session.timeout.ms", "70000");// ZooKeeper session timeout: no heartbeat within this window and the client is declared dead; set too low, a live consumer is falsely declared dead; set too high, a real failure goes unnoticed for a long time
 props.put("rebalance.backoff.ms", "20000");
 props.put("rebalance.max.retries", "10");
 props.put("zookeeper.connection.timeout.ms", "30000");// timeout for establishing the ZooKeeper connection
 cc = new ConsumerConfig(props);
 topicMap = new ConcurrentHashMap<String, BlockingQueue<String>>();
 topicConsumers = new ConcurrentHashMap<String, ConsumerConnector>();
}
/**
 * @param topic the topic to read from
 * @return the message queue, with gzip decompression enabled
 */
public BlockingQueue<String> readZipMsg(String topic)
{
if (null == topicMap.get(topic) || stateValue == KeeperState.Expired)
 {
 return initReadMsg(topic,true);
 }
 else
 {
 return topicMap.get(topic);
 }
}
/**
 * @param topic the topic to read from
 * @return the message queue (no decompression)
 */
public BlockingQueue<String> readMsg(String topic)
{
 // added by peiyj: guard against the client having been closed unexpectedly
 if (null == topicMap.get(topic) || stateValue == KeeperState.Expired)
 {
 return initReadMsg(topic,false);
 }
 else
 {
 return topicMap.get(topic);
 }
}
/**
 * @param topic the topic to read from
 * @param zipFlag whether messages are gzip-compressed
 * @return the message queue
 */
private synchronized BlockingQueue<String> initReadMsg(String topic, boolean zipFlag)
{
 // message queue with a default capacity of 500
 BlockingQueue<String> msgQ = new ArrayBlockingQueue<String>(500);
 if (null == topicMap.get(topic) || stateValue == KeeperState.Expired)
 {
 topicMap.put(topic, msgQ);
 try 
 {
     // create the high-level consumer connector for this topic
      consumer = Consumer.createJavaConsumerConnector(cc);
     logger.info("consumer...createJavaConsumerConnector topic={}",topic);
     topicConsumers.put(topic, consumer);// one consumer per topic
     stateValue = null;// clear the Expired flag so subsequent reads do not re-initialise again
 } 
 catch (Exception e) 
 {
     logger.error("initReadMsg err.......",e);
 }
 Thread readMsg = new Thread(new ReadMsg(topic, msgQ,zipFlag), "["+topic+"]KafkaStream Reader");
 readMsg.start();
 }
 return msgQ;
}
/**
 * @param topic the topic whose consumer should be shut down
 */
public void shutDownConsumer(String topic)
{
 try
 {
 // shut down the connector and drop the topic's bookkeeping
 topicConsumers.get(topic).shutdown();
 logger.info("consumer...shutdown topic={}",topic);
 topicConsumers.remove(topic);
 topicMap.remove(topic);
 }
 catch (Exception e)
 {
 logger.error("shutDownConsumer err......",e);
 }
}
/**
Thread that reads messages from the MQ
*/
private class ReadMsg implements Runnable
{
 private String topic;
 private boolean zipFlag;
 private BlockingQueue<String> msgQ;
 public ReadMsg(String topic, BlockingQueue<String> msgQ, boolean zipFlag)
 {
 this.topic = topic;
 this.msgQ = msgQ;
 this.zipFlag=zipFlag;
}
 public void run()
 {
 Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
 // the value is the number of consumer threads for this topic
 topicCountMap.put(topic, Integer.valueOf(consumers));
 while (true) 
 {
     try 
     {    
         if(null==topicMap.get(topic))
         {
             break;
         }
          // note: the old high-level API allows createMessageStreams to be called only once per connector
          Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = topicConsumers.get(topic).createMessageStreams(topicCountMap);
         logger.info("consumer...createMessageStreams topic={}",topic);
         for (KafkaStream<byte[], byte[]> m_stream : consumerMap.get(topic)) 
         {
             ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
             while (it.hasNext()) 
             {
                  // advancing the iterator moves the MQ offset forward
                 MessageAndMetadata<byte[], byte[]> mm = it.next();
                 String msg = String.valueOf(new StringSerializer().<String>deserialize(mm.message()));
                  if(zipFlag)
                  {
                      msg = ZipUtils.unzip(msg);
                      logger.debug("control receive topic={},msg={}",topic,msg);
                  }
                  else
                  {
                      logger.debug("topic={},msg={}",topic,msg);
                  }
                  msgQ.put(msg);
             }
         }
     } 
     catch (Exception e)
     {
          logger.error("KafkaConsumer Reader Exception", e);
     }
     try 
     {
          Thread.sleep(2000);// back off briefly before retrying
     } 
     catch (InterruptedException e) 
     {
         logger.error("ReadMsg sleep InterruptedException......",e);
     }
 }
 }
}
public void setZkConfig(String zkConfig)
{
 this.zkConfig = zkConfig;
}
public void setGroup(String group)
{
 this.group = group;
}
public void setConsumers(int consumers)
{
 this.consumers = consumers;
}
@Override
public void process(WatchedEvent event)
{
 // track the latest ZooKeeper state; Expired triggers re-initialisation in readMsg/readZipMsg
 stateValue = event.getState();
}
}
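
For reference, here is a minimal sketch of how this class could be wired up and used. The ZooKeeper addresses, group name, and topic below are made-up placeholders (in the original project the setters are presumably populated by a Spring context):

    KafkaReceiver receiver = new KafkaReceiver();
    receiver.setZkConfig("zk1:2181,zk2:2181,zk3:2181"); // hypothetical ZooKeeper quorum
    receiver.setGroup("statistics-group");              // hypothetical consumer group name
    receiver.setConsumers(2);                           // consumer threads per topic
    receiver.init();

    // readMsg starts a reader thread on the first call and returns the queue it fills
    BlockingQueue<String> queue = receiver.readMsg("link-group-statistics"); // hypothetical topic
    String msg = queue.take(); // blocks until a message arrives

    // when done with the topic, release the connector
    receiver.shutDownConsumer("link-group-statistics");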