Java 接口
本文为 《Hadoop The Definitive Guide 4th Edition》的读书笔记(或者叫翻译),仅限交流使用, 转载请注明出处。
在本节中,我们开看看 Hadoop FileSystem 类。一个 Hadoop 文件系统的API。虽然我们只是探讨一下 HDFS 的实现,DistributedFileSystem。你也应该在必要时看看他的其他实现。
从 Hadoop URL 读取数据
读取数据的一个简单方式,是直接使用 java.net.URL 对象,用他来打开一个流并读取数据。
InputStream in = null;
try {
in = new URL("hdfs://host/path").openStream();
//process in
} finally {
IOUtils.closeStream(in);
}
完整的代码如下,注意看注释
例子3-1
import org.apache.hadoop.fs.FsUrlStreamHandlerFactory;
import org.apache.hadoop.io.IOUtils;
import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
/**
* 使用java.net.URL API读取HDFS中的数据
* Created by henvealf on 16-9-23.
*/
public class URLCat {
/**
* 因为下面的方法在同一个JVM中只能被调用一次,所以将其放在静态的代码快中就可以。
* 如果在这之前突然有一个在你控制之外的第三方的组件调用此方法。
* 你就不能在这样来获取Hadoop中的数据
*/
static {
URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());
}
public static void main(String[] args) throws IOException {
InputStream in = null;
try {
// 这里注意,如果自己在core-site.xml中指定了端口号,在这里就一定要输入自己指定的端口号
// "hdfs://localhost:9000/user/henvealf/output4/part-r-00000"
in = new URL(args[0]).openStream();
// 使用下面的类的静态方法就能够很方便的将输入流中的数据输出到指定的输出流,这里是屏幕标准输出
// 第三个参数是设定流的缓冲区的大小,
// 第四个参数表示在复制数据结束后是否关闭流。
// 我们在下面自己关闭输入流,而标准输出流并不需要关闭
IOUtils.copyBytes(in, System.out, 4096, false);
} finally {
//最后不管程序有没有正常结束,都要将输入流关闭。
IOUtils.closeStream(in);
}
}
}
简单的运行:
% export HADOOP_CLASSPATH=hadoop-examples.jar
% hadoop URLCat hdfs://localhost/user/tom/quangle.txt
On the top of the Crumpetty Tree
The Quangle Wangle sat,
But his face you could not see,
On account of his Beaver Hat.
使用FileSystem API 读取数据
前面已经说过,当无法使用 URLStreamHandlerFactory 的时候,就需要使用 FileSystem API 来打开一个文件的输入流。
在 Hadoop 文件系统中,一个文件代表使用一个 Path 对象来代表。也就是文件的路劲你可以将他认为是 Hadoop 文件系统的一个 URI,比如:
hdfs://localhost/user/tom/quangle.txt
FileSystem 是文件系统 API 中的一个抽象类,所以第一步就是检索到你想要使用的文件系统的实例,这里是HDFS,为了得到他,FileSystem 提供了一些静态工厂方法。
public static FileSystem get(Configuration conf) throws IOException
public static FileSystem get(URI uri, Configuration conf) throws IOException
public static FileSystem get(URI uri, Configuration conf, String name) throws IOException
一个 Configuration 对象代表一个客户端或者服务定义的配置,通过读取classpath中的配置文件得到,比如 etc/hadoop/core-site.xml。
- 第一个方法只传入了一个 Configuration 对象,然后返回一个默认的文件系统(在core-site.xml中定义的)。
- 第二个方法通过给定的 URI 模式和授权决定使用哪一个文件系统,如果所指定的文件系统找不到,就使用默认的文件系统。
- 第三个方法给定一个用户名,这个在控制上下文的安全中是非常重要的。
在一些情况下,你可能想要检索并获得一个本地文件系统的实例。这里有一个很简单的方法:
public static LocalFileSystem getLocal(Configuration conf) throws IOException
当你得到了一个文件系统的实例,我们就需要调用 open()方法打开一个文件的文件流。
public FSDataInputStream open(Path f) throws IOException
public abstract FSDataInputStream open(Path f, int bufferSize) throws IOException
第一个方法使用的缓冲区默认大小为4KB。
将他们放在一起,我们就可以重写 3-1 了,见3-2:
例子 3-2
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import java.io.IOException;
import java.io.InputStream;
import java.net.URI;
/**
* 使用FileSystem API读取HDFS中的数据,
* 如同在URLCat中提到的,有时候调用不到URLStreamHandlerFactory,这时候就可以使用FileSystem了。
* 在Hadoop文件系统中的文件是使用Path对象来代表的,就像 hdfs://localhost/user/tom/quangle.txt。
*
* Created by henvealf on 16-9-23.
*/
public class FileSystemCat {
public static void main(String[] args) throws IOException {
String uri = args[0];
// 实例化一个代表配置的对象,使用默认的配置文件。
Configuration conf = new Configuration();
// 得到相应文件系统的引用
FileSystem fs = FileSystem.get(URI.create(uri),conf);
InputStream in = null;
try {
// 真正的打开文件系统中的文件路径,并得到输出流,实际上是一个FSDataInputStream
// 关于FSDataInputStream请看FileSystemDoubleCat
in = fs.open(new Path(uri));
// 下面和URLCat相同
IOUtils.copyBytes(in, System.out, 4069, false);
} finally {
IOUtils.closeStream(in);
}
}
}
使用下面的命令运行程序:
% hadoop FileSystemCat hdfs://localhost/user/tom/quangle.txt
On the top of the Crumpetty
Tree The Quangle Wangle sat,
But his face you could not see,
On account of his Beaver Hat.
FSDataInputStream
在 FilSystem中的open() 方法事实上返回的是一个 FSDataInputSteam 而不是 java.io 类。这个类是一个特殊的能够支持随机存取的 java.io.DataInputSteam,所以你能读取流中的任何部分。
package org.apache.hadoop.fs
public class FSDataInputStream extends DataInputStream implements Seekable, PositionedReadable {
}
Seekable 接口允许寻找文件中的一个位置指针,并且提供了一个得到当前距离文件开始的偏移量的方法--getPos()
public interface Seekable {
void seek(long pos) throws IOException;
long getPos() throws IOException;
}
使用一个超出文件长度的指针作为参数调用 seek()的话,会抛出 IOException。他不像 java.io.InputStream 中的 skip() 方法,指针必须指向当前指针位置之后,seek()能够在文件中任意的移动。
看下面的一个例子。向文件中写两次数据:
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import java.io.IOException;
import java.net.URI;
/**
* FSDataInputStream的使用:
* 他是继承于标准IO的DataInputStream。
* 实现了Seekable接口,于是乎他就能够支持随机存取,
* 两个方法:
* void seek(long pos) throws IOException;
* long getPos() throws IOException;
* 还实现了PositionedReadable接口,所以你可以使用一个偏移量,这样你就能用他来读取到流中的任何部分。
* int read(long position, byte[] buffer, int offset, int length)
* throws IOException;
* void readFully(long position, byte[] buffer, int offset, int length)
* throws IOException;
* public void readFully(long position, byte[] buffer)
* throws IOException;
*
* Created by henvealf on 16-9-23.
*/
public class FileSystemDoubleCat {
public static void main(String[] args) throws IOException {
String uri = args[0];
Configuration conf = new Configuration();
FileSystem fs = FileSystem.get(URI.create(uri) ,conf);
FSDataInputStream in = null;
try {
in = fs.open(new Path(uri));
IOUtils.copyBytes(in, System.out, 1024, false);
in.seek(0);
IOUtils.copyBytes(in, System.out, 1024, false);
byte[] b3 = new byte[3];
byte[] b33 = new byte[6];
// 当都使用这个参数时,可以看出下面两个方法没有区别
in.read(1,b3,0,3);
in.readFully(1,b3,0,3);
// 当只使用这两个参数,就会从平position开始读数据,直到填充满Buffer,
// 当读的过程中遇到EOF,就会抛出异常。
in.readFully(1,b33);
System.out.println(new String(b3));
System.out.println(new String(b33));
} finally {
IOUtils.closeStream(in);
}
}
}
read方法的参数解释:
- long position : 文件里的位置指针
- byte[] buffer : 存放数据的缓冲区
- int offset: 缓冲区里的偏移量,意思是读取缓冲区时的起始位置。
- int length: 放在buffer中数据的长度,并不必须等于缓冲区的长度。
这些方法在维护当前偏移量的时候是线程安全的。所以当读取文件的主体内容时,他也能同时够很方便的读取文件的其他部分--比如元数据。
最后要注意 seek() 方法的使用,如果两次移动之间的距离过长,会影响到文件读取的效率。
写数据
package com.henvealf.learn.hadoop.filesystem.datawrite;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import java.io.*;
import java.net.URI;
/**
* 将本地的文件拷贝到HDFS文件系统中
*
* FSDataOutputStream
* FileSystem方法会返回一个FSDataOutputStream,
* FSDataOutputStream里有一个能都得到当前文件指针的方法。
* long getPos() throws IOException;
* 他只有这个方法,没有seek方法去任意移动指针。
*
* Directories //目录
* FileSystem 提供一个创建目录的方法:
* boolean mkdirs(Path f) throws IOException;
* 这个方法会自动创建不存在的父级目录。如果创建目录成功就返回true。
* 不过,你没必要单独来创建不存在的目录,因为create()方法也会自动将不存在的父级目录创建好。
* Created by henvealf on 16-9-23.
*/
public class FileCopyWithProcess {
public static void main(String[] args) throws IOException {
String localSrc = args[0];
String dst = args[1];
// 使用Java类库获取本地文件输入流
InputStream in = new BufferedInputStream(new FileInputStream(localSrc));
Configuration conf = new Configuration();
// 获取 FileSystem 对象
FileSystem fs = FileSystem.get(URI.create(dst),conf);
// 使用Hadoop的文件系统对象获取输入文件流。
// 如果目录与文件不存在,就自动创建目录与文件。
// 一个回调接口,Progressable,能够告诉你,你的文件写入进度。
OutputStream out = fs.create(new Path(dst),
() -> System.out.print("."));
// 开始写入
IOUtils.copyBytes(in, out, 4096, true);
// 还有一个
// public FSDataOutputStream append(Path f) throws IOException
// 用于在已有的文件最后追加内容.可以使用它来生成无边界的文件,例如日志文件。
// 该方法在并没有在所有的 Hadoop 文件系统中实现,比如HDFS中有,而S3中就没有。
}
}
FSDataOutputStream
FileSystem 的 create() 方法返回一个 FSOutputSteam,和输入流一样,他也有一个能够查询当前文件指针所在的位置:
package org.apache.hadoop.fs
public class FSDataOutputStream extends DataOutputStream implements Syncable {
public long getPos() throws IOException {
}
}
不过输入流没有 seek 方法,输出流自允许从头顺序写或者从已经存在的文件结尾追加。
Directories 目录
FileSystem 提供一个创建目录的方法:
- boolean mkdirs(Path f) throws IOException;
这个方法会自动创建不存在的父级目录。如果创建目录成功就返回true。
不过,你没必要单独来创建不存在的目录,因为create()方法也会自动将不存在的父级目录创建好。
查询文件系统
文件元数据:FileStatus
package com.henvealf.learn.hadoop.filesystem.dataquery;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import java.io.IOException;
import java.io.OutputStream;
/**
* 查询文件系统中的文件的元数据,意思是找到某一个文件或者目录,以及他们的状态
* FileStatus中封装了文件系统中文件与目录的元数据。
* 如果查找的文件不存在,就会抛出FileNotFoundException异常。
* 为了处理这种情况,可以在使用文件的时候使用boolean exists(Path path) throws IOException 方法。
* Created by henvealf on 16-9-23.
*/
public class ShowFileStatusTest {
public static void main(String[] args) throws IOException {
MiniDFSCluster cluster;
FileSystem fs ;
Configuration conf = new Configuration();
if (System.getProperty("test.build.data") == null) {
System.setProperty("test.build.data", "/tmp");
}
cluster = new MiniDFSCluster.Builder(conf).build();
fs = cluster.getFileSystem();
OutputStream out = fs.create(new Path("dir/file"));
out.write("content".getBytes("UTF-8"));
out.close();
System.out.println("------初始化成功------");
System.out.println("-----查看文件状态------");
Path file = new Path("dir/file");
FileStatus stat = fs.getFileStatus(file);
printFileStatus(stat);
System.out.println("----查看目录状态-------");
Path dir = new Path("dir");
stat = fs.getFileStatus(dir);
printFileStatus(stat);
if (fs != null) {
fs.close();
}
if(cluster != null) {
cluster.shutdown();
}
System.out.println("-----关闭资源成功-----");
//
}
public static void printFileStatus(FileStatus stat) {
System.out.println("stat.getPath().toUri().getPath() > " + stat.getPath().toUri().getPath());
System.out.println("stat.isDirectory() > " + stat.isDirectory());
System.out.println("stat.getLen() > " + stat.getLen());
System.out.println("stat.getModificationTime() > " + stat.getModificationTime());
System.out.println("stat.getReplication() > " + stat.getReplication());
System.out.println("stat.getBlockSize() > " + stat.getBlockSize());
System.out.println("stat.getOwner() > " + stat.getOwner());
System.out.println("stat.getGroup() > " + stat.getGroup());
System.out.println("stat.getPermission().toString() > " + stat.getPermission().toString());
}
}
不多说,就是查看文件的原数据用。
文件列表
列出一个目录中所有文件和目录的信息。看代码中
package com.henvealf.learn.hadoop.filesystem.dataquery;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import java.io.IOException;
import java.net.URI;
/**
* 列出一个目录中的文件以及文件夹。
* 使用SystemFile的listStatus方法
* 一共重载了四个方法
* 参数分别为:
* Path f > 列出该目录下的所有文件和目录信息
* Path f, PathFilter filter > 在列出之前使用过滤器筛选出想要的结果。
* Path[] files > 同时列出若干个文件下的文件和目录信息
* Path[] files, PathFilter filter >
* 他们都返回一个 FileStatus[]
* Created by henvealf on 16-9-24.
*/
public class ListStatus {
public static void main(String[] args) throws IOException {
String uri = args[0];
Configuration conf = new Configuration();
// 根据路径和配置获得相应的文件系统引用
FileSystem fs = FileSystem.get(URI.create(uri), conf);
// 一个Path数组
Path[] paths = new Path[args.length];
for (int i = 0; i < paths.length; i++) {
paths[i] = new Path(args[i]);
}
// 列出目录下所有文件的状态
FileStatus[] status = fs.listStatus(paths);
// 使用工具类将FileStatus[]转换为Path[], 数组
Path[] listedPaths = FileUtil.stat2Paths(status);
for (Path p : listedPaths) {
System.out.println(p);
}
}
}
文件模式匹配
我们常常需要对多个文件进行相同的操作。相比于挨个的将文件的路径添加到程序中,我们可以使用通配符来自动将匹配的文件路径放入我们的代码中。也就是一团文件(globbin),FileSystem 为此提供了两个方法:
public FileStatus[] globStatus(Path pathPattern) throws IOException
public FileStatus[] globStatus(Path pathPattern, PathFilter filter) throws IOException
可以发现他返回一个 FileStatus 数组。除此之外还有一个 PathFilter 选项来进行进一步的过滤。
Hadoop 支持的路径通配符和Unix bash shell 相同。
PathFiler 路径过滤器
使用通配符选择多个文件有时候并不能满足我们的需求。这时候我们就可以使用上面提到的 PathFilter 选项了。
他是一个接口类:
package org.apache.hadoop.fs
public interface PathFilter {
boolean accept(Path path);
}
这个类和Java类库中的FileFilter很相似。
下面是他的一个实现:
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
/**
* PathFilter接口
* 使用该接口能够让我们使用正则表达式来筛选出想要的Path。
*
* 删除数据
* 使用
* Created by henvealf on 16-9-24.
*
*
*/
public class RegexExcludePathFilter implements PathFilter{
private final String regex;
public RegexExcludePathFilter(String regex) {
this.regex = regex;
}
@Override
public boolean accept(Path path) {
return !path.toString().matches(regex);
}
// 使用实例
// fs.globStatus(new Path("/2007/*/*"), new RegexExcludePathFilter("^.*/2007/12/31$"))
// 将会得到 /2007/12/13
}
该实现类会进一步过滤掉那些满足了该正则表达式的路径。
删除文件
public boolean delete(Path f, boolean recursive) throws IOException
使用该方法就可以删除文件或者目录。
第二个参数是控制删除文件时是或否递归删除其中的所有目录文件。同 rm -r 属性。