当前位置:首页 > 技术 > 后端 > 正文内容

MapReduce之WritableComparable排序

anan3个月前 (07-30)后端74

@

目录


排序概述

  • 排序是MapReduce框架中最重要的操作之一。

  • Map Task和ReduceTask均会默认对数据按照key进行排序。该操作属于Hadoop的默认行为。任何应用程序中的数据均会被排序,而不管逻辑上是否需要。

  • 黑默认排序是按照字典顺序排序,且实现该排序的方法是快速排序。

  • 对于MapTask,它会将处理的结果暂时放到一个缓冲区中,当缓冲区使用率达到一定阈值后,再对缓冲区中的数据进行一次排序,并将这些有序数据写到磁盘上,而当数据处理完毕后,它会对磁盘上所有文件进行一次合并,以将这些文件合并成一个大的有序文件。

  • 对于ReduceTask,它从每个MapTak上远程拷贝相应的数据文件,如果文件大小超过一定阑值,则放到磁盘上,否则放到内存中。如果磁盘上文件数目达到一定阈值,则进行一次合并以生成一个更大文件;如果内存中文件大小或者数目超过一定阈值,则进行一次合并后将数据写到磁盘上。当所有数据拷贝完毕后,ReduceTask统一对内存和磁盘上的所有数据进行一次归并排序。

  • 排序器:排序器影响的是排序的速度(效率,对什么排序?),QuickSorter

  • 比较器:比较器影响的是排序的结果(按照什么规则排序)

获取Mapper输出的key的比较器(源码)

public RawComparator getOutputKeyComparator() {

// 从配置中获取mapreduce.job.output.key.comparator.class的值,必须是RawComparator类型,如果没有配置,默认为null
    Class<? extends RawComparator> theClass = getClass(JobContext.KEY_COMPARATOR, null, RawComparator.class);

// 一旦用户配置了此参数,实例化一个用户自定义的比较器实例
    if (theClass != null){
      return ReflectionUtils.newInstance(theClass, this);
   }
   
//用户没有配置,判断Mapper输出的key的类型是否是WritableComparable的子类,如果不是,就抛异常,如果是,系统会自动为我们提供一个key的比较器
    return WritableComparator.get(getMapOutputKeyClass().asSubclass(WritableComparable.class), this);
  }

案例实操(区内排序)

需求
对每个手机号按照上行流量和下行流量的总和进行内部排序。


思考
因为Map Task和ReduceTask均会默认对数据按照key进行排序,所以需要把流量总和设置为Key,手机号等其他内容设置为value

FlowBeanMapper.java

public class FlowBeanMapper extends Mapper<LongWritable, Text, LongWritable, Text>{
	
	private LongWritable out_key=new LongWritable();
	private Text out_value=new Text();
	
	@Override
	protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
		String[] words = value.toString().split("\t");
		
		//封装总流量为key
		out_key.set(Long.parseLong(words[3]));//切分后,流量和的下标为3
		
		//封装其他内容为value
		out_value.set(words[0]+"\t"+words[1]+"\t"+words[2]);
		
		context.write(out_key, out_value);
	}

}

FlowBeanReducer.java

public class FlowBeanReducer extends Reducer<LongWritable, Text, Text, LongWritable>{
	
	@Override
	protected void reduce(LongWritable key, Iterable<Text> values,
			Reducer<LongWritable, Text, Text, LongWritable>.Context context) throws IOException, InterruptedException {
		
		for (Text value : values) {
			context.write(value, key);
		}
	}
	
}

FlowBeanDriver.java

public class FlowBeanDriver {
	
	public static void main(String[] args) throws Exception {
		
		Path inputPath=new Path("E:\\mroutput\\flowbean");
		Path outputPath=new Path("e:/mroutput/flowbeanSort1");
		
		//作为整个Job的配置
		Configuration conf = new Configuration();
		
		//保证输出目录不存在
		FileSystem fs=FileSystem.get(conf);
		
		if (fs.exists(outputPath)) {
			fs.delete(outputPath, true);
		}
		
		// ①创建Job
		Job job = Job.getInstance(conf);
		
		// ②设置Job
		// 设置Job运行的Mapper,Reducer类型,Mapper,Reducer输出的key-value类型
		job.setMapperClass(FlowBeanMapper.class);
		job.setReducerClass(FlowBeanReducer.class);
		
		// Job需要根据Mapper和Reducer输出的Key-value类型准备序列化器,通过序列化器对输出的key-value进行序列化和反序列化
		// 如果Mapper和Reducer输出的Key-value类型一致,直接设置Job最终的输出类型
		//由于Mapper和Reducer输出的Key-value类型不一致(maper输出类型是long-text,而reducer是text-value)
		//所以需要额外设定
		job.setMapOutputKeyClass(LongWritable.class);
		job.setMapOutputValueClass(Text.class);
		
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(LongWritable.class);
		
		// 设置输入目录和输出目录
		FileInputFormat.setInputPaths(job, inputPath);
		FileOutputFormat.setOutputPath(job, outputPath);
		
		// 默认升序排,可以设置使用自定义的比较器
		//job.setSortComparatorClass(DecreasingComparator.class);
		
		// ③运行Job
		job.waitForCompletion(true);
			
	}
}

运行结果(默认升序排)

自定义排序器,使用降序

  • 方法一:自定义类,这个类必须是RawComparator类型,通过设置mapreduce.job.output.key.comparator.class自定义的类的类型。
    自定义类时,可以继承WriableComparator类,也可以实现RawCompartor
    调用方法时,先调用RawCompartor. compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2),再调用RawCompartor.compare()

  • 方法二:定义Mapper输出的key,让key实现WritableComparable,实现CompareTo()

MyDescComparator.java

public class MyDescComparator extends WritableComparator{
	
	@Override
    public int compare(byte[] b1, int s1, int l1,byte[] b2, int s2, int l2) {
      long thisValue = readLong(b1, s1);
      long thatValue = readLong(b2, s2);
      //这里把第一个-1改成1,把第二个1改成-1,就是降序排
      return (thisValue<thatValue ? 1 : (thisValue==thatValue ? 0 : -1));
    }

}

运行结果

Key实现Comparable进行比较

思路二:把map输出时的key封装为一个bean,这个key包含上行流量、下行流量、总流量,value只有手机号

FlowBean.java

public class FlowBean implements WritableComparable<FlowBean>{
	
	private long upFlow;
	private long downFlow;
	private Long sumFlow;
	
	public FlowBean() {
		
	}

	public long getUpFlow() {
		return upFlow;
	}

	public void setUpFlow(long upFlow) {
		this.upFlow = upFlow;
	}

	public long getDownFlow() {
		return downFlow;
	}

	public void setDownFlow(long downFlow) {
		this.downFlow = downFlow;
	}

	public long getSumFlow() {
		return sumFlow;
	}

	public void setSumFlow(long sumFlow) {
		this.sumFlow = sumFlow;
	}

	// 序列化   在写出属性时,如果为引用数据类型,属性不能为null
	@Override
	public void write(DataOutput out) throws IOException {
		
		out.writeLong(upFlow);
		out.writeLong(downFlow);
		out.writeLong(sumFlow);
		
		
	}

	//反序列化   序列化和反序列化的顺序要一致
	@Override
	public void readFields(DataInput in) throws IOException {
		upFlow=in.readLong();
		downFlow=in.readLong();
		sumFlow=in.readLong();
		
	}

	@Override
	public String toString() {
		return  upFlow + "\t" + downFlow + "\t" + sumFlow;
	}

	// 系统封装的比较器在对比key时,调用key的compareTo进行比较
	// 降序比较总流量
	@Override
	public int compareTo(FlowBean o) {
		return -this.sumFlow.compareTo(o.getSumFlow());
	}
	
}

FlowBeanMapper.java

public class FlowBeanMapper extends Mapper<LongWritable, Text, FlowBean, Text>{
	
	private FlowBean out_key=new FlowBean();
	private Text out_value=new Text();
	
	
	@Override
	protected void map(LongWritable key, Text value, Context context)
			throws IOException, InterruptedException {
		
		String[] words = value.toString().split("\t");
		
		//封装总流量为key
		out_key.setUpFlow(Long.parseLong(words[1]));
		out_key.setDownFlow(Long.parseLong(words[2]));
		out_key.setSumFlow(Long.parseLong(words[3]));
		
		out_value.set(words[0]);
		
		context.write(out_key, out_value);
	
	}

}

FlowBeanReducer.java

public class FlowBeanReducer extends Reducer<FlowBean, Text, Text, FlowBean>{
	
	@Override
	protected void reduce(FlowBean key, Iterable<Text> values,
			Reducer<FlowBean, Text, Text, FlowBean>.Context context) throws IOException, InterruptedException {
		
		for (Text value : values) {
			context.write(value, key);
		}
	}
	
}

FlowBeanDriver.java

public class FlowBeanDriver {
	
	public static void main(String[] args) throws Exception {
		
		Path inputPath=new Path("E:\\mroutput\\flowbean");
		Path outputPath=new Path("e:/mroutput/flowbeanSort2");
		
		//作为整个Job的配置
		Configuration conf = new Configuration();
		
		//保证输出目录不存在
		FileSystem fs=FileSystem.get(conf);
		
		if (fs.exists(outputPath)) {
			
			fs.delete(outputPath, true);
			
		}
		
		// ①创建Job
		Job job = Job.getInstance(conf);
		
		// ②设置Job
		// 设置Job运行的Mapper,Reducer类型,Mapper,Reducer输出的key-value类型
		job.setMapperClass(FlowBeanMapper.class);
		job.setReducerClass(FlowBeanReducer.class);
		
		// Job需要根据Mapper和Reducer输出的Key-value类型准备序列化器,通过序列化器对输出的key-value进行序列化和反序列化
		// 如果Mapper和Reducer输出的Key-value类型一致,直接设置Job最终的输出类型
		
		job.setMapOutputKeyClass(FlowBean.class);
		job.setMapOutputValueClass(Text.class);
		
		job.setOutputKeyClass(Text.class);
		job.setOutputValueClass(FlowBean.class);
		
		// 设置输入目录和输出目录
		FileInputFormat.setInputPaths(job, inputPath);
		FileOutputFormat.setOutputPath(job, outputPath);
		
		
		// ③运行Job
		job.waitForCompletion(true);
		
	}

}
打赏
版权声明:所有来源为第三方内容,若本站收录的文章无意侵犯了贵司版权,请给下面邮箱地址来信,我们会及时处理和回复,谢谢。

管理员邮箱:42004990@qq.com

微信公众号

分享给朋友:

相关文章

.NET Core学习笔记(7)——Exception最佳实践

.NET Core学习笔记(7)——Exception最佳实践

1.为什么不要给每个方法都写try catch为每个方法都编写try catch是错误的做法,理由如下:a.重复嵌套的try catch是无用的,多余的。这一点非常容易理解,下面的示例代码中,Outs...

31道Java核心面试题,一次性打包送给你

31道Java核心面试题,一次性打包送给你

先看再点赞,给自己一点思考的时间,微信搜索【沉默王二】关注这个靠才华苟且的程序员。本文 GitHub github.com/itwanger 已收录,里面还有一线大厂整理的面试题,以及我的系列文章。二...

Java线程池ThreadPoolExecutor面试总结思维导图速记

Java线程池ThreadPoolExecutor面试总结思维导图速记

优点降低资源消耗,通过重复利用已创建的线程降低线程创建和销毁造成的消耗。提高响应速度,当任务到达时,可以不需要等待线程创建就能立即执行。提高线程的可管理性类关系接 Executor 一个无返回值的ex...

扯扯Java中的锁

扯扯Java中的锁

前言    又过去了一个周末,最近陆陆续续的看了《并发编程的艺术》一书,对锁有不少感悟,这次就聊聊Java中的锁事。本文纯粹是漫谈,想到哪说到哪,但准确性肯定会保证,倘若有不正确之...

Java面试必问:ThreadLocal终极篇 淦!

Java面试必问:ThreadLocal终极篇 淦!

点赞再看,养成习惯,微信搜一搜【敖丙】关注这个互联网苟且偷生的程序员。本文 GitHub https://github.com/JavaFamily 已收录,有一线大厂面试完整考点、资料以及我的系列文...

OAuth 2.0详解

OAuth 2.0详解

OAuth 2.0详解概念:OAuth(开放授权)是一个开放标准,允许用户让第三方应用访问该用户在某一网站上存储的私密的资源(如基本消息,照片,联系人列表),而无需将 用户名 和 密码 提供给第三方应...