hadoop 的数据完整性
数据存储或 IO 时可能损坏,需要使用校验和来检查数据完整性(校验和当然也会出错,但是由于校验和数据量很低,所以出错的概率也很低)。常用的校验和有 CRC-32
HDFS 数据完整性
对于每 io.bytes.per.checksum 字节数据会计算一次校验和,默认 512 字节计算一次,产生 4 自己校验和。数据节点会运行一个 DataBlockScanner,检查数据的校验和,如果有出错的就从别的 datanode 复制过来修复之。
本地文件系统
在写入文件 filename 时,会同时写一个 .filename.crc 文件,记录校验和。
压缩
压缩可以减少在网络上传输的数据。Hadoop 的压缩是通过压缩编码解码器实现的。
压缩的方法,使用 createOutputStream 获取输出流,压缩数据:
解压缩的方法,使用扩展名判断需要使用的解码器,然后使用 createInputStream 将数据解压缩:
压缩和解压缩时可以使用本地库,通过 java.library.path 设置。
需要注意,压缩后,文件可能不再支持分割,会对 “数据本地化” 造成影响,需要考虑是否压缩,使用什么算法压缩。
map-reduce 中使用压缩
mapReduce 读取的数据如果是压缩过的,mapReduce 读取前会根据扩展名选择合适的 codec 解压缩,但是对输出要进行特别的设置才会输出压缩文件。
FileOutputFormat.setCompressOutput(job, true);
FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
map 结果的压缩
如果使用 lzo 等快速压缩算法,可以减少 shuffle 阶段的数据传输量。
下面的代码可以使用 gunzip 压缩 map 结果。
序列化
hadoop 使用 Writables 作为自己的序列化格式。
Writable 接口
该接口的定义如下:
@InterfaceAudience.Public
@InterfaceStability.Stable
public interface Writable {
/**
* Serialize the fields of this object to <code>out</code>.
*
* @param out <code>DataOuput</code> to serialize this object into.
* @throws IOException
*/
void write(DataOutput out) throws IOException;
/**
* Deserialize the fields of this object from <code>in</code>.
*
* <p>For efficiency, implementations should attempt to re-use storage in the
* existing object where possible.</p>
*
* @param in <code>DataInput</code> to deseriablize this object from.
* @throws IOException
*/
void readFields(DataInput in) throws IOException;
}
对于 IntWritable,实现了一个扩展 Comparator 的 RawComparator,可以在不反序列化的情况下比较数据大小,可以提升处理效率。
@InterfaceAudience.Public
@InterfaceStability.Stable
public interface RawComparator<T> extends Comparator<T> {
/**
* Compare two objects in binary.
* b1[s1:l1] is the first object, and b2[s2:l2] is the second object.
*
* @param b1 The first byte array.
* @param s1 The position index in b1. The object under comparison's starting index.
* @param l1 The length of the object in b1.
* @param b2 The second byte array.
* @param s2 The position index in b2. The object under comparison's starting index.
* @param l2 The length of the object under comparison in b2.
* @return An integer result of the comparison.
*/
public int compare(byte[] b1, int s1, int l1, byte[] b2, int s2, int l2);
}
先扩展了 WritableComparator 然后 注册之:
/** A Comparator optimized for IntWritable. */
public static class Comparator extends WritableComparator {
public Comparator() {
super(IntWritable.class);
}
@Override
public int compare(byte[] b1, int s1, int l1,
byte[] b2, int s2, int l2) {
int thisValue = readInt(b1, s1);
int thatValue = readInt(b2, s2);
return (thisValue<thatValue ? -1 : (thisValue==thatValue ? 0 : 1));
}
}
static { // register this comparator
WritableComparator.define(IntWritable.class, new Comparator());
}
下面是各种 Writable 的继承结构,注意其中的可变长编码 VIntWritable 和 VLongWritable 其实是相同的:
(1)Text
类比于 String,用变长 int 存储字符串长度,最大支持 2GB 字符串。Text 的 length() 返回的是 编码的字节数,find() 返回的也是字节偏移量,所以逐个读取 Text 中的字符比较复杂,需要借助一个 ByteBuffer 实现:
ObjectWritable 和 GenericWritable:封装多种不同的类型,ObjectWritable 可以序列化时需要写入 被封装对象的类型,而 GenericWritable 则通过一个静态数组记录索引和被封装类型的关系,序列化时写入索引即可。
(2).集合
集合包括 ArrayWritable、ArrayPrimitiveWritable、TwoDArrayWritable、MapWritable、SortedMapWritable、EnumSetWritable
对于 ArrayWritable、TwoDArrayWritable 需要设置其包含的元素的类型(也可以通过继承来实现):
ArrayWritable writable = new ArrayWritable(Text.class);
对于 MapWritable 和 SortedMapWritable,其实可以存储不同类型的 key-value 对:
诸如 Set 等结合可以靠已有的集合实现。
自定义 Writable
自定义 Writable 一般可以通过实现 WritableComparable 然后实现各种方法来完成。比较重要的是实现一个快速比较的 RawComparator:
由于 Text 使用 VInt 存储字符串长度,所以先获取 Vint 的size,然后读取字符串的长度,就知道了第一个 Text 在 b1 中的位置,对于 第二个 Text 同理。
SequenceFile
SequenceFile是一个由二进制序列化过的key/value的字节流组成的文本存储文件,也很适合将小文件整合起来以便于更高效的处理。下面是一个 demo,说明如何生成 SequenceFile:
SequenceFile 的生成与读取
public class SequenceFileWriteDemo {
private static final String[] DATA = { "One, two, buckle my shoe",
"Three, four, shut the door", "Five, six, pick up sticks",
"Seven, eight, lay them straight", "Nine, ten, a big fat hen" };
public static void main(String[] args) throws IOException {
String uri = args[0];
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://xxx.xxx.xxx.xx:9000");
FileSystem fs = FileSystem.get(URI.create(uri), conf);
Path path = new Path(uri);
IntWritable key = new IntWritable();
Text value = new Text();
SequenceFile.Writer writer = null;
try {
String compressType = args[1];
System.out.println("compressType "+compressType);
// Writer : Uncompressed records.
if(compressType.equals("1") ){
System.out.println("compress none");
writer = SequenceFile.createWriter(fs, conf, path, key.getClass(),value.getClass(),CompressionType.NONE);
}else if(compressType .equals("2") ){
System.out.println("compress record");
//RecordCompressWriter : Record-compressed files, only compress values.
writer = SequenceFile.createWriter(fs, conf, path, key.getClass(),value.getClass(),CompressionType.RECORD);
}else if(compressType.equals("3") ){
System.out.println("compress block");
// BlockCompressWriter : Block-compressed files, both keys & values are collected in 'blocks' separately and compressed. The size of the 'block' is configurable.
writer = SequenceFile.createWriter(fs, conf, path, key.getClass(),value.getClass(),CompressionType.BLOCK);
}
for (int i = 0; i < 100; i++) {
key.set(100 - i);
value.set(DATA[i % DATA.length]);
System.out.printf("[%s]\t%s\t%s\n", writer.getLength(), key,value);
writer.append(key, value);
}
} finally {
IOUtils.closeStream(writer);
}
}
}
然后就是如何从生成的 SequenceFile 中读取数据了:
注意2点:
- 使用反射生成 key、value 对象,可以应用于所有类型的 key 和 value
- syncSeen() 返回写入文件时生成的 “同步点”,表示对象的边界,帮助 reader 找到对象。
使用 Seek(int position) 可以切到 position 指定的位置,使用 sync(int position) 可以切到 position 后的第一个同步点(个人感觉并没有什么鸟用啊?)。
命令行 API
查看文件,注意 key 和 value 必须有 toString 才能显示,并且 key 与 value 的 class 文件必须在 hadoop 的 classpath 中。
hdfs dfs -text numbers.seq | head
SequenceFile 的格式
无压缩或使用 record 压缩的 SequenceFile 由一个头部以及一个或多个 record 组成:
如果启用了 block 压缩,则结构会更紧凑:
MapFile
MapFile 是经过排序的且带索引的 SequenceFile,可以类比于 Map。写 MapFile 与写 SequenceFile 几乎相同,只不过 Writer 使用 MapFile.Writer,不过会生成一个目录,目录下有 data 和 index 两个文件,使用下面命令可以查看文件的内容:
hdfs dfs -text numbers.map/data | head
hdfs dfs -text numbers.map/index | head
默认情况下,索引只有 128 个,可以通过 MapFile.Writer 的 setIndexInterval 改变这个值。
读取方式与 SequenceFile 相同,创建一个 MapFile.reader(),一直调用 next 直到返回 false。不同的是,也可以随机读取:
类似的还有 SetFile,ArrayFile 以及 BloomMapFile。