KafkaStreams客户端(0.10.1.1 API)

半兽人 发表于: 2017-02-15   最后更新时间: 2017-02-15  
  •   214 订阅,9310 游览

Kafka Streams从一个或多个输入topic进行连续的计算并输出到0或多个外部topic中。

可以通过TopologyBuilder类定义一个计算逻辑处理器DAG拓扑。或者也可以通过提供的高级别KStream DSL来定义转换的KStreamBuilder。(PS:计算逻辑其实就是自己的代码逻辑)

KafkaStreams类管理Kafka Streams实例的生命周期。一个stream实例可以在配置文件中为处理器指定一个或多个Thread。

KafkaStreams实例可以作为单个streams处理客户端(也可能是分布式的),与其他的相同应用ID的实例进行协调(无论是否在同一个进程中,在同一台机器的其他进程中,或远程机器上)。这些实例将根据输入topic分区的基础上来划分工作,以便所有的分区都被消费掉。如果实例添加或失败,所有实例将重新平衡它们之间的分区分配,以保证负载平衡。

在内部,KafkaStreams实例包含一个正常的KafkaProducerKafkaConsumer实例,用于读取和写入,

一个简单的例子:

    Map<String, Object> props = new HashMap<>();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-stream-processing-application");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    StreamsConfig config = new StreamsConfig(props);

    KStreamBuilder builder = new KStreamBuilder();
    builder.stream("my-input-topic").mapValues(value -> value.length().toString()).to("my-output-topic");

    KafkaStreams streams = new KafkaStreams(builder, config);
    streams.start();






发表于: 1年前   最后更新时间: 1年前   游览量:9310
上一条: Kafka Streams API
下一条: Kafka Connect API

评论…


  • StreamsConfig、KStreamBuilder、KafkaStreams 这几个类怎么没有,idea中没有提示
    纠正一下:上面的Lambda表达式表述有误,应该是value -> value.toString().length()
    请教,编译错误 Error:java: Compilation failed: internal java compiler error 怎么回事呢?
    测试了,发现没有任何输出,请问是什么原因呢?程序如下:
    Map<String, Object> props = new HashMap<>();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-stream-processing-application");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop-sh1-core3:9092");
    props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    StreamsConfig config = new StreamsConfig(props);

    KStreamBuilder builder = new KStreamBuilder();
    builder.stream("streams-file-input").mapValues(value -> value.toString()).to("streams-file-input");

    KafkaStreams streams = new KafkaStreams(builder, config);
    streams.start();

    刚接触kafka,一直在拜读您的文章,请教一下,这个例子怎么实现向两个topic输出?比如从my-input-topic 到 my-output-topic1、my-output-topic2
    <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-streams</artifactId>
    <version>0.10.0.0</version>
    </dependency>

    在pom.xml中添加了这个 但还是提示invalid content was found starting with element ‘dependency'
  • 评论…
    • in this conversation
      提问