添加processor代码如下:
addProcessor("cmccTransactionProcessor", new StreamFilter(new Predicate<String, String>() {
@Override
public boolean test(String key, String value) {
System.out.println("false");
return false;
}
}, false), "cmccSource")
创建Processor代码如下:
public class StreamFilter implements ProcessorSupplier<String, String> {
private final Predicate<String, String> predicate;
private final boolean filterNot;
public StreamFilter(Predicate<String, String> predicate, boolean filterNot) {
this.predicate = predicate;
this.filterNot = filterNot;
}
@Override
public Processor<String, String> get() {
return new KStreamFilterProcessor();
}
private class KStreamFilterProcessor extends AbstractProcessor<String, String> {
@Override
public void process(String key, String value) {
if (filterNot ^ predicate.test(key, value)) {
context().forward(key, value);
}
}
}
}
测试发送数据后,下游的sink topic总是能收到数据,搞不明白哪里有问题,求大家看看解答一下