As it is quite obvious from the title, my aim is to use the Mapper's counter in the reduce phase, before the particular job finishes.
I have come across a few questions which were highly related to this one, but none of them solved all my problems (Accessing a mapper's counter from a reducer; Hadoop, MapReduce Custom Java Counters Exception in thread "main" java.lang.IllegalStateException: Job in state DEFINE instead of RUNNING; etc.).
```java
@Override
public void setup(Context context) throws IOException, InterruptedException {
    Configuration conf = context.getConfiguration();
    Cluster cluster = new Cluster(conf);
    Job currentJob = cluster.getJob(context.getJobID());
    mapperCounter = currentJob.getCounters().findCounter(COUNTER_NAME).getValue();
}
```

My problem is that the cluster does not contain any job history.
The way I call the MapReduce job:

```java
private void firstFrequents(String outpath) throws IOException, InterruptedException, ClassNotFoundException {
    Configuration conf = new Configuration();
    Cluster cluster = new Cluster(conf);
    conf.setInt("minFreq", MIN_FREQUENCY);
    Job job = Job.getInstance(conf, "APR");
    // Counters counters = job.getCounters();
    job.setJobName("TotalTransactions");
    job.setJarByClass(AssociationRules.class);
    job.setMapperClass(FirstFrequentsMapper.class);
    job.setReducerClass(CandidateReducer.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);
    FileInputFormat.addInputPath(job, new Path("input"));
    FileOutputFormat.setOutputPath(job, new Path(outpath));
    job.waitForCompletion(true);
}
```

Mapper:
```java
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class FirstFrequentsMapper extends Mapper<Object, Text, Text, IntWritable> {

    public enum Counters {
        TotalTransactions
    }

    private IntWritable one = new IntWritable(1);

    public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        // Split the transaction line on runs of tabs or commas
        String[] items = value.toString().split("\\t+|,+");
        for (String item : items) {
            context.write(new Text(item), one);
        }
        // One increment per input line (i.e., per transaction)
        context.getCounter(Counters.TotalTransactions).increment(1);
    }
}
```

Reducer:
```java
import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Cluster;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;

public class CandidateReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    private int minFrequency;
    private long totalTransactions;

    @Override
    public void setup(Context context) throws IOException, InterruptedException {
        Configuration conf = context.getConfiguration();
        minFrequency = conf.getInt("minFreq", 1);
        Cluster cluster = new Cluster(conf);
        Job currentJob = cluster.getJob(context.getJobID());
        totalTransactions = currentJob.getCounters()
                .findCounter(FirstFrequentsMapper.Counters.TotalTransactions)
                .getValue();
        System.out.print(totalTransactions);
    }

    public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
        int counter = 0;
        for (IntWritable val : values) {
            counter += val.get();
        }
        /* Item frequency calculated */
        /* Write it to output if it is frequent */
        if (counter >= minFrequency) {
            context.write(key, new IntWritable(counter));
        }
    }
}
```
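For context, the per-key logic of the mapper and reducer above can be checked in plain Java, without a Hadoop runtime. This is only a sketch: the sample transaction line and counts below are hypothetical, and `PipelineSketch` is not part of the original code.

```java
import java.util.Arrays;
import java.util.List;

// Plain-Java sketch of the two steps above: the mapper's tokenization
// (split on runs of tabs or commas) and the reducer's frequency filter
// (keep an item only when its summed count reaches minFreq).
public class PipelineSketch {

    static String[] tokenize(String line) {
        return line.split("\\t+|,+"); // same regex the mapper uses
    }

    static boolean isFrequent(List<Integer> counts, int minFrequency) {
        int sum = 0;
        for (int c : counts) {
            sum += c; // same accumulation as in reduce()
        }
        return sum >= minFrequency;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(tokenize("bread,milk\teggs")));
        System.out.println(isFrequent(Arrays.asList(1, 1, 1), 2)); // frequent
        System.out.println(isFrequent(Arrays.asList(1), 2));       // not frequent
    }
}
```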
Accepted answer:

The correct setup() or reduce() implementation to get the value of a counter is exactly the one shown in the post that you mention:
```java
Counter counter = context.getCounter(CounterMapper.TestCounters.TEST);
long counterValue = counter.getValue();
```

where TEST is the name of the counter, which is declared in the enum TestCounters.
I don't see the reason why you declare a Cluster variable...
Also, in the code that you mention in your comments, you should store the result returned by getValue() in a variable, like the counterValue variable above.
Perhaps you will find this post useful as well.
UPDATE: Based on your edit, I believe that all you are looking for is the number of MAP_INPUT_RECORDS, which is a default counter, so you don't need to re-implement it.
To get the value of a counter from the Driver class, you can use (taken from this post):
```java
job.getCounters().findCounter(COUNTER_NAME).getValue();
```
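Putting this together with the question's driver, the retrieval might look as follows. This is a hedged sketch, not a tested implementation: it assumes the `job` object from `firstFrequents()` above and a Hadoop runtime, and counter values are only complete once `waitForCompletion(true)` has returned.

```java
// Inside firstFrequents(), after job.waitForCompletion(true):
Counters counters = job.getCounters();

// The custom counter incremented once per map() call:
long totalTransactions = counters
        .findCounter(FirstFrequentsMapper.Counters.TotalTransactions)
        .getValue();

// The built-in counter the answer refers to
// (org.apache.hadoop.mapreduce.TaskCounter):
long mapInputRecords = counters
        .findCounter(TaskCounter.MAP_INPUT_RECORDS)
        .getValue();

System.out.println(totalTransactions + " transactions, "
        + mapInputRecords + " map input records");
```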