After starting Kafka Connect, it reports "Task is being killed and will not recover until manually restarted"?

Kafka Connect was started with the command:

bin/connect-standalone.sh config/connect-standalone.properties \
 config/connect-file-source.properties

After that, it reports an error and then keeps printing the following over and over:

[2018-09-19 11:07:51,308] ERROR WorkerSinkTask{id=local-file-sink-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:178)
[2018-09-19 11:08:00,855] INFO WorkerSourceTask{id=local-file-source-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:397)
[2018-09-19 11:08:00,856] INFO WorkerSourceTask{id=local-file-source-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:414)
[2018-09-19 11:08:00,864] INFO WorkerSourceTask{id=local-file-source-0} Finished commitOffsets successfully in 9 ms (org.apache.kafka.connect.runtime.WorkerSourceTask:496)
[2018-09-19 11:08:10,865] INFO WorkerSourceTask{id=local-file-source-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:397)
[2018-09-19 11:08:10,866] INFO WorkerSourceTask{id=local-file-source-0} flushing 0 outstanding messages for offset commit (org.apache.kafka.connect.runtime.WorkerSourceTask:414)
[2018-09-19 11:08:20,867] INFO WorkerSourceTask{id=local-file-source-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:397)

Consuming from the topic works; the messages display as:

{"schema":{"type":"string","optional":false},"payload":"foo"}
{"schema":{"type":"string","optional":false},"payload":"bar"}
{"schema":{"type":"string","optional":false},"payload":"Another line"}
{"schema":{"type":"string","optional":false},"payload":"lijieine"}
{"schema":{"type":"string","optional":false},"payload":"lindcvsde"}

But test.sink.txt contains nothing:

[root@localhost data]# cat test.sink.txt 
[root@localhost data]#



  • The task was killed and will not recover until you restart it manually.
    Is that the only error? Take a look at the broker logs as well.
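    For reference, a killed task can be restarted without restarting the whole worker, via the Connect REST API. A minimal sketch, assuming the default standalone REST port 8083 and the connector/task id from the log above (the task will simply die again until the underlying error is fixed):

        curl -X POST http://localhost:8083/connectors/local-file-sink/tasks/0/restart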
    • [2018-09-19 14:16:16,596] ERROR WorkerSinkTask{id=local-file-sink-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:177)
      org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
       at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
       at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
       at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:510)
       at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:490)
       at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
       at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:225)
       at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:193)
       at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
       at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
       at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
       at java.util.concurrent.FutureTask.run(FutureTask.java:266)
       at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
       at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
       at java.lang.Thread.run(Thread.java:748)
      Caused by: org.apache.kafka.connect.errors.DataException: JsonConverter with schemas.enable requires "schema" and "payload" fields and may not contain additional fields. If you are trying to deserialize plain JSON data, set schemas.enable=false in your converter configuration.
       at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:338)
       at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$0(WorkerSinkTask.java:510)
       at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
       at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
       ... 13 more
      [2018-09-19 14:16:16,598] ERROR WorkerSinkTask{id=local-file-sink-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:178)
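      In plain terms, the Caused by says it all: with schemas.enable=true, JsonConverter only accepts records that are exactly a two-field JSON envelope. Roughly, for string data:

          accepted:  {"schema":{"type":"string","optional":false},"payload":"foo"}
          rejected:  {"foo":"bar"}    (plain JSON, no envelope)
          rejected:  foo              (not JSON at all)

      Each rejected record counts against errors.tolerance (default: none), and once the tolerance is exceeded the task is killed, which is exactly the log line above.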


            • Configuration:
              1:
              [root@localhost config]# cat connect-file-source.properties 

              name=local-file-source
              connector.class=FileStreamSource
              tasks.max=1
              file=/usr/local/kafka/data/test.txt
              topic=test
              2:
              [root@localhost config]# cat connect-file-sink.properties 
              name=local-file-sink
              connector.class=FileStreamSink
              tasks.max=1
              file=/usr/local/kafka/data/test.sink.txt
              topics=test

              3:
              [root@localhost config]# cat connect-standalone.properties 
              bootstrap.servers=192.168.16.129:19092,192.168.16.131:19092,192.168.16.132:19092
              key.converter=org.apache.kafka.connect.json.JsonConverter
              value.converter=org.apache.kafka.connect.json.JsonConverter
              key.converter.schemas.enable=true
              value.converter.schemas.enable=true
              offset.storage.file.filename=/tmp/connect.offsets
              # Flush much faster than normal, which is useful for testing/debugging
              offset.flush.interval.ms=10000
              # plugin.path=/usr/local/share/java,/usr/local/share/kafka/plugins,/opt/connectors,
              #plugin.path=

              4: Run:
              [root@localhost bin]# ./connect-standalone.sh ../config/connect-standalone.properties ../config/connect-file-source.properties ../config/connect-file-sink.properties
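
              The worker config above has schemas.enable=true, while the exception explicitly suggests turning it off. The change it asks for, in connect-standalone.properties:

                  key.converter.schemas.enable=false
                  value.converter.schemas.enable=false

              Note that even with the flag off, JsonConverter still requires every record to be valid JSON.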
                • Changing it to false also fails:
                  ERROR WorkerSinkTask{id=local-file-sink-0} Task threw an uncaught and unrecoverable exception (org.apache.kafka.connect.runtime.WorkerTask:177)
                  org.apache.kafka.connect.errors.ConnectException: Tolerance exceeded in error handler
                   at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:178)
                   at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:104)
                   at org.apache.kafka.connect.runtime.WorkerSinkTask.convertAndTransformRecord(WorkerSinkTask.java:513)
                   at org.apache.kafka.connect.runtime.WorkerSinkTask.convertMessages(WorkerSinkTask.java:490)
                   at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:321)
                   at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:225)
                   at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:193)
                   at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:175)
                   at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
                   at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
                   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
                   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
                   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
                   at java.lang.Thread.run(Thread.java:748)
                  Caused by: org.apache.kafka.connect.errors.DataException: Converting byte[] to Kafka Connect data failed due to serialization error: 
                   at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:334)
                   at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:513)
                   at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
                   at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:162)
                   ... 13 more
                  Caused by: org.apache.kafka.common.errors.SerializationException: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'lijie': was expecting ('true', 'false' or 'null')
                   at [Source: (byte[])"lijie"; line: 1, column: 11]
                  Caused by: com.fasterxml.jackson.core.JsonParseException: Unrecognized token 'lijie': was expecting ('true', 'false' or 'null')
                   at [Source: (byte[])"lijie"; line: 1, column: 11]
                   at com.fasterxml.jackson.core.JsonParser._constructError(JsonParser.java:1804)
                   at com.fasterxml.jackson.core.base.ParserMinimalBase._reportError(ParserMinimalBase.java:679)
                   at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._reportInvalidToken(UTF8StreamJsonParser.java:3526)
                   at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._handleUnexpectedValue(UTF8StreamJsonParser.java:2621)
                   at com.fasterxml.jackson.core.json.UTF8StreamJsonParser._nextTokenNotInObject(UTF8StreamJsonParser.java:826)
                   at com.fasterxml.jackson.core.json.UTF8StreamJsonParser.nextToken(UTF8StreamJsonParser.java:723)
                   at com.fasterxml.jackson.databind.ObjectMapper._readTreeAndClose(ObjectMapper.java:4042)
                   at com.fasterxml.jackson.databind.ObjectMapper.readTree(ObjectMapper.java:2571)
                   at org.apache.kafka.connect.json.JsonDeserializer.deserialize(JsonDeserializer.java:50)
                   at org.apache.kafka.connect.json.JsonConverter.toConnectData(JsonConverter.java:332)
                   at org.apache.kafka.connect.runtime.WorkerSinkTask.lambda$convertAndTransformRecord$1(WorkerSinkTask.java:513)
                   at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:128)
                   at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:219)
                   at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
                   at java.util.concurrent.FutureTask.run(FutureTask.java:266)
                   at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
                   at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
                   at java.lang.Thread.run(Thread.java:748)
                  [2018-09-19 15:21:59,930] ERROR WorkerSinkTask{id=local-file-sink-0} Task is being killed and will not recover until manually restarted (org.apache.kafka.connect.runtime.WorkerTask:178)
                  [2018-09-19 15:22:09,537] INFO WorkerSourceTask{id=local-file-source-0} Committing offsets (org.apache.kafka.connect.runtime.WorkerSourceTask:397)
                    • That's a parse exception: Unrecognized token 'lijie': was expecting ('true', 'false' or 'null')
                       at [Source: (byte[])"lijie"; line: 1, column: 11]
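                      The record "lijie" was presumably written as a bare string (e.g. by a console producer, or by an earlier run with different converters), so it cannot parse as JSON regardless of schemas.enable. For plain text lines, a sketch of two ways out, against the configs shown above:

                          # Option 1 (connect-standalone.properties): handle records as raw strings
                          key.converter=org.apache.kafka.connect.storage.StringConverter
                          value.converter=org.apache.kafka.connect.storage.StringConverter

                          # Option 2 (connect-file-sink.properties, Kafka >= 2.0): skip records that fail conversion
                          errors.tolerance=all
                          errors.log.enable=true

                      Or simply re-create the topic so that every record is written in one consistent format.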