i. hadoop出现的背景
i. hadoop三大子系统组成
三 MapReduce编程基础
MapReduce的概念思想和基本程序框架
ii. WordCount的基本逻辑
iii. 利用MapReduce进行数据统计
iv. 利用MapReduce计算最大最小值
v. 利用MapReduce进行数据去重
vi. 利用MapReduce求平均值
vii. 利用MapReduce求分布
viii. 利用MapReduce排序
ix. 利用MapReduce建索引
x. 利用MapReduce求TopK
xi. 读取Hadoop配置文件的方法
Hadoop中的组件是通过Hadoop自己的配置API来配置的。一个Configuration类的实例(可以在.apache.hadoop.conf包中找到)代表配置属性及其取值的一个集合。
每个属性由一个String来命名,而值的类型可以是多种类型之一,包括Java的基本类型(如boolean, int, long和float),其它有用的类型(如String、Class和java.io.FIle)及String集合。
<?xml version="1.0"?>
<!-- configuration-1.xml -->
<configuration><property><name>color</name><value>yellow</value><description>Color</description></property><property><name>size</name><value>10</value><description>Size</description></property><property><name>weight</name><value>heavy</value><final>true</final><description>Weight</description></property><property><name>size-weight</name><value>${size},${weight}</value><description>Size and weight</description></property>
</configuration>
代码:
public class ConfigurationExample {public static void main(String[] args) {Configuration conf = new Configuration();conf.addResource("configuration-1.xml");System.out.println(conf.get("color")); // "yellow"System.out.println(conf.getInt("size", 0)); // 10System.out.println(conf.get("breadth", "wide")); // "wide"}}
可以使用-conf命令开关来使用各种配置,如:
hadoop fs –conf conf/Hadoop-localhost.xml –ls .
如果省略-conf选项,可以从$HADOOP_HOME的etc/hadoop子目录中找到Hadoop的配置信息。
或,如果设置了HADOOP_CONF_DIR,Hadoop的配置信息将从那个位置读取
运行hadoop任务时可以在命令行通过-D来输入配置参数,注意-D的优先级是最高的,通常用于配置一些任务相关的参数。
控制reducer的数量:
-Dmapreduce.job.reduces=n
配置项优先级:
-D > 用户自定义配置文件 > hadoop系统自身配置文件
优先级高的配置项覆盖优先级低的配置项
xii. 写MapReduce单元测试的方法
@Testpublic void processesValidRecord() throws IOException, InterruptedException {Text value = new Text("0043011990999991950051518004+68750+023550FM-12+0382" + "99999V0203201N00261220001CN9999999N9-00111+99999999999");new MapDriver<LongWritable, Text, Text, IntWritable>().withMapper(new MaxTemperatureMapper()).withInput(new LongWritable(0), value).withOutput(new Text("1950"), new IntWritable(-11)).runTest();}
map程序:
public void map(LongWritable key, Text value, Context context)throws IOException, InterruptedException {String line = value.toString();String year = line.substring(15, 19);int airTemperature = Integer.parseInt(line.substring(87, 92));context.write(new Text(year), new IntWritable(airTemperature));}
xiii. PageRank和PeopleRank算法的原理
xiv. PeopleRank算法的程序逻辑
a) 6学时:6理论
b) 重点:
i. 分布式文件系统的基本特点
ii. hdfs的块存储结构
HDFS采用master/slave结构模型
HDFS的块比系统的块大得多,默认为64MB。
HDFS中小于一个块大小的文件不会占据整个块的空间。
从2.7.3版本开始,block size由64 MB变成了128 MB的。
iii. Namenode和Datanode的功能与作用
Namenota 文件系统命名空间
整个文件系统的管理节点
维护着文件系统树和整棵树内所有文件和目录
文件/目录的元信息和每个文件对应的数据块列表
接收用户的操作请求
NameNode维护着2张表:
文件系统的目录结构,以及元数据信息
文件与数据块(block)列表的对应关系
namenode安全模式 打开hadoop自动进入
datanode 创建 删除,复制
提供真实文件数据的存储服务。
文件块(block):最基本的存储单位。
HDFS默认Block大小是64MB,以一个256MB文件,共有256/64=4个Block. 不同于普通文件系统的是,HDFS中,如果一个文件小于一个数据块的大小,并不占用整个数据块存储空间。
Replication。多复本。默认是三个。
iv. 通过URL和FileSystem查询HDFS数据的方式
InputStream in = null;try {in = new URL("hdfs://192.168.29.3:9000/input/word").openStream();IOUtils.copyBytes(in, System.out, 4096, false);} finally {IOUtils.closeStream(in);}String uri = "hdfs://zzti:9000/input/word";Configuration conf = new Configuration();FileSystem fs = FileSystem.get(URI.create(uri), conf);FSDataInputStream in = null;try {in = fs.open(new Path(uri));IOUtils.copyBytes(in, System.out, 4096, false);in.seek(10);IOUtils.copyBytes(in, System.out, 4096, false);} finally {IOUtils.closeStream(in);}
v. 通过FileSystem向HDFS写入数据的方式
读取hdfs文件
in = fs.open(new Path(uri));
IOUtils.copyBytes(in, System.out, 4096, false);
本地copy文件到hdfs
String localSrc = “D:/aaa.txt”;
String dst = “hdfs://192.168.29.3:9000/word/aa.txt”;
InputStream in = new BufferedInputStream(new FileInputStream(localSrc));
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(dst), conf);
OutputStream out = fs.create(new Path(dst), new Progressable() {
public void progress() {
System.out.print(".");
}
});
IOUtils.copyBytes(in, out, 4096, true);
列出文件:
FileStatus[] status = fs.listStatus(paths);
vi. 文件模式匹配符的!书写规范
vii. 路径过滤器的使用
viii. HDFS数据删除的方法
ix. 从Mysql向HDFS导入数据的方法
a) 4学时:4理论
b) 重点:
YARN是Hadoop2.0版本新引入的资源管理系统,直接从MR1演化而来,是作业调度和集群资源管理的一个框架。
核心思想:将MP1中JobTracker的资源管理和作业调度两个功能分开,分别由ResourceManager和ApplicationMaster进程来实现。
1)ResourceManager:负责整个集群的资源管理和调度。
2)ApplicationMaster:负责应用程序相关的事务,比如任务调度、任务监控和容错等。
YARN的出现,使得多个计算框架可以运行在一个集群当中。
1)每个应用程序对应一个ApplicationMaster。
2)目前可以支持多种计算框架运行在YARN上面比如MapReduce、Storm、Spark、Flink等。
i. Yarn在Hadoop系统中的地位
ii. Yarn的运行机制
Yarn通过两个长期运行的守护进程提供自己的核心服务:
管理集群上资源:资源管理器(resource manager)
运行在集群所有节点上且能启动和监控容器的节点管理器(node manager)
容器用于执行特定应用程序的进程,每个容器都有资源限制(内存、CPU等)
iii. FIFO调度器原理
First-In-First-Out
先按照优先级高低调度
如果优先级相同,则按照提交时间先后顺序调度
如果提交时间相同,则按照(队列或者应用程序)名称大小(字符串比较)调度
任务按照队列的方式调用
小作业被一直阻塞,直到大作业完成
iv. Capacity调度器原理
有一个专门的队列时刻保证小作业一提交就可以执行
换句话说,对大作业而言,集群不能利用到100%,会降低集群利用率,使得大作业执行的时间要长
按照内存资源使用量比率调度
不需要预留资源,调度器会在所有运行的作业之间动态平衡资源。
第一个大作业启动时,它是集群唯一运行的作业,因而获得集群中所有的资源。
第二个小作业启动时,被分配到集群的一半资源。
既得到较高的集群利用率,又能保证小作业及时完成。
v. Fair调度器原理
vi. hadoop队列命名规则
vii. Max-min fairness算法
FairScheduler主要关注“公平”,那么在一个共享的集群中,怎样分配资源才算公平?常用的是max-min fairness算法:
这种策略会最大化系统中一个用户收到的最小分配。如果每一个用户都有足够地请求,会给予每个用户一份均等的资源。尽量不让任何用户被“饿死”。
一个例子:资源总量是10,有3个用户A/B/C,需要的资源分别是5/4/3,应该怎样分配资源?第一轮:10个资源分成3份,每个用户得到3.33第二轮:3.33超出了用户C的需求,超出了0.33,将这多余的0.33平均分给A和B,每个用户得0.16所以最后的分配结果是A=3.49,B=3.49,C=3
上面的例子没有考虑权重,如果3个用户的权重分别是0.5/1/0.8,又应该如何分配资源?第一轮:将权重标准化,3个用户的权重比是5:10:8。将所有资源分成5+10+8=23份,按比例分配给各个用户。A得到105/23=2.17,B得到1010/23=4.35,C得到10*8/23=3.48。第二轮:B和C的资源超出需求了,B超过0.25,C超过0.48。将多出资源分配给A。
最后的分配结果是A=2.9,B=4,C=3由于进位的问题会有些误差。
i. MapReduce任务执行过程中各个组件的作用
客户端
提交MapReduce作业
YARN ResourceManager
负责协调集群上计算机资源的分配
YARN NodeManager
负责启动和监视集群中机器上的计算容器
MapReduce AppMaster
负责协调运行MapReduce作业的任务
HDFS
共享作业文件,提供原始数据,保存最终数据
ii. MapRe客户端
提交MapReduce作业
YARN ResourceManager
负责协调集群上计算机资源的分配
YARN NodeManager
负责启动和监视集群中机器上的计算容器
MapReduce AppMaster
负责协调运行MapReduce作业的任务
HDFS
共享作业文件,提供原始数据,保存最终数据
作业的提交过程
Job的submit()方法创建一个内部的JobSummiter实例,并且调用其submitJobInternal()方法。
提交作业后,客户端通过调用waitForCompletion()方法使用轮询的方式每秒轮询作业的进度,如果发现自上次报告后有改变,便把进度报告给控制台。
作业完成后,如果成功,显示作业计数器;如果失败,则导致作业失败的错误被记录到控制台。
iii. hadoop作业失败处理的级别关系
iv. 作业失败的处理方案
在现实的情况中,用户的程序会因为种种原因失败:
代码错误
进程崩溃
机器故障
超时
Hadoop处理作业失败主要处理的是以下实体的作业失败:
任务
AppMaster
NodeManager
ResourceManager
v. shuffle的主要过程和作用---------------
Map阶段为 INputFormat Mapper biner (合并相同key对应的value减少map的输出)Partitioner(决定map输出交给那个reduce处理)
Reduce阶段分为shufle( ) sort(按照key进行排序) reduce
shuffle和sort同时进行
map:任务进度是已处理输入所占的比例。
reduce:整个过程分为三部分:
完成复制和排序阶段,任务的进度是4/6
如果任务已经执行reducer一半的输入,那么任务的进度是5/6,因为已经完成复制和排序阶段,并且已经完成reduce阶段的一半
reducer所有输入完成,意味着最终完成,进度是是6/6
Partitioner负责控制map输出结局key的分割
Peporter用于MapReduce应用程序的进度报告
OutputCollecter收集Mapper或Reducer输出数据
vi. map输出时数据的排序与归并
vii. reduce输出时对文件的合并
viii. 常见hadoop内置计数器
文件系统计数器(File System Counters)
作业计数器(Job Counters)
MapReduce框架计数器(Map-Reduce Framework)
Shuffle 错误计数器(Shuffle Errors)
文件输入格式计数器(File Output Format Counters)
文件输出格式计数器(File Input Format Counters)
自定义计数器的应用
public class MyWordCount {
public static class TokenizerMapper
extends Mapper<Object, Text, HavelangText, IntWritable>{
private final static IntWritable one = new IntWritable(1);
HavelangText word = new HavelangText();public void map(Object key, Text value, Context context) throws IOException, InterruptedException {StringTokenizer itr = new StringTokenizer(value.toString());Counter sensitiveCounter = context.getCounter("Sensitive Words:", "map-read");sensitiveCounter.increment(1L);Counter sensitiveCounter1 = context.getCounter("Sensitive Words:", "map-write");while (itr.hasMoreTokens()) {word.setText(itr.nextToken().toString());sensitiveCounter1.increment(1L);context.write(word, one);}
}
}
public static class IntSumReducer
extends Reducer<HavelangText,IntWritable,Text,IntWritable> {
private IntWritable result = new IntWritable();
public void reduce(HavelangText key, Iterable<IntWritable> values, //values 迭代器。。next方法Context context) throws IOException, InterruptedException {Counter sensitiveCounter = context.getCounter("Sensitive Words:", "reduce-read-key");Counter sensitiveCounter1 = context.getCounter("Sensitive Words:", "reduce-read-value");Counter sensitiveCounter2 = context.getCounter("Sensitive Words:", "reduce-write");sensitiveCounter.increment(1L);int sum = 0;for (IntWritable val : values) {sensitiveCounter1.increment(1L);sum += val.get();}result.set(sum);sensitiveCounter2.increment(1L);context.write(new Text(key.getText()), result);//contxt正文
}
}
public static void main(String[] args) throws Exception {
Job job = new Job();
job.setJarByClass(MyWordCount.class);
job.setJobName(“Job2 MapReduce”);
FileInputFormat.addInputPath(job, new Path("/data/input/work5"));
FileOutputFormat.setOutputPath(job, new Path("/data/out/work5/"));
job.setMapperClass(TokenizerMapper.class);
job.setReducerClass(IntSumReducer.class);
job.setOutputKeyClass(HavelangText.class);
job.setOutputValueClass(IntWritable.class);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
x. 字典序的概念和使用
xi. 自定义排序器的应用
public class HavelangText implements WritableComparable {
String text;
int lang;
public HavelangText(){
super();
}
HavelangText(String text){
this.text=text;
this.lang = text.length();
}
HavelangText(String text,int a){this.lang = text.length();
}
@Override
public String toString() {return text ;
}public String getText() {return text;
}public int getLang() {return lang;
}public void setText(String text) {this.text=text;this.lang = text.length();
}public void setLang(int lang) {this.lang = lang;
}@Override
public void write(DataOutput out) throws IOException {// TODO Auto-generated method stubout.writeInt(lang);out.writeUTF(text);
}@Override
public void readFields(DataInput in) throws IOException {// TODO Auto-generated method stubthis.lang=in.readInt();this.text=in.readUTF();}@Override
public int pareTo(Object o) {// TODO Auto-generated method stubif(o.toString().equals(text)) {return 0;}else {if(lang- o.toString().length()>0) {return 1;}if(lang- o.toString().length()<0) {return -1;}return o.toString().pareTo(text);}}
}
更多推荐
haadoop
发布评论