KafkaStreams客户端(0.10.0.0 API)

無名 发表于: 2016-07-06   最后更新时间: 2017-02-15  
  •   0 订阅,2540 游览
@InterfaceStability.Unstable
public class KafkaStreams
extends Object

Kafka允许从1个或多个topic进行连续的执行计算,并输出到0或多个topic。可以通过使用TopologyBuilder类来定义指定计算逻辑的DAG拓扑的处理器,或使用KStreamBuilder类,该类提供了高级别的kstream DSL,来定义转换。KafkaStreams类管理kafka Stream实例的生命周期。一个Stream实例可以包含一个或多个Thread(在配置中指定)。

一个KafkaStreams实例可以与任何具有相同应用ID(无论是同一进程,这台机器上的其他进程,或远程的机器)的其它的实例作为一个单一(也可能是分布式的)的Stream处理客户端。这些实例将根据输入topic分区的基础上来划分工作,以便所有的分区都被消费掉。如果实例添加或失败,所有实例将重新平衡它们之间的分区分配,以平衡处理负载。

在内部,KafkaStream实例包含一个正常的KafkaProducer和KafkaConsumer实例,用于读和写。

一个简单的例子:

    Map<String, Object> props = new HashMap<>();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-stream-processing-application");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    StreamsConfig config = new StreamsConfig(props);

    KStreamBuilder builder = new KStreamBuilder();
    builder.from("my-input-topic").mapValue(value -> value.length().toString()).to("my-output-topic");

    KafkaStreams streams = new KafkaStreams(builder, config);
    streams.start();






发表于: 10月前   最后更新时间: 3月前   游览量:2540
上一条: 使用Kafka Stream处理数据
下一条: kafka旧低级消费API
评论…

  • version: 0.10.1
    Map<String, Object> props = new HashMap<>();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-stream-processing-application");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
        StreamsConfig config = new StreamsConfig(props);
    
        KStreamBuilder builder = new KStreamBuilder();
        builder.stream("my-input-topic").mapValues(value -> value.length().toString()).to("my-output-topic");
    
        KafkaStreams streams = new KafkaStreams(builder, config);
        streams.start();
  • 评论…
    • in this conversation
      提问