kafka-stream运行报错

代码如下:

public static void main(String[] args) throws IOException {

    Map<String, Object> props = new HashMap<>();
    props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-stream-processing-application");
    props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "vrv145:9092,vrv148:9092");
    props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    StreamsConfig config = new StreamsConfig(props);

    KStreamBuilder builder = new KStreamBuilder();

    KStream<String, String> source = builder.stream("net");
    KTable<String, Long> counts = source
            .flatMapValues(value -> Arrays.asList(value.toLowerCase(Locale.getDefault()).split(" ")))
            .map((key, value) -> new KeyValue<>(value, value)).groupByKey().count("Counts");

    counts.print();

    KafkaStreams streams = new KafkaStreams(builder, config);
    streams.start();

}

报错如下:

Exception in thread "StreamThread-1" java.lang.UnsatisfiedLinkError: C:UserslizjAppDataLocalTemplibrocksdbjni4991853238127536656.dll: Can't find dependent libraries
    at java.lang.ClassLoader$NativeLibrary.load(Native Method)
    at java.lang.ClassLoader.loadLibrary0(ClassLoader.java:1937)
    at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1822)
    at java.lang.Runtime.load0(Runtime.java:809)
    at java.lang.System.load(System.java:1086)
    at org.rocksdb.NativeLibraryLoader.loadLibraryFromJar(NativeLibraryLoader.java:78)
    at org.rocksdb.NativeLibraryLoader.loadLibrary(NativeLibraryLoader.java:56)
    at org.rocksdb.RocksDB.loadLibrary(RocksDB.java:64)
    at org.rocksdb.RocksDB.<clinit>(RocksDB.java:35)
    at org.rocksdb.Options.<clinit>(Options.java:22)
    at org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:115)
    at org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:148)
    at org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.init(ChangeLoggingKeyValueBytesStore.java:39)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore$7.run(MeteredKeyValueStore.java:100)
    at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:188)
    at org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:131)
    at org.apache.kafka.streams.state.internals.CachingKeyValueStore.init(CachingKeyValueStore.java:62)
    at org.apache.kafka.streams.processor.internals.AbstractTask.initializeStateStores(AbstractTask.java:86)
    at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:141)
    at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:834)
    at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1207)
    at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1180)
    at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:937)
    at org.apache.kafka.streams.processor.internals.StreamThread.access$500(StreamThread.java:69)
    at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:236)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:255)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:339)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:582)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)






发表于: 7月前   最后更新时间: 7月前   游览量:1026
上一条: kafka 在异常断开某一台服务器,会导致重复消费
下一条: AdminClient创建完Topic后,KafkaProducer可以正常生产数据,但是KafkaConsumer消费不到数据,求助

评论…


  • https://github.com/confluentinc/kafka-streams-examples/issues/91
    我用的kafka版本是0.10.2.0
    求教大神,这个怎么解决啊
  • 评论…
    • in this conversation