我使用logstash往kafka里丢消息,然后用的官网提供的例子WordCountDemo
去处理的时候报以下错误:
Exception in thread "StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Extracted timestamp value is negative, which is not allowed.
at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:111)
at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117)
at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:144)
at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:415)
at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)
另外,我用kafka console控台直接往topic里丢的消息是正常的,只有使用logstash的时候报这个错误。
我的版本是kafka_2.11-0.10.1.0,求帮助。
是由于logstash中的kafka版本低,并没有设置timestamp(时间戳),导致你处理的时候报这个错误。
可以增加以下配置,设置成当前墙钟
props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
对的,是这个问题,设置一下就好啦,感谢~!
你的答案