Kafka Connect: custom connector fails with NoClassDefFoundError?

北兮! posted on: 2018-04-03   last updated: 2018-04-03 10:21:17   5,272 views

What I want to do: import MQTT data into Kafka.

These are the steps I followed:

  1. The connector I am using is https://github.com/evokly/kafka-connect-mqtt
  2. Put the compiled jar under the plugin.path directory configured in Kafka's connect-distributed.properties
  3. Start Kafka Connect: bin/connect-distributed.sh config/connect-distributed.properties
  4. Then add a connector through the REST API
    url: POST - 47.97.199.139/kafka/connectors
    {
      "name": "mqtt-connector",
      "config": {
        "connector.class": "com.evokly.kafka.connect.mqtt.MqttSourceConnector",
        "tasks.max": "5",
        "key.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "mqtt.topic": "hello",
        "topic": "hello"
      }
    }
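
The request in step 4 can also be built from a script. This is a minimal Python sketch, assuming the REST endpoint is reachable over plain HTTP at the address given above (the question does not state a scheme). `build_connector_request` is a hypothetical helper written for this illustration, not part of Kafka.

```python
import json
import urllib.request


def build_connector_request(base_url: str, name: str, config: dict) -> urllib.request.Request:
    """Build (but do not send) the POST request that registers a connector."""
    payload = {"name": name, "config": config}
    return urllib.request.Request(
        url=base_url.rstrip("/") + "/connectors",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json", "Accept": "application/json"},
        method="POST",
    )


# Same settings as in the question.
mqtt_config = {
    "connector.class": "com.evokly.kafka.connect.mqtt.MqttSourceConnector",
    "tasks.max": "5",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "mqtt.topic": "hello",
    "topic": "hello",
}

# Assumption: "http://" is prepended because the question gives no scheme.
request = build_connector_request("http://47.97.199.139/kafka", "mqtt-connector", mqtt_config)

# Uncomment to actually send the request to the Connect worker:
# with urllib.request.urlopen(request) as response:
#     print(response.status, response.read().decode("utf-8"))
```

The sketch only constructs the request, so it can be inspected (URL, method, JSON body) before anything is sent to the worker.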

The log shows:

    [2018-04-03 01:14:24,826] INFO EnrichedConnectorConfig values:
        connector.class = com.evokly.kafka.connect.mqtt.MqttSourceConnector
        key.converter = class org.apache.kafka.connect.json.JsonConverter
        name = mqtt-connector
        tasks.max = 5
        transforms = null
        value.converter = class org.apache.kafka.connect.json.JsonConverter
     (org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorConfig:238)
    [2018-04-03 01:14:24,826] ERROR Failed to reconfigure connector's tasks, retrying after backoff: (org.apache.kafka.connect.runtime.distributed.DistributedHerder:944)
    java.lang.NoClassDefFoundError: com/evokly/kafka/connect/mqtt/MqttSourceTask
        at com.evokly.kafka.connect.mqtt.MqttSourceConnector.taskClass(MqttSourceConnector.java:62)
        at org.apache.kafka.connect.runtime.Worker.connectorTaskConfigs(Worker.java:273)
        at org.apache.kafka.connect.runtime.distributed.DistributedHerder.reconfigureConnector(DistributedHerder.java:986)
        at org.apache.kafka.connect.runtime.distributed.DistributedHerder.reconfigureConnectorTasksWithRetry(DistributedHerder.java:936)
        at org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$900(DistributedHerder.java:108)
        at org.apache.kafka.connect.runtime.distributed.DistributedHerder$17$1.call(DistributedHerder.java:949)
        at org.apache.kafka.connect.runtime.distributed.DistributedHerder$17$1.call(DistributedHerder.java:946)
        at org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:261)
        at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:210)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
        at java.lang.Thread.run(Thread.java:748)

Is there anything wrong with this procedure for using a custom connector?

And what causes this NoClassDefFoundError?

Posted on 2018-04-03

Adding the required jars to /opt/kafka_2.12-1.0.1/libs solved the problem.
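
A NoClassDefFoundError generally means the JVM could not load a class that already-loaded code refers to, for example because the class itself or something it depends on is not visible to the class loader. The thread does not say which jars were missing, so one way to narrow it down is to check which jars in a directory actually contain a given class. This is a minimal Python sketch; `jars_containing` is a hypothetical helper, and the directory arguments are whatever your plugin.path or libs directory happens to be.

```python
import sys
import zipfile
from pathlib import Path
from typing import List


def jars_containing(directory: str, class_name: str) -> List[str]:
    """Return the file names of jars under `directory` that contain `class_name`."""
    # A class a.b.C is stored in a jar as the entry a/b/C.class
    entry = class_name.replace(".", "/") + ".class"
    matches = []
    for jar_path in sorted(Path(directory).rglob("*.jar")):
        try:
            with zipfile.ZipFile(jar_path) as jar:
                if entry in jar.namelist():
                    matches.append(jar_path.name)
        except zipfile.BadZipFile:
            # Skip files that are named *.jar but are not valid archives.
            continue
    return matches


if __name__ == "__main__" and len(sys.argv) == 3:
    # Usage: python find_class.py <directory> <fully.qualified.ClassName>
    found = jars_containing(sys.argv[1], sys.argv[2])
    print("\n".join(found) if found else "class not found in any jar")
```

Running it against the plugin.path directory with `com.evokly.kafka.connect.mqtt.MqttSourceTask`, and again with the classes that task depends on, would show whether the connector jar is self-contained or whether its dependencies need to be placed alongside it.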

Could you walk me through Kafka Connect in more detail?

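Download the wurstmeister/kafka Docker image and start it, then mount the relevant files following the steps in the question (including the jars you need to add under the libs directory). That should be enough for a simple demo.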

Can this connector listen to multiple MQTT topics?

Yes to all of it: listening to multiple topics, filtering, and modifying message content are all possible.
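
In MQTT itself, one subscription can cover several topics through topic-filter wildcards: `+` matches exactly one level and `#` matches all remaining levels. Whether this connector's `mqtt.topic` setting accepts such a filter is not confirmed in this thread, so the Python sketch below only illustrates the matching rule. `topic_matches` is a hypothetical helper, and the topic names are made up for the example.

```python
def topic_matches(topic_filter: str, topic: str) -> bool:
    """Return True if an MQTT topic name matches a topic filter with + and # wildcards."""
    filter_levels = topic_filter.split("/")
    topic_levels = topic.split("/")

    # A wildcard in the first level does not match topics starting with '$'.
    if topic.startswith("$") and filter_levels[0] in ("+", "#"):
        return False

    for i, level in enumerate(filter_levels):
        if level == "#":
            # Multi-level wildcard: only valid as the last level, matches the rest.
            return i == len(filter_levels) - 1
        if i >= len(topic_levels):
            return False
        if level != "+" and level != topic_levels[i]:
            return False

    # Without a trailing '#', both must have the same number of levels.
    return len(filter_levels) == len(topic_levels)


for name in ["hello", "sensors/kitchen/temp", "sensors/kitchen/door/state"]:
    print(name, topic_matches("sensors/+/temp", name), topic_matches("sensors/#", name))
```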
