返回到文章

采纳

编辑于

kafka生产者实例

kafka生产者实例
history
kafka_history


kafka生产者实例


相信你对kafka已经有一定的了解了,是时候编写一些代码啦。

JAVA客户端实例


Producers(生产者)


生产者类是用于创建新消息为一个特定的主题和可选的分区。

这个例子是java的,你需要引入相关的包。


import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;


第一步是定义属性,如果使producer发现集群,序列化消息,如果适当的指导消息到一个特定的分区。


Properties props = new Properties();

props.put("metadata.broker.list", "broker1:9092,broker2:9092");
props.put("serializer.class", "kafka.serializer.StringEncoder");
props.put("partitioner.class", "example.producer.SimplePartitioner");
props.put("request.required.acks", "1");

ProducerConfig config = new ProducerConfig(props);


metadata.broker.list:broker集群的地址,不用配置全部的broker地址,它会关联到其他的broker。你也不用担心topic或分区在哪个broker下,它会找到对应的broker。



serializer.class:指定采用哪种序列化方式将消息传输给Broker,你也可以在发送消息的时候指定序列化类型,不指定则以此为默认序列化类型。



partitioner.clss:指定消息发送对应分区方式,若不指定,则随机发送到一个分区,也可以在发送消息的时候指定分区类型。



request.required.acks:指定消息是否确定已发送成功,如果不设置值,则默认为“发送或不确认‘,可能会导致数据丢失。







现在开始定义producer对象:


Producer<String, String> producer = new Producer<String, String>(config);


生产者实例是一个java泛型,有两个参数类型,第一个分区key的类型,第二个是消息的类型。

现在构建要发送的消息。


Random rnd = new Random();

long runtime = new Date().getTime();

String ip = “192.168.2.” + rnd.nextInt(255);

String msg = runtime + “,www.example.com,” + ip;

在这个例子中我们假消息的网站访问IP地址。以逗号分隔,时间戳,第二个是网站地址,第三是请求者的IP地址。我们这里使用Java类随机的最后八位字节的IP不同,所以我们可以看到分区是如何工作的。


最后写发送broker的消息类:


KeyedMessage<String, String> data = new KeyedMessage<String, String>("page_visits", ip, msg);


producer.send(data);

Topic是“page_visits”。这里我们通过IP作为分区键。注意,如果您不包括一个partition,即使你已经定义了一个partitioner,kafka将消息分配给一个随机的分区。






完整源码


import java.util.*;
 
import kafka.javaapi.producer.Producer;
import kafka.producer.KeyedMessage;
import kafka.producer.ProducerConfig;
 
public class TestProducer {
    public static void main(String[] args) {
        long events = Long.parseLong(args[0]);
        Random rnd = new Random();
 
        Properties props = new Properties();
        props.put("metadata.broker.list", "broker1:9092,broker2:9092 ");
        props.put("serializer.class", "kafka.serializer.StringEncoder");
        props.put("partitioner.class", "example.producer.SimplePartitioner");
        props.put("request.required.acks", "1");
 
        ProducerConfig config = new ProducerConfig(props);
 
        Producer<String, String> producer = new Producer<String, String>(config);
 
        for (long nEvents = 0; nEvents < events; nEvents++) { 
               long runtime = new Date().getTime();  
               String ip = “192.168.2.” + rnd.nextInt(255); 
               String msg = runtime + “,www.example.com,” + ip; 
               KeyedMessage<String, String> data = new KeyedMessage<String, String>("page_visits", ip, msg);
               producer.send(data);
        }
        producer.close();
    }
}


分区代码:


import kafka.producer.Partitioner;
import kafka.utils.VerifiableProperties;

public class SimplePartitioner implements Partitioner {
public SimplePartitioner (VerifiableProperties props) {

}

public int partition(Object key, int a_numPartitions) {
int partition = 0;
String stringKey = (String) key;
int offset = stringKey.lastIndexOf('.');
if (offset > 0) {
partition = Integer.parseInt( stringKey.substring(offset+1)) % a_numPartitions;
}
return partition;
}

}


这段逻辑的关键,我们得到的IP地址,取得最后一个字节,并进行分区数模运算,得出相应的分区,好处是相同的源ip划分到相同的分区里。但是你在消费的时候,要知道如何处理。



运行此之前,确保您已经创建topic:"page_visits"。从命令行:


bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 5 --topic page_visits


使用--partition选项创建1个以上的分区。



现在编译运行程序。



执行下面命令,确认是否已经有数据了。


bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic page_visits --from-beginning

Maven
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.9.2</artifactId>
<version>0.8.1.1</version>
<scope>compile</scope>
<exclusions>
<exclusion>
<artifactId>jmxri</artifactId>
<groupId>com.sun.jmx</groupId>
</exclusion>
<exclusion>
<artifactId>jms</artifactId>
<groupId>javax.jms</groupId>
</exclusion>
<exclusion>
<artifactId>jmxtools</artifactId>
<groupId>com.sun.jdmk</groupId>
</exclusion>
</exclusions>
</dependency>