返回到文章

采纳

编辑于

手把手教你写Kafka Streams程序

kafka


在本指南中,我们将从头开始帮助你搭建自己的Kafka Streams流处理程序。 强烈建议您首先阅读快速入门,了解如何运行使用Kafka Streams编写的Streams应用程序(如果尚未这样做)。

设置Maven项目

我们将使用Kafka Streams Maven Archetype来创建Streams项目结构:

mvn archetype:generate \
    -DarchetypeGroupId=org.apache.kafka \
    -DarchetypeArtifactId=streams-quickstart-java \
    -DarchetypeVersion=2.8.0 \
    -DgroupId=streams.examples \
    -DartifactId=streams.examples \
    -Dversion=0.1 \
    -Dpackage=myapps

如果你需要,您可以为groupId,artifactId和package设置不同的值。假设您使用上述参数值,该命令将创建一个如下所示的项目结构:

> tree streams.examples
streams-quickstart
|-- pom.xml
|-- src
    |-- main
        |-- java
        |   |-- myapps
        |       |-- LineSplit.java
        |       |-- Pipe.java
        |       |-- WordCount.java
        |-- resources
            |-- log4j.properties

项目中包含的pom.xml文件已经定义了Streams依赖项,并且在src/main/java已经有几个Streams示例程序。 既然我们要从头开始编写这样的程序,现在我们先删除这些例子:

> cd streams-quickstart
> rm src/main/java/myapps/*.java

编写第一个Streams应用程序:Pipe

It's coding time now! Feel free to open your favorite IDE and import this Maven project, or simply open a text editor and create a java file under src/main/java. Let's name it Pipe.java:
现在是编码时间! 随意打开你最喜欢的IDE并导入这个Maven项目,或者直接打开一个文本编辑器并在src/main/java下创建一个java文件。 我们将其命名为Pipe.java

package myapps;

public class Pipe {

    public static void main(String[] args) throws Exception {

    }
}

我们在main中来编写这个pipe程序。请注意,由于IDE通常可以自动添加导入语句,因此我们不会列出导入语句。但是,如果您使用的是文本编辑器,则需要手动添加导入,并且在本节末尾,我们将为您显示带有导入语句的完整代码段。

编写Streams应用程序的第一步是创建一个java.util.Properties映射来指定StreamsConfig中定义的不同Streams执行配置值。 需要设置的几个重要配置值:StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,它指定用于建立初始连接到Kafka集群的host/port列表,以及StreamsConfig.APPLICATION_ID_CONFIG,它提供了Streams的唯一标识符应用程序与其他应用程序进行区分:

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // 假设这个应用程序和集群在同一台机器运行。

另外,你也可以自定义其他配置,例如设置消息key-value对的默认序列化和反序列:

props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

有关Kafka Streams的完整配置列表,请参阅这里

接下来我们将定义Streams应用程序的计算逻辑。在Kafka Streams中,这种计算逻辑被定义为连接处理器节点的拓扑结构。我们可以使用拓扑构建器来构建这样的拓扑,

final StreamsBuilder builder = new StreamsBuilder();

然后使用此拓扑构建器,创建主题为streams-plaintext-input源流(ps:就是数据的来源):

KStream<String, String> source = builder.stream("streams-plaintext-input");

现在我们得到一个KStream,它不断的从来源主题streams-plaintext-input获取消息。消息是String类型的key-value对。我们可以用这个流做的最简单的事情就是将它写入另一个Kafka主题streams-pipe-output中:

source.to("streams-pipe-output");

请注意,我们也可以将上面两行连接成一行,如下所示:

builder.stream("streams-plaintext-input").to("streams-pipe-output");

我们可以通过执行以下操作来检查此构建器创建的拓扑结构类型:

final Topology topology = builder.build();

将描述输出:

System.out.println(topology.describe());

如果我们现在编译并运行程序,它会输出以下信息:

> mvn clean package
> mvn exec:java -Dexec.mainClass=myapps.Pipe
Sub-topologies:
  Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000(topics: streams-plaintext-input) --> KSTREAM-SINK-0000000001
    Sink: KSTREAM-SINK-0000000001(topic: streams-pipe-output) <-- KSTREAM-SOURCE-0000000000
Global Stores:
  none

如上所示,它说明构建的拓扑有两个处理器节点,源节点KSTREAM-SOURCE-0000000000和sink节点KSTREAM-SINK-0000000001KSTREAM-SOURCE-0000000000连续读取Kafka主题streams-plaintext-input的消息,并将它们传送到其下游节点KSTREAM-SINK-0000000001; KSTREAM-SINK-0000000001会将其接收到的每条消息写入另一个Kafka主题streams-pipe-output中( --><-- 箭头指示该节点的下游和上游处理器节点,即在拓扑图中的“children”和“parents“)。 它还说明,这种简单的拓扑没有与之相关联的全局状态存储(我们将在后面的章节中更多地讨论状态存储)。

请注意,我们总是可以像在上面那样在任何给定点上描述拓扑,而我们正在代码中构建它,因此作为用户,您可以交互式地“尝试并品尝”拓扑中定义的计算逻辑,直到你满意为止。假设我们已经完成了这个简单的拓扑结构,它只是以一种无尽的流式方式将数据从一个Kafka主题管道传输到另一个主题,我们现在可以使用我们刚刚构建的两个组件构建Streams客户端:配置map和拓扑对象(也可以从props map构造一个StreamsConfig对象,然后将该对象传递给构造函数,可以重载KafkaStreams构造函数来实现任一类型)。

final KafkaStreams streams = new KafkaStreams(topology, props);

通过调用它的start()函数,我们可以触发这个客户端的执行。在此客户端上调用close()之前,执行不会停止。 例如,我们可以添加一个带有倒计时的shutdown hook来捕获用户中断,并在终止该程序时关闭客户端:

final CountDownLatch latch = new CountDownLatch(1);

// attach shutdown handler to catch control-c
Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
    @Override
    public void run() {
        streams.close();
        latch.countDown();
    }
});

try {
    streams.start();
    latch.await();
} catch (Throwable e) {
    System.exit(1);
}
System.exit(0);

到目前为止,完整的代码如下所示:

package myapps;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;

import java.util.Properties;
import java.util.concurrent.CountDownLatch;

public class Pipe {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        final StreamsBuilder builder = new StreamsBuilder();

        builder.stream("streams-plaintext-input").to("streams-pipe-output");

        final Topology topology = builder.build();

        final KafkaStreams streams = new KafkaStreams(topology, props);
        final CountDownLatch latch = new CountDownLatch(1);

        // attach shutdown handler to catch control-c
        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
            @Override
            public void run() {
                streams.close();
                latch.countDown();
            }
        });

        try {
            streams.start();
            latch.await();
        } catch (Throwable e) {
            System.exit(1);
        }
        System.exit(0);
    }
}

如果您已经在localhost:9092上运行了Kafka,并且创建了主题streams-plaintext-inputstreams-pipe-output,则可以在IDE或命令行上使用Maven运行此代码:

> mvn clean package
> mvn exec:java -Dexec.mainClass=myapps.Pipe

有关如何运行Streams应用程序并观察计算结果的详细说明,请阅读Play with a Streams部分。本节的其余部分我们不会谈论这一点。

编写第二个Streams应用程序:Line Split

我们已经学会了如何构建Streams客户端及其两个关键组件:StreamsConfig和Topology。 现在让我们继续通过增加当前拓扑来添加一些实际的处理逻辑。我们可以首先复制现有的Pipe.java类来创建另一个程序:

    > cp src/main/java/myapps/Pipe.java src/main/java/myapps/LineSplit.java

并更改其类名以及应用程序ID配置以,与之前的程序区分开来:

public class LineSplit {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-linesplit");
        // ...
    }
}

由于每个源流的消息都是一个字符串类型的键值对,因此让我们将值字符串视为文本行,并使用FlatMapValues运算符将其分成单词:

KStream<String, String> source = builder.stream("streams-plaintext-input");
KStream<String, String> words = source.flatMapValues(new ValueMapper<String, Iterable<String>>() {
            @Override
            public Iterable<String> apply(String value) {
                return Arrays.asList(value.split("\\W+"));
            }
        });

操作员将把source流作为输入,并通过按顺序处理源流中的每条消息并将其值字符串分解为一个单词列表,并生成每个单词作为输出的新消息,从而生成一个名为单词的新流。这是一个无状态的操作,无需跟踪以前收到的消息或处理结果。请注意,如果您使用的是JDK 8,则可以使用lambda表达式并简化上面的代码:

KStream<String, String> source = builder.stream("streams-plaintext-input");
KStream<String, String> words = source.flatMapValues(value -> Arrays.asList(value.split("\\W+")));

最后,我们可以将单词流写回另一个Kafka主题,比如说stream-linesplit-output。 再次,这两个步骤可以如下所示连接(假设使用lambda表达式):

KStream<String, String> source = builder.stream("streams-plaintext-input");
source.flatMapValues(value -> Arrays.asList(value.split("\\W+")))
      .to("streams-linesplit-output");

如果我们现在将此扩展拓扑描述打印出来System.out.println(topology.describe()),我们将得到以下结果:

> mvn clean package
> mvn exec:java -Dexec.mainClass=myapps.LineSplit
Sub-topologies:
  Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000(topics: streams-plaintext-input) --> KSTREAM-FLATMAPVALUES-0000000001
    Processor: KSTREAM-FLATMAPVALUES-0000000001(stores: []) --> KSTREAM-SINK-0000000002 <-- KSTREAM-SOURCE-0000000000
    Sink: KSTREAM-SINK-0000000002(topic: streams-linesplit-output) <-- KSTREAM-FLATMAPVALUES-0000000001
  Global Stores:
    none

正如我们上面看到的,一个新的处理器节点KSTREAM-FLATMAPVALUES-0000000001被注入到原始源节点和sink节点之间的拓扑中。 它将源节点作为其父节点,将sink节点作为其子节点。换句话说,源节点获取的每个消息,将首先遍历新加入的KSTREAM-FLATMAPVALUES-0000000001节点进行处理,并且结果将生成一个或多个新消息。它们将继续往下走到sink节点回写给kafka。注意这个处理器节点是“无状态的”,因为它不与任何仓库相关联(即(stores:[]))。

完整的代码如下所示(假设使用lambda表达式):

package myapps;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;

import java.util.Arrays;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

public class LineSplit {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-linesplit");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        final StreamsBuilder builder = new StreamsBuilder();

        KStream<String, String> source = builder.stream("streams-plaintext-input");
        source.flatMapValues(value -> Arrays.asList(value.split("\\W+")))
              .to("streams-linesplit-output");

        final Topology topology = builder.build();
        final KafkaStreams streams = new KafkaStreams(topology, props);
        final CountDownLatch latch = new CountDownLatch(1);

        // ... same as Pipe.java above
    }
}

编写第三个Streams应用程序:Wordcount

现在让我们进一步通过计算源文本流中单词的出现,来向拓扑中添加一些“有状态”计算。按照类似的步骤,我们创建另一个基于LineSplit.java类的程序:

public class WordCount {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");
        // ...
    }
}

为了计算单词,我们可以首先修改flatMapValues,将它们全部作为小写字母(假设使用lambda表达式):

source.flatMapValues(new ValueMapper<String, Iterable<String>>() {
            @Override
            public Iterable<String> apply(String value) {
                return Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+"));
            }
        });

我们必须首先指定我们要关键流的字符串value,即小写单词,用groupBy操作。该运算符生成一个新的分组流,然后可以由一个计数操作员汇总,该操作员可以在每个分组键上生成一个运行计数:

KTable<String, Long> counts =
source.flatMapValues(new ValueMapper<String, Iterable<String>>() {
            @Override
            public Iterable<String> apply(String value) {
                return Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+"));
            }
        })
      .groupBy(new KeyValueMapper<String, String, String>() {
           @Override
           public String apply(String key, String value) {
               return value;
           }
        })
      // Materialize the result into a KeyValueStore named "counts-store".
      // The Materialized store is always of type <Bytes, byte[]> as this is the format of the inner most store.
      .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>> as("counts-store"));

请注意,count运算符有Materialized参数,该参数指定运行计数应存储在名为counts-store的状态存储中。 此Counts仓库可以实时查询,详情请参阅开发者手册。

请注意,为了从主题streams-wordcount-output读取changelog流,需要将值反序列化设置为org.apache.kafka.common.serialization.LongDeserializer。假设可以使用JDK 8的lambda表达式,上面的代码可以简化为:

KStream<String, String> source = builder.stream("streams-plaintext-input");
source.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")))
      .groupBy((key, value) -> value)
      .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"))
      .toStream()
      .to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long()));

如果我们再次将这种扩展拓扑描述为System.out.println(topology.describe()),我们将得到以下结果:

> mvn clean package
> mvn exec:java -Dexec.mainClass=myapps.WordCount
Sub-topologies:
  Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000(topics: streams-plaintext-input) --> KSTREAM-FLATMAPVALUES-0000000001
    Processor: KSTREAM-FLATMAPVALUES-0000000001(stores: []) --> KSTREAM-KEY-SELECT-0000000002 <-- KSTREAM-SOURCE-0000000000
    Processor: KSTREAM-KEY-SELECT-0000000002(stores: []) --> KSTREAM-FILTER-0000000005 <-- KSTREAM-FLATMAPVALUES-0000000001
    Processor: KSTREAM-FILTER-0000000005(stores: []) --> KSTREAM-SINK-0000000004 <-- KSTREAM-KEY-SELECT-0000000002
    Sink: KSTREAM-SINK-0000000004(topic: Counts-repartition) <-- KSTREAM-FILTER-0000000005
  Sub-topology: 1
    Source: KSTREAM-SOURCE-0000000006(topics: Counts-repartition) --> KSTREAM-AGGREGATE-0000000003
    Processor: KSTREAM-AGGREGATE-0000000003(stores: [Counts]) --> KTABLE-TOSTREAM-0000000007 <-- KSTREAM-SOURCE-0000000006
    Processor: KTABLE-TOSTREAM-0000000007(stores: []) --> KSTREAM-SINK-0000000008 <-- KSTREAM-AGGREGATE-0000000003
    Sink: KSTREAM-SINK-0000000008(topic: streams-wordcount-output) <-- KTABLE-TOSTREAM-0000000007
Global Stores:
  none

如上所述,拓扑现在包含两个断开的子拓扑。第一个子拓扑的接收节点KSTREAM-SINK-0000000004将写入一个重新分区主题Counts-repartition,它将由第二个子拓扑的源节点KSTREAM-SOURCE-0000000006读取。重分区topic通过使用聚合键“shuffle”的源流,在这种情况下,聚合键为值字符串。此外,在第一个子拓扑结构内部,在分组KSTREAM-KEY-SELECT-0000000002节点和sink节点之间注入无状态的KSTREAM-FILTER-0000000005节点,以过滤出聚合key为空的任何中间记录。

在第二个子拓扑中,聚合节点KSTREAM-AGGREGATE-0000000003与名为Counts的状态存储相关联(名称由用户在count运算符中指定)。在即将到来的流源节点接收到每个消息时,聚合处理器将首先查询其关联的Counts存储以获得该密钥的当前计数,并将其增加1,然后将新计数写回仓库。将每个更新的key计数传送到KTABLE-TOSTREAM-0000000007节点,KTABLE-TOSTREAM-0000000007节点将该更新流解释为消息流,然后再传输到汇聚节点KSTREAM-SINK-0000000008以写回Kafka。

完整的代码如下所示(假设使用lambda表达式):

package myapps;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.KStream;

import java.util.Arrays;
import java.util.Locale;
import java.util.Properties;
import java.util.concurrent.CountDownLatch;

public class WordCount {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-wordcount");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        final StreamsBuilder builder = new StreamsBuilder();

        KStream<String, String> source = builder.stream("streams-plaintext-input");
        source.flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split("\\W+")))
              .groupBy((key, value) -> value)
              .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"))
              .toStream()
              .to("streams-wordcount-output", Produced.with(Serdes.String(), Serdes.Long());

        final Topology topology = builder.build();
        final KafkaStreams streams = new KafkaStreams(topology, props);
        final CountDownLatch latch = new CountDownLatch(1);

        // ... same as Pipe.java above
    }
}