flink的CoProcessFunction使用示例

编程入门 行业动态 更新时间:2024-10-23 01:34:17

flink的CoProcessFunction使用<a href=https://www.elefans.com/category/jswz/34/1770116.html style=示例"/>

flink的CoProcessFunction使用示例

背景

在flink中对两个流进行connect之后进行出处理的场景很常见,我们本文就以书中的一个例子为例说明下实现一个CoProcessFunction的一些要点

实现CoProcessFunction的一些要点

这个例子举例的是当收到某个传感器放行的控制消息时,从传感器传来的温度流消息会被运行向下游传递一段时间

/*** 展示CoProcessFunction+onTimer使用方法的例子*/
public class CoProcessFunctionTimers {public static void main(String[] args) throws Exception {// set up the streaming execution environmentStreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// use event time for the applicationenv.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);// 控制消息流允许传感器消息流通过指定长度的时间DataStream<Tuple2<String, Long>> filterSwitches = env.fromElements(// forward readings of sensor_2 for 10 secondsTuple2.of("sensor_2", 10_000L),// forward readings of sensor_7 for 1 minuteTuple2.of("sensor_7", 60_000L));// 传感器消息流DataStream<SensorReading> readings = env// SensorSource generates random temperature readings.addSource(new SensorSource());//传感器消息流connet控制消息流,并且按照传感器id作为key进行分组DataStream<SensorReading> forwardedReadings = readings//连接控制消息流.connect(filterSwitches)// 按照传感器id分组.keyBy(r -> r.id, s -> s.f0)// 应用CoProcessFunction + onTimer函数.process(new ReadingFilter());forwardedReadings.print();env.execute("Filter sensor readings");}//应用CoProcessFunction + onTimer函数,这已经按照key=传感器id分好组public static class ReadingFilter extends CoProcessFunction<SensorReading, Tuple2<String, Long>, SensorReading> {// 传感器开关状态--键值分区状态,key是传感器idprivate ValueState<Boolean> forwardingEnabled;// 保存传感器开关持续时间的状态--键值分区状态,key是传感器idprivate ValueState<Long> disableTimer;// 初始化键值分区状态 key是传感器idpublic void open(Configuration parameters) throws Exception {forwardingEnabled = getRuntimeContext().getState(new ValueStateDescriptor<>("filterSwitch", Types.BOOLEAN));disableTimer = getRuntimeContext().getState(new ValueStateDescriptor<Long>("timer", Types.LONG));}@Overridepublic void processElement1(SensorReading r, Context ctx, Collector<SensorReading> out) throws Exception {// 处理传感器消息流,首先检查key是传感器id对应的键值分区状态,如果开启,那么这个传感器消息就可以正常通过Boolean forward = forwardingEnabled.value();if (forward != null && forward) {out.collect(r);}}@Overridepublic void processElement2(Tuple2<String, Long> s, Context ctx, Collector<SensorReading> out) throws Exception {//控制流消息过来后,更新键值分区的开关状态为true, key是传感器idforwardingEnabled.update(true);//控制流消息过来后,更新键值分区的开关状态为true的持续时长的定时器, key是传感器idlong timerTimestamp = ctx.timerService().currentProcessingTime() + s.f1;Long curTimerTimestamp = disableTimer.value();if (curTimerTimestamp == null || timerTimestamp > curTimerTimestamp) {// remove current timerif (curTimerTimestamp != null) {ctx.timerService().deleteProcessingTimeTimer(curTimerTimestamp);}// register new timerctx.timerService().registerProcessingTimeTimer(timerTimestamp);disableTimer.update(timerTimestamp);}}// 键值开关状态的持续时间定时器,key是传感器id,注意,在ontimer方法中,也可以通过out.collect的方式向下游算子发送消息public void onTimer(long ts, OnTimerContext ctx, Collector<SensorReading> out) throws Exception {// 定时器时间到了之后,清理掉传感器的开关状态forwardingEnabled.clear();disableTimer.clear();}}
}

以上就是实现一个CoProcessFunction的大概逻辑

更多推荐

flink的CoProcessFunction使用示例

本文发布于:2023-11-16 01:05:08,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1611139.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:示例   flink   CoProcessFunction

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!