kafka官方文档的Producer的bootstrap.servers的解释中这样写到
This list should be in the form host1:port1,host2:port2,.... Since these servers are just used for the initial connection to discover the full cluster membership (which may change dynamically), this list need not contain the full set of servers (you may want more than one, though, in case a server is down). If no server in this list is available sending data will fail until on becomes available.
bootstrap.servers中不需要包含所有的brokers,you may want more than one, though, in case a server is down
应该是说在list中只要有一个可用的就行,producer第一次连接获取metadata时会遍历list中的broker,一个不行会换另一个
但是我测试发现并不会换
我搭建一个集群,b1,b2,b3. producer的bootstrap.servers中写b1,b2,b4(b4不存在), 或者把b1杀掉,bootstrap.servers中写b1,b2,b3(b1已经死了), 总之就是bootstrap.servers中有不可用的broker
启动producer,send的时候有一定的概率会抛出IO Error异常,持续60s,然后Timeout Exception: failed to update metadata in 60000ms.
然后producer就退出了,消息没发出去(丢失)。
for(int i = 0; i < 100; ++i){
pr = new Producer(properties);
pr.send(xx)
pr.close();
}
总会有一些producer选到b1,或者b4,然后就发生错误了。
producer首先从bootstrap servers列表中选一个broker发送metadata请求,根据metadata确定本条消息应该发到哪台机器,然后和那台机器建立连接,把消息发过去。
但是如果选到的 boostrap servers中不可用的b1,或者b4,获取不到metadata,从实验来看并没有换其他的broker尝试,我在源码中也找了,没看到相关的实现。
2018-03-08 15:39:11,779 DEBUG [org.apache.kafka.clients.producer.internals.Sender] - Starting Kafka producer I/O thread.
2018-03-08 15:39:11,854 DEBUG [org.apache.kafka.clients.NetworkClient] - Trying to send metadata request to node -1
2018-03-08 15:39:11,855 DEBUG [org.apache.kafka.clients.NetworkClient] - Init connection to node -1 for sending metadata request in the next iteration
2018-03-08 15:39:11,855 DEBUG [org.apache.kafka.clients.NetworkClient] - Initiating connection to node -1 at 10.142.233.55:9092.
2018-03-08 15:39:11,857 DEBUG [org.apache.kafka.clients.NetworkClient] - Trying to send metadata request to node -1
2018-03-08 15:39:11,966 DEBUG [org.apache.kafka.clients.NetworkClient] - Trying to send metadata request to node -1
2018-03-08 15:39:12,067 DEBUG [org.apache.kafka.clients.NetworkClient] - Trying to send metadata request to node -1
2018-03-08 15:39:12,171 DEBUG [org.apache.kafka.clients.NetworkClient] - Trying to send metadata request to node -1
2018-03-08 15:39:12,275 DEBUG [org.apache.kafka.clients.NetworkClient] - Trying to send metadata request to node -1
2018-03-08 15:39:12,376 DEBUG [org.apache.kafka.clients.NetworkClient] - Trying to send metadata request to node -1
2018-03-08 15:39:12,482 DEBUG [org.apache.kafka.clients.NetworkClient] - Trying to send metadata request to node -1
2018-03-08 15:39:12,583 DEBUG [org.apache.kafka.clients.NetworkClient] - Trying to send metadata request to node -1
2018-03-08 15:39:12,687 DEBUG [org.apache.kafka.clients.NetworkClient] - Trying to send metadata request to node -1
2018-03-08 15:39:12,788 DEBUG [org.apache.kafka.clients.NetworkClient] - Trying to send metadata request to node -1
2018-03-08 15:39:12,889 DEBUG [org.apache.kafka.clients.NetworkClient] - Trying to send metadata request to node -1
2018-03-08 15:39:12,997 DEBUG [org.apache.kafka.clients.NetworkClient] - Trying to send metadata request to node -1
2018-03-08 15:39:13,102 DEBUG [org.apache.kafka.clients.NetworkClient] - Trying to send metadata request to node -1
2018-03-08 15:39:13,208 DEBUG [org.apache.kafka.clients.NetworkClient] - Trying to send metadata request to node -1
2018-03-08 15:39:13,266 WARN [org.apache.kafka.common.network.Selector] - Error in I/O with /10.142.233.55
java.net.ConnectException: Connection refused
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:744)
at org.apache.kafka.common.network.Selector.poll(Selector.java:238)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:192)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:191)
at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:122)
at java.lang.Thread.run(Thread.java:745)
2018-03-08 15:39:13,266 DEBUG [org.apache.kafka.clients.NetworkClient] - Node -1 disconnected.
持续60s,然后就update metadata TimeoutException了。
kafka版本是 0.8.2.1,试过0.10的版本好像也一样。
方便的话可以在自己的集群上测试下,告诉我结果
或者熟悉源码及相关流程的指点一下?
感激不尽!
实践出真理。注释不一定正确
如果你kill b3,会这样吗?
会,随便 kill哪一个broker, producer都有一定的概率可能会选到那个broker去发metadata请求
你先看下这篇文章,顺便把生产者代码贴一下。
https://www.orchome.com/6
生产者代码
Properties props = new Properties(); props.put("bootstrap.servers", "10.142.233.50:9092,10.142.233.51:9092,10.142.233.52:9092"); props.put("key.serializer.class", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer.class", "org.apache.kafka.common.serialization.StringSerializer"); // KafkaProducer<string, string=""> producer = new KafkaProducer<string, string="">(props); String topic = "cctest"; for(int i = 1; i <= 100; ++i){ String key = "keyyyy"; String value = key + "|msg" + i; KafkaProducer<string, string=""> producer = new KafkaProducer<string, string="">(props); ProducerRecord<string, string=""> record = new ProducerRecord<string, string=""> (topic, key, value); producer.send(record); producer.close(); } }
有的producer能成功发送,有的producer报错失败。 把上面10.142.233.51,51,52个中的任意一台改为10.142.233.55(不存在的机器) 就会出现之前贴的日志中的错误。
把
producer.close();
去掉,后面加个休眠时间试试。不用send来举例了
public class MainApp { public static void main(String[] args) throws IOException { Properties props = new Properties(); props.put("bootstrap.servers", "10.142.233.50:9092,10.142.233.55:9092,10.142.233.56:9092"); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props); String topic = "cctest"; List<PartitionInfo> pinfo = producer.partitionsFor(topic); System.out.println(pinfo.size()); }
集群是50,51,52三台,bootstrap.servers中只有50是有效的,55,56是不存在的机器。 反复运行上面这段代码,有一定的几率发生错误,证明produce在bootstrap servers中遇到不可用的broker server后并没有换到其他的
public class MainApp { public static void main(String[] args) throws IOException { Properties props = new Properties(); props.put("bootstrap.servers", "10.142.233.50:9092,10.142.233.55:9092,10.142.233.56:9092"); props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer"); props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer"); KafkaProducer<String, String> producer = new KafkaProducer<String, String>(props); String topic = "cctest"; List<PartitionInfo> pinfo = producer.partitionsFor(topic); System.out.println(pinfo.size()); }
集群是50,51,52三台,bootstrap.servers中只有50是有效的,55,56是不存在的机器。 反复运行上面这段代码,有一定的几率发生错误,证明produce在bootstrap servers中遇到不可用的broker server后并没有换到其他的
我觉得你应该在设置为有的集群,发送期间,进行故障转移做测试,因为在启动之初,空节点,kafka是获取不到系统全貌的。
道理是一样的,都是bootstrap中有不可用的。如果我集群就是50,55,56呢,只不过55和56挂掉了(replica有3份)。
因为在启动之初,空节点,kafka是获取不到系统全貌的
启动之初,producer应该随便成功连一个broker就能获取到集群的全貌吧。 先在bootstrap中随便选一个去获取metadata,不行就换一个,直到bootstrap中所有的都不行才报错。 按照文档的说法应该是这样的吧? 但是我在源码中并没有找到相关的流程,实际中kafka也不是这样做的
发送期间,进行故障转移做测试
假设一开始一切正常,producer要发100条消息,发到50条的时候,集群中有个一个broker挂了
这个原有的producer是不会有问题的,剩下的50条消息还能写进去
但是如果这个时候(集群中一个broker挂掉期间)有一个新的producer来发消息,这个新producer可能会出上面的问题
按理来说,集群中broker挂掉期间,新的producer应该还是能正常获取集群信息,写入消息吧。
我想找到答案:kafka是不是按文档中那样做的 “bootstrap中一个不行换另一个去尝试”?
从我的实验以及查阅源码都没有发现它会更换
希望大家自己测试一下,反馈下结果。 或者知道相关流程或源码的大神指点一二。
当然也可能kafka就是这样的。当然我觉得这是一个缺陷
P.S. 之前发邮件到kafka-dev mailing list都没人鸟,无意中发现这个网站,博主还是挺厉害的,希望多交流
哥们,这个问题搞定了吗?把
producer.close();
去掉,后面加个休眠时间试试。这样处理有没有解决问题?你的答案