尝试使用教程中的代码去访问kafka,进行信息发布却没有成功。
1、部署kafka环境:
# kafka是使用docker-compose部署的
version: "3"
services:
zookeeper:
image: 'bitnami/zookeeper:latest'
ports:
- '2181:2181'
environment:
- ALLOW_ANONYMOUS_LOGIN=yes
kafka:
image: 'bitnami/kafka:latest'
ports:
- '9092:9092'
environment:
- KAFKA_BROKER_ID=1
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://127.0.0.1:9092
- KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
- ALLOW_PLAINTEXT_LISTENER=yes
depends_on:
- zookeeper
2、使用kafka的生产者和消费者能正常生产和消费(这里用的是apache最新版的kafka3.1.0的tar包中bin的sh文件):
./kafka-console-producer.sh --broker-list localhost:9092 --topic test
./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
3、使用java去生产消息的时候,kafka-console-consumer.sh
这个消费者却接收不到(maven使用的是kafka-client:3.1.0):
public class CustomProducer {
public static void main(String[] args) throws ExecutionException, InterruptedException {
Properties props = new Properties();
// 192.168.38.134是虚拟机的ip,9092端口用扫描软件扫描过是开通的
props.put("bootstrap.servers", "192.168.38.134:9092");
props.put("acks", "all");
//重试次数
props.put("retries", 1);
//批次大小
props.put("batch.size", 16384);
//等待时间
props.put("linger.ms", 1);
//RecordAccumulator 缓冲区大小
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
for (int i = 0; i < 100; i++) {
System.out.println(i);
producer.send(new ProducerRecord<>("test", Integer.toString(i), Integer.toString(i)));
}
producer.close();
}
}
求大神赐教