Flink shuts down the Custom Source on its own
1. After the Flink job runs normally for a while (a day or two), it suddenly shuts down the Custom Source on its own and also closes the Couchbase fetch class, so all subsequent Couchbase reads time out.
2. We have also observed some degree of data skew in the job, and we are not sure whether the skew is what triggers the shutdown.
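One thing the log pattern suggests: the Couchbase environment shuts down immediately after a single source subtask (45/50) is canceled, which is consistent with all subtasks in the same TaskManager JVM sharing one Couchbase client (for example through a static field), so one canceled subtask's cleanup closes the client for everyone and later reads time out. This is only a hypothesis; a minimal sketch of that failure mode and of the per-subtask fix, with every class and key name hypothetical (not the actual job code):

```java
import java.util.concurrent.atomic.AtomicBoolean;

/**
 * Stand-in for a Couchbase client: once closed, every get() fails,
 * mirroring the "get ... from cb failed!" timeouts in the log.
 * All names here are hypothetical.
 */
class CbClient {
    private final AtomicBoolean closed = new AtomicBoolean(false);

    String get(String key) {
        if (closed.get()) {
            throw new IllegalStateException("client already closed");
        }
        return "value-of-" + key;
    }

    void close() {
        closed.set(true);
    }
}

public class SharedClientDemo {
    // The anti-pattern: one client shared by every "subtask" in the JVM,
    // as a static field in a source function would be.
    static final CbClient SHARED = new CbClient();

    public static void main(String[] args) {
        // Subtask A reads fine while everything is running.
        System.out.println(SHARED.get("pshmsg:1"));

        // Subtask B gets canceled and closes the shared client in its cleanup.
        SHARED.close();

        // Subtask A's next read now fails, like the ERROR at 11:02:03.
        try {
            SHARED.get("pshmsg:2");
        } catch (IllegalStateException e) {
            System.out.println("read failed: " + e.getMessage());
        }

        // The fix: each subtask owns its own client (create it in open(),
        // close it in close() of a RichSourceFunction), so one subtask's
        // cancellation stays isolated from its siblings.
        CbClient ownA = new CbClient();
        CbClient ownB = new CbClient();
        ownB.close();                              // subtask B canceled
        System.out.println(ownA.get("pshmsg:3"));  // subtask A unaffected
    }
}
```

If the real source does hold the Couchbase client or environment in a static field, moving it to an instance field created in `open()` and released in `close()` would keep a single subtask's cancellation from breaking the others on the same TaskManager.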
20/06/30 11:00:20 INFO apache.flink.runtime.taskmanager.Task: Attempting to cancel task Source: Custom Source -> Flat Map -> Filter (45/50) (5e68e3be7a8b4e83983688fc2185f90d).
20/06/30 11:00:20 INFO apache.flink.runtime.taskmanager.Task: Source: Custom Source -> Flat Map -> Filter (45/50) (5e68e3be7a8b4e83983688fc2185f90d) switched from RUNNING to CANCELING.
20/06/30 11:00:20 INFO apache.flink.runtime.taskmanager.Task: Triggering cancellation of task code Source: Custom Source -> Flat Map -> Filter (45/50) (5e68e3be7a8b4e83983688fc2185f90d).
20/06/30 11:00:20 INFO apache.flink.runtime.taskmanager.Task: Attempting to cancel task Window(TumblingProcessingTimeWindows(10000), ProcessingTimeTrigger, PushMessageWindow) (21/50) (decbf840da4b07b5874a6f16364171d9).
20/06/30 11:00:20 INFO apache.flink.runtime.taskmanager.Task: Window(TumblingProcessingTimeWindows(10000), ProcessingTimeTrigger, PushMessageWindow) (21/50) (decbf840da4b07b5874a6f16364171d9) switched from RUNNING to CANCELING.
20/06/30 11:00:20 INFO apache.flink.runtime.taskmanager.Task: Triggering cancellation of task code Window(TumblingProcessingTimeWindows(10000), ProcessingTimeTrigger, PushMessageWindow) (21/50) (decbf840da4b07b5874a6f16364171d9).
20/06/30 11:00:20 INFO apache.flink.runtime.taskmanager.Task: Source: Custom Source -> Flat Map -> Filter (45/50) (5e68e3be7a8b4e83983688fc2185f90d) switched from CANCELING to CANCELED.
20/06/30 11:00:20 INFO apache.flink.runtime.taskmanager.Task: Freeing task resources for Source: Custom Source -> Flat Map -> Filter (45/50) (5e68e3be7a8b4e83983688fc2185f90d).
20/06/30 11:00:21 INFO couchbase.client.core.node.Node: Disconnected from Node 11.112.125.69/bb9.public119-bb9.2.bbd.cb
20/06/30 11:00:21 INFO couchbase.client.core.node.Node: Disconnected from Node 11.112.125.68/bb9.public119-bb9.1.bbd.cb
20/06/30 11:00:21 INFO couchbase.client.core.node.Node: Disconnected from Node 11.112.125.70/11.112.125.70
20/06/30 11:00:21 INFO couchbase.client.core.config.ConfigurationProvider: Closed bucket mba_rec_realtime
20/06/30 11:00:21 INFO couchbase.client.core.node.Node: Disconnected from Node 11.112.125.71/11.112.125.71
20/06/30 11:00:21 INFO apache.flink.runtime.taskmanager.Task: Ensuring all FileSystem streams are closed for task Source: Custom Source -> Flat Map -> Filter (45/50) (5e68e3be7a8b4e83983688fc2185f90d) [CANCELED]
20/06/30 11:00:21 INFO apache.flink.runtime.taskexecutor.TaskExecutor: Un-registering task and sending final execution state CANCELED to JobManager for task Source: Custom Source -> Flat Map -> Filter 5e68e3be7a8b4e83983688fc2185f90d.
20/06/30 11:00:21 INFO couchbase.client.core.env.CoreEnvironment: Shutdown kvIoPool: success
20/06/30 11:00:21 INFO couchbase.client.core.env.CoreEnvironment: Shutdown IoPool: success
20/06/30 11:00:21 INFO couchbase.client.core.env.CoreEnvironment: Shutdown viewIoPool: success
20/06/30 11:00:21 INFO couchbase.client.core.env.CoreEnvironment: Shutdown queryIoPool: success
20/06/30 11:00:21 INFO couchbase.client.core.env.CoreEnvironment: Shutdown searchIoPool: success
20/06/30 11:00:21 INFO couchbase.client.core.env.CoreEnvironment: Shutdown Core Scheduler: success
20/06/30 11:00:21 INFO couchbase.client.core.env.CoreEnvironment: Shutdown Runtime Metrics Collector: success
20/06/30 11:00:21 INFO couchbase.client.core.env.CoreEnvironment: Shutdown Latency Metrics Collector: success
20/06/30 11:00:24 INFO couchbase.client.core.env.CoreEnvironment: Shutdown Netty: failure
20/06/30 11:00:24 INFO couchbase.client.core.env.CoreEnvironment: Netty shutdown is best effort, ignoring failure
20/06/30 11:00:24 INFO couchbase.client.core.node.Node: Disconnected from Node 10.62.56.138/bjzyx.public304-bjzyx.1.bbd.cb
20/06/30 11:00:24 INFO couchbase.client.core.node.Node: Disconnected from Node 10.62.56.139/bjzyx.public304-bjzyx.2.bbd.cb
20/06/30 11:00:24 INFO couchbase.client.core.node.Node: Disconnected from Node 10.62.56.140/10.62.56.140
20/06/30 11:00:24 INFO couchbase.client.core.config.ConfigurationProvider: Closed bucket qijian_meta
20/06/30 11:00:24 INFO couchbase.client.core.node.Node: Disconnected from Node 10.62.56.141/10.62.56.141
20/06/30 11:00:24 INFO couchbase.client.core.env.CoreEnvironment: Shutdown IoPool: success
20/06/30 11:00:24 INFO couchbase.client.core.env.CoreEnvironment: Shutdown kvIoPool: success
20/06/30 11:00:24 INFO couchbase.client.core.env.CoreEnvironment: Shutdown viewIoPool: success
20/06/30 11:00:24 INFO couchbase.client.core.env.CoreEnvironment: Shutdown queryIoPool: success
20/06/30 11:00:24 INFO couchbase.client.core.env.CoreEnvironment: Shutdown searchIoPool: success
20/06/30 11:00:24 INFO couchbase.client.core.env.CoreEnvironment: Shutdown Core Scheduler: success
20/06/30 11:00:24 INFO couchbase.client.core.env.CoreEnvironment: Shutdown Runtime Metrics Collector: success
20/06/30 11:00:24 INFO couchbase.client.core.env.CoreEnvironment: Shutdown Latency Metrics Collector: success
20/06/30 11:00:25 INFO couchbase.client.core.env.CoreEnvironment: Shutdown Netty: success
20/06/30 11:00:25 INFO apache.flink.runtime.taskmanager.Task: Window(TumblingProcessingTimeWindows(10000), ProcessingTimeTrigger, PushMessageWindow) (21/50) (decbf840da4b07b5874a6f16364171d9) switched from CANCELING to CANCELED.
20/06/30 11:00:25 INFO apache.flink.runtime.taskmanager.Task: Freeing task resources for Window(TumblingProcessingTimeWindows(10000), ProcessingTimeTrigger, PushMessageWindow) (21/50) (decbf840da4b07b5874a6f16364171d9).
20/06/30 11:00:25 INFO apache.flink.runtime.taskmanager.Task: Ensuring all FileSystem streams are closed for task Window(TumblingProcessingTimeWindows(10000), ProcessingTimeTrigger, PushMessageWindow) (21/50) (decbf840da4b07b5874a6f16364171d9) [CANCELED]
20/06/30 11:00:25 INFO apache.flink.runtime.taskexecutor.TaskExecutor: Un-registering task and sending final execution state CANCELED to JobManager for task Window(TumblingProcessingTimeWindows(10000), ProcessingTimeTrigger, PushMessageWindow) decbf840da4b07b5874a6f16364171d9.
20/06/30 11:01:54 INFO apache.flink.runtime.taskexecutor.TaskExecutor: Received task Source: Custom Source -> Flat Map -> Filter (7/50).
20/06/30 11:01:54 INFO apache.flink.runtime.taskmanager.Task: Source: Custom Source -> Flat Map -> Filter (7/50) (ab630f8fc8942c21b5d2bf0913202b7a) switched from CREATED to DEPLOYING.
20/06/30 11:01:54 INFO apache.flink.runtime.taskmanager.Task: Creating FileSystem stream leak safety net for task Source: Custom Source -> Flat Map -> Filter (7/50) (ab630f8fc8942c21b5d2bf0913202b7a) [DEPLOYING]
20/06/30 11:01:54 INFO apache.flink.runtime.taskmanager.Task: Loading JAR files for task Source: Custom Source -> Flat Map -> Filter (7/50) (ab630f8fc8942c21b5d2bf0913202b7a) [DEPLOYING].
20/06/30 11:01:54 INFO apache.flink.runtime.taskexecutor.TaskExecutor: Received task Window(TumblingProcessingTimeWindows(10000), ProcessingTimeTrigger, PushMessageWindow) (30/50).
20/06/30 11:01:54 INFO apache.flink.runtime.taskmanager.Task: Registering task at network: Source: Custom Source -> Flat Map -> Filter (7/50) (ab630f8fc8942c21b5d2bf0913202b7a) [DEPLOYING].
20/06/30 11:01:54 INFO apache.flink.runtime.taskmanager.Task: Window(TumblingProcessingTimeWindows(10000), ProcessingTimeTrigger, PushMessageWindow) (30/50) (15b018382670070dd8707b29e11a8a14) switched from CREATED to DEPLOYING.
20/06/30 11:01:54 INFO apache.flink.runtime.taskmanager.Task: Creating FileSystem stream leak safety net for task Window(TumblingProcessingTimeWindows(10000), ProcessingTimeTrigger, PushMessageWindow) (30/50) (15b018382670070dd8707b29e11a8a14) [DEPLOYING]
20/06/30 11:01:54 INFO apache.flink.runtime.taskmanager.Task: Loading JAR files for task Window(TumblingProcessingTimeWindows(10000), ProcessingTimeTrigger, PushMessageWindow) (30/50) (15b018382670070dd8707b29e11a8a14) [DEPLOYING].
20/06/30 11:01:54 INFO apache.flink.runtime.taskmanager.Task: Registering task at network: Window(TumblingProcessingTimeWindows(10000), ProcessingTimeTrigger, PushMessageWindow) (30/50) (15b018382670070dd8707b29e11a8a14) [DEPLOYING].
20/06/30 11:01:54 INFO apache.flink.runtime.taskmanager.Task: Source: Custom Source -> Flat Map -> Filter (7/50) (ab630f8fc8942c21b5d2bf0913202b7a) switched from DEPLOYING to RUNNING.
20/06/30 11:01:54 INFO apache.flink.runtime.taskmanager.Task: Window(TumblingProcessingTimeWindows(10000), ProcessingTimeTrigger, PushMessageWindow) (30/50) (15b018382670070dd8707b29e11a8a14) switched from DEPLOYING to RUNNING.
20/06/30 11:01:54 INFO flink.streaming.runtime.tasks.StreamTask: No state backend has been configured, using default (Memory / JobManager) MemoryStateBackend (data in heap memory / checkpoints to JobManager) (checkpoints: 'null', savepoints: 'null', asynchronous: TRUE, maxStateSize: 5242880)
20/06/30 11:01:54 INFO flink.streaming.runtime.tasks.StreamTask: No state backend has been configured, using default (Memory / JobManager) MemoryStateBackend (data in heap memory / checkpoints to JobManager) (checkpoints: 'null', savepoints: 'null', asynchronous: TRUE, maxStateSize: 5242880)
20/06/30 11:01:54 WARN org.apache.flink.metrics.MetricGroup: The operator name Window(TumblingProcessingTimeWindows(10000), ProcessingTimeTrigger, PushMessageWindow) exceeded the 80 characters length limit and was truncated.
20/06/30 11:01:54 INFO flink.runtime.state.heap.HeapKeyedStateBackend: Initializing heap keyed state backend with stream factory.
20/06/30 11:01:54 INFO flink.api.java.typeutils.TypeExtractor: class org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition does not contain a setter for field topic
20/06/30 11:01:54 INFO flink.api.java.typeutils.TypeExtractor: Class class org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
20/06/30 11:01:54 INFO flink.streaming.connectors.kafka.FlinkKafkaConsumerBase: No restore state for FlinkKafkaConsumer.
20/06/30 11:01:54 INFO apache.kafka.clients.consumer.ConsumerConfig: ConsumerConfig values:
auto.commit.interval.ms = 5000
auto.offset.reset = latest
bootstrap.servers = [dc-resource-1657031ad-24.bbd.virtual:9092, dc-resource-1657031ad-37.bbd.virtual:9092, dc-resource-1657031ad-10.bbd.virtual:9092, dc-resource-1657031ad-14.bbd.virtual:9092, dc-resource-1657031ad-29.bbd.virtual:9092]
check.crcs = true
client.id =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = true
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = uaa.flink.feige_push_message_arrive_click
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
20/06/30 11:01:54 WARN apache.kafka.clients.consumer.ConsumerConfig: The configuration 'zookeeper.connect' was supplied but isn't a known config.
20/06/30 11:01:54 INFO apache.kafka.common.utils.AppInfoParser: Kafka version : 2.0.1
20/06/30 11:01:54 INFO apache.kafka.common.utils.AppInfoParser: Kafka commitId : fa14705e51bd2ce5
20/06/30 11:01:54 INFO org.apache.kafka.clients.Metadata: Cluster ID: QsC877N-QiWmXol7pgw7ig
20/06/30 11:01:54 INFO flink.streaming.connectors.kafka.FlinkKafkaConsumerBase: Consumer subtask 6 initially has no partitions to read from.
20/06/30 11:01:54 INFO apache.kafka.clients.consumer.ConsumerConfig: ConsumerConfig values:
auto.commit.interval.ms = 5000
auto.offset.reset = latest
bootstrap.servers = [dc-resource-1657031ad-24.bbd.virtual:9092, dc-resource-1657031ad-37.bbd.virtual:9092, dc-resource-1657031ad-10.bbd.virtual:9092, dc-resource-1657031ad-14.bbd.virtual:9092, dc-resource-1657031ad-29.bbd.virtual:9092]
check.crcs = true
client.id =
connections.max.idle.ms = 540000
default.api.timeout.ms = 60000
enable.auto.commit = true
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
group.id = uaa.flink.feige_push_message_arrive_click
heartbeat.interval.ms = 3000
interceptor.classes = []
internal.leave.group.on.close = true
isolation.level = read_uncommitted
key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
max.partition.fetch.bytes = 1048576
max.poll.interval.ms = 300000
max.poll.records = 500
metadata.max.age.ms = 300000
metric.reporters = []
metrics.num.samples = 2
metrics.recording.level = INFO
metrics.sample.window.ms = 30000
partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
receive.buffer.bytes = 65536
reconnect.backoff.max.ms = 1000
reconnect.backoff.ms = 50
request.timeout.ms = 30000
retry.backoff.ms = 100
sasl.client.callback.handler.class = null
sasl.jaas.config = null
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.min.time.before.relogin = 60000
sasl.kerberos.service.name = null
sasl.kerberos.ticket.renew.jitter = 0.05
sasl.kerberos.ticket.renew.window.factor = 0.8
sasl.login.callback.handler.class = null
sasl.login.class = null
sasl.login.refresh.buffer.seconds = 300
sasl.login.refresh.min.period.seconds = 60
sasl.login.refresh.window.factor = 0.8
sasl.login.refresh.window.jitter = 0.05
sasl.mechanism = GSSAPI
security.protocol = PLAINTEXT
send.buffer.bytes = 131072
session.timeout.ms = 10000
ssl.cipher.suites = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.endpoint.identification.algorithm = https
ssl.key.password = null
ssl.keymanager.algorithm = SunX509
ssl.keystore.location = null
ssl.keystore.password = null
ssl.keystore.type = JKS
ssl.protocol = TLS
ssl.provider = null
ssl.secure.random.implementation = null
ssl.trustmanager.algorithm = PKIX
ssl.truststore.location = null
ssl.truststore.password = null
ssl.truststore.type = JKS
value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
20/06/30 11:01:55 WARN apache.kafka.clients.consumer.ConsumerConfig: The configuration 'zookeeper.connect' was supplied but isn't a known config.
20/06/30 11:01:55 INFO apache.kafka.common.utils.AppInfoParser: Kafka version : 2.0.1
20/06/30 11:01:55 INFO apache.kafka.common.utils.AppInfoParser: Kafka commitId : fa14705e51bd2ce5
20/06/30 11:02:03 ERROR mba.rec.commons.repository.MbaRtFetchRepository: get [pshmsg:200629228191226644, pshmsg:200614028163616398, pshmsg:200613028185402716, pshmsg:200629028179557883, pshmsg:200629028184901551] from cb failed!
rx.exceptions.CompositeException: 5 exceptions occurred.
at rx.internal.operators.OperatorMerge$MergeSubscriber.reportError(OperatorMerge.java:268)
at rx.internal.operators.OperatorMerge$MergeSubscriber.checkTerminate(OperatorMerge.java:818)
at rx.internal.operators.OperatorMerge$MergeSubscriber.emitLoop(OperatorMerge.java:579)
at rx.internal.operators.OperatorMerge$MergeSubscriber.emit(OperatorMerge.java:568)
at rx.internal.operators.OperatorMerge$InnerSubscriber.onError(OperatorMerge.java:852)
at rx.observers.SerializedObserver.onError(SerializedObserver.java:152)
at rx.observers.SerializedSubscriber.onError(SerializedSubscriber.java:78)
at rx.internal.operators.OperatorTimeoutBase$TimeoutSubscriber.onTimeout(OperatorTimeoutBase.java:177)
at rx.internal.operators.OperatorTimeout$1$1.call(OperatorTimeout.java:41)
at rx.internal.schedulers.EventLoopsScheduler$EventLoopWorker$2.call(EventLoopsScheduler.java:189)
at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
ComposedException 1 :
java.util.concurrent.TimeoutException
at rx.internal.operators.OperatorTimeoutBase$TimeoutSubscriber.onTimeout(OperatorTimeoutBase.java:177)
at rx.internal.operators.OperatorTimeout$1$1.call(OperatorTimeout.java:41)
at rx.internal.schedulers.EventLoopsScheduler$EventLoopWorker$2.call(EventLoopsScheduler.java:189)
at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
ComposedException 2 :
java.util.concurrent.TimeoutException
at rx.internal.operators.OperatorTimeoutBase$TimeoutSubscriber.onTimeout(OperatorTimeoutBase.java:177)
at rx.internal.operators.OperatorTimeout$1$1.call(OperatorTimeout.java:41)
at rx.internal.schedulers.EventLoopsScheduler$EventLoopWorker$2.call(EventLoopsScheduler.java:189)
at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
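Two more observations on the log, both hedged. First, this TaskManager log only shows the cancellation fan-out; the root-cause exception that made the JobManager cancel the tasks should be in the JobManager log, which is where to look next. Second, the line "No state backend has been configured, using default ... MemoryStateBackend" with maxStateSize: 5242880 means all window state lives on the heap with a 5 MB cap per state; with data skew, a hot window subtask could exceed that or pressure the heap, which might also explain the cancellations. If that turns out to be the cause, one option is a durable backend; a sketch of the relevant flink-conf.yaml keys (the checkpoint path is a placeholder):

```yaml
state.backend: rocksdb
state.checkpoints.dir: hdfs:///flink/checkpoints   # placeholder path
```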