我的项目springboot整合kafka消费者出现异常
以下是我的kafka版本 ,远程kafka-clients规定使用0.11.0.0
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.2.9.RELEASE</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>0.11.0.0</version>
</dependency>
<-- springboot的版本为 -->
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.3.4.RELEASE</version>
<relativePath/>
</parent>
springcloud为 Hoxton.SR8
错误信息为:
2021-02-03 16:06:13 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] ERROR o.s.kafka.listener.KafkaMessageListenerContainer - Error while stopping the container:
java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.Consumer.poll(Ljava/time/Duration;)Lorg/apache/kafka/clients/consumer/ConsumerRecords;
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:747)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:703)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
at java.util.concurrent.FutureTask.run(FutureTask.java)
我使用TEST版本的kafka确启动正常(这个是正常可以连接到kafka并且可以进行数据消费)
public static void main(String[] args) {
Properties props = null;
try {
props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_BROKER);
// 指定消费者所属群组
props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
// 自动提交
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "100000000");
/**kafka鉴权**/
props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"hanqi\" password=\"123456\";");
props.put("security.protocol", "SASL_PLAINTEXT");
props.put("sasl.mechanism", "PLAIN");
} catch (Exception e) {
log.error("消费者" + e);
}
KafkaConsumer<String, String> consumer = null;
try {
consumer = new KafkaConsumer<String, String> (props);
consumer.subscribe(Arrays.asList(new String[]{TOPIC_NAME}));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(50);
for (ConsumerRecord<String, String> record :records){
//消费数据
System.out.println(record.value().toString() + ":sys打印的数据");
log.info(record.value());
}
Thread.sleep(10);
}
} catch (Exception e) {
System.out.println("报错了:" + e);
log.error("消费者" + e);
} finally {
consumer.close();
}
}
这个是不正常的启动(已经对照过上面正常配置,这点可以排除)
@KafkaListener(topics = "MisOnekeyBindKafkaTogd11")
public void topic_test(ConsumerRecord<?, ?> record) {
log.error("kafka的key: " + record.key());
System.out.println("kafka的key: " + record.key());
log.error("kafka的value: " + record.value().toString());
System.out.println("kafka的value: " + record.value().toString());
}
我怀疑我初步是我的版本号问题 请大佬帮忙看看是什么问题导致的.