kafka消息滞留

小鱼杰杰 发表于: 2019-07-22   最后更新时间: 2019-07-22 17:46:06   2,066 游览

提问说明

kafka队列里面还是几天前的数据,实时数据卡在最后

  1. 单分区
  2. 单消费者
  3. 配置没有修改,都是默认的
发表于 2019-07-22
添加评论

为什么要单分区呢?现在消费速度是度搜好,一秒多少笔。

小鱼杰杰 -> 半兽人 5年前

这边都是单分区,我看了单线程读取kafka消息5到4百多不等,运行两天多,就会出现消息卡死状况,好急呀。前辈能不能扣扣聊下823758653,或者我给您打电话,求救

019-07-30 22:10:48 [ERROR]-[org.I0Itec.zkclient.ZkEventThread.run(77)] Error handling event ZkEvent[New session event sent to kafka.consumer.ZookeeperConsumerConnecto
r$ZKSessionExpireListener@45dab8e1]
org.I0Itec.zkclient.exception.ZkException: org.apache.zookeeper.KeeperException$SessionExpiredException: KeeperErrorCode = Session expired
at org.I0Itec.zkclient.exception.ZkException.create(ZkException.java:68)
at kafka.utils.ZKCheckedEphemeral.create(ZkUtils.scala:1139)
at kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$registerConsumerInZK(ZookeeperConsumerConnector.scala:282)
at kafka.consumer.ZookeeperConsumerConnector$ZKSessionExpireListener.handleNewSession(ZookeeperConsumerConnector.scala:520)
at org.I0Itec.zkclient.ZkClient$6.run(ZkClient.java:735)
at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
Caused by: org.apache.zookeeper.KeeperException$SessionExpiredException: KeeperErrorCode = Session expired
at org.apache.zookeeper.KeeperException.create(KeeperException.java:127)
... 5 more
2019-08-01 16:32:13 [ERROR]-[org.I0Itec.zkclient.ZkEventThread.run(77)] Error handling event ZkEvent[New session event sent to kafka.consumer.ZookeeperConsumerConnecto
r$ZKSessionExpireListener@565d5ced]
kafka.common.ConsumerRebalanceFailedException: TfLinkGroupStatistics-app-1_int02-1564555622382-82b77842 can't rebalance after 10 retries
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:670)
at kafka.consumer.ZookeeperConsumerConnector$ZKSessionExpireListener.handleNewSession(ZookeeperConsumerConnector.scala:522)
at org.I0Itec.zkclient.ZkClient$6.run(ZkClient.java:735)
at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
2019-08-01 17:44:43 [ERROR]-[org.I0Itec.zkclient.ZkEventThread.run(77)] Error handling event ZkEvent[New session event sent to kafka.consumer.ZookeeperConsumerConnecto
r$ZKSessionExpireListener@11b58572]
kafka.common.ConsumerRebalanceFailedException: TfLinkGroupStatistics-app-1_int02-1564555622476-3c1ef867 can't rebalance after 10 retries
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:670)
at kafka.consumer.ZookeeperConsumerConnector$ZKSessionExpireListener.handleNewSession(ZookeeperConsumerConnector.scala:522)
at org.I0Itec.zkclient.ZkClient$6.run(ZkClient.java:735)
at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
2019-08-05 02:38:16 [ERROR]-[org.I0Itec.zkclient.ZkEventThread.run(77)] Error handling event ZkEvent[New session event sent to kafka.consumer.ZookeeperConsumerConnecto
r$ZKSessionExpireListener@69573f0c]
kafka.common.ConsumerRebalanceFailedException: TfLinkGroupStatistics-app-1_int02-1564728243760-6be43aea can't rebalance after 10 retries
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:670)
at kafka.consumer.ZookeeperConsumerConnector$ZKSessionExpireListener.handleNewSession(ZookeeperConsumerConnector.scala:522)
at org.I0Itec.zkclient.ZkClient$6.run(ZkClient.java:735)
at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
2019-08-05 12:12:24 [ERROR]-[kafka.consumer.ConsumerFetcherThread.error(99)] [ConsumerFetcherThread-TfLinkGroupStatistics-app-1_int02-1564728243540-65444ef2-0-1], Curr
ent offset 168242589 for partition [data.original.flux.5min,0] out of range; reset offset to 195077415
2019-08-05 12:12:54 [ERROR]-[kafka.consumer.ConsumerIterator.error(99)] consumed offset: 168242589 doesn't match fetch offset: 195077415 for data.original.flux.5min:0:
fetched offset = 195077523: consumed offset = 168242589;
Consumer may lose data
2019-08-05 19:07:38 [ERROR]-[org.logicalcobwebs.proxool.xml-test.sweep(105)] Prototype
java.sql.SQLException: Io 异常: Connection reset
at oracle.jdbc.dbaccess.DBError.throwSqlException(DBError.java:134)
at oracle.jdbc.dbaccess.DBError.throwSqlException(DBError.java:179)
at oracle.jdbc.dbaccess.DBError.throwSqlException(DBError.java:333)
at oracle.jdbc.driver.OracleConnection.(OracleConnection.java:404)
at oracle.jdbc.driver.OracleDriver.getConnectionInstance(OracleDriver.java:468)
at oracle.jdbc.driver.OracleDriver.connect(OracleDriver.java:314)
at java.sql.DriverManager.getConnection(DriverManager.java:664)
at java.sql.DriverManager.getConnection(DriverManager.java:208)
at org.logicalcobwebs.proxool.DefaultConnectionBuilder.buildConnection(DefaultConnectionBuilder.java:39)
at org.logicalcobwebs.proxool.Prototyper.buildConnection(Prototyper.java:159)
at org.logicalcobwebs.proxool.Prototyper.sweep(Prototyper.java:102)
at org.logicalcobwebs.proxool.PrototyperThread.run(PrototyperThread.java:44)
[asiainfo@int02 ai-module-LinkGroupStatistics]$
偶尔报错

半兽人 -> 小鱼杰杰 5年前

zk连接重置,并且offset的拉取到的也不匹配。
1、zk什么版本
2、kafka什么版本
3、你运行了多长时间报的错,压力多少
4、客户端源码贴出来看看

小鱼杰杰 -> 半兽人 5年前

kafka_2.11-0.10.1.0 ,zookeeper-3.4.10。运行了三天,数据量五分钟200万条,由于业务逻辑,只能部署一个进程消费,业务逻辑900的线程池,单线程批量入库。解析完数据入map,每个小时key0,1,2,3,4,5,6,7,8,9,10,11.数据跑不完会被覆盖。

不知道卡在哪里了。为啥跑了几天卡住了

小鱼杰杰 -> 半兽人 5年前

public void init() //consumer需要连接zookper
{//https://blog.csdn.net/u010463032/article/details/78892858
Properties props = new Properties();
props.put("zookeeper.connect", zkConfig);//指定了zookpeer的connect ,以hostname:port形式,就是zookeeper各节点的hostname和port,为防止某个挂掉,可以指定多个connect string
props.put("group.id", group);//指定了consumer的group名字,group名一样的进程属于同一个consumer group
props.put("consumer.numbers", consumers);
props.put("auto.commit.enable", "true");
props.put("auto.commit.interval.ms", "60000");
props.put("derializer.class", "kafka.serializer.DefaultDecoder");
//新增参数
props.put("zookeeper.session.timeout.ms", "70000");//socket请求超时时间,默认值是30*1000
props.put("rebalance.backoff.ms", "20000");
props.put("rebalance.max.retries", "10");
props.put("zookeeper.connection.timeout.ms", "30000");//zookper的session超时时间,没有收到心跳,则认为server挂掉了,设置过低,会被误认为挂了,如果设置过高真的挂了,很长时间才被server得知

    cc = new ConsumerConfig(props);

    topicMap = new ConcurrentHashMap<String, BlockingQueue<String>>();
    topicConsumers = new ConcurrentHashMap<String, ConsumerConnector>();
}

/**

 * 启动mq消息读取线程  
 * @param topic    消息主题
 * @param zipFlag    消息压缩标志
 * @return    消息队列  
 */
private synchronized BlockingQueue<String> initReadMsg(String topic,boolean zipFlag)
{
    //默认消息队列500
    BlockingQueue<String> msgQ = new ArrayBlockingQueue<String>(500);
    //
    if (null==topicMap.get(topic)||stateValue==Event.KeeperState.Expired) 
    {
        topicMap.put(topic, msgQ);
        try 
        {
            // Consumer
             consumer = Consumer.createJavaConsumerConnector(cc);
            logger.info("consumer...createJavaConsumerConnector topic={}",topic);
            topicConsumers.put(topic, consumer);//一个主题放一个消费者,将主题放入消费者
        } 
        catch (Exception e) 
        {
            logger.error("initReadMsg err.......",e);
        }
        Thread readMsg = new Thread(new ReadMsg(topic, msgQ,zipFlag), "["+topic+"]KafkaStream Reader");
        readMsg.start();
    }
    return msgQ;
}
半兽人 -> 小鱼杰杰 5年前

我先读读你的代码。

半兽人 -> 小鱼杰杰 5年前

ReadMsg类 贴一下

小鱼杰杰 -> 半兽人 5年前

package com.ai.linkgroup.statistics.mq;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;

import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.ai.linkgroup.statistics.util.ZipUtils;
import com.common.system.mq.serializer.StringSerializer;

import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;

/**

  • MQ接口接收数据
  • @author blueskyevil->70942 2017年9月21日
    /
    public class KafkaReceiver implements Watcher
    {
    private static final Logger logger = LoggerFactory.getLogger(KafkaReceiver.class);
    private ConsumerConnector consumer=null;
    // param
    private String zkConfig;
    private String group;
    private int consumers;
    private KeeperState stateValue;
    private ConsumerConfig cc;
    // 主题对应的消息队列
    private Map> topicMap;
    // 主题对应的消息队列
    private Map topicConsumers;

    public void init() //consumer需要连接zookper
    {//https://blog.csdn.net/u010463032/article/details/78892858

     Properties props = new Properties();
     props.put("zookeeper.connect", zkConfig);//指定了zookpeer的connect ,以hostname:port形式,就是zookeeper各节点的hostname和port,为防止某个挂掉,可以指定多个connect string
     props.put("group.id", group);//指定了consumer的group名字,group名一样的进程属于同一个consumer group
     props.put("consumer.numbers", consumers);
     props.put("auto.commit.enable", "true");
     props.put("auto.commit.interval.ms", "60000");
     props.put("derializer.class", "kafka.serializer.DefaultDecoder");
     //新增参数  
     props.put("zookeeper.session.timeout.ms", "70000");//socket请求超时时间,默认值是30*1000
     props.put("rebalance.backoff.ms", "20000");
     props.put("rebalance.max.retries", "10");
     props.put("zookeeper.connection.timeout.ms", "30000");//zookper的session超时时间,没有收到心跳,则认为server挂掉了,设置过低,会被误认为挂了,如果设置过高真的挂了,很长时间才被server得知
    
     cc = new ConsumerConfig(props);
    
     topicMap = new ConcurrentHashMap<String, BlockingQueue<String>>();
     topicConsumers = new ConcurrentHashMap<String, ConsumerConnector>();
    

    }

    /**

    • mq消息读取 消息被压缩 队列大小为10
    • @param topic 消息主题
    • @return 消息队列
      */
      public BlockingQueue receiveMsg(String topic)
      {
      if(null==topicMap.get(topic)||stateValue==Event.KeeperState.Expired)
      {

       return initReadMsg(topic,true);
      

      }
      else
      {

       return topicMap.get(topic);
      

      }
      }

      /**

    • mq消息读取 消息未压缩 队列大小为10
    • @param topic 消息主题
    • @return 消息队列
      */
      public BlockingQueue receiveMsgUnzip(String topic)
      {
      //add by peiyj 防止意外关闭客户端
      if(null==topicMap.get(topic)||stateValue==Event.KeeperState.Expired)
      {

       return initReadMsg(topic,false);
      

      }
      else
      {

       return topicMap.get(topic);
      

      }
      }

      /**

    • 启动mq消息读取线程
    • @param topic 消息主题
    • @param zipFlag 消息压缩标志
    • @return 消息队列
      */
      private synchronized BlockingQueue initReadMsg(String topic,boolean zipFlag)
      {
      //默认消息队列500
      BlockingQueue msgQ = new ArrayBlockingQueue(500);
      //
      if (null==topicMap.get(topic)||stateValue==Event.KeeperState.Expired)
      {

       topicMap.put(topic, msgQ);
       try 
       {
           // Consumer
            consumer = Consumer.createJavaConsumerConnector(cc);
           logger.info("consumer...createJavaConsumerConnector topic={}",topic);
           topicConsumers.put(topic, consumer);//一个主题放一个消费者,将主题放入消费者
       } 
       catch (Exception e) 
       {
           logger.error("initReadMsg err.......",e);
       }
       Thread readMsg = new Thread(new ReadMsg(topic, msgQ,zipFlag), "["+topic+"]KafkaStream Reader");
       readMsg.start();
      

      }
      return msgQ;
      }

      /**

    • 关闭消费者
    • @param topic
      */
      public void shutDownConsumer(String topic)
      {
      try
      {

       // Consumer
       topicConsumers.get(topic).shutdown();
      
       logger.info("consumer...shutdown topic={}",topic);
      
       topicConsumers.remove(topic);
      
       topicMap.remove(topic);
      

      }
      catch (Exception e)
      {

       logger.error("shutDownConsumer err......",e);
      

      }
      }

      /**

    • 读取mq消息线程
      */
      private class ReadMsg implements Runnable
      {
      private String topic;
      private boolean zipFlag;
      private BlockingQueue msgQ;

      public ReadMsg(String topic,BlockingQueue msgQ,boolean zipFlag)
      {

       this.topic = topic;
       this.msgQ = msgQ;
       this.zipFlag=zipFlag;
      

      }

      public void run()
      {

       Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
       // value表示consumer thread线程数量
       topicCountMap.put(topic, new Integer(consumers));
       while (true) 
       {
           try 
           {    
               if(null==topicMap.get(topic))
               {
                   break;
               }
      
               Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = topicConsumers.get(topic).createMessageStreams(topicCountMap);
      
               logger.info("consumer...createMessageStreams topic={}",topic);
      
               for (KafkaStream<byte[], byte[]> m_stream : consumerMap.get(topic)) 
               {
                   ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
                   while (it.hasNext()) 
                   {
                       //MQ offset 移动
                       MessageAndMetadata<byte[], byte[]> mm = it.next();
                       String msg = String.valueOf(new StringSerializer().<String>deserialize(mm.message()));
                       if(zipFlag)
                       {
                           msg =  ZipUtils.unzip(msg);
                           logger.debug("control receive topic={},msg={}",topic,msg);
                           msgQ.put(msg);
                       }
                       else
                       {
                           logger.debug("topic={},msg={}",topic,msg);
                           msgQ.put(msg);
                       }
                   }
               }
           } 
           catch (Exception e)
           {
               e.printStackTrace();
               logger.error("KafkaConsumer Reader Exception", e);
           }
      
           try 
           {
               Thread.sleep(2000);
           } 
           catch (InterruptedException e) 
           {
               logger.error("ReadMsg sleep InterruptedException......",e);
           }
       }
      

      }
      }

      public void setZkConfig(String zkConfig)
      {
      this.zkConfig = zkConfig;
      }

      public void setGroup(String group)
      {
      this.group = group;
      }

      public void setConsumers(int consumers)
      {
      this.consumers = consumers;
      }

      @Override
      public void process(WatchedEvent event) {

        stateValue=event.getState();



}

}

半兽人 -> 小鱼杰杰 5年前

你咋用的流的方式写消费者呢?

你的答案

查看kafka相关的其他问题或提一个您自己的问题