KafkaStreams客户端(0.10.1.1 API)

原创
半兽人 发表于: 2017-02-15   最后更新时间: 2020-08-10 18:37:09  
{{totalSubscript}} 订阅, 29,769 游览

Kafka Streams从一个或多个输入topic进行连续的计算并输出到0或多个外部topic中。

可以通过TopologyBuilder类定义一个计算逻辑处理器DAG拓扑。或者也可以通过提供的高级别KStream DSL来定义转换的KStreamBuilder。(PS:计算逻辑其实就是自己的代码逻辑)

KafkaStreams类管理Kafka Streams实例的生命周期。一个stream实例可以在配置文件中为处理器指定一个或多个Thread。

KafkaStreams实例可以作为单个streams处理客户端(也可能是分布式的),与其他的相同应用ID的实例进行协调(无论是否在同一个进程中,在同一台机器的其他进程中,或远程机器上)。这些实例将根据输入topic分区的基础上来划分工作,以便所有的分区都被消费掉。如果实例添加或失败,所有实例将重新平衡它们之间的分区分配,以保证负载平衡。

在内部,KafkaStreams实例包含一个正常的KafkaProducerKafkaConsumer实例,用于读取和写入,

一个简单的例子:

Map<String, Object> props = new HashMap<>();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-stream-processing-application");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
StreamsConfig config = new StreamsConfig(props);

KStreamBuilder builder = new KStreamBuilder();
builder.stream("my-input-topic").mapValues(value -> value.length().toString()).to("my-output-topic");

KafkaStreams streams = new KafkaStreams(builder, config);
streams.start();
更新于 2020-08-10

案例该换了

好的,我最近更新一下

光年 3年前

案例是不是不符合新版本了?

半兽人 -> 光年 3年前

符合的。

Nekonata 6年前
 props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
 props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());

StreamsConfig.KEY_SERDE_CLASS_CONFIG,StreamsConfig.VALUE_SERDE_CLASS_CONFIG已经@deprecated了....

半兽人 -> Nekonata 6年前

收到,稍后我看看官网是否已经更新。

liebe41 6年前
Exception in thread "StreamThread-1" java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.String
 at org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:24)

这是啥原因呢

半兽人 -> liebe41 6年前

类型转换错误。

liebe41 -> 半兽人 6年前

嗯嗯,这个我知道,但是我现在不知道这是怎么引起的

public class KafkaStream {

    public static void main(String[] args){
        Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.STATE_DIR_CONFIG,"{test}");
        props.put(StreamsConfig.APPLICATION_ID_CONFIG,"my-stream-processing-application");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "140.143.67.114:9092");
        props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        StreamsConfig config = new StreamsConfig(props);

        KStreamBuilder builder = new KStreamBuilder();
        builder.stream("test").mapValues(value -> value.toString().length()).to("test-out");

        KafkaStreams streams = new KafkaStreams(builder, config);
        streams.start();
    }
}

上面是kafkaStream的配置

public class MyProducer {
    public static void main(String args[]){
        Properties props = new Properties();
        props.put("bootstrap.servers", "140.143.67.114:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        Producer<String, String> producer = new KafkaProducer<>(props);
        for(int i = 100; i < 200; i++)
            producer.send(new ProducerRecord<>("my-test", Integer.toString(i)+"key", Integer.toString(i)+"val"));

        producer.close();
    }
}

上面是Producer的配置
不知道是什么原因引起的类型转换错误

半兽人 -> liebe41 6年前

流方法实现中,int转string错误。无法转。

cunonmh 7年前
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-streams</artifactId>
<version>0.10.0.0</version>
</dependency>

在 pom.xml 中添加了这个 但还是提示 invalid content was found starting with element ‘dependency'

半兽人 -> cunonmh 7年前

这个错误,可是很少见那。检查pom文件额。

落樱留独殇 7年前

刚接触kafka,一直在拜读您的文章,请教一下,这个例子怎么实现向两个topic输出?比如从my-input-topic 到 my-output-topic1、my-output-topic2

我在例子中这样将结果输出到两个topic的

wordCounts.toStream().to("streams-plaintext-output", Produced.with(Serdes.String(), Serdes.Long()));
wordCounts.toStream().to("my-replicated-topic", Produced.with(Serdes.String(), Serdes.Long()));
孤鹜齐飞 7年前

测试了,发现没有任何输出,请问是什么原因呢?程序如下:

Map<String, Object> props = new HashMap<>();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-stream-processing-application");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop-sh1-core3:9092");
props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
StreamsConfig config = new StreamsConfig(props);

KStreamBuilder builder = new KStreamBuilder();
builder.stream("streams-file-input").mapValues(value -> value.toString()).to("streams-file-input");

KafkaStreams streams = new KafkaStreams(builder, config);
streams.start();
Ntt晨 -> 孤鹜齐飞 7年前
builder.stream("streams-file-input").mapValues(value -> value.toString()).to("streams-file-input");

输出不对吧

孤鹜齐飞 7年前

请教,编译错误 Error:java: Compilation failed: internal java compiler error 怎么回事呢?

找到解决方法了,原来是java complier 设置的问题 

孤鹜齐飞 7年前

纠正一下:上面的Lambda表达式表述有误,应该是

value -> value.toString().length()

StreamsConfig、KStreamBuilder、KafkaStreams 这几个类怎么没有,idea中没有提示

有的

org.apache.kafka.streams.StreamsConfig
org.apache.kafka.streams.kstream.KStreamBuilder
org.apache.kafka.streams.KafkaStream

哦,我看 《消费者客户端(0.10.0.1API)》这章节是用的0.10.0.1,没注意这章节用的是0.10.1.1 最好能统一下版本,不知道是不是笔误,正在拜读,谢谢大神!

不好意思我看错了,我没有引入 kafka-streams 依赖

没事 多多交流

查看kafka更多相关的文章或提一个关于kafka的问题,也可以与我们一起分享文章