你好,我现在遇到了一个问题,我在消费我的topic里面数据的时候,很多次都进入了:TimeUnit.MILLISECONDS.sleep(100);
,但是此时这个topic下面的各个partition是有数据的,这个时候为什么没有拉去到数据呢?然后我把topic换成别人的topic后,我发现消费速度就很快,很少进入:TimeUnit.MILLISECONDS.sleep(100);
,这两个topic都是同一个kafka,都是5个partition。示例代码如下,期待您的回复:
import com.google.common.collect.Lists;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.BytesDeserializer;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.utils.Bytes;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
public class T1 {
public static void main(String[] args) throws InterruptedException {
Properties properties = new Properties();
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer");
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "102.31.117.188:9092");
properties.setProperty(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, "500");
KafkaConsumer<String, Bytes> consumer = new KafkaConsumer<>(properties, new StringDeserializer(), new BytesDeserializer());
consumer.subscribe(Lists.newArrayList("sw-segments"));
try {
while (true) {
long l = System.currentTimeMillis();
ConsumerRecords<String, Bytes> records = consumer.poll(100);
long l1 = System.currentTimeMillis();
if (records.isEmpty()) {
TimeUnit.MILLISECONDS.sleep(100);
} else {
System.out.println(String.format("thread:%s,time: %s,size:%s time:%s", Thread.currentThread().getName(), System.currentTimeMillis() / 1000, records.count(), l1 - l));
}
}
} finally {
consumer.close();
}
}
}