位置关系如下:
|----------------------------------------|
| |
| |--------------------------------| |
| | | |
| | docker(CentOS) 172.18.0.2 | |
| |--------------------------------| |
| |
| Virtual Machine(CentOS) 192.168.27.143 |
|----------------------------------------|
Windows 192.168.137.123
我在虚拟机里创建了3个docker容器并分别部署了kafka和ZooKeeper(172.18.0.2,172.18.0.3,172.18.0.4 在同一个network)
windows下hosts文件已做IP映射(172.18.0.2 master,172.18.0.3 slave1,172.18.0.4 slave2)
两两可以互相ping通
虚拟机2181,9092端口已开启
docker 172.18.0.2 的2181,9092端口已映射
虚拟机防火墙已关,docker没有安装防火墙
我在windows下Java客户端用代码模拟生产者向kafka发消息,kafka里面的消费者没有收到消息。
我按照网上说的将advertised.listeners设为PLAINTEXT://<容器的hostname或IP>:9092
,依旧不行
listeners=INTERNAL://0.0.0.0:29094,EXTERNAL://0.0.0.0:9092
advertised.listeners=INTERNAL://master:29094,EXTERNAL://192.168.27.143:9092
listener.security.protocol.map=INTERNAL:PLAINTEXT,EXTERNAL:PLAINTEXT
inter.broker.listener.name=INTERNAL
命令行
bin/kafka-server-start.sh -daemon /opt/kafka-2.6.0/config/server.properties
bin/kafka-topics.sh --create --zookeeper master:2181 --replication-factor 1 --partitions 1 --topic one-kafka
bin/kafka-console-producer.sh --broker-list master:9092 --topic one-kafka
bin/kafka-console-consumer.sh --bootstrap-server master:9092 --topic one-kafka
命令行生产者消费者没问题,Java客户端生产的消息依然不能被kafka消费
package com.kafka;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
public class MyProducer {
public static void main(String[] args) {
Properties properties = new Properties();
properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "192.168.27.143:9092");// 这里写master也不行
properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
"org.apache.kafka.common.serialization.StringSerializer");
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);
for (int i = 1; i <= 10; i++) {
Future<RecordMetadata> future = producer.send(new ProducerRecord<>("one-kafka", "666"));
try {
RecordMetadata metadata = future.get();
System.out.println(metadata.topic());
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}
producer.close();
}
}
客户端报错
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.TimeoutException: Topic one-kafka not present in metadata after 60000 ms.
at org.apache.kafka.clients.producer.KafkaProducer$FutureFailure.<init>(KafkaProducer.java:1314)
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:970)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:870)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:758)
at com.kafka.MyProducer.main(MyProducer.java:22)
Caused by: org.apache.kafka.common.errors.TimeoutException: Topic one-kafka not present in metadata after 60000 ms.