我想做的是: 将 mqtt 的数据导入 kafka
我是按照如下步骤做的:
- 使用的connector 是
https://github.com/evokly/kafka-connect-mqtt
- 将编译出的jar放入 kafka
connect-distributed.properties
的plugin.path
路径下 - 启动 kafka connect
bin/connect-distributed.sh config/connect-distributed.properties
- 然后使用 rest api 添加一个connector
url: POST - 47.97.199.139/kafka/connectors
{
"name":"mqtt-connector",
"config":{
"connector.class":"com.evokly.kafka.connect.mqtt.MqttSourceConnector",
"tasks.max": "5",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"mqtt.topic": "hello",
"topic": "hello"
}
}
日志显示:
[2018-04-03 01:14:24,826] INFO EnrichedConnectorConfig values:
connector.class = com.evokly.kafka.connect.mqtt.MqttSourceConnector
key.converter = class org.apache.kafka.connect.json.JsonConverter
name = mqtt-connector
tasks.max = 5
transforms = null
value.converter = class org.apache.kafka.connect.json.JsonConverter
(org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorConfig:238)
[2018-04-03 01:14:24,826] ERROR Failed to reconfigure connector's tasks, retrying after backoff: (org.apache.kafka.connect.runtime.distributed.DistributedHerder:944)
java.lang.NoClassDefFoundError: com/evokly/kafka/connect/mqtt/MqttSourceTask
at com.evokly.kafka.connect.mqtt.MqttSourceConnector.taskClass(MqttSourceConnector.java:62)
at org.apache.kafka.connect.runtime.Worker.connectorTaskConfigs(Worker.java:273)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.reconfigureConnector(DistributedHerder.java:986)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.reconfigureConnectorTasksWithRetry(DistributedHerder.java:936)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$900(DistributedHerder.java:108)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder$17$1.call(DistributedHerder.java:949)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder$17$1.call(DistributedHerder.java:946)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:261)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:210)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
想问问这个使用自定义connector 的流程有没有错?
还有这个 NoClassDefFoundError是怎么产生的?
将需要的jar包加入:
/opt/kafka_2.12-1.0.1/libs
后, 解决了这个问题老哥可以详细教我一下 kafka connect 吗
下载 wurstmeister/kafka 的docker 镜像启动, 根据问题中的步骤挂载相应文件(包括 libs目录下你需要加的 jar), 应该可以实现一个简单的 demo
这个connect可以监听多个 mqtt topic吗
都可以, 监听多个topic/过滤/更改消息内容都行
你的答案