
Updated: 2024-10-27 04:38:31
Accessing Mapper's counter in Reducer phase (before finishing the job)


As is quite obvious from the title, my aim is to use the Mapper's counter in the reduce phase, before the particular job finishes.

I have come across a few questions that were highly related to this one, but none of them solved all my problems ("Accessing a mapper's counter from a reducer", "Hadoop, MapReduce Custom Java Counters Exception in thread "main" java.lang.IllegalStateException: Job in state DEFINE instead of RUNNING", etc.).

    @Override
    public void setup(Context context) throws IOException, InterruptedException {
        Configuration conf = context.getConfiguration();
        Cluster cluster = new Cluster(conf);
        Job currentJob = cluster.getJob(context.getJobID());
        mapperCounter = currentJob.getCounters().findCounter(COUNTER_NAME).getValue();
    }

My problem is that the cluster does not contain any job history.

How I call the MapReduce job:

    private void firstFrequents(String outpath)
            throws IOException, InterruptedException, ClassNotFoundException {
        Configuration conf = new Configuration();
        Cluster cluster = new Cluster(conf);
        conf.setInt("minFreq", MIN_FREQUENCY);
        Job job = Job.getInstance(conf, "APR");
        // Counters counters = job.getCounters();
        job.setJobName("TotalTransactions");
        job.setJarByClass(AssociationRules.class);
        job.setMapperClass(FirstFrequentsMapper.class);
        job.setReducerClass(CandidateReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path("input"));
        FileOutputFormat.setOutputPath(job, new Path(outpath));
        job.waitForCompletion(true);
    }

Mapper:

    import java.io.IOException;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    public class FirstFrequentsMapper extends Mapper<Object, Text, Text, IntWritable> {

        public enum Counters {
            TotalTransactions
        }

        private IntWritable one = new IntWritable(1);

        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            // Split each transaction line on runs of tabs or commas.
            String[] line = value.toString().split("\\t+|,+");
            for (String item : line) {
                context.write(new Text(item), one);
            }
            context.getCounter(Counters.TotalTransactions).increment(1);
        }
    }
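The escaped split pattern in the mapper is easy to garble when posting; a minimal, self-contained check of the intended regex (splitting on runs of tabs or commas — the class name and sample line here are made up for illustration):

```java
import java.util.Arrays;

public class SplitCheck {

    // Tokenize a transaction line on runs of tabs or commas,
    // the same pattern the mapper is meant to use.
    static String[] tokenize(String line) {
        return line.split("\\t+|,+");
    }

    public static void main(String[] args) {
        // An illustrative tab/comma separated transaction line.
        System.out.println(Arrays.toString(tokenize("bread\tmilk,eggs")));
        // Expected: [bread, milk, eggs]
    }
}
```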

Reducer:

    public class CandidateReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

        private int minFrequency;
        private long totalTransactions;

        @Override
        public void setup(Context context) throws IOException, InterruptedException {
            Configuration conf = context.getConfiguration();
            minFrequency = conf.getInt("minFreq", 1);
            Cluster cluster = new Cluster(conf);
            Job currentJob = cluster.getJob(context.getJobID());
            totalTransactions = currentJob.getCounters()
                    .findCounter(FirstFrequentsMapper.Counters.TotalTransactions).getValue();
            System.out.print(totalTransactions);
        }

        public void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int counter = 0;
            for (IntWritable val : values) {
                counter += val.get();
            }
            // Item frequency calculated; write it to output if it is frequent.
            if (counter >= minFrequency) {
                context.write(key, new IntWritable(counter));
            }
        }
    }

Accepted answer


The correct setup() or reduce() implementation for getting the value of a counter is exactly the one shown in the post that you mention:

    Counter counter = context.getCounter(CounterMapper.TestCounters.TEST);
    long counterValue = counter.getValue();

where TEST is the name of the counter, declared in the enum TestCounters.

I don't see the reason why you declare a Cluster variable...

Also, in the code that you mention in your comments, you should store the result returned by the getValue() method in a variable, as with the counterValue variable above.

Perhaps you will find this post useful as well.

UPDATE: Based on your edit, I believe all you are looking for is the number of MAP_INPUT_RECORDS, which is a default counter, so you don't need to re-implement it.

To get the value of a counter from the Driver class, you can use (taken from this post):

    job.getCounters().findCounter(COUNTER_NAME).getValue();
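Since mapper counters are only reliably aggregated once a job has completed, a common workaround for the original problem is to split the work into two jobs: read the counter in the driver after the first job finishes, then hand the value to the second job through its Configuration. The sketch below assumes the classes from the question; names such as countJob, freqJob, and the "totalTransactions" key are hypothetical, not the poster's code:

    // Job 1: count transactions (mapper increments TotalTransactions).
    Configuration conf = new Configuration();
    Job countJob = Job.getInstance(conf, "TotalTransactions");
    // ... set mapper, input/output paths ...
    countJob.waitForCompletion(true);

    // The counter is safe to read now, because the job has finished.
    long totalTransactions = countJob.getCounters()
            .findCounter(FirstFrequentsMapper.Counters.TotalTransactions).getValue();

    // Job 2: store the value in the Configuration *before* creating the job,
    // so every reduce task can read it locally.
    Configuration conf2 = new Configuration();
    conf2.setLong("totalTransactions", totalTransactions);
    Job freqJob = Job.getInstance(conf2, "APR");
    // ... set mapper/reducer, input/output paths ...
    freqJob.waitForCompletion(true);

In CandidateReducer.setup(), the value is then simply `context.getConfiguration().getLong("totalTransactions", 0)`, with no Cluster lookup needed.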

Published: 2023-07-31 06:17:00