Hadoop MapReduce: accessing the local file system

This article looks at how Hadoop MapReduce code can read the local file system on each node.

Problem Description


I have a requirement where the MapReduce code should read the local file system on each node. The program will run on HDFS, and I cannot change Hadoop's FileSystem property in the XML configuration files.

I have tried the following approaches, but none of them worked.

Approach 1

Configuration config = new Configuration();
FileSystem localFileSystem = FileSystem.get(config);
localFileSystem.set("fs.defaultFS", "file:///");
BufferedReader bufferRedaer = new BufferedReader(
        new InputStreamReader(localFileSystem.open(new Path("/user/input/localFile"))));
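A side note on Approach 1: set(String, String) is a Configuration method rather than a FileSystem one, and fs.defaultFS only takes effect if it is set before the FileSystem instance is obtained. A minimal sketch of that ordering follows; it still assumes the file actually exists on the node running the task.

Configuration config = new Configuration();
config.set("fs.defaultFS", "file:///"); // set on the Configuration, before FileSystem.get()
FileSystem localFileSystem = FileSystem.get(config);
BufferedReader reader = new BufferedReader(
        new InputStreamReader(localFileSystem.open(new Path("/user/input/localFile"))));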

Approach 2

Configuration config = new Configuration();
LocalFileSystem localFileSystem = FileSystem.getLocal(config);
BufferedReader bufferRedaer = new BufferedReader(
        new InputStreamReader(localFileSystem.open(new Path("/user/input/localFile"))));

Approach 3

Configuration config = new Configuration();
LocalFileSystem localFileSystem = FileSystem.getLocal(config);
localFileSystem.set("fs.defaultFS", "file:///");
BufferedReader bufferRedaer = new BufferedReader(
        new InputStreamReader(localFileSystem.open(new Path("/user/input/localFile"))));

Approach 4

Configuration config = new Configuration();
LocalFileSystem localFileSystem = FileSystem.getLocal(config);
BufferedReader bufferRedaer = new BufferedReader(
        new InputStreamReader(localFileSystem.getRaw().open(new Path("/user/input/localFile"))));

This did not work either (following the related question "Reading HDFS and local files in Java").

Each of them gave the error: No such file exists

Error Stack

attempt_201406050021_0018_m_000000_2: java.io.FileNotFoundException: File /home/cloudera/sftp/id_rsa does not exist
attempt_201406050021_0018_m_000000_2:     at org.apache.hadoop.fs.RawLocalFileSystem.getFileStatus(RawLocalFileSystem.java:468)
attempt_201406050021_0018_m_000000_2:     at org.apache.hadoop.fs.FilterFileSystem.getFileStatus(FilterFileSystem.java:380)
attempt_201406050021_0018_m_000000_2:     at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:231)
attempt_201406050021_0018_m_000000_2:     at org.apache.hadoop.fs.FileUtil.copy(FileUtil.java:183)
attempt_201406050021_0018_m_000000_2:     at org.apache.hadoop.fs.LocalFileSystem.copyFromLocalFile(LocalFileSystem.java:81)
attempt_201406050021_0018_m_000000_2:     at org.apache.hadoop.fs.FileSystem.copyFromLocalFile(FileSystem.java:1934)
attempt_201406050021_0018_m_000000_2:     at com.skanda.ecomm.sftp.FTPMapper.configure(FTPMapper.java:91)
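The stack trace originates in RawLocalFileSystem.getFileStatus, which suggests the local path simply is not present on whichever node ran the failed attempt. A small hypothetical diagnostic along the following lines (not part of the original code) could be called from configure() to confirm that per node:

import java.io.File;
import java.io.IOException;
import java.net.InetAddress;

public class LocalPathCheck {
    // Prints whether the given local path exists on the node that is executing this code.
    public static void report(String path) throws IOException {
        String host = InetAddress.getLocalHost().getHostName();
        boolean exists = new File(path).exists();
        System.out.println(host + ": " + path + (exists ? " exists" : " does not exist"));
    }
}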

I am hoping to get a positive solution here. Let me know where I am going wrong.

Main class (Driver class)

/*
 * @SFTPClient.java @May 20, 2014
 */
package com.skanda.ecomm.sftp;

import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**
 * <p>
 * SFTPClient Class
 * </p>
 *
 * @author skanda
 * @version 1.0
 */
public class SFTPClient extends Configured implements Tool {

    public int run(String[] args) throws Exception {
        Configuration config = getConf();

        String inputPath = config.get(ApplicationConstants.INPUT_PATH);
        String outputPath = config.get(ApplicationConstants.OUTPUT_PATH);
        String configPath = config.get(ApplicationConstants.CONFIG_PATH);
        int reducers = Integer.parseInt(config.get(ApplicationConstants.REDUCERS));

        if (outputPath == null || inputPath == null || configPath == null) {
            throw new Exception("Usage: \n"
                    + "-D configPath=<configPath> -D inputPath=<inputPath> -D reducers=<reducers>"
                    + " -D outputPath=<path>");
        }

        JobConf conf = new JobConf(SFTPClient.class);
        conf.setJobName("SFTP Injection client");

        DistributedCache.addCacheFile(new URI(configPath), conf);

        conf.setMapperClass(FTPMapper.class);
        conf.setReducerClass(FTPReducer.class);
        conf.setMapOutputKeyClass(IntWritable.class);
        conf.setMapOutputValueClass(Text.class);
        conf.setOutputKeyClass(IntWritable.class);
        conf.setOutputValueClass(IntWritable.class);

        // configuration should contain reference to your namenode
        FileSystem fs = FileSystem.get(new Configuration());
        fs.delete(new Path(outputPath), true); // true stands for recursive deletion of the folder you gave

        conf.setStrings(ApplicationConstants.INPUT_PATH, inputPath);
        conf.setStrings(ApplicationConstants.OUTPUT_PATH, outputPath);

        FileInputFormat.setInputPaths(conf, new Path(inputPath));
        FileOutputFormat.setOutputPath(conf, new Path(outputPath));

        conf.setNumReduceTasks(reducers);
        conf.setInt(ApplicationConstants.NUNBER_OF_REDUCERS, reducers);

        JobClient.runJob(conf);
        return 0;
    }

    public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new SFTPClient(), args);
        System.exit(exitCode);
    }
}
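The driver already ships configPath to every task through the DistributedCache. A hypothetical extension of that same idea (not part of the original post, and using a made-up HDFS path for illustration) would be to copy the SSH key to HDFS first and cache it with a symlink, so that every task can read it from its working directory:

import java.net.URI;
import java.net.URISyntaxException;

import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.mapred.JobConf;

public class KeyCacheHelper {
    // Caches an HDFS copy of the key and symlinks it as "id_rsa" in each task's working directory.
    public static void cachePrivateKey(JobConf conf, String hdfsKeyPath) throws URISyntaxException {
        DistributedCache.addCacheFile(new URI(hdfsKeyPath + "#id_rsa"), conf);
        DistributedCache.createSymlink(conf);
    }
}

It could be invoked from run() as KeyCacheHelper.cachePrivateKey(conf, "/user/cloudera/keys/id_rsa") before JobClient.runJob(conf); the HDFS path is only an example.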

Mapper

/*
 * @FTPMapper.java @May 20, 2014
 */
package com.skanda.ecomm.sftp;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.InetAddress;
import java.net.URI;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.MapReduceBase;
import org.apache.hadoop.mapred.Mapper;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;

import com.ftp.mapreduce.CommonUtility;
import com.ftp.mapreduce.RetrieveFileNames;
import com.jcraft.jsch.hm.Channel;

/**
 * <p>
 * FTP Mapper Class
 * </p>
 *
 * @author skanda
 * @version 1.0
 */
@SuppressWarnings("unused")
public class FTPMapper extends MapReduceBase implements Mapper<LongWritable, Text, IntWritable, Text> {

    private URI[] localFiles;
    private String userName;
    private String hostName;
    private String folderPath;
    private int reducers;
    private byte[] pvtKey;
    private String fileName;
    private String startDate;
    private String endDate;
    private String sshKeyPath;
    private String password;

    public void configure(JobConf job) {
        Properties properties = new Properties();
        try {
            localFiles = DistributedCache.getCacheFiles(job);
            if (localFiles != null && localFiles.length == 1) {
                Configuration conf = new Configuration();
                FileSystem fileSystem = FileSystem.get(localFiles[0], conf);
                BufferedReader bufferRedaer = new BufferedReader(
                        new InputStreamReader(fileSystem.open(new Path(localFiles[0]))));
                properties.load(bufferRedaer);

                userName = properties.getProperty(ApplicationConstants.USER_NAME);
                reducers = job.getInt(ApplicationConstants.NUNBER_OF_REDUCERS, 30);
                hostName = properties.getProperty(ApplicationConstants.SFTP_SERVER_HOST);
                folderPath = properties.getProperty(ApplicationConstants.HOSTFILE_DIRECTORY_PATH);
                fileName = properties.getProperty(ApplicationConstants.FILE_NAME_PATTERN);
                startDate = properties.getProperty(ApplicationConstants.FILE_START_DATE);
                endDate = properties.getProperty(ApplicationConstants.FILE_END_DATE);
                sshKeyPath = properties.getProperty(ApplicationConstants.SSH_KEY_PATH);
                password = properties.getProperty(ApplicationConstants.PASSWORD);

                System.out.println("--------------------------------------------------");
                /*FileSystem fs = FileSystem.getLocal(conf);
                //Path inputPath = fs.makeQualified(new Path(sshKeyPath));
                String inputPath = new Path("file:///home/cloudera/" + sshKeyPath).toUri().getPath();
                fs.copyFromLocalFile(new Path(inputPath), new Path("outputSFTP/idFile"));*/
                try {
                    Configuration conf1 = new Configuration();
                    Path pt = new Path("file:///home/cloudera/.ssh/id_rsa");
                    FileSystem fs = FileSystem.get(new URI("file:///home/cloudera/.ssh/id_rsa"), conf);
                    LocalFileSystem localFileSystem = fs.getLocal(conf1);
                    BufferedReader bufferRedaer1 = new BufferedReader(
                            new InputStreamReader(localFileSystem.open(pt)));
                    String str = null;
                    while ((str = bufferRedaer1.readLine()) != null) {
                        System.out.println("-----------");
                        System.out.println(str);
                    }
                } catch (Exception e) {
                    System.out.println("failed again");
                    String computername = InetAddress.getLocalHost().getHostName();
                    System.out.println(computername);
                    e.printStackTrace();
                }
                System.out.println("--------------------------------------------------");

                Configuration config = new Configuration();
                config.set("fs.defaultFS", "file:////");
                LocalFileSystem localFileSystem = FileSystem.getLocal(config);
                bufferRedaer = new BufferedReader(
                        new InputStreamReader(localFileSystem.open(new Path(sshKeyPath))));
                /*Configuration config = new Configuration();
                //config.set("fs.defaultFS", "file:///home/cloudera/.ssh/id_rsa");
                LocalFileSystem fileSystm = FileSystem.getLocal(config);
                Path path = fileSystm.makeQualified(new Path("/home/cloudera/.ssh/id_rsa"));*/
                //FileInputFormat.setInputPaths(job, path);
                //bufferRedaer = new BufferedReader(new InputStreamReader(fileSystem.open(path)));

                String key = "";
                try {
                    String line = "";
                    while ((line = bufferRedaer.readLine()) != null) {
                        key += line + "\n";
                    }
                    pvtKey = key.getBytes();
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    //fileSystem.close();
                    //bufferRedaer.close();
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void map(LongWritable key, Text value, OutputCollector<IntWritable, Text> output, Reporter reporter)
            throws IOException {
        List<String> filterFileNamesList = new ArrayList<String>();
        Channel channel = CommonUtility.connectSFTP(userName, hostName, pvtKey);
        Map<String, String> fileNamesMap = CommonUtility.getFileNames(channel, folderPath);
        List<String> filterFileNameList_output = RetrieveFileNames.FILTER_BY_NAME.retrieveFileNames(fileNamesMap,
                filterFileNamesList, fileName, startDate, endDate);
        for (int i = 0; i < filterFileNameList_output.size(); i++) {
            int keyGroup = i % reducers;
            output.collect(new IntWritable(keyGroup), new Text(filterFileNameList_output.get(i)));
        }
    }
}
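Continuing the hypothetical DistributedCache sketch above: if the key were cached with a symlink, the mapper side could read it with plain java.io instead of juggling FileSystem instances. The file name "id_rsa" below is the made-up symlink fragment from that sketch, not something present in the original post.

import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;

public class CachedKeyReader {
    // Reads the symlinked cache file ("id_rsa") from the task's current working directory.
    public static byte[] readPrivateKey() throws IOException {
        FileInputStream in = new FileInputStream(new File("id_rsa"));
        try {
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] buffer = new byte[4096];
            int read;
            while ((read = in.read(buffer)) != -1) {
                out.write(buffer, 0, read);
            }
            return out.toByteArray();
        } finally {
            in.close();
        }
    }
}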

Solution

This code works for me when the program runs on HDFS and my txt file is in this location:

/home/Rishi/Documents/RishiFile/r.txt

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;

public class HadoopRead {
    public static void main(String[] args) {
        try {
            Configuration conf = new Configuration();
            Path pt = new Path("/home/Rishi/Documents/RishiFile/r.txt");
            FileSystem fs = FileSystem.get(new URI("/home/Rishi/Documents/RishiFile"), conf);
            LocalFileSystem localFileSystem = fs.getLocal(conf);
            BufferedReader bufferRedaer = new BufferedReader(new InputStreamReader(localFileSystem.open(pt)));
            String str = null;
            while ((str = bufferRedaer.readLine()) != null) {
                System.out.println("-----------");
                System.out.println(str);
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
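The snippet above is a standalone main(). A hypothetical adaptation of the same local read inside a new-API mapper's setup() could look like the sketch below; the path is the example path above and is assumed to exist on every node that runs the task.

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class LocalFileMapper extends Mapper<LongWritable, Text, Text, Text> {

    private String firstLine = "";

    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        Configuration conf = context.getConfiguration();
        LocalFileSystem localFs = FileSystem.getLocal(conf);
        // Example path from the answer above; assumed to exist on every node running the task.
        Path localPath = new Path("/home/Rishi/Documents/RishiFile/r.txt");
        BufferedReader reader = new BufferedReader(new InputStreamReader(localFs.open(localPath)));
        try {
            firstLine = reader.readLine();
        } finally {
            reader.close();
        }
    }

    @Override
    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Emit the first line of the local file as the key for every input record.
        context.write(new Text(firstLine == null ? "" : firstLine), value);
    }
}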

Word count example that reads a local file while the job runs on HDFS

My main class:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class FileDriver extends Configured implements Tool {

    public static void main(String[] args) {
        try {
            ToolRunner.run(new Configuration(), new FileDriver(), args);
            System.exit(0);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    public int run(String[] arg0) throws Exception {
        Configuration conf = new Configuration();
        Path pt = new Path("file:///home/winoria/Documents/Ri/r");

        Job job = new Job(conf, "new Job");
        job.setJarByClass(FileDriver.class);
        job.setMapperClass(FileMapper.class);
        job.setReducerClass(FileReducer.class);

        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(Text.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(Text.class);

        job.setInputFormatClass(TextInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);

        FileInputFormat.setInputPaths(job, pt);
        FileSystem.get(job.getConfiguration()).delete(new Path("Output2"), true);
        FileOutputFormat.setOutputPath(job, new Path("Output2"));

        job.waitForCompletion(true);
        return 0;
    }
}

Mapper class:

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class FileMapper extends Mapper<LongWritable, Text, Text, Text> {

    protected void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        String str[] = value.toString().split(" ");
        for (int i = 0; i < str.length; i++) {
            context.write(new Text(str[i]), new Text());
        }
    }
}

Reducer Class:

import java.io.IOException;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class FileReducer extends Reducer<Text, Text, Text, Text> {

    protected void reduce(Text key, Iterable<Text> value, Context context)
            throws IOException, InterruptedException {
        int count = 0;
        for (Text text : value) {
            count++;
        }
        context.write(key, new Text(count + ""));
    }
}
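The reducer above emits the count as Text. A more conventional word-count variant, shown here only as a sketch and not as part of the original answer, emits an IntWritable instead; using it would also require job.setOutputValueClass(IntWritable.class) in the driver.

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

public class FileCountReducer extends Reducer<Text, Text, Text, IntWritable> {

    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context)
            throws IOException, InterruptedException {
        // Count how many times each word was emitted by the mapper.
        int count = 0;
        for (Text ignored : values) {
            count++;
        }
        context.write(key, new IntWritable(count));
    }
}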
