kafka stream filter不能过滤数据

王建荣 发表于: 2018-04-13   最后更新时间: 2018-04-13 11:01:54   4,284 游览

添加processor代码如下:

addProcessor("cmccTransactionProcessor", new StreamFilter(new Predicate<String, String>() {

            @Override
            public boolean test(String key, String value) {
                System.out.println("false");
                return false;
            }

        }, false), "cmccSource")

创建Processor代码如下:

public class StreamFilter implements ProcessorSupplier<String, String> {

    private final Predicate<String, String> predicate;
    private final boolean filterNot;

    public StreamFilter(Predicate<String, String> predicate, boolean filterNot) {
        this.predicate = predicate;
        this.filterNot = filterNot;
    }

    @Override
    public Processor<String, String> get() {
        return new KStreamFilterProcessor();
    }

    private class KStreamFilterProcessor extends AbstractProcessor<String, String> {
        @Override
        public void process(String key, String value) {
            if (filterNot ^ predicate.test(key, value)) {
                context().forward(key, value);
            }
        }
    }

}

测试发送数据后,下游的sink topic总是能收到数据,搞不明白哪里有问题,求大家看看解答一下

发表于 2018-04-13
添加评论

正常情况下,filterNot ^ predicate.test(key, value)判断中,predicate.test(key, value)为true时,消息向下游传送,否则应该过滤掉,不过测试无论如何都是继续向下游processor发送,比较疑惑

版本时0.10.2.0

你的答案

查看kafka相关的其他问题或提一个您自己的问题