本博客属原创文章,转载请务必注明出处:http://guoyunsky.iteye.com/blogs/1213966/
欢迎加入Hadoop超级群: 180941958
Sqoop中一大亮点就是可以通过hadoop的mapreduce从关系型数据库中导入数据到HDFS,如此可以加快导入时间.一直想了解MapReduce,所以也仔细的阅读了下相关代码,整理成这篇博客.
一.原理:
Sqoop在import时,需要制定split-by参数.Sqoop根据不同的split-by参数值来进行切分,然后将切分出来的区域分配到不同map中.每个map中再处理数据库中获取的一行一行的值,写入到HDFS中.同时split-by根据不同的参数类型有不同的切分方法,如比较简单的int型,Sqoop会取最大和最小split-by字段值,然后根据传入的num-mappers来确定划分几个区域。比如select max(split_by),min(split-by) from得到的max(split-by)和min(split-by)分别为1000和1,而num-mappers为2的话,则会分成两个区域(1,500)和(501-100),同时也会分成2个sql给2个map去进行导入操作,分别为select XXX from table where split-by>=1 and split-by<500和select XXX from table where split-by>=501 and split-by<=1000.最后每个map各自获取各自SQL中的数据进行导入工作。
二.mapreduce job所需要的各种参数在Sqoop中的实现
1)InputFormatClass
com.cloudera.sqoop.mapreduce.db.DataDrivenDBInputFormat
2)OutputFormatClass
1)TextFile
com.cloudera.sqoop.mapreduce.RawKeyTextOutputFormat
2)SequenceFile
org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat
3)AvroDataFile
com.cloudera.sqoop.mapreduce.AvroOutputFormat
3)Mapper
1)TextFile
com.cloudera.sqoop.mapreduce.TextImportMapper
2)SequenceFile
com.cloudera.sqoop.mapreduce.SequenceFileImportMapper
3)AvroDataFile
com.cloudera.sqoop.mapreduce.AvroImportMapper
4)taskNumbers
1)mapred.map.tasks (对应num-mappers参数)
2)job.setNumReduceTasks(0);
这里以我命令行:import --connect jdbc:mysql://localhost/sqoop_datas --username root --password 123456 --query "select sqoop_1.id as foo_id, sqoop_2.id as bar_id from sqoop_1 ,sqoop_2 WHERE $CONDITIONS" --target-dir /tmp/sqoop/foo2 -split-by sqoop_1.id --hadoop-home=/home/guoyun/Downloads/hadoop-0.20.2-CDH3B4 --num-mappers 2
注:红色部分参数,后接根据我的命令衍生的参数值
1)设置Input
DataDrivenImportJob.configureInputFormat(Job job, String tableName,String tableClassName, String splitByCol)
a)DBConfiguration.configureDB(Configuration conf, String driverClass,
String dbUrl, String userName, String passwd, Integer fetchSize)
1).mapreduce.jdbc.driver.class com.mysql.jdbc.Driver
2).mapreduce.jdbc.url jdbc:mysql://localhost/sqoop_datas
3).mapreduce.jdbc.username root
4).mapreduce.jdbc.password 123456
5).mapreduce.jdbc.fetchsize -2147483648
b)DataDrivenDBInputFormat.setInput(Job job,Class<? extends DBWritable> inputClass,
String inputQuery, String inputBoundingQuery)
1)job.setInputFormatClass(DBInputFormat.class);
2)mapred.jdbc.input.bounding.query SELECT MIN(sqoop_1.id), MAX(sqoop_2.id) FROM (select sqoop_1.id as foo_id, sqoop_2.id as bar_id from sqoop_1 ,sqoop_2 WHERE (1 = 1) ) AS t1
3)job.setInputFormatClass(DataDrivenDBInputFormat.class);
4)mapreduce.jdbc.input.orderby sqoop_1.id
c)mapreduce.jdbc.input.class QueryResult
d)sqoop.inline.lob.length.max 16777216
3)job.setInputFormatClass(inputFormatClass); class com.cloudera.sqoop.mapreduce.db.DataDrivenDBInputFormat
2)设置Output
ImportJobBase.configureOutputFormat(Job job, String tableName,String tableClassName)
a)job.setOutputFormatClass(getOutputFormatClass());
b)FileOutputFormat.setOutputCompressorClass(job, codecClass);
c)SequenceFileOutputFormat.setOutputCompressionType(job,CompressionType.BLOCK);
d)FileOutputFormat.setOutputPath(job, outputPath);
3)设置Map
DataDrivenImportJob.configureMapper(Job job, String tableName,String tableClassName)
a) job.setOutputKeyClass(Text.class);
b)job.setOutputValueClass(NullWritable.class);
c)job.setMapperClass(com.cloudera.sqoop.mapreduce.TextImportMapper);
4)设置task number
JobBase.configureNumTasks(Job job)
mapred.map.tasks 4
job.setNumReduceTasks(0);
三。大概流程
1.读取要导入数据的表结构,生成运行类,默认是QueryResult,打成jar包,然后提交给Hadoop
2.设置好job,主要也就是设置好以上二中的各个参数
3.这里就由Hadoop来执行MapReduce来执行Import命令了,
1)首先要对数据进行切分,也就是DataSplit
DataDrivenDBInputFormat.getSplits(JobContext job)
2)切分好范围后,写入范围,以便读取
DataDrivenDBInputFormat.write(DataOutput output)
这里是lowerBoundQuery and upperBoundQuery
3)读取以上2)写入的范围
DataDrivenDBInputFormat.readFields(DataInput input)
4)然后创建RecordReader从数据库中读取数据
DataDrivenDBInputFormat.createRecordReader(InputSplit split,TaskAttemptContext context)
5)创建Map
TextImportMapper.setup(Context context)
6)RecordReader一行一行从关系型数据库中读取数据,设置好Map的Key和Value,交给Map
DBRecordReader.nextKeyValue()
7)运行map
TextImportMapper.map(LongWritable key, SqoopRecord val, Context context)
最后生成的Key是行数据,由QueryResult生成,Value是NullWritable.get()
四.总结
通过这些,我大概了解了MapReduce运行流程.但对于Sqoop这种切分方式感觉还是有很大的问题.比如这里根据ID范围来切分,如此切分出来的数据会很不平均,比如min(split-id)=1,max(split-id)=3000,交给三个map来处理。那么范围是(1-1000),(1001-2000),(2001-3000).而假如1001-2000是没有数据,已经被删除了。那么这个map就什么都不能做。而其他map却累的半死。如此就会拖累job的运行结果。我这里说的范围很小,比如有几十亿条数据交给几百个map去做。map一多,如果任务不均衡就会影响进度。看有没有更好的切分方式?比如取样?如此看来,写好map reduce也不简单!
更多技术文章、感悟、分享、勾搭,请用微信扫描:
相关推荐
Sqoop(发音:skup)是一款开源的工具,主要用于在Hadoop(Hive)与传统的数据库(mysql、postgresql...)间进行数据的传递,可以将一个关系型数据库(例如 : MySQL ,Oracle ,Postgres等)中的数据导进到Hadoop的HDFS中,...
Sqoop可以将一个关系型数据库(例如MySQL、Oracle、PostgreSQL等)中的数据导入Hadoop的HDFS、Hive中,也可以将HDFS、Hive中的数据导入关系型数据库中。Sqoop充分利用了Hadoop的优点,整个数据导入导出过程都是用...
多数使用Hadoop技术处理大数据业务的企业,有大量的数据存储在关系型数据中。由于没有工具支持,Hadoop和关系型数据库之间的数据传输是很困难的事情。传统的应用程序管理系统,即应用程序与使用RDBMS的关系数据库的...
Sqoop是一个用来将Hadoop和关系型数据库中的数据相互转移的工具,可以使用Sqoop将数据从MySQL或Oracle等关系数据库管理系统(RDBMS)导入Hadoop分布式文件系统(HDFS),在Hadoop MapReduce中转换数据,然后将数据...
Sqoop中一大亮点就是可以通过hadoop的mapreduce把数据从关系型数据库中导入数据到HDFS。 一、安装sqoop 1、下载sqoop压缩包,并解压 压缩包分别是:sqoop-1.2.0-CDH3B4.tar.gz,hadoop-0.20.2-CDH3B4.tar.gz, ...
此外,Hadoop广义上指的是一个更广泛的概念,即Hadoop生态系统,其中还包括了Hive数据仓库工具、HBase非关系型数据库、Zookeeper分布式协调服务、Kafka消息队列、Sqoop数据导入导出等其他组件。 Hadoop的创始人是...
使用Sqoop从关系型数据库载入数据到HDFS;使用Pig查询语言进行大规模数据处理;使用Hadoop的数据仓库系统Hive分析数据集;利用HBase处理结构化和半结构化数据,以及利用ZooKeeper构建分布式系统。
sqoop的使用 ----sqoop是一个用来在hadoop体系和关系型数据库之间进行数据互导的工具 ----实质就是将导入导出命令转换成mapreduce程序来实现
使用Sqoop从关系型数据库载入数据到HDFS;使用Pig查询语言进行大规模数据处理;使用Hadoop的数据仓库系统Hive分析数据集;利用HBase处理结构化和半结构化数据,以及利用ZooKeeper构建分布式系统……
关系型数据库管理系统 网格计算 志愿计算 1.3.4 Hadoop 发展简史 Apache Hadoop和Hadoop生态圈 第2章 关于MapReduce 一个气象数据集 数据的格式 使用Unix工具进行数据分析 使用Hadoop分析数据 ...
关系型数据库管理系统 网格计算 志愿计算 1.3.4 Hadoop 发展简史 Apache Hadoop和Hadoop生态圈 第2章 关于MapReduce 一个气象数据集 数据的格式 使用Unix工具进行数据分析 使用Hadoop分析数据 map阶段和reduce阶段 ...
word源码java hadoop生态圈 spark生态圈 大数据概述 初识hadoop 分布式文件系统HDFS 分布式资源调度YARN ...sqoop(关系型数据库<->hdfs) 数据存储 hadoop(hdfs) 数据处理/分析/挖掘 hadoop、spark、flink、hi
sqoop详细教程。Apache Sqoop是用来实现结构型数据(如关系数据库)和Hadoop之间进行数据迁移的工具。它充分利用了MapReduce的并行特点以批处理的方式加快数据的传输,同时也借助MapReduce实现了容错。
它可以 将⼀个关系数据库中数据导⼊Hadoop的HDFS中,也可以将HDFS中的数据导⼊关系型数据库中。 Flume:实时数据采集的⼀个开源框架,它是Cloudera提供的⼀个⾼可⽤⽤的、⾼可靠、分布式的海量⽇志采集、聚合和传输...
以此来收集用户评分信息(MySQL),每过一段时间就对该时段内的评分数据进行协同过滤算法的MapReduce计算,计算结果是存储在HDFS里的,所以要使用sqoop工具来对HDFS中非关系型数据转发到MYSQL这样的关系型数据库中,...
可以将⼀个关系型数据库(例如 : MySQL,Oracle 等)中的数据导进到Hadoop的HDFS中,也可以将HDFS的数据导进到关系型数据库中。 2 Flume Flume是Cloudera提供的⼀个⾼可⽤的,⾼可靠的,分布式的海量⽇志采集、聚合...
sqoop 数据迁移⼯具,⽤来在不同数据存储软件之间进⾏数据传输的开源软件 DataX 阿⾥巴巴开源的离线数据同步⼯具,⽤于实现包括关系型数据库(MySQL、Oracle等)、HDFS、Hive、ODPS、HBase、FTP等各种异构数据源之间...
以此来收集用户评分信息(MySQL),每过一段时间就对该时段内的评分数据进行协同过滤算法的MapReduce计算,计算结果是存储在HDFS里的,所以要使用sqoop工具来对HDFS中非关系型数据转发到MYSQL这样的关系型数据库中,...
使⽤Sqoop可以交互关系型数据库,进⾏导⼊导出数据。 使⽤爬⾍技术,可在⽹上爬取海量⽹页数据。 数据存储与管理: ⼤数据利⽤分布式⽂件系统HDFS、HBase、Hive,实现对结构化、半结构化和⾮结构化数据的存储和管理...