Flink_EventTime and Window

Updated: 2024-10-22 20:31:23

Contents

  • 1 Introducing EventTime
  • 2 Watermark
    • 2.1 Basic Concepts
    • 2.2 Introducing Watermarks
  • 3 EventTimeWindow API
    • 3.1 Tumbling Windows (TumblingEventTimeWindows)
    • 3.2 Sliding Windows (SlidingEventTimeWindows)
    • 3.3 Session Windows (EventTimeSessionWindows)

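1 Introducing EventTime

In Flink stream processing, the vast majority of applications use EventTime; ProcessingTime or IngestionTime is generally used only when EventTime cannot be used. To use EventTime, set the time characteristic on the execution environment as follows (since Flink 1.12, EventTime is the default and setStreamTimeCharacteristic is deprecated):

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.scala._

val env = StreamExecutionEnvironment.getExecutionEnvironment
// From this call onward, every stream created by env uses the EventTime characteristic
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)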

2 Watermark

2.1 Basic Concepts


2.2 Introducing Watermarks

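Timestamps and watermarks are assigned on the stream itself. In the snippet below, BoundedOutOfOrdernessTimestampExtractor reads the event time from each log line and tolerates records that arrive up to 200 ms out of order:

val env = StreamExecutionEnvironment.getExecutionEnvironment
// From this call onward, every stream created by env uses the EventTime characteristic
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
val stream = env.readTextFile("eventTest.txt").assignTimestampsAndWatermarks(
  new BoundedOutOfOrdernessTimestampExtractor[String](Time.milliseconds(200)) {
    override def extractTimestamp(t: String): Long = {
      // EventTime is the time the log line was generated; parse it from the first field
      t.split(" ")(0).toLong
    }
  })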
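The extractor's rule can be illustrated without Flink. The plain-Scala sketch below tracks the largest event time seen so far and derives the watermark as that maximum minus the bound (200 ms here), never letting it move backwards; this is my understanding of what BoundedOutOfOrdernessTimestampExtractor does, not its actual source. The class and method names are hypothetical, and Flink's periodic emission of watermarks is not modeled.

```scala
// Plain-Scala simulation (no Flink dependency) of a bounded-out-of-orderness watermark
object WatermarkSimulation {
  // Hypothetical helper; maxOutOfOrdernessMs plays the role of Time.milliseconds(200)
  final class BoundedWatermarkTracker(maxOutOfOrdernessMs: Long) {
    // Start low enough that the first watermark is Long.MinValue
    private var currentMaxTimestamp: Long = Long.MinValue + maxOutOfOrdernessMs
    private var lastEmittedWatermark: Long = Long.MinValue

    // Parse the event time from a line such as "1000 hello" and remember the maximum
    def onElement(line: String): Long = {
      val ts = line.split(" ")(0).toLong
      if (ts > currentMaxTimestamp) currentMaxTimestamp = ts
      ts
    }

    // Watermark = max event time seen minus the bound; it never moves backwards
    def currentWatermark(): Long = {
      val candidate = currentMaxTimestamp - maxOutOfOrdernessMs
      if (candidate >= lastEmittedWatermark) lastEmittedWatermark = candidate
      lastEmittedWatermark
    }
  }

  def main(args: Array[String]): Unit = {
    val tracker = new BoundedWatermarkTracker(200L)
    val lines = Seq("1000 a", "1500 b", "1200 c", "2000 d")
    val watermarks = lines.map { line =>
      tracker.onElement(line)
      tracker.currentWatermark()
    }
    println(watermarks.mkString(", ")) // 800, 1300, 1300, 1800
  }
}
```

The out-of-order record at 1200 does not pull the watermark back from 1300; only a larger timestamp advances it.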

3 EventTimeWindow API


3.1 Tumbling Windows (TumblingEventTimeWindows)

Reference code

package com.czxy.flink.stream.waterwindow

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, StreamExecutionEnvironment, WindowedStream}
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.api.scala._

object TumblingEventTimeWindowsDemo {
  def main(args: Array[String]): Unit = {
    /**
     * Steps:
     * 1. Create the stream execution environment
     * 2. Set EventTime
     * 3. Build the data source
     * 4. Assign watermarks
     * 5. Apply the processing logic
     * 6. Apply the tumbling window TumblingEventTimeWindows
     * 7. Aggregate
     * 8. Print the output
     * 9. Execute the job
     */
    // 1. Create the stream execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // 2. Set EventTime
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    // 3. Build the data source
    // Data format: 1000 hello
    val socketSource = env.socketTextStream("node01", 9999)
    // 4. Assign watermarks (event time = first field of each line, no out-of-orderness allowed)
    val waterMark: DataStream[String] = socketSource.assignTimestampsAndWatermarks(
      new BoundedOutOfOrdernessTimestampExtractor[String](Time.seconds(0)) {
        override def extractTimestamp(element: String): Long = {
          val eventTime: Long = element.split(" ")(0).toLong
          eventTime
        }
      })
    // 5. Processing logic: map each line to (word, 1) and key by the word
    val groupStream: KeyedStream[(String, Int), String] =
      waterMark.map(x => x.split(" ")(1)).map((_, 1)).keyBy(_._1)
    // 6. Apply the tumbling window: 3-second windows
    val windowStream: WindowedStream[(String, Int), String, TimeWindow] =
      groupStream.window(TumblingEventTimeWindows.of(Time.seconds(3)))
    // 7. Aggregate: sum the counts per word per window
    val result: DataStream[(String, Int)] = windowStream.sum(1)
    // Equivalent alternative: windowStream.reduce((v1, v2) => (v1._1, v1._2 + v2._2))
    // 8. Print the output
    result.print()
    // 9. Execute the job
    env.execute(this.getClass.getSimpleName)
  }
}

The results are computed from event-time windows and are independent of the system clock (including how quickly or slowly the input arrives).
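To see why the output depends only on event time, the sketch below simulates a 3-second tumbling window over the sample line format "1000 hello" in plain Scala, without Flink. It assumes offset 0 and non-negative timestamps; the start alignment is the offset-0 case of the rule I believe Flink's TimeWindow.getWindowStartWithOffset applies, and firing once the watermark reaches end - 1 reflects my understanding of EventTimeTrigger. The object, method and field names are hypothetical, and periodic watermark emission is not modeled.

```scala
import scala.collection.mutable

// Plain-Scala simulation, not Flink code: one count per (window, word),
// emitted once the watermark reaches the window's last timestamp
object TumblingWindowSimulation {
  final case class Window(start: Long, end: Long) {
    // Last timestamp that still belongs to [start, end)
    def maxTimestamp: Long = end - 1
  }

  // Align the window start to a multiple of sizeMs (offset 0, non-negative timestamps assumed)
  def assign(timestamp: Long, sizeMs: Long): Window = {
    val start = timestamp - (timestamp + sizeMs) % sizeMs
    Window(start, start + sizeMs)
  }

  // Processes records in arrival order and returns the emitted (window, word, count) results
  def run(records: Seq[(Long, String)], sizeMs: Long): List[(Window, String, Int)] = {
    val state = mutable.LinkedHashMap.empty[(Window, String), Int]
    val out = mutable.ArrayBuffer.empty[(Window, String, Int)]
    var watermark = Long.MinValue
    for ((ts, word) <- records) {
      val key = (assign(ts, sizeMs), word)
      state(key) = state.getOrElse(key, 0) + 1
      // Zero out-of-orderness, as with Time.seconds(0): watermark = max timestamp seen
      watermark = math.max(watermark, ts)
      // Emit and drop every window whose last timestamp the watermark has reached
      val ready = state.keysIterator.filter(_._1.maxTimestamp <= watermark).toList
      for (k <- ready) {
        out += ((k._1, k._2, state(k)))
        state.remove(k)
      }
    }
    out.toList
  }

  def main(args: Array[String]): Unit = {
    val records = Seq((1000L, "hello"), (2000L, "hello"), (2999L, "world"), (3500L, "hello"))
    run(records, 3000L).foreach(println)
    // (Window(0,3000),hello,2)
    // (Window(0,3000),world,1)
  }
}
```

Typing the four lines fast or slowly makes no difference: the [0, 3000) window is emitted once the record at 2999 has pushed the watermark to 2999, and the record at 3500 stays in the still-open [3000, 6000) window.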

3.2 Sliding Windows (SlidingEventTimeWindows)

Reference code

package com.czxy.flink.stream.waterwindow

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, StreamExecutionEnvironment, WindowedStream}
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.api.scala._

object SlidingEventTimeWindowsDemo {
  def main(args: Array[String]): Unit = {
    /**
     * Steps:
     * 1. Create the stream execution environment
     * 2. Set EventTime
     * 3. Build the data source
     * 4. Assign watermarks
     * 5. Apply the processing logic
     * 6. Apply the sliding window SlidingEventTimeWindows
     * 7. Aggregate
     * 8. Print the output
     * 9. Execute the job
     */
    // 1. Create the stream execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // 2. Set EventTime
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    // 3. Build the data source
    // Data format: 1000 hello
    val socketSource: DataStream[String] = env.socketTextStream("node01", 9999)
    // 4. Assign watermarks (event time = first field of each line, no out-of-orderness allowed)
    val waterMark: DataStream[String] = socketSource.assignTimestampsAndWatermarks(
      new BoundedOutOfOrdernessTimestampExtractor[String](Time.seconds(0)) {
        override def extractTimestamp(element: String): Long = {
          val eventTime: Long = element.split(" ")(0).toLong
          eventTime
        }
      })
    // 5. Processing logic: map each line to (word, 1) and key by the word
    val groupStream: KeyedStream[(String, Int), String] =
      waterMark.map(x => x.split(" ")(1)).map((_, 1)).keyBy(_._1)
    // 6. Apply the sliding window: 5-second windows that slide every 2 seconds
    val windowStream: WindowedStream[(String, Int), String, TimeWindow] =
      groupStream.window(SlidingEventTimeWindows.of(Time.seconds(5), Time.seconds(2)))
    // 7. Aggregate: sum the counts per word per window
    val result: DataStream[(String, Int)] = windowStream.sum(1)
    // 8. Print the output
    result.print()
    // 9. Execute the job
    env.execute(this.getClass.getSimpleName)
  }
}
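Because the slide (2 s) is smaller than the size (5 s), each record lands in several overlapping windows and is counted once in each. The following plain-Scala sketch lists the windows a timestamp belongs to. It follows the assignment loop I understand SlidingEventTimeWindows to use, assuming offset 0 and non-negative timestamps; the object and method names are hypothetical.

```scala
// Plain-Scala simulation of sliding event-time window assignment (no Flink dependency)
object SlidingWindowSimulation {
  final case class Window(start: Long, end: Long)

  // All windows of length sizeMs, starting on a multiple of slideMs, that contain timestamp.
  // Assumes offset 0 and non-negative timestamps.
  def assign(timestamp: Long, sizeMs: Long, slideMs: Long): List[Window] = {
    // Latest window start that is <= timestamp
    val lastStart = timestamp - (timestamp + slideMs) % slideMs
    Iterator.iterate(lastStart)(_ - slideMs)
      .takeWhile(_ > timestamp - sizeMs) // stop once the window no longer covers timestamp
      .map(start => Window(start, start + sizeMs))
      .toList
  }

  def main(args: Array[String]): Unit = {
    // Same parameters as SlidingEventTimeWindows.of(Time.seconds(5), Time.seconds(2))
    assign(4500L, 5000L, 2000L).foreach(println)
    // Window(4000,9000)
    // Window(2000,7000)
    // Window(0,5000)
  }
}
```

For timestamp 4500 it yields [4000, 9000), [2000, 7000) and [0, 5000), so a "4500 hello" line contributes to three separate sums.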

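3.3 Session Windows (EventTimeSessionWindows)

A session window fires when the difference between the EventTimes of two consecutive records (of the same key) exceeds the specified gap. With watermarks, whenever firing happens, every window that already satisfies the gap condition but has not yet fired is triggered at the same time.

Reference code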

package com.czxy.flink.stream.waterwindow

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala.{DataStream, KeyedStream, StreamExecutionEnvironment, WindowedStream}
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.api.scala._

object EventTimeSessionWindowsDemo {
  def main(args: Array[String]): Unit = {
    /**
     * Steps:
     * 1. Create the stream execution environment
     * 2. Set EventTime
     * 3. Build the data source
     * 4. Assign watermarks
     * 5. Apply the processing logic
     * 6. Apply the session window EventTimeSessionWindows
     * 7. Aggregate
     * 8. Print the output
     * 9. Execute the job
     */
    // 1. Create the stream execution environment
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    // 2. Set EventTime
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    // 3. Build the data source
    // Data format: 1000 hello
    val socketSource: DataStream[String] = env.socketTextStream("node01", 9999)
    // 4. Assign watermarks (event time = first field of each line, no out-of-orderness allowed)
    val waterMark: DataStream[String] = socketSource.assignTimestampsAndWatermarks(
      new BoundedOutOfOrdernessTimestampExtractor[String](Time.seconds(0)) {
        override def extractTimestamp(element: String): Long = {
          val eventTime: Long = element.split(" ")(0).toLong
          eventTime
        }
      })
    // 5. Processing logic: map each line to (word, 1) and key by the word
    val groupStream: KeyedStream[(String, Int), String] =
      waterMark.map(x => x.split(" ")(1)).map((_, 1)).keyBy(_._1)
    // 6. Apply the session window with a 3-second gap
    val windowStream: WindowedStream[(String, Int), String, TimeWindow] =
      groupStream.window(EventTimeSessionWindows.withGap(Time.seconds(3)))
    // 7. Aggregate: sum the counts per word per session
    val result: DataStream[(String, Int)] = windowStream.sum(1)
    // 8. Print the output
    result.print()
    // 9. Execute the job
    env.execute(this.getClass.getSimpleName)
  }
}
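The merging behind session windows can be sketched in plain Scala: each record opens a window [ts, ts + gap), and windows that overlap are merged into one session. This is a simplified, single-key illustration, not Flink's implementation: it sorts timestamps up front, and the rule that windows which merely touch are also merged is my understanding of TimeWindow.intersects, stated here as an assumption. The object and method names are hypothetical.

```scala
// Plain-Scala simulation of event-time session merging for a single key (no Flink dependency)
object SessionWindowSimulation {
  final case class Window(start: Long, end: Long) {
    // Assumption: windows that overlap or touch belong to the same session
    def intersects(other: Window): Boolean = start <= other.end && end >= other.start
    // Smallest window covering both
    def cover(other: Window): Window = Window(math.min(start, other.start), math.max(end, other.end))
  }

  // Each record opens [ts, ts + gapMs); consecutive intersecting windows merge into one session
  def sessions(timestamps: Seq[Long], gapMs: Long): List[Window] =
    timestamps.sorted
      .map(ts => Window(ts, ts + gapMs))
      .foldLeft(List.empty[Window]) {
        case (current :: done, w) if current.intersects(w) => current.cover(w) :: done
        case (acc, w) => w :: acc
      }
      .reverse

  def main(args: Array[String]): Unit = {
    // Same gap as EventTimeSessionWindows.withGap(Time.seconds(3))
    sessions(Seq(1000L, 2000L, 4500L, 9000L), 3000L).foreach(println)
    // Window(1000,7500)
    // Window(9000,12000)
  }
}
```

With the 3-second gap, 1000, 2000 and 4500 form one session [1000, 7500), while 9000 starts a new one because it is 4500 ms after the previous record. With Time.seconds(0) as the out-of-orderness bound, the first session would be emitted once the watermark reaches 7499, which the record at 9000 guarantees.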

Published: 2023-07-27 22:08:21
Original link: https://www.elefans.com/category/jswz/34/1225159.html