返回到文章

采纳

编辑于

kafka 消费者消费不到数据

kafka

提问说明

1、在server.properties中num.partitions=2,配置了两个分区,我存入数据的时候,kafka应该会把数据分别存入到这两个分区里,然后我建了两个线程对应两个消费者,分别指定0分区,和1分区,但是消费不到数据?如果我把这个时间kafkaConsumer.poll(100000)加长就可以消费到500条,最多就是500条,如果设置100的话,就会消费不到数据

2、贴上相关代码(请勿用图片代替代码)

public class MyConsumer extends Thread{

    int partition=0;

    public MyConsumer(int partition) {
        this.partition=partition;
    }

    @Override
    public void run() {
//        Logger log=LoggerFactory.getLogger(MyConsumer.class);
        long startTime=System.currentTimeMillis()/1000;
        Properties properties = new Properties();
        properties.put("bootstrap.servers", "127.0.0.1:9093");
        properties.put("group.id", "group-1");
        properties.put("enable.auto.commit", "true");
        properties.put("auto.commit.interval.ms", "1000");
        properties.put("auto.offset.reset", "earliest");
        properties.put("session.timeout.ms", "30000");
        properties.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.RangeAssignor");
        properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);

        TopicPartition topicPartition=new TopicPartition(KafkaProperties.topic, partition);
        kafkaConsumer.assign(Arrays.asList(topicPartition));
        kafkaConsumer.seekToBeginning(Arrays.asList(topicPartition));
        kafkaConsumer.seek(topicPartition, 0);
//        kafkaConsumer.subscribe(Arrays.asList(KafkaProperties.topic));
        int total=0;
//        while (true) {
            ConsumerRecords<String, String> records = kafkaConsumer.poll(100000);
            total+=records.count();
//            for (ConsumerRecord<String, String> record : records) {
//                System.out.printf("offset = %d, value = %s", record.offset(), record.value(),record.topic());
//                log.info("offset = %d, value = %s"+record.offset()+","+record.value()+","+record.topic());
//                System.out.println();
//            }
            long endTime=System.currentTimeMillis()/1000;
            System.out.println("查询"+total+"条,时间:"+(endTime-startTime)+"秒");
//        }
    }

    public static void main(String[] args){
        MyConsumer thread1 = new MyConsumer(0);
        MyConsumer thread2 = new MyConsumer(1);
        thread1.start();
        System.out.println("测试");
        thread2.start();
        }
 }

查询500条,时间:1秒
查询500条,时间:1秒

刚开始只设置了一个消费者,每次都是只能消费500条,写上while(true)循环,会一直消费,是时间的问题吗