HDFS读写流程剖析
本文为 《Hadoop The Definitive Guide 4th Edition》的读书笔记(或者叫翻译),仅限交流使用, 转载请注明出处。
剖析读流程
下面这个图片 3-2 总结性的描述了读文件时客户端与 HDFS 中的 namenode, datanode 之间的数据流动。
客户端首先通过在 FileSystem 上调用 open() 方法打开它想要打开的文件, 对于 HDFS 来说, 就是在 DistributedFileSystem 的实例上调用(第1步)。 之后 DistributedFileSystem 就使用 remote procedure call(RPCs)去呼叫 namenode,去查明组成文件的前几个块的位置(第2步)。对于每一个块,namenode 返回拥有块拷贝的 datanode 的地址。幸运的是,这些 datanode 会按照与客户端的接近度来排序(接近度是按照集群网络中的拓扑结构来计算的,后面会说到)。如果客户端节点自己就是一个 datanode,而且该节点的肚子里存了一个块的拷贝,客户端就直接从本地 datanode 读取块。
DistributedFileSystem 返回一个 FSDataInputStream(支持文件 seek 的输入流)给客户端,客户端就能从流中读取数据了。 FSDataInputStream 中封装了一个管理了 datanode 与 namenode I/O 的 DFSInputStream。
然后客户端就调用 read() 方法(第3步)。 存储了文件的前几个块的地址的 DFSInputStream,就会连接存储了第一个块的第一个(最近的) datanode。 然后 DFSInputStream 就通过重复调用 read() 方法,数据就从 datanode 流动到了客户端(第4步)。当 该 datanode 中最后一个块的读取完成了, DFSInputStream 会关闭与 datanode 的连接,然后为下一块寻找最佳节点(第5步)。这个过程对客户端来说是透明的,在客户端那边看来,就像是只读取了一个连续不断的流。
块是按顺序读的,通过 DFSInputStream 在 datanode 上打开新的连接去作为客户端读取的流。他也将会呼叫 namenode 来取得下一批所需要的块所在的 datanode 的位置(注意刚才说的只是从 namenode 获取前几个块的)。当客户端完成了读取,就在 FSDataInputStream 上调用 close() 方法结束整个流程。
在读取过程中, 如果 FSDataInputStream 在和一个 datanode 进行交流时出现了一个错误,他就去试一试下一个最接近的块,他当然也会记住刚才发生错误的 datanode 以至于之后不会再在这个 datanode 上进行没必要的尝试。 DFSInputStream 也会在 datanode 上传输出的数据上核查检查数(checknums).如果损坏的块被发现了,DFSInputStream 就试图从另一个拥有备份的 datanode 中去读取备份块中的数据。
在这个设计中一个重要的方面就是客户端直接从 datanode 上检索数据,并通过 namenode 指导来得到每一个块的最佳 datanode。这种设计允许 HDFS 扩展大量的并发客户端,因为数据传输只是集群上的所有 datanode 展开的。期间,namenode 仅仅只需要服务于获取块位置的请求(块位置信息是存放在内存中,所以效率很高)。如果不这样设计,随着客户端数据量的增长,数据服务就会很快成为一个瓶颈。
集群上的拓扑结构
我们知道,相对于客户端(之后就是 mapreduce task 了),块的位置有以下可能性:
- 在客户端所在节点上(0,也就是本地化的)
- 和客户端不在同一个节点上,但在同一个机架上(2)。
- 和客户端不在同一个机架上,但是在同一个数据中心里(4)。
- 就是与客户端不在一个数据中心(6)。
HDFS ( datanode? namenode?这里我也不知道是谁来完成这个排序) 就是根据上面的四种可能性来对节点进行接近度计算。他们的分值分别为 0,2,4,6:
剖析写流程
写流程的图如下:
首先客户端通过在 DistributedFileSystem 上调用 create() 方法(第1步)来创建一个文件。 DistributedFileSystem 使用 RPC 呼叫 namenode ,让他
在文件系统的命名空间上创建一个没有与任何块关联的新文件(第2步), namenode 会执行各种各样的检查以确认文件之前是不存在的,并确认客户端是否拥有创建文件的权限。如果检查通过。 namenode 就会为新文件生成一条记录;否则,文件创建就会失败,客户端会抛出一个 IOException。 成功以后,DistributedFileSystem 会返回一个 FSDataOutputStream 给客户端以让他开始写数据。和读流程中一样,FSDataOutputStream 包装了一个 DFSOutputStream,他掌握了与 datanode 与 namenode 的联系。
当客户端开始写数据(第3步),DFSOutputStream 将文件分割成很多很小的数据,然后将每个小块放进一个个包(数据包,包中除了数据还有描述数据用的标识)中, 包们会写进一个名为数据队列(data quence)的内部队列。数据队列被 DataStreamr 消费,他负责要求 namenode 去挑选出适合存储块备份的 datanode 的一个列表(注意,这里是文件的一个块,而不是整个文件)。这个列表会构成一个 pipeline(管线),这里假定备份数为3,所以在 pipeline 中就会有三个 datanode , DataStreamer 将能够组成块的的包先流入 pipeline 中的第一个 datanode ,第一个 datanode 会先存储来到的包,然后继续将所有的包转交到 pipeline 中的第二个 datanode 中。相似的,第二个 datande 也会存储这些包,并将他们转交给 pipeline 中的第三个(最后一个) datanode (第4步)。
数据的流动的方式应该还有两种,第一种就是第一个 datanode 得到所有的数据包后并写入后,才将数据包往下传递;第二种就是一旦数据包写入成功就直接传给下一个 datanode,这种可能性最大。不影响大局,具体是哪种待确认。注意这里的写入就是写入到磁盘里。
DFSOutputStream 也会维护一个包们的内部队列,其中也会有所有的数据包,该队列等待 datanode们 的写入确认,所以叫做确认队列(ack quence)。当一个包已经被 pipeline 中的所有 datanode 确认了写如磁盘成功,这个包才会从 确认队列中移除(第5步)。如果在任何一个 datanode 在写入数据的时候失败了,接下来所做的一切对客户端都是透明的:首先, pipeline 被关闭,在确认队列中的剩下的包会被添加进数据队列的起始位置上,以至于在失败的节点下游的任 何节点都不会丢失任何的包。
这里有些疑问,就是数据包写数据时的数据队列的状态,是一直不变,写入了再移除,还是早就清空了。按照上面的说法,失败了就将剩下的还未写入的数据包添加(应该是拷贝)回数据队列,数据队列“一直不变”和“写入了再移除数据包”不就会出现重复了。而清空的话,应该是出错了之后才清空。那这样为什么不用数据队列作为确认队列,当发现都写入成功了,就将包从队首移除? 这个也待确认。
然后与 namenode 联系后,当前在一个好的 datanode 会联系 namenode, 给失败节点上还未写完的块生成一个新的标识ID, 以至于如果这个失败的 datanode 不久后恢复了,这个不完整的块将会被删除。
失败节点会从 pipeline 中移除,然后剩下两个好的 datanode 会组成一个的新的 pipeline ,剩下的 这些块的包(也就是刚才放在数据队列队首的包)会继续写进 pipeline 中好的 datanode 中。
最后,namenode 注意到块备份数小于规定的备份数,他就安排在另一个节点上创建完成备份,直接从已有的块中复制就可以。然后一直到满足了备份数(dfs.replication)。
如果有多个节点的写入失败了,如果满足了最小备份数的设置(dfs.namenode.repliction.min),写入也将会成功,然后剩下的备份会被集群异步的执行备份,直到满足了备份数(dfs.replication)。
当客户端完成了数据写入,会在流上调用 close() 方法(第6步)。 这个行为会将所有剩下的包刷新(flush)进 datanode 中,然后等待确认信息达到后,客户端就联系 namenode 告诉他文件数据已经放好了(第七步)。namenode 也一直知道文件被分成了哪些块(因为在之前是 DataStreamer 请求了块分配),所以现在在成功之前,只需要等待块满足最低限度的备份(因为刚才提到的失败)。