消费者维持TCP连接到必要的broker来获取消息。故障导致消费者关闭使用,会泄露这些连接,消费者不是线程安全的,可以查看更多关于Multi-threaded(多线程)处理的细节。
kafka为每个分区的每条消息保持偏移量的值,这个偏移量是该分区中一条消息的唯一标示符。也表示消费者在分区的位置。也就是说,一个位置是5的消费者,说明已经消费了0到4的消息(记录)并下一个接收消息的偏移量设置为5。关于的消费者,实际上“位置”有2个概念。
消费者将给出下一个消息的偏移量的位置,这个是消费者在分区中能看到的最后的偏移量,消费者收到的数据称为poll(long)[长轮询],每次接收消息,偏移量会自动增长,
“已提交”的位置是已安全保存的最后偏移量,如果处理失败,这个偏移量会恢复并重新开始。消费者可以自动定期提交偏移量,也可以选择通过调用commitSync来控制,这是阻塞的,直到偏移量提交成功或在提交过程中发生致命的错误,commitAsync是非阻塞式的,当成功或失败时,会引发OffsetCommitCallback。
这个区别是当一条消息已认为已被消费,控制权在消费者,下面我们进一步更详细地讨论。
Kafka使用消费者组概念,允许进程池瓜分消费和处理消息的工作。这些进程可以在同一台机器运行,或更可能的是,可以部分到多台机器上,以提供额外的可扩展性和容错性处理。
每个kafka消费者都能配置一个属于它自己的消费者组。并可以动态的配置它需要订阅的主题列表,通过subscribe(List, ConsumerRebalanceListener)
,或订阅匹配特定模式的主题,通过subscribe(Pattern, ConsumerRebalanceListener)
,kafka将已订阅主题的每个消费者组中的每条消息发送给一个进程。这是通过在每个组的消费者进程平衡主题的分区来实现的。 所有,如果一个主题有4个分区,并且一个消费者组有2个进程,每个进程将从2个分区来进行消费,这个是动态维护的:如果一个进程故障,分区将重新分派到同组的其他的进程。如果有新的进程加入该组,分区将现有消费者移动到新的进程。
所以,如果2个进程订阅了一个主题,指定不同的组,他们将获取这个主题所有的消息,如果他们指定相同的组,那么它们将每个获取大约一半的消息。
从概念上讲,你可以把消费者组看作一个单一的逻辑用户(订阅者),碰巧组成了多个进程。作为一个多用户系统。kafka也支持任意数量的消费者组提供一个指定的主题不重复的数据,
这是关于常用消息系统功能的简单的概述,类似于传统消息系统中的队列,所有进程将是一个单独的消费组的一部分,因此,消息的交付由该组进行平衡,类似于队列。与传统的消息传递系统不同的是,你可以有多个这样的组。在传统的消息传递系统中,每个进程都有它自己的消费组,所以每个进程都会订阅发布到主题的所有消息。
此外,当组重新分配自动发生,消费者可以通过调用ConsumerRebalanceListener
通知,这使得他们能够完成必要的应用程序级的逻辑,如状态清除,手动偏移提交(注意,指定的消费者组的偏移量总是已经提交的)
它也有可能为消费者手动指定分配给它通过assign(List)
,这将禁用动态分区分配。
这个消费者API提供了灵活性,以涵盖各种消费场景,下面是一些例子来演示如何使用它们。
这是个【自动提交偏移量】的简单的kafka消费者API。
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
}
enable.auto.commit
,偏移量由auto.commit.interval.ms
控制自动提交的频率。foo
和bar
。消费者组叫test
。ping
集群,告诉进群它还活着。只要消费者能够做到这一点,它就被认为是活着的,并保留分配给它分区的权利,如果它停止心跳的时间超过session.timeout.ms
,那么就会认为是故障的,它的分区将被分配到别的进程。
这个deserializer
设置是指定如何去把byte转为为object类型,例子中,通过指定string的 deserializers,我们告诉我们获取到的消息的key和value只是简单的string类型。
不依赖于定期提交偏移量,你可以自己控制偏移量,当消息认为已消费过了,这个时候再去提交它们的偏移量。这个是很有用的,当消费的消息结合了一些处理逻辑,这个消息就不应该认为是已经消费的,直到它完成了整个处理。在这里例子中,当我们有足够的消息进行批处理时我们将它们插入到数据。它们接收了消费者的消息之后,消息将被认为已经消费了,这个时候我们的过程失败了,我们读取我们的内存缓存区的消息,有可能他们已被插入到数据库中了。为了避免这一点,我们将手动提交的偏移量,当相应的消息已被插入到数据库中。我们准确控制一条消息才被认为是消费的。提出了一个相反的可能性:在插入到数据库中,但在提交之前,这个过程可能会失败(尽管这可能只有几毫秒,但它是一种可能性)。在这种情况下,进程将获取到已提交的偏移量,并会重复插入的最后一批数据。用这种方式,被称为“至少一次”
担保,因为每个消息可能会提供一次,但在故障情况下,可以重复。
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "false");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
final int minBatchSize = 200;
List<ConsumerRecord<String, String>> buffer = new ArrayList<>();
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
buffer.add(record);
}
if (buffer.size() >= minBatchSize) {
insertIntoDb(buffer);
consumer.commitSync();
buffer.clear();
}
}
try {
while(running) {
ConsumerRecords<String, String> records = consumer.poll(Long.MAX_VALUE);
for (TopicPartition partition : records.partitions()) {
List<ConsumerRecord<String, String>> partitionRecords = records.records(partition);
for (ConsumerRecord<String, String> record : partitionRecords) {
System.out.println(record.offset() + ": " + record.value());
}
long lastOffset = partitionRecords.get(partitionRecords.size() - 1).offset();
consumer.commitSync(Collections.singletonMap(partition, new OffsetAndMetadata(lastOffset + 1)));
}
}
} finally {
consumer.close();
}
注:已提交的偏移量应该一直是你的应用程序将读取的下一条消息来的偏移量。因此,调用commitSync(offsets)时,你应该添加最后一条消息的偏移量。
在前面的例子中,我们订阅我们感兴趣的主题,让kafka提供给我们平分后的主题的分区,它提供了一个简单的负载平衡机制,所以我们的程序通过多个实例来瓜分处理这些消息。
在这种模式下,消费者将只会获取它订阅的分区,如果消费者实例故障,不会尝试重新平衡分区到其他的实例。
这种模式很容易指定,不是订阅主题,只需要消费者订阅特定的分区即可:
String topic = "foo";
TopicPartition partition0 = new TopicPartition(topic, 0);
TopicPartition partition1 = new TopicPartition(topic, 1);
consumer.assign(Arrays.asList(partition0, partition1));
它不能即订阅了指定的分区(无负载平衡),又使用相同的消费者实例的主题(负载平衡)。
消费者应用程序不必使用kafka内置的偏移量仓库,它可以自己选择存储偏移量的仓库。主要的一点是允许应用程序存储偏移量和偏移量的结果存储在同一个系统中,原子的消费情况,这并不一定,但将使消费完全原子,并给出“恰好一次”的语义比默认“至少一次”语义更强壮的,你要用kafka的偏移提交功能。
这有结合的例子。
每个消息都有自己的偏移,所以要管理自己的偏移,你只需要做到以下几点:
当分区分配也手动完成,这种类型的使用是最小的。(像上文搜索索引的情况).如果分区分配是自动完成的,需要特别小心处理分区分配变更的情况。可以通过提供的 ConsumerRebalanceListener
调用subscribe(Collection, ConsumerRebalanceListener)
和subscribe(Pattern, ConsumerRebalanceListener)
。例如,当分区从消费者拿一条消息,消息费想要提交这些分区的偏移量,通过执行ConsumerRebalanceListener.onPartitionsRevoked(Collection)
,当分区分配给消费者,消费者通过ConsumerRebalanceListener.onPartitionsAssigned(Collection)
,为新的分区和正确初始化位置的消费者找到偏移。
kafka允许指定位置,使用seek(TopicPartition, long)
来指定新的位置,一些特别的方法寻找最早和最晚的偏移,服务器维护也可用(seekToBeginning(Collection)
和 seekToEnd(Collection))
。
如果多个分区分配一个消费者获取的数据,它将试图同时消费所有的,有效地给这些分区为消费相同的优先级。然而在某些情况下,消费者可能首先想全速专注于获取的一些子集分配分区,当这些分区很少或已经没有消费数据了在去抓取其他分区。
还有这样一种情况,流处理,其中处理器由两个topic获取和执行这两个流的连接。当的topic之一是早已落后于其他落后,处理器想暂停为了得到滞后流赶上从前面的topic取。另一个例子是在消费者的Bootstrap启动,其中有很多历史数据的追赶中,应用程序通常想要得到的一些话题考虑获取其他topic之前的最新数据。
kafka支持动态控制消费流量,分别在future的poll(long)
调用中执行中使用 pause(Collection)
和 resume(Collection)
来暂停消费指定分配的分区,重新开始消费指定暂停的分区。