kafka关于顺序消费和分区的共存问题 + broker 被kill掉后,发送/消费异常?

顺序消费和分区的共存问题:

假设有一个公共的参数设置系统,通过它进行参数配置,然后发送到kafka。各个子系统订阅相关的主题,获取最新的参数。

这种场景,顺序消费是确保参数正确性。所以我的理解是,只能将该主题设置一个分区,这样参数就全部写在这里了。正常来讲应该能最大限度的按修改的先后顺序被生产者发送到这里,然后消费者也能顺序的消费。这样才能保证最后一次的修改在子系统中体现出来。

如果有多个分区,能减缓发送的压力。但是一个消费者只能按分区消费,不能跨分区。如果修改记录被均匀的分散在各个分区,那么最终会导致消费者获取的消息并不一定是最新的。

broker 被kill掉后,发送/消费异常

我配置了 bootstrap.servers=192.168.100.238:9092,192.168.100.238:9093,192.168.100.238:9094

同样,在kafka服务器查看:

[root@localhost kafka_2.12-0.11.0.3-1]# bin/kafka-topics.sh --describe --zookeeper 127.0.0.1:2181  --topic QQQQQQ_topic7
Topic:QQQQQQ_topic7     PartitionCount:1        ReplicationFactor:3     Configs:
    Topic: QQQQQQ_topic7    Partition: 0    Leader: 0       Replicas: 0,1,2 Isr: 2,0,1

然后我将 broker0的进程kill掉了。再看:

[root@localhost kafka_2.12-0.11.0.3-1]# bin/kafka-topics.sh --describe --zookeeper 127.0.0.1:2181  --topic QQQQQQ_topic7
Topic:QQQQQQ_topic7     PartitionCount:1        ReplicationFactor:3     Configs:
    Topic: QQQQQQ_topic7    Partition: 0    Leader: 1       Replicas: 0,1,2 Isr: 2,1

这个时候,Leader: 1 , Isr: 2,1

程序中配置的bootstrap.servers 没有变化。

此时出现的情况是:生产者能发送消息,但是间歇性出现

java.net.ConnectException: Connection refused: no further information
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:712)
at org.apache.kafka.common.network.PlaintextTransportLayer.finishConnect(PlaintextTransportLayer.java:50)
at org.apache.kafka.common.network.KafkaChannel.finishConnect(KafkaChannel.java:95)
at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:370)
at org.apache.kafka.common.network.Selector.poll(Selector.java:334)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:433)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:224)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:162)
at java.lang.Thread.run(Thread.java:745)

而消费者就一直在刷刷。。。。并不能正常消费消息。

问题:Leader 不是已经重新选举了吗?是它内部又分配到了 broker0?

请指导。







发表于: 29天前   最后更新时间: 29天前   游览量:193
上一条: 到头了!
下一条: 已经是最后了!

评论…


  • 你的客户端的版本和生产一一对应吗?
    • 项目依赖:
      <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka_2.12</artifactId>
            <version>0.11.0.3</version>
        </dependency>
      启动日志:
      2018-08-24 11:00:07,337[INFO ]-[main]-[AppInfoParser]: Kafka version : 0.11.0.3

      服务器上:
      [root@localhost kafka_2.12-0.11.0.3]# find ./libs/ -name \*kafka_\* | head -1 | grep -o '\kafka[^\n]*'
      kafka_2.12-0.11.0.3.jar
      ========================================================================
      程序配置了 bootstrap.servers=192.168.100.238:9092,192.168.100.238:9093,192.168.100.238:9094
      假如我将9092 kill掉之后,它在此时正好是Leader ,我需要将bootstrap.servers调整为192.168.100.238:9093,192.168.100.238:9094吗?--这想法会不会很无厘头。

      像我这样测试,其实在生产也会存在,因为并不能预知哪个broker会偶尔崩溃或者停止。

      另外,非常感谢您的回复。
        • 是官网的。
          请问您使用的是什么版本?看下是否跟版本有关
          我整理一下我最近给你留的问题:
          1.主备切换后,不恢复。
          2.实现了 Partitioner (是不是能控制消息落在同一个分区?),我的业务是通过canal监听mysql的数据变化,然后调用kafka的生产者发送消息。所以需要保证顺序问题。
          方便的话,请留个邮箱,我把代码给你看看
            • 2.0.0
              Released July 30, 2018
              Release Notes
              Source download: kafka-2.0.0-src.tgz (asc, sha512)
              Binary downloads:
              Scala 2.11  - kafka_2.11-2.0.0.tgz (asc, sha512)
              Scala 2.12  - kafka_2.12-2.0.0.tgz (asc, sha512)


              您说的2.11 指的是哪个版本?上面是官网最新的
                • 你执行下这个命令:
                  bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic __consumer_offsets

                  这个是存储offset的主题,如果副本只有1个,那么就会导致集群挂掉一个,你消费者再也不消费的情况。把该主题增加副本就行了。
                  http://orchome.com/454 命令大全里面,执行增加副本。

                    • 今天发现个问题:
                      测试100000个线程发送,同时开启kafka客户端监听消息接收。
                      发现main方法跑完了,客户端才消费到七八千。是消息还没真正发出去?还是消费处理慢?
                        • 不好意思,有点操之过急了。副本的处理我准备实践一下。

                          压力测试,一下子就爆了。是我的main方法写得有问题,我是循环10000次,然后每次 new Runnable。用线程池情况有变化。

                          另外是消费者poll的时候,仔细观察过了日志输出,基本上每提取个五百条数据就会停顿一下,这样子,一万条数据就要慢上20秒。
                          代码如下:
                          public static void main(String[] args) throws IOException {
                                  Properties properties = new Properties();
                                  InputStream in = KafkaConsumerOps.class.getClassLoader().getResourceAsStream("consumer.properties");
                                  properties.load(in);
                                  Consumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
                                  Collection<String> topics = Arrays.asList("QQQQQQ_topic7");
                                  // 消费者订阅topic
                                  consumer.subscribe(topics);
                                  ConsumerRecords<String, String> consumerRecords = null;
                                  while (true) {
                                      // 接下来就要从topic中拉取数据
                                      consumerRecords = consumer.poll(10);
                                      // 遍历每一条记录
                                      for (ConsumerRecord consumerRecord : consumerRecords) {
                                          long offset = consumerRecord.offset();
                                          int partition = consumerRecord.partition();
                                          Object key = consumerRecord.key();
                                          Object value = consumerRecord.value();
                                          System.out.format("%d\t%d\t%s\t%s\n", offset, partition, key, value);
                                      }

                                  }
                              }

                            持续的吗,消费者?
                            我看你已经切换成功了。
                          • 评论…
                            • in this conversation