返回到文章

采纳

编辑于

kafka stream 在实际生产中的问题

kafka

目前用 kafka stream 写了程序上线之后运行一段时间之后

kafka 表现的不正常

broker server.log 出现各种WARN

环境

  • jdk1.8
  • kafka 1.0.0
  • kafka-stream 1.0.0

问题一

WARN [ReplicaFetcher replicaId=3, leaderId=1, fetcherId=1] Error in fetch to broker 1, request (type=FetchRequest, replicaId=3, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={crash-2=(offset=1, logStartOffset=0, maxBytes=1048576), stream-3=(offset=47620189, logStartOffset=43978264, maxBytes=1048576)}, isolationLevel=READ_UNCOMMITTED) (kafka.server.ReplicaFetcherThread)
java.io.IOException: Connection to 1 was disconnected before the response was read

问题二

WARN Attempting to send response via channel for which there is no open connection, connection id 192.168.2.181:9092-192.168.2.183:54288-33 (kafka.network.Processor)

问题三

Member filter-StreamThread-4-consumer-457afa49-2c58-4c25-84bb-d2f3ac4c1ad8 in group filter has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)

consumer 端抛出

问题四

2018-04-11 00:18:14|filter-StreamThread-3|ERROR|org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.error.301|[Consumer clientId=filter-StreamThread-3-consumer, groupId=filter] Offset commit failed on partition sample-2 at offset 67919084: The coordinator is not aware of this member.

各节点防火墙没开

配置如下

broker.id=3
listeners=PLAINTEXT://node3:9092
advertised.listeners=PLAINTEXT://node3:9092
num.network.threads=8
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/kafka/kafka-log-1,/data/kafka/kafka-log-2
num.partitions=8
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.flush.interval.messages=10000
log.flush.interval.ms=1000
log.retention.hours=240
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
message.max.bytes=1000000
log.roll.hours=240
log.flush.scheduler.interval.ms=2000
auto.create.topics.enable=true
auto.leader.rebalance.enable=true
zookeeper.connect=node1:2182,node2:2182,node3:2182
zookeeper.connection.timeout.ms=6000
num.replica.fetchers=4
replica.fetch.max.bytes=1048576
replica.fetch.wait.max.ms=500
default.replication.factor=3
group.initial.rebalance.delay.ms=0

除了

broker.id
listener
advertised.listener

其他配置一样,可以正常生产和消费。
但是长时间的测试下broker端会表现的不正常。

以每秒200条的速度发,有时候测试1000W数据没有丢失数据。但是一旦broker端表现不正常的话,数据会有丢失,虽然在producer端是异步发送, catch异常,ack设置是-1,retries 0,有时候producer端会抛出:

this server is not the leader for that topic-partition