flink kafka Interrupted while joining ioThread

在使用flink 操作 kafka的时候出现如下问题,求助一下

2018-01-15 18:45:04,387 INFO  org.apache.flink.runtime.taskmanager.Task                     - Attempting to cancel task Reduce -> Sink: SinkToZK (5/5) (6329cf26aac32624aa6eefe3227fa5d5).
2018-01-15 18:45:04,387 INFO  org.apache.flink.runtime.taskmanager.Task                     - Reduce -> Sink: SinkToZK (5/5) (6329cf26aac32624aa6eefe3227fa5d5) switched from RUNNING to CANCELING.
2018-01-15 18:45:04,391 INFO  org.apache.flink.runtime.taskmanager.Task                     - Triggering cancellation of task code Reduce -> Sink: SinkToZK (5/5) (6329cf26aac32624aa6eefe3227fa5d5).
2018-01-15 18:45:04,391 INFO  org.apache.flink.runtime.taskmanager.Task                     - Attempting to cancel task SkippedDataReduce -> Sink: SkippedDataStreamStaticsZkSink (5/5) (50f7a2a179bb19676c324bc2b82fcaf0).
2018-01-15 18:45:04,391 INFO  org.apache.flink.runtime.taskmanager.Task                     - SkippedDataReduce -> Sink: SkippedDataStreamStaticsZkSink (5/5) (50f7a2a179bb19676c324bc2b82fcaf0) switched from RUNNING to CANCELING.
2018-01-15 18:45:04,394 INFO  org.apache.kafka.clients.producer.KafkaProducer               - Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
2018-01-15 18:45:04,396 INFO  org.apache.flink.runtime.taskmanager.Task                     - Triggering cancellation of task code SkippedDataReduce -> Sink: SkippedDataStreamStaticsZkSink (5/5) (50f7a2a179bb19676c324bc2b82fcaf0).
2018-01-15 18:45:04,396 ERROR org.apache.kafka.clients.producer.KafkaProducer               - Interrupted while joining ioThread
java.lang.InterruptedException
        at java.lang.Object.wait(Native Method)
        at java.lang.Thread.join(Thread.java:1253)
        at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:672)
        at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:651)
        at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:630)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.close(FlinkKafkaProducerBase.java:317)
        at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.close(AbstractUdfStreamOperator.java:118)
        at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.dispose(AbstractUdfStreamOperator.java:126)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.disposeAllOperators(StreamTask.java:428)
        at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:333)
        at org.apache.flink.runtime.taskmanager.Task.run(Task.java:702)
        at java.lang.Thread.run(Thread.java:745)
2018-01-15 18:45:04,397 INFO  org.apache.flink.yarn.YarnTaskManager                         - Un-registering task and sending final execution state CANCELED to JobManager for task Source: Custom Source -> Map -> (Filter -> Map -> Sink: Unnam
ed, Filter -> Map -> Map, Filter -> Map -> Sink: Unnamed) (420e754665eace3e01125689feb47f59)
2018-01-15 18:45:04,397 INFO  org.apache.kafka.clients.producer.KafkaProducer               - Proceeding to force close the producer since pending requests could not be completed within timeout 9223372036854775807 ms.
2018-01-15 18:45:04,400 INFO  org.apache.zookeeper.ZooKeeper                                - Session: 0x260833c2b62f786 closed
2018-01-15 18:45:04,400 INFO  org.apache.zookeeper.ClientCnxn                               - EventThread shut down
2018-01-15 18:45:04,407 INFO  org.apache.flink.runtime.taskmanager.Task                     - SkippedDataReduce -> Sink: SkippedDataStreamStaticsZkSink (5/5) (50f7a2a179bb19676c324bc2b82fcaf0) switched from CANCELING to CANCELED.
2018-01-15 18:45:04,407 INFO  org.apache.flink.runtime.taskmanager.Task                     - Freeing task resources for SkippedDataReduce -> Sink: SkippedDataStreamStaticsZkSink (5/5) (50f7a2a179bb19676c324bc2b82fcaf0).
2018-01-15 18:45:04,407 INFO  org.apache.zookeeper.ZooKeeper                                - Session: 0x1605d320327352c closed
2018-01-15 18:45:04,407 INFO  org.apache.flink.runtime.taskmanager.Task                     - Ensuring all FileSystem streams are closed for task SkippedDataReduce -> Sink: SkippedDataStreamStaticsZkSink (5/5) (50f7a2a179bb19676c324bc2b82fca
f0) [CANCELED]
2018-01-15 18:45:04,407 INFO  org.apache.zookeeper.ClientCnxn                               - EventThread shut down
2018-01-15 18:45:04,412 INFO  org.apache.flink.runtime.taskmanager.Task                     - Reduce -> Sink: SinkToZK (5/5) (6329cf26aac32624aa6eefe3227fa5d5) switched from CANCELING to CANCELED.
2018-01-15 18:45:04,412 INFO  org.apache.flink.runtime.taskmanager.Task                     - Freeing task resources for Reduce -> Sink: SinkToZK (5/5) (6329cf26aac32624aa6eefe3227fa5d5).
2018-01-15 18:45:04,415 INFO  org.apache.flink.yarn.YarnTaskManager                         - Un-registering task and sending final execution state CANCELED to JobManager for task SkippedDataReduce -> Sink: SkippedDataStreamStaticsZkSink (50
f7a2a179bb19676c324bc2b82fcaf0)
2018-01-15 18:45:04,415 INFO  org.apache.flink.runtime.taskmanager.Task                     - Ensuring all FileSystem streams are closed for task Reduce -> Sink: SinkToZK (5/5) (6329cf26aac32624aa6eefe3227fa5
d5) [CANCELED]
2018-01-15 18:45:04,415 INFO  org.apache.flink.yarn.YarnTaskManager                         - Un-registering task and sending final execution state CANCELED to JobManager for task Reduce -> Sink: SinkToZK (63
29cf26aac32624aa6eefe3227fa5d5)
2018-01-15 18:45:04,417 ERROR org.apache.flink.streaming.runtime.tasks.StreamTask           - Error during disposal of stream operator.
org.apache.kafka.common.KafkaException: Failed to close kafka producer
        at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:702)
        at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:651)
        at org.apache.kafka.clients.producer.KafkaProducer.close(KafkaProducer.java:630)
        at org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducerBase.close(FlinkKafkaProducerBase.java:317)
        at org.apache.flink.api.common.functions.util.FunctionUtils.closeFunction(FunctionUtils.java:43)
        at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.close(AbstractUdfStreamOperator.java:118)






发表于: 8月前   最后更新时间: 8月前   游览量:1059
上一条: 前提:(kafka集群3个broker,topic 8个分区 3个副本).问题(杀掉broker0之后,就无法消费,可以produce)
下一条: kafka producer发送消息send报错?

评论…


  • 我的系统里,有一段逻辑是会把数据往另一个kafka 的 topic里写
    这个写是不间断的
    所以在我写kafka之前系统出现了其他问题,导致宕机
    日志里就会出现这个kafka的问题

    其实这个问题是,代码中有段逻辑内存使用不合理,导致程序不稳定,程序一宕机就会在各个环节出错
    这次正好是在写kafka的时候错误抛异常了
    什么原因啊  解决了没  我这边也出现这个问题了
    生产者被提前关闭了,如果是进程测试的话,在结束的时候,休眠一下,给缓存的消息一个时间发送。
  • 评论…
    • in this conversation