This is an original blog post. Please credit the source when reposting: http://guoyunsky.iteye.com/blog/1235953
Please read first:
1. Hadoop MapReduce Study Notes (1): Preface and Preparation
2. Hadoop MapReduce Study Notes (2): Preface and Preparation 2
3. Hadoop MapReduce Study Notes (8): Implementing SQL-like ORDER BY with MapReduce
4. Hadoop MapReduce Study Notes (9): Implementing SQL-like ORDER BY with MapReduce — the correct approach
5. Hadoop MapReduce Study Notes (10): Implementing SQL-like ORDER BY 2 — sorting on multiple fields
Next post: Hadoop MapReduce Study Notes (12): Implementing SQL-like ORDER BY 3 — improvements and corrections
Study Notes (10) showed how to implement an ORDER BY over multiple fields with MapReduce. This follow-up introduces two additional job components — a Partitioner and a GroupingComparator — because they can improve performance. The details are best seen in the code.
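The routing idea behind the Partitioner — hash one field of the key so that equal values always land on the same reduce task — can be sketched in plain Java, independent of Hadoop. The sample names and the reduce count here are purely illustrative:

```java
public class PartitionSketch {
    // Mirrors the getPartition logic: hash one field, wrap into [0, numPartitions)
    static int partitionFor(String frameworkName, int numPartitions) {
        return Math.abs(frameworkName.hashCode() % numPartitions);
    }

    public static void main(String[] args) {
        int reduces = 2; // illustrative, matches setNumReduceTasks(2) in the job below
        // Equal frameworkName values always hash to the same partition,
        // so one reducer receives every record for a given framework.
        System.out.println(partitionFor("hadoop", reduces) == partitionFor("hadoop", reduces)); // prints true
        // The result is always a valid partition index in [0, reduces).
        int p = partitionFor("spark", reduces);
        System.out.println(p >= 0 && p < reduces); // prints true
    }
}
```

Only equality is guaranteed: different names may still collide on the same partition, which is fine — a reducer can serve several groups, but a group is never split across reducers by the partitioner.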
```java
package com.guoyun.hadoop.mapreduce.study;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Implements the equivalent of SELECT * FROM TABLE ORDER BY COL1 ASC, COL2 DESC
 * in MapReduce, i.e. sorting on multiple fields.
 *
 * Compared with @OrderByMultiMapReduceTest, this version adds a Partitioner and a
 * GroupingComparator to improve performance. The generated frameworkName values are
 * fairly fixed (see @MyMapReduceMultiColumnTest for how the data is generated), so we
 * extract frameworkName from the map output key and let the Partitioner and
 * GroupingComparator route records with the same frameworkName to the same reducer,
 * minimizing the shuffle and sort work before reduce. See the Hadoop documentation
 * for the details of Partitioner and GroupingComparator.
 *
 * This reflects my current understanding of Partitioner and GroupingComparator, with
 * deliberately arranged input data. It still needs verification, which I hope to do
 * once I get a chance to read the map and reduce source code.
 *
 * Note: the output of this class does NOT match MROutput_Multi_OrderBy. For the
 * correct implementation, see @OrderByMultiMapReduceImproveFixTest.
 */
public class OrderByMultiMapReduceImproveTest extends OrderByMultiMapReduceTest {
  public static final Logger log = LoggerFactory.getLogger(OrderByMultiMapReduceImproveTest.class);

  public OrderByMultiMapReduceImproveTest(long dataLength) throws Exception {
    super(dataLength);
  }

  public OrderByMultiMapReduceImproveTest(String outputPath) throws Exception {
    super(outputPath);
  }

  public OrderByMultiMapReduceImproveTest(String inputPath, String outputPath) {
    super(inputPath, outputPath);
  }

  public OrderByMultiMapReduceImproveTest(long dataLength, String inputPath, String outputPath) throws Exception {
    super(dataLength, inputPath, outputPath);
  }

  /**
   * Partitioner: route keys by frameworkName so equal names go to the same reducer.
   */
  public static class MyPartitioner extends Partitioner<OrderMultiColumnWritable, NullWritable> {
    @Override
    public int getPartition(OrderMultiColumnWritable key, NullWritable value, int numbers) {
      return (int) Math.abs(key.getFrameworkName().hashCode() % numbers);
    }
  }

  public static class MyComparator extends WritableComparator {
    public MyComparator() {
      super(OrderMultiColumnWritable.class);
    }

    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
      return compareBytes(b1, s1, l1, b2, s2, l2);
    }
  }

  static {
    // Register this comparator for OrderMultiColumnWritable
    WritableComparator.define(OrderMultiColumnWritable.class, new MyComparator());
  }

  /**
   * GroupingComparator: group reduce input by frameworkName only.
   */
  public static class MyGroupingComparator implements RawComparator<OrderMultiColumnWritable> {
    @Override
    public int compare(OrderMultiColumnWritable o1, OrderMultiColumnWritable o2) {
      return o1.getFrameworkName().compareTo(o2.getFrameworkName());
      //return o1.compareTo(o2);
    }

    @Override
    public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2) {
      return WritableComparator.compareBytes(b1, s1, l1, b2, s2, l2);
    }
  }

  public static void main(String[] args) {
    MyMapReduceTest mapReduceTest = null;
    Configuration conf = null;
    Job job = null;
    FileSystem fs = null;
    Path inputPath = null;
    Path outputPath = null;
    long begin = 0;
    String input = "testDatas/mapreduce/MRInput_Multi_OrderBy";
    String output = "testDatas/mapreduce/MROutput_Multi_OrderBy_Improve";

    try {
      // Reuse the MRInput_Multi_OrderBy input data instead of regenerating it,
      // so the result can be compared against MROutput_Multi_OrderBy.
      mapReduceTest = new OrderByMultiMapReduceImproveTest(input, output);

      inputPath = new Path(mapReduceTest.getInputPath());
      outputPath = new Path(mapReduceTest.getOutputPath());

      conf = new Configuration();
      job = new Job(conf, "OrderBy");

      fs = FileSystem.getLocal(conf);
      if (fs.exists(outputPath)) {
        if (!fs.delete(outputPath, true)) {
          System.err.println("Delete output file:" + mapReduceTest.getOutputPath() + " failed!");
          return;
        }
      }

      job.setJarByClass(OrderByMultiMapReduceImproveTest.class);
      job.setMapOutputKeyClass(OrderMultiColumnWritable.class);
      job.setMapOutputValueClass(NullWritable.class);
      job.setOutputKeyClass(OrderMultiColumnWritable.class);
      job.setOutputValueClass(NullWritable.class);
      job.setMapperClass(MyMapper.class);
      job.setReducerClass(MyReducer.class);
      job.setPartitionerClass(MyPartitioner.class);
      job.setGroupingComparatorClass(MyGroupingComparator.class);
      job.setNumReduceTasks(2);

      FileInputFormat.addInputPath(job, inputPath);
      FileOutputFormat.setOutputPath(job, outputPath);

      begin = System.currentTimeMillis();
      job.waitForCompletion(true);

      System.out.println("===================================================");
      if (mapReduceTest.isGenerateDatas()) {
        System.out.println("The maxValue is:" + mapReduceTest.getMaxValue());
        System.out.println("The minValue is:" + mapReduceTest.getMinValue());
      }
      System.out.println("Spend time:" + (System.currentTimeMillis() - begin));
      // Spend time:1280
    } catch (Exception e) {
      e.printStackTrace();
    }
  }
}
```
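The caveat above (the output does not match MROutput_Multi_OrderBy) stems from MyGroupingComparator: its object-level compare looks only at frameworkName, but its byte-level compare uses WritableComparator.compareBytes over the whole serialized key, so the two can disagree about which keys belong in one reduce group. The sketch below makes the mismatch concrete with a hypothetical two-field record serialized by hand — the field layout is an assumption for illustration, not the actual OrderMultiColumnWritable format:

```java
import java.io.ByteArrayOutputStream;
import java.io.DataOutputStream;
import java.io.IOException;

public class GroupingMismatchSketch {
    // Hypothetical serialization: a UTF string followed by a long second field,
    // standing in for a two-column key like OrderMultiColumnWritable.
    static byte[] serialize(String frameworkName, long number) throws IOException {
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        DataOutputStream out = new DataOutputStream(bos);
        out.writeUTF(frameworkName);
        out.writeLong(number);
        return bos.toByteArray();
    }

    // Lexicographic compare over the FULL serialized record, as
    // WritableComparator.compareBytes does over the whole key.
    static int compareWholeBytes(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int cmp = Integer.compare(a[i] & 0xff, b[i] & 0xff);
            if (cmp != 0) return cmp;
        }
        return Integer.compare(a.length, b.length);
    }

    public static void main(String[] args) throws IOException {
        byte[] k1 = serialize("hadoop", 1L);
        byte[] k2 = serialize("hadoop", 2L);
        // The object-level compare (frameworkName only) treats these as ONE group...
        System.out.println("hadoop".compareTo("hadoop") == 0); // prints true
        // ...but the whole-bytes compare sees the differing second field and
        // splits them into separate groups.
        System.out.println(compareWholeBytes(k1, k2) == 0); // prints false
    }
}
```

A raw grouping comparator must compare only the bytes of the grouping field (or deserialize and delegate to the object compare) to stay consistent; presumably that is what the corrected version in the next post addresses.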