返回到文章

采纳

编辑于 4年前

Kafka整合springboot报java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.Consumer.poll

kafka

问题

我的项目springboot整合kafka消费者出现异常

以下是我的kafka版本 ,远程kafka-clients规定使用0.11.0.0

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.2.9.RELEASE</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>0.11.0.0</version>
</dependency>

 <-- springboot的版本为 -->

 <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.3.4.RELEASE</version>
    <relativePath/>
</parent>

springcloud为 Hoxton.SR8

错误信息为:

2021-02-03 16:06:13 [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-C-1] ERROR o.s.kafka.listener.KafkaMessageListenerContainer - Error while stopping the container: 
java.lang.NoSuchMethodError: org.apache.kafka.clients.consumer.Consumer.poll(Ljava/time/Duration;)Lorg/apache/kafka/clients/consumer/ConsumerRecords;
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:747)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:703)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266)
    at java.util.concurrent.FutureTask.run(FutureTask.java)

我使用TEST版本的kafka确启动正常(这个是正常可以连接到kafka并且可以进行数据消费)

 public static void main(String[] args) {
        Properties  props = null;
        try {
            props = new Properties();
            props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, KAFKA_BROKER);
            // 指定消费者所属群组
            props.put(ConsumerConfig.GROUP_ID_CONFIG, GROUP_ID);
            // 自动提交
            props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "true");
            props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
            props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
            props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
            props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, "100000000");
            /**kafka鉴权**/
            props.put("sasl.jaas.config", "org.apache.kafka.common.security.plain.PlainLoginModule required username=\"hanqi\" password=\"123456\";");
            props.put("security.protocol", "SASL_PLAINTEXT");
            props.put("sasl.mechanism", "PLAIN");
        } catch (Exception e) {
            log.error("消费者" + e);
        }

        KafkaConsumer<String, String> consumer = null;
        try {
            consumer = new KafkaConsumer<String, String> (props);
            consumer.subscribe(Arrays.asList(new String[]{TOPIC_NAME}));
            while (true) {
                ConsumerRecords<String, String> records = consumer.poll(50);
                for (ConsumerRecord<String, String> record :records){
                    //消费数据
                    System.out.println(record.value().toString() + ":sys打印的数据");
                    log.info(record.value());
                }
                Thread.sleep(10);
            }
        } catch (Exception e) {
            System.out.println("报错了:" + e);
            log.error("消费者" + e);
        } finally {
            consumer.close();
        }
    }

这个是不正常的启动(已经对照过上面正常配置,这点可以排除)

@KafkaListener(topics = "MisOnekeyBindKafkaTogd11")
  public void topic_test(ConsumerRecord<?, ?> record) {

      log.error("kafka的key: " + record.key());
      System.out.println("kafka的key: " + record.key());
      log.error("kafka的value: " + record.value().toString());
      System.out.println("kafka的value: " + record.value().toString());
  }

我怀疑我初步是我的版本号问题 请大佬帮忙看看是什么问题导致的.