我想做的是: 将 mqtt 的数据导入 kafka
我是按照如下步骤做的:
https://github.com/evokly/kafka-connect-mqtt
connect-distributed.properties
的plugin.path
路径下bin/connect-distributed.sh config/connect-distributed.properties
url: POST - 47.97.199.139/kafka/connectors
{
"name":"mqtt-connector",
"config":{
"connector.class":"com.evokly.kafka.connect.mqtt.MqttSourceConnector",
"tasks.max": "5",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"mqtt.topic": "hello",
"topic": "hello"
}
}
日志显示:
[2018-04-03 01:14:24,826] INFO EnrichedConnectorConfig values:
connector.class = com.evokly.kafka.connect.mqtt.MqttSourceConnector
key.converter = class org.apache.kafka.connect.json.JsonConverter
name = mqtt-connector
tasks.max = 5
transforms = null
value.converter = class org.apache.kafka.connect.json.JsonConverter
(org.apache.kafka.connect.runtime.ConnectorConfig$EnrichedConnectorConfig:238)
[2018-04-03 01:14:24,826] ERROR Failed to reconfigure connector's tasks, retrying after backoff: (org.apache.kafka.connect.runtime.distributed.DistributedHerder:944)
java.lang.NoClassDefFoundError: com/evokly/kafka/connect/mqtt/MqttSourceTask
at com.evokly.kafka.connect.mqtt.MqttSourceConnector.taskClass(MqttSourceConnector.java:62)
at org.apache.kafka.connect.runtime.Worker.connectorTaskConfigs(Worker.java:273)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.reconfigureConnector(DistributedHerder.java:986)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.reconfigureConnectorTasksWithRetry(DistributedHerder.java:936)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$900(DistributedHerder.java:108)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder$17$1.call(DistributedHerder.java:949)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder$17$1.call(DistributedHerder.java:946)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.tick(DistributedHerder.java:261)
at org.apache.kafka.connect.runtime.distributed.DistributedHerder.run(DistributedHerder.java:210)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
想问问这个使用自定义connector 的流程有没有错?
还有这个 NoClassDefFoundError是怎么产生的?