kafka第一条记录消费不到, 接下来的消息能够正常消费到。

袁江镇 发表于: 2019-08-15   最后更新时间: 2019-08-15 17:36:00   13,254 游览

日志如下:

[2019-08-15 05:54:53.165][main][INFO ][RCStep 668] - Polling: ...
[2019-08-15 05:54:53.195][main][INFO ][Metadata 285] - Cluster ID: jjcD6HjpTjycBzzdDSapgw
[2019-08-15 05:54:53.197][main][INFO ][AbstractCoordinator 677] - [Consumer clientId=consumer-3, groupId=telematics-ft-sync] Discovered group coordinator 192.168.31.14:9092 (id: 2147482641 rack: null)
[2019-08-15 05:54:53.199][main][INFO ][ConsumerCoordinator 472] - [Consumer clientId=consumer-3, groupId=telematics-ft-sync] Revoking previously assigned partitions []
[2019-08-15 05:54:53.200][main][INFO ][AbstractCoordinator 509] - [Consumer clientId=consumer-3, groupId=telematics-ft-sync] (Re-)joining group
[2019-08-15 05:54:54.170][main][INFO ][RCStep 672] - partitions:[]
[2019-08-15 05:54:54.170][main][INFO ][RCStep 675] - records.count=0
[2019-08-15 05:54:54.171][main][DEBUG][BasicUtil 17] - took 1006ms.
[2019-08-15 05:54:54.172][main][INFO ][RCStep 668] - Polling: ...
[2019-08-15 05:54:55.173][main][INFO ][RCStep 672] - partitions:[]
[2019-08-15 05:54:55.173][main][INFO ][RCStep 675] - records.count=0
[2019-08-15 05:54:55.174][main][DEBUG][BasicUtil 17] - took 2009ms.
[2019-08-15 05:54:55.174][main][INFO ][RCStep 668] - Polling: ...
[2019-08-15 05:54:56.175][main][INFO ][RCStep 672] - partitions:[]
[2019-08-15 05:54:56.176][main][INFO ][RCStep 675] - records.count=0
[2019-08-15 05:54:56.176][main][DEBUG][BasicUtil 17] - took 3011ms.
[2019-08-15 05:54:56.176][main][INFO ][RCStep 668] - Polling: ...
[2019-08-15 05:54:56.229][main][INFO ][AbstractCoordinator 473] - [Consumer clientId=consumer-3, groupId=telematics-ft-sync] Successfully joined group with generation 1
[2019-08-15 05:54:56.230][main][INFO ][ConsumerCoordinator 280] - [Consumer clientId=consumer-3, groupId=telematics-ft-sync] Setting newly assigned partitions [CVC.TELEMATICS.MSD.V1-11, CVC.TELEMATICS.MSD.V1-10, CVC.TELEMATICS.MSD.V1-1, CVC.TELEMATICS.MSD.V1-0, CVC.TELEMATICS.MSD.V1-5, CVC.TELEMATICS.MSD.V1-4, CVC.TELEMATICS.MSD.V1-3, CVC.TELEMATICS.MSD.V1-2, CVC.TELEMATICS.MSD.V1-9, CVC.TELEMATICS.MSD.V1-8, CVC.TELEMATICS.MSD.V1-7, CVC.TELEMATICS.MSD.V1-6]
[2019-08-15 05:54:56.258][main][INFO ][Fetcher 601] - [Consumer clientId=consumer-3, groupId=telematics-ft-sync] Resetting offset for partition CVC.TELEMATICS.MSD.V1-11 to offset 2.
[2019-08-15 05:54:56.258][main][INFO ][Fetcher 601] - [Consumer clientId=consumer-3, groupId=telematics-ft-sync] Resetting offset for partition CVC.TELEMATICS.MSD.V1-10 to offset 0.
[2019-08-15 05:54:56.258][main][INFO ][Fetcher 601] - [Consumer clientId=consumer-3, groupId=telematics-ft-sync] Resetting offset for partition CVC.TELEMATICS.MSD.V1-1 to offset 0.
[2019-08-15 05:54:56.258][main][INFO ][Fetcher 601] - [Consumer clientId=consumer-3, groupId=telematics-ft-sync] Resetting offset for partition CVC.TELEMATICS.MSD.V1-0 to offset 0.
[2019-08-15 05:54:56.259][main][INFO ][Fetcher 601] - [Consumer clientId=consumer-3, groupId=telematics-ft-sync] Resetting offset for partition CVC.TELEMATICS.MSD.V1-5 to offset 0.
[2019-08-15 05:54:56.259][main][INFO ][Fetcher 601] - [Consumer clientId=consumer-3, groupId=telematics-ft-sync] Resetting offset for partition CVC.TELEMATICS.MSD.V1-4 to offset 106.
[2019-08-15 05:54:56.259][main][INFO ][Fetcher 601] - [Consumer clientId=consumer-3, groupId=telematics-ft-sync] Resetting offset for partition CVC.TELEMATICS.MSD.V1-3 to offset 0.
[2019-08-15 05:54:56.259][main][INFO ][Fetcher 601] - [Consumer clientId=consumer-3, groupId=telematics-ft-sync] Resetting offset for partition CVC.TELEMATICS.MSD.V1-2 to offset 0.
[2019-08-15 05:54:56.259][main][INFO ][Fetcher 601] - [Consumer clientId=consumer-3, groupId=telematics-ft-sync] Resetting offset for partition CVC.TELEMATICS.MSD.V1-9 to offset 0.
[2019-08-15 05:54:56.259][main][INFO ][Fetcher 601] - [Consumer clientId=consumer-3, groupId=telematics-ft-sync] Resetting offset for partition CVC.TELEMATICS.MSD.V1-8 to offset 1.
[2019-08-15 05:54:56.259][main][INFO ][Fetcher 601] - [Consumer clientId=consumer-3, groupId=telematics-ft-sync] Resetting offset for partition CVC.TELEMATICS.MSD.V1-7 to offset 0.
[2019-08-15 05:54:56.260][main][INFO ][Fetcher 601] - [Consumer clientId=consumer-3, groupId=telematics-ft-sync] Resetting offset for partition CVC.TELEMATICS.MSD.V1-6 to offset 0.
[2019-08-15 05:54:57.176][main][INFO ][RCStep 672] - partitions:[]
[2019-08-15 05:54:57.177][main][INFO ][RCStep 675] - records.count=0
[2019-08-15 05:54:57.177][main][DEBUG][BasicUtil 17] - took 4012ms.
发表于 2019-08-15
添加评论

先启动消费者,消息就不会丢了。

跟订报纸一样,当你从订阅报纸的那一刻,报社才知道你,后续的报纸才会发送给你(之前的不会给你),所以你需要优先启动消费者。

袁江镇 -> 半兽人 5年前

我是在程序启动是就subscribe了

袁江镇 -> 半兽人 5年前

send消息时 offset=105。rejoined group后,offset怎么就是106了

半兽人 -> 袁江镇 5年前

你send的时候,还获取到offset了?

袁江镇 -> 半兽人 5年前
RecordMetadata rmd = producer.send(record).get();
   LOG.debug(
     "RecordMetadata:offset={}, partition={}, topic={}",
      rmd.offset(),
      rmd.partition(),
      rmd.topic());

send之后就是这个地方打印了offset的值。

半兽人 -> 袁江镇 5年前

同步发送,马上获取到回通知结果。

生产者发送消息的RecordMetadata日志记录:

[2019-08-15 05:54:53.136][main][DEBUG][DataUploadStep 53] - RecordMetadata:offset=105, partition=4, topic=CVC.TELEMATICS.MSD.V1
408097591 -> 袁江镇 5年前

auto.offset.reset 如果KafkaConsumer没有初始化的offset的话,就会采用这个策略,默认是latest,这样的话就会忽略第一条KafkaProducer生产的消息。

你把auto.offset.reset改成latest,或许能解决你的问题

[2019-08-16 08:49:32.864][main][INFO ][ConsumerCoordinator 472] - [Consumer clientId=consumer-3, groupId=telematics-ft-sync] Revoking previously assigned partitions []
[2019-08-16 08:49:32.865][main][INFO ][AbstractCoordinator 509] - [Consumer clientId=consumer-3, groupId=telematics-ft-sync] (Re-)joining group
[2019-08-16 08:49:33.829][main][INFO ][RCStep 672] - partitions:[]
[2019-08-16 08:49:33.830][main][INFO ][RCStep 675] - records.count=0
[2019-08-16 08:49:33.831][main][DEBUG][BasicUtil 17] - took 1007ms.
[2019-08-16 08:49:33.831][main][INFO ][RCStep 668] - Polling: ...
[2019-08-16 08:49:34.832][main][INFO ][RCStep 672] - partitions:[]
[2019-08-16 08:49:34.833][main][INFO ][RCStep 675] - records.count=0
[2019-08-16 08:49:34.833][main][DEBUG][BasicUtil 17] - took 2009ms.
[2019-08-16 08:49:34.833][main][INFO ][RCStep 668] - Polling: ...
[2019-08-16 08:49:35.834][main][INFO ][RCStep 672] - partitions:[]
[2019-08-16 08:49:35.834][main][INFO ][RCStep 675] - records.count=0
[2019-08-16 08:49:35.834][main][DEBUG][BasicUtil 17] - took 3010ms.
[2019-08-16 08:49:35.834][main][INFO ][RCStep 668] - Polling: ...
[2019-08-16 08:49:35.895][main][INFO ][AbstractCoordinator 473] - [Consumer clientId=consumer-3, groupId=telematics-ft-sync] Successfully joined group with generation 1
[2019-08-16 08:49:35.896][main][INFO ][ConsumerCoordinator 280] - [Consumer clientId=consumer-3, groupId=telematics-ft-sync] Setting newly assigned partitions [CVC.TELEMATICS.MSD.V1-11, CVC.TELEMATICS.MSD.V1-10, CVC.TELEMATICS.MSD.V1-1, CVC.TELEMATICS.MSD.V1-0, CVC.TELEMATICS.MSD.V1-5, CVC.TELEMATICS.MSD.V1-4, CVC.TELEMATICS.MSD.V1-3, CVC.TELEMATICS.MSD.V1-2, CVC.TELEMATICS.MSD.V1-9, CVC.TELEMATICS.MSD.V1-8, CVC.TELEMATICS.MSD.V1-7, CVC.TELEMATICS.MSD.V1-6]
[2019-08-16 08:49:35.932][main][INFO ][Fetcher 601] - [Consumer clientId=consumer-3, groupId=telematics-ft-sync] Resetting offset for partition CVC.TELEMATICS.MSD.V1-11 to offset 2.
[2019-08-16 08:49:35.933][main][INFO ][Fetcher 601] - [Consumer clientId=consumer-3, groupId=telematics-ft-sync] Resetting offset for partition CVC.TELEMATICS.MSD.V1-10 to offset 0.
[2019-08-16 08:49:35.933][main][INFO ][Fetcher 601] - [Consumer clientId=consumer-3, groupId=telematics-ft-sync] Resetting offset for partition CVC.TELEMATICS.MSD.V1-1 to offset 0.
[2019-08-16 08:49:35.933][main][INFO ][Fetcher 601] - [Consumer clientId=consumer-3, groupId=telematics-ft-sync] Resetting offset for partition CVC.TELEMATICS.MSD.V1-0 to offset 0.
[2019-08-16 08:49:35.934][main][INFO ][Fetcher 601] - [Consumer clientId=consumer-3, groupId=telematics-ft-sync] Resetting offset for partition CVC.TELEMATICS.MSD.V1-5 to offset 0.
[2019-08-16 08:49:35.934][main][INFO ][Fetcher 601] - [Consumer clientId=consumer-3, groupId=telematics-ft-sync] Resetting offset for partition CVC.TELEMATICS.MSD.V1-4 to offset 147.
[2019-08-16 08:49:35.934][main][INFO ][Fetcher 601] - [Consumer clientId=consumer-3, groupId=telematics-ft-sync] Resetting offset for partition CVC.TELEMATICS.MSD.V1-3 to offset 0.
[2019-08-16 08:49:35.934][main][INFO ][Fetcher 601] - [Consumer clientId=consumer-3, groupId=telematics-ft-sync] Resetting offset for partition CVC.TELEMATICS.MSD.V1-2 to offset 0.
[2019-08-16 08:49:35.934][main][INFO ][Fetcher 601] - [Consumer clientId=consumer-3, groupId=telematics-ft-sync] Resetting offset for partition CVC.TELEMATICS.MSD.V1-9 to offset 9.
[2019-08-16 08:49:35.934][main][INFO ][Fetcher 601] - [Consumer clientId=consumer-3, groupId=telematics-ft-sync] Resetting offset for partition CVC.TELEMATICS.MSD.V1-8 to offset 1.
[2019-08-16 08:49:35.935][main][INFO ][Fetcher 601] - [Consumer clientId=consumer-3, groupId=telematics-ft-sync] Resetting offset for partition CVC.TELEMATICS.MSD.V1-7 to offset 0.
[2019-08-16 08:49:35.935][main][INFO ][Fetcher 601] - [Consumer clientId=consumer-3, groupId=telematics-ft-sync] Resetting offset for partition CVC.TELEMATICS.MSD.V1-6 to offset 0.
[2019-08-16 08:49:36.835][main][INFO ][RCStep 672] - partitions:[]
[2019-08-16 08:49:36.836][main][INFO ][RCStep 675] - records.count=0
[2019-08-16 08:49:36.836][main][DEBUG][BasicUtil 17] - took 4012ms.
[2019-08-16 08:49:36.836][main][INFO ][RCStep 668] - Polling: ...
[2019-08-16 08:49:37.837][main][INFO ][RCStep 672] - partitions:[]
[2019-08-16 08:49:37.838][main][INFO ][RCStep 675] - records.count=0
[2019-08-16 08:49:37.838][main][DEBUG][BasicUtil 17] - took 5014ms.
[2019-08-16 08:49:37.838][main][INFO ][RCStep 668] - Polling: ...
[2019-08-16 08:49:38.839][main][INFO ][RCStep 672] - partitions:[]
[2019-08-16 08:49:38.839][main][INFO ][RCStep 675] - records.count=0
[2019-08-16 08:49:38.839][main][DEBUG][BasicUtil 17] - took 6015ms.
[2019-08-16 08:49:38.839][main][INFO ][RCStep 668] - Polling: ...
[2019-08-16 08:49:39.840][main][INFO ][RCStep 672] - partitions:[]
[2019-08-16 08:49:39.840][main][INFO ][RCStep 675] - records.count=0
[2019-08-16 08:49:39.840][main][DEBUG][BasicUtil 17] - took 7016ms.
[2019-08-16 08:49:39.840][main][INFO ][RCStep 668] - Polling: ...
[2019-08-16 08:49:40.841][main][INFO ][RCStep 672] - partitions:[]
[2019-08-16 08:49:40.841][main][INFO ][RCStep 675] - records.count=0
[2019-08-16 08:49:40.841][main][DEBUG][BasicUtil 17] - took 8017ms.
[2019-08-16 08:49:40.842][main][INFO ][RCStep 668] - Polling: ...
[2019-08-16 08:49:41.842][main][INFO ][RCStep 672] - partitions:[]
[2019-08-16 08:49:41.843][main][INFO ][RCStep 675] - records.count=0
[2019-08-16 08:49:41.843][main][DEBUG][BasicUtil 17] - took 9019ms.
[2019-08-16 08:49:41.843][main][INFO ][RCStep 668] - Polling: ...
[2019-08-16 08:49:42.844][main][INFO ][RCStep 672] - partitions:[]
[2019-08-16 08:49:42.844][main][INFO ][RCStep 675] - records.count=0
[2019-08-16 08:49:42.844][main][DEBUG][BasicUtil 17] - took 10020ms.
[2019-08-16 08:49:42.846][main][DEBUG][RCStep 637] - get ecall/bcall eventId:null
袁江镇 -> 袁江镇 5年前

[2019-08-15 05:54:57.176][main][INFO ][RCStep 672] - partitions:[]
重新joined group,partition为空。 。。。

半兽人 -> 袁江镇 5年前

你的问题我都看乱了。
这个不该是consumer先已经join进去了,为什么还会 rejoined group。

袁江镇 -> 半兽人 5年前

都Fetch到offset了, 为啥就poll不到数据?

袁江镇 -> 袁江镇 5年前
[2019-09-11 06:05:06.087][main][INFO ][AbstractCoordinator 677] - [Consumer clientId=consumer-1, groupId=telematics-ft-rc] Discovered group coordinator 192.168.31.14:9092 (id: 2147482641 rack: null)
[2019-09-11 06:05:06.094][main][INFO ][ConsumerCoordinator 472] - [Consumer clientId=consumer-1, groupId=telematics-ft-rc] Revoking previously assigned partitions []
[2019-09-11 06:05:06.094][main][INFO ][AbstractCoordinator 509] - [Consumer clientId=consumer-1, groupId=telematics-ft-rc] (Re-)joining group
[2019-09-11 06:05:07.017][main][INFO ][RCStep 239] - records.count=0
[2019-09-11 06:05:07.018][main][DEBUG][BasicUtil 17] - took 1006ms.
[2019-09-11 06:05:07.018][main][DEBUG][RCStep 235] - start polling ...
[2019-09-11 06:05:08.020][main][INFO ][RCStep 239] - records.count=0
[2019-09-11 06:05:08.020][main][DEBUG][BasicUtil 17] - took 2008ms.
[2019-09-11 06:05:08.020][main][DEBUG][RCStep 235] - start polling ...
[2019-09-11 06:05:09.022][main][INFO ][RCStep 239] - records.count=0
[2019-09-11 06:05:09.022][main][DEBUG][BasicUtil 17] - took 3010ms.
[2019-09-11 06:05:09.022][main][DEBUG][RCStep 235] - start polling ...
[2019-09-11 06:05:09.148][main][INFO ][AbstractCoordinator 473] - [Consumer clientId=consumer-1, groupId=telematics-ft-rc] Successfully joined group with generation 1
[2019-09-11 06:05:09.149][main][INFO ][ConsumerCoordinator 280] - [Consumer clientId=consumer-1, groupId=telematics-ft-rc] Setting newly assigned partitions [CVC.TELEMATICS.RESPONSE.V1.AppMocker001-8, CVC.TELEMATICS.RESPONSE.V1.AppMocker001-9, CVC.TELEMATICS.RESPONSE.V1.AppMocker001-6, CVC.TELEMATICS.RESPONSE.V1.AppMocker001-7, CVC.TELEMATICS.RESPONSE.V1.AppMocker001-10, CVC.TELEMATICS.RESPONSE.V1.AppMocker001-11, CVC.TELEMATICS.RESPONSE.V1.AppMocker001-0, CVC.TELEMATICS.RESPONSE.V1.AppMocker001-1, CVC.TELEMATICS.RESPONSE.V1.AppMocker001-4, CVC.TELEMATICS.RESPONSE.V1.AppMocker001-5, CVC.TELEMATICS.RESPONSE.V1.AppMocker001-2, CVC.TELEMATICS.RESPONSE.V1.AppMocker001-3]
[2019-09-11 06:05:09.186][main][INFO ][Fetcher 601] - [Consumer clientId=consumer-1, groupId=telematics-ft-rc] Resetting offset for partition CVC.TELEMATICS.RESPONSE.V1.AppMocker001-8 to offset 0.
[2019-09-11 06:05:09.186][main][INFO ][Fetcher 601] - [Consumer clientId=consumer-1, groupId=telematics-ft-rc] Resetting offset for partition CVC.TELEMATICS.RESPONSE.V1.AppMocker001-9 to offset 0.
[2019-09-11 06:05:09.186][main][INFO ][Fetcher 601] - [Consumer clientId=consumer-1, groupId=telematics-ft-rc] Resetting offset for partition CVC.TELEMATICS.RESPONSE.V1.AppMocker001-6 to offset 0.
[2019-09-11 06:05:09.186][main][INFO ][Fetcher 601] - [Consumer clientId=consumer-1, groupId=telematics-ft-rc] Resetting offset for partition CVC.TELEMATICS.RESPONSE.V1.AppMocker001-7 to offset 0.
[2019-09-11 06:05:09.186][main][INFO ][Fetcher 601] - [Consumer clientId=consumer-1, groupId=telematics-ft-rc] Resetting offset for partition CVC.TELEMATICS.RESPONSE.V1.AppMocker001-10 to offset 0.
[2019-09-11 06:05:09.187][main][INFO ][Fetcher 601] - [Consumer clientId=consumer-1, groupId=telematics-ft-rc] Resetting offset for partition CVC.TELEMATICS.RESPONSE.V1.AppMocker001-11 to offset 0.
[2019-09-11 06:05:09.187][main][INFO ][Fetcher 601] - [Consumer clientId=consumer-1, groupId=telematics-ft-rc] Resetting offset for partition CVC.TELEMATICS.RESPONSE.V1.AppMocker001-0 to offset 0.
[2019-09-11 06:05:09.188][main][INFO ][Fetcher 601] - [Consumer clientId=consumer-1, groupId=telematics-ft-rc] Resetting offset for partition CVC.TELEMATICS.RESPONSE.V1.AppMocker001-1 to offset 0.
[2019-09-11 06:05:09.188][main][INFO ][Fetcher 601] - [Consumer clientId=consumer-1, groupId=telematics-ft-rc] Resetting offset for partition CVC.TELEMATICS.RESPONSE.V1.AppMocker001-4 to offset 0.
[2019-09-11 06:05:09.188][main][INFO ][Fetcher 601] - [Consumer clientId=consumer-1, groupId=telematics-ft-rc] Resetting offset for partition CVC.TELEMATICS.RESPONSE.V1.AppMocker001-5 to offset 0.
[2019-09-11 06:05:09.188][main][INFO ][Fetcher 601] - [Consumer clientId=consumer-1, groupId=telematics-ft-rc] Resetting offset for partition CVC.TELEMATICS.RESPONSE.V1.AppMocker001-2 to offset 0.
[2019-09-11 06:05:09.188][main][INFO ][Fetcher 601] - [Consumer clientId=consumer-1, groupId=telematics-ft-rc] Resetting offset for partition CVC.TELEMATICS.RESPONSE.V1.AppMocker001-3 to offset 22.
[2019-09-11 06:05:10.023][main][INFO ][RCStep 239] - records.count=0
[2019-09-11 06:05:10.023][main][DEBUG][BasicUtil 17] - took 4011ms.
[2019-09-11 06:05:10.023][main][DEBUG][RCStep 235] - start polling ...
[2019-09-11 06:05:11.025][main][INFO ][RCStep 239] - records.count=0
[2019-09-11 06:05:11.025][main][DEBUG][BasicUtil 17] - took 5013ms.
[2019-09-11 06:05:11.025][main][DEBUG][RCStep 235] - start polling ...
你的答案

查看kafka相关的其他问题或提一个您自己的问题