kafka connect的sink connector
中put
方法一次拉取的消息量如何配置?
@Override
public void put(final Collection<SinkRecord> sinkRecords) {}
如上sinkRecords.size()
一直都是四百多,配置过consumer.properties
的
max.poll.records
max.partition.fetch.bytes
fetch.min.bytes
fetch.max.wait.ms
都没用
刚开始用kafka connect,希望不吝赐教。
fetch.max.bytes
试试这个,参考来自:
https://www.orchome.com/535
没用呀,我的consumer.properties中配的是这样的,connector运行稳定之后每个批次都是414条
max.poll.records=9999 max.partition.fetch.bytes=104857600 fetch.min.bytes=2097152 fetch.max.wait.ms=10000 fetch.max.bytes=104857600
kafka连接器底层其实就是消费者,一个批次返回的逻辑是:
当要打包消息的批次数量不满足
fetch.min.bytes
时,则等待fetch.max.wait.ms
之后,不管满足不满足,将这个批次的消息发送给消费者。按照你的配置:
批次总大小
是一定是大于fetch.min.bytes
,因为你设置了等待10秒,所以基本只有fetch.min.bytes
在起作用。其他的参数都够用了,所以你只需通过继续增加
fetch.min.bytes
来观察变化。
你好,我刚才把fetch.min.bytes增大了10倍,可每次拉取时sinkRecords.size()还是稳定在414,感觉consumer.properties不起作用
consumer.properties
是没用的,这个是在使用消费者命令行
时,指定配置文件使用的,如:bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --consumer.config config/consumer.properties
你在代码里,应该类似消费者这种
Properties props = new Properties(); props.setProperty("fetch.min.bytes", "2097152"); ...
嗯嗯,原来如此,十分感谢你的耐心;我还想问一下,kafka sink connect怎么对消费者进行配置呢?
我只知道需要重写
start()
,put()
,flush()
,stop()
方法,貌似没有地方需要新建消费者,也就没有地方对消费者进行配置props.setProperty("fetch.min.bytes", "2097152");
应该写在哪里呢?
我今天在外面,现在没办法帮你查看。
大部分场景下,kafka是批次获取消息的,默认的配置基本能达到物理机的最大能力,一般都卡在处理速度上,而不是拉取上。
我懂了,就是说默认配置就够用了,重点优化put()方法中的处理逻辑,非常感谢您的解答!
你的答案