kafka stream 在实际生产中的问题

目前用kafka stream 写了程序上线之后运行一段时间之后
kafka 表现的不正常
broker server.log 出现各种WARN

问题一

 WARN [ReplicaFetcher replicaId=3, leaderId=1, fetcherId=1] Error in fetch to broker 1, request (type=FetchRequest, replicaId=3, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={crash-2=(offset=1, logStartOffset=0, maxBytes=1048576), stream-3=(offset=47620189, logStartOffset=43978264, maxBytes=1048576)}, isolationLevel=READ_UNCOMMITTED) (kafka.server.ReplicaFetcherThread)
java.io.IOException: Connection to 1 was disconnected before the response was read

问题二

WARN Attempting to send response via channel for which there is no open connection, connection id 192.168.2.181:9092-192.168.2.183:54288-33 (kafka.network.Processor)

问题三

Member filter-StreamThread-4-consumer-457afa49-2c58-4c25-84bb-d2f3ac4c1ad8 in group filter has failed, removing it from the group (kafka.coordinator.group.GroupCoordinator)

consumer 端抛出
问题四


2018-04-11 00:18:14|filter-StreamThread-3|ERROR|org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.error.301|[Consumer clientId=filter-StreamThread-3-consumer, groupId=filter] Offset commit failed on partition sample-2 at offset 67919084: The coordinator is not aware of this member.

各节点防火墙没开
配置如下

broker.id=3
listeners=PLAINTEXT://node3:9092
advertised.listeners=PLAINTEXT://node3:9092
num.network.threads=8
num.io.threads=8
socket.send.buffer.bytes=102400
socket.receive.buffer.bytes=102400
socket.request.max.bytes=104857600
log.dirs=/data/kafka/kafka-log-1,/data/kafka/kafka-log-2
num.partitions=8
num.recovery.threads.per.data.dir=1
offsets.topic.replication.factor=1
transaction.state.log.replication.factor=1
transaction.state.log.min.isr=1
log.flush.interval.messages=10000
log.flush.interval.ms=1000
log.retention.hours=240
log.segment.bytes=1073741824
log.retention.check.interval.ms=300000
message.max.bytes=1000000
log.roll.hours=240
log.flush.scheduler.interval.ms=2000
auto.create.topics.enable=true
auto.leader.rebalance.enable=true
zookeeper.connect=node1:2182,node2:2182,node3:2182
zookeeper.connection.timeout.ms=6000
num.replica.fetchers=4
replica.fetch.max.bytes=1048576
replica.fetch.wait.max.ms=500
default.replication.factor=3
group.initial.rebalance.delay.ms=0

除了broker.id
listener
advertised.listener
其他配置一样
可以正常生产和消费。
但是长时间的测试下broker 端会表现的不正常






发表于: 8月前   最后更新时间: 8月前   游览量:2626
上一条: 到头了!
下一条: 已经是最后了!

评论…


  • 我的Kafka Stream也碰到了问题二和问题三,使用的是Kafka 1.0版本
    1、先检查对应节点是否有异常日志。
    2、我看很多是连接端口等,先加大客户端的超时时间。
            props.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, 60000);
            props.put(ConsumerConfig.FETCH_MAX_WAIT_MS_CONFIG, 60000);
            props.put(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG, 70000);
    • 加入超时设置之后
      在consumer 或者producer 端偶尔会产生
       org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition..
        • 这个错误真是太奇怪了,这是集群不正常的表现。你看下集群状态是否正常。

          ## 查询集群描述
          bin/kafka-topics.sh --describe --zookeeper
          链接:http://orchome.com/454
            环境
            jdk1.8
            kafka 1.0.0
            kafka-stream 1.0.0
            • 以每秒200 条的速度发,有时候测试1000W数据没有丢失数据。但是一旦broker 端表现不正常的话,数据会有丢失,虽然在producer 端是异步发送,.catch 异常,ack设置是-1 retries 0 ,有时候producer 端会抛出
              this server is not the leader for that topic-partition
              • 评论…
                • in this conversation