以下为代码:
KafkaConsumer<String, String> consumer = KafkaUtil.getConsumer(properties);
consumer.subscribe(Arrays.asList("kafkaSync"));
boolean flag = true;
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
if (flag) {
Set<TopicPartition> assignments = consumer.assignment();
Map<TopicPartition, Long> query = new HashMap<>();
for (TopicPartition topicPartition : assignments) {
System.out.println(topicPartition); query.put(topicPartition,Long.valueOf(properties.getProperty("startTime")));
}
Map<TopicPartition, OffsetAndTimestamp> result = consumer.offsetsForTimes(query);
for(Entry<TopicPartition,OffsetAndTimestamp> entry:result.entrySet()){
System.out.println(entry);
consumer.seek(entry.getKey(), entry.getValue().offset());
}
flag = false;
}
for (ConsumerRecord<String, String> record : records){
System.out.printf("offset = %d, key = %s, value = %s%n", record.offset(), record.key(), record.value());
}
返回的result结果为空