返回到文章

采纳

编辑于 4年前

虚拟机CentOS 7下的docker(CentOS 7)内的kafka消费者无法消费到来自windows下Java客户端(IDE)生产者生产的消息

kafka

环境

位置关系如下:

                        |----------------------------------------|
                        |                                        |
                        |   |--------------------------------|   |
                        |   |                                |   |
                        |   |    docker(CentOS) 172.18.0.2   |   |
                        |   |--------------------------------|   |
                        |                                        |
                        | Virtual Machine(CentOS) 192.168.27.143 |
                        |----------------------------------------|

                                 Windows 192.168.137.123
  1. 我在虚拟机里创建了3个docker容器并分别部署了kafka和ZooKeeper(172.18.0.2,172.18.0.3,172.18.0.4 在同一个network)

  2. windows下hosts文件已做IP映射(172.18.0.2 master,172.18.0.3 slave1,172.18.0.4 slave2)

  3. 两两可以互相ping通

  4. 虚拟机2181,9092端口已开启

  5. docker 172.18.0.2 的2181,9092端口已映射

  6. 虚拟机防火墙已关,docker没有安装防火墙

出现问题

我在windows下Java客户端用代码模拟生产者向kafka发消息,kafka里面的消费者没有收到消息。

尝试一

我按照网上说的将advertised.listeners设为PLAINTEXT://<容器的hostname或IP>:9092,依旧不行

尝试二

listeners=INTERNAL://0.0.0.0:29094,EXTERNAL://0.0.0.0:9092
advertised.listeners=INTERNAL://master:29094,EXTERNAL://192.168.27.143:9092
listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
inter.broker.listener.name=INTERNAL

命令行

bin/kafka-server-start.sh -daemon /opt/kafka-2.6.0/config/server.properties

bin/kafka-topics.sh --create --zookeeper master:2181 --replication-factor 1 --partitions 1 --topic one-kafka

bin/kafka-console-producer.sh --broker-list master:9092 --topic one-kafka

bin/kafka-console-consumer.sh --bootstrap-server master:9092 --topic one-kafka

命令行生产者消费者没问题,Java客户端生产的消息依然不能被kafka消费

生产者代码

package com.kafka;

import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
public class MyProducer {

    public static void main(String[] args) {
        Properties properties = new Properties();
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.27.143:9092");// 这里写master也不行
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
                "org.apache.kafka.common.serialization.StringSerializer");
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
        for (int i = 1; i <= 10; i++) {
            Future<RecordMetadata> future = producer.send(new ProducerRecord<>("one-kafka", "666"));
            try {
                RecordMetadata metadata = future.get();
                System.out.println(metadata.topic());
            } catch (InterruptedException e) {
                e.printStackTrace();
            } catch (ExecutionException e) {
                e.printStackTrace();
            }
        }
        producer.close();
    }

}

客户端报错

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Topic one-kafka not present in metadata after 60000 ms.
    at org.apache.kafka.clients.producer.KafkaProducer$FutureFailure.<init>(KafkaProducer.java:1314)
    at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:970)
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:870)
    at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:758)
    at com.kafka.MyProducer.main(MyProducer.java:22)
Caused by: org.apache.kafka.common.errors.TimeoutException: Topic one-kafka not present in metadata after 60000 ms.