docker-compose部署kafka,生产者总是无法发送消息给kafka?

CONMI千水楼台 发表于: 2022-04-27   最后更新时间: 2022-04-27 15:15:03   2,473 游览

尝试使用教程中的代码去访问kafka,进行信息发布却没有成功。

1、部署kafka环境:

# kafka是使用docker-compose部署的
version: "3"
services:
    zookeeper:
        image: 'bitnami/zookeeper:latest'
        ports:
            - '2181:2181'
        environment:
            - ALLOW_ANONYMOUS_LOGIN=yes
    kafka:
        image: 'bitnami/kafka:latest'
        ports:
            - '9092:9092'
        environment:
            - KAFKA_BROKER_ID=1
            - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092
            - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092
            - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
            - ALLOW_PLAINTEXT_LISTENER=yes
        depends_on:
            - zookeeper

2、使用kafka的生产者和消费者能正常生产和消费(这里用的是apache最新版的kafka3.1.0的tar包中bin的sh文件):

./kafka-console-producer.sh --broker-list localhost:9092 --topic test

./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

3、使用java去生产消息的时候,kafka-console-consumer.sh这个消费者却接收不到(maven使用的是kafka-client:3.1.0):

public class CustomProducer {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        // 192.168.38.134是虚拟机的ip,9092端口用扫描软件扫描过是开通的
        props.put("bootstrap.servers", "192.168.38.134:9092");
        props.put("acks", "all");
        //重试次数
        props.put("retries", 1);
        //批次大小
        props.put("batch.size", 16384);
        //等待时间
        props.put("linger.ms", 1);
        //RecordAccumulator 缓冲区大小
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 100; i++) {
            System.out.println(i);
            producer.send(new ProducerRecord<>("test", Integer.toString(i), Integer.toString(i)));
        }
        producer.close();
    }
}

求大神赐教

发表于 2022-04-27
添加评论
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092

你配置的是127.0.0.1,所以只有localhost127.0.0.1才可以访问。

理论上,这个地址要变成你的容器ip地址,就可以了,以下有几种方式,你可以尝试一下以下方法:

方式一:

- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://:9092

方式二:

删除这个配置

- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://:9092

方式三:

environment:
  KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
  KAFKA_LISTENERS: INTERNAL://0.0.0.0:9092,OUTSIDE://0.0.0.0:9094
  KAFKA_ADVERTISED_LISTENERS: INTERNAL://kafka:9092,OUTSIDE://localhost:9094
  KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: INTERNAL:PLAINTEXT,OUTSIDE:PLAINTEXT
  KAFKA_INTER_BROKER_LISTENER_NAME: INTERNAL

哇!!!感谢大神!!!膜拜

大神,我试了,不行😂

你补充下报错信息。

大神,我的image是bitnami的,是bitnami优化过的,有一些配置可能不一样。但是能测试通过说明是能用的。

这里是我的测试结果:

方法一:

# 这个命令行报错
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

# 报错内容:
[2022-04-28 00:14:52,525] WARN [Consumer clientId=console-consumer, groupId=console-consumer-11293] Error connecting to node b3f45b5d91ff:9092 (id: 1 rack: null) (org.apache.kafka.clients.NetworkClient)
java.net.UnknownHostException: b3f45b5d91ff: 未知的名称或服务
        at java.base/java.net.Inet6AddressImpl.lookupAllHostAddr(Native Method)
        at java.base/java.net.InetAddress$PlatformNameService.lookupAllHostAddr(InetAddress.java:933)
        at java.base/java.net.InetAddress.getAddressesFromNameService(InetAddress.java:1519)
        at java.base/java.net.InetAddress$NameServiceAddresses.get(InetAddress.java:852)
        at java.base/java.net.InetAddress.getAllByName0(InetAddress.java:1509)
        at java.base/java.net.InetAddress.getAllByName(InetAddress.java:1367)
        at java.base/java.net.InetAddress.getAllByName(InetAddress.java:1301)
        at org.apache.kafka.clients.DefaultHostResolver.resolve(DefaultHostResolver.java:27)
        at org.apache.kafka.clients.ClientUtils.resolve(ClientUtils.java:110)
        at org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.currentAddress(ClusterConnectionStates.java:511)
        at org.apache.kafka.clients.ClusterConnectionStates$NodeConnectionState.access$200(ClusterConnectionStates.java:468)
        at org.apache.kafka.clients.ClusterConnectionStates.currentAddress(ClusterConnectionStates.java:173)
        at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:984)
        at org.apache.kafka.clients.NetworkClient.access$600(NetworkClient.java:73)
        at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1157)
        at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1045)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:558)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:265)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:227)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:164)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:258)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:483)
        at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1262)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1231)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1211)
        at kafka.tools.ConsoleConsumer$ConsumerWrapper.receive(ConsoleConsumer.scala:456)
        at kafka.tools.ConsoleConsumer$.process(ConsoleConsumer.scala:101)
        at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:75)
        at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:52)
        at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala)

方法二:错误同上

方法三:

因为是bitnami的版本,所以没有这个配置。但是我在github上找到了一模一样的配置。

# docker-compose.ymlversion: "3"
services:
  zookeeper:
    image: 'bitnami/zookeeper:latest'
    ports:
      - '2181:2181'
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
  kafka:
    image: 'bitnami/kafka:latest'
    ports:
      - '9092:9092'
      - '9093:9093'
    environment:
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CLIENT:PLAINTEXT,EXTERNAL:PLAINTEXT
      - KAFKA_CFG_LISTENERS=CLIENT://:9092,EXTERNAL://:9093
      - KAFKA_CFG_ADVERTISED_LISTENERS=CLIENT://kafka:9092,EXTERNAL://localhost:9093
      - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=CLIENT
    depends_on:
      - zookeeper

使用同一指令,错误同上。

bitnami版使用9093端口可以使用命令行进行生产和接收:

./kafka-console-producer.sh --broker-list localhost:9093 --topic test

./kafka-console-consumer.sh --bootstrap-server localhost:9093 --topic test --from-beginning

bitnami版使用java进行消息生产:

public class CustomProducer {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        // 192.168.38.134是虚拟机的ip,9092端口用扫描软件扫描过是开通的
        props.put("bootstrap.servers", "192.168.38.134:9093");
        props.put("acks", "all");
        //重试次数
        props.put("retries", 1);
        //批次大小
        props.put("batch.size", 16384);
        //等待时间
        props.put("linger.ms", 1);
        //RecordAccumulator 缓冲区大小
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 100; i++) {
            System.out.println(i);
            producer.send(new ProducerRecord<>("test", Integer.toString(i), Integer.toString(i))).get();
        }
        producer.close();
    }
}

无限卡死在0号信息

192.168.38.134是虚拟机的ip?你应该访问的是docker容器的ip吧,如:

# 查看容器ip
docker inspect --format='{{.NetworkSettings.IPAddress}}' mycentos3

问题的核心,就是要想办法将容器ip配置到KAFKA_CFG_ADVERTISED_LISTENERS上,如:

- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://容器ip:9092

方法一和二就是因为无法解析hostname报的错,是因为kafka需要通过hostname获取到容器内部ip。

方法三的问题是你配置了一个CLIENT://kafka:9092,其中kafka是域名,你解析不开,自然会报错的。

就是说“CLIENT://kafka:9092”要设置为“CLIENT://0.0.0.0:9092”?

我的docker是在虚拟机里面安装的,就是docker的地址还在虚拟机的下一层~
但我在虚拟机里面用kafka的sh文件能直接访问docker的kafka,在主机用java却访问不了,ufw已经打开端口了

你看看这篇文章,有介绍为什么会导致客户端超时的原因:kafka外网转发

震惊!原来是如此原理,即刻尝试

等你的结果。

你的答案

查看kafka相关的其他问题或提一个您自己的问题