Flink consuming Kafka: Custom Source ends abnormally? Attempting to cancel task Source: Custom Source

Lion   Posted: 2020-07-01   Last updated: 2020-07-01 14:51:03   4,398 views

Flink ends the Custom Source on its own

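1. After running normally for a while (one or two days), Flink suddenly ends the Custom Source on its own and closes the class that fetches data from Couchbase, so later Couchbase reads time out.
2. So far I have found a certain degree of data skew, and I am not sure whether the data skew is the cause.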
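
One possible reading of the log below, offered as an assumption rather than a confirmed diagnosis: the Couchbase buckets are closed right after a task is cancelled, and reads issued a little later time out. That pattern would fit a client that is shared by several subtasks in the same TaskManager JVM and is closed from one task's close() while other tasks still use it. The sketch below shows a reference-counted holder that closes the shared client only when its last user releases it. All names here (SharedClientDemo, SharedClient, FakeClient) are hypothetical; FakeClient stands in for the real Couchbase connection, and nothing here is taken from the poster's code.

```java
import java.util.function.Supplier;

// Hypothetical sketch: share one client per JVM and close it only after the last user releases it.
final class SharedClientDemo {

    /** Reference-counted holder; acquire() would be called from open(), release() from close(). */
    static final class SharedClient<T extends AutoCloseable> {
        private final Supplier<T> factory;
        private T client;   // guarded by "this"
        private int users;  // guarded by "this"

        SharedClient(Supplier<T> factory) {
            this.factory = factory;
        }

        /** Creates the client on first use, otherwise returns the existing instance. */
        synchronized T acquire() {
            if (users == 0) {
                client = factory.get();
            }
            users++;
            return client;
        }

        /** Closes the client only when no other user still holds it. */
        synchronized void release() {
            if (users == 0) {
                return; // unbalanced release, nothing to close
            }
            users--;
            if (users == 0) {
                T toClose = client;
                client = null;
                try {
                    toClose.close();
                } catch (Exception e) {
                    throw new IllegalStateException("closing shared client failed", e);
                }
            }
        }
    }

    /** Stand-in for a real connection (assumption); it only records whether close() was called. */
    static final class FakeClient implements AutoCloseable {
        volatile boolean closed;

        @Override
        public void close() {
            closed = true;
        }
    }

    public static void main(String[] args) {
        SharedClient<FakeClient> shared = new SharedClient<>(FakeClient::new);
        FakeClient first = shared.acquire();  // one subtask opens
        FakeClient second = shared.acquire(); // another subtask in the same JVM opens
        shared.release();                     // the first subtask is cancelled
        System.out.println("same instance: " + (first == second));
        System.out.println("closed after first release: " + first.closed);
        shared.release();                     // the last user goes away
        System.out.println("closed after last release: " + first.closed);
    }
}
```

If the client is in fact created per task instance rather than shared, this sketch does not apply, and the reason for the cancellation itself would have to be looked up in the JobManager log, which is not included in this post.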

20/06/30 11:00:20 INFO apache.flink.runtime.taskmanager.Task: Attempting to cancel task Source: Custom Source -> Flat Map -> Filter (45/50) (5e68e3be7a8b4e83983688fc2185f90d).
20/06/30 11:00:20 INFO apache.flink.runtime.taskmanager.Task: Source: Custom Source -> Flat Map -> Filter (45/50) (5e68e3be7a8b4e83983688fc2185f90d) switched from RUNNING to CANCELING.
20/06/30 11:00:20 INFO apache.flink.runtime.taskmanager.Task: Triggering cancellation of task code Source: Custom Source -> Flat Map -> Filter (45/50) (5e68e3be7a8b4e83983688fc2185f90d).
20/06/30 11:00:20 INFO apache.flink.runtime.taskmanager.Task: Attempting to cancel task Window(TumblingProcessingTimeWindows(10000), ProcessingTimeTrigger, PushMessageWindow) (21/50) (decbf840da4b07b5874a6f16364171d9).
20/06/30 11:00:20 INFO apache.flink.runtime.taskmanager.Task: Window(TumblingProcessingTimeWindows(10000), ProcessingTimeTrigger, PushMessageWindow) (21/50) (decbf840da4b07b5874a6f16364171d9) switched from RUNNING to CANCELING.
20/06/30 11:00:20 INFO apache.flink.runtime.taskmanager.Task: Triggering cancellation of task code Window(TumblingProcessingTimeWindows(10000), ProcessingTimeTrigger, PushMessageWindow) (21/50) (decbf840da4b07b5874a6f16364171d9).
20/06/30 11:00:20 INFO apache.flink.runtime.taskmanager.Task: Source: Custom Source -> Flat Map -> Filter (45/50) (5e68e3be7a8b4e83983688fc2185f90d) switched from CANCELING to CANCELED.
20/06/30 11:00:20 INFO apache.flink.runtime.taskmanager.Task: Freeing task resources for Source: Custom Source -> Flat Map -> Filter (45/50) (5e68e3be7a8b4e83983688fc2185f90d).
20/06/30 11:00:21 INFO couchbase.client.core.node.Node: Disconnected from Node 11.112.125.69/bb9.public119-bb9.2.bbd.cb
20/06/30 11:00:21 INFO couchbase.client.core.node.Node: Disconnected from Node 11.112.125.68/bb9.public119-bb9.1.bbd.cb
20/06/30 11:00:21 INFO couchbase.client.core.node.Node: Disconnected from Node 11.112.125.70/11.112.125.70
20/06/30 11:00:21 INFO couchbase.client.core.config.ConfigurationProvider: Closed bucket mba_rec_realtime
20/06/30 11:00:21 INFO couchbase.client.core.node.Node: Disconnected from Node 11.112.125.71/11.112.125.71
20/06/30 11:00:21 INFO apache.flink.runtime.taskmanager.Task: Ensuring all FileSystem streams are closed for task Source: Custom Source -> Flat Map -> Filter (45/50) (5e68e3be7a8b4e83983688fc2185f90d) [CANCELED]
20/06/30 11:00:21 INFO apache.flink.runtime.taskexecutor.TaskExecutor: Un-registering task and sending final execution state CANCELED to JobManager for task Source: Custom Source -> Flat Map -> Filter 5e68e3be7a8b4e83983688fc2185f90d.
20/06/30 11:00:21 INFO couchbase.client.core.env.CoreEnvironment: Shutdown kvIoPool: success 
20/06/30 11:00:21 INFO couchbase.client.core.env.CoreEnvironment: Shutdown IoPool: success 
20/06/30 11:00:21 INFO couchbase.client.core.env.CoreEnvironment: Shutdown viewIoPool: success 
20/06/30 11:00:21 INFO couchbase.client.core.env.CoreEnvironment: Shutdown queryIoPool: success 
20/06/30 11:00:21 INFO couchbase.client.core.env.CoreEnvironment: Shutdown searchIoPool: success 
20/06/30 11:00:21 INFO couchbase.client.core.env.CoreEnvironment: Shutdown Core Scheduler: success 
20/06/30 11:00:21 INFO couchbase.client.core.env.CoreEnvironment: Shutdown Runtime Metrics Collector: success 
20/06/30 11:00:21 INFO couchbase.client.core.env.CoreEnvironment: Shutdown Latency Metrics Collector: success 
20/06/30 11:00:24 INFO couchbase.client.core.env.CoreEnvironment: Shutdown Netty: failure 
20/06/30 11:00:24 INFO couchbase.client.core.env.CoreEnvironment: Netty shutdown is best effort, ignoring failure
20/06/30 11:00:24 INFO couchbase.client.core.node.Node: Disconnected from Node 10.62.56.138/bjzyx.public304-bjzyx.1.bbd.cb
20/06/30 11:00:24 INFO couchbase.client.core.node.Node: Disconnected from Node 10.62.56.139/bjzyx.public304-bjzyx.2.bbd.cb
20/06/30 11:00:24 INFO couchbase.client.core.node.Node: Disconnected from Node 10.62.56.140/10.62.56.140
20/06/30 11:00:24 INFO couchbase.client.core.config.ConfigurationProvider: Closed bucket qijian_meta
20/06/30 11:00:24 INFO couchbase.client.core.node.Node: Disconnected from Node 10.62.56.141/10.62.56.141
20/06/30 11:00:24 INFO couchbase.client.core.env.CoreEnvironment: Shutdown IoPool: success 
20/06/30 11:00:24 INFO couchbase.client.core.env.CoreEnvironment: Shutdown kvIoPool: success 
20/06/30 11:00:24 INFO couchbase.client.core.env.CoreEnvironment: Shutdown viewIoPool: success 
20/06/30 11:00:24 INFO couchbase.client.core.env.CoreEnvironment: Shutdown queryIoPool: success 
20/06/30 11:00:24 INFO couchbase.client.core.env.CoreEnvironment: Shutdown searchIoPool: success 
20/06/30 11:00:24 INFO couchbase.client.core.env.CoreEnvironment: Shutdown Core Scheduler: success 
20/06/30 11:00:24 INFO couchbase.client.core.env.CoreEnvironment: Shutdown Runtime Metrics Collector: success 
20/06/30 11:00:24 INFO couchbase.client.core.env.CoreEnvironment: Shutdown Latency Metrics Collector: success 
20/06/30 11:00:25 INFO couchbase.client.core.env.CoreEnvironment: Shutdown Netty: success 
20/06/30 11:00:25 INFO apache.flink.runtime.taskmanager.Task: Window(TumblingProcessingTimeWindows(10000), ProcessingTimeTrigger, PushMessageWindow) (21/50) (decbf840da4b07b5874a6f16364171d9) switched from CANCELING to CANCELED.
20/06/30 11:00:25 INFO apache.flink.runtime.taskmanager.Task: Freeing task resources for Window(TumblingProcessingTimeWindows(10000), ProcessingTimeTrigger, PushMessageWindow) (21/50) (decbf840da4b07b5874a6f16364171d9).
20/06/30 11:00:25 INFO apache.flink.runtime.taskmanager.Task: Ensuring all FileSystem streams are closed for task Window(TumblingProcessingTimeWindows(10000), ProcessingTimeTrigger, PushMessageWindow) (21/50) (decbf840da4b07b5874a6f16364171d9) [CANCELED]
20/06/30 11:00:25 INFO apache.flink.runtime.taskexecutor.TaskExecutor: Un-registering task and sending final execution state CANCELED to JobManager for task Window(TumblingProcessingTimeWindows(10000), ProcessingTimeTrigger, PushMessageWindow) decbf840da4b07b5874a6f16364171d9.
20/06/30 11:01:54 INFO apache.flink.runtime.taskexecutor.TaskExecutor: Received task Source: Custom Source -> Flat Map -> Filter (7/50).
20/06/30 11:01:54 INFO apache.flink.runtime.taskmanager.Task: Source: Custom Source -> Flat Map -> Filter (7/50) (ab630f8fc8942c21b5d2bf0913202b7a) switched from CREATED to DEPLOYING.
20/06/30 11:01:54 INFO apache.flink.runtime.taskmanager.Task: Creating FileSystem stream leak safety net for task Source: Custom Source -> Flat Map -> Filter (7/50) (ab630f8fc8942c21b5d2bf0913202b7a) [DEPLOYING]
20/06/30 11:01:54 INFO apache.flink.runtime.taskmanager.Task: Loading JAR files for task Source: Custom Source -> Flat Map -> Filter (7/50) (ab630f8fc8942c21b5d2bf0913202b7a) [DEPLOYING].
20/06/30 11:01:54 INFO apache.flink.runtime.taskexecutor.TaskExecutor: Received task Window(TumblingProcessingTimeWindows(10000), ProcessingTimeTrigger, PushMessageWindow) (30/50).
20/06/30 11:01:54 INFO apache.flink.runtime.taskmanager.Task: Registering task at network: Source: Custom Source -> Flat Map -> Filter (7/50) (ab630f8fc8942c21b5d2bf0913202b7a) [DEPLOYING].
20/06/30 11:01:54 INFO apache.flink.runtime.taskmanager.Task: Window(TumblingProcessingTimeWindows(10000), ProcessingTimeTrigger, PushMessageWindow) (30/50) (15b018382670070dd8707b29e11a8a14) switched from CREATED to DEPLOYING.
20/06/30 11:01:54 INFO apache.flink.runtime.taskmanager.Task: Creating FileSystem stream leak safety net for task Window(TumblingProcessingTimeWindows(10000), ProcessingTimeTrigger, PushMessageWindow) (30/50) (15b018382670070dd8707b29e11a8a14) [DEPLOYING]
20/06/30 11:01:54 INFO apache.flink.runtime.taskmanager.Task: Loading JAR files for task Window(TumblingProcessingTimeWindows(10000), ProcessingTimeTrigger, PushMessageWindow) (30/50) (15b018382670070dd8707b29e11a8a14) [DEPLOYING].
20/06/30 11:01:54 INFO apache.flink.runtime.taskmanager.Task: Registering task at network: Window(TumblingProcessingTimeWindows(10000), ProcessingTimeTrigger, PushMessageWindow) (30/50) (15b018382670070dd8707b29e11a8a14) [DEPLOYING].
20/06/30 11:01:54 INFO apache.flink.runtime.taskmanager.Task: Source: Custom Source -> Flat Map -> Filter (7/50) (ab630f8fc8942c21b5d2bf0913202b7a) switched from DEPLOYING to RUNNING.
20/06/30 11:01:54 INFO apache.flink.runtime.taskmanager.Task: Window(TumblingProcessingTimeWindows(10000), ProcessingTimeTrigger, PushMessageWindow) (30/50) (15b018382670070dd8707b29e11a8a14) switched from DEPLOYING to RUNNING.
20/06/30 11:01:54 INFO flink.streaming.runtime.tasks.StreamTask: No state backend has been configured, using default (Memory / JobManager) MemoryStateBackend (data in heap memory / checkpoints to JobManager) (checkpoints: 'null', savepoints: 'null', asynchronous: TRUE, maxStateSize: 5242880)
20/06/30 11:01:54 INFO flink.streaming.runtime.tasks.StreamTask: No state backend has been configured, using default (Memory / JobManager) MemoryStateBackend (data in heap memory / checkpoints to JobManager) (checkpoints: 'null', savepoints: 'null', asynchronous: TRUE, maxStateSize: 5242880)
20/06/30 11:01:54 WARN org.apache.flink.metrics.MetricGroup: The operator name Window(TumblingProcessingTimeWindows(10000), ProcessingTimeTrigger, PushMessageWindow) exceeded the 80 characters length limit and was truncated.
20/06/30 11:01:54 INFO flink.runtime.state.heap.HeapKeyedStateBackend: Initializing heap keyed state backend with stream factory.
20/06/30 11:01:54 INFO flink.api.java.typeutils.TypeExtractor: class org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition does not contain a setter for field topic
20/06/30 11:01:54 INFO flink.api.java.typeutils.TypeExtractor: Class class org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
20/06/30 11:01:54 INFO flink.streaming.connectors.kafka.FlinkKafkaConsumerBase: No restore state for FlinkKafkaConsumer.
20/06/30 11:01:54 INFO apache.kafka.clients.consumer.ConsumerConfig: ConsumerConfig values: 
    auto.commit.interval.ms = 5000
    auto.offset.reset = latest
    bootstrap.servers = [dc-resource-1657031ad-24.bbd.virtual:9092, dc-resource-1657031ad-37.bbd.virtual:9092, dc-resource-1657031ad-10.bbd.virtual:9092, dc-resource-1657031ad-14.bbd.virtual:9092, dc-resource-1657031ad-29.bbd.virtual:9092]
    check.crcs = true
    client.id = 
    connections.max.idle.ms = 540000
    default.api.timeout.ms = 60000
    enable.auto.commit = true
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = uaa.flink.feige_push_message_arrive_click
    heartbeat.interval.ms = 3000
    interceptor.classes = []
    internal.leave.group.on.close = true
    isolation.level = read_uncommitted
    key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 300000
    max.poll.records = 500
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    send.buffer.bytes = 131072
    session.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = https
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
20/06/30 11:01:54 WARN apache.kafka.clients.consumer.ConsumerConfig: The configuration 'zookeeper.connect' was supplied but isn't a known config.
20/06/30 11:01:54 INFO apache.kafka.common.utils.AppInfoParser: Kafka version : 2.0.1
20/06/30 11:01:54 INFO apache.kafka.common.utils.AppInfoParser: Kafka commitId : fa14705e51bd2ce5
20/06/30 11:01:54 INFO org.apache.kafka.clients.Metadata: Cluster ID: QsC877N-QiWmXol7pgw7ig
20/06/30 11:01:54 INFO flink.streaming.connectors.kafka.FlinkKafkaConsumerBase: Consumer subtask 6 initially has no partitions to read from.
20/06/30 11:01:54 INFO apache.kafka.clients.consumer.ConsumerConfig: ConsumerConfig values: 
    auto.commit.interval.ms = 5000
    auto.offset.reset = latest
    bootstrap.servers = [dc-resource-1657031ad-24.bbd.virtual:9092, dc-resource-1657031ad-37.bbd.virtual:9092, dc-resource-1657031ad-10.bbd.virtual:9092, dc-resource-1657031ad-14.bbd.virtual:9092, dc-resource-1657031ad-29.bbd.virtual:9092]
    check.crcs = true
    client.id = 
    connections.max.idle.ms = 540000
    default.api.timeout.ms = 60000
    enable.auto.commit = true
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = uaa.flink.feige_push_message_arrive_click
    heartbeat.interval.ms = 3000
    interceptor.classes = []
    internal.leave.group.on.close = true
    isolation.level = read_uncommitted
    key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 300000
    max.poll.records = 500
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    send.buffer.bytes = 131072
    session.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = https
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
20/06/30 11:01:55 WARN apache.kafka.clients.consumer.ConsumerConfig: The configuration 'zookeeper.connect' was supplied but isn't a known config.
20/06/30 11:01:55 INFO apache.kafka.common.utils.AppInfoParser: Kafka version : 2.0.1
20/06/30 11:01:55 INFO apache.kafka.common.utils.AppInfoParser: Kafka commitId : fa14705e51bd2ce5
20/06/30 11:02:03 ERROR mba.rec.commons.repository.MbaRtFetchRepository: get [pshmsg:200629228191226644, pshmsg:200614028163616398, pshmsg:200613028185402716, pshmsg:200629028179557883, pshmsg:200629028184901551] from cb failed!
rx.exceptions.CompositeException: 5 exceptions occurred. 
    at rx.internal.operators.OperatorMerge$MergeSubscriber.reportError(OperatorMerge.java:268)
    at rx.internal.operators.OperatorMerge$MergeSubscriber.checkTerminate(OperatorMerge.java:818)
    at rx.internal.operators.OperatorMerge$MergeSubscriber.emitLoop(OperatorMerge.java:579)
    at rx.internal.operators.OperatorMerge$MergeSubscriber.emit(OperatorMerge.java:568)
    at rx.internal.operators.OperatorMerge$InnerSubscriber.onError(OperatorMerge.java:852)
    at rx.observers.SerializedObserver.onError(SerializedObserver.java:152)
    at rx.observers.SerializedSubscriber.onError(SerializedSubscriber.java:78)
    at rx.internal.operators.OperatorTimeoutBase$TimeoutSubscriber.onTimeout(OperatorTimeoutBase.java:177)
    at rx.internal.operators.OperatorTimeout$1$1.call(OperatorTimeout.java:41)
    at rx.internal.schedulers.EventLoopsScheduler$EventLoopWorker$2.call(EventLoopsScheduler.java:189)
    at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
  ComposedException 1 :
    java.util.concurrent.TimeoutException
        at rx.internal.operators.OperatorTimeoutBase$TimeoutSubscriber.onTimeout(OperatorTimeoutBase.java:177)
        at rx.internal.operators.OperatorTimeout$1$1.call(OperatorTimeout.java:41)
        at rx.internal.schedulers.EventLoopsScheduler$EventLoopWorker$2.call(EventLoopsScheduler.java:189)
        at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
  ComposedException 2 :
    java.util.concurrent.TimeoutException
        at rx.internal.operators.OperatorTimeoutBase$TimeoutSubscriber.onTimeout(OperatorTimeoutBase.java:177)
        at rx.internal.operators.OperatorTimeout$1$1.call(OperatorTimeout.java:41)
        at rx.internal.schedulers.EventLoopsScheduler$EventLoopWorker$2.call(EventLoopsScheduler.java:189)
        at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)