kafka 节点宕机后 客户端生产或者消费报错,重启客户端线程又可以继续消费

上次按照大神要求,修正了__consumer_offsets副本数,集群部分节点宕机可正常进行生产或者消费,但是又遇到新问题 java 客户端生产或者消费数据遇到各别节点宕机遇到以下问题

 java.net.ConnectException: Connection refused: no further information
    at sun.nio.ch.SocketChannelImpl.checkConnect(Native Method)
    at sun.nio.ch.SocketChannelImpl.finishConnect(SocketChannelImpl.java:717)
    at org.apache.kafka.common.network.PlaintextTransportLayer.finishConnect(PlaintextTransportLayer.java:50)
    at org.apache.kafka.common.network.KafkaChannel.finishConnect(KafkaChannel.java:152)
    at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:471)
    at org.apache.kafka.common.network.Selector.poll(Selector.java:425)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:510)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:271)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:242)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
    at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:161)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:243)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:314)
    at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1218)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1181)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115)
    at kafka.KafkaSource.run(KafkaSource.java:83)
    at kafka.KafkaSource.main(KafkaSource.java:146)
14:10:31,368 DEBUG NetworkClient:804 - [Consumer clientId=consumer-1, groupId=test_3] Node 0 disconnected.
14:10:31,368  WARN NetworkClient:671 - [Consumer clientId=consumer-1, groupId=test_3] Connection to node 0 could not be established. Broker may not be available.
14:10:31,369 DEBUG NetworkClient:916 - [Consumer clientId=consumer-1, groupId=test_3] Give up sending metadata request since no node is available
14:10:31,419 DEBUG NetworkClient:916 - [Consumer clientId=consumer-1, groupId=test_3] Give up sending metadata request since no node is available
14:10:31,469 DEBUG NetworkClient:916 - [Consumer clientId=consumer-1, groupId=test_3] Give up sending metadata request since no node is available
14:10:31,519 DEBUG NetworkClient:916 - [Consumer clientId=consumer-1, groupId=test_3] Give up sending metadata request since no node is available
14:10:31,569 DEBUG NetworkClient:916 - [Consumer clientId=consumer-1, groupId=test_3] Give up sending metadata request since no node is available
14:10:31,619 DEBUG NetworkClient:916 - [Consumer clientId=consumer-1, groupId=test_3] Give up sending metadata request since no node is available





发表于: 13天前   最后更新时间: 13天前   游览量:183
上一条: 到头了!
下一条: 已经是最后了!

评论…


  • 具体操作流程我描述下,麻烦大神帮我解答下:
    我启动三个节点kafka broker 0 1 2 ,开启生产以及消费者java客户端进程模拟生产消费操作做容错性测试,第一步kill broker 0,客户端能正常生产消费;第二步进一步kill broker 1 ,客户端能正常生产消费;第三步 重启broker 0 并kill broker 2 ,客户端就不能正常生产和消费了,报上述错误。但是我重启客户端生产和消费的进程,又可以正常生产和消费了。
    你把连接都配置上了吗?bootsrap.list
    • 生产端:
      public static void send(){
        Properties props = new Properties();
        props.put("bootstrap.servers", "192.168.1.84:9092,192.168.1.85:9092,192.168.1.86:9092");
        props.put("acks", "all");
        props.put("retries", 0);
        props.put("batch.size", 16384);
        props.put("linger.ms", 1);
        props.put("buffer.memory", 33554432);
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        Producer<String, String> producer = new KafkaProducer<>(props);
        int i =0;
           while(true){
        producer.send(new ProducerRecord<String, String>("spdb-cal", "上海"+i));
               i++;

           }
      }
        • 消费端:
           public static void main(String[] args) {
            
            Properties props = new Properties();
                props.put("bootstrap.servers","192.168.1.84:9092,192.168.1.85:9092,192.168.1.86:9092");
                props.put("group.id","test");
                props.put("enable.auto.commit","false");
                props.put("auto.commit.interval.ms","1000"); 
                props.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
                props.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer");
                KafkaConsumer <String,String> consumer = new KafkaConsumer <>(props);
                consumer.subscribe(Arrays.asList("spdb-cal"));
                while(true){
                 ConsumerRecords <String,String> records = consumer.poll(100);
                    for(ConsumerRecord <String,String> record:records){
               String msg= record.value();
               Test.printFile(msg+"\r\n");

                    }
                }
           }
          • 评论…
            • in this conversation