kafka 节点宕机后 客户端生产或者消费报错,重启客户端线程又可以继续消费

一如既往 发表于: 2018-11-06   最后更新时间: 2018-11-06 14:18:28   5,264 游览

上次按照大神要求,修正了__consumer_offsets副本数,集群部分节点宕机可正常进行生产或者消费,但是又遇到新问题 java 客户端生产或者消费数据遇到各别节点宕机遇到以下问题

 java.net.ConnectException: Connection refused: no further information
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
    at org.apache.kafka.common.network.PlaintextTransportLayer.finishConnect(PlaintextTransportLayer.java:50)
    at org.apache.kafka.common.network.KafkaChannel.finishConnect(KafkaChannel.java:152)
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:471)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:425)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:510)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:271)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:242)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:161)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:243)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:314)
    at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1218)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1181)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115)
    at kafka.KafkaSource.run(KafkaSource.java:83)
    at kafka.KafkaSource.main(KafkaSource.java:146)
14:10:31,368 DEBUG NetworkClient:804 - [Consumer clientId=consumer-1, groupId=test_3] Node 0 disconnected.
14:10:31,368  WARN NetworkClient:671 - [Consumer clientId=consumer-1, groupId=test_3] Connection to node 0 could not be established. Broker may not be available.
14:10:31,369 DEBUG NetworkClient:916 - [Consumer clientId=consumer-1, groupId=test_3] Give up sending metadata request since no node is available
14:10:31,419 DEBUG NetworkClient:916 - [Consumer clientId=consumer-1, groupId=test_3] Give up sending metadata request since no node is available
14:10:31,469 DEBUG NetworkClient:916 - [Consumer clientId=consumer-1, groupId=test_3] Give up sending metadata request since no node is available
14:10:31,519 DEBUG NetworkClient:916 - [Consumer clientId=consumer-1, groupId=test_3] Give up sending metadata request since no node is available
14:10:31,569 DEBUG NetworkClient:916 - [Consumer clientId=consumer-1, groupId=test_3] Give up sending metadata request since no node is available
14:10:31,619 DEBUG NetworkClient:916 - [Consumer clientId=consumer-1, groupId=test_3] Give up sending metadata request since no node is available
发表于 2018-11-06
添加评论

你把连接都配置上了吗?bootsrap.list

一如既往 -> 半兽人 6年前

都配置了

半兽人 -> 一如既往 6年前

一直持续 无法恢复消费? 代码贴一下

一如既往 -> 半兽人 6年前

代码很长,你有邮箱吗

一如既往 -> 半兽人 6年前

需要把java客户端进程重启才能继续生产或者消费,但是如果生产环境我不可能集群坏了我还要动我的java业务代码吧?

半兽人 -> 一如既往 6年前

你先用官方的例子 测试一下。

一如既往 -> 半兽人 6年前

生产端:
public static void send(){
  Properties props = new Properties();
  props.put("bootstrap.servers", "192.168.1.84:9092,192.168.1.85:9092,192.168.1.86:9092");
  props.put("acks", "all");
  props.put("retries", 0);
  props.put("batch.size", 16384);
  props.put("linger.ms", 1);
  props.put("buffer.memory", 33554432);
  props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
  Producer<String, String> producer = new KafkaProducer<>(props);
  int i =0;
     while(true){
  producer.send(new ProducerRecord<String, String>("spdb-cal", "上海"+i));
         i++;

     }
}

消费端:
 public static void main(String[] args) {
  
  Properties props = new Properties();
      props.put("bootstrap.servers","192.168.1.84:9092,192.168.1.85:9092,192.168.1.86:9092");
      props.put("group.id","test");
      props.put("enable.auto.commit","false");
      props.put("auto.commit.interval.ms","1000"); 
      props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
      props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
      KafkaConsumer <String,String> consumer = new KafkaConsumer <>(props);
      consumer.subscribe(Arrays.asList("spdb-cal"));
      while(true){
       ConsumerRecords <String,String> records = consumer.poll(100);
          for(ConsumerRecord <String,String> record:records){
     String msg= record.value();
     Test.printFile(msg+"\r\n");

          }
      }
 }

一如既往 -> 半兽人 6年前

按照官方例子写的简单案例,也是出现我上述的错误,需要重启这两块代码才能继续正常执行

半兽人 -> 一如既往 6年前

生产者一半以上就无法发送了。

秒空气 -> 半兽人 6年前

大神,什么意思?

一如既往 -> 半兽人 6年前

请细说下?

一如既往 -> 半兽人 6年前

看了半天也没看出来能解决我的问题

具体操作流程我描述下,麻烦大神帮我解答下:
我启动三个节点kafka broker 0 1 2 ,开启生产以及消费者java客户端进程模拟生产消费操作做容错性测试,第一步kill broker 0,客户端能正常生产消费;第二步进一步kill broker 1 ,客户端能正常生产消费;第三步 重启broker 0 并kill broker 2 ,客户端就不能正常生产和消费了,报上述错误。但是我重启客户端生产和消费的进程,又可以正常生产和消费了。

你的答案

查看kafka相关的其他问题或提一个您自己的问题