求助Exception in thread "StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Extracted timestamp value is negative, which is not allowed.?

小蕊 发表于: 2016-11-30   最后更新时间: 2016-11-30 16:08:32   5,124 游览

我使用logstash往kafka里丢消息,然后用的官网提供的例子WordCountDemo去处理的时候报以下错误:

Exception in thread "StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Extracted timestamp value is negative, which is not allowed.
    at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:111)
    at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117)
    at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:144)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:415)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)

另外,我用kafka console控台直接往topic里丢的消息是正常的,只有使用logstash的时候报这个错误。

我的版本是kafka_2.11-0.10.1.0,求帮助。

发表于 2016-11-30

是由于logstash中的kafka版本低,并没有设置timestamp(时间戳),导致你处理的时候报这个错误。

可以增加以下配置,设置成当前墙钟

props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);

小蕊 -> 半兽人 8年前

对的,是这个问题,设置一下就好啦,感谢~!

你的答案

查看kafka相关的其他问题或提一个您自己的问题