kafka stream filter不能过滤数据

王建荣 发表于: 2018-04-13   最后更新时间: 2018-04-13  
  •   0 订阅,431 游览

添加processor代码如下:

addProcessor("cmccTransactionProcessor", new StreamFilter(new Predicate<String, String>() {

            @Override
            public boolean test(String key, String value) {
                System.out.println("false");
                return false;
            }

        }, false), "cmccSource")

创建Processor代码如下:

public class StreamFilter implements ProcessorSupplier<String, String> {

    private final Predicate<String, String> predicate;
    private final boolean filterNot;

    public StreamFilter(Predicate<String, String> predicate, boolean filterNot) {
        this.predicate = predicate;
        this.filterNot = filterNot;
    }

    @Override
    public Processor<String, String> get() {
        return new KStreamFilterProcessor();
    }

    private class KStreamFilterProcessor extends AbstractProcessor<String, String> {
        @Override
        public void process(String key, String value) {
            if (filterNot ^ predicate.test(key, value)) {
                context().forward(key, value);
            }
        }
    }

}

测试发送数据后,下游的sink topic总是能收到数据,搞不明白哪里有问题,求大家看看解答一下







发表于: 3月前   最后更新时间: 3月前   游览量:431
上一条: 到头了!
下一条: 已经是最后了!

评论…


  • 版本时0.10.2.0
    正常情况下,filterNot ^ predicate.test(key, value)判断中,predicate.test(key, value)为true时,消息向下游传送,否则应该过滤掉,不过测试无论如何都是继续向下游processor发送,比较疑惑
  • 评论…
    • in this conversation
      提问