OrcHome OrcHome
OrcHome个人中心.
半兽人 修改资料 更换头像
关注(0) 粉丝(0) 积分(0)
关注

暂无关注.....

粉丝

暂无粉丝.....


半兽人 回复 ighack 消费者没有提交数据,该数据什么时候会被在次消费到 中:
你来决定何时commit。你的逻辑会很复杂。
6小时前
发表了 发布说明 - Kafka - 0.10.2.1版本

昨天
半兽人 kafka处理200k大小消息体优化 发表评论:
每台机器的吞吐是有限的,kafka尽可能的最大发挥利用每台节点的吞吐,你关注下你单节点机器吞吐量,如果已经接近最大,则只能增加节点,如果还很早,则需要增加分区数来提供并发。
2天前
半兽人 回复 ighack kafka多线程消费取不到数据 中:
什么叫没有提交的数据。
2天前
ighack 回复 半兽人 kafka多线程消费取不到数据 中:
我换了一种写法。现在可以接收全。但问题是自动关闭也高在了false。但没有提交的数据。不重启是收不到数据的

3天前
半兽人 回复 ighack kafka多线程消费取不到数据 中:
自动关闭设置为false。
4天前
ighack 回复 半兽人 kafka多线程消费取不到数据 中:
Consumer类的核心代码
consumer
= new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(topic));
public void run(){
boolean isBreak = false;
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
DoSomeThings Dos = new DoSomeThings();
boolean iResult = Dos.execute(record);
if (iResult) {
consumer.commitSync();
}
isBreak = true;
//fixedThreadPool.execute(new DoSomeThings(record));
}
if(isBreak){
break;
}
}
consumer.wakeup();
}

主类的核心代码

ExecutorService ConsumerThreadPool = Executors.newFixedThreadPool(threadNum);
System.out.println("Hello World!");
RedisHelper.setRedisURL(redisURL);
RedisHelper.getLogicConfigs();
ThreadGroup tg = new ThreadGroup("处理线程组");
while(true){

ConsumerThreadPool.execute(new Consumer(topicName,filePath));
try {
Thread.sleep(100);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
我发现这种写法可以取到数据。但取不全我发5条数据。日志里只有3条。而且第次都是相同的三条。如每次发1,2,3,4,5 日志记录里每次记录的都是0,1,4

8天前
半兽人 回复 ighack kafka多线程消费取不到数据 中:
所以最多只能有3个线程消费。
8天前
ighack 回复 半兽人 消费者没有提交数据,该数据什么时候会被在次消费到 中:
还有就是offset 99 ,100这两个。我在100的位置提交了。那么是不是99的数据也取不出来了啊,还是说只是100的提交了。99的数据还是会取出来的

8天前
ighack 回复 半兽人 消费者没有提交数据,该数据什么时候会被在次消费到 中:
指定offset提交是不是这样的啊
Map<TopicPartition,OffsetAndMetadata> offset = new HashMap<TopicPartition,OffsetAndMetadata>();
offset.put(new TopicPartition(record.topic(),record.partition()),new OffsetAndMetadata(record.offset()));
consumer.commitSync(offset);

8天前
ighack 回复 半兽人 消费者没有提交数据,该数据什么时候会被在次消费到 中:
什么叫重新分配的消费者啊。我的程序一直没关闭。希望能把没有提交的数据重新消费掉。说明了就是业务处理失败的。无限消费没有提交的数据。直到该条数据被正确处理掉。就提交
8天前
ighack 回复 半兽人 kafka多线程消费取不到数据 中:
我有三个分区,只开了一个线程

8天前
半兽人 消费者没有提交数据,该数据什么时候会被在次消费到 发表评论:
重新分配的消费者可重新消费。
9天前
半兽人 回复 半兽人 kafka多线程消费取不到数据 中:
不严谨,线程数<=分区数
9天前
半兽人 kafka多线程消费取不到数据 发表评论:
http://orchome.com/5
看完这篇文章 你就明白了,分区数=线程数

9天前
ighack 回复 半兽人 kafka consumer业务处理失败时,重试机制是怎样的,会一直重新消费阻塞吗(rocketmq会放到重试队列)? 中:
我在处理失败的时候就没提交,但不知道什么时候会被重新消费。程序一停在那里没有重新消费那些没提交的数据,只有我重新启动程序才会被在次消费到

9天前
小白晒太阳 回复 半兽人 java.net.SocketTimeoutException kafka写入消费失败 中:
加了DESCRIBE权限,给上面说的机器A。
9天前
半兽人 回复 小白晒太阳 java.net.SocketTimeoutException kafka写入消费失败 中:
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
你配认证了?

9天前
小白晒太阳 回复 半兽人 java.net.SocketTimeoutException kafka写入消费失败 中:
什么配置,是指kafka 配置吗,在http://orchome.com/570有贴过
9天前
半兽人 回复 小白晒太阳 java.net.SocketTimeoutException kafka写入消费失败 中:
一分钟一次还好啦。你把你的配置发一下
9天前
小白晒太阳 回复 半兽人 java.net.SocketTimeoutException kafka写入消费失败 中:
kafka日志就打印了那一条log,没有错误信息。
我发现了一个可疑的点,我的kafka集群中有2个broker所在的机器存在大量的CLOSE_WAIT。

CLOSE_WAIT 385
ESTABLISHED 8

均来自机器A ,我在机器A上做了这样的事情,每分钟用java去执行bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker -zookeeper ${zk} -topic ${topic} -group ${group}kafka自带的脚本。

感觉像是请求太频繁,kafka broker 连接未释放导致再有新的请求就出问题了。

9天前
发表了 Docker私有仓库Registry的安装

22天前
发表了 docker registry 报“Upload failed, retrying: Received unexpected HTTP status: 500 Internal Server Error”

28天前
发表了 docker registry push报“http: server gave HTTP response to HTTPS client”

28天前
发表了 在docker里运行jenkins

1月前
发表了 使用Docker搭建jetty或tomcat

1月前
发表了 Docker网络及flannel介绍

1月前
发表了 ETCD简介

1月前
发表了 Linux在shell中日期格式化(时间格式化)

1月前
发表了 Linux使用crontab实现简单定时任务

1月前
发表了 kafka SASL验证

1月前
发表了 java的JIT即时编译

2月前
发表了 时间片-简介

2月前
发表了 内核态和用户态

2月前
发表了 理解linux time命令的real, user和sys

2月前
发表了 JVM的符号引用和直接引用

2月前
发表了 Kafka Connect配置

2月前
发表了 Kafka Streams配置

2月前
发表了 Kafka新消费者配置

2月前
发表了 发行说明 - Kafka - 0.10.2.0版本

3月前

半兽人 回复 ighack 消费者没有提交数据,该数据什么时候会被在次消费到 评论:
你来决定何时commit。你的逻辑会很复杂。
6小时前
半兽人 kafka处理200k大小消息体优化 发表评论:
每台机器的吞吐是有限的,kafka尽可能的最大发挥利用每台节点的吞吐,你关注下你单节点机器吞吐量,如果已经接近最大,则只能增加节点,如果还很早,则需要增加分区数来提供并发。
2天前
半兽人 回复 ighack kafka多线程消费取不到数据 评论:
什么叫没有提交的数据。
2天前
ighack 回复 半兽人 kafka多线程消费取不到数据 评论:
我换了一种写法。现在可以接收全。但问题是自动关闭也高在了false。但没有提交的数据。不重启是收不到数据的

3天前
半兽人 回复 ighack kafka多线程消费取不到数据 评论:
自动关闭设置为false。
4天前
ighack 回复 半兽人 kafka多线程消费取不到数据 评论:
Consumer类的核心代码
consumer
= new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(topic));
public void run(){
boolean isBreak = false;
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records) {
DoSomeThings Dos = new DoSomeThings();
boolean iResult = Dos.execute(record);
if (iResult) {
consumer.commitSync();
}
isBreak = true;
//fixedThreadPool.execute(new DoSomeThings(record));
}
if(isBreak){
break;
}
}
consumer.wakeup();
}

主类的核心代码

ExecutorService ConsumerThreadPool = Executors.newFixedThreadPool(threadNum);
System.out.println("Hello World!");
RedisHelper.setRedisURL(redisURL);
RedisHelper.getLogicConfigs();
ThreadGroup tg = new ThreadGroup("处理线程组");
while(true){

ConsumerThreadPool.execute(new Consumer(topicName,filePath));
try {
Thread.sleep(100);
} catch (InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
我发现这种写法可以取到数据。但取不全我发5条数据。日志里只有3条。而且第次都是相同的三条。如每次发1,2,3,4,5 日志记录里每次记录的都是0,1,4

8天前
半兽人 回复 ighack kafka多线程消费取不到数据 评论:
所以最多只能有3个线程消费。
8天前
ighack 回复 半兽人 消费者没有提交数据,该数据什么时候会被在次消费到 评论:
还有就是offset 99 ,100这两个。我在100的位置提交了。那么是不是99的数据也取不出来了啊,还是说只是100的提交了。99的数据还是会取出来的

8天前
ighack 回复 半兽人 消费者没有提交数据,该数据什么时候会被在次消费到 评论:
指定offset提交是不是这样的啊
Map<TopicPartition,OffsetAndMetadata> offset = new HashMap<TopicPartition,OffsetAndMetadata>();
offset.put(new TopicPartition(record.topic(),record.partition()),new OffsetAndMetadata(record.offset()));
consumer.commitSync(offset);

8天前
ighack 回复 半兽人 消费者没有提交数据,该数据什么时候会被在次消费到 评论:
什么叫重新分配的消费者啊。我的程序一直没关闭。希望能把没有提交的数据重新消费掉。说明了就是业务处理失败的。无限消费没有提交的数据。直到该条数据被正确处理掉。就提交
8天前
ighack 回复 半兽人 kafka多线程消费取不到数据 评论:
我有三个分区,只开了一个线程

8天前
半兽人 消费者没有提交数据,该数据什么时候会被在次消费到 发表评论:
重新分配的消费者可重新消费。
9天前
半兽人 回复 半兽人 kafka多线程消费取不到数据 评论:
不严谨,线程数<=分区数
9天前
半兽人 kafka多线程消费取不到数据 发表评论:
http://orchome.com/5
看完这篇文章 你就明白了,分区数=线程数

9天前
ighack 回复 半兽人 kafka consumer业务处理失败时,重试机制是怎样的,会一直重新消费阻塞吗(rocketmq会放到重试队列)? 评论:
我在处理失败的时候就没提交,但不知道什么时候会被重新消费。程序一停在那里没有重新消费那些没提交的数据,只有我重新启动程序才会被在次消费到

9天前
小白晒太阳 回复 半兽人 java.net.SocketTimeoutException kafka写入消费失败 评论:
加了DESCRIBE权限,给上面说的机器A。
9天前
半兽人 回复 小白晒太阳 java.net.SocketTimeoutException kafka写入消费失败 评论:
authorizer.class.name=kafka.security.auth.SimpleAclAuthorizer
你配认证了?

9天前
小白晒太阳 回复 半兽人 java.net.SocketTimeoutException kafka写入消费失败 评论:
什么配置,是指kafka 配置吗,在http://orchome.com/570有贴过
9天前
半兽人 回复 小白晒太阳 java.net.SocketTimeoutException kafka写入消费失败 评论:
一分钟一次还好啦。你把你的配置发一下
9天前
小白晒太阳 回复 半兽人 java.net.SocketTimeoutException kafka写入消费失败 评论:
kafka日志就打印了那一条log,没有错误信息。
我发现了一个可疑的点,我的kafka集群中有2个broker所在的机器存在大量的CLOSE_WAIT。

CLOSE_WAIT 385
ESTABLISHED 8

均来自机器A ,我在机器A上做了这样的事情,每分钟用java去执行bin/kafka-run-class.sh kafka.tools.ConsumerOffsetChecker -zookeeper ${zk} -topic ${topic} -group ${group}kafka自带的脚本。

感觉像是请求太频繁,kafka broker 连接未释放导致再有新的请求就出问题了。

9天前