KafkaStreams客户端(0.10.0.0 API)

@InterfaceStability.Unstable
public class KafkaStreams
extends Object

Kafka允许从1个或多个topic进行连续的执行计算,并输出到0或多个topic。可以通过使用TopologyBuilder类来定义指定计算逻辑的DAG拓扑的处理器,或使用KStreamBuilder类,该类提供了高级别的kstream DSL,来定义转换。KafkaStreams类管理kafka Stream实例的生命周期。一个Stream实例可以包含一个或多个Thread(在配置中指定)。

一个KafkaStreams实例可以与任何具有相同应用ID(无论是同一进程,这台机器上的其他进程,或远程的机器)的其它的实例作为一个单一(也可能是分布式的)的Stream处理客户端。这些实例将根据输入topic分区的基础上来划分工作,以便所有的分区都被消费掉。如果实例添加或失败,所有实例将重新平衡它们之间的分区分配,以平衡处理负载。

在内部,KafkaStream实例包含一个正常的KafkaProducer和KafkaConsumer实例,用于读和写。

一个简单的例子:

    Map<String, Object> props = new HashMap<>();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-stream-processing-application");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    StreamsConfig config = new StreamsConfig(props);

    KStreamBuilder builder = new KStreamBuilder();
    builder.from("my-input-topic").mapValue(value -> value.length().toString()).to("my-output-topic");

    KafkaStreams streams = new KafkaStreams(builder, config);
    streams.start();






发表于: 2年前   最后更新时间: 1年前   游览量:6635
上一条: 使用Kafka Stream处理数据
下一条: Kafka Broker配置

评论…


  • 在吗?我想问下为什么我照这个例子打的我的输出端topic接收不到值。
    •  Properties properties=new Properties();
        properties.put(StreamsConfig.APPLICATION_ID_CONFIG,"streams-pipe1");
        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
        properties.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        properties.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,Serdes.String().getClass());
        final KStreamBuilder  builder=new KStreamBuilder();
        KStream<String,String> source=builder.stream("zjc-input");
        source.to("zjc-output");
        //final Topology topology=builder.build();
        KafkaStreams kafkaStreams=new KafkaStreams(builder,properties);
        final CountDownLatch latch =new CountDownLatch(1);
        Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook"){
         public void run(){
          kafkaStreams.close();
          latch.countDown();
         }
        });
        try{
         kafkaStreams.start();
        • 13:42:12.432 [StreamThread-1] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Group coordinator lookup for group streams-pipe1 failed: The group coordinator is not available.
            version: 0.10.1
            Map<String, Object> props = new HashMap<>();
                props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-stream-processing-application");
                props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
                props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
                props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
                StreamsConfig config = new StreamsConfig(props);
            
                KStreamBuilder builder = new KStreamBuilder();
                builder.stream("my-input-topic").mapValues(value -> value.length().toString()).to("my-output-topic");
            
                KafkaStreams streams = new KafkaStreams(builder, config);
                streams.start();
          • 评论…
            • in this conversation