Kafka Streams开发者指南

半兽人 发表于: 2016-08-08   最后更新时间: 2016-11-24  
  •   40 订阅,1537 游览

9. Kafka Streams

9.1 概述

Kafka Streams是一个客户端程序库,用于处理和分析存储在Kafka中的数据,并将得到的数据写回Kafka或发送到外部系统。是基于一个重要的流处理概念,正确的区分事件时间和处理时间,窗口支持,简单而有效地管理应用程序状态。Kafka Streams有一个低门栏的入口: 你可以快速的编写和在单台机器上运行一个小规模的概念证明(proof-of-concept);而你只需要运行你的应用程序部署到多台机器上,以扩展高容量的生产负载。Kafka Stream利用kafka的并行模型来透明的处理相同的应用程序作负载平衡。

Kafka Stream 的亮点:

  • 设计一个简单的、轻量级的客户端库,可以很容易地嵌入在任何java应用程序与任何现有应用程序封装集成。
  • Apache Kafka本身作为内部消息层,没有外部的依赖,还有,它使用kafka的分区模型水平扩展处理,并同时保证有序。
  • 支持本地状态容错,非常快速、高效的状态操作(join和窗口的聚合)。
  • 采用 one-record-at-a-time 处理, 以达到低处理延迟,并支持基于事件时间(even-time)窗口操作
  • 提供必要的流处理原语(primitive),以及一个高级别的Steram DSL低级别的Processor API

9.2 开发者指南

有一个快速入门的示例代码,提供了如何运行一个流处理程序。本节重点介绍如何编写,配置和执行Kafka Streams应用程序。

核心概念

我们首先总结Kafka Streams的关键概念。

Stream Processing Topology
  • stream是Kafka Stream提供的最重要的抽象:它代表一个无限的,不断更新的数据集。stream是一个有序的,可重复的,不可变数据记录的容错的序列,其中数据记录是一个键值对(key-value)。

  • Kafka Stream中,stream写入处理程序定义了它的计算逻辑,通过一个或多个processor topology。其中,processor topology 是一个由stream(边缘)连接到stream processors(节点)图表。

  • stream processorprocessor topology的一个节点;它代表了一个处理步骤,stream通过在同一时间接收一个从upstream processors(上游处理器)topology中的输入记录来改变数据。应用其操作,随后可能产生一个或多个输出记录到它的 downstream processors(下游处理器)。

Kafka Stream提供两个方法来定义stream处理topology:Kafka Stream DSL提供最常用数据转换操作,如mapfilter; 相对较低级的 Processor API 允许开发人员定义和连接自定义处理器,以及状态存储。

时间

在stream处理方面有一个重要的时间概念,以及它是如何建模和集成。例如:一些操作,如 windowing 是基于时间界限定义的。

时间在stream中的常见概念如下:

  • 事件时间 - 当一个事件或数据记录发生的时间点,即最初创建的“源”。

  • 处理时间 - 事件或数据记录发生在stream程序处理的时间点。即当记录已经被消费了,处理时间可能是毫秒,小时,或天等,比原始的事件时间要晚。

Kafka Stream对每个数据记录(通过TimestampExtractor接口)分配了一个时间戳,这个接口具体实现了检索或根据实际计算时间戳数据记录的内容,如嵌入式时间戳字段提供的事件时间语义。或使用任何其他的方法,例如在处理时返回当前的wall-clock(墙钟)时间,从而产生stream程序的处理时间语义。因此,开发者可以根据自己的业务需求执行不同的时间。例如,每条记录时间戳描述了stream的时间增长(尽管记录在stream中是无序的)和利用时间依赖性的操作,如join。

状态

一些stream处理程序不需要状态,这意味着消息处理是独立于所有其他的消息处理。然而,能够保持状态,为复杂的stream处理程序打开了许多可能性:你可以加入输入流,或组和汇总数据记录。许多这样的状态操作者的提供者,如 Kafka Streams DSL。

Kafka Stream 提供了所谓的状态存储,它可以通过使用stream处理程序来存储和查询数据。当执行状态操作时,这是一个重要的能力。在Kafka Stream中的每一个任务嵌入了一个或多个状态存储,这个状态存储可以通过API进行存储和查询数据的处理。状态存储可以是一个持久的key/value存储,内存中的HashMap,或者是其他的数据结构。Kafka Stream提供了本地状态存储的故障容错和自动恢复。

正如我们上面提到的,一个Kafka Stream程序的计算逻辑定义为一个processor topology。目前 Kafka Stream 提供了两组API来定义processor topology,将在后面的章节中描述。

Low-Level Processor API

Processor(处理器)

开发者可以通过Processor接口来实现自己的自定义处理逻辑,接口提供了 processpunctuate 方法。process方法执行接收的消息;并根据时间进行周期性地执行punctuate方法。此外,processor可以保持当前的ProcessorContext实例变量,在init方法中初始化。利用上下文来计划puncuation周期(context().schedule),转发修改后的/新的键值对(key-value)到downstream processors (context().forward),提交当前的处理进度(context().commit)等。

public class MyProcessor extends Processor {
        private ProcessorContext context;
        private KeyValueStore kvStore;

        @Override
        @SuppressWarnings("unchecked")
        public void init(ProcessorContext context) {
            this.context = context;
            this.context.schedule(1000);
            this.kvStore = (KeyValueStore) context.getStateStore("Counts");
        }

        @Override
        public void process(String dummy, String line) {
            String[] words = line.toLowerCase().split(" ");

            for (String word : words) {
                Integer oldValue = this.kvStore.get(word);

                if (oldValue == null) {
                    this.kvStore.put(word, 1);
                } else {
                    this.kvStore.put(word, oldValue + 1);
                }
            }
        }

        @Override
        public void punctuate(long timestamp) {
            KeyValueIterator iter = this.kvStore.all();

            while (iter.hasNext()) {
                KeyValue entry = iter.next();
                context.forward(entry.key, entry.value.toString());
            }

            iter.close();
            context.commit();
        }

        @Override
        public void close() {
            this.kvStore.close();
        }
    };

在上面的实现中,执行以下操作:

  • 在init方法,每1秒调度 punctuation ,并通过其名称“Counts”检索本地状态存储。

  • 在process方法中,在每个接收记录,字符串的值分割成单词,并更新他们的数量到状态存储(稍后我们将讨论这个特性的部分)。

  • 在puncuate方法,迭代本地状态仓库和发送总量数到下游的processor,并提交当前的stream状态。

Processor Topology

用Processor API定义一个自定义的processor,开发人员可以使用TopologyBuilder通过连接这些processor共同构建一个processor topology。

    TopologyBuilder builder = new TopologyBuilder();

    builder.addSource("SOURCE", "src-topic")

        .addProcessor("PROCESS1", MyProcessor1::new /* the ProcessorSupplier that can generate MyProcessor1 */, "SOURCE")
        .addProcessor("PROCESS2", MyProcessor2::new /* the ProcessorSupplier that can generate MyProcessor2 */, "PROCESS1")
        .addProcessor("PROCESS3", MyProcessor3::new /* the ProcessorSupplier that can generate MyProcessor3 */, "PROCESS1")

        .addSink("SINK1", "sink-topic1", "PROCESS1")
        .addSink("SINK2", "sink-topic2", "PROCESS2")
        .addSink("SINK3", "sink-topic3", "PROCESS3");

上面代码,通过几个步骤来构建topology:

  • 首先,源节点命名为“SOURCE”并使用addSource方法添加到topology,主题“src-topic”提供记录(消息)。

  • 3个processor节点,使用addProcessor方法添加;这里的第一个processor是”SOURCE”节点的子节点,但其他两个processor是父类的。

  • 最后,使用addSink方法添加完整的3个sink节点到topology,每个管道从不同的父processor节点输出到一个单独topic。

本地状态存储

注意,Processor API 不仅仅限制访问当前记录,同时也可以使用状态处理操作维护刚抵达的记录维护到本地状态仓库,如聚合或窗口连接。利用本地状态,开发者可以使用TopologyBuilder.addStateStore方法构建processor topology来创建本地状态,并将其与需要访问的processor节点相关联;或者他们可以通过 TopologyBuilder.connectProcessorAndStateStores 连接创建本地状态仓库与现有的processor节点。

  TopologyBuilder builder = new TopologyBuilder();

    builder.addSource("SOURCE", "src-topic")

        .addProcessor("PROCESS1", MyProcessor1::new, "SOURCE")
        // create the in-memory state store "COUNTS" associated with processor "PROCESS1"
        .addStateStore(Stores.create("COUNTS").withStringKeys().withStringValues().inMemory().build(), "PROCESS1")
        .addProcessor("PROCESS2", MyProcessor3::new /* the ProcessorSupplier that can generate MyProcessor3 */, "PROCESS1")
        .addProcessor("PROCESS3", MyProcessor3::new /* the ProcessorSupplier that can generate MyProcessor3 */, "PROCESS1")

        // connect the state store "COUNTS" with processor "PROCESS2"
        .connectProcessorAndStateStores("PROCESS2", "COUNTS");

        .addSink("SINK1", "sink-topic1", "PROCESS1")
        .addSink("SINK2", "sink-topic2", "PROCESS2")
        .addSink("SINK3", "sink-topic3", "PROCESS3");

在下一节中,我们用另一种方式来建立processor topology: Kafka Streams DSL.

High-Level Streams DSL

使用Streams DSL构建一个processor topology,开发者可以用KStreamBuilder类,这是TopologyBuilder的扩展。一个简单的例子在Kafka的源码的streams/examples包下。本节的其余部分将通过一些代码来演示使用 Stream DSL创建topylogy的关键步骤,但我们建议开发者阅读完整示例的源代码(更详细)。

KStream 和 KTable

DSL使用2个主要抽象。KStream是一个记录流的抽象,其中每个记录代表无限数据集的自包含数据。KTable是更改日志的抽象,其中每个记录代表一个更新。更准确的说,在数据记录中的值,如果有,数据记录中的值是同一记录key的最后的更新(如果相同的key不存在,则将创建)。为了说明KStreams和KTables之间的区别,让我们想象以下两个数据记录被发送到流:(“alice”,1) - >(“alice”,3)。如果这些记录是KStream,流处理应用总和的值将返回4,如果这些记录是KTable,则将返回3,因为最后的记录将被认为是一个更新。

Create Source Streams from Kafka

记录流(定义为KStream)或更新日志流(定义为KTable)可以从一个或多个Kafka主题创建消息流作为source stream (KTable只能从单个topic创建源流)。

    KStreamBuilder builder = new KStreamBuilder();

    KStream source1 = builder.stream("topic1", "topic2");
    KTable source2 = builder.table("topic3");

Windowing a stream

流处理器可能需要将数据记录划分为时间段。即,通过时间窗口。通常用于连接和聚合操作等。Kafka Streams当前定义了一下的类型窗口:

  • 跳跃时间窗口是基于时间间隔的窗口。 他们模型固定大小,(可能)重叠的窗口。跳跃时间窗口是由2个属性定义的:窗口的大小和其前进间隔(也叫“跳跃”)。前进间隔由窗口相对前一个窗口来指定向前移动多少。例如,你可以配置一个跳跃窗口,大小5分钟,前进间隔1分钟。由于跳跃窗口可以重叠,因此数据记录可大于一个这样的窗口。

  • 翻转时间窗口是跳跃时间窗口的特殊情况,并且像后者一样,是基于时间间隔的窗口。其模型固定大小,非重叠,无间隔窗口。翻转窗口是由单个属性定义的:窗口的大小。翻转窗口是一个跳跃开窗,它的窗口大小等于其前进间隔的跳跃窗口。由于翻转窗口绝不会重叠,数据记录只属于1个窗口。

  • 滑动窗口模式是一个在时间轴连续滑动的固定大小的窗口。在这里,两个数据记录,如果它们的时间戳差异在窗口大小之内,包括在同一个窗口。因此,滑动窗口与其他不一致,而是与数据记录时间戳对准。在kafka streams中,滑动窗口仅用于连接操作,可通过JoinWindows类指定。

Transform a stream

这些操作产生一个或多个KStream和KTable对象,可以转换成一个或多个处理器连接到底层processor topology。所有这些转化方法可以链接起来组成一个复杂的processor topology。因为KStream和KTable是强类型,所有这些转换操作被定义为泛型功能,其中用户可以指定的输入和输出的数据类型。

这些转换中,filter,map,mapValues等,都是无状态的转换操作,并可以同时适用KStream和KTable,用户通常可以自定义函数,这些函数作为参数,如Predicate过滤,KeyValueMapper地图,等:

// written in Java 8+, using lambda expressions
    KStream mapped = source1.mapValue(record -> record.get("category"));

无状态的转换,根据定义,不依赖任何状态进行处理,它们(明智的)不需要一个与stream processor相关的状态存储;状态的装换,另一方面,需要访问一个相关的状态进行处理并输出。例如,在join和聚合操作,窗口状态通常是用来接收所有的记录(窗口到目前为止的记录)。然后,操作着可以访问在仓库累积的记录并进行计算。

 // written in Java 8+, using lambda expressions
    KTable, Long> counts = source1.aggregateByKey(
        () -> 0L,  // initial value
        (aggKey, value, aggregate) -> aggregate + 1L,   // aggregating value
        TimeWindows.of("counts",5000L).advanceBy(1000L), // intervals in milliseconds
    );

    KStream joined = source1.leftJoin(source2,
        (record1, record2) -> record1.get("user") + "-" + record2.get("region");
    );

Write streams back to Kafka

最后处理结束后,用户可以选择(连续不断的)把最终结果流写回到一个Kafka主题,通过KStream toKTable to

 joined.to("topic4");

如果你的应用程序需要继续读取和处理这些记录,从输出主题构建一个新流,Kafka Stream提供了一个便利的方法through

    // equivalent to
    //
    // joined.to("topic4");
    // materialized = builder.stream("topic4");
    KStream materialized = joined.through("topic4");

除了定义的topology,开发者还将需要在运行它之前在StreamsConfig配置他们的应用,Kafka Stream配置的完整列表可以在这里找到。







发表于: 5月前   最后更新时间: 1月前   游览量:1537
上一条: Kafka Connector开发指南
下一条: kafka命令大全
评论…

  • 评论…
    • in this conversation
      提问