KafkaStreams客户端(0.10.0.0 API)

無名 发表于: 2016-07-06   最后更新时间: 2017-02-15 23:20:08  
{{totalSubscript}} 订阅, 12,498 游览
@InterfaceStability.Unstable
public class KafkaStreams
extends Object

Kafka允许从1个或多个topic进行连续的执行计算,并输出到0或多个topic。可以通过使用TopologyBuilder类来定义指定计算逻辑的DAG拓扑的处理器,或使用KStreamBuilder类,该类提供了高级别的kstream DSL,来定义转换。KafkaStreams类管理kafka Stream实例的生命周期。一个Stream实例可以包含一个或多个Thread(在配置中指定)。

一个KafkaStreams实例可以与任何具有相同应用ID(无论是同一进程,这台机器上的其他进程,或远程的机器)的其它的实例作为一个单一(也可能是分布式的)的Stream处理客户端。这些实例将根据输入topic分区的基础上来划分工作,以便所有的分区都被消费掉。如果实例添加或失败,所有实例将重新平衡它们之间的分区分配,以平衡处理负载。

在内部,KafkaStream实例包含一个正常的KafkaProducer和KafkaConsumer实例,用于读和写。

一个简单的例子:

    Map<String, Object> props = new HashMap<>();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-stream-processing-application");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
    props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
    StreamsConfig config = new StreamsConfig(props);

    KStreamBuilder builder = new KStreamBuilder();
    builder.from("my-input-topic").mapValue(value -> value.length().toString()).to("my-output-topic");

    KafkaStreams streams = new KafkaStreams(builder, config);
    streams.start();
更新于 2017-02-15

zjc 7年前

在吗?我想问下为什么我照这个例子打的我的输出端topic接收不到值。

半兽人 -> zjc 7年前

my-output-topic没有值?

zjc -> 半兽人 7年前

是的。

zjc -> 半兽人 7年前

 Properties properties=new Properties();
  properties.put(StreamsConfig.APPLICATION_ID_CONFIG,"streams-pipe1");
  properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
  properties.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
  properties.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,Serdes.String().getClass());
  final KStreamBuilder  builder=new KStreamBuilder();
  KStream<String,String> source=builder.stream("zjc-input");
  source.to("zjc-output");
  //final Topology topology=builder.build();
  KafkaStreams kafkaStreams=new KafkaStreams(builder,properties);
  final CountDownLatch latch =new CountDownLatch(1);
  Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook"){
   public void run(){
    kafkaStreams.close();
    latch.countDown();
   }
  });
  try{
   kafkaStreams.start();

zjc -> 半兽人 7年前

13:42:12.432 [StreamThread-1] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Group coordinator lookup for group streams-pipe1 failed: The group coordinator is not available.

半兽人 -> zjc 7年前

看下集群是否正常,看着不像是客户端的问题。

zjc -> 半兽人 7年前

正常呀。我集群就一个broker topic 也是只有一个分区。

半兽人 -> zjc 7年前

集群和程序装在同一台机器?

半兽人 -> zjc 7年前

先测试一下。
1、换个新的zk,3.4.9
2、换个新版本的kafka。
3、什么都不用动,复制上面代码执行。

库特 7年前

version: 0.10.1

Map<String, Object> props = new HashMap<>();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-stream-processing-application");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
    props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    StreamsConfig config = new StreamsConfig(props);

    KStreamBuilder builder = new KStreamBuilder();
    builder.stream("my-input-topic").mapValues(value -> value.length().toString()).to("my-output-topic");

    KafkaStreams streams = new KafkaStreams(builder, config);
    streams.start();
半兽人 -> 库特 7年前

thanks,我更新一版。

查看history更多相关的文章或提一个关于history的问题,也可以与我们一起分享文章