Flink consuming Kafka: Custom Source terminates abnormally? Attempting to cancel task Source: Custom Source

Posted by Lion on 2020-07-01 · Last updated: 2020-07-01 14:51:03 · 4,794 views

Flink shuts down the Custom Source on its own

1. After the Flink job has been running normally for a while (a day or two), it suddenly cancels the Custom Source on its own and shuts down the Couchbase fetch class, so subsequent reads from Couchbase time out (the fetch class is tied to the operator lifecycle roughly as in the sketch below).
2. We have also observed some degree of data skew in the job, and are not sure whether the skew is what triggers this.
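
For context, the Couchbase fetch class is wired into the operator lifecycle roughly as in this simplified sketch. It is not the actual job code: the class name, hosts, and document ID are placeholders, and only RichFlatMapFunction and the couchbase java-client 2.x calls are real APIs. It illustrates why a task cancellation also disconnects the Couchbase client, as seen in the log below.

import com.couchbase.client.java.Bucket;
import com.couchbase.client.java.CouchbaseCluster;
import com.couchbase.client.java.document.JsonDocument;
import org.apache.flink.api.common.functions.RichFlatMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.util.Collector;

// Simplified stand-in for the real fetch/enrich operator.
public class PushMessageEnrichFunction extends RichFlatMapFunction<String, String> {

    private transient CouchbaseCluster cluster;
    private transient Bucket bucket;

    @Override
    public void open(Configuration parameters) {
        // Every subtask opens its own Couchbase client when the task deploys.
        cluster = CouchbaseCluster.create("11.112.125.68", "11.112.125.69");
        bucket = cluster.openBucket("mba_rec_realtime");
    }

    @Override
    public void flatMap(String docId, Collector<String> out) {
        JsonDocument doc = bucket.get(docId); // blocking KV read per record
        if (doc != null) {
            out.collect(doc.content().toString());
        }
    }

    @Override
    public void close() {
        // Flink invokes close() when the task is cancelled, which is what
        // produces the "Disconnected from Node ..." / "Closed bucket ..."
        // lines in the log below.
        if (cluster != null) {
            cluster.disconnect();
        }
    }
}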

20/06/30 11:00:20 INFO apache.flink.runtime.taskmanager.Task: Attempting to cancel task Source: Custom Source -> Flat Map -> Filter (45/50) (5e68e3be7a8b4e83983688fc2185f90d).
20/06/30 11:00:20 INFO apache.flink.runtime.taskmanager.Task: Source: Custom Source -> Flat Map -> Filter (45/50) (5e68e3be7a8b4e83983688fc2185f90d) switched from RUNNING to CANCELING.
20/06/30 11:00:20 INFO apache.flink.runtime.taskmanager.Task: Triggering cancellation of task code Source: Custom Source -> Flat Map -> Filter (45/50) (5e68e3be7a8b4e83983688fc2185f90d).
20/06/30 11:00:20 INFO apache.flink.runtime.taskmanager.Task: Attempting to cancel task Window(TumblingProcessingTimeWindows(10000), ProcessingTimeTrigger, PushMessageWindow) (21/50) (decbf840da4b07b5874a6f16364171d9).
20/06/30 11:00:20 INFO apache.flink.runtime.taskmanager.Task: Window(TumblingProcessingTimeWindows(10000), ProcessingTimeTrigger, PushMessageWindow) (21/50) (decbf840da4b07b5874a6f16364171d9) switched from RUNNING to CANCELING.
20/06/30 11:00:20 INFO apache.flink.runtime.taskmanager.Task: Triggering cancellation of task code Window(TumblingProcessingTimeWindows(10000), ProcessingTimeTrigger, PushMessageWindow) (21/50) (decbf840da4b07b5874a6f16364171d9).
20/06/30 11:00:20 INFO apache.flink.runtime.taskmanager.Task: Source: Custom Source -> Flat Map -> Filter (45/50) (5e68e3be7a8b4e83983688fc2185f90d) switched from CANCELING to CANCELED.
20/06/30 11:00:20 INFO apache.flink.runtime.taskmanager.Task: Freeing task resources for Source: Custom Source -> Flat Map -> Filter (45/50) (5e68e3be7a8b4e83983688fc2185f90d).
20/06/30 11:00:21 INFO couchbase.client.core.node.Node: Disconnected from Node 11.112.125.69/bb9.public119-bb9.2.bbd.cb
20/06/30 11:00:21 INFO couchbase.client.core.node.Node: Disconnected from Node 11.112.125.68/bb9.public119-bb9.1.bbd.cb
20/06/30 11:00:21 INFO couchbase.client.core.node.Node: Disconnected from Node 11.112.125.70/11.112.125.70
20/06/30 11:00:21 INFO couchbase.client.core.config.ConfigurationProvider: Closed bucket mba_rec_realtime
20/06/30 11:00:21 INFO couchbase.client.core.node.Node: Disconnected from Node 11.112.125.71/11.112.125.71
20/06/30 11:00:21 INFO apache.flink.runtime.taskmanager.Task: Ensuring all FileSystem streams are closed for task Source: Custom Source -> Flat Map -> Filter (45/50) (5e68e3be7a8b4e83983688fc2185f90d) [CANCELED]
20/06/30 11:00:21 INFO apache.flink.runtime.taskexecutor.TaskExecutor: Un-registering task and sending final execution state CANCELED to JobManager for task Source: Custom Source -> Flat Map -> Filter 5e68e3be7a8b4e83983688fc2185f90d.
20/06/30 11:00:21 INFO couchbase.client.core.env.CoreEnvironment: Shutdown kvIoPool: success 
20/06/30 11:00:21 INFO couchbase.client.core.env.CoreEnvironment: Shutdown IoPool: success 
20/06/30 11:00:21 INFO couchbase.client.core.env.CoreEnvironment: Shutdown viewIoPool: success 
20/06/30 11:00:21 INFO couchbase.client.core.env.CoreEnvironment: Shutdown queryIoPool: success 
20/06/30 11:00:21 INFO couchbase.client.core.env.CoreEnvironment: Shutdown searchIoPool: success
20/06/30 11:00:21 INFO couchbase.client.core.env.CoreEnvironment: Shutdown Core Scheduler: success
20/06/30 11:00:21 INFO couchbase.client.core.env.CoreEnvironment: Shutdown Runtime Metrics Collector: success
20/06/30 11:00:21 INFO couchbase.client.core.env.CoreEnvironment: Shutdown Latency Metrics Collector: success
20/06/30 11:00:24 INFO couchbase.client.core.env.CoreEnvironment: Shutdown Netty: failure 
20/06/30 11:00:24 INFO couchbase.client.core.env.CoreEnvironment: Netty shutdown is best effort, ignoring failure
20/06/30 11:00:24 INFO couchbase.client.core.node.Node: Disconnected from Node 10.62.56.138/bjzyx.public304-bjzyx.1.bbd
20/06/30 11:00:24 INFO couchbase.client.core.node.Node: Disconnected from Node 10.62.56.139/bjzyx.public304-bjzyx.2.bbd
20/06/30 11:00:24 INFO couchbase.client.core.node.Node: Disconnected from Node 10.62.56.140/10.62.56.140
20/06/30 11:00:24 INFO couchbase.client.core.config.ConfigurationProvider: Closed bucket qijian_meta
20/06/30 11:00:24 INFO couchbase.client.core.node.Node: Disconnected from Node 10.62.56.141/10.62.56.141
20/06/30 11:00:24 INFO couchbase.client.core.env.CoreEnvironment: Shutdown IoPool: success
20/06/30 11:00:24 INFO couchbase.client.core.env.CoreEnvironment: Shutdown kvIoPool: success
20/06/30 11:00:24 INFO couchbase.client.core.env.CoreEnvironment: Shutdown viewIoPool: success
20/06/30 11:00:24 INFO couchbase.client.core.env.CoreEnvironment: Shutdown queryIoPool: success
20/06/30 11:00:24 INFO couchbase.client.core.env.CoreEnvironment: Shutdown searchIoPool: success
20/06/30 11:00:24 INFO couchbase.client.core.env.CoreEnvironment: Shutdown Core Scheduler: success
20/06/30 11:00:24 INFO couchbase.client.core.env.CoreEnvironment: Shutdown Runtime Metrics Collector: success
20/06/30 11:00:24 INFO couchbase.client.core.env.CoreEnvironment: Shutdown Latency Metrics Collector: success
20/06/30 11:00:25 INFO couchbase.client.core.env.CoreEnvironment: Shutdown Netty: success
20/06/30 11:00:25 INFO apache.flink.runtime.taskmanager.Task: Window(TumblingProcessingTimeWindows(10000), ProcessingTimeTrigger, PushMessageWindow) (21/50) (decbf840da4b07b5874a6f16364171d9) switched from CANCELING to CANCELED.
20/06/30 11:00:25 INFO apache.flink.runtime.taskmanager.Task: Freeing task resources for Window(TumblingProcessingTimeWindows(10000), ProcessingTimeTrigger, PushMessageWindow) (21/50) (decbf840da4b07b5874a6f16364171d9).
20/06/30 11:00:25 INFO apache.flink.runtime.taskmanager.Task: Ensuring all FileSystem streams are closed for task Window(TumblingProcessingTimeWindows(10000), ProcessingTimeTrigger, PushMessageWindow) (21/50) (decbf840da4b07b5874a6f16364171d9) [CANCELED]
20/06/30 11:00:25 INFO apache.flink.runtime.taskexecutor.TaskExecutor: Un-registering task and sending final execution state CANCELED to JobManager for task Window(TumblingProcessingTimeWindows(10000), ProcessingTimeTrigger, PushMessageWindow) decbf840da4b07b5874a6f16364171d9.
20/06/30 11:01:54 INFO apache.flink.runtime.taskexecutor.TaskExecutor: Received task Source: Custom Source -> Flat Map -> Filter (7/50).
20/06/30 11:01:54 INFO apache.flink.runtime.taskmanager.Task: Source: Custom Source -> Flat Map -> Filter (7/50) (ab630f8fc8942c21b5d2bf0913202b7a) switched from CREATED to DEPLOYING.
20/06/30 11:01:54 INFO apache.flink.runtime.taskmanager.Task: Creating FileSystem stream leak safety net for task Source: Custom Source -> Flat Map -> Filter (7/50) (ab630f8fc8942c21b5d2bf0913202b7a) [DEPLOYING]
20/06/30 11:01:54 INFO apache.flink.runtime.taskmanager.Task: Loading JAR files for task Source: Custom Source -> Flat Map -> Filter (7/50) (ab630f8fc8942c21b5d2bf0913202b7a) [DEPLOYING].
20/06/30 11:01:54 INFO apache.flink.runtime.taskexecutor.TaskExecutor: Received task Window(TumblingProcessingTimeWindows(10000), ProcessingTimeTrigger, PushMessageWindow) (30/50).
20/06/30 11:01:54 INFO apache.flink.runtime.taskmanager.Task: Registering task at network: Source: Custom Source -> Flat Map -> Filter (7/50) (ab630f8fc8942c21b5d2bf0913202b7a) [DEPLOYING].
20/06/30 11:01:54 INFO apache.flink.runtime.taskmanager.Task: Window(TumblingProcessingTimeWindows(10000), ProcessingTimeTrigger, PushMessageWindow) (30/50) (15b018382670070dd8707b29e11a8a14) switched from CREATED to DEPLOYING.
20/06/30 11:01:54 INFO apache.flink.runtime.taskmanager.Task: Creating FileSystem stream leak safety net for task Window(TumblingProcessingTimeWindows(10000), ProcessingTimeTrigger, PushMessageWindow) (30/50) (15b018382670070dd8707b29e11a8a14) [DEPLOYING]
20/06/30 11:01:54 INFO apache.flink.runtime.taskmanager.Task: Loading JAR files for task Window(TumblingProcessingTimeWindows(10000), ProcessingTimeTrigger, PushMessageWindow) (30/50) (15b018382670070dd8707b29e11a8a14) [DEPLOYING].
20/06/30 11:01:54 INFO apache.flink.runtime.taskmanager.Task: Registering task at network: Window(TumblingProcessingTimeWindows(10000), ProcessingTimeTrigger, PushMessageWindow) (30/50) (15b018382670070dd8707b29e11a8a14) [DEPLOYING].
20/06/30 11:01:54 INFO apache.flink.runtime.taskmanager.Task: Source: Custom Source -> Flat Map -> Filter (7/50) (ab630f8fc8942c21b5d2bf0913202b7a) switched from DEPLOYING to RUNNING.
20/06/30 11:01:54 INFO apache.flink.runtime.taskmanager.Task: Window(TumblingProcessingTimeWindows(10000), ProcessingTimeTrigger, PushMessageWindow) (30/50) (15b018382670070dd8707b29e11a8a14) switched from DEPLOYING to RUNNING.
20/06/30 11:01:54 INFO flink.streaming.runtime.tasks.StreamTask: No state backend has been configured, using default (Memory / JobManager) MemoryStateBackend (data in heap memory / checkpoints to JobManager) (checkpoints: 'null', savepoints: 'null', asynchronous: TRUE, maxStateSize: 5242880)
20/06/30 11:01:54 WARN apache.flink.metrics.MetricGroup: The operator name Window(TumblingProcessingTimeWindows(10000), ProcessingTimeTrigger, PushMessageWindow) exceeded the 80 characters length limit and was truncated.
20/06/30 11:01:54 INFO flink.runtime.state.heap.HeapKeyedStateBackend: Initializing heap keyed state backend with stream factory.
20/06/30 11:01:54 INFO flink.api.java.typeutils.TypeExtractor: class org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition does not contain a setter for field topic
20/06/30 11:01:54 INFO flink.api.java.typeutils.TypeExtractor: Class org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition cannot be used as a POJO type because not all fields are valid POJO fields, and must be processed as GenericType. Please read the Flink documentation on "Data Types & Serialization" for details of the effect on performance.
20/06/30 11:01:54 INFO flink.streaming.connectors.kafka.FlinkKafkaConsumerBase: No restore state for FlinkKafkaConsumer.
20/06/30 11:01:54 INFO apache.kafka.clients.consumer.ConsumerConfig: ConsumerConfig values: 
    auto.commit.interval.ms = 5000
    auto.offset.reset = latest
    bootstrap.servers = [dc-resource-1657031ad-24.bbd.virtual:9092, dc-resource-1657031ad-37.bbd.virtual:9092, dc-resource-1657031ad-10.bbd.virtual:9092, dc-resource-1657031ad-14.bbd.virtual:9092, dc-resource-1657031ad-29.bbd.virtual:9092]
    check.crcs = true
    client.id = 
    connections.max.idle.ms = 540000
    default.api.timeout.ms = 60000
    enable.auto.commit = true
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = uaa.flink.feige_push_message_arrive_click
    heartbeat.interval.ms = 3000
    interceptor.classes = []
    internal.leave.group.on.close = true
    isolation.level = read_uncommitted
    key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 300000
    max.poll.records = 500
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    send.buffer.bytes = 131072
    session.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = https
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
20/06/30 11:01:54 WARN apache.kafka.clients.consumer.ConsumerConfig: The configuration 'zookeeper.connect' was supplied but isn't a known config.
20/06/30 11:01:54 INFO apache.kafka.common.utils.AppInfoParser: Kafka version : 2.0.1
20/06/30 11:01:54 INFO apache.kafka.common.utils.AppInfoParser: Kafka commitId : fa14705e51bd2ce5
20/06/30 11:01:54 INFO org.apache.kafka.clients.Metadata: Cluster ID: QsC877N-QiWmXol7pgw7ig
20/06/30 11:01:54 INFO flink.streaming.connectors.kafka.FlinkKafkaConsumerBase: Consumer subtask 6 initially has no partitions to read from.
20/06/30 11:01:54 INFO apache.kafka.clients.consumer.ConsumerConfig: ConsumerConfig values: 
    auto.commit.interval.ms = 5000
    auto.offset.reset = latest
    bootstrap.servers = [dc-resource-1657031ad-24.bbd.virtual:9092, dc-resource-1657031ad-37.bbd.virtual:9092, dc-resource-1657031ad-10.bbd.virtual:9092, dc-resource-1657031ad-14.bbd.virtual:9092, dc-resource-1657031ad-29.bbd.virtual:9092]
    check.crcs = true
    client.id = 
    connections.max.idle.ms = 540000
    default.api.timeout.ms = 60000
    enable.auto.commit = true
    exclude.internal.topics = true
    fetch.max.bytes = 52428800
    fetch.max.wait.ms = 500
    fetch.min.bytes = 1
    group.id = uaa.flink.feige_push_message_arrive_click
    heartbeat.interval.ms = 3000
    interceptor.classes = []
    internal.leave.group.on.close = true
    isolation.level = read_uncommitted
    key.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
    max.partition.fetch.bytes = 1048576
    max.poll.interval.ms = 300000
    max.poll.records = 500
    metadata.max.age.ms = 300000
    metric.reporters = []
    metrics.num.samples = 2
    metrics.recording.level = INFO
    metrics.sample.window.ms = 30000
    partition.assignment.strategy = [class org.apache.kafka.clients.consumer.RangeAssignor]
    receive.buffer.bytes = 65536
    reconnect.backoff.max.ms = 1000
    reconnect.backoff.ms = 50
    request.timeout.ms = 30000
    retry.backoff.ms = 100
    sasl.client.callback.handler.class = null
    sasl.jaas.config = null
    sasl.kerberos.kinit.cmd = /usr/bin/kinit
    sasl.kerberos.min.time.before.relogin = 60000
    sasl.kerberos.service.name = null
    sasl.kerberos.ticket.renew.jitter = 0.05
    sasl.kerberos.ticket.renew.window.factor = 0.8
    sasl.login.callback.handler.class = null
    sasl.login.class = null
    sasl.login.refresh.buffer.seconds = 300
    sasl.login.refresh.min.period.seconds = 60
    sasl.login.refresh.window.factor = 0.8
    sasl.login.refresh.window.jitter = 0.05
    sasl.mechanism = GSSAPI
    security.protocol = PLAINTEXT
    send.buffer.bytes = 131072
    session.timeout.ms = 10000
    ssl.cipher.suites = null
    ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
    ssl.endpoint.identification.algorithm = https
    ssl.key.password = null
    ssl.keymanager.algorithm = SunX509
    ssl.keystore.location = null
    ssl.keystore.password = null
    ssl.keystore.type = JKS
    ssl.protocol = TLS
    ssl.provider = null
    ssl.secure.random.implementation = null
    ssl.trustmanager.algorithm = PKIX
    ssl.truststore.location = null
    ssl.truststore.password = null
    ssl.truststore.type = JKS
    value.deserializer = class org.apache.kafka.common.serialization.ByteArrayDeserializer
20/06/30 11:01:55 WARN apache.kafka.clients.consumer.ConsumerConfig: The configuration 'zookeeper.connect' was supplied but isn't a known config.
20/06/30 11:01:55 INFO apache.kafka.common.utils.AppInfoParser: Kafka version : 2.0.1
20/06/30 11:01:55 INFO apache.kafka.common.utils.AppInfoParser: Kafka commitId : fa14705e51bd2ce5
20/06/30 11:02:03 ERROR mba.rec.commons.repository.MbaRtFetchRepository: get [pshmsg:200629228191226644, pshmsg:200614028163616398, pshmsg:200613028185402716, pshmsg:200629028179557883, pshmsg:200629028184901551] from cb failed!
rx.exceptions.CompositeException: 5 exceptions occurred. 
    at rx.internal.operators.OperatorMerge$MergeSubscriber.reportError(OperatorMerge.java:268)
    at rx.internal.operators.OperatorMerge$MergeSubscriber.checkTerminate(OperatorMerge.java:818)
    at rx.internal.operators.OperatorMerge$MergeSubscriber.emitLoop(OperatorMerge.java:579)
    at rx.internal.operators.OperatorMerge$MergeSubscriber.emit(OperatorMerge.java:568)
    at rx.internal.operators.OperatorMerge$InnerSubscriber.onError(OperatorMerge.java:852)
    at rx.observers.SerializedObserver.onError(SerializedObserver.java:152)
    at rx.observers.SerializedSubscriber.onError(SerializedSubscriber.java:78)
    at rx.internal.operators.OperatorTimeoutBase$TimeoutSubscriber.onTimeout(OperatorTimeoutBase.java:177)
    at rx.internal.operators.OperatorTimeout$1$1.call(OperatorTimeout.java:41)
    at rx.internal.schedulers.EventLoopsScheduler$EventLoopWorker$2.call(EventLoopsScheduler.java:189)
    at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
  ComposedException 1 :
    java.util.concurrent.TimeoutException
        at rx.internal.operators.OperatorTimeoutBase$TimeoutSubscriber.onTimeout(OperatorTimeoutBase.java:177)
        at rx.internal.operators.OperatorTimeout$1$1.call(OperatorTimeout.java:41)
        at rx.internal.schedulers.EventLoopsScheduler$EventLoopWorker$2.call(EventLoopsScheduler.java:189)
        at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
  ComposedException 2 :
    java.util.concurrent.TimeoutException
        at rx.internal.operators.OperatorTimeoutBase$TimeoutSubscriber.onTimeout(OperatorTimeoutBase.java:177)
        at rx.internal.operators.OperatorTimeout$1$1.call(OperatorTimeout.java:41)
        at rx.internal.schedulers.EventLoopsScheduler$EventLoopWorker$2.call(EventLoopsScheduler.java:189)
        at rx.internal.schedulers.ScheduledAction.run(ScheduledAction.java:55)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$201(ScheduledThreadPoolExecutor.java:180)
        at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
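
For reference, the TimeoutException above is the per-operation RxJava timeout the Couchbase SDK applies to each KV get. If the reads are genuinely slow (for example, skewed hot keys overloading a single node) rather than landing on an already-closed client, one knob to experiment with is the environment's kvTimeout, which defaults to 2500 ms in java-client 2.x. A minimal sketch; the 10000 ms value, host, bucket, and key are example values only:

import com.couchbase.client.java.Bucket;
import com.couchbase.client.java.CouchbaseCluster;
import com.couchbase.client.java.env.CouchbaseEnvironment;
import com.couchbase.client.java.env.DefaultCouchbaseEnvironment;

public class CouchbaseTimeoutExample {
    public static void main(String[] args) {
        // kvTimeout defaults to 2500 ms in java-client 2.x; the rx timeout in
        // the stack trace above fires when a KV get exceeds it.
        CouchbaseEnvironment env = DefaultCouchbaseEnvironment.builder()
                .kvTimeout(10000) // ms; example value only, tune for the workload
                .build();
        CouchbaseCluster cluster = CouchbaseCluster.create(env, "11.112.125.68");
        Bucket bucket = cluster.openBucket("mba_rec_realtime");

        System.out.println(bucket.get("pshmsg:200629228191226644"));

        cluster.disconnect();
        env.shutdown();
    }
}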