一、Hadoop简介
1.1 Hadoop是什么
The Apache Hadoop project develops open-source software for reliable, scalable, distributed computing.
1.2 Hadoop官网
http://hadoop.apache.org/
1.3 Hadoop特点
(1)扩容能力(Scalable):能可靠地(reliably)存储和处理千兆字节(PB)数据。
(2)成本低(Economical):可以通过普通机器组成的服务器群来分发以及处理数据。这些服务器群总计可达数千个节点。
(3)高效率(Efficient):通过分发数据,hadoop可以在数据所在的节点上并行地(parallel)处理它们,这使得处理非常的快速。
(4)可靠性(Reliable):hadoop能自动地维护数据的多份副本,并且在任务失败后能自动地重新部署(redeploy)计算任务。
1.4 Hadoop主要解决的问题
海量数据的存储(HDFS)
海量数据的分析(MapReduce)
资源管理调度(YARN)
1.5 Hadoop生态圈
1.6 Hadoop1.0和Hadoop2.0的对比
1.7 Hadoop系统结构图
1.8 Hadoop适用场景
1,大数据量存储:分布式存储
2,日志处理:Hadoop擅长这个
3,海量计算:并行计算
4,ETL:数据抽取到oracle、mysql、DB2、mongdb及主流数据库
5,使用HBase做数据分析:用扩展性应对大量的写操作—Facebook构建了基于HBase的实时数据分析系统
6,机器学习:比如Apache Mahout项目
7,搜索引擎:hadoop + lucene实现
8,数据挖掘:目前比较流行的广告推荐,9,用户细分特征建模
10,个性化广告推荐,
11,智能仪器推荐
二、Hadoop生态圈组件
2.1 HDFS(分布式文件系统)
HDFS(hadoop分布式文件系统)是hadoop体系中数据存储管理的基础。他是一个高度容 错的系统,能检测和应对硬件故障。
client:切分文件,访问HDFS,与那么弄得交互,获取文件位置信息,与DataNode交互, 读取和写入数据。
namenode:master节点,在hadoop1.x中只有一个,管理HDFS的名称空间和数据块映射信 息,配置副本策略,处理客户端请求。
DataNode:slave节点,存储实际的数据,汇报存储信息给namenode。
secondary namenode:辅助namenode,分担其工作量:定期合并fsimage和fsedits,推送给 namenode;紧急情况下和辅助恢复namenode,但其并非namenode的热备。
2.2 mapreduce(分布式计算框架)
mapreduce是一种计算模型,用于处理大数据量的计算。其中map对应数据集上的独立元 素进行指定的操作,生成键-值对形式中间,reduce则对中间结果中相同的键的所 有值进行规约,以得到最终结果。
jobtracker:master节点,只有一个,管理所有作业,任务/作业的监控,错误处理等,将任 务分解成一系列任务,并分派给tasktracker。
tacktracker:slave节点,运行 map task和reducetask;并与jobtracker交互,汇报任务状态。
map task:解析每条数据记录,传递给用户编写的map()并执行,将输出结果写入到本地 磁盘(如果为map—only作业,则直接写入HDFS)。
reduce task:从map 它深刻地执行结果中,远程读取输入数据,对数据进行排序,将数据 分组传递给用户编写的reduce函数执行。
2.3 hive(基于hadoop的数据仓库)
由Facebook开源,最初用于解决海量结构化的日志数据统计问题。
hive定于了一种类似sql的查询语言(hql)将sql转化为mapreduce任务在hadoop上执行。
2.4 hbase(分布式列存数据库)
hbase是一个针对结构化数据的可伸缩,高可靠,高性能,分布式和面向列的动态模式数据库。和传统关系型数据库不同,hbase采用了bigtable的数据模型:增强了稀疏排序映射表(key/value)。其中,键由行关键字,列关键字和时间戳构成,hbase提供了对大规模数据的随机,实时读写访问,同时,hbase中保存的数据可以使用mapreduce来处理,它将数据存储和并行计算完美结合在一起。
2.5 zookeeper(分布式协作服务)
解决分布式环境下的数据管理问题:统一命名,状态同步,集群管理,配置同步等。
2.6 sqoop(数据同步工具)
sqoop是sql-to-hadoop的缩写,主要用于传统数据库和hadoop之间传输数据。数据的导入和导出本质上是mapreduce程序,充分利用了MR的并行化和容错性。
2.7 pig(基于hadoop的数据流系统)
定义了一种数据流语言-pig latin,将脚本转换为mapreduce任务在hadoop上执行。通常用于离线分析。
2.8 mahout(数据挖掘算法库)
mahout的主要目标是创建一些可扩展的机器学习领域经典算法的实现,旨在帮助开发人员更加方便快捷地创建只能应用程序。mahout现在已经包含了聚类,分类,推荐引擎(协同过滤)和频繁集挖掘等广泛使用的数据挖掘方法。除了算法是,mahout还包含了数据的输入/输出工具,与其他存储系统(如数据库,mongoDB或Cassandra)集成等数据挖掘支持架构。
2.9 Oozie(工作流调度器)
Oozie是一个可扩展的工作体系,集成于Hadoop的堆栈,用于协调多个MapReduce作业的执行。它能够管理一个复杂的系统,基于外部事件来执行,外部事件包括数据的定时和数据的出现。
Oozie工作流是放置在控制依赖DAG(有向无环图 Direct Acyclic Graph)中的一组动作(例如,Hadoop的Map/Reduce作业、Pig作业等),其中指定了动作执行的顺序。
Oozie使用hPDL(一种XML流程定义语言)来描述这个图。
2.10 flume(日志收集工具)
cloudera开源的日志收集系统,具有分布式,高可靠,高容错,易于定制和扩展的特点。他将数据从产生,传输,处理并写入目标的路径的过程抽象为数据流,在具体的数据流中,数据源支持在flume中定制数据发送方,从而支持收集各种不同协议数据。
2.11 Yarn(分布式资源管理器)
YARN是下一代MapReduce,即MRv2,是在第一代MapReduce基础上演变而来的,主要是为了解决原始Hadoop扩展性较差,不支持多计算框架而提出的。
Yarn是下一代 Hadoop 计算平台,yarn是一个通用的运行时框架,用户可以编写自己的计算框架,在该运行环境中运行。
用于自己编写的框架作为客户端的一个lib,在运用提交作业时打包即可。该框架为提供了以下几个组件:
-资源管理:包括应用程序管理和机器资源管理
-资源双层调度
-容错性:各个组件均有考虑容错性
-扩展性:可扩展到上万个节点
2.12 Mesos(分布式资源管理器)
Mesos诞生于UC Berkeley的一个研究项目,现已成为Apache项目,当前有一些公司使用Mesos管理集群资源,比如Twitter。
与yarn类似,Mesos是一个资源统一管理和调度的平台,同样支持比如MR、steaming等多种运算框架。
2.13 cloudrea impala(查询引擎)
一个开源的查询引擎。与hive相同的元数据,SQL语法,ODBC驱动程序和用户接口,可以直接在HDFS上提供快速,交互式SQL查询。impala不再使用缓慢的hive+mapreduce批处理,而是通过与商用并行关系数据库中类似的分布式查询引擎。可以直接从HDFS或者Hbase中用select,join和统计函数查询数据,从而大大降低延迟。
2.14 Tachyon(分布式内存文件系统)
Tachyon(/’tki:n/ 意为超光速粒子)是以内存为中心的分布式文件系统,拥有高性能和容错能力,
能够为集群框架(如Spark、MapReduce)提供可靠的内存级速度的文件共享服务。
Tachyon诞生于UC Berkeley的AMPLab。
2.15 Tez(DAG计算模型)
Tez是Apache最新开源的支持DAG作业的计算框架,它直接源于MapReduce框架,核心思想是将Map和Reduce两个操作进一步拆分,
即Map被拆分成Input、Processor、Sort、Merge和Output, Reduce被拆分成Input、Shuffle、Sort、Merge、Processor和Output等,
这样,这些分解后的元操作可以任意灵活组合,产生新的操作,这些操作经过一些控制程序组装后,可形成一个大的DAG作业。
目前hive支持mr、tez计算模型,tez能完美二进制mr程序,提升运算性能。
2.16 Spark
spark是个开源的数据分析集群计算框架,最初由加州大学伯克利分校AMPLab,建立于HDFS之上。spark与hadoop一样,用于构建大规模,延迟低的数据分析应用。spark采用Scala语言实现,使用Scala作为应用框架。spark采用基于内存的分布式数据集,优化了迭代式的工作负载以及交互式查询。与hadoop不同的是,spark与Scala紧密集成,Scala象管理本地collective对象那样管理分布式数据集。spark支持分布式数据集上的迭代式任务,实际上可以在hadoop文件系统上与hadoop一起运行(通过YARN,MESOS等实现)。
2.17 Storm
storm是一个分布式的,容错的计算系统,storm属于流处理平台,多用于实时计算并更新数据库。storm也可被用于“连续计算”,对数据流做连续查询,在计算时将结果一流的形式输出给用户。他还可被用于“分布式RPC”,以并行的方式运行昂贵的运算
2.18 Kafka
kafka是由Apache软件基金会开发的一个开源流处理平台,由Scala和Java编写。Kafka是一种高吞吐量的分布式发布订阅消息系统,它可以处理消费者规模的网站中的所有动作流数据。 这种动作(网页浏览,搜索和其他用户的行动)是在现代网络上的许多社会功能的一个关键因素。 这些数据通常是由于吞吐量的要求而通过处理日志和日志聚合来解决。 对于像Hadoop的一样的日志数据和离线分析系统,但又要求实时处理的限制,这是一个可行的解决方案。Kafka的目的是通过Hadoop的并行加载机制来统一线上和离线的消息处理,也是为了通过集群来提供实时的消息
三、Hadoop核心之MapReaduce
3.1 MapReaduce是什么
MapReduce是一个分布式计算框架,它是Hadoop的一个程序,不会产生进程。
3.2 MapReduce的思想
分而治之、大数据集分为小的数据集、每个数据集进行逻辑业务处理、合并统计数据结果
3.2 MapReaduce之自定义序列化类
代码及注释参见
https://github.com/ZzzzZzreal/HadoopKeyPoints/blob/master/HadoopKeyPoints/src/main/java/MyComparator.java
有时候,默认的数据类型不能满足我们的需求时,需要我们自定义序列化类,实现WritableComparable。在自定义的序列化类中,最重要的是重写compareTo方法以及序列化反序列化方法。序列化和反序列化的内容需要重点关注,容易犯低级错误。
★二次排序:compareTo方法也可以实现二次排序的功能,但会产生大量的序列化反序列化实例,浪费资源;比较优化的方法是在自定义序列化类中的一个静态内部类--Comparator,继承WritableComparator,在这个类中的compare方法中写排序的逻辑。需要对这个内部类进行注册。
3.3 MapReaduce之Mapper
Mapper对来的每一条数据进行一次计算(这里的计算指的时代码逻辑,这句话的意思就是每来一条数据走一次map方法)
代码及注释
https://github.com/ZzzzZzreal/HadoopKeyPoints/blob/master/HadoopKeyPoints/src/main/java/wordcount/WordCountMapper.java
自定义Mapper,需要继承Mapper,并指定泛型<key-in,value-in,key-out,value-out>,然后重写map方法。泛型中的key-in,value-in是读取文件时读的内容,默认k按偏移量操作,v读一行内容,可以通过自定义输入格式来改变k-in和v-in;key-out和value-out是写入环形缓存区的内容,如果有Reducer的话这里的k-out和v-out最终是Reducer的key-in和value-in。
★shuffle过程--shuffle:洗牌、发牌——(核心机制:数据分区,排序,缓存)
mapreduce中,map阶段处理的数据如何传递给reduce阶段,是mapreduce框架中最关键的一个流程,这个流程就叫shuffle。简单来说,就是将maptask输出的处理结果数据,分发给reducetask,并在分发的过程中,对数据按key进行了分区和排序。
◆理解shuffle过程
MapReduce的核心与基础是Mapper类、Reducer类与Driver。Driver中主要是main()方法,MR的程序入口;Driver中还要规定job的各种配置。自己的Mapper需要继承Mapper类,重写其中的map()方法,自己的Reducer需要继承Reducer类,重写其中的reduce()方法。map的运行机制是来一条数据运行一次map方法,reduce的运行机制是来一个key运行一次reduce方法。数据从map中出来到进入reduce之前称为shuffle阶段,Mapper的数量不建议人为设定,一般一个block对应一个Mapper,而Reducer的数量可以在Driver中人为控制,不设定默认是1。
MR过程中的数据流向:一个文件在HDFS中是分布存储在不同节点的block中,每一个block对应一个Mapper,每一条数据以K,V的形式进入一个map()方法,map()方法中对数据进行处理(数据筛选,业务逻辑以及分区),再将处理结果以K,V的形式写入环形缓冲区,一个Mapper对应一个context,context对写入的数据按key进行聚合、排序、归约。context的大小默认为100M,当context容量达到80%或Mapper处理结束时,context会向外溢出,形成许多小文件,小文件为一个K和许多V的集合。处理完成后,这些文件会发送到Reducer所在节点,在该节点的context中,会对不同节点发送过来的数据按key进行再一次的聚合、排序和归约,最后进入Reducer,在reduce方法中对同一个<key,value集合>进行处理(业务逻辑),然后按照分区写入文件。
shuffle的处理任务:将maptask输出的处理结果数据,分发给reducetask,并在分发的过程中,对数据按key进行了分区和排序;
1、maptask收集我们的map()方法输出的k、v对,放到内存缓冲区中
2、从内存缓冲区不断溢出本地磁盘文件,可能会溢出多个文件
3、多个溢出文件会被合并成大的溢出文件
4、在溢出过程中,及合并的过程中,都要调用partitoner进行分组和针对key进行排序
5、reducetask根据自己的分区号,去各个maptask机器上取相应的结果分区数据
6、reducetask会取到同一个分区的来自不同maptask的结果文件,reducetask会将这些文件再进行合并(归并排序)
7、合并成大文件后,shuffle的过程也就结束了,后面进入reducetask的逻辑运算过程(从文件中取出一个一个的键值对group,调用用户自定义的reduce()方法)
备注:Shuffle中的缓冲区大小会影响到mapreduce程序的执行效率,原则上说,缓冲区越大,磁盘io的次数越少,执行速度就越快。缓冲区的大小可以通过参数调整,参数:io.sort.mb默认100M
3.4 MapReaduce之Reducer
Reducer对相同的Key进行一次计算
代码及注释
https://github.com/ZzzzZzreal/HadoopKeyPoints/blob/master/HadoopKeyPoints/src/main/java/wordcount/WordCountReducer.java
自定义Reducer,需要继承Reducer,并指定泛型<key-in,value-in,key-out,value-out>,然后重写reduce方法。泛型中的k-in和v-in必须和Mapper的k-out和v-out的数据类型一致;可以通过自定义输出格式来改变k-out和v-out。
3.5 MapReaduce之Driver
main方法,程序的入口
代码及注释
https://github.com/ZzzzZzreal/HadoopKeyPoints/blob/master/HadoopKeyPoints/src/main/java/wordcount/WordCountDriver.java
在main方法中需要获取一个配置实例,得到一个job实例,用这个job指定主类、Mapper类、Reducer类、Combiner类(如果有)、Partitioner类(如果有),指定Mapper和Reducer的输出格式类(如果Mapper和Reducer的输出类型相同,可以只设置outputKey和outputValue;如果不同则需要设置outputKey和outputValue、MapoutputKey和MapoutputValue),通过默认输入格式或自定义输入格式指定输入文件路径,指定输出目录,判断job结束并关闭程序。
3.6 MapReaduce之Partitioner(分区类)
代码及注释参见
https://github.com/ZzzzZzreal/HadoopKeyPoints/blob/master/HadoopKeyPoints/src/main/java/FlowSumPartitioner.java
自定义分区类需要继承Partitioner,指定泛型<k,v>,重写getPartition方法,它可以实现将不同的Key写入不同的文件。如果自定义了分区类,那么需要在Driver中指定分区类并且设置ReducTask数量(通过setNumReduceTasks方法)。
注意:每来一条数据走一次getPartition方法;有几个ReduceTask就会生成几个文件;1个task任务不要处理大于10G的内容;Partitioner的泛型要和Mapper的k-out、v-out一致。
3.7 MapReaduce之Combiner
本地的reducer,只能起到过渡和优化的作用,它能做一些像归约类的对输出结果不造成影响的任务,比如求和
代码及注释
https://github.com/ZzzzZzreal/HadoopKeyPoints/blob/master/HadoopKeyPoints/src/main/java/flowsum/FlowSumCombiner.java
Combiner是一个本地reducer,所以它仍然继承Reducer,指定泛型<key-in,value-in,key-out,value-out>,它的key-in,value-in,key-out,value-out必须要和mapper的k-out,v-out一致,也和reducer的k-in,v-in一样(因为它只做简单优化,不能影响输出结果)。因为combiner只对reducer进行优化,所以它的逻辑可以跟reducer完全相同,也可以不一样,但是不能影响输出结果。而且,在数据量小的时候使用combiner与否几乎没什么差别。至于使用combiner的原因,是Reducer是在运行在网络环境上的,当数据量太大时,网络I/O速度慢,会导致效率低下。用本地的Reducer过渡,预处理可以提高效率。
3.8 MapReaduce之自定义输入格式(InputFormat)
代码及注释参见
https://github.com/ZzzzZzreal/HadoopKeyPoints/blob/master/HadoopKeyPoints/src/main/java/MyFileInputFormat.java
自定义输入格式,需要继承FileInputFormat。里面主要是重写createRecordReader方法,返回一个自定义的RecordReader。实际上还有一个重写方法是isSplitable,但是我们一般不作重写;因为Hadoop不适合管理小文件,所以我们需要这个方法的返回值一直时true,所以我们不需要重写这个方法。
自定义RecordReader,它是主要的处理输入数据格式的类,最终是写初始化方法initialize和nextKeyValue
在自定义RecordReader中,需要重写的方法有:
①initialize()---初始化方法,完成自定义字段的初始化(以实现一次读取两行为例解释)
思路:因为LineReader可以完成一行一行读的目的,所以初始化时,要做的事情就是得到一个LineReader实例;通过查看API发现,想实例化一个LineReader最低要求是得到一个输入流,所以首先需要得到一个输入流;输入流可以通过文件对象open(Path)方法获得,所以我们的目的变成了获取一个文件对象和一个Path;文件对象可以通过get(Configuration)获取,Path可以通过FileSplit的getPath()获得;Configuration可以通过context得到,FileSplit可以通过split得到---context是一直贯穿于整个过程中的,split时initialize的参数。所以,将上面的思路倒序实现,就完成了初始化过程,得到一个LineReader的实例。
②nextKeyValue()---在这个方法中写逻辑,对K和V进行赋值
要实现一次读两行,而在初始化时得到的LineReader可以一次读一行,所以只需要读两次,然后赋值给Value就可以实现。
③getCurrentKey()---获取当前key
④getCurrentValue()---获取当前value
⑤getProgress()---获取进度,一般returntrue0.0f:1.0f;
⑥close()---如果开了流必须要关闭,如果没开流则不需要
3.9 MapReaduce之自定义输出格式(OutputFormat)
代码及注释参见
https://github.com/ZzzzZzreal/HadoopKeyPoints/blob/master/HadoopKeyPoints/src/main/java/MyOutputFormat.java
自定义输出格式,需要继承FileOutputFormat,主要重写一个getRecordWriter方法,返回一个自定义的RecordWriter。自定义的RecordWriter中重写write方法和close。可以实现自定义输出文件名,也可以写逻辑改变输出内容。
四、Hadoop核心之HDFS
4.1 HDFS是什么
HDFS是一个高度容错性的系统,适合部署在廉价的机器上。HDFS采用master/slave架构。一个HDFS集群是由一个Namenode和一定数目的Datanodes组成。Namenode是一个中心服务器,负责管理文件系统的名字空间(namespace)以及客户端对文件的访问。集群中的Datanode一般是一个节点一个,负责管理它所在节点上的存储
4.2 HDFS之NameNode
NameNode管理着文件系统的命名空间,维护着文件系统树,它不存储真实数据,存储元数据(MetaData)[元数据(FileName、副本数、每一个副本所在的位置...)],NameNode保存在内存中。
元数据信息通过以下文件和过程持久化到磁盘中。
a、fsimage--对元数据定期进行镜像
b、edits--存放一定时间内对HDFS的操作记录
c、checkpoint---检查点
Namenode在内存中保存着整个文件系统的名字空间和文件数据块映射(Blockmap)的映像。这个关键的元数据结构设计得很紧凑,因而一个有4G内存的Namenode足够支撑大量的文件和目录。当Namenode启动时,它从硬盘中读取Editlog和FsImage,将所有Editlog中的事务作用在内存中的FsImage上,并将这个新版本的FsImage从内存中保存到本地磁盘上,然后删除旧的Editlog,因为这个旧的Editlog的事务都已经作用在FsImage上了。这个过程称为一个检查点(checkpoint)。在当前实现中,检查点只发生在Namenode启动时,在不久的将来将实现支持周期性的检查点。
4.3 HDFS之DataNode
DataNode---存储节点,真正存放数据的节点,用于保存数据,保存在磁盘上(在HDFS上保存的数据副本数默认是3个,这个副本数量是可以设置的)。基本单位是块(block),默认128M。
Block块的概念
先不看HDFS的Block,每台机器都有磁盘,机器上的所有持久化数据都是存储在磁盘上的。磁盘是通过块来管理数据的,一个块的数据是该磁盘一次能够读写的最小单位,一般是512个字节,而建立在磁盘之上的文件系统也有块的概念,通常是磁盘块的整数倍,例如几kb。
HDFS作为文件系统,一样有块的概念,对于分布式文件系统,使用文件块将会带来这些好处:
(1)一个文件的大小不限制于集群中任意机器的磁盘大小
(2)因为块的大小是固定的,相对比不确定大小的文件,块更容易进行管理和计算
(3)块同样方便进行备份操作,以提高数据容错性和系统的可靠性
为什么HDFS的块大小会比文件系统的块大那么多呢?
操作数据时,需要先从磁盘上找到指定的数据块然后进行传输,而这就包含两个动作:
1)数据块寻址:找到该数据块的起始位置
2)数据传输:读取数据
也就是说,操作数据所花费的时间是由以上两个步骤一起决定的,步骤1所花费的时间一般比步骤2要少很多,那么当操作的数据块越多,寻址所花费的时间在总时间中就越小的可以忽略不计。所以块设置的大,可以最小化磁盘的寻址开销。但是HDFS的Block块也不能设置的太大,会影响到map任务的启动数,并行度降低,任务的执行数据将会变慢。
★名词扩展:心跳机制、宕机、安全模式(zzy至理名言--“自己看网上都有”)
Datanode负责处理文件系统客户端的读写请求。在Namenode的统一调度下进行数据块的创建、删除和复制。集群中单一Namenode的结构大大简化了系统的架构。Namenode是所有HDFS元数据的仲裁者和管理者,这样,用户数据永远不会流过Namenode。
4.4 HDFS之SecondaryNameNode
SecondaryNameNode---辅助节点,用于同步元数据信息。辅助NameNode对fsimage和edits进行合并(冷备份),下面用SNN代替
NameNode的元数据信息先往edits文件中写,当edits文件达到一定的阈值(3600秒或大小到64M)的时候,会开启合并的流程。合并流程如下:
①当开始合并的时候,SNN 会把edits和fsimage拷贝到自己服务器所在内存中,开始合并,合并生成一个名为fsimage.ckpt的文件。
②将fsimage.ckpt文件拷贝到NameNode上,成功后,再删除原有的fsimage,并将fsimage.ckpt文件重命名为fsimage。
③当SNN 将edits和fsimage拷贝走之后,NameNode会立刻生成一个edits.new文件,用于记录新来的元数据,当合并完成之后,原有的edits文件才会被删除,并将edits.new文件重命名为edits文件,开启下一轮流程。
4.5 HDFS读取文件过程
过程描述:
(1)客户端调用FileSyste对象的open()方法在分布式文件系统中打开要读取的文件。
(2)分布式文件系统通过使用RPC(远程过程调用)来调用namenode,确定文件起始块的位置。
(3)分布式文件系统的DistributedFileSystem类返回一个支持文件定位的输入流FSDataInputStream对象,FSDataInputStream对象接着封装DFSInputStream对象(存储着文件起始几个块的datanode地址),客户端对这个输入流调用read()方法。
(4)DFSInputStream连接距离最近的datanode,通过反复调用read方法,将数据从datanode传输到客户端。
(5)到达块的末端时,DFSInputStream关闭与该datanode的连接,寻找下一个块的最佳datanode。
(6)客户端完成读取,对FSDataInputStream调用close()方法关闭连接。
4.6 HDFS文件写入的过程
过程描述:
(1) 客户端通过对DistributedFileSystem对象调用create()函数来新建文件。
(2) 分布式文件系统对namenod创建一个RPC调用,在文件系统的命名空间中新建一个文件。
(3)Namenode对新建文件进行检查无误后,分布式文件系统返回给客户端一个FSDataOutputStream对象,FSDataOutputStream对象封装一个DFSoutPutstream对象,负责处理namenode和datanode之间的通信,客户端开始写入数据。
(4)FSDataOutputStream将数据分成一个一个的数据包,写入内部队列“数据队列”,DataStreamer负责将数据包依次流式传输到由一组namenode构成的管线中。
(5)DFSOutputStream维护着确认队列来等待datanode收到确认回执,收到管道中所有datanode确认后,数据包从确认队列删除。
(6)客户端完成数据的写入,对数据流调用close()方法。
(7)namenode确认完成。
五、Hadoop核心之YARN
5.1 YARN是什么
分布式资源管理框架:
(1)管理整个集群的资源(内存、CPU)
(2)分配调度集群的资源
5.2 YARN架构图
5.3 YARN的原理
5.4 YARN上的执行流程
A、ResourceManager
B、NodeManager
C、ApplicationMaster
D、Container
◆MapReduce在YARN上的执行流程:
①client提交job,首先找ResourceManager(ApplicationsManager)分配资源,同时将jar包默认拷贝10份到hdfs。
②ResourceManager指定一个NodeManager开启一个container,在Container中运行一个ApplicationMaster来管理这个应用程序。
③ApplicationMaster会计算此个应用所需资源,向ResourceManager(ResourceScheduler)申请资源。
④ResourceManager会分配资源,在NodeManager上开启不同的container,在container中来运行map任务或者reduce任务
⑤当所有的task都执行完了,ApplicationMaster会将结果反馈给客户端,所有工作执行完成之后,ApplicationMaster就会自行关闭。
⑥如果某个map任务或者reduce任务失败,ApplicationMaster会重新申请新的container来执行这个task。
5.5 YARN步骤组成
(1)作业提交
client调用job.waitForCompletion方法,向整个集群提交MapReduce作业 (第1步) 。 新的作业ID(应用ID)由资源管理器分配(第2步). 作业的client核实作业的输出, 计算输入的split, 将作业的资源(包括Jar包, 配置文件, split信息)拷贝给HDFS(第3步). 最后, 通过调用资源管理器的submitApplication()来提交作业(第4步).
(2)作业初始化
当资源管理器收到submitApplciation()的请求时, 就将该请求发给调度器(scheduler), 调度器分配container, 然后资源管理器在该container内启动应用管理器进程, 由节点管理器监控(第5步).
MapReduce作业的应用管理器是一个主类为MRAppMaster的Java应用. 其通过创造一些bookkeeping对象来监控作业的进度, 得到任务的进度和完成报告(第6步). 然后其通过分布式文件系统得到由客户端计算好的输入split(第7步). 然后为每个输入split创建一个map任务, 根据mapreduce.job.reduces创建reduce任务对象.
(3)任务分配
如果作业很小,应用管理器会选择在其自己的JVM中运行任务。
如果不是小作业,那么应用管理器向资源管理器请求container来运行所有的map和reduce任务(第8步). 这些请求是通过心跳来传输的, 包括每个map任务的数据位置, 比如存放输入split的主机名和机架(rack). 调度器利用这些信息来调度任务, 尽量将任务分配给存储数据的节点, 或者分配给和存放输入split的节点相同机架的节点.
(4)任务运行
当一个任务由资源管理器的调度器分配给一个container后, 应用管理器通过联系节点管理器来启动container(第9步). 任务由一个主类为YarnChild的Java应用执行. 在运行任务之前首先本地化任务需要的资源, 比如作业配置, JAR文件, 以及分布式缓存的所有文件(第10步). 最后, 运行map或reduce任务(第11步).
YarnChild运行在一个专用的JVM中, 但是YARN不支持JVM重用.
(5)进度和状态更新
YARN中的任务将其进度和状态(包括counter)返回给应用管理器, 客户端每秒(通过mapreduce.client.progressmonitor.pollinterval设置)向应用管理器请求进度更新, 展示给用户。
(6)作业完成
除了向应用管理器请求作业进度外,客户端每5分钟都会通过调用waitForCompletion()来检查作业是否完成. 时间间隔可以通过mapreduce.client.completion.pollinterval来设置. 作业完成之后, 应用管理器和container会清理工作状态, OutputCommiter的作业清理方法也会被调用. 作业的信息会被作业历史服务器存储以备之后用户核查.
5.5 YARN优点
与旧MapReduce相比,YARN 采用了一种分层的集群框架,它解决了旧MapReduce 一系列的缺陷,具有以下几种优势。
(1)提出了HDFS Federation,它让多个NameNode分管不同的目录进而实现访问隔离和横向扩展。对于运行中NameNode的单点故障,通过 NameNode热备方案(NameNode HA)实现
(2)YARN通过将资源管理和应用程序管理两部分分剥离开,分别由ResouceManager和ApplicationMaster负责,其中,ResouceManager专管资源管理和调度,而ApplicationMaster则负责与具体应用程序相关的任务切分、任务调度和容错等,每个应用程序对应一个ApplicationMaster
(3)YARN具有向后兼容性,用户在MRv1上运行的作业,无需任何修改即可运行在YARN之上。
(4)对于资源的表示以内存为单位 (在目前版本的Yarn中,没有考虑 cpu 的占用 ),比之前以剩余 slot 数目更合理。
(5)支持多个框架, YARN不再是一个单纯的计算框架,而是一个框架管理器,用户可以将各种各样的计算框架移植到YARN之上,由YARN进行统一管理和资源分配。目前可以支持多种计算框架运行在YARN上面,比如MapReduce、Storm、Spark、Flink等
(6)框架升级更容易, 在YARN中,各种计算框架不再是作为一个服务部署到集群的各个节点上(比如MapReduce框架,不再需要部署JobTracler、 TaskTracker等服务),而是被封装成一个用户程序库(lib)存放在客户端,当需要对计算框架进行升级时,只需升级用户程序库即可,多么容易!
5.6 YARN优势
一、可扩展性
(1)MapReduce 1 最多可支持4000个节点的集群.因为JobTracker负责的职责太多而成为瓶颈
(2)Yarn 可以支持10000个节点,并行100000个task.
二、可靠性
(1)Yarn的ResourceManager职责很简单,很容易实现HA;
(2)MapReduce 1 的JobTracker的状态变化非常迅速(想象下每个Task过几秒都会想它报告状态). 这使得JotTracker很难实现HA(高可用性).通常HA都是通过备份当前系统的状态然后当系统失败备用系统用备份的状态来继续工作.
三、并行性
(1)MapReduce 1只能运行MapReduce应用
(2)Yarn最大的好处之一就是职称很多其他类型的分布式Application
5.7 YARN之ResourceManager概述(RM)
YARN分层结构的本质是 ResourceManager。这个实体控制整个集群并管理应用程序向基础计算资源的分配。ResourceManager 将各个资源部分(计算、内存、带宽等)精心安排给基础 NodeManager(YARN 的每节点代理)。ResourceManager 还与 ApplicationMaster 一起分配资源,与 NodeManager 一起启动和监视它们的基础应用程序。在此上下文中,ApplicationMaster 承担了以前的 TaskTracker 的一些角色,ResourceManager 承担了 JobTracker 的角色。
a)调度器(Schedule)
调度器根据容量、队列等限制条件(如每个队列分配一定的资源,最多执行一定数量的作业等),将系统中的资源分配给各个正在运行的应用程序。该调度器是一个“纯调度器”,它不再从事任何与具体应用程序相关的工作。
b)应用程序管理器(Application Manager)
应用程序管理器负责管理整个系统中所有的应用程序,包括应用程序提交、调度协调资源以启动ApplicationMaster、监控ApplicationMaster运行状态并在失败时重新启动它。
总的来说,RM有以下作用
1)处理客户端请求
2)启动或监控ApplicationMaster
3)监控NodeManager
4)资源的分配与调度
5.8 YARN之ResourceManager功能
(1)ResourceManager负责整个集群的资源管理和分配,是一个全局的资源管理系统。
(2)NodeManager以心跳的方式向ResourceManager汇报资源使用情况(目前主要是CPU和内存的使用情况)。RM只接受NM的资源回报信息,对于具体的资源处理则交给NM自己处理。
(3)YARN Scheduler根据application的请求为其分配资源,不负责application job的监控、追踪、运行状态反馈、启动等工作。
5.9 YARN之ApplicationMaster概述(AM)
ApplicationMaster管理在YARN内运行的每个应用程序实例。ApplicationMaster 负责协调来自 ResourceManager 的资源,并通过 NodeManager 监视容器的执行和资源使用(CPU、内存等的资源分配)。请注意,尽管目前的资源更加传统(CPU 核心、内存),但未来会带来基于手头任务的新资源类型(比如图形处理单元或专用处理设备)。从 YARN 角度讲,ApplicationMaster 是用户代码,因此存在潜在的安全问题。YARN 假设 ApplicationMaster 存在错误或者甚至是恶意的,因此将它们当作无特权的代码对待。
总的来说,AM有以下作用
1)负责数据的切分
2)为应用程序申请资源并分配给内部的任务
3)任务的监控与容错
5.10 YARN之ApplicationMaster功能
(1)用户提交的每个应用程序均包含一个ApplicationMaster,它可以运行在ResourceManager以外的机器上。
(2)负责与RM调度器协商以获取资源(用Container表示)。
(3)将得到的任务进一步分配给内部的任务(资源的二次分配)。
(4)与NM通信以启动/停止任务。
(5)监控所有任务运行状态,并在任务运行失败时重新为任务申请资源以重启任务。
(6)当前YARN自带了两个ApplicationMaster实现,一个是用于演示AM编写方法的实例程序DistributedShell,它可以申请一定数目的Container以并行运行一个Shell命令或者Shell脚本;另一个是运行MapReduce应用程序的AM—MRAppMaster。
(7)注:RM只负责监控AM,并在AM运行失败时候启动它。RM不负责AM内部任务的容错,任务的容错由AM完成。
5.11 YARN之NodeManager概述(NM)
NodeManager管理YARN集群中的每个节点。NodeManager 提供针对集群中每个节点的服务,从监督对一个容器的终生管理到监视资源和跟踪节点健康。MRv1 通过插槽管理 Map 和 Reduce 任务的执行,而 NodeManager 管理抽象容器,这些容器代表着可供一个特定应用程序使用的针对每个节点的资源。
总的来说,NM有以下作用
1)管理单个节点上的资源
2)处理来自ResourceManager的命令
3)处理来自ApplicationMaster的命令
5.12 YARN之NodeManager功能
(1)NodeManager是每个节点上的资源和任务管理器,它是管理这台机器的代理,负责该节点程序的运行,以及该节点资源的管理和监控。YARN集群每个节点都运行一个NodeManager。
(2)NodeManager定时向ResourceManager汇报本节点资源(CPU、内存)的使用情况和Container的运行状态。当ResourceManager宕机时NodeManager自动连接RM备用节点。
(3)NodeManager接收并处理来自ApplicationMaster的Container启动、停止等各种请求。
5.12 YARN之Container
Container是 YARN 中的资源抽象,它封装了某个节点上的多维度资源,如内存、CPU、磁盘、网络等,当AM向RM申请资源时,RM为AM返回的资源便是用Container表示的。YARN会为每个任务分配一个Container,且该任务只能使用该Container中描述的资源。
总的来说,Container有以下作用
对任务运行环境进行抽象,封装CPU、内存等多维度的资源以及环境变量、启动命令等任务运行相关的信