入门"/>
SSS —— Spark Structured Streaming 之理解与入门
前言:
最近一直忙着写项目的事情,有两天没写博客了,今天抽空来写一下博客,刚好最近也都在用Spark的这个东西,也算做一下笔记吧;唉~ Structured Streaming的学习之路可谓是步履艰辛啊,因为这个东西是Spark还没退出多久的一个概念,网上基本没有什么教程,在官方的文档上也只有一个 WordCount 的示例;博主在学习这个东西的时碰到问题,也基本上都是在StackOverflow 上面提问,哈哈哈 那上面的人挺有意思的说话又好听,各个都是人才.还是要提一句要搞这个东西还是建议使用Spark的最新版本吧,毕竟这个东西现在还不是很完善,也一直在更新,很多方法函数都是新版本才有.最后进入今天的正题吧.
一. 什么是Structured Streaming
- 官网的解释:
Structured Streaming 是基于Spark SQL引擎构建的可伸缩且容错的流处理引擎。您可以像对静态数据进行批处理计算一样来表示流计算。当流数据继续到达时,Spark SQL引擎将负责递增地,连续地运行它并更新最终结果。您可以在Scala,Java,Python或R中使用Dataset / DataFrame API来表示流聚合,事件时间窗口,流到批处理联接等。计算在同一优化的Spark SQL引擎上执行。最后,系统通过检查点和预写日志来确保端到端的一次容错保证。简而言之,Structured Streaming 提供了快速,可扩展,容错,端到端的精确一次流处理,而用户无需推理流。
在内部,默认情况下,结构化流查询是使用微批量处理引擎处理的,该引擎将数据流作为一系列小批量作业进行处理,从而实现了低至100毫秒的端到端延迟以及一次精确的容错保证。但是,从Spark 2.3开始,我们引入了一种称为“ 连续处理”的新低延迟处理模式,该模式可以实现一次最少保证的低至1毫秒的端到端延迟。在不更改查询中的Dataset / DataFrame操作的情况下,您将能够根据应用程序需求选择模式。 - 我的理解:
Structured Streaming 从其第一个单词 ‘Structured’ 就可以知道他是面向结构化的,什么是结构化,向我们平时这种存放在关系型数据库啊这种存放的数据,有一定的模式对它进行约束的我们就叫这种数据为结构数据,什么是非结构化数据呢,就比如平时的浏览记录啊,服务器日志这种数据,它们的数据格式没有特定的模式,这种就叫做非结构化数据;说了这么多你就把结构化数据理解为表(table)就可以了.
再来看Structured Streaming中的第二的单词 ‘Streaming’,它表示的是流,我们学过Java的都知道,Java在对文件进行读写操作的时候也是基于流来实现的,那就叫做IO流,流通俗点来讲就像河中的水一样,一直都在流动的,就好比流水线上的工人,这个该懂了吧,我们的商品从生产出来,就一直在流水线上进行移动,工人在这个流水线上干着固定的工作,各司其职,商品在这个环节被这个工人加工后,它又会被放回流水线,到下一个工人那儿进行下一次加工,最后当全部流程都走完商品也就完工了.这就是流的概念.
好,再回到Structured Streaming上来,它的全称就是结构化流,我们 ‘生产车间’ 上的这个商品就换成了结构化数据(表),
结构化数据就是我们要处理的对象,每次流进来的就是一张表,这个就是 Structured Streaming 的核心;还有一个关键地方和其他流处理不一样的概念就是,Structured Streaming支持事件窗口,也就是它可以将一个时间段的数据解析成一张表传入进来,然后你对这张表进行操作.
二. Word Count 入门示例
- 正所谓说在多也没用,给我看看你的代码我就知道了,那我们就先来从最简单的Word Count入门结构化流的编程
- 实现 Structured Streaming 的入口很简单几乎和Spark SQL 一致
- 首先还是要导入必要的包,还要初始化有一个Spark Session 对象出来
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import splitspark_session = SparkSession \.builder \.appName("WordCountTest") \.getOrCreate()
- 创建DataFrame,这里就稍微和Spark SQL不一样了,这里通过从指定的源读取数据,这些源可以是,kafka,socket…
我们这里指定socket源就可以了,因为最简单也方便测试.
# 我们指定读入流数据,从socket中读取
source_df = spark \.readStream \.format("socket") \.option("host", "localhost") \.option("port", 9999) \.load()
# 返回的是一个下面这种结构的 DataFrame
'''
+------------------------+
| value |
+------------------------+
| socket data |
+------------------------+
'''
- 有了DataFrame(表)我们就可以使用Spark SQL 中的静态方法对这个DataFrame做查询了
# 这里先用split函数对每一行数据进行分割
'''
+--------------------+ +-------------------------+
| value | split(source_df.value,' ') | value |
+--------------------+ ---------------------------> +-------------------------+
| Bob Nick Nina | | ['Bob','Nick','Nina'] |
+--------------------+ +-------------------------++-----------+| value |+-----------+| Bob |explode() +-----------+
-------------> | Nick |+-----------+| Nina |+-----------+
最后再将 value列名 改名为word
'''
words = source_df.select(explode(split(lines.value, " ")).alias("word")
)
# 通过对word字段进行分组,然后利用聚合函数count实现统计单词的个数
wordCounts = words.groupBy("word").count()
'''
+--------+-------+
| word | count |
+--------+-------+
| Bob | 1 |
+--------+-------+
| Nick | 1 |
+--------+-------+
| Nina | 1 |
+--------+-------+
'''
- 然后我们要把上面的查询,指定一个输出流,也就是要把整个查询构建成一个完整查询任务
# 指定写入流,输出的模式为全模式(待会儿后面会讲),指定输出的目的地为控制台
query_task = wordCounts \.writeStream \.outputMode("complete") \.format("console") \.start()
- 最后,也是重中之重,前面只是把一个任务构建好了,但实际上还是没有运行的要通过下面的方法,才能将上面的任务真正的运行起来.
# 这样程序就会阻塞在这里,一直等待 query_task 终止才会结束
query_task.awaitTermination()
- Word Count程序已经编写完成,但还别慌着运行,我们还得先准备socket的数据,要准备socket的数据很简单,你可以通过python的socket模块自己编写一个,现在有更简单的一个办法就是使用netcat命令进行测试
- 在终端中执行命令:
nc -lp 9999
# 先不要关闭终端 等下会用
- 运行 Word Count 程序
- 并在前面执行netcat命令的终端中输入一些句子
I love the world
I love life
Hello word
- 然后你就会在运行Word Count 程序的控制台中发现下面的内容:
------------------------------------
------------------------------------
batch 1
+----------------+-----------------+
| word | count |
+----------------+-----------------+
| I | 1 |
+----------------+-----------------+
| love | 1 |
+----------------+-----------------+
| the | 1 |
+----------------+-----------------+
| world | 1 |
+----------------+-----------------+------------------------------------
------------------------------------
batch 2
+----------------+-----------------+
| word | count |
+----------------+-----------------+
| I | 2 |
+----------------+-----------------+
| love | 2 |
+----------------+-----------------+
| the | 1 |
+----------------+-----------------+
| world | 1 |
+----------------+-----------------+
| life | 1 |
+----------------+-----------------+------------------------------------
------------------------------------
batch 3
+----------------+-----------------+
| word | count |
+----------------+-----------------+
| I | 2 |
+----------------+-----------------+
| love | 2 |
+----------------+-----------------+
| the | 1 |
+----------------+-----------------+
| world | 2 |
+----------------+-----------------+
| life | 1 |
+----------------+-----------------+
| hello | 1 |
+----------------+-----------------+
三. 编程模型
- 基本概念:
Structured Streaming的核心思想就是将一张表变成了动态的表,也就是说这张表是无状态的,他的行数是不确定的,会随着时间的偏移,表的长度会发生变化,它会将后面到达的数据添加到这张表的末尾.我们可以在这张表上做跟静态表相同的查询工作,是在这张表上做增量查询;
它的示意图如下:
它其实是通过触发器来空值整个窗口时间的,就比如触发器上一次触发时到触发器这一次触发,中间这段时间间隔内的数据,Structured Streaming 会将对这段数据进行一次查询并将这段数据作为新的行添加到历史表的末尾,然后将合并后的结果表进行输出(注意这里是在 Complete 模式下才会这样)
在Complete模式下,我们的触发器设置为1秒,那么它工作图就可以用下图来进行表示:
-
三种输出模式:
还记得前面我们运行的word count 程序吗,其中构建query_task 的时候我们指定了一个Complete模式即全模式,在这个模式下每一次查询会将历史的记录也一并查出来,显示一张完整的表,这样就够了吗,当然肯定是不够的,所以还另外的两种模式.下面就是这三种模式的一些概述:
(1) complete (完整模式) -整个更新的结果表将被写入外部存储器。由存储连接器决定如何处理整个表的写入。
(2) append (追加模式) -仅将自上次触发以来追加在结果表中的新行写入外部存储器。这仅适用于预期结果表中现有行不会更改的查询。
(3) update (更新模式) -仅自上次触发以来在结果表中已更新的行将被写入外部存储(自Spark 2.1.1才可用)注意,这与完成模式的不同之处在于此模式仅输出自上次触发以来已更改的行。如果查询不包含聚合,它将等同于追加模式。 -
完整模式讲解:
因为我们前面的word count程序是根据完整模式来运行的所以理解起来应该也相对容易一些
上面这个就是word count 程序运行的示意图,第一个我从socket流中读入的DataFrame (source_df),它被称为输入表,最后我们通过分组和聚合得到的DataFrame (wordCounts) 他被称为结果表,在还没有启动时通过 source_df 查询出来的 wordCounts DataFrame是没有变化的应为此时还是一个静态的DataFrame,当一旦运行以后,一有新数据的到来,spark会对其做一次’增量查询’,该查询将先前的运行计数与新数据结合起来以计算更新的计数.需要注意的是,它从流数据源读取最新的可用数据,对其进行增量处理以更新结果,然后丢弃该源数据。它仅保留更新结果所需的最小中间状态数据(例如,前面示例中的中间计数)
四. 支持的输入源
Spark Structured Streaming 内置了四种输入源:
- File Source:
读取写入目录的文件作为数据流。支持的文件格式为text,csv,json,orc,parquet。有关最新列表以及每种文件格式的受支持选项,请参见DataStreamReader界面的文档。请注意,文件必须原子地放在给定目录中,在大多数文件系统中,这可以通过文件移动操作来实现。 - Kafka Source:
从Kafka读取数据,支持0.10.0或更高版本的Kafka - Socket Source:
这个一般用于测试使用,因为这不能提供端到端的容错保证
从套接字连接读取UTF8文本数据。监听服务器套接字位于驱动程序处. - Rate Source:
以每秒指定的行数生成数据,每个输出行包含timestamp和value。where timestamp是Timestamp包含消息分发时间的类型,并且value是Long包含消息计数的类型,从第一行的0开始。此源旨在进行测试和基准测试。
下面是各种输入源的一些配置参数:
Source | 参数配置 | 是否支持容错 |
---|---|---|
File Source | • path:输入目录的路径,并且对所有文件格式都是公用的。 • maxFilesPerTrigger:每个触发器要考虑的最大新文件数(默认值:no max) • latestFirst:是否首先处理最新的新文件,当有大量积压文件时(默认值:false)有用 • fileNameOnly:是否根据以下内容检查新文件仅文件名而不是完整路径(默认值:false)。设置为“ true”时,以下文件将被视为同一文件,因为它们的文件名“ dataset.txt”是相同的: “ file:///dataset.txt” “ s3:// a / dataset.txt“ ” s3n://a/b/dataset.txt“ ” s3a://a/b/c/dataset.txt“ | 是 |
Socket Source | host:要连接的主机,必要参数 port:要连接的端口,必要参数 | 否 |
Kafka Source | • kafka.bootstrap.servers 配置kafka集群的地址,必要参数 • subscribe 消费的主题名,必要的参数 | 是 |
Rate Source | • rowsPerSecond(例如100,默认值:1):每秒应生成多少行。 • rampUpTime(例如5s,默认值:0s):在生成速度变为rowsPerSecond之前,需要多长时间提升。使用比秒更细的粒度将被强转为为整数秒。 • numPartitions(例如10,默认值:Spark的默认并行性):生成的行的分区号。 源将尽最大努力达到目标rowsPerSecond,但是查询可能受到资源的限制,并且numPartitions可以进行调整以帮助达到所需的速度。 | 是 |
五.支持的接收器
Spark Structured Streaming 支持下面的六种接收器:
Sink | 支持输出模式 | 配置参数 | 容错 |
---|---|---|---|
File Sink | append | path:也就是输出文件的地址。 支持对分区表的写入。 可以按照时间进行分区 | 是(但是只有一次) |
Kafka Sink | append, update, complete | 参数和前面输入源的类似 | 是(至少一次) |
foreach sink | append, update, complete | 会在其内部传入一个自定义的类,然后这个类中的process() 方法会将结果表中的每行传入进来进行处理 | 是(至少一次) |
foreachBatch Sink | append, update, complete | 这个和前面一个差不多,只不过这个接收器需要传入的是一个函数对象, 在运行过程中,spark会自行调用此函数,将结果表整个以DataFrame的方式进行传入 | 取决与内部实现 |
console Sink | append, update, complete | 这个接收器也就是我们前面所用到的接收器他是将数据打印到控制台 numRows: 控制台最大显示的表长度,默认为20 truncate: 是否截断输出,默认为True | 否 |
Memory SInk | append, complete | 此接收器的作用是将结果表存储在内存当中, 在完成模式下,重新启动的查询将重新创建完整表。 | 否 |
更多推荐
SSS —— Spark Structured Streaming 之理解与入门
发布评论