先思考问题
我们处在一个大数据的时代已经是不争的事实,这主要表现在数据源多且大,如互联网数据,人们也认识到数据里往往隐藏着规律或模式,能够带来巨大的商业价值。其中最为人们所津津乐道的案例就是沃尔玛将啤酒和纸尿裤搭配销售的案例。
现在往往不缺少数据,但是面对海量数据势必要求有高效安全的存储海量数据的方案,有了数据后要发现其中隐藏的价值就需要分析数据,因此,存储和分析是大数据工具所需要解决的主要问题。
目前,机器的存储能力在不断的增长,但是读取速度却相形见绌。最容易想到的提高读取效率的方法是多磁盘并行读取,就像RAID阵列一样。多磁盘协同存储所要解决的第一个问题就是硬件失效问题,常用的方法就是狡兔三窟-复制,另外一个问题就是分析任务需要从多个存储设备中读取数据,如何协同的做好这些事情是一个不小的挑战。
Hadoop正是为解决存储和分析问题而生,Hadoop的HDFS提供了一种安全可靠的分布式文件存储系统,并提供了基于批处理模式的MapReduce数据分析框架。
那么,存储和分析的问题为什么就不能用我么所熟悉的RDBMS来处理呢?
在存储上,RDBMS适合结构化数据的存储,且保证数据的完整性等约束,而HDFS适合存储非结构化或半结构化数据,并不对数据进行完整性约束。且HDFS本身是以分布式方式存储海量数据而做为己任,在存储的可扩展能力具备线性增长能力。在分析上,RDBMS的B树索引结构适合小范围更新和点查,MapReduce适合一次存储多次即席查询,适合需要批处理的方式读取整个数据块来做线下分析场景。在其他方面,RDBMS承担着数据操作事务控制等责任,HDFS并不存在事务的特性。
简介
Hadoop的作者是Doug Cutting,Lucene的作者。该项目起源于一个web搜索引擎项目Nutch,这个项目是Lucene的一部分。当时项目正需要一个强大的分布式文件系统来管理从网络上抓取的海量数据,而此时google发表了GFS的论文,因此团队就以GFS论文而开发并将其开源。
Hadoop这个名字是编造的,作者的孩子给一个黄色的布料玩具小象起的名字。
Hadoop这个词在狭义上指HDFS和MapReduce,广义上来讲是Hadoop生态,包含了很多和分布式计算以及数据处理的工程。了解Hadoop最重要的是要理解HDFS和MapReduce。
HDFS
概念
DFS即分布式文件系统,分布式文件存储在多个机器组成的集群中,用来管理分布式文件存储的系统称之为分布式文件系统。
HDFS即Hadoop分布式文件系统,它擅长存储大文件,流式读取,运行于一般性的商业硬件上。HDFS不适合存储大量的小文件,namenode会在内存中存储元数据,通常情况下每个文件、目录和块都将占用150个字节;也不适合任意并发写的场景,HDFS的写文件操作是append的模式。
基础中的基础
在HDFS中,文件被分割成不同的块存储在集群的数据节点里,文件系统的元数据由文件系统集中管理。
-
block
文件系统的块通常是512字节,但是HDFS默认128M,但是和普通的文件系统相比,如果一个文件没有达到128M,其并不会占满整个块。块默认如此大是为了减少寻址时间。抽象出块的概念的好处在于一个文件的大小可以超过整个磁盘,简化存储管理,很适合复制机制来实现高可用。
-
namenode和datanode
namenode管理文件系统的命名空间,维护文件系统树、所有文件、目录以及块的元数据,元数据有两种namespace image和edit log。
namenode拥有block和datanode之间的映射关系,但是它并不持久化这些信息,这些信息来源依赖于datanode启动时向namenode发送的报告。
datanode维护着最终的block,并定期向namenode发送该datanode包含的block列表。
-
datanode的选取策略
HDFS的块采用复制机制做到数据高可用,默认情况下块副本一共是三个。namenode在选取副本存储的datanode节点时,平衡稳定性和读写带宽后遵循一定的策略。Hadoop的策略是第一个副本放在和客户端程序同个节点上,如果客户端不在集群里,该数据节点将随机选取,当然也会有其他常规检查,如不会选择非常忙碌的机器。第二个副本放在另一个不同的机架的数据节点,第三个副本放在和第二个副本同机架的不同节点上。如果有更多的副本,其他的就随机放入集群的节点,但是也会避免太多的副本放在同一个机架上。
了解下可用性相关
namenode的抗失效能力很重要,一种方法是备份元数据文件,Hadoop提供相关配置,可以使namenode的持久化状态存储于多个文件系统,比如本地磁盘或NFS。另一种方式是启动secondary namenode,它定期的将namespace image和edit log合并,但是它毕竟和主namenode存在状态差,所以主namenode失效后,经常存在丢失数据的问题。这种情况下,一般会拷贝主namenode的元数据文件到secondary namenode,并使其成为主namenode。
-
HDFS联邦
HDFS联邦通过一组namenode合作,每个namenode管理不同的命名空间,且不相互依赖。但是block池的信息并不进行分区,datanode要向每个namenode报告block信息。
-
HDFS HA
备份元数据以及secondary namenode用来低于数据丢失,但是并不能达到高可用的状态。
要从一个失效的namenode中恢复,管理员需要启动一个新的namenode应用备份的元数据,并配置datanode和相关客户端使用新的namenode。新的namenode需要加载完namespace image,重做edit log,接收到足够的block报告,才能对外正常提供服务。
为了实现HA,
- 所有namenode必须采用高可用的共享存储来共享edit log。
- datanode发送block报告给所有namenode。
- 客户端程序需要处理namenode实现的场景,客户端对失效转移的处理是透明的,由客户端的相关库实现
- secondary namenode作为备用,定期从主namenode获取命名空间
一旦发生失效,管理员可以通过冷启动的方式激活备用。
-
失效转移
采用ZooKeeper维持一个转移序列来保证只有一个namenode处于激活状态。HA要确保前一个激活的namenode不会做出冲突的破坏活动,QJM只允许一次只有一个namenode写edit log。然是仍然可能存在前一个激活的namenode向某个客户端请求提供着过时的服务,最好是设置一个ssh命令来杀死那个namenode进程。
Hadoop文件系统
Hadoop对文件系统进行了抽象:org.apache.hadoop.fs.FileSystem代表Hadoop文件系统,并提供一些具体的实现。例如:本地文件系统和HDFS,
Filesystem | URI scheme | Java Implementation |
---|---|---|
Local | file | fs.LocalFileSystem |
HDFS | hdfs | hdfs.DistributedFileSystem |
-- | -- | -- |
Java接口
读取数据
获取FileSystem实例的静态工程方法:
public static FileSystem get(Configuration conf);
public static FileSystem get(URI uri,Configuration conf);
public static FileSystem get(URI uri,Configuration conf,String user);
configuration对象封装了classpath下的配置文件,例如etc/hadoop/core-site.xml。文件系统类型是由URI scheme决定,或者由core-site.xml的配置决定(如果没有指定便默认本地文件系统)。第三个方法是作为某个用户去获取FileSystem实例。
通过open方法来获取一个文件的输入流:
public FSDataInputStream open(Path f);
例如:
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(uri),conf);
InputStream in = fs.open(new Path(uri));
FSDataInputStream继承java.io.DataInputStream并实现了Seekable,所以它支持读取流的任何位置。
void seek(long pos);
long getPos();
FSDataInputStream也实现了PositionedReadable,可以做到在给定位置读取内容。
int read(long position,byte[] buffer,int offset,int length)
int readFully(long position,byte[] buffer,int offset,int length)
int readFully(long position,byte[] buffer)
read方法实现了在position位置读取length长度的字节,并存入buffer的offset为位置。
readFull从buffer中读取length长度的字节,或者全部字节(如果没有指定length)。
写数据
FileSystem提供创建文件的方法:
public FSDataOutputStream create(Path f)
也可以利用带Progressable参数的方法,这样应用程序就会得到进度通知。默认每向管线存储64KB的数据将会通知一次。
public FSDataOutputStream create(Path f,Progressable p)
public interface Progressable{
public void progress();
}
另一种方案是在现有的文件基础上累加内容:
public FSDataOutputStream append(Path f)
FSDataOutputStream有个方法来获取当前位置
public long getPos();
最后,FileSystem提供了创建目录的操作
public boolean mkdirs(Path f)
如果父目录不存在,将自动创建。
查询文件
-
FileStatus
FileStatus封装了文件的元数据。FileSystem提供如下方法
public FileStatus getFileStatus(Path f)
FileStatus提供很多有用的方法:
getPath isDirectory getLen getModificationTime getReplication getBlockSize getOwner getGroup getPermission
-
文件列
FileSystem提供了一些方法获取文件列表:
FileStatus[] listStatus(Path f) FileStatus[] listStatus(Path f,PathFilter filter) FileStatus[] listStatus(Path[] files) FileStatus[] listStatus(Path[] files,PathFilter filter)
-
文件模式
有两个方法可以通过通配符来匹配多个文件
FileStatus[] globStatus(Path pathPattern) FileStatus[] globStatus(Path pathPattern,PathFilter filter)
Hadoop支持Unix bash shell的glob字符集。
-
路径过滤
public interface PathFilter{ boolean accept(Path path) }
删除数据
如下方法提供删除文件或目录
boolean delete(Path f,boolean recursive)
读写工作流
-
读文件
在namenode的指引下,客户端程序找到每个数据块的最佳datanode,然后直接和其联系读取数据。
- 客户端调用FileSystem,此处为分布式文件系统DistributedFileSystem.open方法
- DistributedFileSystem向namenode获取第一批块的位置,namenode返回包含块拷贝的datanode地址,并按着和客户端的就近原则进行排序
- DistributedFileSystem返回FSDataInputStream实例,该实例负责和datanode间的IO。
- 客户端调用流的read方法
- FSDataInputStream联系连接最近的datanode,数据从datanode传回客户端
- 当块读取完后,FSDataInputStream关闭和该datanode的连接,并为下个块连接最佳的datanode,这个过程对客户端来说是透明的
- 如果需要,文件系统将向namenode获取下一批块的位置
- 客户端调用close方法结束读取
- 写文件
- 客户端调用DistributedFileSystem的write方法。
- DistributedFileSystem请求namenode创建一个新文件。namenode做一些常规检查,如是否已经存在同名文件,是否有权限创建。
- DistributedFileSystem返回一个FSDataOutputStream实例用于写数据
- 客户端写数据,FSDataOutputStream对数据进行分包,并将数据包放入data队列
- DataStreamer要求namenode分配新的块,这些块会被namenode分配一些用于存放拷贝的datanode。这些datanode组成一个管线,第一个datanode存储每个包,并将其传送到第二个datanode,后面依次相似。
- ack队列维护着等待datanode确认的分包,一个分包只有在收到管线中所有datanode的确认后才会移除。
- 调用close方法,完成写入。
- close方法flush所有分包,并等待确认,最后通知namenode告知文件创建完成。
MapReduce
感受MapReduce程序
MapReduce是Hadoop下的数据处理编程模型,运行于基于yarn架构的MapReduce框架中。下面通过一个例子来感受下MapReduce。该例子出自《Hadoop The Definitive Guide》第四版的关于气象数据的例子,目前第四版没有中文版,有能力的话可以看看这本书。
准备数据
原始数据是一些的tar文件,每年的气象数据都在一个单独的tar文件里。每个tar文件包含若干zip文件,每个zip文件是不同气象站当年的气象数据。类似2009.tar,其下包含气象站1.gz、气象站n.gz,其中每条气象数据占一行,这种结构。
Hadoop善于处理大文件,所以作者将文件解压,然后按年合并成单独的文件。
自己写个程序
如果让我们来实现一个程序用来计算每年的最大温度怎么办?可能会类似如下伪代码:
for(year:all){
InputStream in = readFile(year); //读取每年的文件
String line;
float max;
while(line = in.readLine()!=null){
float tmp = 在line固定区间读取温度值;
if(tmp>max){
max = tmp;
}
}
print(year,max);
}
为了加快处理速度,最好是能并行处理这些数据文件,比如按年分给不同的进程去处理。但是这会有一些问题,给每个进程分配均等的任务量不是件容易的事情,每年的数据量可能大相径庭,按木桶原理,任务的处理速度取决于最慢的那个进程也就是处理气象数据文件最大的那个进程。
那么最好的方式就是将文件分成相同大小的块,每个块都由不同的进程去处理。这样,每个进程处理不同的块,计算当前块范围内的温度最大值,形成中间结果。要得到最终结果,接下来需要把这些中间结果再汇集起来,求出最终的最大值,这就是mapreduce框架要的事情。
MapReduce程序
MapReduce程序运行分为两个阶段,map和reduce。每个阶段都由key-value这种形式的数据做为输入输出,具体的数据类型由编程者决定,编程者并未这两个阶段分别指定map函数和reduce函数。
在map阶段,map函数接收原始数据,在该例中选择文本类型的格式,这种格式会自动将一行的文本内容作为value,文本内容在文件中的偏移量作为key,一般情况下代表偏移量的key对我们来说没有太大用处。map函数读取文本内容,读取出年并作为key,读取温度值作为value,输出给mapreduce框架。
mapreduce框架拿到map阶段的输出结果后,按key分组并排序,这样每一年对应一组温度值,然后将分组排序后的结果按key传给reduce,进入reduce阶段。
reduce函数遍历传来的value集合,计算最大值,以传入的key也就是年作为key,最大值作为value进行输出。如下图所示:
java实现
实现mapreduce程序需要实现三个主要部件,mapper、reducer和一个用来启动job的main程序。
Hadoop提供了抽象的Mapper类,具有四个泛型参数,用来指定输入key、输入value、输出key、输出value的类型。
public class MaxTemperatureMapper extends Mapper
<LongWritable,Text,Text,IntWritable>{
public void map(LongWritable key,Text value,Context context){
String line = value.toString();
String year = line.substring(15,19);
int airTemperature;
......
context.write(new Text(year),new IntWritable(airTemperature))
}
}
map方法有三个参数,分别代表输入key,输入value,上下文对象。通过上下文对象输出map阶段的分析结果。
同样的,Hadoop提供了抽象的Reducer类,具有四个参数,指定了reducer阶段的输入输出类型。
public class MaxTemperatureReducer extends Reducer
<Text,IntWritable,Text,IntWritable>{
public void reduce(Text key,Iterable<IntWritable> values,Context context){
int maxValue=Integer.MIN_VALUE;
for(IntWritable v:values){
......
}
context.write(key,new IntWritable(maxValue));
}
}
最后,需要开发一个main函数来运行job。
public class MaxTemperature{
public static void main(String[] args){
Job job = new Job();
job.setJarByClass(MaxTemperature.class);
job.setJobName("Max Temperature");
FileInputFormate.addInputPath(job,new Path(args[0]));
FileOutputFormate.setOutputPath(job,new Path(args[1]));
job.setMapperClass(MaxTemperatureMapper.class);
job.setReducerClass(MaxTemperatureReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(IntWritable.class);
System.exit(job.waitForCompletion(true)?0:1);
}
}
运行这个job时需要将代码打包成jar包,在一个分布式模式的hadoop中运行时,jar包会在集群中传递,以便在目标机器运行。setJarByClass告诉hadoop根据class找到相关的jar包,但也可以在运行时直接指定jar包。
输入路径可以是文件或文件夹或模式匹配的路径,但输出输出路径必须是一个文件夹,且在运行job之前该文件夹不存在。setMapperClass和setReducerClass指定mapper和reducer,setOutputKeyClass和setOutputValueClass指定reducer的输出类型,该类型必须和reducer类型中声明的类型一致,mapper的输出类型默认和此一致,所以不用设置。否则,通过setMapOutputKeyClass和setMapOutputValueClass来设置。
输入类型会默认为TextInputFormat,否则通过setInputFormateClass设置。
waitForCompletion用来提交job,true使用来指定是否打印调试信息。返回true代表执行成功,否则执行失败。
运行
将程序打成jar包,通过命令行运行
MapReduce任务工作原理
MapReduce任务的工作过程如下图所示:
整个过程设计5个组件:客户端、YARN资源管理器、YARN节点管理器、MapReduce application master、HDFS
- 客户端程序通常是MapReduce驱动程序提交一个任务
- YARN资源管理器统一协调集群的计算资源
- YARN节点管理器启动和监控机器上的容器
- MapReduce application master为job协调各task的运行。application master和MapReduce task都运行在集群中的某机器中的具体容器中,这些容器都受YARN资源管理器调度并被具体节点管理器管理。
- HDFS用于在组件间共享job相关的文件
job的提交
job提交的过程如下:
- 向资源管理器询问新的应用ID用做job的ID
- 检查输出路径设置是否规范,比如没有指定或者已经存在,将会报错
- 为job计算输入分片,如果无法为jbo计算出输入分片,比如指定的输入路径不存在,也将会报错
- 拷贝job所需要的资源,包括运行job的jar文件,配置文件,为其计算出的输入分片都将拷贝到共享文件系统中以job ID命名的文件夹中
- 向资源管理器提交任务
job的初始化
资源管理器接到提交job请求时,YARN调度器启动一个容器用于运行application master(step 5)。
MapReduce job的application master是一个java应用程序,其主要类是MRAppMaster。MRAppMater做一些初始化的工作,如创建一些用来来接收task的进度报告的对象,用以跟踪整个job的进度(step6)。
接收共享文件系统中的输入分片(step7)。为每个输入分片创建一个map task,根据配置创建若干reduce task。application master必须决定如何运行这些task,如果job非常小,它可以选择和这些task运行于同一个jvm中。默认的,少于10个mapper,只有一个reducer,输入大小少于一个HDFS block的job被认定为是一个小作业。application master会任务这样的job如果并行的运行于不同的容器中,弊大于利。不如将这些task运行于同一个节点串行的运行。这样的job会被称为一个uber task。
task的分配
如果job不是被认定为是一个uber task,application master会为各个map和reduce task向资源管理器申请容器(step 8)。map task的请求优先于reduce task,且在map task完成5%之前,不会创建reduce task的请求。
reduce task可以运行于集群的任何地方,但是map task必须受数据本地化的约束,最佳的场景是map task要运行于分片所在的节点,称之为数据本地化,其次,是运行于和分片数据处于同一机架的节点中,这称之为机架本地化,如果数据本地化和机架本地化都无法满足,只能跨机架完成任务了。
请求可以附加对资源的要求,如内存和CPU的需求,默认情况下,每个map和reduce task分配1024MB内存和一核。
task的执行
资源管理器给一个task分配资源(该资源是在某个节点的用于运行一个容器的资源),application master联系对应的node manager,启动容器(step 9)。
task被一个java应用来运行,其主类YarnChild,在其运行任务前,其先定位任务所需的所有资源,包括job配置、jar文件(step 10)。
运行任务(step 11)。
状态更新
每个job以及每个task都有一个status,包括状态,如running,successfully completed,failed,map和reduce的进度。每个task定时(默认一秒)向application master汇报自己的进度,对于map task,进度就是其处理的输入百分比,对于reduce task,就需要估算。
失效处理
task失效
最常见的失效情况是task代码里抛出了运行时异常,执行task的jvm在其退出之前向application master报告错误,application master会标记该task是失败状态,释放对应的容器,该容器所占用的资源会被其他task所利用。
类似jvm突然停止工作的异常,node manager会通知application master执行task的jvm进程退出了,task也将被application master置为失败状态。
对于挂起状态的task通过超时来处理,application master在很久没有收到task的进度更新后,会将其置为失效,超时时间可以设置。当然也可以设置为永不超时。
application master将重新调度失败的task,重新调度将避免选择先前失效时所在的node manager。默认情况下,一个task失败达到4次,该task不再尝试执行,job也将失败。对于一些不希望task失败就直接导致job失败的应用来说,需要设置一个参数来控制task失效在某百分比以内时不会触发job失败。
task可能会被杀死,这种状况有别于失败,例如node manager在失效状态下,运行其中的所有task都将被杀死,当然用户也可以自己通过工具将其杀死,被杀死的task,不计入尝试次数范围。
application master失效
MapReduce application master的默认失败尝试次数是2,所以达到2次时就认为job失败,也可以自行设置。Yarn框架也提供了application master的最大尝试次数,同时默认也是2。
application master定期向resource manager发送心跳,resource manager一旦检测到application master失败,就会在一个新的容器中启动新的application master实例。在这种状况下,MapReduce application master使用job的执行历史来恢复task的运行状态,已经运行的不会再重新运行
MapReduce客户端会不断从application master那里获取job的进度,application master失效后,客户端将自动重新定位到新的application master。
node manager失效
node manager定期向resource manager发送心跳,默认超时为10分钟。一旦失效,将被移出节点池。任何运行于失效node manager所在机器上的task或application master都将得到恢复。
如果一个应用在某node manager 上发生失败过多,该node manager将被application master加入黑名单,(资源管理器并不跨应用维护黑名单,所以新job的task可能会运行于一个被某application master列入黑名单的节点)即使其本身并没有失效。如果超过三个task失败,MapReduce application master将在不同的节点上重新调度task。
resource manager失效
resource manager一旦失效,job以及task容器无法启动,是一件很严重的故障,所以很有必要做HA。应用的信息被存储在HA的状态存储中(ZooKeeper或HDFS)。
新的resource manager启动时,从状态存储中读取应用的信息,重启application master.
客户端和node manager必须处理resource manager失效问题,因为可能和两个resource manager交互。