返回到文章

采纳

编辑于

SpringBoot和kafka集成

kafka SpringBoot

1. 前言

对于使用Apache Kafka的Spring项目,我们在Spring核心提供了Kafka消息的集成。提供了公共的接入“模板”,作为消息发送的高级抽象层,还为消息的POJO提供支持。

3. 介绍

本示例提供一个快速的入门例子,直接运行即可。

3.1. 快速游览(Quick Tour for the Impatient)

这是Spring Kafka的五分钟速览。

先决条件:您的Apache Kafka已经安装并且运行了。然后,您必须有spring-kafka JAR及其所有依赖项。 最简单的方法是在构建工具中声明一个依赖项。以下示例显示了如何使用Maven进行操作:

<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
  <version>2.4.1.RELEASE</version>
</dependency>

Gradle的引入:

compile 'org.springframework.kafka:spring-kafka:2.4.1.RELEASE'

使用Spring Boot时,如果忽略该版本,则Spring Boot将自动引入与您的Boot版本兼容的正确版本:

<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
</dependency>

Gradle的方式:

compile 'org.springframework.kafka:spring-kafka'

3.1.1. 兼容性

适用于以下的版本:

  • Apache Kafka Clients 2.2.0
  • Spring Framework 5.2.x
  • 最小的 Java 版本: 8

3.1.2. 一个非常非常快速的例子

如下例所示,您可以使用普通Java发送和接收消息:

package com.example.kafka;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.KafkaMessageListenerContainer;
import org.springframework.kafka.listener.MessageListener;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

import static org.junit.jupiter.api.Assertions.assertTrue;

@SpringBootTest
class KafkaTests01 {

    private Logger logger = LoggerFactory.getLogger(getClass());
    private String group = "group01";
    private String topic1 = "topic1";

    @Test
    public void testAutoCommit() throws Exception {
        logger.info("Start auto");

        // 启动消费者
        ContainerProperties containerProps = new ContainerProperties("topic1", "topic2");
        final CountDownLatch latch = new CountDownLatch(4);
        containerProps.setMessageListener(new MessageListener<Integer, String>() {
            @Override
            public void onMessage(ConsumerRecord<Integer, String> message) {
                logger.info("received: " + message);
                latch.countDown();
            }
        });
        KafkaMessageListenerContainer<Integer, String> container = createContainer(containerProps);
        container.setBeanName("testAuto");
        container.start();  // 启动消费者

        Thread.sleep(1000); // wait a bit for the container to start

        // 启动生产者
        KafkaTemplate<Integer, String> template = createTemplate();
        template.setDefaultTopic(topic1);
        template.sendDefault(0, "foo");
        template.sendDefault(2, "bar");
        template.sendDefault(0, "baz");
        template.sendDefault(2, "qux");
        template.flush();

        assertTrue(latch.await(60, TimeUnit.SECONDS));
        container.stop(); // 关闭消费者
        logger.info("Stop auto");
    }

    private KafkaMessageListenerContainer<Integer, String> createContainer(ContainerProperties containerProps) {
        Map<String, Object> props = consumerProps();
        DefaultKafkaConsumerFactory<Integer, String> cf = new DefaultKafkaConsumerFactory<Integer, String>(props);
        KafkaMessageListenerContainer<Integer, String> container = new KafkaMessageListenerContainer<>(cf, containerProps);
        return container;
    }

    private KafkaTemplate<Integer, String> createTemplate() {
        Map<String, Object> senderProps = senderProps();
        ProducerFactory<Integer, String> pf = new DefaultKafkaProducerFactory<Integer, String>(senderProps);
        KafkaTemplate<Integer, String> template = new KafkaTemplate<>(pf);
        return template;
    }

    private Map<String, Object> consumerProps() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, group);
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        props.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, "100");
        props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, "15000");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, IntegerDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return props;
    }

    private Map<String, Object> senderProps() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.RETRIES_CONFIG, 0);
        props.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);
        props.put(ProducerConfig.LINGER_MS_CONFIG, 1);
        props.put(ProducerConfig.BUFFER_MEMORY_CONFIG, 33554432);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, IntegerSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        return props;
    }
}

3.1.3. 使用Java配置

你也可以使用Java的Spring配置来完成与上一个示例中相同的效果。 以下示例显示了如何执行此操作:

@Autowired
private Listenerprivate Listener listener;

@Autowired
private KafkaTemplate<Integer, String> template;

@Test
public void testSimple() throws Exception {
    template.send("annotated1", 0, "foo");
    template.flush();
    assertTrue(this.listener.latch1.await);
    template.flush();
    assertTrue(this.listener.latch1.await(10, TimeUnit.SECONDS));
}

@Configuration
@EnableKafka
public class Config {

    @Bean
    ConcurrentKafkaListenerContainerFactory<Integer, String>
                        kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory    ConcurrentKafkaListenerContainerFactory<Integer, String>
                        kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

    @Bean
    public ConsumerFactory<Integer, String> consumerFactory() {
        return
    ConcurrentKafkaListenerContainerFactory

    @Bean
    ConcurrentKafkaListenerContainerFactory<Integer, String>
                        kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        returnBean
    ConcurrentKafkaListenerContainerFactory<Integer, String>
                        kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

    @Bean
    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory

    @Bean
    ConcurrentKafkaListenerContainerFactory<Integer, String>
                        kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

    @Bean
    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new@BeanncurrentKafkaListenerContainerFactory<IntegerFactory<Integer, String>
                        kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String

    @Bean
    ConcurrentKafkaListenerContainerFactory<Integer, String>
                        kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

    @Bean
    public ConsumerFactory<Integer, String> consumerFactory() {
        return    ConcurrentKafkaListenerContainerFactory<Integer, String>
                        kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

    @Bean
    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public

    @BeanrrentKafkaListenerContainerFactory<Integer, String>
                        kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                                new

    @Bean
    ConcurrentKafkaListenerContainerFactory<Integer, String>
                        kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer

    @Bean
    ConcurrentKafkaListenerContainerFactory<Integer, String>
                        kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory

    @Bean
    ConcurrentKafkaListenerContainerFactory<Integer, String>
                        kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

    @Bean
    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
        ...
        return props;
    }

    @Bean
    public Listener listener() {
        return new Listener();
    }

    @Bean
    public ProducerFactory<Integer, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.putBean@BeanncurrentKafkaListenerContainerFactory<IntegerBean
    ConcurrentKafkaListenerContainerFactory<Integer, String>
                        kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer

    @Bean
    ConcurrentKafkaListenerContainerFactory<Integer, String>
                        kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory

    @Bean
    ConcurrentKafkaListenerContainerFactory<Integer, String>
                        kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory
    ConcurrentKafkaListenerContainerFactory<Integer, String>
                        kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<Integer, String> factory =
                                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }

    @Bean
    public ConsumerFactory<Integer, String> consumerFactory() {
        return new DefaultKafkaConsumerFactory<>(consumerConfigs());
    }

    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
        ...
        return props;
    }

    @Bean
    public Listener listener() {
        return new Listener();
    }

    @Bean
    public ProducerFactory<Integer, String> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }

    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, embeddedKafka.getBrokersAsString());
        ...
        return props;
    }

    @Bean
    public KafkaTemplate<Integer, String> kafkaTemplate() {
        return new KafkaTemplate<Integer, String>(producerFactory());
    }

}
public class Listener {

private final CountDownLatch latch1 = new CountDownLatch(1);

    @KafkaListener(id = "foo", topics = "annotated1")
    public void listen1(String foo) {
        this.latch1.countDown();
    }
}

3.1.4. Spring Boot更简单的方式

Spring Boot可以更加简单。 下面的Spring Boot应用示例将三个消息发送到一个主题,然后接收它们,然后停止:

package com.example.kafkapackage com.example.kafka.demo03;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotationpackage com.example.kafka.demo03;

import org.apache.kafka.clients.consumer.ConsumerRecord;
importm.examplepackage com.example.kafka.demo03;

importpackage com.example.kafka.demo03;

import orgpackage com.example.kafka.demo03;

import org.apachepackage com.example.kafka.demo03;

importpackage com.example.kafka.demo03;

importpackage com.example.kafka.demo03;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframeworke come.kafka;

importfka.clientserRecord;
importpackage com.example.kafka.demo03;

import org.apache.kafka.clients.consumer.ConsumerRecord;
importpackagekafkamport orgpackage com.example.kafka.demo03;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.bootpackage com.example.kafka.demo03;

import org.apache.kafka.clients.consumerpackage com.example.kafka.demo03package com.example.kafka.demo03;

import orgkage compackage com.example.kafka.demo03;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Loggerpackage com.example.kafka.demo03;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import orgpackage.exampleemo03;

import orger.ConsumerRecord;
import org.slf4jimport org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframeworkm.example.kafka.demo03;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beanspackage com.example.kafka.demo03;

importpackage com.example.kafka.demo03;

importpackage com.example.kafka.demo03;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.KafkaListenerpackage com.example.kafka.demo03;

import org.apache.kafka.clients.consumerpackage com.example.kafkapackage com.example.kafka.demo03package com.example.kafka.demo03;

import orgpackagepackage com.example.kafka.demo03;

import orgpackage com.example.kafka.demo03;

importpackage com.example.kafka.demo03;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframeworkpackage.examplea.demo03 orgfka.clientsconsumer.ConsumerRecordpackage com.example.kafka.demo03;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.exampleo03;

importmport org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
importpackage com.example
importg.apachea.clientsonsumerRecord;
import org.slf4j.Logger;
import orgpackage com.example.kafka.demo03;

import orgpackage com.example.kafka.demo03;

importkage com.example.kafka.demo03;

import org.apachepackage com.example.kafka.demo03;

importpackage com.example.kafka.demo03;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframeworke come.kafka;

importe.kafkaumer.ConsumerRecordord;
import.Loggerpackage com.example.kafka.demo03;

import org.apache.kafka.clients.consumer.ConsumerRecord;
importpackage com.example.kafka.demo03;

importpackage com.example.kafka.demo03;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import orgpackageple.kafka

importpackage com.example.kafka.demo03;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Loggerpackage com.example.kafka.demo03;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4jpackage com.exampleo03;

importt orgache.kafkapackage com.example.kafka.demo03;

import org.apache.kafka.clients.consumer.ConsumerRecord;
importpackage com.example.kafka.demo03;

import org.apache.kafka.clients.consumerpackage com.example.kafka.demo03;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
importge com.exampleka.demo03.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beanspackage com.example.kafka.demo03;

importpackage com.example.kafka.demo03;

import orgkage compackage com.example.kafka.demo03;

import orgpackage com.example.kafka.demo03;

import orgpackage com.example.kafka.demo03;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
importe com.exampleemo03;

importort org.apacheer.ConsumerRecord;
import orgimportpackage com.example.kafka.demo03;

import org.apache.kafka.clients.consumer.ConsumerRecordm.examplepackage com.example.kafka.demo03;

import orgpackage com.example.kafka.demo03;

importpackage com.example.kafka.demo03;

import orgpackagepackage com.example.kafka.demo03;

importpackage com.example.kafka.demo03;

import org.apache.kafkapackage com.example.kafka.demo03;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunnere come.kafka;

importfka.clientsconsumererRecord;
importpackage com.example.kafka.demo03;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.exampleo03;

importpackage com.example.kafka.demo03;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
importom.example
importg.apachea.clients.consumerpackage com.example.kafka.demo03;

import org.apache.kafka.clients.consumer.ConsumerRecordpackage com.example.kafka.demo03;

import org.apachekage compackage com.example.kafka.demo03;

import org.apache.kafka.clients.consumer.ConsumerRecord;
importpackage com.example.kafka.demo03;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import orgpackage com.example.kafka.demo03;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import orgm.examplemple.kafka3;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4jpackage com.example.kafka.demo03;

import org.apache.kafka.clientspackage com.example.kafka.demo03;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import orgpackage com.example.kafka.demo03;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import orge comemo03;

import org.apache.kafkaerRecord;
importger;
importpackage com.example.kafka.demo03;

import org.apache.kafka.clients.consumer.ConsumerRecord;
importm.examplepackage com.example.kafka.demo03;

import orgpackage com.example.kafka.demo03;

importpackage com.example.kafka.demo03;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4jpackage com.example.kafka.demo03;

import orgpackage com.example.kafka.demo03;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframeworke com.exampleemo03;

import orger.ConsumerRecord;
importimportpackage com.example.kafka.demo03;

import org.apache.kafka.clients.consumer.ConsumerRecord;
importpackage commple.kafka3;

importpackage com.example.kafka.demo03;

importpackage com.example.kafka.demo03;

import org.apache.kafka.clients.consumer.ConsumerRecord;
importkage compackage com.example.kafka.demo03;

importpackage com.example.kafka.demo03;

import orgpackage com.example.kafka.demo03;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beanspackage.exampleemo03;

import orgmer.ConsumerRecordort orgpackage com.example.kafka.demo03;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Loggerpackagekafka.demo03ts.consumerpackage com.example.kafka.demo03;

import orgpackage com.example.kafka.demo03;

import orgpackage com.example.kafka.demo03;

import orgpackage com.example.kafka.demo03;

import orgpackagepackage com.example.kafka.demo03;

import orgpackage com.example.kafka.demo03;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfiguree come.kafka.demo03;

importe.kafka.clients.consumer.ConsumerRecord.Loggerpackage com.example.kafka.demo03;

import org.apache.kafka.clients.consumer.ConsumerRecord;
importpackage comkafkao03;

importpackage com.example.kafka.demo03;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beanspackagepackage com.example.kafka.demo03;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Loggerge com.example.kafka.demo03;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactorypackage com.kafka03;

importkafkapackage com.example.kafka.demo03;

import orgpackage com.example.kafka.demo03;

import orgpackage com.example.kafka.demo03;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import orgpackage.exampleka.demo03;

import orghe.kafkapackage com.example.kafka.demo03;

importpackage com.example.kafka.demo03;

importpackage com.example.kafka.demo03;

import orgkage com.example.kafka.demo03;

import org.apachepackage com.example.kafka.demo03;

importpackage com.example.kafka.demo03;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beanse come.kafkaa.demo03;

import orgfka.clientsconsumererRecord;
importpackage com.example.kafka.demo03;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import.exampleo03;

importmport org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
importom.exampleple.kafka03;

import
importg.apachea.clientsonsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
importpackage com.example.kafka.demo03;

import org.apache.kafkapackage com.example.kafka.demo03;

import org.apache.kafka.clientspackagepackage com.example.kafka.demo03;

import org.apache.kafkapackage com.example.kafka.demo03;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boote com.exampleemo03;

importort org.apacheer.ConsumerRecorderRecord;
importger;
importpackage com.example.kafka.demo03;

import org.apache.kafka.clients.consumer.ConsumerRecord;
importm.examplepackage com.example.kafka.demo03;

import org.apache.kafka.clients.consumer.ConsumerRecord;
importpackage com.example.kafka.demo03;

import orgpackagepackage com.example.kafka.demo03;

import orgpackage com.example.kafka.demo03;

importpackage com.example.kafka.demo03;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
importe comemo03;

importmer.ConsumerRecordport orgj.Loggerpackage com.example.kafka.demo03;

import org.apache.kafka.clients.consumer.ConsumerRecordcome.kafkademo03;

import org.apache.kafka.clients.consumer.ConsumerRecord;
importpackage com.example.kafka.demo03;

import org.apachepackage com.example.kafka.demo03;

import org.apache.kafka.clientspackage com.example.kafka.demo03;

import org.apache.kafka.clients.consumerpackage compackage com.example.kafka.demo03;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.bootpackage com.example.kafka.demo03;

import org.apache.kafka.clients.consumer.ConsumerRecord;
importcom.example.kafkapackage com.example.kafka.demo03;

import orgpackage com.example.kafkapackage com.examplepackage com.example.kafka.demo03;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import orge comemo03;

import org.apache.kafka.clients.consumer.ConsumerRecordport orgj.Loggerpackage com.example.kafka.demo03;

import org.apache.kafka.clients.consumer.ConsumerRecordcom.exampledemo03package com.example.kafka.demo03;

importpackage com.example.kafka.demo03;

import org.apache.kafka.clients.consumer.ConsumerRecord;
importkage compackage com.example.kafka.demo03;

importpackage com.example.kafka.demo03;

import orgpackage com.example.kafka.demo03;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowiredpackage com.example.kafka.demo03;

importorgmer.ConsumerRecordport org.slf4j.Loggerpackage com.example.kafka.demo03;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
importpackage comdemo03package com.example.kafka.demo03;

import org.apache.kafkapackage com.example.kafka.demo03;

import org.apachepackage com.example.kafka.demo03;

import org.apache.kafka.clients.consumerkage com.example.kafka.demo03;

import org.apache.kafka.clientspackage com.example.kafka.demo03;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafkae comemo03;

importmer.ConsumerRecordport orgj.Loggerpackage com.example.kafka.demo03;

import org.apache.kafka.clients.consumer.ConsumerRecord;
importcom.example.kafka.demo03;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import orgpackage com.example.kafka.demo03;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
importpackage com.example.kafka.demo03;

import org.apachepackage com.example.kafka.demo03;

import orgpackage com.example.kafka.demo03;

import orgpackage com.example.kafka.demo03;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowirede com.exampleemo03;

import orgmer.ConsumerRecordport orgj.Loggerpackage com.example.kafka.demo03;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Loggerpackage com.example.kafka.demo03;

import orgpackage com.example.kafka.demo03;

importpackage com.example.kafka.demo03;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4jpackage com.example.kafka.demo03;

import org.apache.kafkapackage com.example.kafka.demo03;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframeworkpackage com.example.kafka.demo03;

import org.apache.kafka.clients.consumer.ConsumerRecord;
importpackage com.example.kafka.demo03;

import org.apache.kafka.clients.consumer.ConsumerRecord;
importpackage com.example.kafka.demo03;

import orgkage compackage com.example.kafka.demo03;

importpackage com.example.kafka.demo03;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import orge com.exampleemo03;

import orgmer.ConsumerRecordd;
importport org.slf4jger;
importpackage com.example.kafka.demo03;

import org.apache.kafka.clients.consumer.ConsumerRecord;
importcome.kafkapackage com.example.kafka.demo03;

import orgpackage com.example.kafka.demo03;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import orgkage compackage com.example.kafka.demo03;

importpackage com.example.kafka.demo03;

importpackage com.example.kafka.demo03;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowiredpackage.example

importmer.ConsumerRecordport orgj.Loggerger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunnercom.exampledemo03;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import orgpackage com.example.kafka.demo03;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import orgkage compackage com.example.kafka.demo03;

importpackage com.example.kafka.demo03;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
importe com.exampleemo03;

import orgmer.ConsumerRecordport org.slf4j.Loggerpackage com.example.kafka.demo03;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
importpackage com.example.kafkapackage com.example.kafka.demo03;

import orgpackage com.example.kafka.demo03;

importkage com.example.kafka.demo03;

import org.apache.kafka.clientspackage com.example.kafka.demo03;

import org.apache.kafkapackage com.example.kafka.demo03;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
importpackage.example.kafka.demo03;

import org.apache.kafka.clientsd;
importger;
importpackage com.example.kafka.demo03;

import org.apache.kafka.clients.consumer.ConsumerRecordpackage com.example.kafka.demo03;

importpackage com.example.kafka.demo03;

importpackage com.example.kafka.demo03;

importkage compackage com.example.kafka.demo03;

import org.apache.kafka.clients.consumer.ConsumerRecord;
importpackage com.example.kafka.demo03;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import orgpackagee com.example

import org.apache.kafka.clients.consumerd;
import orgger;
importpackage com.example.kafka.demo03;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4jcomdemo03package com.example.kafka.demo03;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
importkage compackage com.example.kafka.demo03;

importpackage com.example.kafka.demo03;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import orge comemo03;

importmer.ConsumerRecordport orgj.Loggerpackage com.example.kafka.demo03;

import org.apache.kafka.clients.consumer.ConsumerRecordcom.example.kafkapackage com.example.kafka.demo03;

importpackage com.example.kafka.demo03;

importpackage com.example.kafka.demo03;

import orgkage compackage com.example.kafka.demo03;

importpackage com.example.kafka.demo03;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
importe comemo03;

importmer.ConsumerRecordd;
importj.Loggerpackage com.example.kafka.demo03;

import org.apache.kafka.clients.consumer.ConsumerRecordpackagee.kafkapackage com.example.kafka.demo03;

import orgpackage com.example.kafka.demo03;

importpackage com.example.kafka.demo03;

import org.apachepackage com.example.kafka.demo03;

importpackage com.example.kafka.demo03;

importpackage com.example.kafka.demo03;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
importpackage com.example.kafka.demo03;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
importpackage com.example.kafkapackage com.example.kafka.demo03;

import orgkage compackage com.example.kafka.demo03;

importpackage com.example.kafka.demo03;

import org.apache.kafkapackage com.example.kafka.demo03;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunnere com.example.kafka.demo03ort org.apache.kafkamerRecord;
importgger;
import
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
importm.examplepackage com.example.kafka.demo03;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Loggerpackagepackage com.example.kafka.demo03;

import org.apache.kafka.clients.consumerpackage com.examplepackage com.example.kafka.demo03;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframeworkpackage compackage com.example.kafka.demo03;

import org.apache.kafkapackage com.example.kafka.demo03;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beanse come.kafkaorg.apache.kafka.clientsumer.ConsumerRecordpackage com.example.kafka.demo03;

import org.apache.kafka.clients.consumer.ConsumerRecordpackageexamplepackage com.example.kafka.demo03;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
importpackage com.examplepackage com.example.kafka.demo03;

import orgpackage com.example.kafka.demo03;

import orgpackage com.example.kafka.demo03;

import orgkage compackage com.example.kafka.demo03;

import orgpackage com.example.kafka.demo03;

importpackage com.example.kafka.demo03;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
importe come.kafkaorg.apache.kafkaumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframeworkpackage com.example.kafka.demo03;

import org.apache.kafka.clients.consumer.exampleafka.demo03;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Loggerpackage com.example.kafka.demo03;

importpackagepackage com.example.kafka.demo03;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import orgpackage com.example.kafka.demo03;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactorye come.kafkaorg.apache.kafka.clientsumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactorypackagepackage com.example.kafka.demo03;

import org.apache.kafka.clientspackage comafka.demo03;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;

import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

@SpringBootApplication
public class Application implements CommandLineRunner {

    public static Logger logger = LoggerFactory.getLogger(Application.class);

    public static void main(String[] args) {
        SpringApplication.run(Application.class, args).close();
    }

    @Autowired
    private KafkaTemplate<String, String> template;

    private final CountDownLatch latch = new CountDownLatch(3);

    @Override
    public void run(String... args) throws Exception {
        this.template.send("myTopic", "foo1");
        this.template.send("myTopic", "foo2");
        this.template.send("myTopic", "foo3");
        latch.await(60, TimeUnit.SECONDS);
        logger.info("All received");
    }

    @KafkaListener(topics = "myTopic")
    public void listen(ConsumerRecord<?, ?> cr) throws Exception {
        logger.info(cr.toString());
        latch.countDown();
    }
}

配置application.properties

spring.kafka.consumer.group-id=foo
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.listener.missing-topics-fatal=false
  • spring.kafka.consumer.group-id指定消费者组id。
  • spring.kafka.consumer.auto-offset-reset确保新的消费者组能获得我们之前发送的消息,为了测试方便(生产配置latest,只获取最新的消息)。
  • spring.kafka.listener.missing-topics-fatal 监听的topic如果不存在,则不报错