返回到文章

采纳

编辑于 2年前

docker-compose部署kafka,生产者总是无法发送消息给kafka?

kafka java

尝试使用教程中的代码去访问kafka,进行信息发布却没有成功。

1、部署kafka环境:

# kafka是使用docker-compose部署的
version: "3"
services:
    zookeeper:
        image: 'bitnami/zookeeper:latest'
        ports:
            - '2181:2181'
        environment:
            - ALLOW_ANONYMOUS_LOGIN=yes
    kafka:
        image: 'bitnami/kafka:latest'
        ports:
            - '9092:9092'
        environment:
            - KAFKA_BROKER_ID=1
            - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092
            - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092
            - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
            - ALLOW_PLAINTEXT_LISTENER=yes
        depends_on:
            - zookeeper

2、使用kafka的生产者和消费者能正常生产和消费(这里用的是apache最新版的kafka3.1.0的tar包中bin的sh文件):

./kafka-console-producer.sh --broker-list localhost:9092 --topic test

./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning

3、使用java去生产消息的时候,kafka-console-consumer.sh这个消费者却接收不到(maven使用的是kafka-client:3.1.0):

public class CustomProducer {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        Properties props = new Properties();
        // 192.168.38.134是虚拟机的ip,9092端口用扫描软件扫描过是开通的
        props.put("bootstrap.servers", "192.168.38.134:9092");
        props.put("acks", "all");
        //重试次数
        props.put("retries", 1);
        //批次大小
        props.put("batch.size", 16384);
        //等待时间
        props.put("linger.ms", 1);
        //RecordAccumulator 缓冲区大小
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);
        for (int i = 0; i < 100; i++) {
            System.out.println(i);
            producer.send(new ProducerRecord<>("test", Integer.toString(i), Integer.toString(i)));
        }
        producer.close();
    }
}

求大神赐教