浅谈Hadoop

Hadoop是一个十分流行的分布式存储和计算框架,也是业内大数据处理和分析最通用的框架之一。


hadoop_icon.png

Hadoop2.0 由HDFS(Hadoop Distributed File System)、MapReduce和Yarn三部分组成。
Hadoop的设计原型来源于google的三篇论文,即GFS、MapReduce和BigTable,同时作为Lucene的子项目Nutch的一部分在2005年引入Apache,Hadoop的得名是Hadoop之父Doug Cutting儿子的玩具,值得一提的是他的妻子叫Lucene。
Hadoop生态

hbase生态.png

"永远不把鸡蛋放在一个篮子里"——HDFS

在分布式的文件系统之前,往往使用大型机和存储服务器去做存储,无论大型机和存储服务器都是十分昂贵的,同时也是有瓶颈的,横向扩展能力很差。而分布式文件系统的横向扩展能力以及容错性十分好,也越来越受到人们的青睐。HDFS的定位是用比较廉价的机器,做高可用的海量数据的存储。主要采用多副本的分块存储机制,在部分机器宕机或数据损坏的情况下,依然能提供可靠服务。

  1. 集群拓扑
hdfs.jpg
  • NameNode:文件元数据存储节点,Hadoop1中存在单点问题,Hadoop2中通过备用节点实现高可用,同时有元数据存储瓶颈,不适于存储小文件。

  • DataNode:数据节点,单文件被分成多块,每一块多副本跨机架、机房存储,保证数据的高可用。

  • Block:文件的存储单位,单个文件可被分成多块,默认为128M。同时也是MapReduce默认的输入块大小。
    文件读取流程:

hdfsread.png

open DistributedFileSystem 去NameNode获取文件的块列表,NameNode根据Client距离各节点的网络距离给出Block列表。
根据Block列表去一次读取文件,读取后在Client进行文件汇总。

  1. 写入流程:
hdfs_write.png
  • 实例化DistributeFileSystem,在NameNode进行写文件的申请,申请成功后创建元数据,并返回数据存放的位置信息。

  • 过位置信息,对DataNode进行流式写入,将数据分成多个数据包,作为一个数据队列。写入时每次取一个数据包,写入全部副本,且三个节点均写入成功,则返回ACK信号表名当前buff写入成功,Client内部维护着数据包的Ack队列,收到Ack之后会移除这个数据包。

  • 最后想NameNode发送Complete信号,确认文件写入全部成功。
    如果中途节点写失败:写入部分数据的节点将在管线中移除,同时后续恢复正常后会删除这部分数据。随后写入后续两个节点,原则上写入一个块的时候就可以写成功,因为namenode发现数据不一致会做复制操作。

"分而治之"——MapReduce###

</br>
分布式计算出现之前,数据的计算往往依靠性能比较好的单机计算。但是单机受限于本身的计算资源,往往计算速度都不如人意。
一天小明接到产品的一个需求:
产品:小明啊,这里有一天的日志信息,大概5个G,我要统计一下一共有多少。
小明:OK啊,就5个G,一个shell搞定,看我 cat * | wc -l,我简直就是个天才。
产品:对不起啊小明,需求变了,一天的看不出来效果,我需要统计1个月的数据,大概有150G。
小明:有点大啊,不怕,我线上服务器内存120G,40核,看我用多线程搞定,过了2个小时,终于搞 定了还有点费劲。
产品:我保证这是我最后一次变更需求,我想要最近一年的数据1800G左右。
小明:数据上T了,搞不定了啊。
上面的例子告诉我们,在大数量的场景下,高性能的单机有时也是解决不了问题。所以我们就需要MapReduce帮助我们。
</br>
MapReduce是一种采用分治和规约的一种并行的批处理框架,先将数据做分割计算,最后汇总结果。看上去和多线程的处理机制一样,但是Hadoop将它封装在了框架中,编程十分简单,吞吐量十分高,目前支持Java、C++、Python等多种API编程。
1.MapReduce运行模型总体概览:

mapreduceAllGraph.png
  • InputSplit: InputSplit是单个map任务的输入文件片,默认文件的一个block。

  • Map函数:数据处理逻辑的主体,用开发者开发。

  • Partition:map的结果发送到相应的reduce。

  • Combain:reduce之前进行一次预合并,减小网络IO。当然,部分场景不适合。

  • Shuffle:map输出数据按照Partition分发到各个reduce。

*reduce:将不同map汇总来的数据做reduce逻辑。

2.多reduce:

datatrans.png

3.经典wordcount:

wordcountdatatrans.png
mapreducedataStream.png

4.Map类的实现:

  • 必须继承org.apache.hadoop.mapreduce.Mapper 类

  • map()函数,对于每一个输入K/V都会调用一次map函数,逻辑实现(必须)。

  • setup()函数,在task开始前调用一次,做maptask的一些初始化工作,如连接数据库、加载配置(可选)。

  • cleanup()函数,在task结束前调用一次,做maptask的收尾清理工作,如批处理的收尾,关闭连接等(可选)

  • Context上下文环境对象,包含task相关的配置、属性和状态等。

5.Reduce类的实现:

  • 必须继承org.apache.hadoop.mapreduce.Reducer类。

  • reduce(key, Iterable<>values,Context context)对于每一个key值调用一次reduce函数。

  • setup():在task开始前调用一次,做reducetask的一些初始化工作。

  • cleanup():在task结束时调用一次,做reducetask的收尾清理工作。

6.作业整体配置:

  • 参数解析:String[]otherArgs= new GenericOptionsParser(conf, args).getRemainingArgs();

  • 创建job: Jobjob= Job.getInstance(conf, "word count");

  • 设置map类,reduce类。

  • 设置map和reduce输出的KV类型,二者输出类型一致的话则可以只设置Reduce的输出类型。

  • 设置reduce的个数 :默认为1,综合考虑,建议单个reduce处理数据量<10G。不想启用reduce设置为0即可。

  • 设置InputFormat

  • 设置OutputFormat

  • 设置输入,输出路径。

  • job.waitForCompletion(true) (同步提交)和job.submit()(异步提交)

wordcount:
public class WordCountTask { private static final Logger logger = Logger.getLogger(WordCountTask.class); public static class WordCountMap extends Mapper<Object, Text, Text, IntWritable>{ private static final IntWritable one = new IntWritable(1); private Text word = new Text(); @Override protected void cleanup(Context context) throws IOException, InterruptedException { logger.info("mapTaskEnd....."); } protected void map(Object key, Text value, Context context) throws IOException, InterruptedException { StringTokenizer itr = new StringTokenizer(value.toString()); while (itr.hasMoreTokens()) { this.word.set(itr.nextToken()); context.write(this.word, one); } } protected void setup(Context context) throws IOException, InterruptedException { logger.info("mapTaskStart....."); } } public static class WordCountReduce extends Reducer<Text, IntWritable, Text, IntWritable>{ private IntWritable result = new IntWritable(); public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException { int sum = 0; for (IntWritable val : values) { sum += val.get(); } this.result.set(sum); context.write(key, this.result); } } public static void main(String[] args) throws Exception { Configuration conf = new Configuration(); String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs(); if (otherArgs.length < 2) { System.err.println("Usage: wordcount <in> [<in>...] <out>"); System.exit(2); } Job job = Job.getInstance(conf, "word count"); job.setJarByClass(WordCountTask.class); job.setMapperClass(WordCountMap.class); job.setReducerClass(WordCountReduce.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(IntWritable.class); FileSystem fs = FileSystem.get(conf); for (int i = 0; i < otherArgs.length - 1; i++) { FileInputFormat.addInputPath(job, new Path(otherArgs[i])); } if(fs.exists(new Path(otherArgs[otherArgs.length - 1]))){ fs.delete(new Path(otherArgs[otherArgs.length - 1])); } FileOutputFormat.setOutputPath(job, new Path(otherArgs[(otherArgs.length - 1)])); job.setNumReduceTasks(1); System.exit(job.waitForCompletion(true) ? 0 : 1); } }

6.提交:
​hadoop jar hadoop-examples.jar demo.wordcount(主类名) Dmapreduce.job.queuename=XX(系统参数) input output
​缺点:无定时调度

  1. 常用的InputFormat:
  • TextInputFormat key:行偏移 value:文本内容,split计算:splitSize=max("mapred.min.split.size",min("mapred.max.split.size",blockSize)) mapred.min.split.size 在大量文本输入的情况下,需要控制map的数量,可以调此选项。

  • CombineTextInputFormat(集群默认),多个小文件分片送到一个map中处理,主要解决多个小文件消耗map资源的问题。

  • sequenceFileInputFormat,采用自己的序列化方式,通常文件名为key,value为文件内容,可在存储上解决小文件对namenode的影响。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,242评论 5 459
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,769评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,484评论 0 319
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,133评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,007评论 4 355
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,080评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,496评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,190评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,464评论 1 290
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,549评论 2 309
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,330评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,205评论 3 312
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,567评论 3 298
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,889评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,160评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,475评论 2 341
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,650评论 2 335

推荐阅读更多精彩内容

  • 首先,我们在使用前先看看HDFS是什麽?这将有助于我们是以后的运维使用和故障排除思路的获得。 HDFS采用mast...
    W_Bousquet阅读 4,127评论 0 2
  • 摘自:http://staticor.io/post/hadoop/2016-01-23hadoop-defini...
    wangliang938阅读 578评论 0 1
  • 先思考问题 我们处在一个大数据的时代已经是不争的事实,这主要表现在数据源多且大,如互联网数据,人们也认识到数据里往...
    墙角儿的花阅读 7,326评论 0 9
  • Hadoop部署方式 本地模式 伪分布模式(在一台机器中模拟,让所有进程在一台机器上运行) 集群模式 服务器只是一...
    陈半仙儿阅读 1,593评论 0 9
  • 目的这篇教程从用户的角度出发,全面地介绍了Hadoop Map/Reduce框架的各个方面。先决条件请先确认Had...
    SeanC52111阅读 1,689评论 0 1