用命令行perducer,有没有ssl,都没问题。
下面的java,如果改成端口9092,然后注释掉6行有关ssl,就能成功。
错误提示:
### org.apache.kafka.common.errors.TimeoutException:Failed to update metadata after 600000 ms. [2017-08-09 12:25:55,557] WARN SSL peer is not authenticated, returning ANONYMOUS instead (org.apache.kafka.common.network.SslTransportLayer) [2017-08-09 12:25:56,634]WARN Failed to send SSL Close message (org.apache.kafka.common.network.SslTransportLayer) java.io.IOException: Connection reset by peer at sun.nio.ch.FileDispatcherImpl.write0(Native Method) at sun.nio.ch.SocketDispatcher.write(SocketDispatcher.java:47) at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93) at sun.nio.ch.IOUtil.write(IOUtil.java:65) at sun.nio.ch.SocketChannelImpl.write(SocketChannelImpl.java:471) at org.apache.kafka.common.network.SslTransportLayer.flush(SslTransportLayer.java:195) at org.apache.kafka.common.network.SslTransportLayer.close(SslTransportLayer.java:163) at org.apache.kafka.common.utils.Utils.closeAll(Utils.java:690) at org.apache.kafka.common.network.KafkaChannel.close(KafkaChannel.java:47) at org.apache.kafka.common.network.Selector.close(Selector.java:471) at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:348) at org.apache.kafka.common.network.Selector.poll(Selector.java:283) at kafka.network.Processor.poll(SocketServer.scala:472) at kafka.network.Processor.run(SocketServer.scala:412) at java.lang.Thread.run(Thread.java:745)
请问问题大概在哪里?
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "10.10.2.203:9093");
//configure the following three settings for SSL Encryption
props.put(CommonClientConfigs.SECURITY_PROTOCOL_CONFIG, "SSL"); props.put(SslConfigs.SSL_TRUSTSTORE_LOCATION_CONFIG, "d:/certificate/broker-1.truststore.jks"); props.put(SslConfigs.SSL_TRUSTSTORE_PASSWORD_CONFIG, "test1234");**
// configure the following three settings for SSL Authentication
props.put(SslConfigs.SSL_KEYSTORE_LOCATION_CONFIG, "d:/certificate/broker-1.keystore.jks");
props.put(SslConfigs.SSL_KEYSTORE_PASSWORD_CONFIG, "test1234");
props.put(SslConfigs.SSL_KEY_PASSWORD_CONFIG, "test1234");
props.put(ProducerConfig.ACKS_CONFIG, "all");
props.put(ProducerConfig.RETRIES_CONFIG, 0);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");