返回到文章

采纳

编辑于

kafka connect 自定义 connector 报NoClassDefFoundError 错误?

kafka

我想做的是: 将 mqtt 的数据导入 kafka

我是按照如下步骤做的:

  1. 使用的connector 是https://github.com/evokly/kafka-connect-mqtt
  2. 将编译出的jar放入 kafka connect-distributed.propertiesplugin.path 路径下
  3. 启动 kafka connect bin/connect-distributed.sh config/connect-distributed.properties
  4. 然后使用 rest api 添加一个connector
    url: POST - 47.97.199.139/kafka/connectors
    {
  "name":"mqtt-connector",
  "config":{
    "connector.class":"com.evokly.kafka.connect.mqtt.MqttSourceConnector",
    "tasks.max": "5",
    "key.converter": "org.apache.kafka.connect.json.JsonConverter",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "mqtt.topic": "hello",
    "topic": "hello"
  }
}

日志显示:

    [2018-04-03 01:14:24,826] INFO EnrichedConnectorConfig values: 
    connector.class = com.evokly.kafka.connect.mqtt.MqttSourceConnector
    key.converter = class org.apache.kafka.connect.json.JsonConverter
    name = mqtt-connector
    tasks.max = 5
    transforms = null
    value.converter = class org.apache.kafka.connect.json.JsonConverter
 (org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorConfig:238)
[2018-04-03 01:14:24,826] ERROR Failed to reconfigure connector's tasks, retrying after backoff: (org.apache.kafka.connect.runtime.distributed.DistributedHerder:944)
java.lang.NoClassDefFoundError: com/evokly/kafka/connect/mqtt/MqttSourceTask
    at com.evokly.kafka.connect.mqtt.MqttSourceConnector.taskClass(MqttSourceConnector.java:62)
    at org.apache.kafka.connect.runtime.Worker.connectorTaskConfigs(Worker.java:273)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder.reconfigureConnector(DistributedHerder.java:986)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder.reconfigureConnectorTasksWithRetry(DistributedHerder.java:936)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$900(DistributedHerder.java:108)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder$17$1.call(DistributedHerder.java:949)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder$17$1.call(DistributedHerder.java:946)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:261)
    at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:210)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

想问问这个使用自定义connector 的流程有没有错?

还有这个 NoClassDefFoundError是怎么产生的?