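Question description
My Spring Boot project integrates Kafka. All Kafka brokers have SSL enabled (hostname verification is not enabled), and on Linux the command-line producer and consumer both work over SSL. When the Spring Boot application starts, however, it throws an error and fails to start. All the certificates are in place.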
spring:
  kafka:
    ssl:
      key-store-location: file:C:/Users/eatheryu/Desktop/ca/server/server.keystore.jks
      key-store-password: 1111111
      key-password: 1111111
      trust-store-location: file:C:/Users/eatheryu/Desktop/ca/client/client.truststore.jks
      trust-store-password: 1111111
      key-store-type: JKS
    bootstrap-servers: 111.111.111.111:8089
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      acks: all
      retries: 30
      ssl:
        protocol: SSL
    consumer:
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      enable-auto-commit: false
      group-id: test3
      auto-offset-reset: latest
      max-poll-records: 20
      ssl:
        protocol: SSL
The error output is as follows:
Error starting ApplicationContext. To display the conditions report re-run your application with 'debug' enabled.
2019-04-11 16:25:41.095 ERROR 18668 --- [ restartedMain] o.s.boot.SpringApplication : Application run failed
org.springframework.context.ApplicationContextException: Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry'; nested exception is java.lang.OutOfMemoryError: Java heap space
at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:185) ~[spring-context-5.1.2.RELEASE.jar:5.1.2.RELEASE]
at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:53) ~[spring-context-5.1.2.RELEASE.jar:5.1.2.RELEASE]
at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:360) ~[spring-context-5.1.2.RELEASE.jar:5.1.2.RELEASE]
at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:158) ~[spring-context-5.1.2.RELEASE.jar:5.1.2.RELEASE]
at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:122) ~[spring-context-5.1.2.RELEASE.jar:5.1.2.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:879) ~[spring-context-5.1.2.RELEASE.jar:5.1.2.RELEASE]
at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.finishRefresh(ServletWebServerApplicationContext.java:161) ~[spring-boot-2.1.0.RELEASE.jar:2.1.0.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:549) ~[spring-context-5.1.2.RELEASE.jar:5.1.2.RELEASE]
at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:140) ~[spring-boot-2.1.0.RELEASE.jar:2.1.0.RELEASE]
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:775) [spring-boot-2.1.0.RELEASE.jar:2.1.0.RELEASE]
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:397) [spring-boot-2.1.0.RELEASE.jar:2.1.0.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:316) [spring-boot-2.1.0.RELEASE.jar:2.1.0.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1260) [spring-boot-2.1.0.RELEASE.jar:2.1.0.RELEASE]
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1248) [spring-boot-2.1.0.RELEASE.jar:2.1.0.RELEASE]
at com.example.kafka.KafkaApplication.main(KafkaApplication.java:10) [classes/:na]
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_191]
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_191]
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_191]
at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_191]
at org.springframework.boot.devtools.restart.RestartLauncher.run(RestartLauncher.java:49) [spring-boot-devtools-2.1.0.RELEASE.jar:2.1.0.RELEASE]
Caused by: java.lang.OutOfMemoryError: Java heap space
at java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57) ~[na:1.8.0_191]
at java.nio.ByteBuffer.allocate(ByteBuffer.java:335) ~[na:1.8.0_191]
at org.apache.kafka.common.memory.MemoryPool$1.tryAllocate(MemoryPool.java:30) ~[kafka-clients-2.0.0.jar:na]
at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:112) ~[kafka-clients-2.0.0.jar:na]
at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:335) ~[kafka-clients-2.0.0.jar:na]
at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:296) ~[kafka-clients-2.0.0.jar:na]
at org.apache.kafka.common.network.Selector.attemptRead(Selector.java:560) ~[kafka-clients-2.0.0.jar:na]
at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:496) ~[kafka-clients-2.0.0.jar:na]
at org.apache.kafka.common.network.Selector.poll(Selector.java:425) ~[kafka-clients-2.0.0.jar:na]
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:510) ~[kafka-clients-2.0.0.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:271) ~[kafka-clients-2.0.0.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:242) ~[kafka-clients-2.0.0.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:218) ~[kafka-clients-2.0.0.jar:na]
at org.apache.kafka.clients.consumer.internals.Fetcher.getTopicMetadata(Fetcher.java:274) ~[kafka-clients-2.0.0.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.partitionsFor(KafkaConsumer.java:1774) ~[kafka-clients-2.0.0.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.partitionsFor(KafkaConsumer.java:1742) ~[kafka-clients-2.0.0.jar:na]
at org.springframework.kafka.listener.AbstractMessageListenerContainer.checkTopics(AbstractMessageListenerContainer.java:275) ~[spring-kafka-2.2.0.RELEASE.jar:2.2.0.RELEASE]
at org.springframework.kafka.listener.ConcurrentMessageListenerContainer.doStart(ConcurrentMessageListenerContainer.java:135) ~[spring-kafka-2.2.0.RELEASE.jar:2.2.0.RELEASE]
at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:257) ~[spring-kafka-2.2.0.RELEASE.jar:2.2.0.RELEASE]
at org.springframework.kafka.config.KafkaListenerEndpointRegistry.startIfNecessary(KafkaListenerEndpointRegistry.java:289) ~[spring-kafka-2.2.0.RELEASE.jar:2.2.0.RELEASE]
at org.springframework.kafka.config.KafkaListenerEndpointRegistry.start(KafkaListenerEndpointRegistry.java:238) ~[spring-kafka-2.2.0.RELEASE.jar:2.2.0.RELEASE]
at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:182) ~[spring-context-5.1.2.RELEASE.jar:5.1.2.RELEASE]
at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:53) ~[spring-context-5.1.2.RELEASE.jar:5.1.2.RELEASE]
at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:360) ~[spring-context-5.1.2.RELEASE.jar:5.1.2.RELEASE]
at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:158) ~[spring-context-5.1.2.RELEASE.jar:5.1.2.RELEASE]
at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:122) ~[spring-context-5.1.2.RELEASE.jar:5.1.2.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:879) ~[spring-context-5.1.2.RELEASE.jar:5.1.2.RELEASE]
at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.finishRefresh(ServletWebServerApplicationContext.java:161) ~[spring-boot-2.1.0.RELEASE.jar:2.1.0.RELEASE]
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:549) ~[spring-context-5.1.2.RELEASE.jar:5.1.2.RELEASE]
at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:140) ~[spring-boot-2.1.0.RELEASE.jar:2.1.0.RELEASE]
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:775) [spring-boot-2.1.0.RELEASE.jar:2.1.0.RELEASE]
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:397) [spring-boot-2.1.0.RELEASE.jar:2.1.0.RELEASE]
I have been stuck on this for a long time and still cannot tell where the error is.
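This looks very likely to be an SSL problem.
https://stackoverflow.com/questions/58218493/kafka-and-ssl-java-lang-outofmemoryerror-java-heap-space-when-using-kafka-top
Check the Kafka broker's server.log and you will find that SSL authentication failed.
My guess at the cause: SSL authentication fails, the data never actually gets sent, and the client eventually runs out of memory.
Hope this helps others.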
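One way this guess could play out, offered as a sketch rather than a confirmed diagnosis: the trace shows `NetworkReceive.readFrom` ending in `ByteBuffer.allocate`, and Kafka responses are framed with a 4-byte size prefix. If the client is not actually speaking SSL and reads a TLS record from an SSL listener, the first four bytes decode to a very large "size". The class name `LengthPrefixDemo` and the sample bytes (the header of a TLS 1.2 alert record) are assumptions for illustration, not data captured from this setup.

```java
import java.nio.ByteBuffer;

public class LengthPrefixDemo {
    // Interpret the first four bytes of a response as a big-endian int32,
    // the way a reader of a size-prefixed protocol would.
    static int readLengthPrefix(byte[] response) {
        return ByteBuffer.wrap(response).getInt();
    }

    public static void main(String[] args) {
        // Assumed sample: header of a TLS 1.2 alert record
        // (content type 0x15, version 0x0303, length 0x0002).
        byte[] tlsAlertHeader = {0x15, 0x03, 0x03, 0x00, 0x02};
        int size = readLengthPrefix(tlsAlertHeader);
        System.out.println(size + " bytes (~" + (size >> 20) + " MiB)");
        // prints "352518912 bytes (~336 MiB)"
    }
}
```

A single allocation of that size does not fit in a 128 MB heap, which would be consistent with the `Java heap space` error above. Whether this is what happens here depends on what the broker actually sends back.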
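Plenty of people have viewed this but nobody has answered... The problem is still unsolved and I have no idea where to look.
Buddy, tune your JVM. You are out of memory.
Turn off SSL verification and it works right away.
Try adjusting it first.
I raised the JVM initial heap from 128 to 512... and it still fails. One more thing: I replaced the broker IP address with the server name configured in hosts and enabled SSL hostname verification. In that case the error is "Timeout expired while fetching topic metadata", and both setups seem to report "Broker may not be available". Neither one connects... With the IP the client does connect, but then I get the JVM heap overflow... so frustrating.
After setting the JVM memory, the error looks like this: Caused by: java.lang.OutOfMemoryError: null
That is just because the value you set is still too small.
I went all the way up to 2048 and it still fails...
Weird. Do you have an infinite loop somewhere that eats however much memory there is?
Not guilty... my code is all normal...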
Producer code
@RequestMapping("/send")
public String sendMessage() {
    String msg = "producer";
    // Send asynchronously and log the outcome in a callback
    ListenableFuture test = kafkaTemplate.send(topic, msg);
    test.addCallback(new ListenableFutureCallback() {
        @Override
        public void onFailure(Throwable throwable) {
            System.out.println("send failed");
        }

        @Override
        public void onSuccess(Object o) {
            System.out.println("send succeeded");
        }
    });
    return "success";
}
Consumer code
@KafkaListener(topics = "${pro.topic2}",
        groupId = "${spring.kafka.consumer.group-id}",
        containerFactory = "kafkaListenerContainerFactory",
        errorHandler = "consumerAwareErrorHandler")
public void listen2(ConsumerRecord<?, ?> consumerRecord, Acknowledgment acknowledgment) {
    System.out.println("testInQT====key======" + consumerRecord.key()
            + "=====Value===" + consumerRecord.value()
            + "=====offset=====" + consumerRecord.offset()
            + "consumerRecord.partition();" + consumerRecord.partition());
    // Manual commit, since enable-auto-commit is false
    acknowledgment.acknowledge();
}
This code runs fine with SSL turned off; as soon as SSL is enabled it throws the exception...
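Since the same code works without SSL, one thing worth checking is whether the client is really told to use SSL. Kafka clients choose the transport from `security.protocol` (default `PLAINTEXT`), while `ssl.protocol` only selects the SSL/TLS protocol version, so setting `ssl.protocol` alone may leave the connection in plaintext. Below is a minimal sketch of the client properties involved. The class and method names (`SslClientProps`, `sslProps`), the paths, and the passwords are hypothetical placeholders, not values from this thread. In Spring Boot, keys like these can generally be passed through under `spring.kafka.properties.*`.

```java
import java.util.Properties;

public class SslClientProps {
    // Build Kafka client properties for an SSL listener.
    // All paths and passwords below are placeholders.
    static Properties sslProps(String bootstrapServers, boolean verifyHostname) {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", bootstrapServers);
        // Selects the transport; without it the client defaults to PLAINTEXT.
        props.setProperty("security.protocol", "SSL");
        props.setProperty("ssl.truststore.location", "/path/to/client.truststore.jks");
        props.setProperty("ssl.truststore.password", "changeit");
        // The keystore entries matter only if the broker requires client certificates.
        props.setProperty("ssl.keystore.location", "/path/to/client.keystore.jks");
        props.setProperty("ssl.keystore.password", "changeit");
        props.setProperty("ssl.key.password", "changeit");
        // kafka-clients 2.0+ verifies the broker hostname by default ("https");
        // an empty value turns that check off.
        props.setProperty("ssl.endpoint.identification.algorithm",
                verifyHostname ? "https" : "");
        return props;
    }

    public static void main(String[] args) {
        Properties props = sslProps("111.111.111.111:8089", false);
        System.out.println("security.protocol=" + props.getProperty("security.protocol"));
    }
}
```

Passing `false` for `verifyHostname` mirrors the broker-side setup described in the question. With an IP address in `bootstrap.servers` and hostname verification left on, the handshake would likely fail unless the broker certificate covers that IP.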