Flink DataStream Evictors

Updated: 2024-10-08 08:23:47

Flink's windowing model lets you specify an optional Evictor in addition to the WindowAssigner and the Trigger. It is set with the evictor(...) method on a windowed stream.

An Evictor can remove elements from a window after the Trigger fires: before the window function is applied, after it, or both.

The Evictor interface has two methods:

  • evictBefore(): removes elements from the window; called before the window function.

  • evictAfter(): removes elements from the window; called after the window function.

Both methods take the same parameters:

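  • Iterable<TimestampedValue<T>> elements: the elements currently in the window
  • int size: the number of elements currently in the window
  • W window: the current window
  • EvictorContext evictorContext: the context of the eviction call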
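
To make this contract concrete, here is a Flink-free sketch. All names in it are hypothetical stand-ins, not Flink API: `Stamped` replaces `TimestampedValue`, `SimpleEvictor` is a simplified mirror of `Evictor` that drops the `W window` and `EvictorContext` parameters, and `fire` imitates the order in which the window operator calls the evictor around the window function. The point it illustrates is that an evictor removes elements by calling `remove()` on the iterator of the `Iterable` it receives.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

// Hypothetical stand-in for Flink's TimestampedValue<T>.
record Stamped<T>(T value, long timestamp) {}

// Hypothetical, simplified mirror of Flink's Evictor<T, W>
// (the W window and EvictorContext parameters are omitted).
interface SimpleEvictor<T> {
    void evictBefore(Iterable<Stamped<T>> elements, int size);
    void evictAfter(Iterable<Stamped<T>> elements, int size);
}

final class EvictorContractSketch {
    // Hypothetical helper imitating one firing: evictBefore -> window function -> evictAfter.
    static <T, R> R fire(List<Stamped<T>> pane, SimpleEvictor<T> evictor,
                         Function<List<T>, R> windowFunction) {
        evictor.evictBefore(pane, pane.size());
        List<T> values = new ArrayList<>();
        for (Stamped<T> s : pane) {
            values.add(s.value());
        }
        R result = windowFunction.apply(values);
        evictor.evictAfter(pane, pane.size());
        return result;
    }

    // Example evictor: drops negative values before the window function runs.
    static SimpleEvictor<Integer> dropNegativesBefore() {
        return new SimpleEvictor<>() {
            @Override
            public void evictBefore(Iterable<Stamped<Integer>> elements, int size) {
                for (Iterator<Stamped<Integer>> it = elements.iterator(); it.hasNext(); ) {
                    if (it.next().value() < 0) {
                        it.remove(); // eviction happens through the iterator
                    }
                }
            }

            @Override
            public void evictAfter(Iterable<Stamped<Integer>> elements, int size) {
                // nothing to do after the window function in this example
            }
        };
    }

    public static void main(String[] args) {
        List<Stamped<Integer>> pane = new ArrayList<>(List.of(
                new Stamped<>(3, 1L), new Stamped<>(-2, 2L), new Stamped<>(5, 3L)));
        Function<List<Integer>, Integer> sumFn =
                vs -> vs.stream().mapToInt(Integer::intValue).sum();
        int sum = fire(pane, dropNegativesBefore(), sumFn);
        System.out.println(sum);         // 8
        System.out.println(pane.size()); // 2
    }
}
```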

Flink ships with three pre-implemented Evictors:

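  • CountEvictor: keeps up to a user-specified number of elements in the window and discards the remaining ones from the beginning of the window buffer.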

Below is the source of the CountEvictor class; the actual logic lives in the evict method.

package org.apache.flink.streaming.api.windowing.evictors;

import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.streaming.api.windowing.windows.Window;
import org.apache.flink.streaming.runtime.operators.windowing.TimestampedValue;

import java.util.Iterator;

// Keeps at most maxCount elements in the pane, evicting from the head of the buffer.
@PublicEvolving
public class CountEvictor<W extends Window> implements Evictor<Object, W> {
    private static final long serialVersionUID = 1L;

    private final long maxCount;
    private final boolean doEvictAfter;

    private CountEvictor(long count, boolean doEvictAfter) {
        this.maxCount = count;
        this.doEvictAfter = doEvictAfter;
    }

    private CountEvictor(long count) {
        this.maxCount = count;
        this.doEvictAfter = false;
    }

    @Override
    public void evictBefore(
            Iterable<TimestampedValue<Object>> elements, int size, W window, EvictorContext ctx) {
        if (!doEvictAfter) {
            evict(elements, size, ctx);
        }
    }

    @Override
    public void evictAfter(
            Iterable<TimestampedValue<Object>> elements, int size, W window, EvictorContext ctx) {
        if (doEvictAfter) {
            evict(elements, size, ctx);
        }
    }

    private void evict(Iterable<TimestampedValue<Object>> elements, int size, EvictorContext ctx) {
        if (size <= maxCount) {
            return; // already within the limit, nothing to evict
        } else {
            int evictedCount = 0;
            // Remove the first (size - maxCount) elements; the rest are kept.
            for (Iterator<TimestampedValue<Object>> iterator = elements.iterator(); iterator.hasNext(); ) {
                iterator.next();
                evictedCount++;
                if (evictedCount > size - maxCount) {
                    break;
                } else {
                    iterator.remove();
                }
            }
        }
    }

    /**
     * Creates a {@code CountEvictor} that keeps the given number of elements.
     * Eviction is done before the window function.
     *
     * @param maxCount The number of elements to keep in the pane.
     */
    public static <W extends Window> CountEvictor<W> of(long maxCount) {
        return new CountEvictor<>(maxCount);
    }

    /**
     * Creates a {@code CountEvictor} that keeps the given number of elements in the pane.
     * Eviction is done before/after the window function based on the value of doEvictAfter.
     *
     * @param maxCount The number of elements to keep in the pane.
     * @param doEvictAfter Whether to do eviction after the window function.
     */
    public static <W extends Window> CountEvictor<W> of(long maxCount, boolean doEvictAfter) {
        return new CountEvictor<>(maxCount, doEvictAfter);
    }
}
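
The counting loop in evict can be tried without a Flink cluster. The sketch below is a hypothetical, Flink-free copy of that loop (the class `CountEvictSketch` is made up for illustration) working on a plain `List`; it removes the first `size - maxCount` elements and keeps the rest.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical Flink-free copy of CountEvictor's evict loop.
final class CountEvictSketch {
    // Removes elements from the head of the buffer until at most maxCount remain.
    static <T> void evict(Iterable<T> elements, int size, long maxCount) {
        if (size <= maxCount) {
            return; // already within the limit
        }
        int evictedCount = 0;
        for (Iterator<T> iterator = elements.iterator(); iterator.hasNext(); ) {
            iterator.next();
            evictedCount++;
            if (evictedCount > size - maxCount) {
                break; // the remaining maxCount elements are kept
            }
            iterator.remove();
        }
    }

    public static void main(String[] args) {
        List<String> pane = new ArrayList<>(List.of("a", "b", "c", "d", "e"));
        evict(pane, pane.size(), 3);
        System.out.println(pane); // [c, d, e]
    }
}
```

Keep in mind that Flink gives no guarantee about the order of elements within a window, so "the beginning of the buffer" is not necessarily the earliest-arriving data.
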
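  • DeltaEvictor: takes a DeltaFunction and a threshold, computes the delta between the last element in the window buffer and each of the remaining elements, and removes the elements whose delta is greater than or equal to the threshold.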

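DeltaEvictor's evict logic relies on the getDelta method of the DeltaFunction you supply: each result is compared with the threshold passed in, and that comparison decides which elements are evicted.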

private void evict(Iterable<TimestampedValue<T>> elements, int size, EvictorContext ctx) {
    // Iterables.getLast is Guava's helper that returns the last element.
    TimestampedValue<T> lastElement = Iterables.getLast(elements);
    for (Iterator<TimestampedValue<T>> iterator = elements.iterator(); iterator.hasNext(); ) {
        TimestampedValue<T> element = iterator.next();
        // Remove every element whose delta to the last element reaches the threshold.
        if (deltaFunction.getDelta(element.getValue(), lastElement.getValue()) >= this.threshold) {
            iterator.remove();
        }
    }
}
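
A Flink-free sketch of the same loop, assuming a hypothetical functional interface `Delta` that only mirrors the shape of DeltaFunction's getDelta (it is not the Flink type) and a made-up `DeltaEvictSketch` class. With the absolute difference as the delta and a threshold of 5.0, every value at distance 5.0 or more from the last element is evicted.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical stand-in for Flink's DeltaFunction: a distance between two data points.
interface Delta<T> {
    double getDelta(T oldDataPoint, T newDataPoint);
}

// Hypothetical Flink-free copy of DeltaEvictor's evict loop.
final class DeltaEvictSketch {
    static <T> void evict(List<T> elements, Delta<T> deltaFunction, double threshold) {
        if (elements.isEmpty()) {
            return;
        }
        T last = elements.get(elements.size() - 1);
        for (Iterator<T> iterator = elements.iterator(); iterator.hasNext(); ) {
            T element = iterator.next();
            // Compare each element against the last one and evict when delta >= threshold.
            if (deltaFunction.getDelta(element, last) >= threshold) {
                iterator.remove();
            }
        }
    }

    public static void main(String[] args) {
        List<Double> pane = new ArrayList<>(List.of(1.0, 7.0, 9.5, 10.0));
        evict(pane, (a, b) -> Math.abs(a - b), 5.0);
        System.out.println(pane); // [7.0, 9.5, 10.0]
    }
}
```

The last element is also compared with itself; with a delta like the one above it yields 0 and therefore survives unless the threshold is 0 or negative.
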
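  • TimeEvictor: takes an interval in milliseconds as its argument. For a given window, it finds the maximum timestamp max_ts among the window's elements and removes every element whose timestamp is at most max_ts - interval (the code compares with <=, so an element exactly at the cutoff is removed as well).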

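Its loop follows the same pattern as CountEvictor's: walk the elements with an iterator and remove those outside the limit. Here windowSize holds the interval in milliseconds.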

private void evict(Iterable<TimestampedValue<Object>> elements, int size, EvictorContext ctx) {
    // Skip eviction when the elements carry no timestamps.
    if (!hasTimestamp(elements)) {
        return;
    }
    // The largest timestamp in the window serves as the reference point.
    long currentTime = getMaxTimestamp(elements);
    long evictCutoff = currentTime - windowSize;
    for (Iterator<TimestampedValue<Object>> iterator = elements.iterator(); iterator.hasNext(); ) {
        TimestampedValue<Object> record = iterator.next();
        // Elements at or before the cutoff are removed.
        if (record.getTimestamp() <= evictCutoff) {
            iterator.remove();
        }
    }
}
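
The cutoff arithmetic can be checked with a small Flink-free sketch. `TimeEvictSketch` and its `TimedValue` record are hypothetical names used only here, and the sketch omits the hasTimestamp check because every element it builds carries a timestamp. With a maximum timestamp of 6000 and windowSize = 2000, the cutoff is 4000, so the element stamped exactly 4000 is evicted together with older ones.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

// Hypothetical Flink-free copy of TimeEvictor's evict loop.
final class TimeEvictSketch {
    // Hypothetical stand-in for TimestampedValue.
    record TimedValue(String value, long timestamp) {}

    static void evict(List<TimedValue> elements, long windowSize) {
        if (elements.isEmpty()) {
            return;
        }
        // Find the maximum timestamp in the window.
        long currentTime = Long.MIN_VALUE;
        for (TimedValue v : elements) {
            currentTime = Math.max(currentTime, v.timestamp());
        }
        long evictCutoff = currentTime - windowSize;
        for (Iterator<TimedValue> iterator = elements.iterator(); iterator.hasNext(); ) {
            // Same comparison as TimeEvictor: at or before the cutoff is evicted.
            if (iterator.next().timestamp() <= evictCutoff) {
                iterator.remove();
            }
        }
    }

    public static void main(String[] args) {
        List<TimedValue> pane = new ArrayList<>(List.of(
                new TimedValue("a", 1_000L), new TimedValue("b", 4_000L),
                new TimedValue("c", 5_000L), new TimedValue("d", 6_000L)));
        evict(pane, 2_000L); // cutoff = 6000 - 2000 = 4000
        pane.forEach(v -> System.out.println(v.value())); // c, then d
    }
}
```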

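By default, an Evictor applies its eviction logic before the window function. The code shows this: CountEvictor.of(maxCount) sets doEvictAfter to false, so only evictBefore does any work.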
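
Whether eviction runs before or after the window function changes what the function sees. The Flink-free sketch below assumes a count-based evictor and a summing window function; `EvictBeforeAfterSketch`, `keepLast` and `fireCount` are hypothetical names. It also assumes, as in Flink's evicting window operator, that whatever survives eviction is what stays buffered for any later firing of the same window.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch comparing eviction before vs. after the window function.
final class EvictBeforeAfterSketch {
    // Count-based eviction: drop elements from the head until maxCount remain.
    static void keepLast(List<Integer> pane, int maxCount) {
        while (pane.size() > maxCount) {
            pane.remove(0); // remove by index
        }
    }

    // Imitates one firing; returns the sum computed by the window function.
    static int fireCount(List<Integer> pane, int maxCount, boolean doEvictAfter) {
        if (!doEvictAfter) {
            keepLast(pane, maxCount); // evictBefore
        }
        int sum = 0;
        for (int v : pane) {
            sum += v; // window function
        }
        if (doEvictAfter) {
            keepLast(pane, maxCount); // evictAfter
        }
        return sum;
    }

    public static void main(String[] args) {
        List<Integer> before = new ArrayList<>(List.of(1, 2, 3, 4));
        List<Integer> after = new ArrayList<>(List.of(1, 2, 3, 4));
        System.out.println(fireCount(before, 2, false) + " " + before); // 7 [3, 4]
        System.out.println(fireCount(after, 2, true) + " " + after);    // 10 [3, 4]
    }
}
```

In both cases the buffer ends up as [3, 4]; only the result of the window function differs.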

Published: 2024-03-09 18:48:27
Article link: https://www.elefans.com/category/jswz/34/1725789.html