Flink 1. 13(二)时间和窗口

编程入门 行业动态 更新时间:2024-10-12 03:16:53

Flink 1. 13(二)时间和<a href=https://www.elefans.com/category/jswz/34/1771087.html style=窗口"/>

Flink 1. 13(二)时间和窗口

Flink(二)时间和窗口

      • 一.DataStream
        • 1.概述
        • 2.执行环境
        • 4.源算子
        • 5.转换算子
          • 5.1 基本转换算子
          • 5.2 聚合算子
        • 6.输出算子
          • 8.1 输出到文件
          • 8.2 输出到Kafka
          • 8.3 输出到redis
          • 8.4 输出到Elasticsearch
          • 8.5 JDBCSink — 输出到MySQL
          • 8.6 JDBCSink — 输出到ClickHouse
          • 8.7 自定义Sink 输出到Hbase
        • 7.富函数
        • 8.物理分区
      • 二.时间和窗口
        • 1.时间语义
        • 2.水位线
        • 3.窗口
        • 4.窗口API(重点)
          • 4.1 窗口分类
          • 4.2 窗口组成
          • 4.3 窗口函数 - 增量聚合函数
          • 4.4 窗口函数 - 全窗口函数
          • 4.5 增量函数 + 全窗口函数(常用)
        • 5.其他窗口API
          • 5.1 触发器
          • 5.2 移除器(Evictor)
        • 6.延迟数据处理 (三重保障)
        • 7.关于窗口、水位线延迟、窗口延迟、侧输出流总结

一.DataStream

1.概述

DataStream(数据流)本身是 Flink 中一个用来表示数据集合的类(Class),我们编写的Flink 代码其实就是基于这种数据类型的处理,所以这套核心 API 就以 DataStream 命名。对于批处理和流处理,我们都可以用这同一套 API 来实现

一个 Flink 程序,其实就是对 DataStream 的各种转换。具体来说,代码基本上都由以下几部分构成

  • 获取执行环境(execution environment)
  • 读取数据源(source)
  • 定义基于数据的转换操作(transformations)
  • 定义计算结果的输出位置(sink)
  • 触发程序执行(execute)

2.执行环境

编 写 Flink 程 序 的 第 一 步 , 就 是 创 建 执 行 环 境 。 我们要 获 取 的 执 行 环 境 , 是StreamExecutionEnvironment 类的对象,这是所有 Flink 程序的基础。在代码中创建执行环境的方式,就是调用这个类的静态方法,具体有以下三种

1.getExecutionEnvironment

直接调用 getExecutionEnvironment 方法。它会根据当前运行的上下文直接得到正确的结果:如果程序是独立运行的,就返回一个本地执行环境;如果是创建了 jar包,然后从命令行调用它并提交到集群执行,那么就返回集群的执行环境。也就是说,这个方法会根据当前运行的方式,自行决定该返回什么样的运行环境

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

2. createLocalEnvironment

这个方法返回一个本地执行环境。可以在调用时传入一个参数,指定默认的并行度;如果不传入,则默认并行度就是本地的 CPU 核心数

StreamExecutionEnvironment localEnv = StreamExecutionEnvironment.createLocalEnvironment();

3. createRemoteEnvironment

这个方法返回集群执行环境。需要在调用时指定 JobManager 的主机名和端口号,并指定要在集群中运行的 Jar 包

在获取到程序执行环境后,我们还可以对执行环境进行灵活的设置。比如可以全局设置程序的并行度、禁用算子链,还可以定义程序的时间语义、配置容错机制等

三种处理模式

  • 流执行模式(STREAMING)
    这是 DataStream API 最经典的模式,一般用于需要持续实时处理的无界数据流。默认情况下,程序使用的就是 STREAMING 执行模式,Flink 本身持有的就是流处理的世界观,即使是批量数据,也可以看作“有界流”来进行处理。所以 STREAMING 执行模式对于有界数据和无界数据都是有效的

  • 批执行模式(BATCH)
    专门用于批处理的执行模式, 这种模式下,Flink 处理作业的方式类似于 MapReduce 框架对于不会持续计算的有界数据,我们用这种模式处理会更方便

  • 自动模式(AUTOMATIC)
    在这种模式下,将由程序根据输入数据源是否有界,来自动选择执行模式

如何设置BATCH模式?

(1)通过命令行配置

bin/flink run -Dexecution.runtime-mode=BATCH ...

在提交作业时,增加 execution.runtime-mode 参数,指定值为 BATCH

(2)通过代码配置

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
4.源算子

Flink 可以从各种来源获取数据,然后构建 DataStream 进行转换处理。一般将数据的输入来源称为数据源(data source),而读取数据的算子就是源算子(source operator)。所以,source就是我们整个处理程序的输入端

Flink 代码中通用的添加 source 的方式,是调用执行环境的 addSource()方法:

DataStream<String> stream = env.addSource(...);

方法传入一个对象参数,需要实现 SourceFunction 接口;返回 DataStreamSource,读取数据的 source 操作是一个算子,得到的是一个数据流(DataStream)

普通可直接使用的源算子

public class SourceTest {public static void main(String[] args) throws Exception {// 1.创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 方式一.从文件读取数据DataStreamSource<String> source = env.readTextFile("./input/clicks.txt");source.print();// 方式二.从集合读取数据ArrayList<Integer> list = new ArrayList<>();list.add(10);list.add(20);DataStreamSource<Integer> source1 = env.fromCollection(list);source1.print();// 方式三.读取Socket文本流DataStreamSource<String> source2 = env.socketTextStream("hadoop102", 7777);env.execute();}
}

Kafka源

引入依赖

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-kafka_${scala.binary.version}</artifactId><version>${flink.version}</version>
</dependency>
public class KafkaSource {public static void main(String[] args) throws Exception {// 1.创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2.配置Properties properties = new Properties();properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);properties.put(ConsumerConfig.GROUP_ID_CONFIG,"100");properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"latest");// 参数一是主题 参数二是反序列化类型 DataStreamSource<String> kafkaSource =env.addSource(new FlinkKafkaConsumer<String>("click", new SimpleStringSchema(), properties));kafkaSource.print();env.execute();}
}

自定义源

大多数情况下,前面的数据源已经能够满足需要。但是凡事总有例外,如果遇到特殊情况,我们想要读取的数据源来自某个外部系统,而 flink 既没有预实现的方法、也没有提供连接器,又该怎么办呢?
那就只好自定义实现 SourceFunction 了

接下来我们创建一个自定义的数据源,实现 SourceFunction 接口
主要重写两个关键方法:

  • run()方法:使用运行时上下文对象(SourceContext)向下游发送数据
  • cancel()方法:通过标识位控制退出循环,来达到中断数据源的效果

自定义pojo Event

public class Event {public String user;  // 用户名public String url; // 用户访问的urlpublic Long timestamp; // 访问时间戳// 省略构造方法  get set toString
}

重写两个方法每隔1秒生成随机的模拟数据

public class ClickSource implements SourceFunction<Event> {// 声明一个布尔变量,作为控制数据生成的标识位private Boolean running = true;@Overridepublic void run(SourceContext<Event> ctx) throws Exception {Random random = new Random();    // 在指定的数据集中随机选取数据String[] users = {"Mary", "Alice", "Bob", "Cary","Sun","Joe"};String[] urls = {"./home", "./cart", "./fav", "./prod?id=1", "./prod?id=2","./like"};while (running) {ctx.collect(new Event(users[random.nextInt(users.length)],urls[random.nextInt(urls.length)],Calendar.getInstance().getTimeInMillis()));// 隔1秒生成一个点击事件,方便观测Thread.sleep(1000);}}@Overridepublic void cancel() {running = false;}
}

这里要注意的是 SourceFunction 接口定义的数据源,并行度只能设置为 1,所以如果我们想要自定义并行的数据源的话,需要实现ParallelSourceFunction

5.转换算子
5.1 基本转换算子

map

map 主要用于将数据流中的数据进行转换,形成新的数据流。简单来说,就是一个“一一映射”,消费一个元素就产出一个元素

public interface MapFunction<T, O> extends Function, Serializable {O map(T var1) throws Exception;
}

filter

filter 转换操作,顾名思义是对数据流执行一个过滤,通过一个布尔条件表达式设置过滤
条件,对于每一个流内元素进行判断,若为 true 则元素正常输出,若为 false 则元素被过滤掉

public interface FilterFunction<T> extends Function, Serializable {boolean filter(T var1) throws Exception;
}

flatMap

flatMap 操作又称为扁平映射,主要是将数据流中的整体(一般是集合类型)拆分成一个一个的个体使用。消费一个元素,可以产生 0 到多个元素。flatMap 可以认为是“扁平化”(flatten)和“映射”(map)两步操作的结合

public interface FlatMapFunction<T, O> extends Function, Serializable {void flatMap(T var1, Collector<O> var2) throws Exception;
}
5.2 聚合算子

按键分区keyBy后才能使用聚合算子

对于 Flink 而言,DataStream 是没有直接进行聚合的 API 的。因为我们对海量数据做聚合肯定要进行分区并行处理,这样才能提高效率。所以在 Flink 中,要做聚合,需要先进行分区;这个操作就是通过 keyBy 来完成的

基于不同的 key,流中的数据将被分配到不同的分区中去,这样一来,所有具有相同的 key 的数据,都将被发往同一个分区,那么下一步算子操作就将会在同一个 slot中进行处理了,这里所说的分区,其实就是并行处理的子任务,也就对应着任务槽(task slot)

public interface KeySelector<IN, KEY> extends Function, Serializable {KEY getKey(IN var1) throws Exception
}

简单聚合

有了按键分区的数据流 KeyedStream,我们就可以基于它进行聚合操作了。Flink 为我们内置实现了一些最基本、最简单的聚合 API,主要有以下几种

  • sum():在输入流上,对指定的字段做叠加求和的操作
  • min():在输入流上,对指定的字段求最小值
  • max():在输入流上,对指定的字段求最大值
  • minBy():与 min()类似,在输入流上针对指定字段求最小值。不同的是,min()只计算指定字段的最小值,其他字段会保留最初第一个数据的值;而 minBy()则会返回包含字段最小值的整条数据
  • maxBy():与 max()类似,在输入流上针对指定字段求最大值。两者区别与
    min()/minBy()完全一致

归约聚合reduce 两个输入和返回值类型都一样的

public interface ReduceFunction<T> extends Function, Serializable {T reduce(T var1, T var2) throws Exception;
}
6.输出算子

Flink 的 DataStream API 专门提供了向外部写入数据的方法:addSink。与 addSource 类似,addSink 方法对应着一个“Sink”算子,主要就是用来实现与外部系统连接、并将数据提交写入的;Flink 程序中所有对外的输出操作,一般都是利用 Sink 算 子完成的

Sink 一词有“下沉”的意思,有些资料会相对于“数据源”把它翻译为“数据汇”。不论怎样理解,Sink 在 Flink 中代表了将结果数据收集起来、输出到外部的意思,所以我们这里统一把它直观地叫作“输出算子”

之前我们一直在使用的 print 方法其实就是一种 Sink,它表示将数据流写入标准控制台打印输出。查看源码可以发现,print 方法返回的就是一个 DataStreamSink

8.1 输出到文件

StreamingFileSink 为批处理和流处理提供了一个统一的 Sink,它可以将分区文件写入 Flink支持的文件系统

它的主要操作是将数据写入桶(buckets),每个桶中的数据都可以分割成一个个大小有限的分区文件,这样一来就实现真正意义上的分布式文件存储。我们可以通过各种配置来控制“分桶”的操作;默认的分桶方式是基于时间的,我们每小时写入一个新的桶。换句话说,每个桶内保存的文件,记录的都是 1 小时的输出数据

StreamingFileSink 支持行编码(Row-encoded)和批量编码(Bulk-encoded,比如 Parquet)
格式。这两种不同的方式都有各自的构建器(builder),调用方法也非常简单,可以直接调用
StreamingFileSink 的静态方法:

  • 行编码:StreamingFileSink.forRowFormat(basePath,rowEncoder)
  • 批量编码:StreamingFileSink.forBulkFormat(basePath,bulkWriterFactory)

在创建行或批量编码 Sink 时,我们需要传入两个参数,用来指定存储桶的基本路径(basePath)和数据的编码逻辑(rowEncoder 或 bulkWriterFactory)

public class SinkToFile {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(2);DataStreamSource<Event> source = env.addSource(new ClickSource());// 泛型String是要写入的数据类型StreamingFileSink<String> fileSink = StreamingFileSink.<String>forRowFormat(new Path("./output"),new SimpleStringEncoder<>("UTF-8")).withRollingPolicy(DefaultRollingPolicy.builder().withMaxPartSize(1024 * 1024 * 1024).withRolloverInterval(TimeUnit.MINUTES.toMillis(15)).build()).build();source.map(Event::toString).addSink(fileSink);env.execute();}
}
8.2 输出到Kafka
public class SinkToKafka {public static void main(String[] args) throws Exception {// 1.创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2.配置Properties properties = new Properties();properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092");properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,StringDeserializer.class);properties.put(ConsumerConfig.GROUP_ID_CONFIG,"100");properties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG,"latest");// 3.从Kafka读取数据DataStreamSource<String> kafkaSource =env.addSource(new FlinkKafkaConsumer<String>("click", new SimpleStringSchema(), properties));SingleOutputStreamOperator<String> map = kafkaSource.map(data -> {String[] strs = data.split(" ");return new Event(strs[0], strs[1], Long.valueOf(strs[2]), Long.valueOf(strs[3])).toString();});// 4.写入到Kafkamap.addSink(new FlinkKafkaProducer<String>("hadoop102:9092","events",new SimpleStringSchema()));env.execute();}
}

这里我们可以看到,addSink 传入的参数是一个 FlinkKafkaProducer。这也很好理解,因为需要向 Kafka 写入数据,自然应该创建一个生产者。FlinkKafkaProducer 继承了抽象类TwoPhaseCommitSinkFunction,这是一个实现了“两阶段提交”的 RichSinkFunction。两阶段提交提供了 Flink 向 Kafka 写入数据的事务性保证,能够真正做到精确一次(exactly once)的状态一致性

8.3 输出到redis

Flink 没有直接提供官方的 Redis 连接器,不过 Bahir 项目还是担任了合格的辅助角色,为我们提供了 Flink-Redis 的连接工具

具体测试步骤如下

(1)导入的 Redis 连接器依赖

<dependency><groupId>org.apache.bahir</groupId><artifactId>flink-connector-redis_2.11</artifactId><version>1.0</version>
</dependency>

(2)启动 Redis 集群

这里我们为方便测试,只启动了单节点 Redis

(3)编写输出到 Redis 的示例代码

public class SinkToRedis {public static void main(String[] args) throws Exception {// 1.创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();// 2.配置FlinkJedisPoolConfig conf = newFlinkJedisPoolConfig.Builder().setHost("175.178.154.194").setPassword("zks0.0").setDatabase(0).build();DataStreamSource<Event> source = env.addSource(new ClickSource());source.addSink(new RedisSink<>(conf, new RedisMapper<Event>() {@Override // 调用哪个redis数据结构public RedisCommandDescription getCommandDescription() {return new RedisCommandDescription(RedisCommand.SET);}// key@Overridepublic String getKeyFromData(Event event) {return event.getUser();}// value@Overridepublic String getValueFromData(Event event) {return event.toString();}}));env.execute();}
}
8.4 输出到Elasticsearch

Flink 为 ElasticSearch 专门提供了官方的 Sink 连接器,Flink 1.13 支持当前最新版本的
ElasticSearch

写入数据的 ElasticSearch 的测试步骤如下
(1)添加 Elasticsearch 连接器依赖

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-elasticsearch7_${scala.binary.version}</artifactId><version>${flink.version}</version>
</dependency>

(2)启动 Elasticsearch 集群

(3)编写输出到 Elasticsearch 的示例代码

与 RedisSink 类 似 , 连 接 器 也 为 我 们 实 现 了 写 入 到 Elasticsearch 的SinkFunction—ElasticsearchSink区别在于,这个类的构造方法是私有(private)的,我们需要使用 ElasticsearchSink 的 Builder 内部静态类,调用它的 build()方法才能创建出真正的SinkFunction。 而 Builder 的构造方法中又有两个参数:

  • httpHosts:连接到的 Elasticsearch 集群主机列表
  • elasticsearchSinkFunction:这并不是我们所说的 SinkFunction,而是用来说明具体处理逻辑、准备数据向 Elasticsearch 发送请求的函数

具体的操作需要重写中 elasticsearchSinkFunction 中的 process 方法,我们可以将要发送的数据放在一个 HashMap 中,包装成 IndexRequest 向外部发送 HTTP 请求

public class SinkToES {public static void main(String[] args) throws Exception {// 1.创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<Event> source = env.addSource(new ClickSource());// 2.定义host的列表List<HttpHost> hostList = new ArrayList<>();hostList.add(new HttpHost("175.178.154.194",9200));// 3.定义ES Sink FunctionElasticsearchSinkFunction<Event> elasticsearchSinkFunction = new ElasticsearchSinkFunction<Event>() {// requestIndexer的add方法发送请求  代替了客户端@Overridepublic void process(Event event, RuntimeContext runtimeContext, RequestIndexer requestIndexer) {for (int i = 1; i <= 5 ; i++) {IndexRequest request = new IndexRequest("clicks").id(String.valueOf(i));request.source(JSONUtils.objectToJson(event), XContentType.JSON);requestIndexer.add(request);}}};source.addSink(new ElasticsearchSink.Builder<>(hostList,elasticsearchSinkFunction).build());env.execute();}
}
8.5 JDBCSink — 输出到MySQL

写入数据的 MySQL 的测试步骤如下
(1)添加依赖

<dependency><groupId>org.apache.flink</groupId><artifactId>flink-connector-jdbc_${scala.binary.version}</artifactId><version>${flink.version}</version>
</dependency>
<dependency><groupId>mysql</groupId><artifactId>mysql-connector-java</artifactId><version>8.0.28</version>
</dependency>

(2)启动 MySQL,在 database 库下建表 clicks

mysql> create table clicks(
-> user varchar(20) not null,
-> url varchar(100) not null);

public class SinkToMySQL {public static void main(String[] args) throws Exception {// 1.创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();DataStreamSource<Event> source = env.addSource(new ClickSource());source.addSink(JdbcSink.sink("insert into clicks(user,url) values(?,?)",((preparedStatement, event) -> {preparedStatement.setString(1,event.user);preparedStatement.setString(2,event.url);}),new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withUrl("jdbc:mysql://175.178.154.194:3306/flink?serverTimezone=UTC&useUnicode=true&characterEncoding=utf-8").withDriverName("com.mysql.cj.jdbc.Driver").withUsername("root").withPassword("xxxx").build()));env.execute();}
}
8.6 JDBCSink — 输出到ClickHouse
<dependency>
<groupId>ru.yandex.clickhouse</groupId><artifactId>clickhouse-jdbc</artifactId><version>0.2.4</version><exclusions><exclusion><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-databind</artifactId></exclusion><exclusion><groupId>com.fasterxml.jackson.core</groupId><artifactId>jackson-core</artifactId></exclusion></exclusions>
</dependency>

和MySQL基本一致,只不过连接的时候没有用户名和密码

new JdbcExecutionOptions.Builder().withBatchSize(5).build(),new JdbcConnectionOptions.JdbcConnectionOptionsBuilder().withDriverName("ru.yandex.clickhouse.ClickHouseDriver").withUrl("jdbc:clickhouse://hadoop102:8123/default").build());
8.7 自定义Sink 输出到Hbase
public class DimHbaseSinkFunction extends RichSinkFunction<JSONObject> {private Connection connection;// open 并行子任务都会执行一次open、close 常用来执行初始化操作@Overridepublic void open(Configuration parameters) throws Exception {try {String url = "jdbc:phoenix:hadoop102,hadoop103,hadoop104:2181";Properties props = new Properties();props.put("phoenix.schema.isNamespaceMappingEnabled","true");connection = DriverManager.getConnection(url,props);} catch (SQLException e) {throw new RuntimeException(e);}}// 自定义输出 value 数据格式  sinkTable(hbase),database(mysql),tableName(mysql),before,after(数据),type@Overridepublic void invoke(JSONObject value, Context context) throws Exception {//获取要插入的数据 {id:1,name:kun}JSONObject after = value.getJSONObject("after");// 获取key 作为hbase的列Set<String> keySet = after.keySet();// 获取values 赋值Collection<Object> values = after.values();// 获取SQL语句String sql = "upsert into RTDW_HBASE" +"."+ value.getString("sinkTable")+"("+ StringUtils.join(keySet,",")+")" +"values('" + StringUtils.join(values,"','")+ "')";System.out.println(sql);PreparedStatement ps = null;try{connection.setAutoCommit(false);// 预编译SQLps = connection.prepareStatement(sql);// 执行插入修改操作ps.executeUpdate();connectionmit();ps.close();}catch (SQLException e){e.printStackTrace();}}}
7.富函数

“富函数类”也是 DataStream API 提供的一个函数类的接口,所有的 Flink 函数类都有其Rich 版本。富函数类一般是以抽象类的形式出现的。例如:RichMapFunction、RichFilterFunction、RichReduceFunction 等

既然“富”,那么它一定会比常规的函数类提供更多、更丰富的功能。与常规函数类的不同主要在于,富函数类可以获取运行环境的上下文,并拥有一些生命周期方法,所以可以实现更复杂的功能

Rich Function 有生命周期的概念。典型的生命周期方法有:

  • open()方法,是 Rich Function 的初始化方法,也就是会开启一个算子的生命周期。当一个算子的实际工作方法例如 map()或者 filter()方法被调用之前,open()会首先被调用。所以像文件 IO 的创建,数据库连接的创建,配置文件的读取等等这样一次性的工作,都适合在 open()方法中完成

  • close()方法,是生命周期中的最后一个调用的方法,类似于解构方法。一般用来做一些清理工作

1.注意富函数是抽象类,不是接口,并且类中的方法不止一个,所以不能用Lambda表达式,只能自己写一个类

2.每一个并行子任务都会执行一次open、close

3.富函数和状态编程有着息息相关的关系

public class RichFunctionTest {public static void main(String[] args) throws Exception {// 1.创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);// 2.从自定义数据源读取DataStreamSource<Event> source = env.addSource(new ClickSource());source.map(new MyRich()).setParallelism(2).print();env.execute();}// 对于open和close 一个并行度会被调用一次public static class MyRich extends RichMapFunction<Event,String>{@Overridepublic void open(Configuration parameters) throws Exception {super.open(parameters);System.out.println("open生命周期被调用 " + getRuntimeContext().getIndexOfThisSubtask());}@Overridepublic void close() throws Exception {super.close();System.out.println("close生命周期被调用 " + getRuntimeContext().getIndexOfThisSubtask());}@Overridepublic String map(Event event) throws Exception {return event.getUrl();}}
}
8.物理分区

有些时候,我们还需要手动控制数据分区分配策略。比如当发生数据倾斜的时候,系统无法自动调整,这时就需要我们重新进行负载均衡,将数据流较为平均地发送到下游任务操作分区中去。Flink 对于经过转换操作之后的 DataStream,提供了一系列的底层操作接口,能够帮我们实现数据流的手动重分区。为了同 keyBy 相区别,我们把这些操作统称为“物理分区”操作。物理分区与 keyBy 另一大区别在于,keyBy 之后得到的是一个KeyedStream,而物理分区之后结果仍是 DataStream,且流中元素数据类型保持不变。从这一点也可以看出,分区算子并不对数据进行转换处理,只是定义了数据的传输方式

常见的物理分区策略有随机分配(Random)、轮询分配(Round-Robin)、重缩放(Rescale)和广播(Broadcast),下边我们分别来做了解

1.随机分区(shuffle)

最简单的重分区方式就是直接“洗牌”。通过调用 DataStream 的.shuffle()方法,将数据随
机地分配到下游算子的并行任务中去

source.map(Event::getUser).shuffle().print().setParallelism(3);

map算子并行度1,经过shuffle后,随机的将数据分到3个并行的print算子中

2. 轮询分区(Round-Robin)

轮询也是一种常见的重分区方式。简单来说就是“发牌”,按照先后顺序将数据做依次分发,通过调用 DataStream 的.rebalance()方法,就可以实现轮询重分区。rebalance使用的是 Round-Robin 负载均衡算法,可以将输入流数据平均分配到下游的并行任务中去

source.map(Event::getUser).rebalance().print().setParallelism(3);

map算子并行度1,经过rebalance后,轮询将数据分到3个并行的print算子中

3.重缩放分区

重缩放分区和轮询分区非常相似。当调用 rescale()方法时,其实底层也是使用 Round-Robin算法进行轮询,但是只会将数据轮询发送到下游并行任务的一部分中,如图 5-11 所示。也就是说,“发牌人”如果有多个,那么rebalance 的方式是每个发牌人都面向所有人发牌;而 rescale的做法是分成小团体,发牌人只给自己团体内的所有人轮流发牌


4. 广播(broadcast)

这种方式其实不应该叫做“重分区”,因为经过广播之后,数据会在不同的分区都保留一份,可能进行重复处理。可以通过调用 DataStream 的 broadcast()方法,将输入数据复制并发送到下游算子的所有并行任务中去

5. 全局分区(global)

全局分区也是一种特殊的分区方式。这种做法非常极端,通过调用.global()方法,会将所有的输入流数据都发送到下游算子的第一个并行子任务中去。这就相当于强行让下游任务并行度变成了 1,所以使用这个操作需要非常谨慎,可能对程序造成很大的压力

自定义分区

当 Flink 提 供 的 所 有 分 区 策 略 都 不 能 满 足 用 户 的 需 求 时 , 我 们 可 以 通 过 使 用partitionCustom()方法来自定义分区策略

在调用时,方法需要传入两个参数,第一个是自定义分区器(Partitioner)对象,第二个是应用分区器的字段,它的指定方式与 keyBy 指定 key 基本一样:可以通过字段名称指定,也可以通过字段位置索引来指定,还可以实现一个 KeySelector

public class CustomPartition {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env =StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);DataStreamSource<Event> source = env.addSource(new ClickSource());source.partitionCustom(new Partitioner<String>() {@Overridepublic int partition(String name, int i) {if(name.length() == 3) return 0;return 1;}}, new KeySelector<Event, String>() {@Overridepublic String getKey(Event event) throws Exception {return event.getUser();  // 根据名字分区}}).print().setParallelism(2);env.execute();}}

二.时间和窗口

1.时间语义

1.处理时间Processing Time

处理时间的概念非常简单,就是指执行处理操作的机器目前所处的系统时间

2. 事件时间(Event Time) (核心、重要)

事件时间,是指每个事件在对应的设备上发生的时间,也就是数据生成的时间数据一旦产生,这个时间自然就确定了,所以它可以作为一个属性嵌入到数据中。这其实就是这条数据记录的“时间戳”(Timestamp)

2.水位线


我们想统计8点-9点的数据,那这个窗口的时间以谁为标准呢?如果以系统时间为准,即到了9点这个窗口就关闭,显然是不合理的,因为由于延迟,可能有些数据没来得及在9点前进入窗口,但它是9点前发生的事件

一种简单的想法是,在数据流中加入一个时钟标记,记录当前的事件时间;这个标记可以直接广播到下游,当下游任务收到这个标记,就可以更新自己的时钟了。由于类似于水流中用来做标志的记号,在 Flink 中,这种用来衡量事件时间(Event Time)进展的标记,就被称作“水位线”(Watermark)

具体实现上,水位线可以看作一条特殊的数据记录,它是插入到数据流中的一个标记点,主要内容就是一个时间戳,用来指示当前的事件时间。而它插入流中的位置,就应该是在某个数据到来之后;这样就可以从这个数据中提取时间戳,作为当前水位线的时间戳了

有序流周期性插入水位线(理解即可,实际中保证有序很难)

实际应用中,如果当前数据量非常大,可能会有很多数据的时间戳是相同的,这时每来一条数据就提取时间戳、插入水位线就做了大量的无用功。而且即使时间戳不同,同时涌来的数据时间差会非常小(比如几毫秒),往往对处理计算也没什么影响。所以为了提高效率,一般会每隔一段时间生成一个水位线,这个水位线的时间戳,就是当前最新数据的时间戳,如图,这时的水位线,其实就是有序流中的一个周期性出现的时间标记

无序流水位线

这里所说的“乱序”(out-of-order),是指数据的先后顺序不一致,在消息队列中由于某种原因导致数据乱序了

解决思路也很简单:我们插入新的水位线时,要先判断一下时间戳是否比之前的大,否则就不再生成新的水位线,也就是说,只有数据的时间戳比当前时钟大,才能推动时钟前进,这时才插入水位线


如果考虑到大量数据同时到来的处理效率,我们同样可以周期性地生成水位线。这时只需要保存一下之前所有数据中的最大时间戳,需要插入水位线时,就直接以它作为时间戳生成新的水位线

水位线设置延迟(重要)

没有延迟会导致什么情况

上图,当 9 秒产生的数据到来之后,我们就直接将时钟推进到了9 秒;如果有一个窗口结束时间就是 9 秒(比如,要统计 0~9 秒的所有数据),那么这时窗口就应该关闭、将收集到的所有数据计算输出结果了。但事实上,由于数据是乱序的,还可能有时间戳为 7 秒、8 秒的数据在 9 秒的数据之后才到来,这就是“迟到数据”(late data)。它们本来也应该属于 0~9 秒这个窗口,但此时窗口已经关闭,于是这些数据就被遗漏了,这会导致统计结果不正确

如何设置延迟

为了让窗口能够正确收集到迟到的数据,我们也可以等上一会(不需要很大的时间,一般是毫秒,最大也就几秒,比如2秒);也就是用当前已有数据的最大时间戳减去 2 秒,就是水位线的时间戳,这样的话,9 秒的数据到来之后,事件时钟不会直接推进到 9 秒,而是进展到了 7 秒;必须等到11 秒的数据到来之后,事件时钟才会进展到 9 秒,这时迟到数据也都已收集齐,0~9 秒的窗口就可以正确计算结果了

水位线的特性

现在我们可以知道,水位线就代表了当前的事件时间时钟,而且可以在数据的时间戳基础上加一些延迟来保证不丢数据,这一点对于乱序流的正确处理非常重要

我们可以总结一下水位线的特性:

  • 水位线是插入到数据流中的一个标记,可以认为是一个特殊的数据
  • 水位线主要的内容是一个时间戳,用来表示当前事件时间的进展
  • 水位线是基于数据的时间戳生成的
  • 水位线的时间戳必须单调递增,以确保任务的事件时间时钟一直向前推进
  • 水位线可以通过设置延迟,来保证正确处理乱序数据
  • 一个水位线 Watermark(t),表示在当前流中事件时间已经达到了时间戳 t, 这代表 t 之前的所有数据都到齐了,之后流中不会出现时间戳 t’ ≤ t 的数据

代码设置水位线

在 Flink 的 DataStream API 中 , 有 一 个 单 独 用 于 生 成 水 位 线 的 法.assignTimestampsAndWatermarks(),它主要用来为流中的数据分配时间戳,并生成水位线来指示事件时间:

public SingleOutputStreamOperator<T> assignTimestampsAndWatermarks( WatermarkStrategy<T> watermarkStrategy)

数据里已经有时间戳了吗,为什么这里还要“分配”呢?这是因为原始的时间戳只是写入日志数据的一个字段,如果不提取出来并明确把它分配给数据,Flink 是无法知道数据真正产生的时间的。当然,有些时候数据源本身就提供了时间戳信息,比如读取 Kafka 时,我们就可以从 Kafka 数据中直接获取时间戳,而不需要单独提取字段分配

.assignTimestampsAndWatermarks()方法需要传入一个 WatermarkStrategy 作为参数,这就是 所 谓 的 “ 水 位 线 生 成 策 略 ” 。 WatermarkStrategy 中 包 含 了 一 个 “ 时 间 戳 分 配器”TimestampAssigner 和一个“水位线生成器”WatermarkGenerator

  • TimestampAssigner:主要负责从流中数据元素的某个字段中提取时间戳,并分配给元素。时间戳的分配是生成水位线的基础
  • WatermarkGenerator:主要负责按照既定的方式,基于时间戳生成水位线。在WatermarkGenerator 接口中,主要又有两个方法:onEvent()和 onPeriodicEmit()
@Public
public interface WatermarkGenerator<T> {void onEvent(T var1, long var2, WatermarkOutput var4);void onPeriodicEmit(WatermarkOutput var1);
}
  • onEvent:每个事件(数据)到来都会调用的方法,它的参数有当前事件、时间戳,以及允许发出水位线的一个 WatermarkOutput,可以基于事件做各种操作
  • onPeriodicEmit:周期性调用的方法,可以由 WatermarkOutput 发出水位线。周期时间为处理时间,可以调用环境配置的.setAutoWatermarkInterval()方法来设置,默认为200ms
public class WaterMarkTest {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.getConfig().setAutoWatermarkInterval(100); // 100毫秒生成一次水位线env.addSource(new ClickSource())// 乱序流的WaterMark生成.assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(2)) // 延迟2秒保证数据正确.withTimestampAssigner(new SerializableTimestampAssigner<Event>() {@Override // 时间戳的提取器public long extractTimestamp(Event event, long l) {return event.getTimestamp();}}));env.execute();}
}

总结

为了保证数据正确,我们设置了水位线,水位线其实就是一个时间戳,为了保证数据都在窗口范围内,我们设置了延迟时间,水位线的真正时间戳就是数据中的时间戳 - 延迟时间 - 1 ,单位是毫秒

窗口定义是前闭后开的,假如我们需要8点到9点的数据,实际上是包括8点整的数据,不包括9点整的数据,假设延迟时间为5秒,这样8点0分5秒1ms的数据来了后,Flink水位线是8点,就开始统计了,9点整的数据对应的水位线其实是8点59分55秒999ms,这时候不会关闭窗口,因为要等待延迟数据,当Flink水位线是8点59分59秒999毫秒时,关闭窗口

水位线的传递

如果一个任务收到了来自上游并行任务的不同的水位线,说明上游各个分区处理得有快有慢,进度各不相同比如上游有两个并行子任务都发来了水位线,一个是 5 秒,一个是 7 秒;这代表第一个并行任务已经处理完 5 秒之前的
所有数据,而第二个并行任务处理到了 7 秒。那这时自己的时钟怎么确定呢?当然也要以“这之前的数据全部到齐”为标准。如果我们以较大的水位线 7 秒作为当前时间,那就表示“7 秒前的数据都已经处理完”,这显然不是事实——第一个上游分区才处理到 5 秒,5~7 秒的数据还会不停地发来;而如果以最小的水位线 5 秒作为当前时钟就不会有这个问题了,因为确实所有上游分区都已经处理完,不会再发 5 秒前的数据了。这让我们想到“木桶原理”:所有的上游并行任务就像围成木桶的一块块木板,它们中最短的那一块,决定了我们桶中的水位

3.窗口

定义

下图,我们设置了水位线延迟以后,在水位线为10时,11 12 秒的数据也进来了,但它们不属于0 - 10 秒

在 Flink 中,窗口其实并不是一个“框”,流进来的数据被框住了就只能进这一个窗口。相比之下,我们应该把窗口理解成一个“桶”,在 Flink 中,窗口可以把流切割成有限大小的多个“存储桶”(bucket);每个数据都会分发到对应的桶中,当到达窗口结束时间时,就对每个桶中收集的数据进行计算处理


窗口分类

1. 按照驱动类型分类

时间窗口以时间点来定义窗口的开始(start)和结束(end),所以截取出的就是某一时间段的数据。到达结束时间时,窗口不再收数据,触发计算输出结果,并将窗口关闭销毁。所以可以说基本思路就是“定点发车”

计数窗口基于元素的个数来截取数据,到达固定的个数时就触发计算并关闭窗口。这相当于座位有限、“人满就发车”,是否发车与时间无关。每个窗口截取数据的个数,就是窗口的大小


2. 按照窗口分配数据的规则分类

滚动窗口有固定的大小,是一种对数据进行“均匀切片”的划分方式。窗口之间没有重叠,也不会有间隔,是“首尾相接”的状态。如果我们把多个窗口的创建,看作一个窗口的运动,那就好像它在不停地向前“翻滚”一样。这是最简单的窗口形式,我们之前所举的例子都是滚动窗口。也正是因为滚动窗口是“无缝衔接”,所以每个数据都会被分配到一个窗口,而且只会属于一个窗口

滚动窗口可以基于时间定义,也可以基于数据个数定义;需要的参数只有一个,就是窗口的大小(window size)。比如我们可以定义一个长度为 1 小时的滚动时间窗口,那么每个小时就会进行一次统计;或者定义一个长度为 10 的滚动计数窗口,就会每 10 个数进行一次统计

滑动窗口的大小也是固定的。区别在于,窗口之间并不是首尾相接的,而是可以“错开”一定的位置。如果看作一个窗口的运动,那么就像是向前小步“滑动”一样。既然是向前滑动,那么每一步滑多远,就也是可以控制的。所以定义滑动窗口的参数有两个:除去窗口大小(window size)之外,还有一个“滑动步长”(window slide),它其实就代表了窗口计算的频率

我们可以看到,与前两种窗口不同,会话窗口的长度不固定,起始和结束时间也是不确定的,各个分区之间窗口没有任何关联。会话窗口之间一定是不会重叠的,而且会留有至少为 size 的间隔(session gap)。在一些类似保持会话的场景下,往往可以使用会话窗口来进行数据的处理统计,两个数据之间间隔大于gap,即开启新的窗口


还有一类比较通用的窗口,就是“全局窗口”。这种窗口全局有效,会把相同 key 的所有数据都分配到同一个窗口中;说直白一点,就跟没分窗口一样。无界流的数据永无止尽,所以这种窗口也没有结束的时候,默认是不会做触发计算的。如果希望它能对数据进行计算处理,还需要自定义“触发器”(Trigger)

4.窗口API(重点)
4.1 窗口分类

(1)按键分区窗口(Keyed Windows)

经过按键分区 keyBy 操作后,数据流会按照 key 被分为多条逻辑流(logical streams),这就是 KeyedStream。基于 KeyedStream 进行窗口操作时, 窗口计算会在多个并行子任务上同时执行。相同 key 的数据会被发送到同一个并行子任务,而窗口操作会基于每个 key 进行单独的处理。所以可以认为,每个 key 上都定义了一组窗口,各自独立地进行统计计算

stream.keyBy(...)
.window(...)

(2)非按键分区(Non-Keyed Windows)

如果没有进行 keyBy,那么原始的 DataStream 就不会分成多条逻辑流。这时窗口逻辑只能在一个任务(task)上执行,就相当于并行度变成了 1。所以在实际应用中一般不推荐使用这种方式。在代码中,直接基于 DataStream 调用.windowAll()定义窗口

stream.windowAll(...)

这里需要注意的是,对于非按键分区的窗口操作,手动调大窗口算子的并行度也是无效的,windowAll本身就是一个非并行的操作

4.2 窗口组成

窗口 = 窗口分配器 + 窗口函数

窗口分配器(Window Assigners)定义窗口分配器(Window Assigners)是构建窗口算子的第一步,它的作用就是定义数据应该被“分配”到哪个窗口

streamOperator.keyBy(Event::getUser).window(TumblingEventTimeWindows.of(Time.hours(1)));// 滚动时间窗口streamOperator.keyBy(Event::getUser).window(SlidingEventTimeWindows.of(Time.hours(1),Time.minutes(10)));// 滑动时间窗口streamOperator.keyBy(Event::getUser).window(EventTimeSessionWindows.withGap(Time.minutes(1)));// 会话时间窗口streamOperator.keyBy(Event::getUser).window(GlobalWindows.create());// 全局窗口streamOperator.keyBy(Event::getUser).countWindow(100);   // 滚动计数窗口streamOperator.keyBy(Event::getUser).countWindow(100,10); // 滑动计数窗口

窗口函数

窗口分配器只说明了数据分到哪个窗口,我们还需要窗口函数才能实现对数据的处理

4.3 窗口函数 - 增量聚合函数

典型的增量聚合函数有两个:ReduceFunction 和 AggregateFunction

(1)归约函数(ReduceFunction)

中间聚合的状态和输出的结果,都和输入的数据类型是一样的

public interface ReduceFunction<T> extends Function, Serializable {T reduce(T var1, T var2) throws Exception;
}

归约函数使用

stream.redcue(new MyRedcueFunction())

(2)聚合函数(AggregateFunction)

ReduceFunction 可以解决大多数归约聚合的问题,但是这个接口有一个限制,就是聚合状态的类型、输出结果的类型都必须和输入数据类型一样。这就迫使我们必须在聚合前,先将数据转换(map)成预期结果类型;而在有些情况下,还需要对状态进行进一步处理才能得到输出结果,这时它们的类型可能不同,使用 ReduceFunction 就会非常麻烦

AggregateFunction 可以看作是 ReduceFunction 的通用版本,这里有三种类型:输入类型(IN)、累加器类型(ACC)和输出类型(OUT)。输入类型 IN 就是输入流中元素的数据类型;累加器类型 ACC 则是我们进行聚合的中间状态类型;而输出类型当然就是最终计算结果的类型了

接口中有四个方法:

  • createAccumulator():创建一个累加器,这就是为聚合创建了一个初始状态,每个聚合任务只会调用一次

  • add():将输入的元素添加到累加器中。这就是基于聚合状态,对新来的数据进行进一步聚合的过程。方法传入两个参数:当前新到的数据 value,和当前的累加器accumulator;返回一个新的累加器值,也就是对聚合状态进行更新。每条数据到来之后都会调用这个方法

  • getResult():从累加器中提取聚合的输出结果。也就是说,我们可以定义多个状态,然后再基于这些聚合的状态计算出一个结果进行输出。比如之前我们提到的计算平均值,就可以把 sum 和 count 作为状态放入累加器,而在调用这个方法时相除得到最终结果。这个方法只在窗口要输出结果时调用

  • merge():合并两个累加器,并将合并后的状态作为一个累加器返回。这个方法只在需要合并窗口的场景下才会被调用;最常见的合并窗口(Merging Window)的场景就是会话窗口(Session Windows)

public interface AggregateFunction<IN, ACC, OUT> extends Function, Serializable 
{ACC createAccumulator();ACC add(IN value, ACC accumulator);OUT getResult(ACC accumulator);ACC merge(ACC a, ACC b);
}

聚合函数使用

stream.aggregate(new MyAggregateFunction())
4.4 窗口函数 - 全窗口函数

窗口操作中的另一大类就是全窗口函数。与增量聚合函数不同,全窗口函数需要先收集窗口中的数据,并在内部缓存起来,等到窗口要输出结果的时候再取出数据进行计算。很明显,这就是典型的批处理思路了——先攒数据,等一批都到齐了再正式启动处理流程,其实属于处理函数的一类

如果使用了keyBy

ProcessWindowFunction<IN, OUT, KEY, W extends Window>

如果没有key,用ProcessAllWindowFunction

ProcessAllWindowFunction<IN, OUT, W extends Window>

IN 输入类型
OUT 输出类型
KEY 分组字段的类型
W 窗口类型 一般是TimeWindow

4.5 增量函数 + 全窗口函数(常用)

我们之前在调用 WindowedStream 的.reduce()和.aggregate()方法时,只是简单地直接传入了一个ReduceFunction 或 AggregateFunction 进行增量聚合。除此之外,其实还可以传入第二个参数:一个全窗口函数,可以是 WindowFunction(一般不用) 或者 ProcessWindowFunction(功能更全)

此时ProcessWindowFunction的输入为增量聚合函数的输出

5.其他窗口API
5.1 触发器

触发器主要是用来控制窗口什么时候触发计算。所谓的“触发计算”,本质上就是执行窗口函数,所以可以认为是计算得到结果并输出的过程

基于 WindowedStream 调用.trigger()方法,就可以传入一个自定义的窗口触发器(Trigger)

stream.keyBy(...).window(...).trigger(new MyTrigger()).process(new xxx)

Trigger 是一个抽象类,自定义时必须实现下面四个抽象方法:

  • onElement():窗口中每到来一个元素,都会调用这个方法
  • onEventTime():当注册的事件时间定时器触发时,将调用这个方法
  • onProcessingTime ():当注册的处理时间定时器触发时,将调用这个方法
  • clear():当窗口关闭销毁时,调用这个方法。一般用来清除自定义的状态

上面的前三个方法可以响应事件,那它们又是怎样跟窗口操作联系起来的呢?这就需要了解一下它们的返回值。这三个方法返回类型都是 TriggerResult,这是一个枚举类型(enum),其中定义了对窗口进行操作的四种类型

  • CONTINUE(继续):什么都不做
  • FIRE(触发):触发计算,输出结果
  • PURGE(清除):清空窗口中的所有数据,销毁窗口
  • FIRE_AND_PURGE(触发并清除):触发计算输出结果,并清除窗口
5.2 移除器(Evictor)

移除器主要用来定义移除某些数据的逻辑。基于 WindowedStream 调用.evictor()方法,就可以传入一个自定义的移除器(Evictor)。Evictor 是一个接口,不同的窗口类型都有各自预实现的移除器

stream.keyBy(...).window(...).evictor(new MyEvictor())

Evictor 接口定义了两个方法:

  • evictBefore():定义执行窗口函数之前的移除数据操作
  • evictAfter():定义执行窗口函数之后的以处数据操作

默认情况下,预实现的移除器都是在执行窗口函数(window fucntions)之前移除数据的

6.延迟数据处理 (三重保障)

第一重保障 - 水位线延迟

第二重保障 - 窗口延迟不关闭

在事件时间语义下,窗口中可能会出现数据迟到的情况。这是因为在乱序流中,水位线(watermark)并不一定能保证时间戳更早的所有数据不会再来。当水位线已经到达窗口结束时间时,窗口会触发计算并输出结果,这时一般也就要销毁窗口了;如果窗口关闭之后,又有本属于窗口内的数据姗姗来迟,默认情况下就会被丢弃

不过在多数情况下,直接丢弃数据也会导致统计结果不准确。为了解决迟到数据的问题,Flink 提供了一个特殊的接口,可以为窗口算子设置一个“允许的最大延迟”(Allowed Lateness)。也就是说,我们可以设定允许延迟一段时间,在这段时间内,窗口不会销毁,继续到来的数据依然可以进入窗口中并触发计算。直到水位线推进到了窗口结束时间 + 延迟时间,才真正将窗口的内容清空,正式关闭窗口

基于 WindowedStream 调用.allowedLateness()方法,传入一个 Time 类型的延迟时间,就可以表示允许这段时间内的延迟数据

stream.keyBy(...).window(TumblingEventTimeWindows.of(Time.mintues(1))).allowedLateness(Time.minutes(1))

比如上面的代码中,我们定义了 1 小时的滚动窗口,并设置了允许 1 分钟的延迟数据。也就是说,在不考虑水位线延迟的情况下,对于 8 点~9 点的窗口,本来应该是水位线到达 9 点整就触发计算并关闭窗口;现在允许延迟 1 分钟,那么 9 点整就只是触发一次计算并输出结果,并不会关窗。后续到达的数据,只要属于 8 点~9 点窗口,依然可以在之前统计的基础上继续叠加,并且再次输出一个更新后的结果。直到水位线到达了 9 点零 1 分,这时就真正清空状态、关闭窗口,之后再来的迟到数据就会被丢弃了

第三重保障 - 侧输出流

我们自然会想到,即使可以设置窗口的延迟时间,终归还是有限的,后续的数据还是会被丢弃。如果不想丢弃任何一个数据,又该怎么做呢?

Flink 还提供了另外一种方式处理迟到数据。我们可以将未收入窗口的迟到数据,放入“侧输出流”(side output)进行另外的处理。所谓的侧输出流,相当于是数据流的一个“分支”,这个流中单独放置那些错过了该上的车、本该被丢弃的数据。基于 WindowedStream 调用.sideOutputLateData() 方法,就可以实现这个功能。方法需要传入一个“输出标签”(OutputTag),用来标记分支的迟到数据流。因为保存的就是流中的原始数据,所以 OutputTag 的类型与流中数据类型相同

具体应用时,只要在处理函数的.processElement()或者.onTimer()方法中,调用上下文的.output()方法就可以了

这里 output()方法需要传入两个参数,第一个是一个“输出标签”OutputTag,用来标识侧输出流,一般会在外部统一声明;第二个就是要输出的数据

我们可以在外部先将 OutputTag 声明出来:

OutputTag<String> outputTag = new OutputTag<String>("side-output") {};

如果想要获取这个侧输出流,可以基于处理之后的 DataStream 直接调用.getSideOutput()方法,传入对应的 OutputTag,这个方式与窗口 API 中获取侧输出流是完全一样的

DataStream<String> stringStream = longStream.getSideOutput(outputTag);

案例(重要,理解三种保障)

窗口计算触发、窗口关闭都是由Flink水位线决定的,而计算的数据是真实的符合时间条件的数据,比如我设置水位线延迟2秒,那么0-22秒是收集第一个窗口数据的,因为有的数据在0-20秒内可能会进不来,所以多等2秒,确保事件时间为0-20秒的都进了窗口,但是多等2秒可能21秒、21.6秒的数据可能也已经进来了,没关系,Flink的窗口是分桶的,我们计算的时候多余的数据时不会被计算的,依旧计算的是0-20秒的数据!

下面这段代码,作用就是统计一个窗口内不同的页面被访问了多少次,主要关心三种保障是怎么发挥作用的即可

我们设置了一个20秒滚动窗口,并且水位线延迟了2秒,并且允许窗口延迟5秒关闭,而且还设置了侧输出流来处理延迟了太久的数据

读到这里,我们应该知道

1.哪些数据会被认为是第一个窗口的数据?(以下是自1970年开始毫秒单位)

时间戳在0-20000内的数据会是第一个窗口,包括0,但不包括20000,因为窗口的定义是前闭后开的!!!

同理,时间戳在20000 - 40000的数据会在第二个20秒的窗口

40000 - 60000会在第三个20秒的窗口

但是只有22000的数据到来后,第一个窗口才会进行计算,因为22000对应水位线为20秒

2.窗口会持续多长时间?

27秒的数据来了以后,第一个窗口就关闭了,因为此时Flink的水位线是25秒 = 窗口本身的20秒 + 允许延迟5秒

也就是说,只要27秒的数据及其以后的数据不来,且数据时间戳在0-20000的数据都会被视为第一个窗口,并且持续计算,如果27秒的数据来了,那么第一个窗口就关闭了,要是再来0-20000的数据就会进入侧输出流

同理,47秒的数据来了后,第二个窗口也就关闭了

public class LateDataTest {public static void main(String[] args) throws Exception {// 1.创建执行环境StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);env.getConfig().setAutoWatermarkInterval(100); // 100毫秒生成一次水位线SingleOutputStreamOperator<Event> streamOperator = env.socketTextStream("175.178.154.194", 7777).map(str -> {String[] strs = str.split(" ");return new Event(strs[0], strs[1], Long.valueOf(strs[2]), Long.valueOf(strs[3]));}).assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(2)) // 第一重保证 延迟2秒保证数据正确.withTimestampAssigner(new SerializableTimestampAssigner<Event>() {@Override // 时间戳的提取器public long extractTimestamp(Event event, long l) {return event.getTimestamp();}}));streamOperator.print("input: ");// 定义一个输出标签OutputTag<Event> late = new OutputTag<Event>("late"){};SingleOutputStreamOperator<String> res = streamOperator.keyBy(Event::getUrl).window(TumblingEventTimeWindows.of(Time.seconds(20))).allowedLateness(Time.seconds(5)) // 第二重保证  窗口延迟5秒.sideOutputLateData(late) // 第三重保证 侧输出流.aggregate(new UrlCount.MyAgg(), new UrlCount.MyProcessFunction());res.print("result: ");res.getSideOutput(late).print("late: ");env.execute();}
}

首先第一个窗口的数据时0-20秒(不包括20秒),可以看到符合条件的只有前三条数据,并且只有水位线到达20秒(也就是事件时间为22秒的数据到来后),触发了计算,但此时窗口没有关闭


当27秒的数据到达后,也就是水位线时间为25秒时,第一个窗口关闭,此时再进来属于第一个窗口的数据,就会进入侧输入流

7.关于窗口、水位线延迟、窗口延迟、侧输出流总结

1.我们计算的数据肯定是在我们规定的时间范围内的,比如滚动窗口设置1小时,那么8点-9点的数据肯定都在这个窗口内,而且这个窗口内也不会有其他时间的数据,因为Flink的窗口是分桶的

2.为了保证延迟数据的到来,我们可以延迟水位线,多等一会来确保8-9点的数据都来了

水位线时间戳 = 实际事件时间 - 水位线延迟时间 - 1 (单位是毫秒)。窗口触发计算、窗口关闭时间都是以水位线为驱动

3.为了更加确保延迟数据的到来,我们允许窗口延迟关闭,即可以再设置个窗口关闭时间

窗口关闭时间 = 窗口正常时间 + 窗口延迟关闭时间,和水位线延迟时间没有关系

4.只要达到窗口计算触发时间并且窗口没关闭,来数据立即触发计算

5.窗口关闭后,再来属于关闭窗口的数据,只能进入侧输出流

更多推荐

Flink 1. 13(二)时间和窗口

本文发布于:2024-03-08 20:08:39,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1722162.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:窗口   时间   Flink

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!