OrcHome OrcHome
OrcHome个人中心.

√锋²º¹8ヾ/❤

已关注 关注

关注

暂无关注.....

粉丝

暂无粉丝.....


半兽人 回复 √锋²º¹8ヾ/❤ kafka消费者客户端(0.10.0.1API) 中:
这个是合理的吧,你订阅哪些,和拉出来的全部的列表,是2个功能呀
3天前
√锋²º¹8ヾ/❤ kafka消费者客户端(0.10.0.1API) 发表评论:
大神您好,
在初始化消费者的时候,有  consumer.subscribe(Arrays.asList("foo", "bar"));
但是测试发现 ,使用 Map<String, List<PartitionInfo>> listTopics = consumer.listTopics(); 能够得到该kafka 的broker上的所有主题。

哪怕是 subscribe 的主题为任意值。 consumer.listTopics() 都能得到所有主题。但是消费的时候,却又只会消费订阅的那个主题。
3天前
半兽人 回复 √锋²º¹8ヾ/❤ 业务系统连接不上kafka服务处理 中:
1、阻塞的可以直接捕获异常,根据你业务可以针对性的进行处理,落库也好,都可以。我个人觉得日志打出来就好,因为kafka发送出现异常少之又少,运行5年了,没出现过。不然你为了异常要额外添加很多处理逻辑。
2、kafkaProducer_sync一次初始化,和配置一起初始化。
3、 kafkaProducer_sync.close();   在系统停止的时候调用。
7天前
√锋²º¹8ヾ/❤ 业务系统连接不上kafka服务处理 发表评论:
1. 如何考虑kafka服务的异常情况
2. kafkaProducer_sync = new KafkaProducer<String, String>(props_sync);  该在什么地方初始化才合适?


8天前
发表了 业务系统连接不上kafka服务处理
8天前
半兽人 回复 √锋²º¹8ヾ/❤ kafka入门介绍 中:
到问题专区发完整的代码吧。
kafkaProducer_sync = new KafkaProducer<String, String>(props_sync);
是一次。
9天前
√锋²º¹8ヾ/❤ 回复 半兽人 kafka入门介绍 中:
        ==============================================================================
long startTime = System.currentTimeMillis();
  //这里通过Future.get()方法,阻塞当前线程,等待Kafka服务端的ACK响应
         try {
          reply.setStatus(1);
          ProducerRecord<String, String> record = new ProducerRecord<String,String>(topic, key,value);
          Future<RecordMetadata> future = kafkaProducer_sync.send(record);
          RecordMetadata recordMetadata = future.get();
          if (recordMetadata == null) {
           reply.setStatus(0);
           reply.setMsg("发送失败:recordMetadata为空");
           return reply;
          }
         } catch (Exception e) {
          reply.setStatus(0);
          if(e instanceof java.util.concurrent.ExecutionException) {
           if(e.getCause() != null && e.getCause() instanceof org.apache.kafka.common.errors.TimeoutException) {
            reply.setMsg("发送超时:"+e.getMessage());
            kafkaProducer_sync.close();
           }else {
            reply.setMsg("发送失败");
           }
          }else {
           reply.setMsg("发送失败");
          }
          e.printStackTrace();
             return reply;
         }finally {
          //kafkaProducer_sync.close();
  }
         long endTime = System.currentTimeMillis();
         reply.setMsg("发送成功! 响应时间["+ ((endTime - startTime) / 1000.0) + "秒]");
         logger.info("sendSync 响应时间[" + ((endTime - startTime) / 1000.0) + "秒]");
   return reply;
===========================================================================
您看我这样的逻辑是否符合?

kafkaProducer_sync 这个对象,有没有必要在每次调用上面方法的时候都
kafkaProducer_sync = new KafkaProducer<String, String>(props_sync);
初始化一次?!!

还是在static 块调用一次就好了?还有就是 kafkaProducer_sync.close()这个用不用好?我发现用了会报send 之前已关闭的异常。我看一些例子都在调用完send之后,调用一下 producer.close();

附配置:max.block.ms设置为8秒
metadata.fetch.timeout.ms 设置为 20秒

9天前
半兽人 回复 √锋²º¹8ヾ/❤ kafka入门介绍 中:
对,send的时候捕获异常,消费者poll会自动恢复,不用管的。
主要是发送者防止消息丢失。
9天前
√锋²º¹8ヾ/❤ 回复 無名 kafka入门介绍 中:
谢谢您的回复。
非阻塞和阻塞模式,大概的了解。
我问题中“有没有什么机制能检测kafka的服务是否正常?”--这个意思是假若我们程序连接的kafka是在新网,我们在本地调用新网的xxx:9092,一种情况是我将本地网线拔了;一种是新网的服务停止了。这个时候,在send或者poll的时候,捕获kafka服务的异常?
9天前
無名 回复 √锋²º¹8ヾ/❤ kafka入门介绍 中:
首先,retries设置为0,就不会重试了。
其次,你发送方式属于非阻塞式的,后台发送,可以利用callback来获取发送成功还是失败的异常。
如果你想及时捕捉,可使用阻塞式发送,效率牺牲很大,只需要在发送的时候价格.get()即可,就能实时拿到异常。
producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i))).get();

9天前
√锋²º¹8ヾ/❤ kafka入门介绍 发表评论:
您好,有个问题想请教下你:假如我的kafka服务是192.168.100.238:9092,如果在我的应用中,使用了192.168.100.239:9092 或者 临时将kafka服务停掉。这个时候,我使用同步模式进行生产者发送消息。逻辑为“1xxx系统业务----2kafka发送(等待响应)--3xxx系统业务”。但是第2步中,kafka服务异常的情况下,好像一直得不到任何响应。
有没有什么机制能检测kafka的服务是否正常?另外是,使用main方法调用,有个60000的超时提示,但是好像还是一直尝试发送。我们能不能控制(捕获)kafka的超时响应,还有当kafka服务down掉之后,不再尝试。
谢谢
9天前

半兽人 回复 √锋²º¹8ヾ/❤ kafka消费者客户端(0.10.0.1API) 评论:
这个是合理的吧,你订阅哪些,和拉出来的全部的列表,是2个功能呀
3天前
√锋²º¹8ヾ/❤ kafka消费者客户端(0.10.0.1API) 发表评论:
大神您好,
在初始化消费者的时候,有  consumer.subscribe(Arrays.asList("foo", "bar"));
但是测试发现 ,使用 Map<String, List<PartitionInfo>> listTopics = consumer.listTopics(); 能够得到该kafka 的broker上的所有主题。

哪怕是 subscribe 的主题为任意值。 consumer.listTopics() 都能得到所有主题。但是消费的时候,却又只会消费订阅的那个主题。
3天前
半兽人 回复 √锋²º¹8ヾ/❤ 业务系统连接不上kafka服务处理 评论:
1、阻塞的可以直接捕获异常,根据你业务可以针对性的进行处理,落库也好,都可以。我个人觉得日志打出来就好,因为kafka发送出现异常少之又少,运行5年了,没出现过。不然你为了异常要额外添加很多处理逻辑。
2、kafkaProducer_sync一次初始化,和配置一起初始化。
3、 kafkaProducer_sync.close();   在系统停止的时候调用。
7天前
√锋²º¹8ヾ/❤ 业务系统连接不上kafka服务处理 发表评论:
1. 如何考虑kafka服务的异常情况
2. kafkaProducer_sync = new KafkaProducer<String, String>(props_sync);  该在什么地方初始化才合适?


8天前
半兽人 回复 √锋²º¹8ヾ/❤ kafka入门介绍 评论:
到问题专区发完整的代码吧。
kafkaProducer_sync = new KafkaProducer<String, String>(props_sync);
是一次。
9天前
√锋²º¹8ヾ/❤ 回复 半兽人 kafka入门介绍 评论:
        ==============================================================================
long startTime = System.currentTimeMillis();
  //这里通过Future.get()方法,阻塞当前线程,等待Kafka服务端的ACK响应
         try {
          reply.setStatus(1);
          ProducerRecord<String, String> record = new ProducerRecord<String,String>(topic, key,value);
          Future<RecordMetadata> future = kafkaProducer_sync.send(record);
          RecordMetadata recordMetadata = future.get();
          if (recordMetadata == null) {
           reply.setStatus(0);
           reply.setMsg("发送失败:recordMetadata为空");
           return reply;
          }
         } catch (Exception e) {
          reply.setStatus(0);
          if(e instanceof java.util.concurrent.ExecutionException) {
           if(e.getCause() != null && e.getCause() instanceof org.apache.kafka.common.errors.TimeoutException) {
            reply.setMsg("发送超时:"+e.getMessage());
            kafkaProducer_sync.close();
           }else {
            reply.setMsg("发送失败");
           }
          }else {
           reply.setMsg("发送失败");
          }
          e.printStackTrace();
             return reply;
         }finally {
          //kafkaProducer_sync.close();
  }
         long endTime = System.currentTimeMillis();
         reply.setMsg("发送成功! 响应时间["+ ((endTime - startTime) / 1000.0) + "秒]");
         logger.info("sendSync 响应时间[" + ((endTime - startTime) / 1000.0) + "秒]");
   return reply;
===========================================================================
您看我这样的逻辑是否符合?

kafkaProducer_sync 这个对象,有没有必要在每次调用上面方法的时候都
kafkaProducer_sync = new KafkaProducer<String, String>(props_sync);
初始化一次?!!

还是在static 块调用一次就好了?还有就是 kafkaProducer_sync.close()这个用不用好?我发现用了会报send 之前已关闭的异常。我看一些例子都在调用完send之后,调用一下 producer.close();

附配置:max.block.ms设置为8秒
metadata.fetch.timeout.ms 设置为 20秒

9天前
半兽人 回复 √锋²º¹8ヾ/❤ kafka入门介绍 评论:
对,send的时候捕获异常,消费者poll会自动恢复,不用管的。
主要是发送者防止消息丢失。
9天前
√锋²º¹8ヾ/❤ 回复 無名 kafka入门介绍 评论:
谢谢您的回复。
非阻塞和阻塞模式,大概的了解。
我问题中“有没有什么机制能检测kafka的服务是否正常?”--这个意思是假若我们程序连接的kafka是在新网,我们在本地调用新网的xxx:9092,一种情况是我将本地网线拔了;一种是新网的服务停止了。这个时候,在send或者poll的时候,捕获kafka服务的异常?
9天前
無名 回复 √锋²º¹8ヾ/❤ kafka入门介绍 评论:
首先,retries设置为0,就不会重试了。
其次,你发送方式属于非阻塞式的,后台发送,可以利用callback来获取发送成功还是失败的异常。
如果你想及时捕捉,可使用阻塞式发送,效率牺牲很大,只需要在发送的时候价格.get()即可,就能实时拿到异常。
producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i))).get();

9天前
√锋²º¹8ヾ/❤ kafka入门介绍 发表评论:
您好,有个问题想请教下你:假如我的kafka服务是192.168.100.238:9092,如果在我的应用中,使用了192.168.100.239:9092 或者 临时将kafka服务停掉。这个时候,我使用同步模式进行生产者发送消息。逻辑为“1xxx系统业务----2kafka发送(等待响应)--3xxx系统业务”。但是第2步中,kafka服务异常的情况下,好像一直得不到任何响应。
有没有什么机制能检测kafka的服务是否正常?另外是,使用main方法调用,有个60000的超时提示,但是好像还是一直尝试发送。我们能不能控制(捕获)kafka的超时响应,还有当kafka服务down掉之后,不再尝试。
谢谢
9天前

√锋²º¹8ヾ/❤ 发表了 业务系统连接不上kafka服务处理
8天前