This blog post is original content. Please credit the source when reposting: http://guoyunsky.iteye.com/blog/1233733
Please read these first:
1. Hadoop MapReduce Study Notes (1): Preface and Preparation
2. Hadoop MapReduce Study Notes (2): Preface and Preparation, Part 2
3. Hadoop MapReduce Study Notes (3): Implementing SQL-like SELECT MAX(ID) with MapReduce
4. Hadoop MapReduce Study Notes (4): Implementing SQL-like SELECT MAX(ID) with MapReduce, Part 2: Some Improvements
5. Hadoop MapReduce Study Notes (5): Implementing SQL-like max and min with MapReduce
6. Hadoop MapReduce Study Notes (6): Implementing SQL-like max and min with MapReduce, the Correct Way
Next post: Hadoop MapReduce Study Notes (8): Implementing SQL-like ORDER BY / Sorting with MapReduce
Hadoop MapReduce Study Notes (6), the correct way to compute SQL-like max and min, only dealt with a single column. As the preface asked: what if a table has multiple columns? Say we want the rows of the USER table from the preface that hold the maximum and minimum ID, similar to this SQL:
SELECT * FROM USER WHERE ID=MAX(ID) OR ID=MIN(ID);
(Strictly speaking, standard SQL does not allow aggregates in a WHERE clause; the well-formed version would be SELECT * FROM USER WHERE ID=(SELECT MAX(ID) FROM USER) OR ID=(SELECT MIN(ID) FROM USER);)
On to the code. The new concept introduced here is implementing our own Hadoop input/output type. Hadoop ships with IntWritable, Text and so on, which play the same role as Java's int and String. But what if we want a class of our own? See the code:
1. Our own input/output class, as opposed to Hadoop's built-in ones:
package com.guoyun.hadoop.mapreduce.study;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

/**
 * A multi-column value, formatted as: frameworkName(String) number(long).
 * Equivalent to the table:
 *   CREATE TABLE TABLE_NAME(
 *     FRAMEWORK_NAME VARCHAR(32),
 *     NUMBER INT
 *   )
 */
public class MultiColumnWritable implements WritableComparable<MultiColumnWritable> {
  protected String frameworkName = "";
  protected long number = -1;

  public MultiColumnWritable() {
    super();
  }

  public String getFrameworkName() {
    return frameworkName;
  }

  public void setFrameworkName(String frameworkName) {
    this.frameworkName = frameworkName;
  }

  public long getNumber() {
    return number;
  }

  public void setNumber(long number) {
    this.number = number;
  }

  @Override
  public int compareTo(MultiColumnWritable other) {
    // Order by number only; frameworkName is carried along as payload.
    if (this.number > other.number) {
      return 1;
    } else if (this.number == other.number) {
      return 0;
    }
    return -1;
  }

  @Override
  public void readFields(DataInput in) throws IOException {
    frameworkName = in.readUTF();
    number = in.readLong();
  }

  @Override
  public void write(DataOutput out) throws IOException {
    out.writeUTF(frameworkName);
    out.writeLong(number);
  }

  @Override
  public String toString() {
    return frameworkName + "\t" + number;
  }
}
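Hadoop serializes this class through write(DataOutput) and deserializes it through readFields(DataInput) whenever the value crosses the map/reduce boundary. Here is a minimal round-trip sketch to verify the two methods mirror each other; this is my own illustration, not from the original post, and the class name MultiColumnWritableRoundTrip is made up:

package com.guoyun.hadoop.mapreduce.study;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;

// Illustrative only: round-trip a MultiColumnWritable through a byte
// buffer, the same way the framework moves it between map and reduce.
public class MultiColumnWritableRoundTrip {
  public static void main(String[] args) throws Exception {
    MultiColumnWritable original = new MultiColumnWritable();
    original.setFrameworkName("hadoop");
    original.setNumber(42L);

    ByteArrayOutputStream buffer = new ByteArrayOutputStream();
    original.write(new DataOutputStream(buffer));        // serialize

    MultiColumnWritable copy = new MultiColumnWritable();
    copy.readFields(new DataInputStream(
        new ByteArrayInputStream(buffer.toByteArray()))); // deserialize

    System.out.println(copy); // prints: hadoop<TAB>42
  }
}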
2. Getting the maximum and minimum values:
package com.guoyun.hadoop.mapreduce.study;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * Get the max and min values, similar to SQL:
 *   SELECT * FROM TABLE WHERE NUMBER=MAX(NUMBER) OR NUMBER=MIN(NUMBER)
 * The data has multiple columns, but max/min is taken over one column only.
 * To take max/min over several columns, extend MultiColumnWritable yourself.
 */
public class GetMaxAndMinValueMultiMapReduceTest extends MyMapReduceMultiColumnTest {
  public static final Logger log = LoggerFactory.getLogger(GetMaxAndMinValueMultiMapReduceTest.class);

  public GetMaxAndMinValueMultiMapReduceTest(long dataLength) throws Exception {
    super(dataLength);
  }

  public GetMaxAndMinValueMultiMapReduceTest(String outputPath) throws Exception {
    super(outputPath);
  }

  public GetMaxAndMinValueMultiMapReduceTest(long dataLength, String inputPath, String outputPath) throws Exception {
    super(dataLength, inputPath, outputPath);
  }

  public static class MyCombiner extends Reducer<Text, MultiColumnWritable, Text, MultiColumnWritable> {
    private final Text maxValueKey = new Text("maxValue");
    private final Text minValueKey = new Text("minValue");

    @Override
    public void reduce(Text key, Iterable<MultiColumnWritable> values, Context context)
        throws IOException, InterruptedException {
      log.debug("begin to combine");
      long maxValue = Long.MIN_VALUE;
      String maxFrameworkName = "";
      long minValue = Long.MAX_VALUE;
      String minFrameworkName = "";
      MultiColumnWritable writeValue = new MultiColumnWritable();
      for (MultiColumnWritable value : values) {
        long valueTmp = value.getNumber();
        String nameTmp = value.getFrameworkName();
        // Could also use compareTo here. Note these must be two independent
        // ifs, not if/else-if: the first value is both the current max and
        // the current min, and an else-if would never update the min.
        if (valueTmp > maxValue) {
          maxValue = valueTmp;
          maxFrameworkName = nameTmp;
        }
        if (valueTmp < minValue) {
          minValue = valueTmp;
          minFrameworkName = nameTmp;
        }
      }
      writeValue.setFrameworkName(maxFrameworkName);
      writeValue.setNumber(maxValue);
      context.write(maxValueKey, writeValue);
      writeValue.setFrameworkName(minFrameworkName);
      writeValue.setNumber(minValue);
      context.write(minValueKey, writeValue);
    }
  }

  /**
   * Reduce: pick the global max under the "maxValue" key and the global
   * min under the "minValue" key from the combiners' local results.
   */
  public static class MyReducer extends Reducer<Text, MultiColumnWritable, Text, MultiColumnWritable> {
    private final Text maxValueKey = new Text("maxValue");
    private final Text minValueKey = new Text("minValue");

    @Override
    public void run(Context context) throws IOException, InterruptedException {
      long maxValue = Long.MIN_VALUE;
      long minValue = Long.MAX_VALUE;
      String maxFrameworkName = "";
      String minFrameworkName = "";
      MultiColumnWritable writeValue = new MultiColumnWritable();
      try {
        setup(context);
        while (context.nextKey()) {
          String tmpKey = context.getCurrentKey().toString();
          // Iterate over ALL values for this key; each combiner run
          // contributes one candidate, so reading only the first value
          // would silently drop the rest.
          for (MultiColumnWritable tmpWrite : context.getValues()) {
            long tmpValue = tmpWrite.getNumber();
            String tmpFrameworkName = tmpWrite.getFrameworkName();
            if (tmpKey.equals("maxValue")) {
              if (tmpValue > maxValue) {
                maxValue = tmpValue;
                maxFrameworkName = tmpFrameworkName;
              }
            } else if (tmpKey.equals("minValue")) {
              if (tmpValue < minValue) {
                minValue = tmpValue;
                minFrameworkName = tmpFrameworkName;
              }
            }
          }
        }
        // Only emit results this reducer actually saw; with two reduce
        // tasks the "maxValue" and "minValue" keys usually land on
        // different reducers, and an unconditional write would emit
        // bogus sentinel records.
        if (maxValue > Long.MIN_VALUE) {
          writeValue.setFrameworkName(maxFrameworkName);
          writeValue.setNumber(maxValue);
          context.write(maxValueKey, writeValue);
        }
        if (minValue < Long.MAX_VALUE) {
          writeValue.setFrameworkName(minFrameworkName);
          writeValue.setNumber(minValue);
          context.write(minValueKey, writeValue);
        }
      } catch (Exception e) {
        log.error(e.getMessage(), e);
      } finally {
        cleanup(context);
      }
    }
  }

  public static void main(String[] args) {
    String input = "testDatas/mapreduce/MRInput_Multi_getMaxAndMin";
    String output = "testDatas/mapreduce/MROutput_Multi_getMaxAndMin";
    try {
      MyMapReduceTest mapReduceTest = new GetMaxAndMinValueMultiMapReduceTest(2000000, input, output);
      Path inputPath = new Path(mapReduceTest.getInputPath());
      Path outputPath = new Path(mapReduceTest.getOutputPath());
      Configuration conf = new Configuration();
      Job job = new Job(conf, "getMaxAndMinValueMulti");
      FileSystem fs = FileSystem.getLocal(conf);
      if (fs.exists(outputPath)) {
        if (!fs.delete(outputPath, true)) {
          System.err.println("Delete output file:" + mapReduceTest.getOutputPath() + " failed!");
          return;
        }
      }
      job.setJarByClass(GetMaxAndMinValueMultiMapReduceTest.class);
      job.setMapOutputKeyClass(Text.class);
      job.setMapOutputValueClass(MultiColumnWritable.class);
      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(MultiColumnWritable.class);
      job.setMapperClass(MultiSupMapper.class);
      job.setCombinerClass(MyCombiner.class);
      job.setReducerClass(MyReducer.class);
      job.setNumReduceTasks(2);
      FileInputFormat.addInputPath(job, inputPath);
      FileOutputFormat.setOutputPath(job, outputPath);
      long begin = System.currentTimeMillis();
      job.waitForCompletion(true);
      System.out.println("===================================================");
      if (mapReduceTest.isGenerateDatas()) {
        System.out.println("The maxValue is:" + mapReduceTest.getMaxValue());
        System.out.println("The minValue is:" + mapReduceTest.getMinValue());
      }
      System.out.println("Spend time:" + (System.currentTimeMillis() - begin)); // Spend time:13361
    } catch (Exception e) {
      e.printStackTrace();
    }
  }
}
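One piece the post leaves out is the mapper: MultiSupMapper lives in the parent class MyMapReduceMultiColumnTest, which is not shown. As a rough sketch of what such a mapper presumably does (the field parsing and the constant key are my assumptions, not the author's code): split each frameworkName<TAB>number input line and emit it under one fixed key, so that a single reduce group sees every row:

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// Assumed sketch of MultiSupMapper; the real implementation lives in
// the parent class MyMapReduceMultiColumnTest, which the post omits.
public class MultiSupMapper extends Mapper<LongWritable, Text, Text, MultiColumnWritable> {
  private final Text outputKey = new Text("allRows"); // constant key groups all rows together
  private final MultiColumnWritable outputValue = new MultiColumnWritable();

  @Override
  public void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    String[] columns = value.toString().split("\t");
    if (columns.length != 2) {
      return; // skip malformed lines
    }
    outputValue.setFrameworkName(columns[0]);
    outputValue.setNumber(Long.parseLong(columns[1]));
    context.write(outputKey, outputValue);
  }
}

One caveat with this design: the reducer only recognizes the keys "maxValue" and "minValue", which are produced by the combiner. Hadoop treats combiners as an optional optimization that it may skip or run more than once, so a robust version should not place required logic in the combiner alone; the reducer would also need to handle raw mapper output.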