RDD算子操作(基本算子和常见算子)

编程入门 行业动态 更新时间:2024-10-28 09:26:19

RDD<a href=https://www.elefans.com/category/jswz/34/1748093.html style=算子操作(基本算子和常见算子)"/>

RDD算子操作(基本算子和常见算子)

目录

一、基本算子

        1.map算子

        2.flatMap算子

        3.filter算子

         4.foreach算子

        5.saveAsTextFile算子

        6.redueceByKey算子

二、常用Transformation算子       

         1.mapValues算子

        2.groupBy算子

        3.distinct算子

        4.union算子

        5.join算子

        6.intersection算子

        7.glom算子

        8.groupByKey算子

        9.sortBy算子

        10.sortByKey算子

三、常用Action算子

        1.countByKey算子

        2.collect算子

        3.reduce算子

        4.takeSample算子

        5.takeOrdered算子

四、分区操作算子

        1.mapPartitions算子

        2.foreachPartition算子

        3.partitionBy算子

        4.repartition算子和coalesce算子


一、基本算子

        RDD中map、filter、flatMap及foreach等函数为最基本算子,都是都RDD中每个元素进行操作,将元素传递到函数中进行转换。

        1.map算子

        map(f:T=>U): RDD[T]=>RDD[U],表示将RDD经由某一函数f后,转变为另一个RDD。

        功能:map算子,是将RDD的数据一条条处理(处理的逻辑 基于map算子中接受的处理函数),返回新的RDD。

#cording:utf-8
from pyspark import SparkConf,SparkContextif __name__ == "__main__":# 构建SparkContext对象conf = SparkConf().setAppName('test').setMaster("local[*]")sc = SparkContext(conf=conf)rdd = sc.parallelize([1,2,3,4,5,6],3)# 定义方法,作为算子的传入函数体def add(data):return data * 10print(rdd.map(add).collect())# 更简单的方式 是定义lambda表达式来写匿名函数print(rdd.map(lambda data:data * 10).collect())'''对于算子的接受函数来说,两种方法都可以lambda表达式 适用于 一行代码就搞定的函数体,如果是多行,需要定义独立的方法
'''
        2.flatMap算子

        flatMap(f:T=>Seq[U]): RDD[T]=>RDD[U]),表示将RDD经由某一函数f后,转变为一个新的 RDD,但是与map 不同,RDD中的每一个元素会被映射成新的0到多个元素(f 函数返回的是一个序列Seq)。

        功能:对RDD执行map操作,然后进行解除嵌套操作

#cording:utf-8
from pyspark import SparkConf,SparkContextif __name__ == "__main__":# 构建SparkContext对象conf = SparkConf().setAppName('test').setMaster("local[*]")sc = SparkContext(conf=conf)rdd = sc.parallelize(["hadoop hadoop spark","spark hadoop hadoop","hadoop flink spark"])#得到所有的单词,组成RDDrdd2 = rdd.map(lambda line: line.split(" "))rdd3 = rdd.flatMap(lambda line: line.split(" "))print(rdd2.collect())print(rdd3.collect())
        3.filter算子

        filter(f.T=>Bool): RDD[T]=>RDD[T],表示将 RDD经由某一函数f后,只保留f返回True的数据,组成新的RDD。

        功能:过滤想要的数据进行保留,返回值是True的数据保留,返回值是False的数据则会被丢弃。

#corfding:utf8
from pyspark import SparkConf,SparkContext
if __name__ == '__main__':conf = SparkConf().setAppName('test').setMaster('local[*]')sc = SparkContext(conf=conf)# 通过filter算子过滤奇数rdd = sc.parallelize((1,2,3,4,5,6,7,8,9,10))result_rdd = rdd.filter(lambda x: x % 2 == 1)print(result_rdd.collect())

         4.foreach算子

       foreach(func),将函数 func应用在数据集的每一个元素上,通常用于更新一个累加器,或者和外部存储系统进行交互,例如 Redis。

        功能:对RDD的每一个元素,执行你提供的逻辑的操作(和map一个意思),但是这个方法没有返回值。

        ps:该算子是分区(Executor)直接执行的,跳过Driver,由分区所在的Executor直接执行。

#cording:utf8
from pyspark import SparkConf,SparkContext
if __name__ == '__main__':conf = SparkConf().setMaster('local[*]').setAppName('test')sc = SparkContext(conf=conf)rdd = sc.parallelize([1, 5, 4, 2, 3, 6])print(rdd.foreach(lambda x: 10 * x))print('----------------------------------')print(rdd.foreach(lambda x: print(10 * x)))
        5.saveAsTextFile算子

        saveAsTextFile(path:String),数据集内部的元素会调用其 toString方法,转换为字符串形式,然后根据传入的路径保存成文本文件,既可以是本地文件系统,也可以是HDFS等。

        ps:该算子是分区(Executor)直接执行的,跳过Driver,由分区所在的Executor直接执行。

#cording:utf8
from pyspark import SparkConf,SparkContext
if __name__ == '__main__':conf = SparkConf().setMaster('local[*]').setAppName('test')sc = SparkContext(conf=conf)rdd = sc.parallelize([1, 5, 4, 2, 3, 6])rdd.saveAsTextFile('hdfs://pyspark01/output/out1')

        6.redueceByKey算子

        功能:针对KV型RDD,自动按照key分组,然后根据你提供的聚合逻辑,完成组内数据(value)的聚合操作。

#cording:utf-8
from pyspark import SparkConf,SparkContextif __name__ == "__main__":# 构建SparkContext对象conf = SparkConf().setAppName('test').setMaster("local[*]")sc = SparkContext(conf=conf)rdd = sc.parallelize([('a',1),('b',1),('a',1),('a',1),('b',1),('c',1),('a',1)])#使用reduceByKey函数进行聚合reduce_rdd = rdd.reduceByKey(lambda a,b : a + b).collect()print("聚合结果:",reduce_rdd)

二、常用Transformation算子       

         1.mapValues算子

        功能:针对二元元组RDD,对其内部的二元元组的Value执行map操作。

#cording:utf-8
from pyspark import SparkConf,SparkContextif __name__ == "__main__":# 构建SparkContext对象conf = SparkConf().setAppName('test').setMaster("local[*]")sc = SparkContext(conf=conf)rdd = sc.parallelize([('a',2),('b',11),('a',1)])#使用map函数map_rdd = rdd.map(lambda x: (x[0],x[1]*10)).collect()print("结果:",map_rdd)# 使用mapValue函数value_rdd = rdd.mapValues(lambda value: value*10).collect()print("结果:",value_rdd)

        2.groupBy算子

        功能:将RDD数据进行分组。

#cording:utf8from pyspark import SparkConf,SparkContextif __name__ == '__main__':conf = SparkConf().setMaster('local[*]').setAppName('test')sc = SparkContext(conf=conf)# 创建数据test_rdd = sc.parallelize([('a',1),('b',1),('a',2),('b',2),('b',3)])# 通过groupBy函数对数据进行分组# groupBy函数传入函数的意思是:通过这个函数,来确定按照谁来分组(返回谁即可)# 分组规则和SQL一致:也就是相同的在同一个组(Hash分组)result_1 = test_rdd.groupBy(lambda t: t[0])result_2 = result_1.map(lambda t: (t[0],list(t[1])))print(result_1.collect())print(result_2.collect())

        3.distinct算子

        功能:对RDD数据进行去重复,返回新的RDD。

#cording:utf8from pyspark import SparkConf,SparkContextif __name__ == '__main__':conf = SparkConf().setAppName('test').setMaster('local[*]')sc = SparkContext(conf=conf)rdd_1 =  sc.parallelize((1,2,1,2,3,4,5,6))rdd_2 = sc.parallelize([('a',1),('b',1),('a',1),('a',1),('b',1),('c',1),('a',1)])# 使用distinct算子进行去重print('数字:',rdd_1.distinct().collect())print('元组:',rdd_2.distinct().collect())

        4.union算子

        功能:将两个RDD合并成一个RDD返回。只合并不去重,RDD的类型不同也是可以合并的。

#corfding:utf8
from pyspark import SparkConf,SparkContext
if __name__ == '__main__':conf = SparkConf().setAppName('test').setMaster('local[*]')sc = SparkContext(conf=conf)# 通过union算子合并RDDrdd_1 = sc.parallelize((1,2,3,4,5))rdd_2 = sc.parallelize((6,7,8,9,10))print(rdd_1.union(rdd_2).collect())

        5.join算子

        功能:对两个RDD执行join操作(可实现SQL外/内连接),join算子只能用于二元元组。

#corfding:utf8
from pyspark import SparkConf,SparkContext
if __name__ == '__main__':conf = SparkConf().setAppName('test').setMaster('local[*]')sc = SparkContext(conf=conf)rdd1 = sc.parallelize([(1001,'zhangsan'),(1002,'lisi'),(1003,'wangwu'),(1004,'zhaoliu')])rdd2 = sc.parallelize([(1001,'销售部'),(1002,'科技部')])# 通过join算子来进行rdd之间的关联# 对于join算子来说,关联条件按照二元元组的key来进行关联print(rdd1.join(rdd2).collect())# 左外连接,右外连接可以更换一下rdd的顺序或者调用rightOuterJoin即可print(rdd1.leftOuterJoin(rdd2).collect())

        6.intersection算子

        功能:求两个RDD的交集,返回一个新的RDD。

#corfding:utf8
from pyspark import SparkConf,SparkContext
if __name__ == '__main__':conf = SparkConf().setAppName('test').setMaster('local[*]')sc = SparkContext(conf=conf)rdd1 = sc.parallelize([('a',1),('b',3)])rdd2 = sc.parallelize([('a',1),('c',1)])# 通过intersection算子求出RDD的交集 取出并返回新的RDDprint(rdd1.intersection(rdd2).collect())

        7.glom算子

        功能:将RDD的数据,加上嵌套,这个嵌套按照分区来进行,比如RDD数据[1,2,3,4,5]有两个分区,那么glom后,数据变成:[[1,2,3],[4,5]]。

#corfding:utf8
from pyspark import SparkConf,SparkContext
if __name__ == '__main__':conf = SparkConf().setAppName('test').setMaster('local[*]')sc = SparkContext(conf=conf)rdd1 = sc.parallelize([1,2,3,4,5,6,7,8,9,10])print(rdd1.glom().collect())# 解嵌套操作print(rdd1.glom().flatMap(lambda x: x).collect())

        8.groupByKey算子

        功能:针对KV型RDD,自动按照key分组。

#cording:utf8from pyspark import SparkConf,SparkContextif __name__ == '__main__':conf = SparkConf().setMaster('local[*]').setAppName('test')sc = SparkContext(conf=conf)# 创建数据test_rdd = sc.parallelize([('a',1),('b',1),('a',2),('b',2),('b',3)])# 使用groupByKey算子result_1 = test_rdd.groupByKey()#查看结果result_2 = result_1.map(lambda t: (t[0],list(t[1])))print(result_1.collect())print(result_2.collect())

        9.sortBy算子

        功能:对RDD数据进行排序,基于你指定的排序依据。

#cording:utf8
from pyspark import SparkConf,SparkContext
if __name__ == '__main__':conf = SparkConf().setMaster('local[*]').setAppName('test')sc = SparkContext(conf=conf)rdd = sc.parallelize([('c',3),('f',1),('b',11),('c',3),('e',1),('n',9),('a',1)],3)# 使用sortBy对RDD执行排序# 按照value 数字进行排序# 参数1函数:表示的是,告知spark,按照数据的哪个列进行排序# 参数2:True表示升序 False表示降序# 参数3:排序的分区数'''注意:如果要全局有序,排序分区数设置为1'''print('按照value排序:',rdd.sortBy(lambda x: x[1], ascending=True, numPartitions=3).collect())# 按照key进行排序print('按照key排序:',rdd.sortBy(lambda x: x[0], ascending=True, numPartitions=3).collect())
        10.sortByKey算子

        功能:针对KV型RDD,按照Key进行排序

#cording:utf8
from pyspark import SparkConf,SparkContext
if __name__ == '__main__':conf = SparkConf().setMaster('local[*]').setAppName('test')sc = SparkContext(conf=conf)rdd = sc.parallelize([('a',1),('E',1),('C',1),('D',1),('b',1),('g',1),('h',1),( "y" ,1),('u',1),('i',1),('o',1),('p',1),( 'm',1),('n',1),('L',1),('k',1),('f',1)],3)# 根据字母的小写排序print(rdd.sortByKey(ascending=True, numPartitions=1, keyfunc=lambda key: key.lower()).collect())

三、常用Action算子

        1.countByKey算子

        功能:统计key出现的次数(一般适用于KV型RDD)

import json
from pyspark import SparkConf,SparkContext
if __name__ == '__main__':conf = SparkConf().setMaster('local[*]').setAppName('test')sc = SparkContext(conf=conf)rdd = sc.textFile('../input/words.txt')rdd2 = rdd.flatMap(lambda x: x.split(' ')).map(lambda x: (x,1))# 通过countByKey来对key进行计数,这是一个Action算子result = rdd2.countByKey()print(result)print(type(result))

        2.collect算子

        功能:将RDD各个分区的数据,统一收集到Driver中,形成一个list对象。这个算子,是将RDD各个分区数据都拉取到Driver,注意的是,RDD是分布式对象,其数据量可以很大,所以用这个算子之前,要心知肚明的了解结果数据集不会太大,不然,会把Driver内存撑爆。

        3.reduce算子

        功能:对RDD数据集按照你传入的逻辑进行聚合。

import json
from pyspark import SparkConf,SparkContext
if __name__ == '__main__':conf = SparkConf().setMaster('local[*]').setAppName('test')sc = SparkContext(conf=conf)rdd = sc.parallelize([1,2,3,4,5,6])print(rdd.reduce(lambda a,b: a+b))

        4.takeSample算子

        功能:随机抽样RDD数据,随机数种子数字可以随便传,如果传同一个数字,那么取出的结果是一致的。一般参数三不传,spark会自动给与一个随机的种子。


from pyspark import SparkConf,SparkContext
if __name__ == '__main__':conf = SparkConf().setMaster('local[*]').setAppName('test')sc = SparkContext(conf=conf)rdd = sc.parallelize([1,2,3,4,5,6,7,8,9,10,1,2])print('True:',rdd.takeSample(True,22))print('False:',rdd.takeSample(False,22))print('无随机种子1:',rdd.takeSample(True,5))print('无随机种子2:', rdd.takeSample(True, 5))print('有随机种子1:',rdd.takeSample(True,5,1))print('有随机种子2:', rdd.takeSample(True, 5, 1))

        5.takeOrdered算子

        功能:对RDD进行排序取前N个。

#cording:utf8
from pyspark import SparkConf,SparkContext
if __name__ == '__main__':conf = SparkConf().setMaster('local[*]').setAppName('test')sc = SparkContext(conf=conf)rdd = sc.parallelize([1,5,4,2,3,6])print('普通:',rdd.takeOrdered(3))# 函数操作只会对结果产生影响,不会影响数据本身print("传入函数:",rdd.takeOrdered(3, lambda x: -x))

四、分区操作算子

        1.mapPartitions算子

        功能:与map功能相似,但区别是,mapPartition一次被传递的是一整个分区的数据,是作为一个迭代器(一次性list)对象传入过来,而map是一个一个数据的传递。

#cording:utf8
from pyspark import SparkConf,SparkContext
if __name__ == '__main__':conf = SparkConf().setMaster('local[*]').setAppName('test')sc = SparkContext(conf=conf)rdd = sc.parallelize([1, 5, 4, 2, 3, 6],3)def process(iter):result = list()for it in iter:result.append(it * 10)return result# mapPartitions算子相比于map算子,节省了大量打IO操作,每一个分区只需要进行一次IO操作即可print('输出结果:',rdd.mapPartitions(process).collect())

        2.foreachPartition算子

        功能:和普通的foreach一致,一次处理的是一整个分区的数据。

#cording:utf8
from pyspark import SparkConf,SparkContext
if __name__ == '__main__':conf = SparkConf().setMaster('local[*]').setAppName('test')sc = SparkContext(conf=conf)rdd = sc.parallelize([1, 5, 4, 2, 3, 6],3)def process(iter):result = list()for it in iter:result.append(it * 10)print(result)rdd.foreachPartition(process)

        3.partitionBy算子

        功能:对RDD进行自定义分区操作。

#cording:utf8
from pyspark import SparkConf,SparkContext
if __name__ == '__main__':conf = SparkConf().setMaster('local[*]').setAppName('test')sc = SparkContext(conf=conf)rdd = sc.parallelize([('hadoop',1),('hadoop',1),('hello',1),('spark',1),('flink',1),('spark',1)])# 使用partitionBy自定义分区def process(x):if 'hadoop' == x or 'hello' == x:return 0if 'spark' == x:return 1return 2# 使用glom算子将每个分区的数据进行嵌套print('显示分区:',rdd.partitionBy(3, process).glom().collect())

        4.repartition算子和coalesce算子

        功能:对RDD的分区执行重新分区(仅数量)

        ps:对分区的数量进行操作,一定要慎重,一般情况下,我们写Spark代码除了要求全局排序设置为1个分区外多数时候,所有API中关于分区相关的代码我们都不太理会。因为,如果你改分区了,会影响并行计算(内存迭代的并行管道数量)后面学分区如果增加,极大可能导致shuffle。

#cording:utf8
from pyspark import SparkConf,SparkContext
if __name__ == '__main__':conf = SparkConf().setMaster('local[*]').setAppName('test')sc = SparkContext(conf=conf)rdd = sc.parallelize([1, 5, 4, 2, 3, 6],3)# repartition 修改分区# 减少分区print("减少分区为1:",rdd.repartition(1).getNumPartitions())# 增加分区print("增加分区为5:", rdd.repartition(5).getNumPartitions())# coalesce 修改分区# 减少分区print("减少分区为1:",rdd.coalesce(1).getNumPartitions())# 增加分区 ps:coalesce增加分区数量需要指定参数shuffle为True才能1成功修改print("减少分区为5:", rdd.coalesce(5).getNumPartitions())print("减少分区为5:",rdd.coalesce(5, shuffle=True).getNumPartitions())

更多推荐

RDD算子操作(基本算子和常见算子)

本文发布于:2023-12-04 23:45:46,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1662472.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:算子   常见   操作   RDD

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!