返回到文章

采纳

编辑于

kafka生产者客户端(0.9.0.1API)

生产者客户端(0.9.0.1API)
history
kafka_history


KafkaProducer(客户端0.9.0.1API)


这是kafka版本新0.9.0.1,最新的生产者客户端。建议各位使用这个。






kafka客户端发布消息到kafka集群。



这个生产者是线程安全的,在线程之间共享单个生产者实例,通常单例比多个实例要快。






一个简单的例子,使用生产者发送有序的键值对消息。

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:4242");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

Producer<String, String> producer = new KafkaProducer<>(props);
for(int i = 0; i < 100; i++)
producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));

producer.close();


生产者的缓冲空间池保留尚未发送到服务器的消息,后台I/O线程负责将这些消息转化成请求发送到集群。如果使用后不关闭生产者,则会泄露(丢失)这些资源。







send() 方法是异步的,添加消息到缓冲区等待发送,并立即返回。这使生产者通过批量发送消息来提高效率。







ack 配置,控制请求被视为完整的标准。我们已经指定了“all”,这会阻塞提交完整的消息,这种设置性能最低,但是是最可靠的。







retries,如果请求失败,生产者会自动重试,我们指定是0次,如果启用多次,则会有重复消息的可能性。







生产者缓存每个分区未发送消息。这些缓存的大小是通过 batch.size 配置指定的。值较大的话将会产生更多的批。但是需要更多的内存(通常每个“活动”分区都有缓冲区)。







默认缓冲可以立即发送,即使有额外未使用的缓冲空间,但是,如果你想减少请求的数量,可以设置 linger.ms 大于0。这将指示生产者发送请求之前等待一段时间,希望更多的消息填补到同一个批次。这类似于TCP的算法,例如上面的代码段,可能100条消息在一个请求发送,因为我们设置了linger(逗留)时间为1毫秒,然后,如果我们没有填满缓冲区,这个设置将增加1毫秒的延迟请求来等待更多的消息。需要注意的是,在高负载下,相近的时间一般也会组成批,不管 linger.ms=0,然后,设置比0大,将会有更少的,更有效的请求,在最大负荷时少量的延迟的成本。







buffer.memory 控制生产者可用内存缓冲的总量,如果消息发送速度比他们快可以传输到服务器的快,将会耗尽这个缓冲区空间。当缓冲区空间耗尽,其他发送调用将被阻塞,如果不想任何阻塞,你可以设置block.on.buffer.full=false,但是这将会导致发送调用异常。







key.serializer和value.serializer示例,如何将用户提供的key和value对象ProducerRecord转换成字节,您可以使用附带的ByteArraySerializaer或StringSerializer简单的string或byte类型。







send()方法介绍



public java.util.concurrent.Future<RecordMetadata> send(ProducerRecord<K,V> record,Callback callback)





异步发送消息到一个主题,然后调用提供的callback,发送确认结果。

send是异步的,并且一旦该消息已经被保存在等待发送的消息缓冲区,此方法就立即返回。这允许并行发送多条消息儿不阻塞等待每一条消息的响应。

发送的结果是一个RecordMetadata,指定的消息被发送到分配的偏移量的分区。

由于发送调用是异步的,它返回的Future被指定给该消息的RecordMetadata。如果future调用get(),将阻塞消息,直到相关请求完成并返回该消息的元数据,或抛出异常。



如果要模拟一个简单的阻塞调用,你可以立刻调用get()方法。


 byte[] key = "key".getBytes();
byte[] value = "value".getBytes();
ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("my-topic", key, value)
producer.send(record).get();

完全无阻塞的话,可以利用回调参数提供的请求完成时将调用的回调通知。
ProducerRecord<byte[],byte[]> record = new ProducerRecord<byte[],byte[]>("the-topic", key, value);
producer.send(myRecord,
new Callback() {
public void onCompletion(RecordMetadata metadata, Exception e) {
if(e != null)
e.printStackTrace();
System.out.println("The offset of the record we just sent is: " + metadata.offset());
}
});

发送到同一个分区的消息回调保证按一定的顺序执行,也就是说,在线面的例子中callback1保证执行callback2之前:
 producer.send(new ProducerRecord<byte[],byte[]>(topic, partition, key1, value1), callback1);
producer.send(new ProducerRecord<byte[],byte[]>(topic, partition, key2, value2), callback2);

注意:callback一般在生产者的I/O线程中执行,所以是相当的快的,否则会影响其他的线程的消息发送。如果你需要执行阻塞或计算昂贵(消耗)的回调,建议使用自己的Executor在callback body中并行处理。






pecified by:

send in interface Producer<K,V>

Parameters:

record - 发送的记录(消息)

callback - 用户提供的callback,由服务器来调用消息应答(空表示没有回调)。

Throws:

InterruptException - 如果线程终端而阻塞

SerializationException - 如果key或value不是配置的serializers。

BufferExhaustedException - 如果block.on.buffer.full=false,buffer是满的。







更多信息访问:https://kafka.apache.org/090/javadoc/index.html?org/apache/kafka/clients/producer/KafkaProducer.html