The log is as follows:
[2019-08-15 05:54:53.165][main][INFO ][RCStep 668] - Polling: ...
[2019-08-15 05:54:53.195][main][INFO ][Metadata 285] - Cluster ID: jjcD6HjpTjycBzzdDSapgw
[2019-08-15 05:54:53.197][main][INFO ][AbstractCoordinator 677] - [Consumer clientId=consumer-3, groupId=telematics-ft-sync] Discovered group coordinator 192.168.31.14:9092 (id: 2147482641 rack: null)
[2019-08-15 05:54:53.199][main][INFO ][ConsumerCoordinator 472] - [Consumer clientId=consumer-3, groupId=telematics-ft-sync] Revoking previously assigned partitions []
[2019-08-15 05:54:53.200][main][INFO ][AbstractCoordinator 509] - [Consumer clientId=consumer-3, groupId=telematics-ft-sync] (Re-)joining group
[2019-08-15 05:54:54.170][main][INFO ][RCStep 672] - partitions:[]
[2019-08-15 05:54:54.170][main][INFO ][RCStep 675] - records.count=0
[2019-08-15 05:54:54.171][main][DEBUG][BasicUtil 17] - took 1006ms.
[2019-08-15 05:54:54.172][main][INFO ][RCStep 668] - Polling: ...
[2019-08-15 05:54:55.173][main][INFO ][RCStep 672] - partitions:[]
[2019-08-15 05:54:55.173][main][INFO ][RCStep 675] - records.count=0
[2019-08-15 05:54:55.174][main][DEBUG][BasicUtil 17] - took 2009ms.
[2019-08-15 05:54:55.174][main][INFO ][RCStep 668] - Polling: ...
[2019-08-15 05:54:56.175][main][INFO ][RCStep 672] - partitions:[]
[2019-08-15 05:54:56.176][main][INFO ][RCStep 675] - records.count=0
[2019-08-15 05:54:56.176][main][DEBUG][BasicUtil 17] - took 3011ms.
[2019-08-15 05:54:56.176][main][INFO ][RCStep 668] - Polling: ...
[2019-08-15 05:54:56.229][main][INFO ][AbstractCoordinator 473] - [Consumer clientId=consumer-3, groupId=telematics-ft-sync] Successfully joined group with generation 1
[2019-08-15 05:54:56.230][main][INFO ][ConsumerCoordinator 280] - [Consumer clientId=consumer-3, groupId=telematics-ft-sync] Setting newly assigned partitions [CVC.TELEMATICS.MSD.V1-11, CVC.TELEMATICS.MSD.V1-10, CVC.TELEMATICS.MSD.V1-1, CVC.TELEMATICS.MSD.V1-0, CVC.TELEMATICS.MSD.V1-5, CVC.TELEMATICS.MSD.V1-4, CVC.TELEMATICS.MSD.V1-3, CVC.TELEMATICS.MSD.V1-2, CVC.TELEMATICS.MSD.V1-9, CVC.TELEMATICS.MSD.V1-8, CVC.TELEMATICS.MSD.V1-7, CVC.TELEMATICS.MSD.V1-6]
[2019-08-15 05:54:56.258][main][INFO ][Fetcher 601] - [Consumer clientId=consumer-3, groupId=telematics-ft-sync] Resetting offset for partition CVC.TELEMATICS.MSD.V1-11 to offset 2.
[2019-08-15 05:54:56.258][main][INFO ][Fetcher 601] - [Consumer clientId=consumer-3, groupId=telematics-ft-sync] Resetting offset for partition CVC.TELEMATICS.MSD.V1-10 to offset 0.
[2019-08-15 05:54:56.258][main][INFO ][Fetcher 601] - [Consumer clientId=consumer-3, groupId=telematics-ft-sync] Resetting offset for partition CVC.TELEMATICS.MSD.V1-1 to offset 0.
[2019-08-15 05:54:56.258][main][INFO ][Fetcher 601] - [Consumer clientId=consumer-3, groupId=telematics-ft-sync] Resetting offset for partition CVC.TELEMATICS.MSD.V1-0 to offset 0.
[2019-08-15 05:54:56.259][main][INFO ][Fetcher 601] - [Consumer clientId=consumer-3, groupId=telematics-ft-sync] Resetting offset for partition CVC.TELEMATICS.MSD.V1-5 to offset 0.
[2019-08-15 05:54:56.259][main][INFO ][Fetcher 601] - [Consumer clientId=consumer-3, groupId=telematics-ft-sync] Resetting offset for partition CVC.TELEMATICS.MSD.V1-4 to offset 106.
[2019-08-15 05:54:56.259][main][INFO ][Fetcher 601] - [Consumer clientId=consumer-3, groupId=telematics-ft-sync] Resetting offset for partition CVC.TELEMATICS.MSD.V1-3 to offset 0.
[2019-08-15 05:54:56.259][main][INFO ][Fetcher 601] - [Consumer clientId=consumer-3, groupId=telematics-ft-sync] Resetting offset for partition CVC.TELEMATICS.MSD.V1-2 to offset 0.
[2019-08-15 05:54:56.259][main][INFO ][Fetcher 601] - [Consumer clientId=consumer-3, groupId=telematics-ft-sync] Resetting offset for partition CVC.TELEMATICS.MSD.V1-9 to offset 0.
[2019-08-15 05:54:56.259][main][INFO ][Fetcher 601] - [Consumer clientId=consumer-3, groupId=telematics-ft-sync] Resetting offset for partition CVC.TELEMATICS.MSD.V1-8 to offset 1.
[2019-08-15 05:54:56.259][main][INFO ][Fetcher 601] - [Consumer clientId=consumer-3, groupId=telematics-ft-sync] Resetting offset for partition CVC.TELEMATICS.MSD.V1-7 to offset 0.
[2019-08-15 05:54:56.260][main][INFO ][Fetcher 601] - [Consumer clientId=consumer-3, groupId=telematics-ft-sync] Resetting offset for partition CVC.TELEMATICS.MSD.V1-6 to offset 0.
[2019-08-15 05:54:57.176][main][INFO ][RCStep 672] - partitions:[]
[2019-08-15 05:54:57.177][main][INFO ][RCStep 675] - records.count=0
[2019-08-15 05:54:57.177][main][DEBUG][BasicUtil 17] - took 4012ms.
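Start the consumer first, and the message won't be lost.
I already call subscribe when the program starts.
When I sent the message, its offset was 105. After the consumer rejoined the group, why is the offset 106?
You got the offset back when you sent?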
RecordMetadata rmd = producer.send(record).get();
LOG.debug("RecordMetadata:offset={}, partition={}, topic={}", rmd.offset(), rmd.partition(), rmd.topic());
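This is where the offset value gets printed, right after send.
It is a synchronous send, so the acknowledgement comes back immediately.
Log entry for the RecordMetadata of the message the producer sent: [2019-08-15 05:54:53.136][main][DEBUG][DataUploadStep 53] - RecordMetadata:offset=105, partition=4, topic=CVC.TELEMATICS.MSD.V1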
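One possible reading of the 105 vs. 106 numbers, as a minimal sketch rather than Kafka's actual code: it assumes the group had no committed offset for partition 4 and that auto.offset.reset was left at latest. In that case the consumer's position is reset to the log end offset, which is the offset of the last record plus one. The class and method names below are hypothetical, and the log start offset of 0 is an assumption.

```java
import java.util.OptionalLong;

public class OffsetResetSketch {

    /**
     * Hypothetical helper that models where a consumer starts reading a partition.
     * A committed offset always wins; the reset policy applies only when none exists.
     */
    static long startingPosition(OptionalLong committed, String autoOffsetReset,
                                 long logStartOffset, long logEndOffset) {
        if (committed.isPresent()) {
            return committed.getAsLong();
        }
        switch (autoOffsetReset) {
            case "earliest":
                return logStartOffset;
            case "latest":
                return logEndOffset;
            default:
                throw new IllegalStateException(
                        "no committed offset and reset policy is " + autoOffsetReset);
        }
    }

    public static void main(String[] args) {
        long lastProducedOffset = 105;              // from the producer log, partition 4
        long logEndOffset = lastProducedOffset + 1; // next offset to be written: 106
        long logStartOffset = 0;                    // assumption: nothing deleted yet

        long latest = startingPosition(OptionalLong.empty(), "latest",
                logStartOffset, logEndOffset);
        long earliest = startingPosition(OptionalLong.empty(), "earliest",
                logStartOffset, logEndOffset);

        // With "latest" the consumer starts after the record at offset 105.
        System.out.println("latest   -> " + latest);   // prints "latest   -> 106"
        System.out.println("earliest -> " + earliest); // prints "earliest -> 0"
    }
}
```

Under these assumptions, a position of 106 means the record at offset 105 sits just before the starting point, so poll returns nothing until a new record is produced.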
auto.offset.reset: if the KafkaConsumer has no initial offset, this policy is applied. The default is latest, in which case the first message produced by the KafkaProducer is ignored. Try changing auto.offset.reset to earliest; that may solve your problem.
[2019-08-16 08:49:32.864][main][INFO ][ConsumerCoordinator 472] - [Consumer clientId=consumer-3, groupId=telematics-ft-sync] Revoking previously assigned partitions []
[2019-08-16 08:49:32.865][main][INFO ][AbstractCoordinator 509] - [Consumer clientId=consumer-3, groupId=telematics-ft-sync] (Re-)joining group
[2019-08-16 08:49:33.829][main][INFO ][RCStep 672] - partitions:[]
[2019-08-16 08:49:33.830][main][INFO ][RCStep 675] - records.count=0
[2019-08-16 08:49:33.831][main][DEBUG][BasicUtil 17] - took 1007ms.
[2019-08-16 08:49:33.831][main][INFO ][RCStep 668] - Polling: ...
[2019-08-16 08:49:34.832][main][INFO ][RCStep 672] - partitions:[]
[2019-08-16 08:49:34.833][main][INFO ][RCStep 675] - records.count=0
[2019-08-16 08:49:34.833][main][DEBUG][BasicUtil 17] - took 2009ms.
[2019-08-16 08:49:34.833][main][INFO ][RCStep 668] - Polling: ...
[2019-08-16 08:49:35.834][main][INFO ][RCStep 672] - partitions:[]
[2019-08-16 08:49:35.834][main][INFO ][RCStep 675] - records.count=0
[2019-08-16 08:49:35.834][main][DEBUG][BasicUtil 17] - took 3010ms.
[2019-08-16 08:49:35.834][main][INFO ][RCStep 668] - Polling: ...
[2019-08-16 08:49:35.895][main][INFO ][AbstractCoordinator 473] - [Consumer clientId=consumer-3, groupId=telematics-ft-sync] Successfully joined group with generation 1
[2019-08-16 08:49:35.896][main][INFO ][ConsumerCoordinator 280] - [Consumer clientId=consumer-3, groupId=telematics-ft-sync] Setting newly assigned partitions [CVC.TELEMATICS.MSD.V1-11, CVC.TELEMATICS.MSD.V1-10, CVC.TELEMATICS.MSD.V1-1, CVC.TELEMATICS.MSD.V1-0, CVC.TELEMATICS.MSD.V1-5, CVC.TELEMATICS.MSD.V1-4, CVC.TELEMATICS.MSD.V1-3, CVC.TELEMATICS.MSD.V1-2, CVC.TELEMATICS.MSD.V1-9, CVC.TELEMATICS.MSD.V1-8, CVC.TELEMATICS.MSD.V1-7, CVC.TELEMATICS.MSD.V1-6]
[2019-08-16 08:49:35.932][main][INFO ][Fetcher 601] - [Consumer clientId=consumer-3, groupId=telematics-ft-sync] Resetting offset for partition CVC.TELEMATICS.MSD.V1-11 to offset 2.
[2019-08-16 08:49:35.933][main][INFO ][Fetcher 601] - [Consumer clientId=consumer-3, groupId=telematics-ft-sync] Resetting offset for partition CVC.TELEMATICS.MSD.V1-10 to offset 0.
[2019-08-16 08:49:35.933][main][INFO ][Fetcher 601] - [Consumer clientId=consumer-3, groupId=telematics-ft-sync] Resetting offset for partition CVC.TELEMATICS.MSD.V1-1 to offset 0.
[2019-08-16 08:49:35.933][main][INFO ][Fetcher 601] - [Consumer clientId=consumer-3, groupId=telematics-ft-sync] Resetting offset for partition CVC.TELEMATICS.MSD.V1-0 to offset 0.
[2019-08-16 08:49:35.934][main][INFO ][Fetcher 601] - [Consumer clientId=consumer-3, groupId=telematics-ft-sync] Resetting offset for partition CVC.TELEMATICS.MSD.V1-5 to offset 0.
[2019-08-16 08:49:35.934][main][INFO ][Fetcher 601] - [Consumer clientId=consumer-3, groupId=telematics-ft-sync] Resetting offset for partition CVC.TELEMATICS.MSD.V1-4 to offset 147.
[2019-08-16 08:49:35.934][main][INFO ][Fetcher 601] - [Consumer clientId=consumer-3, groupId=telematics-ft-sync] Resetting offset for partition CVC.TELEMATICS.MSD.V1-3 to offset 0.
[2019-08-16 08:49:35.934][main][INFO ][Fetcher 601] - [Consumer clientId=consumer-3, groupId=telematics-ft-sync] Resetting offset for partition CVC.TELEMATICS.MSD.V1-2 to offset 0.
[2019-08-16 08:49:35.934][main][INFO ][Fetcher 601] - [Consumer clientId=consumer-3, groupId=telematics-ft-sync] Resetting offset for partition CVC.TELEMATICS.MSD.V1-9 to offset 9.
[2019-08-16 08:49:35.934][main][INFO ][Fetcher 601] - [Consumer clientId=consumer-3, groupId=telematics-ft-sync] Resetting offset for partition CVC.TELEMATICS.MSD.V1-8 to offset 1.
[2019-08-16 08:49:35.935][main][INFO ][Fetcher 601] - [Consumer clientId=consumer-3, groupId=telematics-ft-sync] Resetting offset for partition CVC.TELEMATICS.MSD.V1-7 to offset 0.
[2019-08-16 08:49:35.935][main][INFO ][Fetcher 601] - [Consumer clientId=consumer-3, groupId=telematics-ft-sync] Resetting offset for partition CVC.TELEMATICS.MSD.V1-6 to offset 0.
[2019-08-16 08:49:36.835][main][INFO ][RCStep 672] - partitions:[]
[2019-08-16 08:49:36.836][main][INFO ][RCStep 675] - records.count=0
[2019-08-16 08:49:36.836][main][DEBUG][BasicUtil 17] - took 4012ms.
[2019-08-16 08:49:36.836][main][INFO ][RCStep 668] - Polling: ...
[2019-08-16 08:49:37.837][main][INFO ][RCStep 672] - partitions:[]
[2019-08-16 08:49:37.838][main][INFO ][RCStep 675] - records.count=0
[2019-08-16 08:49:37.838][main][DEBUG][BasicUtil 17] - took 5014ms.
[2019-08-16 08:49:37.838][main][INFO ][RCStep 668] - Polling: ...
[2019-08-16 08:49:38.839][main][INFO ][RCStep 672] - partitions:[]
[2019-08-16 08:49:38.839][main][INFO ][RCStep 675] - records.count=0
[2019-08-16 08:49:38.839][main][DEBUG][BasicUtil 17] - took 6015ms.
[2019-08-16 08:49:38.839][main][INFO ][RCStep 668] - Polling: ...
[2019-08-16 08:49:39.840][main][INFO ][RCStep 672] - partitions:[]
[2019-08-16 08:49:39.840][main][INFO ][RCStep 675] - records.count=0
[2019-08-16 08:49:39.840][main][DEBUG][BasicUtil 17] - took 7016ms.
[2019-08-16 08:49:39.840][main][INFO ][RCStep 668] - Polling: ...
[2019-08-16 08:49:40.841][main][INFO ][RCStep 672] - partitions:[]
[2019-08-16 08:49:40.841][main][INFO ][RCStep 675] - records.count=0
[2019-08-16 08:49:40.841][main][DEBUG][BasicUtil 17] - took 8017ms.
[2019-08-16 08:49:40.842][main][INFO ][RCStep 668] - Polling: ...
[2019-08-16 08:49:41.842][main][INFO ][RCStep 672] - partitions:[]
[2019-08-16 08:49:41.843][main][INFO ][RCStep 675] - records.count=0
[2019-08-16 08:49:41.843][main][DEBUG][BasicUtil 17] - took 9019ms.
[2019-08-16 08:49:41.843][main][INFO ][RCStep 668] - Polling: ...
[2019-08-16 08:49:42.844][main][INFO ][RCStep 672] - partitions:[]
[2019-08-16 08:49:42.844][main][INFO ][RCStep 675] - records.count=0
[2019-08-16 08:49:42.844][main][DEBUG][BasicUtil 17] - took 10020ms.
[2019-08-16 08:49:42.846][main][DEBUG][RCStep 637] - get ecall/bcall eventId:null
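For the auto.offset.reset setting discussed in this thread, here is a sketch of what the consumer configuration might look like. Only the group id and the broker address come from the logs. Using that address as bootstrap.servers, the String deserializers, and the class name are assumptions for illustration. The Properties object would then be passed to the KafkaConsumer constructor.

```java
import java.util.Properties;

public class ConsumerConfigSketch {

    /** Builds consumer settings; values marked "assumption" are not from the thread. */
    static Properties consumerProps() {
        Properties props = new Properties();
        // Assumption: the coordinator address seen in the log is also a bootstrap server.
        props.setProperty("bootstrap.servers", "192.168.31.14:9092");
        // Group id as it appears in the log.
        props.setProperty("group.id", "telematics-ft-sync");
        // Assumption: keys and values are plain strings.
        props.setProperty("key.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        props.setProperty("value.deserializer",
                "org.apache.kafka.common.serialization.StringDeserializer");
        // Start from the beginning of the log when the group has no committed offset.
        props.setProperty("auto.offset.reset", "earliest");
        return props;
    }

    public static void main(String[] args) {
        Properties props = consumerProps();
        System.out.println("auto.offset.reset=" + props.getProperty("auto.offset.reset"));
    }
}
```

Note that auto.offset.reset only takes effect when the group has no committed offset for a partition (or the committed offset is out of range). Once the group has committed offsets, changing this setting alone does not move the starting position.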
[2019-08-15 05:54:57.176][main][INFO ][RCStep 672] - partitions:[]
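After rejoining the group, the partitions are empty...
I've lost track of what your question is.
Shouldn't the consumer already have joined the group? Why does it rejoin?
The offsets have already been fetched, so why does poll return no data?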
[2019-09-11 06:05:06.087][main][INFO ][AbstractCoordinator 677] - [Consumer clientId=consumer-1, groupId=telematics-ft-rc] Discovered group coordinator 192.168.31.14:9092 (id: 2147482641 rack: null)
[2019-09-11 06:05:06.094][main][INFO ][ConsumerCoordinator 472] - [Consumer clientId=consumer-1, groupId=telematics-ft-rc] Revoking previously assigned partitions []
[2019-09-11 06:05:06.094][main][INFO ][AbstractCoordinator 509] - [Consumer clientId=consumer-1, groupId=telematics-ft-rc] (Re-)joining group
[2019-09-11 06:05:07.017][main][INFO ][RCStep 239] - records.count=0
[2019-09-11 06:05:07.018][main][DEBUG][BasicUtil 17] - took 1006ms.
[2019-09-11 06:05:07.018][main][DEBUG][RCStep 235] - start polling ...
[2019-09-11 06:05:08.020][main][INFO ][RCStep 239] - records.count=0
[2019-09-11 06:05:08.020][main][DEBUG][BasicUtil 17] - took 2008ms.
[2019-09-11 06:05:08.020][main][DEBUG][RCStep 235] - start polling ...
[2019-09-11 06:05:09.022][main][INFO ][RCStep 239] - records.count=0
[2019-09-11 06:05:09.022][main][DEBUG][BasicUtil 17] - took 3010ms.
[2019-09-11 06:05:09.022][main][DEBUG][RCStep 235] - start polling ...
[2019-09-11 06:05:09.148][main][INFO ][AbstractCoordinator 473] - [Consumer clientId=consumer-1, groupId=telematics-ft-rc] Successfully joined group with generation 1
[2019-09-11 06:05:09.149][main][INFO ][ConsumerCoordinator 280] - [Consumer clientId=consumer-1, groupId=telematics-ft-rc] Setting newly assigned partitions [CVC.TELEMATICS.RESPONSE.V1.AppMocker001-8, CVC.TELEMATICS.RESPONSE.V1.AppMocker001-9, CVC.TELEMATICS.RESPONSE.V1.AppMocker001-6, CVC.TELEMATICS.RESPONSE.V1.AppMocker001-7, CVC.TELEMATICS.RESPONSE.V1.AppMocker001-10, CVC.TELEMATICS.RESPONSE.V1.AppMocker001-11, CVC.TELEMATICS.RESPONSE.V1.AppMocker001-0, CVC.TELEMATICS.RESPONSE.V1.AppMocker001-1, CVC.TELEMATICS.RESPONSE.V1.AppMocker001-4, CVC.TELEMATICS.RESPONSE.V1.AppMocker001-5, CVC.TELEMATICS.RESPONSE.V1.AppMocker001-2, CVC.TELEMATICS.RESPONSE.V1.AppMocker001-3]
[2019-09-11 06:05:09.186][main][INFO ][Fetcher 601] - [Consumer clientId=consumer-1, groupId=telematics-ft-rc] Resetting offset for partition CVC.TELEMATICS.RESPONSE.V1.AppMocker001-8 to offset 0.
[2019-09-11 06:05:09.186][main][INFO ][Fetcher 601] - [Consumer clientId=consumer-1, groupId=telematics-ft-rc] Resetting offset for partition CVC.TELEMATICS.RESPONSE.V1.AppMocker001-9 to offset 0.
[2019-09-11 06:05:09.186][main][INFO ][Fetcher 601] - [Consumer clientId=consumer-1, groupId=telematics-ft-rc] Resetting offset for partition CVC.TELEMATICS.RESPONSE.V1.AppMocker001-6 to offset 0.
[2019-09-11 06:05:09.186][main][INFO ][Fetcher 601] - [Consumer clientId=consumer-1, groupId=telematics-ft-rc] Resetting offset for partition CVC.TELEMATICS.RESPONSE.V1.AppMocker001-7 to offset 0.
[2019-09-11 06:05:09.186][main][INFO ][Fetcher 601] - [Consumer clientId=consumer-1, groupId=telematics-ft-rc] Resetting offset for partition CVC.TELEMATICS.RESPONSE.V1.AppMocker001-10 to offset 0.
[2019-09-11 06:05:09.187][main][INFO ][Fetcher 601] - [Consumer clientId=consumer-1, groupId=telematics-ft-rc] Resetting offset for partition CVC.TELEMATICS.RESPONSE.V1.AppMocker001-11 to offset 0.
[2019-09-11 06:05:09.187][main][INFO ][Fetcher 601] - [Consumer clientId=consumer-1, groupId=telematics-ft-rc] Resetting offset for partition CVC.TELEMATICS.RESPONSE.V1.AppMocker001-0 to offset 0.
[2019-09-11 06:05:09.188][main][INFO ][Fetcher 601] - [Consumer clientId=consumer-1, groupId=telematics-ft-rc] Resetting offset for partition CVC.TELEMATICS.RESPONSE.V1.AppMocker001-1 to offset 0.
[2019-09-11 06:05:09.188][main][INFO ][Fetcher 601] - [Consumer clientId=consumer-1, groupId=telematics-ft-rc] Resetting offset for partition CVC.TELEMATICS.RESPONSE.V1.AppMocker001-4 to offset 0.
[2019-09-11 06:05:09.188][main][INFO ][Fetcher 601] - [Consumer clientId=consumer-1, groupId=telematics-ft-rc] Resetting offset for partition CVC.TELEMATICS.RESPONSE.V1.AppMocker001-5 to offset 0.
[2019-09-11 06:05:09.188][main][INFO ][Fetcher 601] - [Consumer clientId=consumer-1, groupId=telematics-ft-rc] Resetting offset for partition CVC.TELEMATICS.RESPONSE.V1.AppMocker001-2 to offset 0.
[2019-09-11 06:05:09.188][main][INFO ][Fetcher 601] - [Consumer clientId=consumer-1, groupId=telematics-ft-rc] Resetting offset for partition CVC.TELEMATICS.RESPONSE.V1.AppMocker001-3 to offset 22.
[2019-09-11 06:05:10.023][main][INFO ][RCStep 239] - records.count=0
[2019-09-11 06:05:10.023][main][DEBUG][BasicUtil 17] - took 4011ms.
[2019-09-11 06:05:10.023][main][DEBUG][RCStep 235] - start polling ...
[2019-09-11 06:05:11.025][main][INFO ][RCStep 239] - records.count=0
[2019-09-11 06:05:11.025][main][DEBUG][BasicUtil 17] - took 5013ms.
[2019-09-11 06:05:11.025][main][DEBUG][RCStep 235] - start polling ...
https://www.orchome.com/451
Reference: seek