MapReduce2中自定义排序分组

编程入门 行业动态 更新时间:2024-10-24 01:52:06

MapReduce2中<a href=https://www.elefans.com/category/jswz/34/1771438.html style=自定义排序分组"/>

MapReduce2中自定义排序分组

 

1 Map 、Reduce和主类 

  

package com.wzt.mapreduce.secondsort;import java.io.IOException;import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;import com.wzt.mapreduce.wordcount.WCRunner;public class SecSortMain {public static class SecSortMapper extends Mapper<LongWritable, Text, FirstSortEntity, IntWritable> {protected void map(LongWritable key, Text value, Context context)throws  IOException, InterruptedException {String line = value.toString();String[] spilted = line.split(" ");// 为了显示效果而输出Mapper的输出键值对信息System.out.println("Mapper输出<" + spilted[0] + "," + spilted[1] + ">"+this);context.write(new FirstSortEntity(spilted[0], Integer.parseInt(spilted[1]))  , new IntWritable(Integer.parseInt(spilted[1])) );};}public static class SecSortReducer extends Reducer<FirstSortEntity, IntWritable , FirstSortEntity, IntWritable> {@Overrideprotected void reduce(FirstSortEntity key,Iterable<IntWritable> values,Context context)throws IOException, InterruptedException {// 显示次数表示redcue函数被调用了多少次,表示k2有多少个分组System.out.println("Reducer输入分组<" + key+ ",N(N>=1)>"+this);StringBuffer sb = new StringBuffer() ; for (IntWritable value : values) {//count += value.get();// 显示次数表示输入的k2,v2的键值对数量sb.append( value+" , " ) ;System.out.println("Reducer输入键值对<" + key.toString() + "," + value.get() + ">  组"+sb.toString() );}
//			if(sb.length()>0){
//				sb.deleteCharAt( -1 ) ;
//			}context.write(key, key.getSecondkey());//context.write(key.getFirstkey(),  new Text(sb.toString() ));}}public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {Configuration conf = new Configuration() ; Job job = Job.getInstance(conf) ;job.setJarByClass(WCRunner.class );job.setMapperClass( SecSortMapper.class );job.setMapOutputKeyClass( FirstSortEntity.class);job.setMapOutputValueClass( IntWritable.class );//设置分区方法job.setPartitionerClass( SSPartintioner.class);//不同//会有几个reduce去执行最后的汇总数据, 有几个分区就要有几个reduce ,最后就会生成几个reduce ,这里设置为1 ,没看到调用但是确实分区了,没弄明白job.setNumReduceTasks(1);//当任务数为1的时候设置Partitioner是没有用的//数据做总的排序job.setSortComparatorClass(MySSSortComparator.class) ; //排序//总数据  记性分组 job.setGroupingComparatorClass( GroupComparator.class );//分组job.setReducerClass( SecSortReducer.class );job.setOutputKeyClass( FirstSortEntity.class );job.setOutputValueClass(IntWritable.class );//		FileInputFormat.setInputPaths(job,  "/wc/input/xiyou.txt");
//		FileOutputFormat.setOutputPath(job,  new Path("/wc/output6"));FileInputFormat.setInputPaths(job,  "/sort/input");FileOutputFormat.setOutputPath(job,  new Path("/sort/output1"));job.waitForCompletion(true) ; }
}

 

2 自定义 组合key 

 

package com.wzt.mapreduce.secondsort;import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparable;
/*** 自定义组合件 * @author root**/
public class FirstSortEntity implements WritableComparable<FirstSortEntity>{private Text firstkey ; private IntWritable secondkey ;public FirstSortEntity( ) {}public FirstSortEntity(Text firstkey, IntWritable secondkey) {this.firstkey = firstkey;this.secondkey = secondkey;}public FirstSortEntity(String firstkey, int secondkey) {this.firstkey = new Text(firstkey);this.secondkey = new IntWritable(secondkey);}public Text getFirstkey() {return firstkey;}public void setFirstkey(Text firstkey) {this.firstkey = firstkey;}public IntWritable getSecondkey() {return secondkey;}public void setSecondkey(IntWritable secondkey) {this.secondkey = secondkey;}/*** 对象序列化*/@Overridepublic void write(DataOutput out) throws IOException {out.writeUTF(firstkey.toString() );out.writeInt(  secondkey.get() );}//对象反序列化@Overridepublic void readFields(DataInput in) throws IOException {firstkey = new Text(in.readUTF() );secondkey = new IntWritable(in.readInt()); }/*** 排序在map执行后数据传出后 会调用这个方法对key进行排序 * 数据map后,如果设置了分区并且reduce>1 的话,会执行分区类方法,进行分区*/@Overridepublic int compareTo(FirstSortEntity entity) {//利用这个来控制升序或降序//this本对象写在前面代表是升序//this本对象写在后面代表是降序return this.firstkeypareTo( entity.getFirstkey());//return this.secondkey.get()>entity.getSecondkey().get()?1:-1;	}@Overridepublic String toString() {return this.getFirstkey() +" "+this.getSecondkey()+ "   "  ;} }

 3 自定义分区 

 

package com.wzt.mapreduce.secondsort;import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Partitioner;//自定义 分区
public class SSPartintioner extends Partitioner<FirstSortEntity, IntWritable>{/*** key map输出的key* value map 输出的value *  map后的数据 经过排序后传进这个分区方法,如果返回的值相同的数据,值相同的数据会分配到一组中 ,即 放到一堆 *  到此 数据为N堆,并且数据是经过排序的 */@Overridepublic int getPartition(FirstSortEntity key, IntWritable value,int numPartitions) {System.out.println("Partitioner  key:"+key.getFirstkey()+"  value:"+value+"  "+ ( ( key.getFirstkey().hashCode()&Integer.MAX_VALUE)%numPartitions ) +"   "+this);//System.out.println("Partitioner  key:"+key.getFirstkey()+"  value:"+value+"  "+ ((key.getSecondkey().get()&Integer.MAX_VALUE)%numPartitions) +"   "+this);return (key.getFirstkey().hashCode()&Integer.MAX_VALUE)%numPartitions;//return (key.getSecondkey().get()&Integer.MAX_VALUE)%numPartitions;}}

   个人理解以上都是在Map阶段进行,即本地操作,以下为Map到Reduce这段进行的

 

4  自定义整体排序 

  

package com.wzt.mapreduce.secondsort;import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;//组内自定义排序策略
/*** @author root**/
public class MySSSortComparator extends WritableComparator{public MySSSortComparator() {//注册处理的试题类型 super(FirstSortEntity.class,true);}/***  reduce 处理数据之前 *  对全量数据排序 *  逻辑:分组一样则按照第二个参数排序  ,分组不一样,则按照第一个参数排序  */@Overridepublic int compare(WritableComparable a, WritableComparable b) {FirstSortEntity e1 = (FirstSortEntity)a;FirstSortEntity e2 = (FirstSortEntity)b;System.out.println( e1.getFirstkey()+"==MySSSortComparator 排序 。。 "+e2.getFirstkey());//首先要保证是同一个组内,同一个组的标识就是第一个字段相同if(!e1.getFirstkey().equals( e2.getFirstkey())){return e1.getFirstkey()pareTo(e2.getFirstkey());}else{return e1.getSecondkey().get() - e2.getSecondkey().get() ; }}
}

 

5 自定义分组  

 

   

package com.wzt.mapreduce.secondsort;import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;//对象分组策略 
//数据放到 reduce前 ,对数据进行分组 
public class GroupComparator extends WritableComparator{public GroupComparator() { //注册处理的试题类型 super(FirstSortEntity.class,true ) ; }/*** 对排序后的数据 分组, * 第一个参数相同的,放到一个key的 迭代器 集合中  */@Overridepublic int compare(WritableComparable a, WritableComparable b) {FirstSortEntity e1 = (FirstSortEntity)a;FirstSortEntity e2 = (FirstSortEntity)b;System.out.println( e1.getFirstkey()+"==GroupComparator = 分组=="+e2.getFirstkey());return  e1.getFirstkey().toString()pareTo( e2.getFirstkey().toString());//return  e1.getSecondkey()pareTo( e2.getSecondkey());}
}

 在以后就是主类中的reduce进行数据处理

  下面这个类作为自己的记录,这里没用:

   

package com.wzt.mapreduce.secondsort;import java.io.ByteArrayInputStream;
import java.io.DataInput;
import java.io.DataInputStream;
import java.io.IOException;import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparator;//自定义分组比较器
//这个类 暂时没用, 分组比较器的 实现,但没有测试 
public class SSGroupComparator implements RawComparator<FirstSortEntity>{@Overridepublic int compare(FirstSortEntity o1, FirstSortEntity o2) {return o1.getSecondkey().get()>o2.getSecondkey().get()?1:-1;}@Overridepublic int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {//对象可以这样反序列化 //IntWritable d ; System.out.println( "SSGroupComparator   自定义分组 =" );ByteArrayInputStream bis = new ByteArrayInputStream(b1);DataInput in1 = new DataInputStream(bis); FirstSortEntity entity1 = new FirstSortEntity();ByteArrayInputStream bis2 = new ByteArrayInputStream(b2);DataInput in2 = new DataInputStream(bis2); FirstSortEntity entity2 = new FirstSortEntity();try {entity1.readFields(in1);entity2.readFields(in2);} catch (IOException e) {e.printStackTrace();}return entity1.getFirstkey()pareTo( entity2.getFirstkey());}}

 

 

 

 

 

更多推荐

MapReduce2中自定义排序分组

本文发布于:2024-03-04 02:00:39,感谢您对本站的认可!
本文链接:https://www.elefans.com/category/jswz/34/1707960.html
版权声明:本站内容均来自互联网,仅供演示用,请勿用于商业和其他非法用途。如果侵犯了您的权益请与我们联系,我们将在24小时内删除。
本文标签:自定义

发布评论

评论列表 (有 0 条评论)
草根站长

>www.elefans.com

编程频道|电子爱好者 - 技术资讯及电子产品介绍!