返回到文章

采纳

编辑于 4年前

kafka poll records数据为空?

kafka

提问说明

你好,我现在遇到了一个问题,我在消费我的topic里面数据的时候,很多次都进入了:TimeUnit.MILLISECONDS.sleep(100);,但是此时这个topic下面的各个partition是有数据的,这个时候为什么没有拉去到数据呢?然后我把topic换成别人的topic后,我发现消费速度就很快,很少进入:TimeUnit.MILLISECONDS.sleep(100);,这两个topic都是同一个kafka,都是5个partition。示例代码如下,期待您的回复:

import com.google.common.collect.Lists;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.BytesDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.utils.Bytes;

import java.util.Properties;
import java.util.concurrent.TimeUnit;

public class T1 {

public static void main(String[] args) throws InterruptedException {
    Properties properties = new Properties();
    properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer");
    properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "102.31.117.188:9092");
    properties.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500");

    KafkaConsumer<String, Bytes> consumer = new KafkaConsumer<>(properties, new StringDeserializer(), new BytesDeserializer());
    consumer.subscribe(Lists.newArrayList("sw-segments"));
    try {
        while (true) {
            long l = System.currentTimeMillis();
            ConsumerRecords<String, Bytes> records = consumer.poll(100);
            long l1 = System.currentTimeMillis();
            if (records.isEmpty()) {
                TimeUnit.MILLISECONDS.sleep(100);
            } else {
                System.out.println(String.format("thread:%s,time: %s,size:%s time:%s", Thread.currentThread().getName(), System.currentTimeMillis() / 1000, records.count(), l1 - l));
            }
        }
    } finally {
        consumer.close();
    }
}
}