This is an original article; please credit the source when reposting: http://guoyunsky.iteye.com/blog/1235954
This blog has moved to my standalone site: http://www.yun5u.com/articles/hadoop-mapreduce-sql-order-by-sort-improve-fix.html
Please read first:
1. Hadoop MapReduce Study Notes (1): Preface and Preparation
2. Hadoop MapReduce Study Notes (2): Preface and Preparation 2
3. Hadoop MapReduce Study Notes (8): Implementing SQL-like ORDER BY/sorting in MapReduce
4. Hadoop MapReduce Study Notes (9): Implementing SQL-like ORDER BY/sorting in MapReduce, the correct way
5. Hadoop MapReduce Study Notes (10): Implementing SQL-like ORDER BY/sorting 2, sorting on multiple columns
6. Hadoop MapReduce Study Notes (11): Implementing SQL-like ORDER BY/sorting 3, an improvement
Next post:
The previous post, Hadoop MapReduce Study Notes (11): Implementing SQL-like ORDER BY/sorting 3, an improvement, did not produce correct results, and after an hour of digging I could not find the cause. I ended up consulting SecondarySort under hadoop/examples and copying some of its techniques, which corrected the output. I am noting it here and will work out the real answer once I understand the underlying mechanism.
package com.guoyun.hadoop.mapreduce.study;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.RawComparator;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.WritableComparator;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Implements the equivalent of SELECT * FROM TABLE ORDER BY COL1 ASC, COL2 DESC
 * in MapReduce, i.e. sorting on multiple columns.
 * Compared with @OrderByMultiMapReduceTest, this version mainly introduces a
 * Partitioner and a GroupingComparator to improve performance.
 * Since the generated frameworkName values are fairly fixed (see
 * @MyMapReduceMultiColumnTest for how the data is generated), the frameworkName
 * of the map output key is handed to the Partitioner and GroupingComparator so
 * that records with the same frameworkName go to the same reducer, cutting down
 * the cleaning and sorting work before reduce as much as possible. See the
 * Hadoop documentation for the details of Partitioner and GroupingComparator.
 * This is only my current understanding of the two, with input data deliberately
 * arranged to suit it; everything still needs verification, which I will do once
 * I get a chance to read the map and reduce source code.
 * Compared with @OrderByMultiMapReduceImproveTest, this class fixes the
 * incorrect results.
 *
 * Note:
 * looking at the output, this is effectively a GROUP BY implementation as well.
 */
public class OrderByMultiMapReduceImproveFixTest extends
OrderByMultiMapReduceTest {
public static final Logger log=LoggerFactory.getLogger(OrderByMultiMapReduceImproveFixTest.class);
public OrderByMultiMapReduceImproveFixTest(long dataLength, String inputPath,
String outputPath) throws Exception {
super(dataLength, inputPath, outputPath);
}
public OrderByMultiMapReduceImproveFixTest(long dataLength) throws Exception {
super(dataLength);
}
public OrderByMultiMapReduceImproveFixTest(String inputPath, String outputPath) {
super(inputPath, outputPath);
}
public OrderByMultiMapReduceImproveFixTest(String outputPath)
throws Exception {
super(outputPath);
}
/**
 * Extends OrderMultiColumnWritable and registers a new comparator for it via
 * WritableComparator.define().
 * Adding this class alone fixes the inconsistent output of
 * OrderByMultiMapReduceImproveTest; the exact reason is still to be explored.
 */
public static class OrderMultiColumnFixWritable extends OrderMultiColumnWritable{
/**
 * Adding this WritableComparator is what fixes OrderByMultiMapReduceImproveTest;
 * the underlying principle is still unclear and to be explored.
 */
public static class MyComparator extends WritableComparator {
public MyComparator() {
super(OrderMultiColumnWritable.class);
}
public int compare(byte[] b1, int s1, int l1,
byte[] b2, int s2, int l2) {
return compareBytes(b1, s1, l1, b2, s2, l2);
}
}
static {
// register this comparator
WritableComparator.define(OrderMultiColumnWritable.class, new MyComparator());
}
}
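/*
 * My current understanding of why the registration above matters (to be
 * verified against the Hadoop source): WritableComparator.define() registers
 * MyComparator as the comparator that WritableComparator.get() returns for
 * OrderMultiColumnWritable, and that is what the sort phase uses when no sort
 * comparator is set on the job. Without it, Hadoop falls back to a generic
 * comparator that deserializes both keys and calls compareTo(); with it, keys
 * are compared directly on their serialized bytes. One caveat: the static
 * block only runs when OrderMultiColumnFixWritable is loaded, so the
 * registration takes effect only if something references this nested class.
 */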
/**
 * Map: reads the source data and emits a (key,value) pair of
 * (OrderMultiColumnWritable, LongWritable).
 */
public static class MyMapper extends Mapper<LongWritable,Text,OrderMultiColumnWritable,LongWritable>{
private OrderMultiColumnWritable writeKey=new OrderMultiColumnWritable();
private LongWritable writeValue=new LongWritable(0);
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
log.debug("begin to map");
String[] splits=null;
try {
splits=value.toString().split("\\t");
if(splits!=null&&splits.length==2){
writeKey.set(splits[0],Long.parseLong(splits[1].trim()));
writeValue.set(writeKey.getNumber());
// only emit when the line parsed cleanly; writing outside this block would
// re-emit the previous record for every malformed line
context.write(writeKey, writeValue);
}
} catch (NumberFormatException e) {
log.error("map error:"+e.getMessage());
}
}
}
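/*
 * Example, assuming the generated input format "frameworkName\tnumber" (e.g. a
 * line "hadoop\t100"): the map emits key = OrderMultiColumnWritable("hadoop", 100)
 * and value = LongWritable(100). The whole record rides in the composite key, so
 * the shuffle's sort does the ORDER BY work before reduce ever runs.
 */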
/**
 * Reduce: only used to write out the result.
 */
public static class MyReducer
extends Reducer<OrderMultiColumnWritable,LongWritable,Text,LongWritable>{
private Text writeKey=new Text();
@Override
protected void reduce(OrderMultiColumnWritable key,
Iterable<LongWritable> values,Context context) throws IOException,
InterruptedException {
writeKey.set(key.getFrameworkName());
for(LongWritable value:values){
context.write(writeKey, value);
}
}
}
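/*
 * Note on the grouping semantics (my reading, to be verified): the grouping
 * comparator decides which consecutive keys share one reduce call, so a single
 * call here can cover several distinct composite keys that compare equal under
 * it; values then iterates over all of their numbers, already in sort order.
 */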
/**
 * Partitioner: routes each record by its frameworkName so that all rows of one
 * framework land on the same reducer.
 */
public static class MyPartitioner extends Partitioner<OrderMultiColumnWritable,LongWritable>{
@Override
public int getPartition(OrderMultiColumnWritable key, LongWritable value,
int numbers) {
return (int)Math.abs(key.getFrameworkName().hashCode()%numbers);
}
}
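/*
 * With numbers = 2 (setNumReduceTasks(2) below), all records whose frameworkName
 * has the same hashCode() % 2 land on the same reducer. Since hashCode() % numbers
 * lies strictly between -numbers and numbers, the Math.abs() here cannot hit the
 * Integer.MIN_VALUE overflow corner case.
 */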
/**
 * GroupingComparator: decides which map output keys count as equal when reduce
 * groups are formed.
 */
public static class MyGroupingComparator implements RawComparator<OrderMultiColumnWritable>{
@Override
public int compare(OrderMultiColumnWritable o1,
OrderMultiColumnWritable o2) {
return o1.getFrameworkName().compareTo(o2.getFrameworkName());
}
@Override
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2,
int l2) {
return WritableComparator.compareBytes(b1,s1,l1,b2,s2,l2);
}
}
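/*
 * An alternative sketch, not wired into the job below: in its raw form the
 * grouping comparator above falls back to comparing the entire serialized key
 * byte by byte, and the raw form is what the reduce side actually invokes, so
 * grouping may effectively be on the full (frameworkName, number) key rather
 * than on frameworkName alone. Extending WritableComparator with
 * createInstances=true makes the base class deserialize both keys and delegate
 * to the object-level compare(), so the grouping really happens on
 * frameworkName only. This is my reading of WritableComparator, to be verified.
 */
public static class GroupByFrameworkNameComparator extends WritableComparator{
public GroupByFrameworkNameComparator(){
// true: create key instances so the raw byte form can be deserialized
super(OrderMultiColumnWritable.class,true);
}
@SuppressWarnings("rawtypes")
@Override
public int compare(org.apache.hadoop.io.WritableComparable a,
org.apache.hadoop.io.WritableComparable b){
return ((OrderMultiColumnWritable)a).getFrameworkName()
.compareTo(((OrderMultiColumnWritable)b).getFrameworkName());
}
}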
public static void main(String[] args){
MyMapReduceTest mapReduceTest=null;
Configuration conf=null;
Job job=null;
FileSystem fs=null;
Path inputPath=null;
Path outputPath=null;
long begin=0;
String input="testDatas/mapreduce/MRInput_Multi_OrderBy";
String output="testDatas/mapreduce/MROutput_Multi_OrderBy_Improve_Fix";
try {
// Reuse the existing MRInput_Multi_OrderBy input data rather than regenerating it,
// so the result can be checked for correctness against the MROutput_Multi_OrderBy output
mapReduceTest=new OrderByMultiMapReduceImproveFixTest(input,output);
inputPath=new Path(mapReduceTest.getInputPath());
outputPath=new Path(mapReduceTest.getOutputPath());
conf=new Configuration();
job=new Job(conf,"OrderBy");
fs=FileSystem.getLocal(conf);
if(fs.exists(outputPath)){
if(!fs.delete(outputPath,true)){
System.err.println("Delete output file:"+mapReduceTest.getOutputPath()+" failed!");
return;
}
}
job.setJarByClass(OrderByMultiMapReduceImproveFixTest.class);
job.setMapOutputKeyClass(OrderMultiColumnWritable.class);
job.setMapOutputValueClass(LongWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);
job.setMapperClass(MyMapper.class);
job.setReducerClass(MyReducer.class);
job.setPartitionerClass(MyPartitioner.class);
job.setGroupingComparatorClass(MyGroupingComparator.class);
job.setNumReduceTasks(2);
FileInputFormat.addInputPath(job, inputPath);
FileOutputFormat.setOutputPath(job, outputPath);
begin=System.currentTimeMillis();
job.waitForCompletion(true);
System.out.println("===================================================");
if(mapReduceTest.isGenerateDatas()){
System.out.println("The maxValue is:"+mapReduceTest.getMaxValue());
System.out.println("The minValue is:"+mapReduceTest.getMinValue());
}
System.out.println("Spend time:"+(System.currentTimeMillis()-begin));
// Spend time:1270
} catch (Exception e) {
e.printStackTrace();
}
}
}
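To have something concrete to test against when I come back to the "why", here is a minimal sketch. It assumes OrderMultiColumnWritable is the nested key class of OrderByMultiMapReduceTest (adjust the reference if it lives elsewhere) and that, with no sort comparator set on the job, the sort phase resolves its comparator through WritableComparator.get():
package com.guoyunsky.hadoop.sketch;
import org.apache.hadoop.io.WritableComparator;
import com.guoyun.hadoop.mapreduce.study.OrderByMultiMapReduceImproveFixTest;
import com.guoyun.hadoop.mapreduce.study.OrderByMultiMapReduceTest;
public class ComparatorCheck {
public static void main(String[] args) {
// Instantiating the nested class forces it to load, which runs its static
// registration block; a bare OrderMultiColumnFixWritable.class reference
// would not trigger initialization.
new OrderByMultiMapReduceImproveFixTest.OrderMultiColumnFixWritable();
WritableComparator comparator = WritableComparator.get(
OrderByMultiMapReduceTest.OrderMultiColumnWritable.class);
// Should print ...$OrderMultiColumnFixWritable$MyComparator once the
// registration has run.
System.out.println(comparator.getClass().getName());
}
}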