@InterfaceStability.Unstable
public class KafkaStreams
extends Object
Kafka允许从1个或多个topic进行连续的执行计算,并输出到0或多个topic。可以通过使用TopologyBuilder类来定义指定计算逻辑的DAG拓扑的处理器,或使用KStreamBuilder类,该类提供了高级别的kstream DSL,来定义转换。KafkaStreams类管理kafka Stream实例的生命周期。一个Stream实例可以包含一个或多个Thread(在配置中指定)。
一个KafkaStreams实例可以与任何具有相同应用ID(无论是同一进程,这台机器上的其他进程,或远程的机器)的其它的实例作为一个单一(也可能是分布式的)的Stream处理客户端。这些实例将根据输入topic分区的基础上来划分工作,以便所有的分区都被消费掉。如果实例添加或失败,所有实例将重新平衡它们之间的分区分配,以平衡处理负载。
在内部,KafkaStream实例包含一个正常的KafkaProducer和KafkaConsumer实例,用于读和写。
一个简单的例子:
Map<String, Object> props = new HashMap<>();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-stream-processing-application");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(StreamsConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(StreamsConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(StreamsConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
StreamsConfig config = new StreamsConfig(props);
KStreamBuilder builder = new KStreamBuilder();
builder.from("my-input-topic").mapValue(value -> value.length().toString()).to("my-output-topic");
KafkaStreams streams = new KafkaStreams(builder, config);
streams.start();
在吗?我想问下为什么我照这个例子打的我的输出端topic接收不到值。
my-output-topic没有值?
是的。
Properties properties=new Properties();
properties.put(StreamsConfig.APPLICATION_ID_CONFIG,"streams-pipe1");
properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,"localhost:9092");
properties.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
properties.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,Serdes.String().getClass());
final KStreamBuilder builder=new KStreamBuilder();
KStream<String,String> source=builder.stream("zjc-input");
source.to("zjc-output");
//final Topology topology=builder.build();
KafkaStreams kafkaStreams=new KafkaStreams(builder,properties);
final CountDownLatch latch =new CountDownLatch(1);
Runtime.getRuntime().addShutdownHook(new Thread("streams-shutdown-hook"){
public void run(){
kafkaStreams.close();
latch.countDown();
}
});
try{
kafkaStreams.start();
13:42:12.432 [StreamThread-1] DEBUG org.apache.kafka.clients.consumer.internals.AbstractCoordinator - Group coordinator lookup for group streams-pipe1 failed: The group coordinator is not available.
看下集群是否正常,看着不像是客户端的问题。
正常呀。我集群就一个broker topic 也是只有一个分区。
集群和程序装在同一台机器?
先测试一下。
1、换个新的zk,3.4.9
2、换个新版本的kafka。
3、什么都不用动,复制上面代码执行。
version: 0.10.1
thanks,我更新一版。
最新的:https://www.orchome.com/512