MapReduce 工作流程总结

编程入门 行业动态 更新时间:2024-10-11 01:20:39

MapReduce <a href=https://www.elefans.com/category/jswz/34/1771434.html style=工作流程总结"/>

MapReduce 工作流程总结

客户端Job提交阶段

一个待处理的数据集首先会进入客户端,最先运行的是Driver类,初始化job配置信息,其中包括自定义分区信息、虚拟存储切片信息、数据输入输出路径、Mapper和Reducer的输入输出KV类型。

接着在Job类中,客户端首先会确定Job的state是否为DEFINE,如果是则处理API的兼容问题,然后开始尝试与Yarn服务器建立连接。如果连接失败则启动本地运行模式。

接下来就开始启动Job提交的准备工作,首先会检查数据输出目录是否存在,如果存在则报错。然后开始创建Job的临时工作目录以及初始化JobID,如果此时是Yarn模式,会在临时工作目录下看到准备提交到集群的jar包,如果是本地模式则跳过。

客户端接下来就开始检查输入的目标文件是否可支持切分,如果不可切分就只输出单独一个Split,如果可切分则开始递归遍历目标目录下的所有文件。每个文件会被单独切片然后统一放进Splits数组,在划分切片信息时,要注意文件大小要超过默认切片大小(默认切片大小为BlockSize,本地模式默认32M,具体大小可以根据配置信息修改上下限)的1.1倍才会被切分。Splits的个数等于MapTask个数。并且Splits会按切片大小进行排序,大的切片在前。

而虚拟存储切片(CombineTextInputFormat)可以让客户端自己定义具体切片如何划分而不采用系统默认的TextInputFormat

还需要说明的是FileInputFormat机制是简单地将数据按行切割,Key为行偏移量,Value为整行数据。以下是FileInputFormat、TextInputFormat、CombineTextInputFormat的继承关系。真正切割数据是在Mapper阶段进行,Job提交阶段只是进行切片信息规划。

然后Job会在临时工作目录下创建包含所有有关该Job的配置信息的xml文件。之后将Job的state置为Running,将临时工作目录的资源提交至集群,并删除该临时工作目录。


Mapper阶段

AM(ApplicationMaster)在接收到从Client提交的Job信息后,会根据Splits开启与之数量一致的MapTask,进行数据的并行处理,这时候FileInputFormat就开始真正对Job提交的源数据文件进行切割读取。

在Mapper类的map方法中,输入数据的KV(key value)类型分别是LongWritable和Text类型(MR内置的序列化数据类型)对应着被分割的Job提交的源文件数据的行偏移量和行数据。而且是每行数据都执行一次map方法,该对象成员变量数据不会发生改变(说明Mapper就创建了一个对象然后不停地调用该对象的map方法)

此时通过重写的map方法就可以进行真正的业务逻辑处理,但同时也需要定义好输出的KV(key value)泛型,通过context.write写出。如果该KV类型需要根据业务逻辑进行重新自定义,则可以让其实现MR提供的Writable接口,然后重写write和readFields两个方法。之所以这么做还是因为要将对象数据进行序列化通过网络传递给Reducer进行后续处理。这里重写的时候要注意序列化和反序列化对象成员的顺序必须一致,还必须要创建一个空参构造函数。

    @Overridepublic void write(DataOutput out) throws IOException {out.writeLong(upFlow);out.writeLong(downFlow);out.writeLong(sumFlow);}@Overridepublic void readFields(DataInput in) throws IOException {this.upFlow = in.readLong();this.downFlow = in.readLong();this.sumFlow = in.readLong();}

在context.write()写出KV(key value)数据后,会进行数据的分区处理。默认的分区数为1,也可以自定义分区数,还可以自定义分区类进行数据分区处理。还可以根据业务逻辑自定义Combiner类进行数据的合并,例如数据求和,这样到ReduceTask的values集合就只有一个合并后的值了。

默认分区为1时,所有的数据都只会被存进该分区文件中;而只在Driver类中定义分区数,MR会根据数据的Key取hashCode对分区数取余来划分数据,这对于大部分业务逻辑显然是不可接受的,所以就还有一种自定义分区类(需要继承Partitioner类重写getPartition方法)的方法来指定数据存储于哪个分区文件中,然后还要在Driver类中指定该自定义分区类和分区数。要注意分区号要从零步长为1递增,否则报IO异常。还要注意定义的Partition数要和Driver中定义的ReduceTask数一致。Partition数等于ReduceTask数。

还要注意Partition的泛型类要与Mapper的输出KV类型一致。


Shuffle阶段

在Map方法之后,Reduce方法之前的数据处理过程被称为Shuffle。 

从Map方法写出的数据会进入如上图的内存的环形缓冲区(循环队列),一半空间用来写入KV数据的索引,另一半空间用来写入KV数据。这里存在一个80%阈值机制,当写入的数据占据整个缓冲区的80%时会开启另一个线程,用来进行反向逆写。也就是此时存在两个线程,一个线程将已经写入缓冲区80%的数据溢写进磁盘中,但在溢写前会进行一次快速排序(按照Key的字典顺序);另一个反向逆写线程就从剩下的20%空间反向继续写入新的从Map方法传递来的数据。这里存在一个问题:如果该反向逆写线程写入新数据的速度比溢写线程写出旧数据的速度更快,就有可能会造成旧数据的丢失(被反向逆写线程的新数据覆盖)。对于该问题,MR会暂时地暂停反向逆写线程以等待溢写线程继续写出旧数据于磁盘中。这种双线程同时写入写出内存数据能有效利用时间提升数据的处理效率。

不断重复写入溢写的过程,会在磁盘中得到多个相同的分区块,此时Mapper也会对磁盘数据进行归并排序。当所有数据都写入完毕后,此时Mapper还会对所有的数据进行归并排序。有些场景下Combiner会对分区中相同的key的value进行归并。接下来就是Reducer的处理时间。


Reducer阶段

在Mapper对一些分区归并排序后,Reducer就已经可以主动向Mapper拉取数据。AM会根据Partition的数量创建相同数量的ReduceTask进行数据并行处理。ReduceTask从MapTask远程拷贝的数据会先进入内存缓冲区中(一个ReduceTask处理一个分区),如果文件大小超出内存阈值则会先把相同分区的数据合并后再溢写到磁盘中。当磁盘中的文件数目超出一定阈值后也会先将相同分区的数据进行合并,然后再进行文件的归并排序。当所有的文件都拷贝完毕后,ReduceTask会对所有文件进行一次归并排序。如果key是自定义的序列化类型,则可以采取分组排序的方法。

由于默认的排序是对key按字典排序,我们也可以让key通过继承Comparable接口来对compareTo方法进行重写,进行二次排序。

Reduce方法读取分区数据时是以一次读一组数据(相同key)的规则读入。输入reduce方法中的KV类型的key是Map方法输出的key类型,而values参数则是相同key的value集合。接下来就可以继续写对应的业务逻辑,然后context.write写出。写出时会调用数据的toString方法进行格式化输出。

如果要指定数据输出文件,可以自定义OutputFormat和RecordWriter类,通过分别重写getRecordWriter和write、close方法控制最后输出文件的结果。此时不管有几个ReduceTask(分区)都不会影响最后的输出文件结果。自定义OutputFormat和自定义分区还是有一点区别,在自定义分区后,分区数决定了ReduceTask的个数,输出数据的时候一个分区对应着一个输出文件;自定义OutputFormat相当于是自定义输出文件数量和文件名。不过两者都能改变最后对数据的存储,依照业务逻辑进行数据隔离处理。

更多推荐

MapReduce 工作流程总结

本文发布于:2024-03-11 18:20:27,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1729596.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:工作流程   MapReduce

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!