hadoop 学习笔记(4)-- hadoop IO

hadoop 的数据完整性

数据存储或 IO 时可能损坏,需要使用校验和来检查数据完整性(校验和当然也会出错,但是由于校验和数据量很低,所以出错的概率也很低)。常用的校验和有 CRC-32

HDFS 数据完整性

对于每 io.bytes.per.checksum 字节数据会计算一次校验和,默认 512 字节计算一次,产生 4 自己校验和。数据节点会运行一个 DataBlockScanner,检查数据的校验和,如果有出错的就从别的 datanode 复制过来修复之。

本地文件系统

在写入文件 filename 时,会同时写一个 .filename.crc 文件,记录校验和。

压缩

压缩可以减少在网络上传输的数据。Hadoop 的压缩是通过压缩编码解码器实现的。

可选压缩编码解码器

压缩的方法,使用 createOutputStream 获取输出流,压缩数据:

解压缩的方法,使用扩展名判断需要使用的解码器,然后使用 createInputStream 将数据解压缩:

压缩和解压缩时可以使用本地库,通过 java.library.path 设置。

需要注意,压缩后,文件可能不再支持分割,会对 “数据本地化” 造成影响,需要考虑是否压缩,使用什么算法压缩。

map-reduce 中使用压缩

mapReduce 读取的数据如果是压缩过的,mapReduce 读取前会根据扩展名选择合适的 codec 解压缩,但是对输出要进行特别的设置才会输出压缩文件。

FileOutputFormat.setCompressOutput(job, true);
FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);

map 结果的压缩

如果使用 lzo 等快速压缩算法,可以减少 shuffle 阶段的数据传输量。

map可选压缩属性

下面的代码可以使用 gunzip 压缩 map 结果。

序列化

hadoop 使用 Writables 作为自己的序列化格式。

Writable 接口

该接口的定义如下:

@InterfaceAudience.Public
@InterfaceStability.Stable
public interface Writable {
  /** 
   * Serialize the fields of this object to <code>out</code>.
   * 
   * @param out <code>DataOuput</code> to serialize this object into.
   * @throws IOException
   */
  void write(DataOutput out) throws IOException;

  /** 
   * Deserialize the fields of this object from <code>in</code>.  
   * 
   * <p>For efficiency, implementations should attempt to re-use storage in the 
   * existing object where possible.</p>
   * 
   * @param in <code>DataInput</code> to deseriablize this object from.
   * @throws IOException
   */
  void readFields(DataInput in) throws IOException;
}

对于 IntWritable,实现了一个扩展 Comparator 的 RawComparator,可以在不反序列化的情况下比较数据大小,可以提升处理效率。

@InterfaceAudience.Public
@InterfaceStability.Stable
public interface RawComparator<T> extends Comparator<T> {

  /**
   * Compare two objects in binary.
   * b1[s1:l1] is the first object, and b2[s2:l2] is the second object.
   * 
   * @param b1 The first byte array.
   * @param s1 The position index in b1. The object under comparison's starting index.
   * @param l1 The length of the object in b1.
   * @param b2 The second byte array.
   * @param s2 The position index in b2. The object under comparison's starting index.
   * @param l2 The length of the object under comparison in b2.
   * @return An integer result of the comparison.
   */
  public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2);
}

先扩展了 WritableComparator 然后 注册之:

 /** A Comparator optimized for IntWritable. */ 
  public static class Comparator extends WritableComparator {
    public Comparator() {
      super(IntWritable.class);
    }
    
    @Override
    public int compare(byte[] b1, int s1, int l1,
                       byte[] b2, int s2, int l2) {
      int thisValue = readInt(b1, s1);
      int thatValue = readInt(b2, s2);
      return (thisValue<thatValue ? -1 : (thisValue==thatValue ? 0 : 1));
    }
  }

  static {                                        // register this comparator
    WritableComparator.define(IntWritable.class, new Comparator());
  }

下面是各种 Writable 的继承结构,注意其中的可变长编码 VIntWritable 和 VLongWritable 其实是相同的:

(1)Text
类比于 String,用变长 int 存储字符串长度,最大支持 2GB 字符串。Text 的 length() 返回的是 编码的字节数,find() 返回的也是字节偏移量,所以逐个读取 Text 中的字符比较复杂,需要借助一个 ByteBuffer 实现:

ObjectWritable 和 GenericWritable:封装多种不同的类型,ObjectWritable 可以序列化时需要写入 被封装对象的类型,而 GenericWritable 则通过一个静态数组记录索引和被封装类型的关系,序列化时写入索引即可。

(2).集合
集合包括 ArrayWritable、ArrayPrimitiveWritable、TwoDArrayWritable、MapWritable、SortedMapWritable、EnumSetWritable

对于 ArrayWritable、TwoDArrayWritable 需要设置其包含的元素的类型(也可以通过继承来实现):

ArrayWritable writable = new ArrayWritable(Text.class);

对于 MapWritable 和 SortedMapWritable,其实可以存储不同类型的 key-value 对:

诸如 Set 等结合可以靠已有的集合实现。

自定义 Writable

自定义 Writable 一般可以通过实现 WritableComparable 然后实现各种方法来完成。比较重要的是实现一个快速比较的 RawComparator:

由于 Text 使用 VInt 存储字符串长度,所以先获取 Vint 的size,然后读取字符串的长度,就知道了第一个 Text 在 b1 中的位置,对于 第二个 Text 同理。

SequenceFile

SequenceFile是一个由二进制序列化过的key/value的字节流组成的文本存储文件,也很适合将小文件整合起来以便于更高效的处理。下面是一个 demo,说明如何生成 SequenceFile:

SequenceFile 的生成与读取

public class SequenceFileWriteDemo {
    private static final String[] DATA = { "One, two, buckle my shoe",
            "Three, four, shut the door", "Five, six, pick up sticks",
            "Seven, eight, lay them straight", "Nine, ten, a big fat hen" };
 
    public static void main(String[] args) throws IOException {
        String uri = args[0];
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://xxx.xxx.xxx.xx:9000");
        FileSystem fs = FileSystem.get(URI.create(uri), conf);
        Path path = new Path(uri);
        IntWritable key = new IntWritable();
        Text value = new Text();
        SequenceFile.Writer writer = null;
        try {
            String compressType = args[1];
            System.out.println("compressType "+compressType);
             
                //  Writer : Uncompressed records. 
            if(compressType.equals("1") ){
                System.out.println("compress none");
                writer = SequenceFile.createWriter(fs, conf, path, key.getClass(),value.getClass(),CompressionType.NONE);
            }else if(compressType .equals("2") ){
                System.out.println("compress record");
                //RecordCompressWriter : Record-compressed files, only compress values. 
                writer = SequenceFile.createWriter(fs, conf, path, key.getClass(),value.getClass(),CompressionType.RECORD);   
            }else if(compressType.equals("3") ){
                System.out.println("compress block");
                //  BlockCompressWriter : Block-compressed files, both keys & values are collected in 'blocks' separately and compressed. The size of the 'block' is configurable. 
                writer = SequenceFile.createWriter(fs, conf, path, key.getClass(),value.getClass(),CompressionType.BLOCK);
            }
             
            for (int i = 0; i < 100; i++) {
                key.set(100 - i);
                value.set(DATA[i % DATA.length]);
                System.out.printf("[%s]\t%s\t%s\n", writer.getLength(), key,value);
                writer.append(key, value);
                 
            }
        } finally {
            IOUtils.closeStream(writer);
        }
    }
}

然后就是如何从生成的 SequenceFile 中读取数据了:

注意2点:

  1. 使用反射生成 key、value 对象,可以应用于所有类型的 key 和 value
  2. syncSeen() 返回写入文件时生成的 “同步点”,表示对象的边界,帮助 reader 找到对象。

使用 Seek(int position) 可以切到 position 指定的位置,使用 sync(int position) 可以切到 position 后的第一个同步点(个人感觉并没有什么鸟用啊?)。

命令行 API

查看文件,注意 key 和 value 必须有 toString 才能显示,并且 key 与 value 的 class 文件必须在 hadoop 的 classpath 中。

hdfs dfs -text numbers.seq | head

SequenceFile 的格式

无压缩或使用 record 压缩的 SequenceFile 由一个头部以及一个或多个 record 组成:

如果启用了 block 压缩,则结构会更紧凑:

MapFile

MapFile 是经过排序的且带索引的 SequenceFile,可以类比于 Map。写 MapFile 与写 SequenceFile 几乎相同,只不过 Writer 使用 MapFile.Writer,不过会生成一个目录,目录下有 data 和 index 两个文件,使用下面命令可以查看文件的内容:

hdfs dfs -text numbers.map/data | head
hdfs dfs -text numbers.map/index | head

默认情况下,索引只有 128 个,可以通过 MapFile.Writer 的 setIndexInterval 改变这个值。

读取方式与 SequenceFile 相同,创建一个 MapFile.reader(),一直调用 next 直到返回 false。不同的是,也可以随机读取:

类似的还有 SetFile,ArrayFile 以及 BloomMapFile。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 202,529评论 5 475
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,015评论 2 379
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 149,409评论 0 335
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,385评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,387评论 5 364
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,466评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,880评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,528评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,727评论 1 295
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,528评论 2 319
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,602评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,302评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,873评论 3 306
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,890评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,132评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,777评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,310评论 2 342

推荐阅读更多精彩内容

  • 思考问题 HDFS的IO操作总结 Hadoop工程下与I/O相关的包如下: org.apache.hadoop.i...
    Sakura_P阅读 442评论 0 2
  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,579评论 18 139
  • 1. Java基础部分 基础部分的顺序:基本语法,类相关的语法,内部类的语法,继承相关的语法,异常的语法,线程的语...
    子非鱼_t_阅读 31,555评论 18 399
  • Spark SQL, DataFrames and Datasets Guide Overview SQL Dat...
    草里有只羊阅读 18,284评论 0 85
  • ——李沧东电影《密阳》 石衡潭 李沧东的电影《密阳》刺痛了不少人,也惹恼了不少人。这是一部涉及基督徒生活的电影。信...
    艾尚萱阅读 368评论 0 0