`
guoyunsky
  • 浏览: 839481 次
  • 性别: Icon_minigender_1
  • 来自: 上海
博客专栏
3d3a22a0-f00f-3227-8d03-d2bbe672af75
Heritrix源码分析
浏览量:203280
Group-logo
SQL的MapReduce...
浏览量:0
社区版块
存档分类
最新评论

Sqoop源码分析(四) Sqoop中通过hadoop mapreduce从关系型数据库import数据分析

 
阅读更多

      本博客属原创文章,转载请务必注明出处:http://guoyunsky.iteye.com/blogs/1213966/         

       欢迎加入Hadoop超级群: 180941958

           Sqoop中一大亮点就是可以通过hadoop的mapreduce从关系型数据库中导入数据到HDFS,如此可以加快导入时间.一直想了解MapReduce,所以也仔细的阅读了下相关代码,整理成这篇博客.

            .原理:

             Sqoop在import时,需要制定split-by参数.Sqoop根据不同的split-by参数值来进行切分,然后将切分出来的区域分配到不同map中.每个map中再处理数据库中获取的一行一行的值,写入到HDFS中.同时split-by根据不同的参数类型有不同的切分方法,如比较简单的int型,Sqoop会取最大和最小split-by字段值,然后根据传入的num-mappers来确定划分几个区域。比如select max(split_by),min(split-by) from得到的max(split-by)和min(split-by)分别为1000和1,而num-mappers为2的话,则会分成两个区域(1,500)和(501-100),同时也会分成2个sql给2个map去进行导入操作,分别为select XXX from table where split-by>=1 and split-by<500和select XXX from table where split-by>=501 and split-by<=1000.最后每个map各自获取各自SQL中的数据进行导入工作。

      

       二.mapreduce job所需要的各种参数在Sqoop中的实现

           1)InputFormatClass
              com.cloudera.sqoop.mapreduce.db.DataDrivenDBInputFormat
           2)OutputFormatClass
               1)TextFile
                   com.cloudera.sqoop.mapreduce.RawKeyTextOutputFormat
               2)SequenceFile
                   org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat 
               3)AvroDataFile
                   com.cloudera.sqoop.mapreduce.AvroOutputFormat
          3)Mapper
             1)TextFile
                 com.cloudera.sqoop.mapreduce.TextImportMapper
             2)SequenceFile
                 com.cloudera.sqoop.mapreduce.SequenceFileImportMapper
            3)AvroDataFile
                 com.cloudera.sqoop.mapreduce.AvroImportMapper
         4)taskNumbers
            1)mapred.map.tasks (对应num-mappers参数)
            2)job.setNumReduceTasks(0);

 

       这里以我命令行:import --connect jdbc:mysql://localhost/sqoop_datas  --username root --password 123456 --query "select sqoop_1.id as foo_id, sqoop_2.id as bar_id from sqoop_1 ,sqoop_2  WHERE $CONDITIONS" --target-dir /tmp/sqoop/foo2 -split-by sqoop_1.id   --hadoop-home=/home/guoyun/Downloads/hadoop-0.20.2-CDH3B4  --num-mappers 2

       注:红色部分参数,后接根据我的命令衍生的参数值

      1)设置Input

       DataDrivenImportJob.configureInputFormat(Job job, String tableName,String tableClassName, String splitByCol)
           a)DBConfiguration.configureDB(Configuration conf, String driverClass,
           String dbUrl, String userName, String passwd, Integer fetchSize)

                1).mapreduce.jdbc.driver.class com.mysql.jdbc.Driver
                2).mapreduce.jdbc.url  jdbc:mysql://localhost/sqoop_datas
                3).mapreduce.jdbc.username  root
                4).mapreduce.jdbc.password  123456
                5).mapreduce.jdbc.fetchsize -2147483648
            b)DataDrivenDBInputFormat.setInput(Job job,Class<? extends DBWritable> inputClass,
           String inputQuery, String inputBoundingQuery)
               1)job.setInputFormatClass(DBInputFormat.class);
               2)mapred.jdbc.input.bounding.query SELECT MIN(sqoop_1.id), MAX(sqoop_2.id) FROM (select sqoop_1.id as foo_id, sqoop_2.id as bar_id from sqoop_1 ,sqoop_2  WHERE  (1 = 1) ) AS t1
               3)job.setInputFormatClass(DataDrivenDBInputFormat.class);
               4)mapreduce.jdbc.input.orderby sqoop_1.id
           c)mapreduce.jdbc.input.class QueryResult
           d)sqoop.inline.lob.length.max 16777216
         3)job.setInputFormatClass(inputFormatClass); class com.cloudera.sqoop.mapreduce.db.DataDrivenDBInputFormat

 

   2)设置Output 

    ImportJobBase.configureOutputFormat(Job job, String tableName,String tableClassName)
             a)job.setOutputFormatClass(getOutputFormatClass());
             b)FileOutputFormat.setOutputCompressorClass(job, codecClass);
             c)SequenceFileOutputFormat.setOutputCompressionType(job,CompressionType.BLOCK);
             d)FileOutputFormat.setOutputPath(job, outputPath);

 

    3)设置Map

       DataDrivenImportJob.configureMapper(Job job, String tableName,String tableClassName)
              a) job.setOutputKeyClass(Text.class);
              b)job.setOutputValueClass(NullWritable.class);
              c)job.setMapperClass(com.cloudera.sqoop.mapreduce.TextImportMapper);

 

    4)设置task number

       JobBase.configureNumTasks(Job job)
              mapred.map.tasks 4
              job.setNumReduceTasks(0);

 

       三。大概流程

          1.读取要导入数据的表结构,生成运行类,默认是QueryResult,打成jar包,然后提交给Hadoop

          2.设置好job,主要也就是设置好以上二中的各个参数

          3.这里就由Hadoop来执行MapReduce来执行Import命令了,

             1)首先要对数据进行切分,也就是DataSplit

                 DataDrivenDBInputFormat.getSplits(JobContext job)

             2)切分好范围后,写入范围,以便读取

                DataDrivenDBInputFormat.write(DataOutput output) 

                这里是lowerBoundQuery and  upperBoundQuery

             3)读取以上2)写入的范围

                DataDrivenDBInputFormat.readFields(DataInput input)

             4)然后创建RecordReader从数据库中读取数据

                DataDrivenDBInputFormat.createRecordReader(InputSplit split,TaskAttemptContext context)

             5)创建Map

                TextImportMapper.setup(Context context)

             6)RecordReader一行一行从关系型数据库中读取数据,设置好Map的Key和Value,交给Map

                DBRecordReader.nextKeyValue() 

             7)运行map

                 TextImportMapper.map(LongWritable key, SqoopRecord val, Context context)

                  最后生成的Key是行数据,由QueryResult生成,Value是NullWritable.get()

 

  四.总结

       通过这些,我大概了解了MapReduce运行流程.但对于Sqoop这种切分方式感觉还是有很大的问题.比如这里根据ID范围来切分,如此切分出来的数据会很不平均,比如min(split-id)=1,max(split-id)=3000,交给三个map来处理。那么范围是(1-1000),(1001-2000),(2001-3000).而假如1001-2000是没有数据,已经被删除了。那么这个map就什么都不能做。而其他map却累的半死。如此就会拖累job的运行结果。我这里说的范围很小,比如有几十亿条数据交给几百个map去做。map一多,如果任务不均衡就会影响进度。看有没有更好的切分方式?比如取样?如此看来,写好map reduce也不简单!

 

更多技术文章、感悟、分享、勾搭,请用微信扫描:

 

分享到:
评论
2 楼 lookqlp 2012-08-31  
guo总:

你split-by举例是int型的,经过测试若是string型,会出现bug,例如原库数据1000条,使用-m 3  split-by string型字段,在hive中数据量变成了3000,也就是map个数的倍数。

这是不是说,若没有合适的int递增型字段来指定splitby的话,只能使用1个map。
1 楼 SmartMessage 2012-03-28  
写的不错 学习了

相关推荐

    sqoop-1.4.6.bin-hadoop-2.0.4-alpha版本的压缩包,直接下载到本地,解压后即可使用

    Sqoop(发音:skup)是一款开源的工具,主要用于在Hadoop(Hive)与传统的数据库(mysql、postgresql...)间进行数据的传递,可以将一个关系型数据库(例如 : MySQL ,Oracle ,Postgres等)中的数据导进到Hadoop的HDFS中,...

    Apache Hadoop---Sqoop.docx

    Sqoop可以将一个关系型数据库(例如MySQL、Oracle、PostgreSQL等)中的数据导入Hadoop的HDFS、Hive中,也可以将HDFS、Hive中的数据导入关系型数据库中。Sqoop充分利用了Hadoop的优点,整个数据导入导出过程都是用...

    大数据运维技术第9章 Sqoop组件安装配置.pptx

    多数使用Hadoop技术处理大数据业务的企业,有大量的数据存储在关系型数据中。由于没有工具支持,Hadoop和关系型数据库之间的数据传输是很困难的事情。传统的应用程序管理系统,即应用程序与使用RDBMS的关系数据库的...

    Sqoop同步数据命令

    Sqoop是一个用来将Hadoop和关系型数据库中的数据相互转移的工具,可以使用Sqoop将数据从MySQL或Oracle等关系数据库管理系统(RDBMS)导入Hadoop分布式文件系统(HDFS),在Hadoop MapReduce中转换数据,然后将数据...

    在Hadoop集群环境中为MySQL安装配置Sqoop的教程

    Sqoop中一大亮点就是可以通过hadoop的mapreduce把数据从关系型数据库中导入数据到HDFS。 一、安装sqoop 1、下载sqoop压缩包,并解压 压缩包分别是:sqoop-1.2.0-CDH3B4.tar.gz,hadoop-0.20.2-CDH3B4.tar.gz, ...

    hadoop安装与配置.pdf

    此外,Hadoop广义上指的是一个更广泛的概念,即Hadoop生态系统,其中还包括了Hive数据仓库工具、HBase非关系型数据库、Zookeeper分布式协调服务、Kafka消息队列、Sqoop数据导入导出等其他组件。 Hadoop的创始人是...

    Hadoop权威指南

    使用Sqoop从关系型数据库载入数据到HDFS;使用Pig查询语言进行大规模数据处理;使用Hadoop的数据仓库系统Hive分析数据集;利用HBase处理结构化和半结构化数据,以及利用ZooKeeper构建分布式系统。

    sqoop的使用

    sqoop的使用 ----sqoop是一个用来在hadoop体系和关系型数据库之间进行数据互导的工具 ----实质就是将导入导出命令转换成mapreduce程序来实现

    HADOOP权威指南 第3版 PDF电子书下载 带目录书签 完整版.z01

    使用Sqoop从关系型数据库载入数据到HDFS;使用Pig查询语言进行大规模数据处理;使用Hadoop的数据仓库系统Hive分析数据集;利用HBase处理结构化和半结构化数据,以及利用ZooKeeper构建分布式系统……

    Hadoop权威指南 第二版(中文版)

     关系型数据库管理系统  网格计算  志愿计算  1.3.4 Hadoop 发展简史  Apache Hadoop和Hadoop生态圈 第2章 关于MapReduce  一个气象数据集  数据的格式  使用Unix工具进行数据分析  使用Hadoop分析数据  ...

    Hadoop权威指南(中文版)2015上传.rar

    关系型数据库管理系统 网格计算 志愿计算 1.3.4 Hadoop 发展简史 Apache Hadoop和Hadoop生态圈 第2章 关于MapReduce 一个气象数据集 数据的格式 使用Unix工具进行数据分析 使用Hadoop分析数据 map阶段和reduce阶段 ...

    word源码java-hadoop-v1:hadoop入门

    word源码java hadoop生态圈 spark生态圈 大数据概述 初识hadoop 分布式文件系统HDFS 分布式资源调度YARN ...sqoop(关系型数据库&lt;-&gt;hdfs) 数据存储 hadoop(hdfs) 数据处理/分析/挖掘 hadoop、spark、flink、hi

    sqoop详细教程

    sqoop详细教程。Apache Sqoop是用来实现结构型数据(如关系数据库)和Hadoop之间进行数据迁移的工具。它充分利用了MapReduce的并行特点以批处理的方式加快数据的传输,同时也借助MapReduce实现了容错。

    大数据处理流程.pdf

    它可以 将⼀个关系数据库中数据导⼊Hadoop的HDFS中,也可以将HDFS中的数据导⼊关系型数据库中。 Flume:实时数据采集的⼀个开源框架,它是Cloudera提供的⼀个⾼可⽤⽤的、⾼可靠、分布式的海量⽇志采集、聚合和传输...

    基于hadoop生态实现的的电影网站+源代码+文档说明

    以此来收集用户评分信息(MySQL),每过一段时间就对该时段内的评分数据进行协同过滤算法的MapReduce计算,计算结果是存储在HDFS里的,所以要使用sqoop工具来对HDFS中非关系型数据转发到MYSQL这样的关系型数据库中,...

    大数据的基础知识.pdf

    可以将⼀个关系型数据库(例如 : MySQL,Oracle 等)中的数据导进到Hadoop的HDFS中,也可以将HDFS的数据导进到关系型数据库中。 2 Flume Flume是Cloudera提供的⼀个⾼可⽤的,⾼可靠的,分布式的海量⽇志采集、聚合...

    大数据开源框架集锦.pdf

    sqoop 数据迁移⼯具,⽤来在不同数据存储软件之间进⾏数据传输的开源软件 DataX 阿⾥巴巴开源的离线数据同步⼯具,⽤于实现包括关系型数据库(MySQL、Oracle等)、HDFS、Hive、ODPS、HBase、FTP等各种异构数据源之间...

    基于hadoop实现的电影推荐网站+源代码+文档说明

    以此来收集用户评分信息(MySQL),每过一段时间就对该时段内的评分数据进行协同过滤算法的MapReduce计算,计算结果是存储在HDFS里的,所以要使用sqoop工具来对HDFS中非关系型数据转发到MYSQL这样的关系型数据库中,...

    大数据场景化解决方案.pdf

    使⽤Sqoop可以交互关系型数据库,进⾏导⼊导出数据。 使⽤爬⾍技术,可在⽹上爬取海量⽹页数据。 数据存储与管理: ⼤数据利⽤分布式⽂件系统HDFS、HBase、Hive,实现对结构化、半结构化和⾮结构化数据的存储和管理...

Global site tag (gtag.js) - Google Analytics