kafka stream 在实际生产中的问题

funky 发表于: 2018-04-11   最后更新时间: 2021-09-24 14:32:12   14,277 游览

目前用 kafka stream 写了程序上线之后运行一段时间之后

kafka 表现的不正常

broker server.log 出现各种WARN

环境

  • jdk1.8
  • kafka 1.0.0
  • kafka-stream 1.0.0

问题一

WARN [ReplicaFetcher replicaId=3, leaderId=1, fetcherId=1] Error in fetch to broker 1, request (type=FetchRequest, replicaId=3, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={crash-2=(offset=1, logStartOffset=0, maxBytes=1048576), stream-3=(offset=47620189, logStartOffset=43978264, maxBytes=1048576)}, isolationLevel=READ_UNCOMMITTED) (kafka.server.ReplicaFetcherThread)
java.io.IOException: Connection to 1 was disconnected before the response was read

问题二

WARN Attempting to send response via channel for which there is no open connection, connection id 192.168.2.181:9092-192.168.2.183:54288-33 (kafka.network.Processor)

问题三

Member filter-StreamThread-4-consumer-457afa49-2c58-4c25-84bb-d2f3ac4c1ad8 in group filter has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)

consumer 端抛出

问题四

2018-04-11 00:18:14|filter-StreamThread-3|ERROR|org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.error.301|[Consumer clientId=filter-StreamThread-3-consumer, groupId=filter] Offset commit failed on partition sample-2 at offset 67919084: The coordinator is not aware of this member.

各节点防火墙没开

配置如下

broker.id=3
listeners=PLAINTEXT://node3:9092
advertised.listeners=PLAINTEXT://node3:9092
num.network.threads=8
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/kafka/kafka-log-1,/data/kafka/kafka-log-2
num.partitions=8
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.flush.interval.messages=10000
log.flush.interval.ms=1000
log.retention.hours=240
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
message.max.bytes=1000000
log.roll.hours=240
log.flush.scheduler.interval.ms=2000
auto.create.topics.enable=true
auto.leader.rebalance.enable=true
zookeeper.connect=node1:2182,node2:2182,node3:2182
zookeeper.connection.timeout.ms=6000
num.replica.fetchers=4
replica.fetch.max.bytes=1048576
replica.fetch.wait.max.ms=500
default.replication.factor=3
group.initial.rebalance.delay.ms=0

除了

broker.id
listener
advertised.listener

其他配置一样,可以正常生产和消费。
但是长时间的测试下broker端会表现的不正常。

以每秒200条的速度发,有时候测试1000W数据没有丢失数据。但是一旦broker端表现不正常的话,数据会有丢失,虽然在producer端是异步发送, catch异常,ack设置是-1,retries 0,有时候producer端会抛出:

this server is not the leader for that topic-partition

发表于 2018-04-11
添加评论

这些异常一般通过加大客户端的超时时间,就可解决。

props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG,60000);
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,60000);
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG,60000);
props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG,70000);
funky -> 半兽人 6年前

加入超时设置之后,在consumer或者producer端偶尔会产生

org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition..

半兽人 -> funky 6年前

这个错误真是太奇怪了,这是集群不正常的表现。你看下集群状态是否正常。

## 查询集群描述
bin/kafka-topics.sh --describe --zookeeper

链接:https://www.orchome.com/454

funky -> 半兽人 6年前

正常的

我也遇到到了问题3, 感觉和数据量突然增大有关系,又好像和机器的CPU有关系。每次我要重启所有kafka消费者服务

我的Kafka Stream也碰到了问题二和问题三,使用的是Kafka 1.0版本

我是同步发送成功后,提交offset

半兽人 -> 风VS蓝天 6年前

警告,系统可以自行恢复。先调大超时时间,继续观察。

风VS蓝天 -> 半兽人 6年前

client端时间,还是broker时间,请问一下哪几个

你的答案

查看kafka相关的其他问题或提一个您自己的问题