1、在server.properties中num.partitions=2,配置了两个分区,我存入数据的时候,kafka应该会把数据分别存入到这两个分区里,然后我建了两个线程对应两个消费者,分别指定0分区,和1分区,但是消费不到数据?如果我把这个时间kafkaConsumer.poll(100000)加长就可以消费到500条,最多就是500条,如果设置100的话,就会消费不到数据
2、贴上相关代码(请勿用图片代替代码)
public class MyConsumer extends Thread{
int partition=0;
public MyConsumer(int partition) {
this.partition=partition;
}
@Override
public void run() {
// Logger log=LoggerFactory.getLogger(MyConsumer.class);
long startTime=System.currentTimeMillis()/1000;
Properties properties = new Properties();
properties.put("bootstrap.servers", "127.0.0.1:9093");
properties.put("group.id", "group-1");
properties.put("enable.auto.commit", "true");
properties.put("auto.commit.interval.ms", "1000");
properties.put("auto.offset.reset", "earliest");
properties.put("session.timeout.ms", "30000");
properties.put("partition.assignment.strategy", "org.apache.kafka.clients.consumer.RangeAssignor");
properties.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> kafkaConsumer = new KafkaConsumer<>(properties);
TopicPartition topicPartition=new TopicPartition(KafkaProperties.topic, partition);
kafkaConsumer.assign(Arrays.asList(topicPartition));
kafkaConsumer.seekToBeginning(Arrays.asList(topicPartition));
kafkaConsumer.seek(topicPartition, 0);
// kafkaConsumer.subscribe(Arrays.asList(KafkaProperties.topic));
int total=0;
// while (true) {
ConsumerRecords<String, String> records = kafkaConsumer.poll(100000);
total+=records.count();
// for (ConsumerRecord<String, String> record : records) {
// System.out.printf("offset = %d, value = %s", record.offset(), record.value(),record.topic());
// log.info("offset = %d, value = %s"+record.offset()+","+record.value()+","+record.topic());
// System.out.println();
// }
long endTime=System.currentTimeMillis()/1000;
System.out.println("查询"+total+"条,时间:"+(endTime-startTime)+"秒");
// }
}
public static void main(String[] args){
MyConsumer thread1 = new MyConsumer(0);
MyConsumer thread2 = new MyConsumer(1);
thread1.start();
System.out.println("测试");
thread2.start();
}
}
查询500条,时间:1秒
查询500条,时间:1秒
刚开始只设置了一个消费者,每次都是只能消费500条,写上while(true)循环,会一直消费,是时间的问题吗