Kafka Streams从一个或多个输入topic进行连续的计算并输出到0或多个外部topic中。
可以通过TopologyBuilder类定义一个计算逻辑处理器
DAG拓扑。或者也可以通过提供的高级别KStream DSL来定义转换的KStreamBuilder。(PS:计算逻辑其实就是自己的代码逻辑)
KafkaStreams类管理Kafka Streams实例的生命周期。一个stream实例可以在配置文件中为处理器
指定一个或多个Thread。
KafkaStreams实例可以作为单个streams处理客户端(也可能是分布式的),与其他的相同应用ID的实例进行协调(无论是否在同一个进程中,在同一台机器的其他进程中,或远程机器上)。这些实例将根据输入topic分区的基础上来划分工作,以便所有的分区都被消费掉。如果实例添加或失败,所有实例将重新平衡它们之间的分区分配,以保证负载平衡。
在内部,KafkaStreams实例包含一个正常的KafkaProducer
和KafkaConsumer
实例,用于读取和写入,
一个简单的例子:
Map<String, Object> props = new HashMap<>();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-stream-processing-application");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
StreamsConfig config = new StreamsConfig(props);
KStreamBuilder builder = new KStreamBuilder();
builder.stream("my-input-topic").mapValues(value -> value.length().toString()).to("my-output-topic");
KafkaStreams streams = new KafkaStreams(builder, config);
streams.start();
案例该换了
好的,我最近更新一下
案例是不是不符合新版本了?
符合的。
props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
中
StreamsConfig.KEY_SERDE_CLASS_CONFIG,StreamsConfig.VALUE_SERDE_CLASS_CONFIG
已经@deprecated
了....收到,稍后我看看官网是否已经更新。
Exception in thread "StreamThread-1" java.lang.ClassCastException: java.lang.Integer cannot be cast to java.lang.String at org.apache.kafka.common.serialization.StringSerializer.serialize(StringSerializer.java:24)
这是啥原因呢
类型转换错误。
嗯嗯,这个我知道,但是我现在不知道这是怎么引起的
public class KafkaStream { public static void main(String[] args){ Map<String, Object> props = new HashMap<>(); props.put(StreamsConfig.STATE_DIR_CONFIG,"{test}"); props.put(StreamsConfig.APPLICATION_ID_CONFIG,"my-stream-processing-application"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "140.143.67.114:9092"); props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); StreamsConfig config = new StreamsConfig(props); KStreamBuilder builder = new KStreamBuilder(); builder.stream("test").mapValues(value -> value.toString().length()).to("test-out"); KafkaStreams streams = new KafkaStreams(builder, config); streams.start(); } }
上面是kafkaStream的配置
public class MyProducer { public static void main(String args[]){ Properties props = new Properties(); props.put("bootstrap.servers", "140.143.67.114:9092"); props.put("acks", "all"); props.put("retries", 0); props.put("batch.size", 16384); props.put("linger.ms", 1); props.put("buffer.memory", 33554432); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); Producer<String, String> producer = new KafkaProducer<>(props); for(int i = 100; i < 200; i++) producer.send(new ProducerRecord<>("my-test", Integer.toString(i)+"key", Integer.toString(i)+"val")); producer.close(); } }
上面是Producer的配置
不知道是什么原因引起的类型转换错误
流方法实现中,int转string错误。无法转。
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka-streams</artifactId> <version>0.10.0.0</version> </dependency>
在 pom.xml 中添加了这个 但还是提示
invalid content was found starting with element ‘dependency'
这个错误,可是很少见那。检查pom文件额。
刚接触kafka,一直在拜读您的文章,请教一下,这个例子怎么实现向两个topic输出?比如从my-input-topic 到 my-output-topic1、my-output-topic2
我在例子中这样将结果输出到两个topic的
wordCounts.toStream().to("streams-plaintext-output", Produced.with(Serdes.String(), Serdes.Long())); wordCounts.toStream().to("my-replicated-topic", Produced.with(Serdes.String(), Serdes.Long()));
测试了,发现没有任何输出,请问是什么原因呢?程序如下:
Map<String, Object> props = new HashMap<>(); props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-stream-processing-application"); props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop-sh1-core3:9092"); props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName()); StreamsConfig config = new StreamsConfig(props); KStreamBuilder builder = new KStreamBuilder(); builder.stream("streams-file-input").mapValues(value -> value.toString()).to("streams-file-input"); KafkaStreams streams = new KafkaStreams(builder, config); streams.start();
builder.stream("streams-file-input").mapValues(value -> value.toString()).to("streams-file-input");
输出不对吧
请教,编译错误 Error:java: Compilation failed: internal java compiler error 怎么回事呢?
找到解决方法了,原来是java complier 设置的问题
纠正一下:上面的Lambda表达式表述有误,应该是
value -> value.toString().length()
StreamsConfig、KStreamBuilder、KafkaStreams 这几个类怎么没有,idea中没有提示
有的
哦,我看 《消费者客户端(0.10.0.1API)》这章节是用的0.10.0.1,没注意这章节用的是0.10.1.1 最好能统一下版本,不知道是不是笔误,正在拜读,谢谢大神!
不好意思我看错了,我没有引入 kafka-streams 依赖
没事 多多交流