Last time, following the expert's advice, I corrected the replication factor of __consumer_offsets, and the cluster can now keep producing and consuming when some nodes go down. But I've hit a new problem: when individual nodes are down, the Java client fails to produce or consume with the following error:
java.net.ConnectException: Connection refused: no further information
at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
at org.apache.kafka.common.network.PlaintextTransportLayer.finishConnect(PlaintextTransportLayer.java:50)
at org.apache.kafka.common.network.KafkaChannel.finishConnect(KafkaChannel.java:152)
at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:471)
at org.apache.kafka.common.network.Selector.poll(Selector.java:425)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:510)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:271)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:242)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:161)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:243)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:314)
at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1218)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1181)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115)
at kafka.KafkaSource.run(KafkaSource.java:83)
at kafka.KafkaSource.main(KafkaSource.java:146)
14:10:31,368 DEBUG NetworkClient:804 - [Consumer clientId=consumer-1, groupId=test_3] Node 0 disconnected.
14:10:31,368 WARN NetworkClient:671 - [Consumer clientId=consumer-1, groupId=test_3] Connection to node 0 could not be established. Broker may not be available.
14:10:31,369 DEBUG NetworkClient:916 - [Consumer clientId=consumer-1, groupId=test_3] Give up sending metadata request since no node is available
14:10:31,419 DEBUG NetworkClient:916 - [Consumer clientId=consumer-1, groupId=test_3] Give up sending metadata request since no node is available
14:10:31,469 DEBUG NetworkClient:916 - [Consumer clientId=consumer-1, groupId=test_3] Give up sending metadata request since no node is available
14:10:31,519 DEBUG NetworkClient:916 - [Consumer clientId=consumer-1, groupId=test_3] Give up sending metadata request since no node is available
14:10:31,569 DEBUG NetworkClient:916 - [Consumer clientId=consumer-1, groupId=test_3] Give up sending metadata request since no node is available
14:10:31,619 DEBUG NetworkClient:916 - [Consumer clientId=consumer-1, groupId=test_3] Give up sending metadata request since no node is available
Did you configure all the brokers in the connection list? bootstrap.servers
All of them are configured.
Does it persist and never recover consumption? Post your code.
The code is quite long; do you have an email address?
I have to restart the Java client process before it can produce or consume again. But in a production environment, surely I shouldn't have to touch my Java application just because the cluster had a node failure?
Try testing with the official example first.
Producer:
public static void send() {
    Properties props = new Properties();
    props.put("bootstrap.servers", "192.168.1.84:9092,192.168.1.85:9092,192.168.1.86:9092");
    props.put("acks", "all");
    props.put("retries", 0);
    props.put("batch.size", 16384);
    props.put("linger.ms", 1);
    props.put("buffer.memory", 33554432);
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    Producer<String, String> producer = new KafkaProducer<>(props);
    int i = 0;
    while (true) {
        producer.send(new ProducerRecord<String, String>("spdb-cal", "上海" + i));
        i++;
    }
}
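One thing worth checking in the producer above: retries is 0 while acks is all, so any in-flight send that fails during a broker failover is reported as failed rather than retried against the new partition leader. This doesn't explain the stuck metadata, but it does lose messages during the kill tests. A minimal sketch of a more failover-tolerant configuration; the values 3 and 500 below are illustrative assumptions, not values from this thread:

```java
import java.util.Properties;

public class ResilientProducerConfig {
    // Producer properties that retry sends during a broker failover.
    // retries/retry.backoff.ms values are assumptions for illustration.
    static Properties resilientProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers",
                "192.168.1.84:9092,192.168.1.85:9092,192.168.1.86:9092");
        props.put("acks", "all");
        // retries=0 (as in the thread) drops any send that fails mid-failover;
        // a positive value lets the client retry against the new leader.
        props.put("retries", 3);
        props.put("retry.backoff.ms", 500);
        props.put("key.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer",
                "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        Properties p = resilientProps();
        System.out.println("retries=" + p.get("retries"));
    }
}
```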
Consumer:
public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "192.168.1.84:9092,192.168.1.85:9092,192.168.1.86:9092");
    props.put("group.id", "test");
    props.put("enable.auto.commit", "false");
    props.put("auto.commit.interval.ms", "1000");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
    consumer.subscribe(Arrays.asList("spdb-cal"));
    while (true) {
        ConsumerRecords<String, String> records = consumer.poll(100);
        for (ConsumerRecord<String, String> record : records) {
            String msg = record.value();
            Test.printFile(msg + "\r\n");
        }
    }
}
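A side note on the consumer snippet: enable.auto.commit is false, yet the loop never calls commitSync(), and auto.commit.interval.ms has no effect when auto-commit is disabled. That mismatch doesn't cause the connection errors, but it does mean committed offsets never advance, so a restart replays from the auto.offset.reset position. A minimal sketch of the corrected configuration; everything beyond the values shown in the thread is an assumption:

```java
import java.util.Properties;

public class ManualCommitConsumerConfig {
    // bootstrap.servers and group.id are taken from the thread above.
    static Properties consumerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers",
                "192.168.1.84:9092,192.168.1.85:9092,192.168.1.86:9092");
        props.put("group.id", "test");
        // Auto-commit stays off, but then the poll loop must call
        // consumer.commitSync() after processing each batch.
        // auto.commit.interval.ms is dropped: it is ignored when
        // auto-commit is disabled.
        props.put("enable.auto.commit", "false");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        return props;
    }

    public static void main(String[] args) {
        Properties p = consumerProps();
        System.out.println("enable.auto.commit=" + p.get("enable.auto.commit"));
    }
}
```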
This simple case, written from the official example, hits the same error I described above; I have to restart both programs before they run normally again.
Once more than half of the brokers are down, the producer can no longer send at all.
Expert, what do you mean by that?
Could you elaborate?
https://www.orchome.com/22
I've read it through but still can't see how it solves my problem.
Let me describe the exact steps; please help me work this out:
I start three Kafka nodes, brokers 0, 1, and 2, then run Java producer and consumer client processes to simulate traffic as a fault-tolerance test. Step 1: kill broker 0; the clients keep producing and consuming normally. Step 2: also kill broker 1; the clients still work. Step 3: restart broker 0 and kill broker 2; now the clients can no longer produce or consume and report the error above. Yet if I restart the producer and consumer processes, everything works again.
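A likely explanation for step 3: once brokers 0 and 1 are killed, the client's cached cluster metadata (last fetched from the surviving broker) may list only broker 2. When broker 2 then dies before a metadata refresh picks up the restarted broker 0, the client has no known node left to contact, and older Java clients never fall back to re-resolving bootstrap.servers, hence the endless "Give up sending metadata request since no node is available". Restarting the process re-bootstraps from bootstrap.servers, which is why it recovers. Until the client is upgraded, one application-level workaround is a watchdog that rebuilds the client after prolonged failure. A generic, hedged sketch; the class and method names here are illustrative, not from any Kafka API:

```java
import java.util.function.Supplier;

// Rebuilds a client object after too many consecutive failures, so a
// fresh instance re-reads bootstrap.servers. T would be a KafkaConsumer
// or KafkaProducer in practice; here it is kept generic.
public class RebuildingClient<T> {
    private final Supplier<T> factory;   // creates a fresh client
    private final int maxFailures;       // consecutive failures before rebuild
    private T client;
    private int failures = 0;

    RebuildingClient(Supplier<T> factory, int maxFailures) {
        this.factory = factory;
        this.maxFailures = maxFailures;
        this.client = factory.get();
    }

    T get() { return client; }

    // Call after each poll/send attempt; rebuilds the client once
    // maxFailures consecutive attempts have failed.
    void report(boolean success) {
        if (success) {
            failures = 0;
        } else if (++failures >= maxFailures) {
            client = factory.get();      // fresh client re-bootstraps
            failures = 0;
        }
    }

    public static void main(String[] args) {
        final int[] built = {0};
        RebuildingClient<Integer> rc = new RebuildingClient<>(() -> ++built[0], 3);
        rc.report(false);
        rc.report(false);
        rc.report(false);                // third consecutive failure: rebuild
        System.out.println("built=" + built[0] + " client=" + rc.get());
    }
}
```

In a real loop the caller would close the old client before rebuilding and wrap poll()/send() so that sustained errors feed report(false).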