手把手教你写Kafka Streams程序

半兽人 发表于: 2018-05-16   最后更新时间: 2018-05-24  
  •   252 订阅,107 游览

在本指南中,我们将从头开始帮助你搭建自己的Kafka Streams流处理程序。 强烈建议您首先阅读快速入门,了解如何运行使用Kafka Streams编写的Streams应用程序(如果尚未这样做)。

设置Maven项目

我们将使用Kafka Streams Maven Archetype来创建Streams项目结构:

mvn archetype:generate \
    -DarchetypeGroupId=org.apache.kafka \
    -DarchetypeArtifactId=streams-quickstart-java \
    -DarchetypeVersion=1.1.0 \
    -DgroupId=streams.examples \
    -DartifactId=streams.examples \
    -Dversion=0.1 \
    -Dpackage=myapps

如果你需要,您可以为groupId,artifactId和package设置不同的值。假设您使用上述参数值,该命令将创建一个如下所示的项目结构:

> tree streams.examples
streams-quickstart
|-- pom.xml
|-- src
    |-- main
        |-- java
        |   |-- myapps
        |       |-- LineSplit.java
        |       |-- Pipe.java
        |       |-- WordCount.java
        |-- resources
            |-- log4j.properties

项目中包含的pom.xml文件已经定义了Streams依赖项,并且在src/main/java已经有几个Streams示例程序。 既然我们要从头开始编写这样的程序,现在我们先删除这些例子:

> cd streams-quickstart
> rm src/main/java/myapps/*.java

编写第一个Streams应用程序:Pipe

It's coding time now! Feel free to open your favorite IDE and import this Maven project, or simply open a text editor and create a java file under src/main/java. Let's name it Pipe.java:
现在是编码时间! 随意打开你最喜欢的IDE并导入这个Maven项目,或者直接打开一个文本编辑器并在src/main/java下创建一个java文件。 我们将其命名为Pipe.java

package myapps;

public class Pipe {

    public static void main(String[] args) throws Exception {

    }
}

我们在main中来编写这个pipe程序。请注意,由于IDE通常可以自动添加导入语句,因此我们不会列出导入语句。但是,如果您使用的是文本编辑器,则需要手动添加导入,并且在本节末尾,我们将为您显示带有导入语句的完整代码段。

编写Streams应用程序的第一步是创建一个java.util.Properties映射来指定StreamsConfig中定义的不同Streams执行配置值。 需要设置的几个重要配置值:StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,它指定用于建立初始连接到Kafka集群的host/port列表,以及StreamsConfig.APPLICATION_ID_CONFIG,它提供了Streams的唯一标识符应用程序与其他应用程序进行区分:

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "streams-pipe");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");    // assuming that the Kafka broker

假设这个应用程序和集群在同一台机器运行。

另外,你也可以自定义其他配置,例如设置消息key-value对的默认序列化和反序列:

props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

有关Kafka Streams的完整配置列表,请参阅这里

接下来我们将定义Streams应用程序的计算逻辑。在Kafka Streams中,这种计算逻辑被定义为连接处理器节点的拓扑结构。我们可以使用拓扑构建器来构建这样的拓扑,

final StreamsBuilder builder = new StreamsBuilder();

然后使用此拓扑构建器,创建主题为streams-plaintext-input源流(ps:就是数据的来源):

KStream<String, String> source = builder.stream("streams-plaintext-input");

现在我们得到一个KStream,它不断的从来源主题streams-plaintext-input获取消息。消息是String类型的key-value对。我们可以用这个流做的最简单的事情就是将它写入另一个Kafka主题streams-pipe-output中:

source.to("streams-pipe-output");

请注意,我们也可以将上面两行连接成一行,如下所示:

builder.stream("streams-plaintext-input").to("streams-pipe-output");

我们可以通过执行以下操作来检查此构建器创建的拓扑结构类型:

final Topology topology = builder.build();

将描述输出:

System.out.println(topology.describe());

如果我们现在编译并运行程序,它会输出以下信息:

> mvn clean package
> mvn exec:java -Dexec.mainClass=myapps.Pipe
Sub-topologies:
  Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000(topics: streams-plaintext-input) --> KSTREAM-SINK-0000000001
    Sink: KSTREAM-SINK-0000000001(topic: streams-pipe-output) <-- KSTREAM-SOURCE-0000000000
Global Stores:
  none

如上所示,它说明构建的拓扑有两个处理器节点,源节点KSTREAM-SOURCE-0000000000和sink节点KSTREAM-SINK-0000000001KSTREAM-SOURCE-0000000000连续读取Kafka主题streams-plaintext-input的消息,并将它们传送到其下游节点KSTREAM-SINK-0000000001; KSTREAM-SINK-0000000001会将其接收到的每条消息写入另一个Kafka主题streams-pipe-output中( --><-- 箭头指示该节点的下游和上游处理器节点,即在拓扑图中的“children”和“parents“)。 它还说明,这种简单的拓扑没有与之相关联的全局状态存储(我们将在后面的章节中更多地讨论状态存储)。

请注意,我们总是可以像在上面那样在任何给定点上描述拓扑,而我们正在代码中构建它,因此作为用户,您可以交互式地“尝试并品尝”拓扑中定义的计算逻辑,直到你满意为止。假设我们已经完成了这个简单的拓扑结构,它只是以一种无尽的流式方式将数据从一个Kafka主题管道传输到另一个主题,我们现在可以使用我们刚刚构建的两个组件构建Streams客户端:配置map和拓扑对象(也可以从props map构造一个StreamsConfig对象,然后将该对象传递给构造函数,可以重载KafkaStreams构造函数来实现任一类型)。

final KafkaStreams streams = new KafkaStreams(topology, props);

通过调用它的start()函数,我们可以触发这个客户端的执行。在此客户端上调用close()之前,执行不会停止。 例如,我们可以添加一个带有倒计时的shutdown hook来捕获用户中断,并在终止该程序时关闭客户端:

final CountDownLatch latch = new CountDownLatch(1);

// attach shutdown handler to catch control-c
Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook") {
    @Override
    public void run() {
        streams.close();
        latch.countDown();
    }
});

try {
    streams.start();
    latch.await();
} catch (Throwable e) {
    System.exit(1);
}
System.exit(0);






发表于: 7天前   最后更新时间: 24分钟前   游览量:107
上一条: Kafka Stream演示程序
下一条:

评论…


  • 评论…
    • in this conversation
      提问