Why a single partition? What is your current consumption speed, how many records per second?
Everything here is single-partition. I've watched a single thread read anywhere from 5 to 400-plus Kafka messages per second. After running for a bit over two days, consumption gets stuck. I'm really anxious. Could we chat on QQ (823758653), or could I call you? Please help.
2019-07-30 22:10:48 [ERROR]-[org.I0Itec.zkclient.ZkEventThread.run(77)] Error handling event ZkEvent[New session event sent to kafka.consumer.ZookeeperConsumerConnector$ZKSessionExpireListener@45dab8e1]
org.I0Itec.zkclient.exception.ZkException: org.apache.zookeeper.KeeperException$SessionExpiredException: KeeperErrorCode = Session expired
at org.I0Itec.zkclient.exception.ZkException.create(ZkException.java:68)
at kafka.utils.ZKCheckedEphemeral.create(ZkUtils.scala:1139)
at kafka.consumer.ZookeeperConsumerConnector.kafka$consumer$ZookeeperConsumerConnector$$registerConsumerInZK(ZookeeperConsumerConnector.scala:282)
at kafka.consumer.ZookeeperConsumerConnector$ZKSessionExpireListener.handleNewSession(ZookeeperConsumerConnector.scala:520)
at org.I0Itec.zkclient.ZkClient$6.run(ZkClient.java:735)
at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
Caused by: org.apache.zookeeper.KeeperException$SessionExpiredException: KeeperErrorCode = Session expired
at org.apache.zookeeper.KeeperException.create(KeeperException.java:127)
... 5 more
2019-08-01 16:32:13 [ERROR]-[org.I0Itec.zkclient.ZkEventThread.run(77)] Error handling event ZkEvent[New session event sent to kafka.consumer.ZookeeperConsumerConnector$ZKSessionExpireListener@565d5ced]
kafka.common.ConsumerRebalanceFailedException: TfLinkGroupStatistics-app-1_int02-1564555622382-82b77842 can't rebalance after 10 retries
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:670)
at kafka.consumer.ZookeeperConsumerConnector$ZKSessionExpireListener.handleNewSession(ZookeeperConsumerConnector.scala:522)
at org.I0Itec.zkclient.ZkClient$6.run(ZkClient.java:735)
at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
2019-08-01 17:44:43 [ERROR]-[org.I0Itec.zkclient.ZkEventThread.run(77)] Error handling event ZkEvent[New session event sent to kafka.consumer.ZookeeperConsumerConnector$ZKSessionExpireListener@11b58572]
kafka.common.ConsumerRebalanceFailedException: TfLinkGroupStatistics-app-1_int02-1564555622476-3c1ef867 can't rebalance after 10 retries
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:670)
at kafka.consumer.ZookeeperConsumerConnector$ZKSessionExpireListener.handleNewSession(ZookeeperConsumerConnector.scala:522)
at org.I0Itec.zkclient.ZkClient$6.run(ZkClient.java:735)
at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
2019-08-05 02:38:16 [ERROR]-[org.I0Itec.zkclient.ZkEventThread.run(77)] Error handling event ZkEvent[New session event sent to kafka.consumer.ZookeeperConsumerConnector$ZKSessionExpireListener@69573f0c]
kafka.common.ConsumerRebalanceFailedException: TfLinkGroupStatistics-app-1_int02-1564728243760-6be43aea can't rebalance after 10 retries
at kafka.consumer.ZookeeperConsumerConnector$ZKRebalancerListener.syncedRebalance(ZookeeperConsumerConnector.scala:670)
at kafka.consumer.ZookeeperConsumerConnector$ZKSessionExpireListener.handleNewSession(ZookeeperConsumerConnector.scala:522)
at org.I0Itec.zkclient.ZkClient$6.run(ZkClient.java:735)
at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
2019-08-05 12:12:24 [ERROR]-[kafka.consumer.ConsumerFetcherThread.error(99)] [ConsumerFetcherThread-TfLinkGroupStatistics-app-1_int02-1564728243540-65444ef2-0-1], Current offset 168242589 for partition [data.original.flux.5min,0] out of range; reset offset to 195077415
2019-08-05 12:12:54 [ERROR]-[kafka.consumer.ConsumerIterator.error(99)] consumed offset: 168242589 doesn't match fetch offset: 195077415 for data.original.flux.5min:0: fetched offset = 195077523: consumed offset = 168242589; Consumer may lose data
2019-08-05 19:07:38 [ERROR]-[org.logicalcobwebs.proxool.xml-test.sweep(105)] Prototype
java.sql.SQLException: Io exception: Connection reset
at oracle.jdbc.dbaccess.DBError.throwSqlException(DBError.java:134)
at oracle.jdbc.dbaccess.DBError.throwSqlException(DBError.java:179)
at oracle.jdbc.dbaccess.DBError.throwSqlException(DBError.java:333)
at oracle.jdbc.driver.OracleConnection.<init>(OracleConnection.java:404)
at oracle.jdbc.driver.OracleDriver.getConnectionInstance(OracleDriver.java:468)
at oracle.jdbc.driver.OracleDriver.connect(OracleDriver.java:314)
at java.sql.DriverManager.getConnection(DriverManager.java:664)
at java.sql.DriverManager.getConnection(DriverManager.java:208)
at org.logicalcobwebs.proxool.DefaultConnectionBuilder.buildConnection(DefaultConnectionBuilder.java:39)
at org.logicalcobwebs.proxool.Prototyper.buildConnection(Prototyper.java:159)
at org.logicalcobwebs.proxool.Prototyper.sweep(Prototyper.java:102)
at org.logicalcobwebs.proxool.PrototyperThread.run(PrototyperThread.java:44)
It errors out occasionally.
The ZK session is being reset, and the fetched offset doesn't match the consumed one either.
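For scale, the offset-reset entry in the log lets you estimate how far behind the consumer had fallen (a back-of-the-envelope sketch; the two offsets are taken verbatim from the log above):

public class LagEstimate
{
    public static void main(String[] args)
    {
        long consumedOffset = 168242589L; // "consumed offset" from the log
        long resetOffset = 195077415L;    // offset the fetcher reset to
        long gap = resetOffset - consumedOffset; // messages skipped over
        System.out.println("gap = " + gap + " messages"); // about 26.8 million
        // An "offset out of range" reset means the consumed position had already
        // been deleted by log retention, so everything in the gap is gone;
        // that is what the "Consumer may lose data" line is warning about.
    }
}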
1. What ZooKeeper version?
2. What Kafka version?
3. How long had it been running when the errors appeared, and under what load?
4. Paste your client source code so we can take a look.
kafka_2.11-0.10.1.0, zookeeper-3.4.10. It had been running for three days; the volume is about 2 million records per 5 minutes. Because of the business logic we can only deploy a single consumer process; the business logic runs on a 900-thread pool, and a single thread does batched inserts into the database. Parsed data goes into a map keyed by the twelve 5-minute slots of each hour (keys 0 through 11); if a slot isn't processed in time, its data gets overwritten.
I have no idea where it's getting stuck, or why it hangs after running for a few days.
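To make the overwrite behaviour concrete, here is a minimal sketch of the slot-keyed map described above (all names are hypothetical, this is not the actual code): each hour reuses the same twelve 5-minute slot keys, so a slot that is not drained within the hour is silently replaced.

import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical sketch of the 5-minute-slot buffer described above.
public class SlotBuffer
{
    // key = 5-minute slot within the hour (0..11), value = parsed records
    private final Map<Integer, List<String>> slots = new ConcurrentHashMap<Integer, List<String>>();

    public void write(long epochMillis, List<String> records)
    {
        int slot = (int) ((epochMillis / 60000L) % 60) / 5; // minute-of-hour / 5 gives 0..11
        // put() replaces any existing value: if the database writer has not
        // drained this slot within the hour, the previous hour's data is lost
        slots.put(slot, records);
    }

    public List<String> drain(int slot)
    {
        return slots.remove(slot);
    }
}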
public void init() // the consumer needs to connect to ZooKeeper
{ // https://blog.csdn.net/u010463032/article/details/78892858
    Properties props = new Properties();
    props.put("zookeeper.connect", zkConfig); // ZooKeeper connect string as hostname:port; list several nodes so the consumer survives a single node going down
    props.put("group.id", group); // consumer group name; processes with the same group.id belong to the same consumer group
    props.put("consumer.numbers", consumers); // note: not a standard Kafka property, apparently application-specific
    props.put("auto.commit.enable", "true");
    props.put("auto.commit.interval.ms", "60000");
    props.put("derializer.class", "kafka.serializer.DefaultDecoder"); // note: misspelling of "deserializer"; not a recognized old-consumer property, so it has no effect
    // added parameters
    props.put("zookeeper.session.timeout.ms", "70000"); // ZooKeeper session timeout: if no heartbeat arrives within this window the session is considered expired; too low and a live client is declared dead, too high and a real failure takes long to notice
    props.put("rebalance.backoff.ms", "20000"); // backoff between rebalance retries
    props.put("rebalance.max.retries", "10"); // give up after this many rebalance attempts ("can't rebalance after 10 retries" in the log above)
    props.put("zookeeper.connection.timeout.ms", "30000"); // max time to wait when establishing the ZooKeeper connection
    cc = new ConsumerConfig(props);
    topicMap = new ConcurrentHashMap<String, BlockingQueue<String>>();
    topicConsumers = new ConcurrentHashMap<String, ConsumerConnector>();
}
/**
 * Start the MQ message-reading thread.
 * @param topic   the message topic
 * @param zipFlag whether messages are compressed
 * @return the message queue
 */
private synchronized BlockingQueue<String> initReadMsg(String topic, boolean zipFlag)
{
    // default queue capacity 500
    BlockingQueue<String> msgQ = new ArrayBlockingQueue<String>(500);
    if (null == topicMap.get(topic) || stateValue == Event.KeeperState.Expired)
    {
        topicMap.put(topic, msgQ);
        try
        {
            // Consumer
            consumer = Consumer.createJavaConsumerConnector(cc);
            logger.info("consumer...createJavaConsumerConnector topic={}", topic);
            topicConsumers.put(topic, consumer); // one consumer per topic
        }
        catch (Exception e)
        {
            logger.error("initReadMsg err.......", e);
        }
        Thread readMsg = new Thread(new ReadMsg(topic, msgQ, zipFlag), "[" + topic + "]KafkaStream Reader");
        readMsg.start();
    }
    return msgQ;
}
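A side note on the rebalance settings above: the Kafka FAQ for the old high-level consumer suggests keeping rebalance.max.retries * rebalance.backoff.ms above zookeeper.session.timeout.ms, so that every member of the group can re-register after a session expiry before the retries run out. A quick arithmetic check with the values from init() (a standalone sketch, not part of the original code):

public class RebalanceWindowCheck
{
    public static void main(String[] args)
    {
        long backoffMs = 20000L;          // rebalance.backoff.ms from init() above
        int maxRetries = 10;              // rebalance.max.retries
        long zkSessionTimeoutMs = 70000L; // zookeeper.session.timeout.ms
        long retryWindowMs = maxRetries * backoffMs; // total time spent retrying a rebalance
        System.out.println("retry window = " + retryWindowMs
                + " ms, ZK session timeout = " + zkSessionTimeoutMs + " ms");
        // 200000 ms > 70000 ms, so these two knobs already satisfy the rule of
        // thumb; the repeated "can't rebalance after 10 retries" therefore points
        // at the session expiring in the first place (e.g. long GC pauses or an
        // overloaded ZooKeeper) rather than at the retry settings themselves.
    }
}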
Let me read through your code first.
Paste the ReadMsg class.
package com.ai.linkgroup.statistics.mq;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.ai.linkgroup.statistics.util.ZipUtils;
import com.common.system.mq.serializer.StringSerializer;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
/**
 * @author blueskyevil->70942 2017-09-21
 */
public class KafkaReceiver implements Watcher
{
private static final Logger logger = LoggerFactory.getLogger(KafkaReceiver.class);
private ConsumerConnector consumer=null;
// param
private String zkConfig;
private String group;
private int consumers;
private KeeperState stateValue;
private ConsumerConfig cc;
// message queue per topic
private Map<String, BlockingQueue<String>> topicMap;
// consumer connector per topic
private Map<String, ConsumerConnector> topicConsumers;
public void init() // the consumer needs to connect to ZooKeeper
{ // https://blog.csdn.net/u010463032/article/details/78892858
    Properties props = new Properties();
    props.put("zookeeper.connect", zkConfig); // ZooKeeper connect string as hostname:port; list several nodes so the consumer survives a single node going down
    props.put("group.id", group); // consumer group name; processes with the same group.id belong to the same consumer group
    props.put("consumer.numbers", consumers); // note: not a standard Kafka property, apparently application-specific
    props.put("auto.commit.enable", "true");
    props.put("auto.commit.interval.ms", "60000");
    props.put("derializer.class", "kafka.serializer.DefaultDecoder"); // note: misspelling of "deserializer"; not a recognized old-consumer property, so it has no effect
    // added parameters
    props.put("zookeeper.session.timeout.ms", "70000"); // ZooKeeper session timeout: if no heartbeat arrives within this window the session is considered expired; too low and a live client is declared dead, too high and a real failure takes long to notice
    props.put("rebalance.backoff.ms", "20000"); // backoff between rebalance retries
    props.put("rebalance.max.retries", "10"); // give up after this many rebalance attempts ("can't rebalance after 10 retries" in the log above)
    props.put("zookeeper.connection.timeout.ms", "30000"); // max time to wait when establishing the ZooKeeper connection
    cc = new ConsumerConfig(props);
    topicMap = new ConcurrentHashMap<String, BlockingQueue<String>>();
    topicConsumers = new ConcurrentHashMap<String, ConsumerConnector>();
}
/**
 * @return the message queue
 */
public BlockingQueue<String> receiveMsg(String topic)
{
if(null==topicMap.get(topic)||stateValue==Event.KeeperState.Expired)
{
return initReadMsg(topic,true);
}
else
{
return topicMap.get(topic);
}
}
/**
 * @return the message queue
 */
public BlockingQueue<String> receiveMsgUnzip(String topic)
{
// added by peiyj: guard against the client having been closed unexpectedly
if(null==topicMap.get(topic)||stateValue==Event.KeeperState.Expired)
{
return initReadMsg(topic,false);
}
else
{
return topicMap.get(topic);
}
}
/**
 * Start the MQ message-reading thread.
 * @param topic   the message topic
 * @param zipFlag whether messages are compressed
 * @return the message queue
 */
private synchronized BlockingQueue<String> initReadMsg(String topic, boolean zipFlag)
{
// default queue capacity 500
BlockingQueue<String> msgQ = new ArrayBlockingQueue<String>(500);
if (null==topicMap.get(topic)||stateValue==Event.KeeperState.Expired)
{
    topicMap.put(topic, msgQ);
    try
    {
        // Consumer
        consumer = Consumer.createJavaConsumerConnector(cc);
        logger.info("consumer...createJavaConsumerConnector topic={}", topic);
        topicConsumers.put(topic, consumer); // one consumer per topic
    }
    catch (Exception e)
    {
        logger.error("initReadMsg err.......", e);
    }
    Thread readMsg = new Thread(new ReadMsg(topic, msgQ, zipFlag), "[" + topic + "]KafkaStream Reader");
    readMsg.start();
}
return msgQ;
}
/**
 * Shut down the consumer for the given topic.
 * @param topic
 */
public void shutDownConsumer(String topic)
{
try
{
    // Consumer
    topicConsumers.get(topic).shutdown();
    logger.info("consumer...shutdown topic={}", topic);
    topicConsumers.remove(topic);
    topicMap.remove(topic);
}
catch (Exception e)
{
logger.error("shutDownConsumer err......",e);
}
}
/**
 * Thread that reads MQ messages.
 */
private class ReadMsg implements Runnable
{
private String topic;
private boolean zipFlag;
private BlockingQueue<String> msgQ;
public ReadMsg(String topic, BlockingQueue<String> msgQ, boolean zipFlag)
{
    this.topic = topic;
    this.msgQ = msgQ;
    this.zipFlag = zipFlag;
}
public void run()
{
    Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
    // the value is the number of consumer threads
    topicCountMap.put(topic, new Integer(consumers));
    while (true)
    {
        try
        {
            if (null == topicMap.get(topic))
            {
                break;
            }
            Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = topicConsumers.get(topic).createMessageStreams(topicCountMap);
            logger.info("consumer...createMessageStreams topic={}", topic);
            for (KafkaStream<byte[], byte[]> m_stream : consumerMap.get(topic))
            {
                ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
                while (it.hasNext())
                {
                    // it.next() advances the MQ offset
                    MessageAndMetadata<byte[], byte[]> mm = it.next();
                    String msg = String.valueOf(new StringSerializer().<String>deserialize(mm.message()));
                    if (zipFlag)
                    {
                        msg = ZipUtils.unzip(msg);
                        logger.debug("control receive topic={},msg={}", topic, msg);
                        msgQ.put(msg);
                    }
                    else
                    {
                        logger.debug("topic={},msg={}", topic, msg);
                        msgQ.put(msg);
                    }
                }
            }
        }
        catch (Exception e)
        {
            e.printStackTrace();
            logger.error("KafkaConsumer Reader Exception", e);
        }
        try
        {
            Thread.sleep(2000);
        }
        catch (InterruptedException e)
        {
            logger.error("ReadMsg sleep InterruptedException......", e);
        }
    }
}
}
public void setZkConfig(String zkConfig)
{
this.zkConfig = zkConfig;
}
public void setGroup(String group)
{
this.group = group;
}
public void setConsumers(int consumers)
{
this.consumers = consumers;
}
@Override
public void process(WatchedEvent event) {
    stateValue = event.getState();
}
}
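For reference, a caller of this class presumably looks something like the following sketch (the hosts, group name and worker loop are hypothetical; receiveMsg lazily starts the reader thread and returns the queue it fills):

// Hypothetical usage of KafkaReceiver; hosts and names are placeholders.
KafkaReceiver receiver = new KafkaReceiver();
receiver.setZkConfig("zk1:2181,zk2:2181,zk3:2181");
receiver.setGroup("TfLinkGroupStatistics-app-1");
receiver.setConsumers(1);
receiver.init();

BlockingQueue<String> queue = receiver.receiveMsg("data.original.flux.5min");
while (true)
{
    String msg = queue.take(); // blocks until the reader thread enqueues a message
    // ... parse msg and hand it off to the processing thread pool ...
}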
Why did you write the consumer using the stream-based approach?