Flink + Kafka: Interrupted while joining ioThread

Posted by 在名 on 2018-01-15 (last updated 2018-01-15 22:49:13)

The following problem occurs when using Flink with Kafka. Any help would be appreciated:

2018-01-15 18:45:04,387 INFO  org.apache.flink.runtime.taskmanager.Task                     - Attempting to cancel task Reduce -> Sink: SinkToZK (5/5) (6329cf26aac32624aa6eefe3227fa5d5).
2018-01-15 18:45:04,387 INFO  org.apache.flink.runtime.taskmanager.Task                     - Reduce -> Sink: SinkToZK (5/5) (6329cf26aac32624aa6eefe3227fa5d5) switched from RUNNING to CANCELING.
2018-01-15 18:45:04,391 INFO  org.apache.flink.runtime.taskmanager.Task                     - Triggering cancellation of task code Reduce -> Sink: SinkToZK (5/5) (6329cf26aac32624aa6eefe3227fa5d5).
2018-01-15 18:45:04,391 INFO  org.apache.flink.runtime.taskmanager.Task                     - Attempting to cancel task SkippedDataReduce -> Sink: SkippedDataStreamStaticsZkSink (5/5) (50f7a2a179bb19676c324bc2b82fcaf0).
2018-01-15 18:45:04,391 INFO  org.apache.flink.runtime.taskmanager.Task                     - SkippedDataReduce -> Sink: SkippedDataStreamStaticsZkSink (5/5) (50f7a2a179bb19676c324bc2b82fcaf0) switched from RUNNING to CANCELING.
2018-01-15 18:45:04,394 INFO  org.apache.kafka.clients.producer.KafkaProducer               - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
2018-01-15 18:45:04,396 INFO  org.apache.flink.runtime.taskmanager.Task                     - Triggering cancellation of task code SkippedDataReduce -> Sink: SkippedDataStreamStaticsZkSink (5/5) (50f7a2a179bb19676c324bc2b82fcaf0).
2018-01-15 18:45:04,396 ERROR org.apache.kafka.clients.producer.KafkaProducer               - Interrupted while joining ioThread
java.lang.InterruptedException
        at java.lang.Object.wait(Native Method)
        at java.lang.Thread.join(Thread.java:1253)
        at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:672)
        at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:651)
        at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:630)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.close(FlinkKafkaProducerBase.java:317)
        at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.close(AbstractUdfStreamOperator.java:118)
        at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:126)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:428)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:333)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
        at java.lang.Thread.run(Thread.java:745)
2018-01-15 18:45:04,397 INFO  org.apache.flink.yarn.YarnTaskManager                         - Un-registering task and sending final execution state CANCELED to JobManager for task Source: Custom Source -> Map -> (Filter -> Map -> Sink: Unnamed, Filter -> Map -> Map, Filter -> Map -> Sink: Unnamed) (420e754665eace3e01125689feb47f59)
2018-01-15 18:45:04,397 INFO  org.apache.kafka.clients.producer.KafkaProducer               - Proceeding to force close the producer since pending requests could not be completed within timeout 9223372036854775807 ms.
2018-01-15 18:45:04,400 INFO  org.apache.zookeeper.ZooKeeper                                - Session: 0x260833c2b62f786 closed
2018-01-15 18:45:04,400 INFO  org.apache.zookeeper.ClientCnxn                               - EventThread shut down
2018-01-15 18:45:04,407 INFO  org.apache.flink.runtime.taskmanager.Task                     - SkippedDataReduce -> Sink: SkippedDataStreamStaticsZkSink (5/5) (50f7a2a179bb19676c324bc2b82fcaf0) switched from CANCELING to CANCELED.
2018-01-15 18:45:04,407 INFO  org.apache.flink.runtime.taskmanager.Task                     - Freeing task resources for SkippedDataReduce -> Sink: SkippedDataStreamStaticsZkSink (5/5) (50f7a2a179bb19676c324bc2b82fcaf0).
2018-01-15 18:45:04,407 INFO  org.apache.zookeeper.ZooKeeper                                - Session: 0x1605d320327352c closed
2018-01-15 18:45:04,407 INFO  org.apache.flink.runtime.taskmanager.Task                     - Ensuring all FileSystem streams are closed for task SkippedDataReduce -> Sink: SkippedDataStreamStaticsZkSink (5/5) (50f7a2a179bb19676c324bc2b82fcaf0) [CANCELED]
2018-01-15 18:45:04,407 INFO  org.apache.zookeeper.ClientCnxn                               - EventThread shut down
2018-01-15 18:45:04,412 INFO  org.apache.flink.runtime.taskmanager.Task                     - Reduce -> Sink: SinkToZK (5/5) (6329cf26aac32624aa6eefe3227fa5d5) switched from CANCELING to CANCELED.
2018-01-15 18:45:04,412 INFO  org.apache.flink.runtime.taskmanager.Task                     - Freeing task resources for Reduce -> Sink: SinkToZK (5/5) (6329cf26aac32624aa6eefe3227fa5d5).
2018-01-15 18:45:04,415 INFO  org.apache.flink.yarn.YarnTaskManager                         - Un-registering task and sending final execution state CANCELED to JobManager for task SkippedDataReduce -> Sink: SkippedDataStreamStaticsZkSink (50f7a2a179bb19676c324bc2b82fcaf0)
2018-01-15 18:45:04,415 INFO  org.apache.flink.runtime.taskmanager.Task                     - Ensuring all FileSystem streams are closed for task Reduce -> Sink: SinkToZK (5/5) (6329cf26aac32624aa6eefe3227fa5d5) [CANCELED]
2018-01-15 18:45:04,415 INFO  org.apache.flink.yarn.YarnTaskManager                         - Un-registering task and sending final execution state CANCELED to JobManager for task Reduce -> Sink: SinkToZK (6329cf26aac32624aa6eefe3227fa5d5)
2018-01-15 18:45:04,417 ERROR org.apache.flink.streaming.runtime.tasks.StreamTask           - Error during disposal of stream operator.
org.apache.kafka.common.KafkaException: Failed to close kafka producer
        at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:702)
        at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:651)
        at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:630)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.close(FlinkKafkaProducerBase.java:317)
        at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.close(AbstractUdfStreamOperator.java:118)

What was the cause? Did you ever solve it? I am running into the same problem here.

In my system there is a piece of logic that continuously writes data to another Kafka topic. Something else went wrong in the system before that write and brought the job down, and this Kafka error then showed up in the logs.

The real issue was that part of the code used memory poorly, which made the job unstable; once the job crashed, errors appeared at every stage. This time it just happened to be the Kafka write that threw the exception.

The producer is being closed prematurely. If this is a standalone process test, sleep briefly before the process exits so the buffered messages have time to be sent.
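For example, roughly like this (a minimal sketch of a standalone producer test, not the Flink job itself; the class name, broker address, and topic name are placeholders, and flush()/close() with a timeout are the standard KafkaProducer calls for draining buffered records before shutdown):

import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class ProducerShutdownTest {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        // Placeholder broker address; point this at your own cluster.
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        // "test-topic" is a placeholder topic name.
        producer.send(new ProducerRecord<>("test-topic", "key", "value"));

        // The simple workaround: give the producer's io thread a moment to send
        // whatever is still buffered before the process exits.
        Thread.sleep(1000);

        // More reliable than sleeping: flush the buffered records explicitly and
        // close with a bounded timeout instead of letting the io thread be interrupted.
        producer.flush();
        producer.close(10, TimeUnit.SECONDS);
    }
}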

Has this been resolved? What was the cause?

In my case the problem was in the computation itself; once I fixed that, this error disappeared. Check your logs for other errors.
