生产者类是用于创建新消息为一个特定的主题和可选的分区。
这个例子是java的,你需要引入相关的包。
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
第一步是定义属性,如果使producer发现集群,序列化消息,如果适当的指导消息到一个特定的分区。
Properties props = new Properties();
props.put("metadata.broker.list", "broker1:9092,broker2:9092");
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("partitioner.class", "example.producer.SimplePartitioner");
props.put("request.required.acks", "1");
ProducerConfig config = new ProducerConfig(props);
metadata.broker.list:broker集群的地址,不用配置全部的broker地址,它会关联到其他的broker。你也不用担心topic或分区在哪个broker下,它会找到对应的broker。
serializer.class:指定采用哪种序列化方式将消息传输给Broker,你也可以在发送消息的时候指定序列化类型,不指定则以此为默认序列化类型。
partitioner.clss:指定消息发送对应分区方式,若不指定,则随机发送到一个分区,也可以在发送消息的时候指定分区类型。
request.required.acks:指定消息是否确定已发送成功,如果不设置值,则默认为“发送或不确认‘,可能会导致数据丢失。
现在开始定义producer对象:
Producer<String, String> producer = new Producer<String, String>(config);
生产者实例是一个java泛型,有两个参数类型,第一个分区key的类型,第二个是消息的类型。
现在构建要发送的消息。
Random rnd = new Random();
long runtime = new Date().getTime();
String ip = “192.168.2.” + rnd.nextInt(255);
String msg = runtime + “,www.example.com,” + ip;
最后写发送broker的消息类:
KeyedMessage<String, String> data = new KeyedMessage<String, String>("page_visits", ip, msg);
producer.send(data);
import java.util.*;
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
public class TestProducer {
public static void main(String[] args) {
long events = Long.parseLong(args[0]);
Random rnd = new Random();
Properties props = new Properties();
props.put("metadata.broker.list", "broker1:9092,broker2:9092 ");
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("partitioner.class", "example.producer.SimplePartitioner");
props.put("request.required.acks", "1");
ProducerConfig config = new ProducerConfig(props);
Producer<String, String> producer = new Producer<String, String>(config);
for (long nEvents = 0; nEvents < events; nEvents++) {
long runtime = new Date().getTime();
String ip = “192.168.2.” + rnd.nextInt(255);
String msg = runtime + “,www.example.com,” + ip;
KeyedMessage<String, String> data = new KeyedMessage<String, String>("page_visits", ip, msg);
producer.send(data);
}
producer.close();
}
}
分区代码:
import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;
public class SimplePartitioner implements Partitioner {
public SimplePartitioner (VerifiableProperties props) {
}
public int partition(Object key, int a_numPartitions) {
int partition = 0;
String stringKey = (String) key;
int offset = stringKey.lastIndexOf('.');
if (offset > 0) {
partition = Integer.parseInt( stringKey.substring(offset+1)) % a_numPartitions;
}
return partition;
}
}
这段逻辑的关键,我们得到的IP地址,取得最后一个字节,并进行分区数模运算,得出相应的分区,好处是相同的源ip划分到相同的分区里。但是你在消费的时候,要知道如何处理。
运行此之前,确保您已经创建topic:"page_visits"。从命令行:
bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 5 --topic page_visits
使用--partition选项创建1个以上的分区。
现在编译运行程序。
执行下面命令,确认是否已经有数据了。
bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic page_visits --from-beginning
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.9.2</artifactId>
<version>0.8.1.1</version>
<scope>compile</scope>
<exclusions>
<exclusion>
<artifactId>jmxri</artifactId>
<groupId>com.sun.jmx</groupId>
</exclusion>
<exclusion>
<artifactId>jms</artifactId>
<groupId>javax.jms</groupId>
</exclusion>
<exclusion>
<artifactId>jmxtools</artifactId>
<groupId>com.sun.jdmk</groupId>
</exclusion>
</exclusions>
</dependency>