`
guoyunsky
  • 浏览: 838994 次
  • 性别: Icon_minigender_1
  • 来自: 上海
博客专栏
3d3a22a0-f00f-3227-8d03-d2bbe672af75
Heritrix源码分析
浏览量:203170
Group-logo
SQL的MapReduce...
浏览量:0
社区版块
存档分类
最新评论

Hadoop MapReduce 学习笔记(七) MapReduce在多字段/列基础上实现类似SQL的max和min

 
阅读更多

   本博客属原创文章,转载请注明出处:http://guoyunsky.iteye.com/blog/1233733

 

      

       请先阅读:           

           1.Hadoop MapReduce 学习笔记(一) 序言和准备

           2.Hadoop MapReduce 学习笔记(二) 序言和准备 2

           3.Hadoop MapReduce 学习笔记(三) MapReduce实现类似SQL的SELECT MAX(ID)

           4.Hadoop MapReduce 学习笔记(四) MapReduce实现类似SQL的SELECT MAX(ID) 2 一些改进

                 5.Hadoop MapReduce 学习笔记(五) MapReduce实现类似SQL的max和min

                 6.Hadoop MapReduce 学习笔记(六) MapReduce实现类似SQL的max和min  正确写法

 

    下一篇: Hadoop MapReduce 学习笔记(八) MapReduce实现类似SQL的order by/排序

 

        Hadoop MapReduce 学习笔记(六) MapReduce实现类似SQL的max和min  正确写法 只是一列,如序言说的,一张表中有多个列呢?比如想找出序言中USER表最大和最小ID的用户数据,类似SQL:

 

 SELECT * FROM USER WHERE ID=MAX(ID) OR ID= MIN(ID);

      还是贴上代码吧,这里引入的概念是自己实现Hadoop的输入输出.Hadoop自己的是IntWritalbe,Text等,有如Java的int,String.但我们想实现自己的类呢.请看代码吧:

 

     1.相对Hadoop来说,自己的输入输出类:

package com.guoyun.hadoop.mapreduce.study;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;
/**
 * 多列数据,这里格式是:frameworkName(String)  number(int)
 * 等同于数据表
 * CREATE TABLE TABLE_NAME(
 *  FRAMEWORK_NAME VARCHAR(32),
 *  NUMBER INT
 * )
 */
public  class MultiColumnWritable implements  WritableComparable{
  protected String frameworkName="";
  protected long number=-1;
  
  public String getFrameworkName() {
    return frameworkName;
  }

  public void setFrameworkName(String frameworkName) {
    this.frameworkName = frameworkName;
  }

  public long getNumber() {
    return number;
  }

  public void setNumber(long number) {
    this.number = number;
  }

  public MultiColumnWritable() {
    super();
  }

  @Override
  public int compareTo(Object obj) {
    int result=-1;
    if(obj instanceof MultiColumnWritable){
      MultiColumnWritable mcw=(MultiColumnWritable)obj;
      if(mcw.getNumber()<this.getNumber()){
        result =1;
      }else if(mcw.getNumber()==this.getNumber()){
        result=0;
      }
    }
    return result;
  }

  @Override
  public void readFields(DataInput in) throws IOException {
    frameworkName=in.readUTF();
    number=in.readLong();
  }

  @Override
  public void write(DataOutput out) throws IOException {
    out.writeUTF(frameworkName);
    out.writeLong(number);
  }

  @Override
  public String toString() {
    return frameworkName+"\t"+number;
  }
  
}

 

  2.获得最大和最小值

package com.guoyun.hadoop.mapreduce.study;

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
 * 或得最大和最小值,类似SQL:SELECT * FROM TABLE WHERE NUMBER=MAX(NUMBER) OR NUMBER=MIN(NUMBER)
 * 这里有多列数据,但只取其中一列的最大和最小
 * 如果想对其中几列取最大最小值,请自己实现 @MultiColumnWritable
 */
public class GetMaxAndMinValueMultiMapReduceTest extends MyMapReduceMultiColumnTest {
  
  public static final Logger log=LoggerFactory.getLogger(GetMaxAndMinValueMultiMapReduceTest.class);

  public GetMaxAndMinValueMultiMapReduceTest(long dataLength) throws Exception {
    super(dataLength);
    // TODO Auto-generated constructor stub
  }

  public GetMaxAndMinValueMultiMapReduceTest(String outputPath) throws Exception {
    super(outputPath);
    // TODO Auto-generated constructor stub
  }

  public GetMaxAndMinValueMultiMapReduceTest(long dataLength, String inputPath,
      String outputPath) throws Exception {
    super(dataLength, inputPath, outputPath);
    // TODO Auto-generated constructor stub
  }
  
  
  public static class MyCombiner
    extends Reducer<Text,MultiColumnWritable,Text,MultiColumnWritable>{
    private final Text maxValueKey=new Text("maxValue");
    private final Text minValueKey=new Text("minValue");
    
    @Override
    public void reduce(Text key, Iterable<MultiColumnWritable> values,Context context)
        throws IOException, InterruptedException {
      log.debug("begin to combine");
      long maxValue=Long.MIN_VALUE;
      String maxFrameworkName="";
      long minValue=Long.MAX_VALUE;
      String minFrameworkName="";
      long valueTmp=0;
      String nameTmp="";
      MultiColumnWritable writeValue=new MultiColumnWritable();
      
      for(MultiColumnWritable value:values){
        valueTmp=value.getNumber();
        nameTmp=value.getFrameworkName();
        
        // 其实可以用他们的compare方法
        if(valueTmp>maxValue){
          maxValue=valueTmp;
          maxFrameworkName=nameTmp;
        }else if(valueTmp<minValue){
          minValue=valueTmp;
          minFrameworkName=nameTmp;
        }
      }
      
      writeValue.setFrameworkName(maxFrameworkName);
      writeValue.setNumber(maxValue);
      context.write(maxValueKey, writeValue);
      writeValue.setFrameworkName(minFrameworkName);
      writeValue.setNumber(minValue);
      context.write(minValueKey, writeValue);
    } 
    
  }
  
  
  /**
   * Reduce,to get the max value
   */
  public static class MyReducer 
    extends Reducer<Text,MultiColumnWritable,Text,MultiColumnWritable>{
    private final Text maxValueKey=new Text("maxValue");
    private final Text minValueKey=new Text("minValue");
      
    @Override
    public void run(Context context) throws IOException, InterruptedException {
      long maxValue=Long.MIN_VALUE;
      long minValue=Long.MAX_VALUE;
      long tmpValue=0;
      String tmpFrameworkName="";
      String tmpKey="";
      String maxFrameworkName="";
      String minFrameworkName="";
      MultiColumnWritable writeValue=new MultiColumnWritable(); 
      MultiColumnWritable tmpWrite=null;
      
      try {
        setup(context);
   
        while(context.nextKey()){
          tmpKey=context.getCurrentKey().toString();
          tmpWrite=(MultiColumnWritable)context.getCurrentValue();
          tmpValue=tmpWrite.getNumber();
          tmpFrameworkName=tmpWrite.getFrameworkName();
          
          if(tmpKey.equals("maxValue")){
            if(tmpValue>maxValue){
              maxValue=tmpValue;
              maxFrameworkName=tmpFrameworkName;
            }
          }else if(tmpKey.equals("minValue")){
            if(tmpValue<minValue){
              minValue=tmpValue;
              minFrameworkName=tmpFrameworkName;
            }
          }
        }
        
        writeValue.setFrameworkName(maxFrameworkName);
        writeValue.setNumber(maxValue);
        context.write(maxValueKey, writeValue);
        writeValue.setFrameworkName(minFrameworkName);
        writeValue.setNumber(minValue);
        context.write(minValueKey, writeValue);
      } catch (Exception e) {
        log.debug(e.getMessage());
      }finally{
        cleanup(context);
      }
       
    }

    @Override
    protected void cleanup(Context context) throws IOException,
        InterruptedException {
      // TODO Auto-generated method stub
      super.cleanup(context);
    }

    @Override
    protected void setup(Context context) throws IOException,
        InterruptedException {
      // TODO Auto-generated method stub
      super.setup(context);
    }
    
  }
  
  /**
   * @param args
   */
  public static void main(String[] args) {
    MyMapReduceTest mapReduceTest=null;
    Configuration conf=null;
    Job job=null;
    FileSystem fs=null;
    Path inputPath=null;
    Path outputPath=null;
    long begin=0;
    String input="testDatas/mapreduce/MRInput_Multi_getMaxAndMin";
    String output="testDatas/mapreduce/MROutput_Multi_getMaxAndMin";
    
    
    try {
      mapReduceTest=new GetMaxAndMinValueMultiMapReduceTest(2000000,input,output);
      
      inputPath=new Path(mapReduceTest.getInputPath());
      outputPath=new Path(mapReduceTest.getOutputPath());
      
      conf=new Configuration();
      job=new Job(conf,"getMaxAndMinValueMulti");
      
      fs=FileSystem.getLocal(conf);
      if(fs.exists(outputPath)){
        if(!fs.delete(outputPath,true)){
          System.err.println("Delete output file:"+mapReduceTest.getOutputPath()+" failed!");
          return;
        }
      }
      
      
      job.setJarByClass(GetMaxAndMinValueMultiMapReduceTest.class);
      job.setMapOutputKeyClass(Text.class);
      job.setMapOutputValueClass(MultiColumnWritable.class);
      job.setOutputKeyClass(Text.class);
      job.setOutputValueClass(MultiColumnWritable.class);
      job.setMapperClass(MultiSupMapper.class);
      job.setCombinerClass(MyCombiner.class);
      job.setReducerClass(MyReducer.class);
      
      job.setNumReduceTasks(2);
      
      FileInputFormat.addInputPath(job, inputPath);
      FileOutputFormat.setOutputPath(job, outputPath);
      
      begin=System.currentTimeMillis();
      job.waitForCompletion(true);
      
      System.out.println("===================================================");
      if(mapReduceTest.isGenerateDatas()){
        System.out.println("The maxValue is:"+mapReduceTest.getMaxValue());
        System.out.println("The minValue is:"+mapReduceTest.getMinValue());
      }
      System.out.println("Spend time:"+(System.currentTimeMillis()-begin));
      // Spend time:13361
      
    } catch (Exception e) {
      // TODO Auto-generated catch block
      e.printStackTrace();
    }
    

  }
  

}

 

 

更多技术文章、感悟、分享、勾搭,请用微信扫描:

0
1
分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics