Flink DataStream Evictors
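Flink's windowing model lets you specify an optional Evictor in addition to the WindowAssigner and the Trigger. It is set through the evictor() method.
An Evictor can remove elements from a window after the Trigger fires, either before the window function is applied, after it, or both.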
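The Evictor interface has two methods:
- evictBefore(): contains the eviction logic that is applied before the window function.
- evictAfter(): contains the eviction logic that is applied after the window function.
Both methods take the same parameters:
- Iterable<TimestampedValue<T>> elements: the elements currently in the window
- int size: the number of elements currently in the window
- W window: the current window
- EvictorContext evictorContext: the context for the evictor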
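To make the call order concrete, here is a minimal plain-Java sketch of what happens when the trigger fires: evictBefore runs, then the window function, then evictAfter. It does not use Flink. The SimpleEvictor interface, the fire method, and demoEvictor are hypothetical names invented for this illustration, and the interface is reduced to the element buffer and its size.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified, hypothetical model of the evictor call order (not Flink's real API).
class EvictorOrderSketch {

    // Stand-in for Flink's Evictor interface, reduced to the buffer and its size.
    interface SimpleEvictor<T> {
        void evictBefore(List<T> elements, int size);
        void evictAfter(List<T> elements, int size);
    }

    // What happens when the trigger fires: evictBefore, window function, evictAfter.
    static int fire(List<Integer> buffer, SimpleEvictor<Integer> evictor) {
        evictor.evictBefore(buffer, buffer.size());
        int sum = 0; // the "window function" here is a plain sum
        for (int value : buffer) {
            sum += value;
        }
        evictor.evictAfter(buffer, buffer.size());
        return sum;
    }

    // Demo evictor: drops the oldest element before the function when more than
    // three elements are buffered, and clears the buffer afterwards.
    static SimpleEvictor<Integer> demoEvictor() {
        return new SimpleEvictor<Integer>() {
            @Override
            public void evictBefore(List<Integer> elements, int size) {
                if (size > 3) {
                    elements.remove(0);
                }
            }

            @Override
            public void evictAfter(List<Integer> elements, int size) {
                elements.clear();
            }
        };
    }

    public static void main(String[] args) {
        List<Integer> buffer = new ArrayList<>(List.of(1, 2, 3, 4));
        int result = fire(buffer, demoEvictor());
        System.out.println(result + " " + buffer); // 9 []
    }
}
```

The element 1 is evicted before the sum is computed, so the window function only sees 2, 3 and 4. Whatever remains after evictAfter is what the window keeps for the next firing.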
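Flink ships with three built-in evictors:
- CountEvictor: keeps up to a user-specified number of elements in the window and discards the rest from the beginning of the window buffer.
Below is the CountEvictor class. The eviction logic lives in the evict method.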
@PublicEvolving
public class CountEvictor<W extends Window> implements Evictor<Object, W> {
    private static final long serialVersionUID = 1L;

    private final long maxCount;
    private final boolean doEvictAfter;

    private CountEvictor(long count, boolean doEvictAfter) {
        this.maxCount = count;
        this.doEvictAfter = doEvictAfter;
    }

    private CountEvictor(long count) {
        this.maxCount = count;
        this.doEvictAfter = false;
    }

    @Override
    public void evictBefore(Iterable<TimestampedValue<Object>> elements, int size, W window, EvictorContext ctx) {
        if (!doEvictAfter) {
            evict(elements, size, ctx);
        }
    }

    @Override
    public void evictAfter(Iterable<TimestampedValue<Object>> elements, int size, W window, EvictorContext ctx) {
        if (doEvictAfter) {
            evict(elements, size, ctx);
        }
    }

    private void evict(Iterable<TimestampedValue<Object>> elements, int size, EvictorContext ctx) {
        if (size <= maxCount) {
            return;
        } else {
            int evictedCount = 0;
            for (Iterator<TimestampedValue<Object>> iterator = elements.iterator(); iterator.hasNext();) {
                iterator.next();
                evictedCount++;
                if (evictedCount > size - maxCount) {
                    break;
                } else {
                    iterator.remove();
                }
            }
        }
    }

    /**
     * Creates a {@code CountEvictor} that keeps the given number of elements.
     * Eviction is done before the window function.
     *
     * @param maxCount The number of elements to keep in the pane.
     */
    public static <W extends Window> CountEvictor<W> of(long maxCount) {
        return new CountEvictor<>(maxCount);
    }

    /**
     * Creates a {@code CountEvictor} that keeps the given number of elements in the pane
     * Eviction is done before/after the window function based on the value of doEvictAfter.
     *
     * @param maxCount The number of elements to keep in the pane.
     * @param doEvictAfter Whether to do eviction after the window function.
     */
    public static <W extends Window> CountEvictor<W> of(long maxCount, boolean doEvictAfter) {
        return new CountEvictor<>(maxCount, doEvictAfter);
    }
}
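The evict loop above can be tried without a Flink cluster. The following is a minimal sketch that copies the same loop onto a plain java.util.List; the class name CountEvictSketch and the sample values are made up for illustration.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Plain-Java model of CountEvictor's eviction loop (no Flink dependency).
class CountEvictSketch {

    // Removes elements from the head of the buffer until at most maxCount remain.
    static <T> void evict(List<T> buffer, long maxCount) {
        int size = buffer.size();
        if (size <= maxCount) {
            return; // nothing to evict
        }
        int evictedCount = 0;
        for (Iterator<T> it = buffer.iterator(); it.hasNext(); ) {
            it.next();
            evictedCount++;
            if (evictedCount > size - maxCount) {
                break; // the surplus (size - maxCount) has already been removed
            }
            it.remove();
        }
    }

    public static void main(String[] args) {
        List<Integer> buffer = new ArrayList<>(List.of(1, 2, 3, 4, 5));
        evict(buffer, 3);
        System.out.println(buffer); // [3, 4, 5]
    }
}
```

With five elements and maxCount = 3, the two oldest elements are dropped and the three newest survive.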
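- DeltaEvictor: takes a DeltaFunction and a threshold, computes the delta between the last element in the window buffer and each of the other elements, and removes those whose delta is greater than or equal to the threshold.
For DeltaEvictor you supply a DeltaFunction and implement its getDelta method. The evict logic compares the returned delta with the threshold passed in and removes elements accordingly.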
private void evict(Iterable<TimestampedValue<T>> elements, int size, EvictorContext ctx) {
    TimestampedValue<T> lastElement = Iterables.getLast(elements);
    for (Iterator<TimestampedValue<T>> iterator = elements.iterator(); iterator.hasNext();) {
        TimestampedValue<T> element = iterator.next();
        if (deltaFunction.getDelta(element.getValue(), lastElement.getValue()) >= this.threshold) {
            iterator.remove();
        }
    }
}
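As a rough illustration of the delta comparison, here is a plain-Java sketch of the same loop. It assumes a delta function defined as the absolute difference between two doubles and a threshold of 2.0; both are arbitrary choices for the example, and DeltaEvictSketch is a hypothetical name. A java.util.function.ToDoubleBiFunction stands in for Flink's DeltaFunction.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.ToDoubleBiFunction;

// Plain-Java model of DeltaEvictor's eviction loop (no Flink dependency).
class DeltaEvictSketch {

    // Removes every element whose delta to the last element is >= threshold.
    static <T> void evict(List<T> buffer, ToDoubleBiFunction<T, T> delta, double threshold) {
        if (buffer.isEmpty()) {
            return;
        }
        T last = buffer.get(buffer.size() - 1);
        for (Iterator<T> it = buffer.iterator(); it.hasNext(); ) {
            T element = it.next();
            if (delta.applyAsDouble(element, last) >= threshold) {
                it.remove();
            }
        }
    }

    public static void main(String[] args) {
        List<Double> buffer = new ArrayList<>(List.of(1.0, 8.0, 9.5, 10.0));
        // Assumed delta function: absolute difference to the last element.
        evict(buffer, (a, b) -> Math.abs(a - b), 2.0);
        System.out.println(buffer); // [9.5, 10.0]
    }
}
```

The last element is 10.0, so 1.0 (delta 9.0) and 8.0 (delta 2.0, equal to the threshold) are removed, while 9.5 and 10.0 stay.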
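- TimeEvictor: takes an interval in milliseconds as its argument. For a given window, it finds the maximum timestamp max_ts among its elements and removes all elements whose timestamp is smaller than max_ts - interval.
The logic is similar to CountEvictor's. windowSize is a duration in milliseconds. Note that the code compares with <=, so an element whose timestamp is exactly equal to the cutoff is removed as well.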
private void evict(Iterable<TimestampedValue<Object>> elements, int size, EvictorContext ctx) {
    if (!hasTimestamp(elements)) {
        return;
    }
    long currentTime = getMaxTimestamp(elements);
    long evictCutoff = currentTime - windowSize;
    for (Iterator<TimestampedValue<Object>> iterator = elements.iterator(); iterator.hasNext(); ) {
        TimestampedValue<Object> record = iterator.next();
        if (record.getTimestamp() <= evictCutoff) {
            iterator.remove();
        }
    }
}
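The cutoff arithmetic is easy to check by hand. The sketch below models each element by its timestamp only and applies the same rule on a plain list; TimeEvictSketch and the sample timestamps are illustrative assumptions, not Flink code.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Plain-Java model of TimeEvictor's eviction rule (no Flink dependency).
class TimeEvictSketch {

    // Removes every timestamp that is <= (max timestamp - windowSize).
    static void evict(List<Long> timestamps, long windowSize) {
        if (timestamps.isEmpty()) {
            return;
        }
        long maxTs = Collections.max(timestamps);
        long cutoff = maxTs - windowSize;
        timestamps.removeIf(ts -> ts <= cutoff);
    }

    public static void main(String[] args) {
        List<Long> timestamps = new ArrayList<>(List.of(1000L, 2000L, 3000L, 4000L, 5000L));
        evict(timestamps, 2000L);
        System.out.println(timestamps); // [4000, 5000]
    }
}
```

Here max_ts is 5000 and the cutoff is 3000, so the element at exactly 3000 is evicted along with the older ones.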
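By default, the built-in evictors apply their eviction logic before the window function. The code shows this: the single-argument CountEvictor.of(maxCount) sets doEvictAfter to false, so evict runs in evictBefore.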