问题:我一个kafka集群里有很多topic。我创建了很多consumer去订阅每一个topic,一对一的消费,但是问题是:我启动第一个消费者没问题,后面再启动其他的消费者就消费不到数据了,而且也没报错。不知道啥原因呢?我consumer写的是多例模式的。
@Component
@Scope("prototype")
public class KafkaMqConsumer {
@Resource(name = "receiveExecutor")
private Executor receiveExecutor;
public void consumer(Integer dataSourceId,
String dataSourceName,
String metaDataCode,
String metaDataCnName,
KafkaConfig kafkaConfig,
String format,
String formatType) {
JaasConfig jaasConfig = kafkaConfig.getJaasConfig();
Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaConfig.getBootstrapServers());
props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaConfig.getGroupId());
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, jaasConfig.getAutoOffsetResetConfig());
props.put(ConsumerConfig.CLIENT_ID_CONFIG, jaasConfig.getClientIdConfig());
props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, jaasConfig.getMaxPollRecordsConfig());
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, jaasConfig.getEnableAutoCommitConfig());
try {
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(
Stream.of(kafkaConfig.getTopic().split(",")).collect(Collectors.toList()));
Runnable runnable =
() -> {
while (true) {
ConsumerRecords<String, String> records =
consumer.poll(Duration.ofSeconds(1L));
log.info(
"kafka : The number of records for all topics :{}",
records.count());
records.forEach(
record -> {……
调用方式:
@Resource private ObjectFactory<KafkaMqConsumer> objectFactory;
void method(){
objectFactory.getObject().consumer(dataSourceId,
dataSource.getName(),
dataSource.getMetaDataCode(),
metaDataMapper.selectByPrimaryKey(dataSource.getMetaDataCode()).getCnName(),
kafkaConfig,
dataSource.getFormat(),
dataSource.getFormatType());
}
2.最开始报错,
WARN o.a.kafka.common.utils.AppInfoParser - Error registering AppInfo mbean
javax.management.InstanceAlreadyExistsException: kafka.consumer:type=app-info,id=gx-test-20170629
然后我百度,改了clientid。每一个consumer分别去订阅不同topic,就出现不报错,也没法消费到数据。
心态崩了啊老铁!