kafka的背景知识已经讲了很多了,让我们现在开始实践吧,假设你现在没有Kafka
和ZooKeeper
环境。
Step 1: 下载
下载并且解压它。
$ tar -xzf kafka_2.13-3.8.1.tgz
$ cd kafka_2.13-3.8.1
Step 2: 启动服务
注意:你的本地环境必须安装有
Java 8+
。
运行kafka需要使用Zookeeper,所以你需要先启动Zookeeper,如果你没有Zookeeper,你可以使用kafka自带打包和配置好的Zookeeper。
# 注意:Apache Kafka2.8版本之后可以不需要使用ZooKeeper,内测中,文章末尾有体验的安装方式。
$ bin/zookeeper-server-start.sh config/zookeeper.properties
...
打开另一个命令终端启动kafka服务:
$ bin/kafka-server-start.sh config/server.properties &
一旦所有服务成功启动,那Kafka已经可以使用了。
Step 3: 创建一个主题(topic)
创建一个名为“test”的Topic,只有一个分区和一个备份:
$ bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic test
创建好之后,可以通过运行以下命令,查看已创建的topic信息:
$ bin/kafka-topics.sh --describe --topic test --bootstrap-server localhost:9092
Topic:test PartitionCount:1 ReplicationFactor:1 Configs:
Topic: test Partition: 0 Leader: 0 Replicas: 0 Isr: 0
或者,除了手工创建topic外,你也可以配置你的broker,当发布一个不存在的topic时自动创建topic,点击这里查看如何配置自动创建topic时设置默认的分区和副本数。
Step 4: 发送消息
Kafka提供了一个命令行的工具,可以从输入文件或者命令行中读取消息并发送给Kafka集群。每一行是一条消息。
运行 producer(生产者)
,然后在控制台输入几条消息到服务器。
$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic test
This is a message
This is another message
Step 5: 消费消息
Kafka也提供了一个消费消息的命令行工具,将存储的信息输出出来,新打开一个命令控制台,输入:
$ bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --from-beginning
This is a message
This is another message
如果你有2台不同的终端上运行上述命令,那么当你在运行生产者时,消费者就能消费到生产者发送的消息。
可以使用
Ctrl-C
停止消费者客户端。
Step 6: 使用 Kafka Connect 来 导入/导出 数据
你可能在现有的系统中拥有大量的数据,如关系型数据库
或传统的消息传递系统
,以及许多已经使用这些系统的应用程序。Kafka Connect允许你不断地从外部系统提取数据到Kafka,反之亦然。用Kafka整合现有的系统是非常容易的。为了使这个过程更加容易,有数百个这样的连接器现成可用。
查看Kafka Connect部分,了解更多关于如何将你的数据持续导入和导出Kafka。
Step 7: 使用Kafka Stream来处理数据
一旦你的数据存储在Kafka中,你就可以用Kafka Streams客户端库来处理这些数据,该库适用于Java/Scala
。它允许你实现自己的实时应用程序和微服务,其中输入和/或输出数据存储在Kafka主题中。Kafka Streams将在客户端编写和部署标准Java和Scala应用程序的简单性与Kafka服务器端集群技术的优势相结合,使这些应用程序具有可扩展性
、弹性
、容错性
和分布式
。该库支持精确的一次性处理、有状态操作和聚合、窗口化、连接、基于事件时间的处理等等。
为了给你一个初步的体验,这里是如何实现流行的WordCount
算法的:
KStream<String, String> textLines = builder.stream("quickstart-events");
KTable<String, Long> wordCounts = textLines
.flatMapValues(line -> Arrays.asList(line.toLowerCase().split(" ")))
.groupBy((keyIgnored, word) -> word)
.count();
wordCounts.toStream().to("output-topic", Produced.with(Serdes.String(), Serdes.Long()));
Kafka流演示例子和应用开发教程展示了如何从头到尾编码和运行这样一个流式应用。
Step 8:停止Kafka
现在你已经完成了快速入门,可以随时卸载Kafka环境了,或者继续玩下去。
- 使用
Ctrl-C
停止生产者和消费者客户端。 - 使用
Ctrl-C
停止Kafka broker。 - 最后,用
Ctrl-C
停止ZooKeeper。
如果你还想删除你的本地Kafka环境的数据,包括你创建的消息,运行以下命令:
$ rm -rf /tmp/kafka-logs /tmp/zookeeper
你好,请教下,能用win cmd窗口跑上面的启动命令吗?启动zookeeper,kafka服务器,创建topic,我run完‘查看topci状态’的命令后,前面两个命令的窗口都自动关闭了。
windowns的命令后缀是
.bat
的呀。想请教一下,为什么有的帖子创建Topic的时候是指定的
localhost/ip:2181
,但是咱们这个教程里是用的9092端口。我自己本地用单台服务器部署3个zookeeper做集群,3个不同的kakfa配置文件起kakfa集群,到创建Topic的时候如果是找2181就是提示超时:Error while executing topic command : Timed out waiting for a node assignment. Call: createTopics [2022-02-17 10:57:42,193] ERROR org.apache.kafka.common.errors.TimeoutException: Timed out waiting for a node assignment. Call: createTopics (kafka.admin.TopicCommand$)
反而使用9092是可以成功的,不太理解这个区别~
我从 https://www.orchome.com/454 上看,好像是因为zookeeper版本的改变,使得一些语法的使用上发生了变更,是通信方式上也有变化吗?
对的,kafka版本的问题,与zk没关系的。
好的,谢谢大佬
楼主辛苦了,不错的教程
多提意见。
你好,请问server.properties已经修改过,但是启动kafka时没反应,创建topic报警,这该怎么解决
默认是127.0.0.1,所以需要修改
server.properties
配置中的listeners
:listeners= PLAINTEXT://10.25.110.107:9092
参考:
https://www.orchome.com/472
转到这里说吧:https://www.orchome.com/10421
listeners= PLAINTEXT://[我的服务器对内ip]:9092
我已经修改了这个配置,但还是报这个错
Connection to node -1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
到问题专区提吧,把你的配置贴一下。
现在kafka还需要zookeper吗?
现在还是需要的,新版不需要的已经在内测了。
可以关注我的公众号,有新动态我会发的。(现在懒了,只发重要的事情)
此处的wordcount输入主题不对吧,看流那一节的演示案例,没有问题
感谢提醒,晚会我们进行核实。
已经将文章重新整理了,迁移到流那一节了。
大佬,replicas 和ISR中有-1是表示有节点挂了吗
是的
那样的话会不会造成数据丢失
如果分区的副本都是
-1
了,就会丢消息。https://www.orchome.com/6#item-6
明白了,谢谢大佬
最新更新2021年12月的么,我是穿越了吗
取得是系统时间,程序有bug!!!
我又更新了一下,是正确的时间....
您好,输入命令
>bin\windows\connect-standalone.bat config\connect-standalone .properties config\connect-file-source.properties config\connext-file-sink.properties
后提示
ERROR Stopping after connector error
应该怎么处理呀?
我在上步操作的时候多打了一个
> echo foo>test.txt
因为没有加空格,也没反应我以为是空格问题,所以就加空格重新输入了一遍cmd .\windows\connect-standalone.bat .\config\connect-standalone.properties .\config\connect-bbdd.properties
你好,bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic testof1 --from-beginning命令输入完,什么反应都没有,这是咋回事
发送消息的时候,确认成功了吗?
是的,使用java的消费者都可以接受到消息,在服务器上面执行这条命令,像是啥也不输入,只按了回车键一样
应该最后会报超时,java既然成功了,应该配置的不是
localhost:9092
这个,换成跟java一样具体的ip试试。[root@localhost kafka_2.12-2.0.1]# bin/kafka-console-consumer.sh --bootstrap-server *****:9092 --topic testof1 --from-beginning [root@localhost kafka_2.12-2.0.1]#
这是命令执行之后的,之前正常的时候应该像这篇文章中说到的那样,我这里就直接没反应
如果没有任何数据 也没有报错 那这个topic里没有任何消息。