Java读写Hadoop文件

1、从Hadoop URL读取文件

示例一:

import org.apache.hadoop.fs.FsUrlStreamHandlerFactory;
import org.apache.hadoop.io.IOUtils;

import java.io.IOException;
import java.io.InputStream;
import java.net.URL;
public class HDFSFileReader {

    static {
        URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());
    }

    public static void main(String[] args) {
        try {
            InputStream inputStream = new URL("hdfs://192.168.1.100:9000/home/ossuser/test.txt").openStream();
            IOUtils.copyBytes(inputStream, System.out, 4096, true);
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory())使得Java程序能够识别hdfs url schema;全局只能调用一次,若其他组件已声明一个URLStreamHandlerFactory实例,将无法使用当前示例读取Hadoop文件数据。
copyBytes最后两个参数,第一个设置复制缓冲区大小,第二个设置复制结束后是否关闭数据流。

示例二:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;

import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

public class HDFSFileReader1 {

    public static void main(String[] args) {
        Configuration conf = new Configuration();

        try {
            URI uri = new URI("hdfs://192.168.1.100:9000/home/ossuser/test.txt");

            FileSystem fs = FileSystem.get(uri, conf);

            Path path = new Path(uri);
            FSDataInputStream fsDataInputStream = fs.open(path);
            IOUtils.copyBytes(fsDataInputStream, System.out, 4096, true);
        } catch (IOException e) {
            e.printStackTrace();
        } catch (URISyntaxException e) {
            e.printStackTrace();
        }
    }
}
  • public static FileSystem get(Configuration conf) throws IOException返回默认文件系统(在 core-site.xml指定,若没有设置,返回默认的本地文件系统)
  • public static FileSystem get(URI uri, Configuration conf) throws IOException通过给定的URI方案确定要使用的文件系统,若URI没有指定方案,返回默认文件系统
  • public static FileSystem get(final URI uri, final Configuration conf, String user) throws IOException, InterruptedException使用给定用户访问文件系统

FSDataInputStream对象
FSDataInputStream继承了java.io.DataInputStream类,实现了Seekable接口,可从流任意位置开始读取文件,

public interface Seekable {
    void seek(long pos) throws IOException;

    long getPos() throws IOException;

    @Private
    boolean seekToNewSource(long var1) throws IOException;
}
  • seek()定位长度大于文件长度,会抛出IOException
  • seek()绝对值定位,java.io.InputStream.skip()相对位置定位
public class HDFSFileReader2 {

    public static void main(String[] args) {
        Configuration configuration = new Configuration();
        try {
            URI uri = new URI("hdfs://192.168.1.100:9000/home/ossuser/test.txt");

            FileSystem fileSystem = FileSystem.get(uri, configuration);

            FSDataInputStream fsDataInputStream = fileSystem.open(new Path(uri));
            IOUtils.copyBytes(fsDataInputStream, System.out, 4096, false);  -- 必须false

            System.out.println("-----------------------------");
            fsDataInputStream.seek(5);
            IOUtils.copyBytes(fsDataInputStream, System.out, 4096, true);
        } catch (URISyntaxException e) {
            e.printStackTrace();
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}

输出:

line1
line2
-----------------------------
line2

FSDataInputStream也实现了PositionedReadable接口,从一个指定偏移量处读取文件的一部分

public interface PositionedReadable {
  /**
   * Read up to the specified number of bytes, from a given
   * position within a file, and return the number of bytes read. This does not
   * change the current offset of a file, and is thread-safe.
   *
   * <i>Warning: Not all filesystems satisfy the thread-safety requirement.</i>
   * @param position position within file
   * @param buffer destination buffer
   * @param offset offset in the buffer
   * @param length number of bytes to read
   * @return actual number of bytes read; -1 means "none"
   * @throws IOException IO problems.
   */
  int read(long position, byte[] buffer, int offset, int length)
    throws IOException;
  
  /**
   * Read the specified number of bytes, from a given
   * position within a file. This does not
   * change the current offset of a file, and is thread-safe.
   *
   * <i>Warning: Not all filesystems satisfy the thread-safety requirement.</i>
   * @param position position within file
   * @param buffer destination buffer
   * @param offset offset in the buffer
   * @param length number of bytes to read
   * @throws IOException IO problems.
   * @throws EOFException the end of the data was reached before
   * the read operation completed
   */
  void readFully(long position, byte[] buffer, int offset, int length)
    throws IOException;
  
  /**
   * Read number of bytes equal to the length of the buffer, from a given
   * position within a file. This does not
   * change the current offset of a file, and is thread-safe.
   *
   * <i>Warning: Not all filesystems satisfy the thread-safety requirement.</i>
   * @param position position within file
   * @param buffer destination buffer
   * @throws IOException IO problems.
   * @throws EOFException the end of the data was reached before
   * the read operation completed
   */
  void readFully(long position, byte[] buffer) throws IOException;
}

read()方法,从文件position位置开始,读取最多length长度数据,存入至buffer指定偏移量offset处,返回值为实际读取的字节数,可能小于length,-1代表未读取到字节;readFully从文件position位置开始,读取指定length长度的字节至buffer,若读取字节数未达到length,就已到达文件尾,抛出EOFException异常.
所有这些方法会保留文件的偏移量,且是线程安全,在读取文件的主体时,可以访问文件的其他部分。
seek()是高开销的操作

2、Hadoop写文件

FileSystem有两种创建文件的方式:

  • public FSDataOutputStream create(Path f, Progressable progress) throws IOException;
  • public FSDataOutputStream append(Path f, int bufferSize, Progressable progress) throws IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.util.Progressable;

import java.io.FileInputStream;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

public class HDFSFileWriter {

    public static void main(String[] args) {
        try {
            //System.setProperty("HADOOP_USER_NAME", "ossuser");

            URI uri = URI.create("hdfs://192.168.1.100:9000/home/ossuser/log1.txt");

            Configuration configuration = new Configuration();
            FileSystem fs = FileSystem.get(uri, configuration, "ossuser");
            FSDataOutputStream fos = fs.create(new Path(uri), new CreateHDFSFileProgress("log1.txt"));

            FileInputStream fis = new FileInputStream("D:/temp/xx.txt.tmp");

            IOUtils.copyBytes(fis, fos, 4096, true);
        } catch (IOException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }

    static class CreateHDFSFileProgress implements Progressable {

        private String fileName;
        private int count;

        public CreateHDFSFileProgress(String fileName) {
            this.fileName = fileName;
        }

        @Override
        public void progress() {
            count ++;
            System.out.println(fileName + " has written " + (count * 64) + " KB");
        }
    }
}

本地开发调试,将以当前操作系统用户名往HDFS目标文件夹写文件,会报无权限异常,如下:

org.apache.hadoop.security.AccessControlException: Permission denied: user=z00442530, access=WRITE, inode="/home/ossuser":ossuser:supergroup:drwxr-xr-x

两种方式设置用户名:

  • System.setProperty("HADOOP_USER_NAME", "ossuser");
  • FileSystem fs = FileSystem.get(uri, configuration, "ossuser");

create方法有多个重载版本,可传入包括是否覆盖文件、缓冲区大小、副本数、块大小、文件权限、进度回调接口
Hadoop每写入约64KB数据至datanod后,即调用一次progess()方法

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 200,612评论 5 471
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 84,345评论 2 377
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 147,625评论 0 332
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,022评论 1 272
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 62,974评论 5 360
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,227评论 1 277
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,688评论 3 392
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,358评论 0 255
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,490评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,402评论 2 317
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,446评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,126评论 3 315
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,721评论 3 303
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,802评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,013评论 1 255
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,504评论 2 346
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,080评论 2 341