KafkaStreams客户端(0.10.1.1 API)

半兽人 发表于: 2017-02-15   最后更新时间: 2017-02-15  
  •   133 订阅,4429 游览

Kafka Streams从一个或多个输入topic进行连续的计算并输出到0或多个外部topic中。

可以通过TopologyBuilder类定义一个计算逻辑处理器DAG拓扑。或者也可以通过提供的高级别KStream DSL来定义转换的KStreamBuilder。(PS:计算逻辑其实就是自己的代码逻辑)

KafkaStreams类管理Kafka Streams实例的生命周期。一个stream实例可以在配置文件中为处理器指定一个或多个Thread。

KafkaStreams实例可以作为单个streams处理客户端(也可能是分布式的),与其他的相同应用ID的实例进行协调(无论是否在同一个进程中,在同一台机器的其他进程中,或远程机器上)。这些实例将根据输入topic分区的基础上来划分工作,以便所有的分区都被消费掉。如果实例添加或失败,所有实例将重新平衡它们之间的分区分配,以保证负载平衡。

在内部,KafkaStreams实例包含一个正常的KafkaProducerKafkaConsumer实例,用于读取和写入,

一个简单的例子:

    Map<String, Object> props = new HashMap<>();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-stream-processing-application");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    StreamsConfig config = new StreamsConfig(props);

    KStreamBuilder builder = new KStreamBuilder();
    builder.stream("my-input-topic").mapValues(value -> value.length().toString()).to("my-output-topic");

    KafkaStreams streams = new KafkaStreams(builder, config);
    streams.start();






发表于: 7月前   最后更新时间: 7月前   游览量:4429
上一条: Kafka Streams API
下一条: Kafka Connect API
评论…

  • StreamsConfig、KStreamBuilder、KafkaStreams 这几个类怎么没有,idea中没有提示
  • 评论…
    • in this conversation
      提问