With the annotation @KafkaListener(topics = "binlog", id = "consumer") the consumer receives no messages.
However, with

@KafkaListener(topics = "binlog", id = "consumer",
        topicPartitions = {
                @TopicPartition(topic = "binlog", partitionOffsets = @PartitionOffset(partition = "0", initialOffset = "0"))
        })

messages do come through, but every time the consumer restarts it re-reads everything from offset 0. Why do I only get messages after setting an explicit offset, and is there a way to make the consumer automatically start from the latest (largest) offset instead? This has been bothering me for two days.
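To make the goal concrete, what I would like is just a plain subscription like the minimal sketch below (class name and log format are only illustrative, not my actual code), where the starting position is decided by the group's committed offset or by auto-offset-reset:

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

// Illustrative sketch only, not my actual class: a plain topic subscription
// with no explicit partition/offset, so the starting position should come
// from the consumer group's committed offset or from auto-offset-reset.
@Component
public class BinlogListenerSketch {

    private static final Logger logger = LoggerFactory.getLogger(BinlogListenerSketch.class);

    @KafkaListener(topics = "binlog", id = "consumer")
    public void listen(ConsumerRecord<String, String> record) {
        logger.info("received partition={} offset={} value={}",
                record.partition(), record.offset(), record.value());
    }
}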
Dependencies:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.4.1</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>org.example</groupId>
    <artifactId>kafka</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter</artifactId>
        </dependency>
        <!-- kafka -->
<!--        <dependency>-->
<!--            <groupId>org.springframework.kafka</groupId>-->
<!--            <artifactId>spring-kafka</artifactId>-->
<!--            <version>2.2.4.RELEASE</version>-->
<!--        </dependency>-->
        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
            <version>2.4.1.RELEASE</version>
        </dependency>
        <!-- elasticsearch http api client -->
        <dependency>
            <groupId>io.searchbox</groupId>
            <artifactId>jest</artifactId>
            <version>5.3.3</version>
        </dependency>
        <!-- fastjson -->
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.51</version>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
Code:
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

/**
 * Single-record consumption logic.
 */
@Component
public class SingleDataConsumer {

    private static final Logger logger = LoggerFactory.getLogger(SingleDataConsumer.class);

    // Config.KAFKA_JSON_TOPICS is a constant defined elsewhere in my project.
    @KafkaListener(topics = Config.KAFKA_JSON_TOPICS)
    public void listener(ConsumerRecord<?, ?> record) {
        logger.info("topic.quick.consumer receive : " + record.toString());
    }
}
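To check whether the group named "consumer" actually has committed offsets for the binlog topic, I can query them with the plain Kafka AdminClient. This is only a throwaway diagnostic sketch; the localhost:9092 bootstrap address is an assumption, since it is not part of the configuration shown below:

import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;

public class GroupOffsetCheck {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Assumption: broker runs on localhost:9092 (address not shown in my config).
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            Map<TopicPartition, OffsetAndMetadata> offsets = admin
                    .listConsumerGroupOffsets("consumer")
                    .partitionsToOffsetAndMetadata()
                    .get();
            // Only partitions that already have a committed offset show up here.
            offsets.forEach((tp, om) ->
                    System.out.println(tp + " -> committed offset " + om.offset()));
        }
    }
}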
Configuration (application.properties):
#=============== consumer =======================
# default consumer group id
spring.kafka.consumer.group-id=consumer
#spring.kafka.consumer.auto-offset-reset=latest
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=true
#spring.kafka.consumer.auto-commit-interval=100
# key and value deserializers
spring.kafka.consumer.key-deserializer=org.apache.kafka.common.serialization.StringDeserializer
spring.kafka.consumer.value-deserializer=org.apache.kafka.common.serialization.StringDeserializer
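For reference, my understanding is that these properties amount to roughly the following hand-built consumer setup. This is only a sketch to show how I read the settings, not code from my project, and the localhost:9092 bootstrap address is again an assumption:

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;

@Configuration
public class KafkaConsumerConfigSketch {

    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        // Assumption: broker address, not present in the properties above.
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer");
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, true);
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory(
            ConsumerFactory<String, String> consumerFactory) {
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
                new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory);
        return factory;
    }
}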