求助Exception in thread "StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Extracted timestamp value is negative, which is not allowed.?

小蕊 发表于: 2016-11-30   最后更新时间: 2016-11-30  
  •   3 订阅,162 游览

我使用logstash往kafka里丢消息,然后用的官网提供的例子WordCountDemo去处理的时候报以下错误:

Exception in thread "StreamThread-1" org.apache.kafka.streams.errors.StreamsException: Extracted timestamp value is negative, which is not allowed.
    at org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:111)
    at org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117)
    at org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:144)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:415)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:242)

另外,我用kafka console控台直接往topic里丢的消息是正常的,只有使用logstash的时候报这个错误。

我的版本是kafka_2.11-0.10.1.0,求帮助。







发表于: 1月前   最后更新时间: 1月前   游览量:162
上一条: kafka SSL 抓包,看到的协议只有TCP,这样正常吗
下一条: 求助,报The group member's supported protocols are incompatible with those of existing members.?
评论…

  • 是由于logstash中的kafka版本低,并没有设置timestamp(时间戳),导致你处理的时候报这个错误。
    可以增加以下配置,设置成当前墙钟
    props.put(StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG, WallclockTimestampExtractor.class);
  • 评论…
    • in this conversation
      提问