返回到文章

采纳

编辑于

kafka的consumer.offsetsForTimes根据timestamp查找offset返回结果为空

kafka

以下为代码:

KafkaConsumer<String, String> consumer = KafkaUtil.getConsumer(properties);
consumer.subscribe(Arrays.asList("kafkaSync"));

boolean flag = true;

while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);

if (flag) {
    Set<TopicPartition> assignments = consumer.assignment();
    Map<TopicPartition, Long> query = new HashMap<>();
    for (TopicPartition topicPartition : assignments) {
        System.out.println(topicPartition);                    query.put(topicPartition,Long.valueOf(properties.getProperty("startTime")));
    }

    Map<TopicPartition, OffsetAndTimestamp> result = consumer.offsetsForTimes(query);
    for(Entry<TopicPartition,OffsetAndTimestamp> entry:result.entrySet()){
        System.out.println(entry);
        consumer.seek(entry.getKey(), entry.getValue().offset());
    }
    flag = false;
}

for (ConsumerRecord<String, String> record : records){
    System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}

返回的result结果为空