KafkaStreams客户端(0.10.1.1 API)

半兽人 发表于: 2017-02-15   最后更新时间: 2017-02-15  
  •   252 订阅,11201 游览

Kafka Streams从一个或多个输入topic进行连续的计算并输出到0或多个外部topic中。

可以通过TopologyBuilder类定义一个计算逻辑处理器DAG拓扑。或者也可以通过提供的高级别KStream DSL来定义转换的KStreamBuilder。(PS:计算逻辑其实就是自己的代码逻辑)

KafkaStreams类管理Kafka Streams实例的生命周期。一个stream实例可以在配置文件中为处理器指定一个或多个Thread。

KafkaStreams实例可以作为单个streams处理客户端(也可能是分布式的),与其他的相同应用ID的实例进行协调(无论是否在同一个进程中,在同一台机器的其他进程中,或远程机器上)。这些实例将根据输入topic分区的基础上来划分工作,以便所有的分区都被消费掉。如果实例添加或失败,所有实例将重新平衡它们之间的分区分配,以保证负载平衡。

在内部,KafkaStreams实例包含一个正常的KafkaProducerKafkaConsumer实例,用于读取和写入,

一个简单的例子:

    Map<String, Object> props = new HashMap<>();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-stream-processing-application");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    StreamsConfig config = new StreamsConfig(props);

    KStreamBuilder builder = new KStreamBuilder();
    builder.stream("my-input-topic").mapValues(value -> value.length().toString()).to("my-output-topic");

    KafkaStreams streams = new KafkaStreams(builder, config);
    streams.start();






发表于: 1年前   最后更新时间: 1年前   游览量:11201
上一条: Kafka Streams API
下一条: Kafka Connect API

评论…


  •  props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
     props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    中StreamsConfig.KEY_SERDE_CLASS_CONFIG,StreamsConfig.VALUE_SERDE_CLASS_CONFIG已经@deprecated了....
    Exception in thread "StreamThread-1" java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.String
     at org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:24)
    这是啥原因呢
    • 嗯嗯,这个我知道,但是我现在不知道这是怎么引起的
      public class KafkaStream {

          public static void main(String[] args){
              Map<String, Object> props = new HashMap<>();
              props.put(StreamsConfig.STATE_DIR_CONFIG,"{test}");
              props.put(StreamsConfig.APPLICATION_ID_CONFIG,"my-stream-processing-application");
              props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "140.143.67.114:9092");
              props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
              props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
              StreamsConfig config = new StreamsConfig(props);

              KStreamBuilder builder = new KStreamBuilder();
              builder.stream("test").mapValues(value -> value.toString().length()).to("test-out");

              KafkaStreams streams = new KafkaStreams(builder, config);
              streams.start();
          }
      }
      上面是kafkaStream的配置
      public class MyProducer {
          public static void main(String args[]){
              Properties props = new Properties();
              props.put("bootstrap.servers", "140.143.67.114:9092");
              props.put("acks", "all");
              props.put("retries", 0);
              props.put("batch.size", 16384);
              props.put("linger.ms", 1);
              props.put("buffer.memory", 33554432);
              props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
              props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

              Producer<String, String> producer = new KafkaProducer<>(props);
              for(int i = 100; i < 200; i++)
                  producer.send(new ProducerRecord<>("my-test", Integer.toString(i)+"key", Integer.toString(i)+"val"));

              producer.close();
          }
      }
      上面是Producer的配置
      不知道是什么原因引起的类型转换错误
        <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
        <version>0.10.0.0</version>
        </dependency>

        在pom.xml中添加了这个 但还是提示invalid content was found starting with element ‘dependency'
        刚接触kafka,一直在拜读您的文章,请教一下,这个例子怎么实现向两个topic输出?比如从my-input-topic 到 my-output-topic1、my-output-topic2
        • 我在例子中这样将结果输出到两个topic的
            wordCounts.toStream().to("streams-plaintext-output", Produced.with(Serdes.String(), Serdes.Long()));
            wordCounts.toStream().to("my-replicated-topic", Produced.with(Serdes.String(), Serdes.Long()));
            测试了,发现没有任何输出,请问是什么原因呢?程序如下:
            Map<String, Object> props = new HashMap<>();
            props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-stream-processing-application");
            props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop-sh1-core3:9092");
            props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
            props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
            StreamsConfig config = new StreamsConfig(props);

            KStreamBuilder builder = new KStreamBuilder();
            builder.stream("streams-file-input").mapValues(value -> value.toString()).to("streams-file-input");

            KafkaStreams streams = new KafkaStreams(builder, config);
            streams.start();

            请教,编译错误 Error:java: Compilation failed: internal java compiler error 怎么回事呢?
            纠正一下:上面的Lambda表达式表述有误,应该是value -> value.toString().length()
            StreamsConfig、KStreamBuilder、KafkaStreams 这几个类怎么没有,idea中没有提示
          • 评论…
            • in this conversation
              提问