MapReduce一次读取多个文件
MapReduce一次读取多个文件
MapReduce一次读取多个文件
求出每一行数据所在的文件名
学习目标(获取数据所在文件的名称)
主要还是map中的改变
在map中通过context调用getInputSplit()这个方法,再将返回的对象强转为FileSplit,就能先通过getPath()方法拿到文件路径,再调用getName()方法获取文件名了
上代码:
package com.damo01;import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;import java.io.IOException;/*** @author 嗨皮骚* @version v 1.0* @date 2019/11/18*/
public class MyMapper extends Mapper<LongWritable, Text,Text,Text> {@Overrideprotected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {//通过context调用getInputSplit()这个方法,因为我们要读取多个文件,所以要强转为FileSplitFileSplit fileSplit = (FileSplit) context.getInputSplit();//fileSplit调用getPath获取路径信息Path path = fileSplit.getPath();//通过路径信息path获取文件的名字String name = path.getName();context.write(new Text(name),value);//这样我们就将文件的名字及其每一行的内容传给reduce了}
}
后面的代码和以前一样,没有什么特别的操作
package com.damo01;import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;import java.io.IOException;/*** @author 嗨皮骚* @version v 1.0* @date 2019/11/18*/
public class MyReduce extends Reducer<Text,Text,Text,Text> {@Overrideprotected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {//遍历拿到的同一个key的数据,直接输出便是for (Text value : values) {context.write(new Text(key.toString()),value);}}
}
Driver的代码和以前一样,没有什么特别的操作
ackage com.damo01;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;/*** @author 嗨皮骚* @version v 1.0* @date 2019/11/18*/
public class MyDriver extends Configured implements Tool {@Overridepublic int run(String[] strings) throws Exception {//实例一个job对象参数(Configuration对象,随便一个字符串的名字)Job job = Job.getInstance(new Configuration(), "demo01");//这一步是为了获取文件或目录内的文件,我使用的是本地的地址job.setInputFormatClass(TextInputFormat.class);TextInputFormat.addInputPath(job,new Path("E:\\教学\\学习资料3\\11.18\\day24"));//这一步是为了让jar包在集群中跑起来job.setJarByClass(MyDriver.class);//配置map类job.setMapperClass(MyMapper.class);job.setMapOutputKeyClass(Text.class);job.setMapOutputValueClass(Text.class);//配置reduce类job.setReducerClass(MyReduce.class);//配置输出类型job.setOutputKeyClass(Text.class);job.setOutputValueClass(Text.class);//这一步是输出结果的目录job.setOutputFormatClass(TextOutputFormat.class);TextOutputFormat.setOutputPath(job,new Path("E:\\教学\\学习资料3\\11.18\\day24\\结果"));//开启job的运行,并打印信息,返回状态码return job.waitForCompletion(true)?0:1;}public static void main(String[] args) throws Exception {//运行MapReduceint run = ToolRunner.run(new MyDriver(), args);System.out.println(run==0?"成功":"失败");}
}
Hadoop的pom依赖
<repositories><repository><id>cloudera</id><url>https://repository.cloudera.com/artifactory/cloudera-repos/</url></repository></repositories><dependencies><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-common</artifactId><version>2.7.4</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-hdfs</artifactId><version>2.7.4</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-client</artifactId><version>2.7.4</version></dependency><dependency><groupId>org.apache.hadoop</groupId><artifactId>hadoop-mapreduce-client-core</artifactId><version>2.7.4</version></dependency><!-- --><dependency><groupId>junit</groupId><artifactId>junit</artifactId><version>4.11</version><scope>test</scope></dependency><dependency><groupId>org.testng</groupId><artifactId>testng</artifactId><version>RELEASE</version></dependency></dependencies><build><plugins><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-compiler-plugin</artifactId><version>3.0</version><configuration><source>1.8</source><target>1.8</target><encoding>UTF-8</encoding><!-- <verbal>true</verbal>--></configuration></plugin><plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-shade-plugin</artifactId><version>2.4.3</version><executions><execution><phase>package</phase><goals><goal>shade</goal></goals><configuration><minimizeJar>true</minimizeJar></configuration></execution></executions></plugin></plugins></build>
更多推荐
MapReduce一次读取多个文件
发布评论