kafka connect的sink connector中put方法一次拉取的消息量如何配置?
@Override
public void put(final Collection<SinkRecord> sinkRecords) {}
如上sinkRecords.size()一直都是四百多,配置过consumer.properties的
max.poll.records
max.partition.fetch.bytes
fetch.min.bytes
fetch.max.wait.ms
都没用
刚开始用kafka connect,希望不吝赐教。
 
                                 
        
fetch.max.bytes试试这个,参考来自:
https://www.orchome.com/535
没用呀,我的consumer.properties中配的是这样的,connector运行稳定之后每个批次都是414条
max.poll.records=9999 max.partition.fetch.bytes=104857600 fetch.min.bytes=2097152 fetch.max.wait.ms=10000 fetch.max.bytes=104857600kafka连接器底层其实就是消费者,一个批次返回的逻辑是:
当要打包消息的批次数量不满足
fetch.min.bytes时,则等待fetch.max.wait.ms之后,不管满足不满足,将这个批次的消息发送给消费者。按照你的配置:
批次总大小是一定是大于fetch.min.bytes,因为你设置了等待10秒,所以基本只有fetch.min.bytes在起作用。其他的参数都够用了,所以你只需通过继续增加
fetch.min.bytes来观察变化。
你好,我刚才把fetch.min.bytes增大了10倍,可每次拉取时sinkRecords.size()还是稳定在414,感觉consumer.properties不起作用
consumer.properties是没用的,这个是在使用消费者命令行时,指定配置文件使用的,如:bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic test --consumer.config config/consumer.properties你在代码里,应该类似消费者这种
Properties props = new Properties(); props.setProperty("fetch.min.bytes", "2097152"); ...嗯嗯,原来如此,十分感谢你的耐心;我还想问一下,kafka sink connect怎么对消费者进行配置呢?
我只知道需要重写
start(),put(),flush(),stop()方法,貌似没有地方需要新建消费者,也就没有地方对消费者进行配置props.setProperty("fetch.min.bytes", "2097152");应该写在哪里呢?
我今天在外面,现在没办法帮你查看。
大部分场景下,kafka是批次获取消息的,默认的配置基本能达到物理机的最大能力,一般都卡在处理速度上,而不是拉取上。
我懂了,就是说默认配置就够用了,重点优化put()方法中的处理逻辑,非常感谢您的解答!
你的答案