有后续吗 我遇到跟你一样的问题
package com.ai.linkgroup.statistics.mq;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.ai.linkgroup.statistics.util.ZipUtils;
import com.common.system.mq.serializer.StringSerializer;
import kafka.consumer.Consumer;
import kafka.consumer.ConsumerConfig;
import kafka.consumer.ConsumerIterator;
import kafka.consumer.KafkaStream;
import kafka.javaapi.consumer.ConsumerConnector;
import kafka.message.MessageAndMetadata;
/**
 * @author blueskyevil->70942 2017年9月21日
 */
public class KafkaReceiver implements Watcher
{
private static final Logger logger = LoggerFactory.getLogger(KafkaReceiver.class);
private ConsumerConnector consumer=null;
// param
private String zkConfig;
private String group;
private int consumers;
private KeeperState stateValue;
private ConsumerConfig cc;
// 主题对应的消息队列
private Map
// 主题对应的消息队列
private Map
/**
 * Builds the Kafka consumer configuration from {@code zkConfig}, {@code group} and
 * {@code consumers}, and creates the empty per-topic registries. The consumer connects through
 * ZooKeeper, so the setters must have been called before this method.
 */
public void init()
{
    // Background on these settings: https://blog.csdn.net/u010463032/article/details/78892858
    Properties props = new Properties();
    // ZooKeeper connect string in hostname:port form; several nodes may be listed so that the
    // loss of a single node does not break the connection.
    props.put("zookeeper.connect", zkConfig);
    // Consumer group name; processes using the same name belong to the same consumer group.
    props.put("group.id", group);
    // Properties values must be Strings: a non-String value is invisible to getProperty().
    props.put("consumer.numbers", String.valueOf(consumers));
    props.put("auto.commit.enable", "true");
    props.put("auto.commit.interval.ms", "60000");
    // NOTE(review): "derializer.class" looks like a misspelling and is presumably ignored by
    // Kafka; kept unchanged until the intended key is confirmed.
    props.put("derializer.class", "kafka.serializer.DefaultDecoder");
    // Additional tuning parameters.
    // ZooKeeper session timeout: without a heartbeat for this long the peer is treated as dead.
    // Too low causes false positives; too high delays detection of a real failure.
    props.put("zookeeper.session.timeout.ms", "70000");
    props.put("rebalance.backoff.ms", "20000");
    props.put("rebalance.max.retries", "10");
    // Maximum time to wait while establishing the ZooKeeper connection.
    props.put("zookeeper.connection.timeout.ms", "30000");
    cc = new ConsumerConfig(props);
    topicMap = new ConcurrentHashMap<String, BlockingQueue<String>>();
    topicConsumers = new ConcurrentHashMap<String, ConsumerConnector>();
}
/**
 * Returns the message queue for {@code topic}. When no queue is registered for the topic, or
 * the last recorded ZooKeeper state is {@code Expired}, the result of
 * {@code initReadMsg(topic, true)} is returned instead of the registered queue.
 *
 * NOTE(review): the declaration line below has lost its generic type argument, method name and
 * parameter list (the body refers to a variable named {@code topic}); restore it from the
 * original source before compiling.
 *
 * @return the message queue
 */
public BlockingQueue
{
if(null==topicMap.get(topic)||stateValue==Event.KeeperState.Expired)
{
return initReadMsg(topic,true);
}
else
{
return topicMap.get(topic);
}
}
/**
 * Returns the message queue for {@code topic}. When no queue is registered for the topic, or
 * the last recorded ZooKeeper state is {@code Expired}, the result of
 * {@code initReadMsg(topic, false)} is returned instead of the registered queue.
 *
 * NOTE(review): the declaration line below has lost its generic type argument, method name and
 * parameter list (the body refers to a variable named {@code topic}); restore it from the
 * original source before compiling.
 *
 * @return the message queue
 */
public BlockingQueue
{
// added by peiyj: guards against the client having been closed unexpectedly
if(null==topicMap.get(topic)||stateValue==Event.KeeperState.Expired)
{
return initReadMsg(topic,false);
}
else
{
return topicMap.get(topic);
}
}
/**
@return 消息队列
*/
private synchronized BlockingQueue
{
//默认消息队列500
BlockingQueue
//
if (null==topicMap.get(topic)||stateValue==Event.KeeperState.Expired)
{
topicMap.put(topic, msgQ);
try
{
// Consumer
consumer = Consumer.createJavaConsumerConnector(cc);
logger.info("consumer...createJavaConsumerConnector topic={}",topic);
topicConsumers.put(topic, consumer);//一个主题放一个消费者,将主题放入消费者
}
catch (Exception e)
{
logger.error("initReadMsg err.......",e);
}
Thread readMsg = new Thread(new ReadMsg(topic, msgQ,zipFlag), "["+topic+"]KafkaStream Reader");
readMsg.start();
}
return msgQ;
}
/**
 * Shuts down the consumer connector of a topic and unregisters the topic.
 *
 * <p>The topic is removed from both registries even when no connector is registered or when
 * {@code shutdown()} throws, because the reader thread only terminates once the topic is
 * missing from the queue registry. Failures are logged, not propagated.
 *
 * @param topic topic whose consumer should be stopped
 */
public void shutDownConsumer(String topic)
{
    try
    {
        ConsumerConnector connector = topicConsumers.get(topic);
        try
        {
            if (connector != null)
            {
                connector.shutdown();
                logger.info("consumer...shutdown topic={}",topic);
            }
        }
        finally
        {
            topicConsumers.remove(topic);
            topicMap.remove(topic);
        }
    }
    catch (Exception e)
    {
        logger.error("shutDownConsumer err......",e);
    }
}
/**
读取mq消息线程
*/
private class ReadMsg implements Runnable
{
private String topic;
private boolean zipFlag;
private BlockingQueue
public ReadMsg(String topic,BlockingQueue
{
this.topic = topic;
this.msgQ = msgQ;
this.zipFlag=zipFlag;
}
public void run()
{
Map<String, Integer> topicCountMap = new HashMap<String, Integer>();
// value表示consumer thread线程数量
topicCountMap.put(topic, new Integer(consumers));
while (true)
{
try
{
if(null==topicMap.get(topic))
{
break;
}
Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = topicConsumers.get(topic).createMessageStreams(topicCountMap);
logger.info("consumer...createMessageStreams topic={}",topic);
for (KafkaStream<byte[], byte[]> m_stream : consumerMap.get(topic))
{
ConsumerIterator<byte[], byte[]> it = m_stream.iterator();
while (it.hasNext())
{
//MQ offset 移动
MessageAndMetadata<byte[], byte[]> mm = it.next();
String msg = String.valueOf(new StringSerializer().<String>deserialize(mm.message()));
if(zipFlag)
{
msg = ZipUtils.unzip(msg);
logger.debug("control receive topic={},msg={}",topic,msg);
msgQ.put(msg);
}
else
{
logger.debug("topic={},msg={}",topic,msg);
msgQ.put(msg);
}
}
}
}
catch (Exception e)
{
e.printStackTrace();
logger.error("KafkaConsumer Reader Exception", e);
}
try
{
Thread.sleep(2000);
}
catch (InterruptedException e)
{
logger.error("ReadMsg sleep InterruptedException......",e);
}
}
}
}
/**
 * Sets the ZooKeeper connect string that init() passes to Kafka as {@code zookeeper.connect}.
 *
 * @param zkConfig ZooKeeper connect string
 */
public void setZkConfig(String zkConfig)
{
this.zkConfig = zkConfig;
}
/**
 * Sets the consumer group name that init() passes to Kafka as {@code group.id}.
 *
 * @param group consumer group name
 */
public void setGroup(String group)
{
this.group = group;
}
/**
 * Sets the consumer count: init() stores it under {@code consumer.numbers}, and the reader
 * task requests this many streams per topic.
 *
 * @param consumers number of streams requested per topic
 */
public void setConsumers(int consumers)
{
this.consumers = consumers;
}
/**
 * Records the state carried by the ZooKeeper event; the queue getters compare the recorded
 * state with {@code Expired} to decide whether to re-initialise a topic.
 *
 * @param event event delivered to this watcher
 */
@Override
public void process(WatchedEvent event) {
stateValue=event.getState();
}
}
今日访问:
昨日访问:
本周访问:
本月访问:
所有访问:1065
最近登录:5年前
加入时间:2019-06-17 17:42:22