After a program written with Kafka Streams had been running in production for a while, Kafka started behaving abnormally and various WARNs appeared in the broker server.log.
Environment
- jdk1.8
- kafka 1.0.0
- kafka-stream 1.0.0
Problem 1
WARN [ReplicaFetcher replicaId=3, leaderId=1, fetcherId=1] Error in fetch to broker 1, request (type=FetchRequest, replicaId=3, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={crash-2=(offset=1, logStartOffset=0, maxBytes=1048576), stream-3=(offset=47620189, logStartOffset=43978264, maxBytes=1048576)}, isolationLevel=READ_UNCOMMITTED) (kafka.server.ReplicaFetcherThread)
java.io.IOException: Connection to 1 was disconnected before the response was read
Problem 2
WARN Attempting to send response via channel for which there is no open connection, connection id 192.168.2.181:9092-192.168.2.183:54288-33 (kafka.network.Processor)
Problem 3
Member filter-StreamThread-4-consumer-457afa49-2c58-4c25-84bb-d2f3ac4c1ad8 in group filter has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)
The consumer side throws:
Problem 4
2018-04-11 00:18:14|filter-StreamThread-3|ERROR|org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.error.301|[Consumer clientId=filter-StreamThread-3-consumer, groupId=filter] Offset commit failed on partition sample-2 at offset 67919084: The coordinator is not aware of this member.
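Problems 3 and 4 usually share one root cause: a consumer is evicted from the group because a poll or commit takes longer than the timeouts allow, after which its commits fail with "The coordinator is not aware of this member". As a minimal, stdlib-only sketch (the values are hypothetical, and in a real application they would go into a KafkaConsumer/KafkaStreams config), the timeouts have to satisfy these relations:

```java
import java.util.Properties;

public class TimeoutRelations {
    public static void main(String[] args) {
        // Hypothetical client settings; in a real app they are passed to the Kafka client.
        Properties props = new Properties();
        props.put("session.timeout.ms", "60000");    // heartbeat-based liveness window
        props.put("max.poll.interval.ms", "60000");  // max gap between poll() calls (>= per-batch processing time)
        props.put("request.timeout.ms", "70000");    // in 1.0-era clients, expected to exceed both of the above

        int session = Integer.parseInt(props.getProperty("session.timeout.ms"));
        int poll    = Integer.parseInt(props.getProperty("max.poll.interval.ms"));
        int request = Integer.parseInt(props.getProperty("request.timeout.ms"));

        // If these inequalities do not hold, the broker can evict the member before
        // the client notices, producing "The coordinator is not aware of this member".
        System.out.println(request > session && request >= poll);
    }
}
```

Whenever one processing batch can take longer than `max.poll.interval.ms`, the group coordinator removes the member exactly as seen in Problem 3.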
Firewalls are disabled on all nodes.
The broker configuration is as follows:
broker.id=3
listeners=PLAINTEXT://node3:9092
advertised.listeners=PLAINTEXT://node3:9092
num.network.threads=8
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/kafka/kafka-log-1,/data/kafka/kafka-log-2
num.partitions=8
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.flush.interval.messages=10000
log.flush.interval.ms=1000
log.retention.hours=240
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
message.max.bytes=1000000
log.roll.hours=240
log.flush.scheduler.interval.ms=2000
auto.create.topics.enable=true
auto.leader.rebalance.enable=true
zookeeper.connect=node1:2182,node2:2182,node3:2182
zookeeper.connection.timeout.ms=6000
num.replica.fetchers=4
replica.fetch.max.bytes=1048576
replica.fetch.wait.max.ms=500
default.replication.factor=3
group.initial.rebalance.delay.ms=0
Apart from
broker.id
listeners
advertised.listeners
the configuration is identical on every node, and producing and consuming work normally.
Under long-running tests, however, the brokers become abnormal.
Messages are sent at 200 per second. Sometimes a test run of 10 million messages loses nothing, but once the brokers become abnormal, data is lost, even though the producer sends asynchronously, catches exceptions, and uses acks=-1 with retries=0. The producer occasionally throws:
this server is not the leader for that topic-partition
These exceptions can usually be eliminated by increasing the client-side timeouts:
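Note that with retries=0, a "not the leader" error during a leader change is fatal for that batch even with acks=-1, which would explain the observed loss. A minimal sketch of producer settings that let the client refresh metadata and resend instead (stdlib-only; the values are hypothetical and would be passed to `new KafkaProducer<>(props)` in a real app):

```java
import java.util.Properties;

public class ProducerRetrySketch {
    public static void main(String[] args) {
        // Hypothetical producer settings.
        Properties props = new Properties();
        props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092");
        props.put("acks", "all");              // same as -1: wait for all in-sync replicas
        props.put("retries", "10");            // retry "not the leader" errors after a metadata refresh
        props.put("retry.backoff.ms", "500");  // give the new leader election time to finish
        props.put("max.in.flight.requests.per.connection", "1"); // preserve ordering across retries

        // With retries=0 the batch hit by NotLeaderForPartitionException is dropped;
        // with retries>0 the client re-sends it to the newly elected leader.
        System.out.println(props.getProperty("retries"));
    }
}
```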
props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 60000);
props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 60000);
props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 60000);
props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 70000);
Even after adding these timeout settings, the errors above still occur occasionally on the consumer or producer side.
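Since the client here is Kafka Streams rather than a plain consumer, these timeouts also have to reach the consumers that Streams creates internally; Streams forwards `consumer.`-prefixed keys to them. A stdlib-only sketch with hypothetical values (in a real app the properties go to `new KafkaStreams(topology, props)`):

```java
import java.util.Properties;

public class StreamsTimeoutSketch {
    public static void main(String[] args) {
        // Hypothetical Kafka Streams settings.
        Properties props = new Properties();
        props.put("application.id", "filter");     // also becomes the consumer group id
        props.put("bootstrap.servers", "node1:9092");
        // "consumer."-prefixed keys are handed to the internal consumers,
        // so the enlarged timeouts from the snippet above can be set like this:
        props.put("consumer.session.timeout.ms", "60000");
        props.put("consumer.max.poll.interval.ms", "60000");
        props.put("consumer.request.timeout.ms", "70000");

        System.out.println(props.getProperty("consumer.session.timeout.ms"));
    }
}
```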
This error is very strange; it is a sign that the cluster itself is unhealthy. Check whether the cluster status is normal:
## Describe the cluster: bin/kafka-topics.sh --describe --zookeeper
Link: https://www.orchome.com/454
The cluster status is normal.
I also ran into Problem 3. It seems related to sudden increases in data volume, and possibly to the machines' CPU load. Each time it happens I have to restart all the Kafka consumer services.
My Kafka Streams application also hit Problem 2 and Problem 3, on Kafka 1.0.
I commit the offset only after a synchronous send succeeds.
These are warnings; the system can recover by itself. Increase the timeouts first and keep observing.
Do you mean the client-side timeouts or the broker-side ones? Which settings exactly?
Client-side.