kafka关于顺序消费和分区的共存问题 + broker 被kill掉后,发送/消费异常?

√锋²º¹8ヾ/❤ 发表于: 2018-08-24   最后更新时间: 2018-08-24 14:30:55   3,715 游览

顺序消费和分区的共存问题:

假设有一个公共的参数设置系统,通过它进行参数配置,然后发送到kafka。各个子系统订阅相关的主题,获取最新的参数。

这种场景,顺序消费是确保参数正确性。所以我的理解是,只能将该主题设置一个分区,这样参数就全部写在这里了。正常来讲应该能最大限度的按修改的先后顺序被生产者发送到这里,然后消费者也能顺序的消费。这样才能保证最后一次的修改在子系统中体现出来。

如果有多个分区,能减缓发送的压力。但是一个消费者只能按分区消费,不能跨分区。如果修改记录被均匀的分散在各个分区,那么最终会导致消费者获取的消息并不一定是最新的。

broker 被kill掉后,发送/消费异常

我配置了 bootstrap.servers=192.168.100.238:9092,192.168.100.238:9093,192.168.100.238:9094

同样,在kafka服务器查看:

[root@localhost kafka_2.12-0.11.0.3-1]# bin/kafka-topics.sh --describe --zookeeper 127.0.0.1:2181  --topic QQQQQQ_topic7
Topic:QQQQQQ_topic7     PartitionCount:1        ReplicationFactor:3     Configs:
    Topic: QQQQQQ_topic7    Partition: 0    Leader: 0       Replicas: 0,1,2 Isr: 2,0,1

然后我将 broker0的进程kill掉了。再看:

[root@localhost kafka_2.12-0.11.0.3-1]# bin/kafka-topics.sh --describe --zookeeper 127.0.0.1:2181  --topic QQQQQQ_topic7
Topic:QQQQQQ_topic7     PartitionCount:1        ReplicationFactor:3     Configs:
    Topic: QQQQQQ_topic7    Partition: 0    Leader: 1       Replicas: 0,1,2 Isr: 2,1

这个时候,Leader: 1 , Isr: 2,1

程序中配置的bootstrap.servers 没有变化。

此时出现的情况是:生产者能发送消息,但是间歇性出现

java.net.ConnectException: Connection refused: no further information
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:712)
at org.apache.kafka.common.network.PlaintextTransportLayer.finishConnect(PlaintextTransportLayer.java:50)
at org.apache.kafka.common.network.KafkaChannel.finishConnect(KafkaChannel.java:95)
at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:370)
at org.apache.kafka.common.network.Selector.poll(Selector.java:334)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:433)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:224)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:162)
at java.lang.Thread.run(Thread.java:745)

而消费者就一直在刷刷。。。。并不能正常消费消息。

问题:Leader 不是已经重新选举了吗?是它内部又分配到了 broker0?

请指导。

发表于 2018-08-24

持续的吗,消费者?
我看你已经切换成功了。

你的客户端的版本和生产一一对应吗?

项目依赖:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka2.12</artifactId>
    <version>0.11.0.3</version>
</dependency>

启动日志:

2018-08-24 11:00:07,337[INFO ]-[main]-[AppInfoParser]: Kafka version : 0.11.0.3

服务器上:

[root@localhost kafka_2.12-0.11.0.3]# find ./libs/ -name kafka | head -1 | grep -o '\kafka[^\n]*'

kafka_2.12-0.11.0.3.jar

========================================================================

程序配置了

bootstrap.servers=192.168.100.238:9092,192.168.100.238:9093,192.168.100.238:9094

假如我将9092 kill掉之后,它在此时正好是Leader,我需要将bootstrap.servers调整为

192.168.100.238:9093,192.168.100.238:9094

吗?--这想法会不会很无厘头。

像我这样测试,其实在生产也会存在,因为并不能预知哪个broker会偶尔崩溃或者停止。

另外,非常感谢您的回复。

手动调整bootstrap.servers太无厘头了。
打连接被拒是正常的,但是如果主备切换后,如果不恢复,才是问题。

那像我现在的情况,跟版本有关联吗?
有没有什么排查方法,指导下的。

你的客户端是用官网的吗?没有什么其他改动吧。

是官网的。
请问您使用的是什么版本?看下是否跟版本有关
我整理一下我最近给你留的问题:
1.主备切换后,不恢复。
2.实现了 Partitioner (是不是能控制消息落在同一个分区?),我的业务是通过canal监听mysql的数据变化,然后调用kafka的生产者发送消息。所以需要保证顺序问题。
方便的话,请留个邮箱,我把代码给你看看

kafka是2.11,2.10已经遇到很多bug了,另外你先抛弃掉你的代码逻辑,在本地搭建一套新的kafka集群,用没有改动过的代码,然后进行测试。
loolc@qq.com

2.0.0
Released July 30, 2018
Release Notes
Source download: kafka-2.0.0-src.tgz (asc, sha512)
Binary downloads:
Scala 2.11  - kafka_2.11-2.0.0.tgz (asc, sha512)
Scala 2.12  - kafka_2.12-2.0.0.tgz (asc, sha512)

您说的2.11 指的是哪个版本?上面是官网最新的

我刚按照你的版本试了下,kill掉一个节点,一切正常。
https://www.orchome.com/6

你执行下这个命令:

bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic __consumer_offsets

这个是存储offset的主题,如果副本只有1个,那么就会导致集群挂掉一个,你消费者再也不消费的情况。把该主题增加副本就行了。

https://www.orchome.com/454 命令大全里面,执行增加副本。

今天发现个问题:
测试100000个线程发送,同时开启kafka客户端监听消息接收。
发现main方法跑完了,客户端才消费到七八千。是消息还没真正发出去?还是消费处理慢?

老哥,咱们问题一个一个来呀。。。
你在发送完成后,休眠一下,或者加上producer.close();

不好意思,有点操之过急了。副本的处理我准备实践一下。

压力测试,一下子就爆了。是我的main方法写得有问题,我是循环10000次,然后每次 new Runnable。用线程池情况有变化。

另外是消费者poll的时候,仔细观察过了日志输出,基本上每提取个五百条数据就会停顿一下,这样子,一万条数据就要慢上20秒。

代码如下:

public static void main(String[] args) throws IOException {
        Properties properties = new Properties();
        InputStream in = KafkaConsumerOps.class.getClassLoader().getResourceAsStream("consumer.properties");
        properties.load(in);
        Consumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
        Collection<String> topics = Arrays.asList("QQQQQQ_topic7");
        // 消费者订阅topic
        consumer.subscribe(topics);
        ConsumerRecords<String, String> consumerRecords = null;
        while (true) {
            // 接下来就要从topic中拉取数据
            consumerRecords = consumer.poll(10);
            // 遍历每一条记录
            for (ConsumerRecord consumerRecord : consumerRecords) {
                long offset = consumerRecord.offset();
                int partition = consumerRecord.partition();
                Object key = consumerRecord.key();
                Object value = consumerRecord.value();
                System.out.format("%d\t%d\t%s\t%s\n", offset, partition, key, value);
            }

        }
    }

第一个问题解决了吗?kill掉一台的。
新的问题可以另开一个问答额。

问题我整理了发您邮箱了。

你的答案

查看kafka相关的其他问题或提一个您自己的问题