返回到文章

采纳

编辑于 3年前

kafka启动多个消费者,为啥只有第一个能消费到数据。

kafka

问题:我一个kafka集群里有很多topic。我创建了很多consumer去订阅每一个topic,一对一的消费,但是问题是:我启动第一个消费者没问题,后面再启动其他的消费者就消费不到数据了,而且也没报错。不知道啥原因呢?我consumer写的是多例模式的。

@Component
@Scope("prototype")
public class KafkaMqConsumer {

    @Resource(name = "receiveExecutor")
    private Executor receiveExecutor;

    public void consumer(Integer dataSourceId,
            String dataSourceName,
            String metaDataCode,
            String metaDataCnName,
            KafkaConfig kafkaConfig,
            String format,
            String formatType) {
        JaasConfig jaasConfig = kafkaConfig.getJaasConfig();
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfig.getBootstrapServers());
        props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaConfig.getGroupId());
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(
                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, jaasConfig.getAutoOffsetResetConfig());
        props.put(ConsumerConfig.CLIENT_ID_CONFIG, jaasConfig.getClientIdConfig());
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, jaasConfig.getMaxPollRecordsConfig());
        props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, jaasConfig.getEnableAutoCommitConfig());

        try {
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
            consumer.subscribe(
                    Stream.of(kafkaConfig.getTopic().split(",")).collect(Collectors.toList()));
            Runnable runnable =
                    () -> {
                        while (true) {
                            ConsumerRecords<String, String> records =
                                    consumer.poll(Duration.ofSeconds(1L));
                            log.info(
                                    "kafka : The number of records for all topics :{}",
                                    records.count());
                            records.forEach(
                                    record -> {……

调用方式:

@Resource private ObjectFactory<KafkaMqConsumer> objectFactory;
void method(){
 objectFactory.getObject().consumer(dataSourceId,
                dataSource.getName(),
                dataSource.getMetaDataCode(),
                metaDataMapper.selectByPrimaryKey(dataSource.getMetaDataCode()).getCnName(),
                kafkaConfig,
                dataSource.getFormat(),
                dataSource.getFormatType());
}

2.最开始报错,

WARN o.a.kafka.common.utils.AppInfoParser - Error registering AppInfo mbean

javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=gx-test-20170629

然后我百度,改了clientid。每一个consumer分别去订阅不同topic,就出现不报错,也没法消费到数据。

心态崩了啊老铁!