kafka connect 启动后,报错:Task is being killed and will not recover until manually restarted?

电杆GG 发表于: 2018-09-19   最后更新时间: 2018-09-19 14:09:13   7,663 游览

在kafka connect 启动执行命令:

bin/connect-standalone.sh config/connect-standalone.properties \
 config/connect-file-source.properties

之后,报错,然后一直刷:

[2018-09-19 11:07:51,308] ERROR WorkerSinkTask{id=local-file-sink-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:178)
[2018-09-19 11:08:00,855] INFO WorkerSourceTask{id=local-file-source-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:397)
[2018-09-19 11:08:00,856] INFO WorkerSourceTask{id=local-file-source-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:414)
[2018-09-19 11:08:00,864] INFO WorkerSourceTask{id=local-file-source-0} Finished commitOffsets successfully in 9 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:496)
[2018-09-19 11:08:10,865] INFO WorkerSourceTask{id=local-file-source-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:397)
[2018-09-19 11:08:10,866] INFO WorkerSourceTask{id=local-file-source-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:414)
[2018-09-19 11:08:20,867] INFO WorkerSourceTask{id=local-file-source-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:397)

消费消息能够显示:

{"schema":{"type":"string","optional":false},"payload":"foo"}
{"schema":{"type":"string","optional":false},"payload":"bar"}
{"schema":{"type":"string","optional":false},"payload":"Another line"}
{"schema":{"type":"string","optional":false},"payload":"lijieine"}
{"schema":{"type":"string","optional":false},"payload":"lindcvsde"}

但是test.sink.txt中无值:

[root@localhost data]# cat test.sink.txt 
[root@localhost data]#
发表于 2018-09-19
添加评论

你好,请教下 ./bin/connect-standalone.sh config/connect-standalone.properties ./config_connection/sink_1.properties在此任务命令下假如有一条数据异常了,如何使整个任务不被killed,只是记录下这条异常数据,继续执行下一条数据??谢谢!!

任务被强杀,并不会恢复,直到你手动重启。
只有这一个错误吗?你看看broker里的日志。

电杆GG -> 半兽人 6年前

[2018-09-19 14:16:16,596] ERROR WorkerSinkTask{id=local-file-sink-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:177)
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
 at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
 at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
 at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:510)
 at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:490)
 at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
 at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:225)
 at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:193)
 at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
 at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
 at java.util.concurrent.FutureTask.run(FutureTask.java:266)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
 at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.
 at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:338)
 at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$0(WorkerSinkTask.java:510)
 at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
 at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
 ... 13 more
[2018-09-19 14:16:16,598] ERROR WorkerSinkTask{id=local-file-sink-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:178)


还有这个

电杆GG -> 半兽人 6年前

[2018-09-19 14:16:16,596] ERROR WorkerSinkTask{id=local-file-sink-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:177)
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
 at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
 at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
 at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:510)
 at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:490)
 at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
 at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:225)
 at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:193)
 at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
 at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
 at java.util.concurrent.FutureTask.run(FutureTask.java:266)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
 at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.
 at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:338)
 at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$0(WorkerSinkTask.java:510)
 at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
 at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
 ... 13 more
[2018-09-19 14:16:16,598] ERROR WorkerSinkTask{id=local-file-sink-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:178)


还有这个

半兽人 -> 电杆GG 6年前

你把schemas.enable = false试试。
还有,你的操作步骤也提供一下、

电杆GG -> 半兽人 6年前

配置:
1:
[root@localhost config]# cat connect-file-source.properties 

name=local-file-source
connector.class=FileStreamSource
tasks.max=1
file=/usr/local/kafka/data/test.txt
topic=test
2:
[root@localhost config]# cat connect-file-sink.properties 
name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
file=/usr/local/kafka/data/test.sink.txt
topics=test

3:
[root@localhost config]# cat connect-standalone.properties 
bootstrap.servers=192.168.16.129:19092,192.168.16.131:19092,192.168.16.132:19092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=true
value.converter.schemas.enable=true
offset.storage.file.filename=/tmp/connect.offsets
# Flush much faster than normal, which is useful for testing/debugging
offset.flush.interval.ms=10000
# plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
#plugin.path=

4:运行
[root@localhost bin]# ./connect-standalone.sh ../config/connect-standalone.properties ../config/connect-file-source.properties ../config/connect-file-sink.properties

电杆GG -> 电杆GG 6年前

更改为false也报错
ERROR WorkerSinkTask{id=local-file-sink-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:177)
org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
 at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
 at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
 at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:513)
 at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:490)
 at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
 at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:225)
 at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:193)
 at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
 at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
 at java.util.concurrent.FutureTask.run(FutureTask.java:266)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
 at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error: 
 at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:334)
 at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:513)
 at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
 at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
 ... 13 more
Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'lijie': was expecting ('true', 'false' or 'null')
 at [Source: (byte[])"lijie"; line: 1, column: 11]
Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'lijie': was expecting ('true', 'false' or 'null')
 at [Source: (byte[])"lijie"; line: 1, column: 11]
 at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1804)
 at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:679)
 at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3526)
 at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2621)
 at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:826)
 at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:723)
 at com.fasterxml.jackson.databind.ObjectMapper._readTreeAndClose(ObjectMapper.java:4042)
 at com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:2571)
 at org.apache.kafka.connect.json.JsonDeserializer.deserialize(JsonDeserializer.java:50)
 at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:332)
 at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:513)
 at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
 at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
 at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
 at java.util.concurrent.FutureTask.run(FutureTask.java:266)
 at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
 at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
 at java.lang.Thread.run(Thread.java:748)
[2018-09-19 15:21:59,930] ERROR WorkerSinkTask{id=local-file-sink-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:178)
[2018-09-19 15:22:09,537] INFO WorkerSourceTask{id=local-file-source-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:397)

半兽人 -> 电杆GG 6年前

这是解析异常了。

Unrecognized token 'lijie': was expecting ('true', 'false' or 'null')
 at [Source: (byte[])"lijie"; line: 1, column: 11]
电杆GG -> 半兽人 6年前

解析异常应该怎么处理呢?这方面初学者,不太会进行更改

半兽人 -> 电杆GG 6年前

你是按照 https://www.orchome.com/6 第7步来的吗?

请问这个问题解决了吗? 我也遇到了这个问题

你的答案

查看kafka相关的其他问题或提一个您自己的问题