返回到文章

采纳

编辑于

kafka stream filter不能过滤数据

kafka

添加processor代码如下:

addProcessor("cmccTransactionProcessor", new StreamFilter(new Predicate<String, String>() {

            @Override
            public boolean test(String key, String value) {
                System.out.println("false");
                return false;
            }

        }, false), "cmccSource")

创建Processor代码如下:

public class StreamFilter implements ProcessorSupplier<String, String> {

    private final Predicate<String, String> predicate;
    private final boolean filterNot;

    public StreamFilter(Predicate<String, String> predicate, boolean filterNot) {
        this.predicate = predicate;
        this.filterNot = filterNot;
    }

    @Override
    public Processor<String, String> get() {
        return new KStreamFilterProcessor();
    }

    private class KStreamFilterProcessor extends AbstractProcessor<String, String> {
        @Override
        public void process(String key, String value) {
            if (filterNot ^ predicate.test(key, value)) {
                context().forward(key, value);
            }
        }
    }

}

测试发送数据后,下游的sink topic总是能收到数据,搞不明白哪里有问题,求大家看看解答一下