顺序消费和分区的共存问题:
假设有一个公共的参数设置系统,通过它进行参数配置,然后发送到kafka。各个子系统订阅相关的主题,获取最新的参数。
这种场景,顺序消费是确保参数正确性。所以我的理解是,只能将该主题设置一个分区,这样参数就全部写在这里了。正常来讲应该能最大限度的按修改的先后顺序被生产者发送到这里,然后消费者也能顺序的消费。这样才能保证最后一次的修改在子系统中体现出来。
如果有多个分区,能减缓发送的压力。但是一个消费者只能按分区消费,不能跨分区。如果修改记录被均匀的分散在各个分区,那么最终会导致消费者获取的消息并不一定是最新的。
broker 被kill掉后,发送/消费异常
我配置了 bootstrap.servers=192.168.100.238:9092,192.168.100.238:9093,192.168.100.238:9094
同样,在kafka服务器查看:
[root@localhost kafka_2.12-0.11.0.3-1]# bin/kafka-topics.sh --describe --zookeeper 127.0.0.1:2181 --topic QQQQQQ_topic7
Topic:QQQQQQ_topic7 PartitionCount:1 ReplicationFactor:3 Configs:
Topic: QQQQQQ_topic7 Partition: 0 Leader: 0 Replicas: 0,1,2 Isr: 2,0,1
然后我将 broker0的进程kill掉了。再看:
[root@localhost kafka_2.12-0.11.0.3-1]# bin/kafka-topics.sh --describe --zookeeper 127.0.0.1:2181 --topic QQQQQQ_topic7
Topic:QQQQQQ_topic7 PartitionCount:1 ReplicationFactor:3 Configs:
Topic: QQQQQQ_topic7 Partition: 0 Leader: 1 Replicas: 0,1,2 Isr: 2,1
这个时候,Leader: 1 , Isr: 2,1
程序中配置的bootstrap.servers 没有变化。
此时出现的情况是:生产者能发送消息,但是间歇性出现
java.net.ConnectException: Connection refused: no further information
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:712)
at org.apache.kafka.common.network.PlaintextTransportLayer.finishConnect(PlaintextTransportLayer.java:50)
at org.apache.kafka.common.network.KafkaChannel.finishConnect(KafkaChannel.java:95)
at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:370)
at org.apache.kafka.common.network.Selector.poll(Selector.java:334)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:433)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:224)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:162)
at java.lang.Thread.run(Thread.java:745)
而消费者就一直在刷刷。。。。并不能正常消费消息。
问题:Leader 不是已经重新选举了吗?是它内部又分配到了 broker0?
请指导。
持续的吗,消费者?
我看你已经切换成功了。
你的客户端的版本和生产一一对应吗?
项目依赖:
<dependency> <groupId>org.apache.kafka</groupId> <artifactId>kafka2.12</artifactId> <version>0.11.0.3</version> </dependency>
启动日志:
2018-08-24 11:00:07,337[INFO ]-[main]-[AppInfoParser]: Kafka version : 0.11.0.3
服务器上:
[root@localhost kafka_2.12-0.11.0.3]# find ./libs/ -name kafka | head -1 | grep -o '\kafka[^\n]*'
kafka_2.12-0.11.0.3.jar
========================================================================
程序配置了
假如我将9092 kill掉之后,它在此时正好是Leader,我需要将bootstrap.servers调整为
吗?--这想法会不会很无厘头。
像我这样测试,其实在生产也会存在,因为并不能预知哪个broker会偶尔崩溃或者停止。
另外,非常感谢您的回复。
手动调整bootstrap.servers太无厘头了。
打连接被拒是正常的,但是如果主备切换后,如果不恢复,才是问题。
那像我现在的情况,跟版本有关联吗?
有没有什么排查方法,指导下的。
你的客户端是用官网的吗?没有什么其他改动吧。
是官网的。
请问您使用的是什么版本?看下是否跟版本有关
我整理一下我最近给你留的问题:
1.主备切换后,不恢复。
2.实现了 Partitioner (是不是能控制消息落在同一个分区?),我的业务是通过canal监听mysql的数据变化,然后调用kafka的生产者发送消息。所以需要保证顺序问题。
方便的话,请留个邮箱,我把代码给你看看
kafka是2.11,2.10已经遇到很多bug了,另外你先抛弃掉你的代码逻辑,在本地搭建一套新的kafka集群,用没有改动过的代码,然后进行测试。
loolc@qq.com
2.0.0 Released July 30, 2018 Release Notes Source download: kafka-2.0.0-src.tgz (asc, sha512) Binary downloads: Scala 2.11 - kafka_2.11-2.0.0.tgz (asc, sha512) Scala 2.12 - kafka_2.12-2.0.0.tgz (asc, sha512)
您说的2.11 指的是哪个版本?上面是官网最新的
我刚按照你的版本试了下,kill掉一个节点,一切正常。
https://www.orchome.com/6
你执行下这个命令:
这个是存储offset的主题,如果副本只有1个,那么就会导致集群挂掉一个,你消费者再也不消费的情况。把该主题增加副本就行了。
https://www.orchome.com/454 命令大全里面,执行增加副本。
今天发现个问题:
测试100000个线程发送,同时开启kafka客户端监听消息接收。
发现main方法跑完了,客户端才消费到七八千。是消息还没真正发出去?还是消费处理慢?
老哥,咱们问题一个一个来呀。。。
你在发送完成后,休眠一下,或者加上producer.close();
不好意思,有点操之过急了。副本的处理我准备实践一下。
压力测试,一下子就爆了。是我的main方法写得有问题,我是循环10000次,然后每次 new Runnable。用线程池情况有变化。
另外是消费者poll的时候,仔细观察过了日志输出,基本上每提取个五百条数据就会停顿一下,这样子,一万条数据就要慢上20秒。
代码如下:
public static void main(String[] args) throws IOException { Properties properties = new Properties(); InputStream in = KafkaConsumerOps.class.getClassLoader().getResourceAsStream("consumer.properties"); properties.load(in); Consumer<String, String> consumer = new KafkaConsumer<String, String>(properties); Collection<String> topics = Arrays.asList("QQQQQQ_topic7"); // 消费者订阅topic consumer.subscribe(topics); ConsumerRecords<String, String> consumerRecords = null; while (true) { // 接下来就要从topic中拉取数据 consumerRecords = consumer.poll(10); // 遍历每一条记录 for (ConsumerRecord consumerRecord : consumerRecords) { long offset = consumerRecord.offset(); int partition = consumerRecord.partition(); Object key = consumerRecord.key(); Object value = consumerRecord.value(); System.out.format("%d\t%d\t%s\t%s\n", offset, partition, key, value); } } }
第一个问题解决了吗?kill掉一台的。
新的问题可以另开一个问答额。
问题我整理了发您邮箱了。
你的答案