摘要:当数据集超出一台物理计算机的存储能力量时,便有必要将它分布到多个独立的计算机。管理着跨计算机网络存储的文件系统称为分布式文件系统。HDFS全称为Hadoop Distributed Filesystem,是Hadoop的分布式文件系统,也是当下开源社区最受欢迎的分布式文件系统。本文主要介绍了HDFS的设计、概念、文件读写原理等基础知识,并给出了HDFS各类文件操作的Java实现。
1. HDFS的设计
理念一:超大文件
HDFS面向存储几百MB,几百GB甚至几百TB大小的大文件,这是传统的本地文件系统所不及的。
理念二:流式数据访问
流式数据访问,就是一次写入,多次读取,不可以修改数据,只能删除数据。那么,为什么写入是一次性的,不可以再修改呢?在学完HDFS文件读写原理后,我们再来探讨其原因。
理念三:商用硬件
HDFS设计理念之一就是让它能运行在普通的硬件之上,即便硬件出现故障,也可以通过容错策略来保证数据的高可用。
不适合一:低延迟读写
由于HDFS主要是为达到高的数据吞吐量而设计的,所以其延迟较高,对于那些有低延时要求的应用程序,HBase是一个更好的选择。
不适合二:存储大量小文件
因为namenode把文件系统的元数据放置在内存中,所以文件系统所能容纳的文件数目是由namenode的内存大小来决定。一般来说,每一个文件、文件夹和Block需要占据150字节左右的空间,所以,如果你有100万个文件,每一个占据一个Block,你就至少需要300MB内存。当前来说,数百万的文件还是可行的,当扩展到数十亿时,对于当前的硬件水平来说就没法实现了。HBase利用SequenceFile、MapFile、Har等方式归档小文件,同时,保存归档文件的映射关系,从而解决了HDFS不适合存储大量小文件的难题。
不适合三:多用户写入和任意修改文件
在HDFS的一个文件中只有一个写入者,而且写操作只能在文件末尾完成,即只能执行追加操作。目前HDFS还不支持多个用户对同一文件的写操作,以及在文件任意位置进行修改。其原因,我们会在后面进行探讨。
2. HDFS的概念
2.1 块
在《操作系统原理》中就有磁盘块的概念,它是操作系统中最小的逻辑存储单位,可以屏蔽底层硬件存储结构的复杂性,为上层应用提供统一的存储结构和编程规范。而HDFS引入了块的概念,且默认块的大小为64MB,它比磁盘块大的多,其原因主要是为了减小块寻址操作在整个块操作中的时间比。
在分布式文件系统中使用抽象块可以带来很多好处:
好处一:可存储超大文件
对于一个超大文件,可以通过将文件分成多个块,每个分块可以存储到集群的任意一个磁盘,最后,通过整合文件的分块来恢复文件。
好处二:简化存储子系统
因为块的大小固定,计算一个磁盘能存多少块就相对容易。另外,块只是一部分存储的数据,对于文件的元数据,不需要与块一同存储,其他系统可以正交地管理元数据。
好处三:利于数据的备份
块很适合于为提供容错和实用性而做的复制操作。如果一个块损坏了,系统可以在其它地方读取另一个副本(在Hadoop搭建时可以设置块备份数量,搭建方法可参考文章大数据之路起航——大数据环境搭建),从而保证了数据的完整性。
2.2 名称节点和数据节点
HDFS集群有两种节点,以管理者-工作者的模式运行,即一个名称节点(管理者)和多个数据节点(工作者)。
名称节点(NameNode)
NameNode负责管理文件目录、文件和Block的对应关系以及Block和数据节点(DataNode)的对应关系。
数据节点(DataNode)
DataNode负责存储数据块,同时,在被用户或NameNode调用时提供块定位服务。DataNode还定时向NameNode发送它们存储的块的列表。
二级名称节点(SecondryNameNode)
负责合并NameNode的edit logs到fsimage文件中。
https://blog.csdn.net/xh16319/article/details/31375197
3. HDFS数据流
3.1 文件读取剖析
为了了解客户端及与之交互NameNode和DataNode之间的数据流是怎样的,我们可以参考图3-1,其中显示了在读取文件时一些事件的主要顺序。
- 客户端通过调用FileSystem对象的open()来读取希望打开的文件。对于HDFS来说,这个对象是分布式文件系统的一个实例。
- DistributedFileSystem通过RPC(Remote Procedure Call)通信协议来远程调用namenode,以确定文件的开头部分的块位置。对于每一块,namenode返回具有该块副本的datanode地址。此外,这些datanode根据他们与client的距离来排序(根据网络集群的拓扑)。如果该client本身就是一个datanode,便从本地datanode中读取。DistributedFileSystem 返回一个FSDataInputStream对象给client读取数据,FSDataInputStream转而包装了一个DFSInputStream对象。
- 接着client对这个输入流调用read()。存储着文件开头部分块的数据节点地址的DFSInputStream随即与这些块最近的datanode相连接。
- 通过在数据流中反复调用read(),数据会从datanode返回client。
- 到达块的末端时,DFSInputStream会关闭与datanode间的联系,然后为下一个块找到最佳的datanode。client端只需要读取一个连续的流,这些对于client来说都是透明的。
另外,在读取的时候,如果client与datanode通信时遇到一个错误,那么它就会去尝试对这个块来说下一个最近的块。它也会记住那个故障节点的datanode,以保证不会再对之后的块进行徒劳无益的尝试。client也会确认datanode发来的数据的校验和。如果发现一个损坏的块,它就会在client试图从别的datanode中读取一个块的副本之前报告给namenode。这个设计的一个重点是,client直接联系datanode去检索数据,并被namenode指引到块中最好的datanode。因为数据流在此集群中是在所有datanode分散进行的。所以这种设计能使HDFS可扩展到最大的并发client数量。同时,namenode只不过提供块的位置请求(存储在内存中,十分高效),不是提供数据。否则如果客户端数量增长,namenode会快速的成为一个“瓶颈”。
3.2 文件写入剖析
客户端要向HDFS写数据,首先要跟namenode通信以确认可以写文件并获得接收文件block的datanode,然后,客户端按顺序将文件block逐个传递给相应datanode,并由接收到block的datanode负责向其他datanode复制block的副本。
- 客户端通过在DistributedFileSystem中调用create()来创建文件。
- DistributedFileSystem 使用RPC去调用namenode,在文件系统的命名空间创一个新的文件,没有块与之相联系。namenode执行各种不同的检查以确保这个文件不会已经存在,并且在client有可以创建文件的适当的许可。如果检查通过,namenode就会生成一个新的文件记录;否则,文件创建失败并向client抛出一个IOException异常。分布式文件系统返回一个文件系统数据输出流,让client开始写入数据。就像读取事件一样,文件系统数据输出流控制一个DFSOutputStream,负责处理datanode和namenode之间的通信。
- 在client写入数据时,DFSOutputStream将它分成一个个的包,写入内部队列,称为数据队列。数据流处理数据队列,数据流的责任是根据适合的datanode的列表要求namenode分配适合的新块来存储数据副本。这一组datanode列表形成一个管线————假设副本数是3,所以有3个节点在管线中。
- 数据流将包分流给管线中第一个的datanode,这个节点会存储包并且发送给管线中的第二个datanode。同样地,第二个datanode存储包并且传给管线中的第三个数据节点。
- DFSOutputStream也有一个内部的数据包队列来等待datanode收到确认,称为确认队列。一个包只有在被管线中所有的节点确认后才会被移除出确认队列。如果在有数据写入期间,datanode发生故障, 则会执行下面的操作,当然这对写入数据的client而言是透明的。首先管线被关闭,确认队列中的任何包都会被添加回数据队列的前面,以确保故障节点下游的datanode不会漏掉任意一个包。为存储在另一正常datanode的当前数据块制定一个新的标识,并将该标识传给namenode,以便故障节点datanode在恢复后可以删除存储的部分数据块。从管线中删除故障数据节点并且把余下的数据块写入管线中的两个正常的datanode。namenode注意到块复本量不足时,会在另一个节点上创建一个新的复本。后续的数据块继续正常接收处理。只要dfs.replication.min的副本(默认是1)被写入,写操作就是成功的,并且这个块会在集群中被异步复制,直到其满足目标副本数(dfs.replication 默认值为3)。
- client完成数据的写入后,就会在流中调用close()。
- 在向namenode节点发送完消息之前,此方法会将余下的所有包放入datanode管线并等待确认。namenode节点已经知道文件由哪些块组成(通过Data streamer 询问块分配),所以它只需在返回成功前等待块进行最小量的复制。
复本的布局需要对可靠性、写入带宽和读取带宽进行权衡。Hadoop的默认布局策略是在运行客户端的节点上放第1个复本(如果客户端运行在集群之外,就随机选择一个节点,不过系统会避免挑选那些存储太满或太忙的节点。)第2个复本放在与第1个复本不同且随机另外选择的机架的节点上(离架)。第3个复本与第2个复本放在相同的机架,且随机选择另一个节点。其他复本放在集群中随机的节点上,不过系统会尽量避免相同的机架放太多复本。总的来说,这一方法不仅提供了很好的稳定性(数据块存储在两个机架中)并实现很好的负载均衡,包括写入带宽(写入操作只需要遍历一个交换机)、读取性能(可以从两个机架中选择读取)和集群中块的均匀分布(客户端只在本地机架上写入一个块)。
4. Java应用程序代码
首先,需要在maven中引入Hadoop的依赖,版本需要根据安装的hadoop版本而定,作者的hadoop版本是2.7.6。
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.7.6</version>
</dependency>
然后,创建HDFSConfig类,该类负责配置连接信息,并连接HDFS。
package com.whut.config;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import java.io.IOException;
public class HDFSConfig {
private static FileSystem fileSystem = null;
static {
Configuration conf = new Configuration();
conf.set("fs.defaultFS","hdfs://localhost:9000");
conf.setBoolean("dfs.support.append", true);
conf.set("dfs.client.block.write.replace-datanode-on-failure.policy", "NEVER");
conf.set("dfs.client.block.write.replace-datanode-on-failure.enable", "true");
try {
fileSystem = FileSystem.get(conf);
} catch (IOException e) {
e.printStackTrace();
}
}
public static FileSystem getFileSystem() {
return fileSystem;
}
public static void close() {
try {
fileSystem.close();
} catch (IOException e) {
e.printStackTrace();
}
}
}
最后,创建HDFS文件操作工具类,具体包括:创建文件夹、列出文件夹、文件上传、文件下载、写文件(追加和覆盖模式)、列出文件、删除文件、读文件等。
package com.whut.util;
import com.whut.config.HDFSConfig;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.io.IOUtils;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;
/**
* Created by 杨赟 on 2018-07-18.
*/
public class HDFSUtil {
/**
* @since 2018-07-18
* @author 杨赟
* @describe 上传本地文件到HDFS
* @param local 本地路径
* @param hdfs DFS路径
* @return void
*/
public static void upload(String local, String hdfs){
FileSystem fs = HDFSConfig.getFileSystem();
if(fs == null)
return;
try {
fs.copyFromLocalFile(new Path(local),new Path(hdfs));
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* @since 2018-07-18
* @author 杨赟
* @describe 将hdfs上文件下载到本地
* @param hdfs HDFS路径
* @param local 本地路径
* @return void
*/
public static void download(String hdfs, String local){
FileSystem fs = HDFSConfig.getFileSystem();
if(fs == null)
return;
try {
fs.copyToLocalFile(new Path(hdfs), new Path(local));
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* @since 2018-07-18
* @author 杨赟
* @describe 在hdfs目录下面创建文件夹
* @param name HDFS文件夹名
* @return void
*/
public static void createDir(String name){
FileSystem fs = HDFSConfig.getFileSystem();
if(fs == null)
return;
try {
fs.mkdirs(new Path(name));
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* @since 2018-07-18
* @author 杨赟
* @describe 创建新的文件
* @param name 文件名
* @param content 内容
* @return void
*/
public static void createFile(String name, String content) {
FileSystem fs = HDFSConfig.getFileSystem();
if(fs == null)
return;
FSDataOutputStream os = null;
try {
os = fs.create(new Path(name));
os.write(content.getBytes());
os.close();
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* @since 2018-07-18
* @author 杨赟
* @describe 向文件写入新的数据
* @param name 文件名
* @param content 内容
* @return void
*/
public static void append(String name, String content){
FileSystem fs = HDFSConfig.getFileSystem();
if(fs == null)
return;
try {
if (fs.exists(new Path(name))) {
try {
InputStream in = new ByteArrayInputStream(content.getBytes());
OutputStream out = fs.append(new Path(name));
IOUtils.copyBytes(in, out, 4096, true);
out.close();
in.close();
} catch (Exception e) {
e.printStackTrace();
}
} else {
createFile(name, content);
}
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* @since 2018-07-18
* @author 杨赟
* @describe 删除hdfs上的文件或文件夹
* @param name 文件或文件夹名
* @return void
*/
public static void remove(String name) {
FileSystem fs = HDFSConfig.getFileSystem();
if(fs == null)
return;
try {
fs.delete(new Path(name),true);
} catch (IOException e) {
e.printStackTrace();
}
}
/**
* @since 2018-07-18
* @author 杨赟
* @describe 列出文件
* @param path HDFS文件夹名
* @return 文件列表
*/
public static List<String> listFile(String path) {
List<String> list = new ArrayList<>();
FileSystem fs = HDFSConfig.getFileSystem();
if(fs == null)
return list;
FileStatus[] fileStatuses = new FileStatus[0];
try {
fileStatuses = fs.listStatus(new Path(path));
} catch (IOException e) {
e.printStackTrace();
}
for (FileStatus fileStatus : fileStatuses) {
if(fileStatus.isFile())
list.add(path);
}
return list;
}
/**
* @since 2018-07-18
* @author 杨赟
* @describe 列出文件夹
* @param path HDFS文件夹名
* @return 文件夹列表
*/
public static List<String> listDir(String path){
List<String> list = new ArrayList<>();
FileSystem fs = HDFSConfig.getFileSystem();
if(fs == null)
return list;
FileStatus[] fileStatuses = new FileStatus[0];
try {
fileStatuses = fs.listStatus(new Path(path));
} catch (IOException e) {
e.printStackTrace();
}
for (FileStatus fileStatus : fileStatuses) {
if(fileStatus.isDirectory())
list.add(path);
}
return list;
}
/**
* @since 2018-07-18
* @author 杨赟
* @describe 读文件
* @param name 文件名
* @return byte[]
*/
public static byte[] readFile(String name){
FileSystem fs = HDFSConfig.getFileSystem();
if(fs == null)
return null;
Path path = new Path(name);
try {
if (fs.exists(path)) {
FSDataInputStream is = fs.open(path);
FileStatus stat = fs.getFileStatus(path);
byte[] buffer = new byte[Integer.parseInt(String.valueOf(stat.getLen()))];
is.readFully(0, buffer);
is.close();
return buffer;
}
} catch (IOException e) {
e.printStackTrace();
}
return null;
}
/**
* @since 2018-07-18
* @author 杨赟
* @describe 文件操作测试
*/
public static void main(String[] args) {
HDFSUtil.download("/data/output.csv","/home/ubuntu/桌面");
}
}
5. Hadoop采用流数据访问模式的原因
现在,我们可以来总结一下Hadoop采用流数据访问模式的原因了。
原因一:写性能低于读性能
因为hadoop在写数据的时候要在namenode上进行meta元素的写入,与此同时还需要将数据写入到datanode上,如果是大型的数据,文件会分成很多个区块。这样在meta数据和块数据的写入时会产生大量的消耗。相对于读取操作来说,就会简单很多,只需要在namenode上查询到数据的meta信息,接着就不关namenode的事了。从而为namenode省下大量资源来进行其他的操作。
原因二:互斥锁影响性能
可以试想一下,当有多个用户同时写数据时,那么就要给写操作加互斥锁,而锁会大幅度降低性能。所以为了简单,就不能修改了,这是非常暴力但是却很有效的手段。
原因三:大文件修改意义不大
我们进行大数据分析时,并不会经常性的修改原始数据,而且对于大文件来说,想要在文件中定位位置是非常耗费时间的,还不如直接删除重新写入。
6. 总结
本文主要参考了《Hadoop权威指南》,对HDFS中相对基础的知识点进行了归纳总结。并通过Java编程实现了HDFS的文件操作工具类。