avro源码阅读-读avro文件

在实际工作中,会将avro文件读到一个类中。下面就看看代码是如何实现的(这里看的是avro-1.7.7版本)。

读取整个avro文件的类是DataFileReader,用于读取data的类是GenericDatumReader(data指的是用户的数据),有关avro文件结构可参见avro源码阅读-写avro文件中的图。

我们从AvroPairInputFormat切入看下DataFileReader和SpecificDatumReader的使用。

如果你看过了avro源码阅读-写avro文件,那你会觉得很熟悉。
在读avro文件时,有DataFileReader和AvroPairInputFormat。
在写avro文件时,有DataFileWrite和AvroPairOutputFormat。

AvroInputFormat

  1. AvroInputFormat.getRecordReader()返回的是AvroRecordReader;
  2. AvroRecordReader.next()读取下一条记录;
  3. AvroRecordReader.next()又调用了DataFileReader.next();
  4. DataFileReader.next()调用了GenericDatumReader.read()。
    其中,
  5. DataFileReader的next()和reader继承自DataFileStream;
  6. GenericDatumReader.read(reuse, decoder)参数中的decoder是BinaryDecoder。

类的继承及引用关系图如下:

GenericDatumReader有关类的继承及引用图

对于DataFileReader读取整个avro文件的代码这里不再详述,这里主要关心GenericDatumReader是如何读取data的。

GenericDatumReader

GenericDatumReader.read()的构造了一个ResolvingDecoder。ResolvingDecoder维护了一个解码器Decoder和解析器SkipParser。SkipParser维护了一个语法堆栈,用于指导下一步该如何处理(如是读数据还是跳过,若读数据是什么类型的数据)。Decoder实际上是BinaryDecoder,在需要读取数据时,由它从avro文件中读取一小块内容到内存中。ResolvingDecoder还引用了ResolvingGrammarGenerator,该类用于生成语法,语法由Symbol类表示。这几个类的关系图如下所示:

ResolvingDecoder有关类的继承和引用
  1. Symbol
    SkipParser维护了一个语法堆栈(从Parser继承而来,实现上是一个数组),Symbol就是存储在堆栈中的元素。Symbol就像是写avro文件时用到的Schema。写avro文件时,是将一条数据按照schema写到文件中;而读avro文件时,则是按照Symbol将一条数据读取到内存中。

  2. SkipParser
    SkipParser维护了一个Symbol的数组,也就是一个堆栈,每次弹出栈顶元素,然后执行相应动作:
    -将栈顶元素扩展后再压栈;
    -直接从avro文件中读取一小块数据;
    -其他操作比如跳过。

  3. ResolvingGrammarGenerator
    ResolvingGrammarGenerator用于生成压入堆栈中的第一个元素(根元素)。

代码里对这几个类的注释:
Symbol:Symbol is the base of all symbols (terminals and non-terminals) of the grammar.
SkipParser:A parser that capable of skipping as well read and write. This class is used by decoders who (unlink encoders) are required to implement methods to skip.
ResolvingGrammarGenerator:The class that generates validating grammar.

Symbol

和Schema一样,Symbol是个抽象类,它有很多具体的实现。这里关注三种实现:

  1. Terminal: 一个基本类型如int、long等,就对应一个terminal。当弹出的栈顶元素为terminal时,就表示应该从数据中读取某个类型对应大小的bytes了。
  2. Sequence:比如说一个类,就对应一个Sequence。
    Symbol本身又维护了一个Symbol数组production,production表示该Symbol的产物。
    Symbol本身包含一个Symbol数组,就像类包含的成员变量也可以是其他类一样。
    他们的对应关系可描述为下图:


    Sequence和Class的对应关系

    Terminal的production是空的,而Sequence的production包含了其产物。

  3. Root:根Symbol,就是第一个压入语法堆栈的元素。

GenericDatumReader.read()代码如下:
<pre>
public D read(D reuse, Decoder in) throws IOException {
ResolvingDecoder resolver = getResolver(actual, expected); //ResolvingDecoder的初始化
resolver.configure(in); //将ResolvingDecoder 的解码器配置为in(即BInaryDecoder)
D result = (D) read(reuse, expected, resolver); //读取一条记录到reuse中
resolver.drain();
return result;
}
</pre>

下面分别介绍:

  1. ResolvingDecoder的初始化;
  2. 读取一条记录到reuse中

ResolvingDecoder的初始化

ResolvingDecoder初始化的时候,new了一个SkipParser,生成Root Symbol并将其压入了SkipParser维护的堆栈中。初始化中复杂之处在于Root Symbol的生成,生成Root Symbol的代码如下:
<pre>
return Symbol.root(generate(writer, reader, new HashMap<LitS, Symbol>()));
</pre>

这里有两步调用:

  1. generate(writer, reader, map);
  2. Symbol.root(symbol)

generate(writer, reader, map)

writer指的是avro文件中记录的schema,reader指的是类的schema。

  1. 当writer和reader的类型不同时,generate()会推测可能是UNION导致的,或者进行一些类型转换,否则会抛出异常;

  2. 当writer和reader的类型相同时,代码如下:
    <pre>
    public Symbol generate(Schema writer, Schema reader,
    Map<LitS, Symbol> seen) throws IOException {
    final Schema.Type writerType = writer.getType();
    final Schema.Type readerType = reader.getType();

    if (writerType == readerType) {
    switch (writerType) {
    case NULL:
    return Symbol.NULL;
    case BOOLEAN:
    return Symbol.BOOLEAN;
    case INT:
    return Symbol.INT;
    case LONG:
    return Symbol.LONG;
    ...
    case RECORD:
    return resolveRecords(writer, reader, seen);
    ...
    }
    </pre>

即使类型相同,writer和reader的schema也可能有所差别,比如用新的类(可能增加了一些字段,或是字段顺序有所调整)来读取历史文件。resolveRecords()会综合writerSchema和readerSchema中的字段,生成并返回一个Symbol。主要的情况有下面两种:
-wirterSchema中有,但是readerSchema中没有的字段,将会被跳过;
-readerSchema中有,但是writerSchema中没有的字段,必须有默认值,否则就会出错。
resolveRecords()的代码如下:
<pre>
private Symbol resolveRecords(Schema writer, Schema reader,
Map<LitS, Symbol> seen) throws IOException {
LitS wsc = new LitS2(writer, reader);
Symbol result = seen.get(wsc); //先从缓存中取(根据writer和reader构造symbol也是一件比较耗时的事情)
if (result == null) {
List<Field> wfields = writer.getFields();
List<Field> rfields = reader.getFields();

// First, compute reordering of reader fields, plus
// number elements in the result's production
Field[] reordered = new Field[rfields.size()];
int ridx = 0;
int count = 1 + wfields.size(); //count是Symbol的产物production的大小,也就是writerSchema展开后字段的个数(既然是Record类型,肯定是有产物的嘛)
//这里的"1"被fieldOrderAction占用的

//将在writerSchema和readerSchema中都存在的字段,按照writerSchema中的顺序,存储在reordered中
//因为最终是要从文件中依次将数据读出来,所以要按照writerSchema中的字段顺序
for (Field f : wfields) {
Field rdrField = reader.getField(f.name());
if (rdrField != null) {
reordered[ridx++] = rdrField;
}
}

//处理readerSchema中有,而writerSchema中没有的字段(如类新增了一个字段,这在历史数据中是没有的)
//此时,该字段必须有自己的默认值,否则就会返回错误信息
for (Field rf : rfields) {
String fname = rf.name();
if (writer.getField(fname) == null) {
if (rf.defaultValue() == null) {
result = Symbol.error("Found " + writer.getFullName()
+ ", expecting " + reader.getFullName()
+ ", missing required field " + fname);
seen.put(wsc, result);
return result;
} else {
reordered[ridx++] = rf;
count += 3;//使用默认值的字段需要占用3个位置
}
}
}

Symbol[] production = new Symbol[count]; //初始化production数组
production[--count] = Symbol.fieldOrderAction(reordered); //FieldOrderAction记录了production中其他各个字段的schema

/**

  • We construct a symbol without filling the array. Please see
  • {@link Symbol#production} for the reason.
    */
    result = Symbol.seq(production); //生成了一个Sequence Symbol
    seen.put(wsc, result); //缓存

/*

  • For now every field in read-record with no default value
  • must be in write-record.
  • Write record may have additional fields, which will be
  • skipped during read.
    */

// Handle all the writer's fields
for (Field wf : wfields) {
String fname = wf.name();
Field rf = reader.getField(fname);
if (rf == null) { //如果writerSchema中有的字段,readerSchema中没有(比如类的某些字段删掉了,但是历史数据中还存在),则标识为跳过
production[--count] =
Symbol.skipAction(generate(wf.schema(), wf.schema(), seen));
} else {
production[--count] =
generate(wf.schema(), rf.schema(), seen); //递归的调用generate()(产物production中可能也有一些复杂的结构)
}
}

// Add default values for fields missing from Writer
for (Field rf : rfields) {
String fname = rf.name();
Field wf = writer.getField(fname);
if (wf == null) { //处理readerSchema中有,而writerSchema中没有,但是有默认值的字段(上面提到过了),可以看到,这里需要使用三个位置
byte[] bb = getBinary(rf.schema(), rf.defaultValue());
production[--count] = Symbol.defaultStartAction(bb);
production[--count] = generate(rf.schema(), rf.schema(), seen);
production[--count] = Symbol.DEFAULT_END_ACTION;
}
}
}
return result;
}
</pre>

举个栗子,如writerSchema和readerSchema都是下面这样。
<pre>
{
"type": "record",
"name": "User",
"namespace": "test",
"fields": [
{
"name": "id",
"type": "int"
},
{
"name": "name",
"type": "string",
"default": null
},
{
"name": "dog",
"type": {
"name": "Dog",
"type": "record",
"fields": [
{
"name": "id",
"type": "int"
},
{
"name": "name",
"type": "string",
"default": null
}
]
}
}
]
}
</pre>

最终generate返回的Symbol结构如下(图最左边的sequenceSymbol0即是返回的结果。图中方框左侧的数字代表production数组的下标,注意,各个Symbol存储的顺序和schema中的顺序正好相反)。

sequenceSymbol0

Symbol.root(symbol)

root()会new一个RootSymbol,并将上面生成的sequenceSymbol0展开,作为该RootSymbol的production。这里最复杂的就在于将sequenceSymbol0展开的操作flatten()。这里关注两个flatten()。

  1. Symbol类的静态方法
    将In:Symbol[]平铺开为out:Symbol[],大概就是下面这么个意思。


    in2out

    代码如下:
    <pre>
    static void flatten(Symbol[] in, int start,
    Symbol[] out, int skip,
    Map<Sequence, Sequence> map,
    Map<Sequence, List<Fixup>> map2) {
    for (int i = start, j = skip; i < in.length; i++) {
    Symbol s = in[i].flatten(map, map2); //如果in[i]是一个Terminal,则直接返回其本身
    //如果in[i]是一个Sequence,则递归的展开自身
    //其他类型的Symbol在此不再关注
    if (s instanceof Sequence) {
    Symbol[] p = s.production;
    List<Fixup> l = map2.get(s);
    if (l == null) {
    System.arraycopy(p, 0, out, j, p.length);
    } else {
    l.add(new Fixup(out, j));
    }
    j += p.length;
    } else { //如果不是Sequence,直接将in赋值给out
    out[j++] = s;
    }
    }
    }
    </pre>

  2. Sequence的私有方法
    将sequence的production铺开。将上图"in2out"中的in定义为某个Sequence展开前prodcution,out定义为某个Sequence展开后的production,则上图即可表示这个方法的意思。
    <pre>
    @Override
    public Sequence flatten(Map<Sequence, Sequence> map,
    Map<Sequence, List<Fixup>> map2) {
    Sequence result = map.get(this);
    if (result == null) {
    result = new Sequence(new Symbol[flattenedSize()]); //初始化展开后的Sequence(flattenedSize也是个递归调用,用于获取一个Sequence的production层层展开后的Symbol的个数)
    map.put(this, result); //缓存展开结果
    List<Fixup> l = new ArrayList<Fixup>();
    map2.put(result, l);

    flatten(production, 0,
    result.production, 0, map, map2);
    for (Fixup f : l) {
    System.arraycopy(result.production, 0, f.symbols, f.pos,
    result.production.length);
    }
    map2.remove(result);
    }
    return result;
    }
    </pre>

最简单的情况下,假设sequenceSymbol0只有一堆terminalSymbol,那就只调用了Symbol类静态的flatten(),直接将in赋值给了out。

复杂一点的情况下,如sequenceSymbol0还包含了sequenceSymbol1,或是嵌套着更多类的引用。此时,两个flatten()会相互递归调用,但最终,总会有一个sequence只包含Terminal之类的Symbol,从而结束递归。此时,代码中的map可以用来缓存展开结果,比如ClassA中有两个ClassB成员的情况。

不过,还有个关键的地方没弄懂。据注释所说,map、map2以及Fixup可用来解决无限递归的。比如,设result为SequenceA扩展后的结果,在result的production填充之前,便存储在map中。下次因递归而再次遇到SequenceA时,则直接从map中取到result。但此时,result的production并未被填充,而是依赖后续的Fixup进行填充。
可能产生无限递归的情况我只想到一种,那就是两个类之间相互引用(即ClassA引用了ClassB,ClassB又引用了ClassA)。但是代码最终未能调试起来,因为在这种情况下,flattenedSize()没能规避无限递归,导致了StackOverflowError。
导致问题的代码如下。
类A
<pre>
public class A {
public B b;
}
</pre>
类B
<pre>
public class B {
public A a;
}
</pre>
主函数
<pre>
public static void main(String[] args) throws IOException {
Schema s = ReflectData.get().getSchema(A.class);
DecoderFactory.get().resolvingDecoder(
Schema.applyAliases(s, s), s, null);
}
</pre>

仍以上面的sequenceSymbol0为例,最终生成的Root Symbol及语法堆栈如下图所示。

stack初始状态

读取一条记录到reuse中

此时,resolver已经初始化好了,并且语法堆栈已经有了一个Root Symbol,接下来就是基于语法堆栈读取数据。

read(reuse, expected, resolver)根据expected(也就是类的schema)的类型不同,调用不同的方法,代码如下。
<pre>
protected Object read(Object old, Schema expected,
ResolvingDecoder in) throws IOException {
switch (expected.getType()) {
case RECORD: return readRecord(old, expected, in);
case ENUM: return readEnum(expected, in);
....
}
}
</pre>

这里主要还是看readRecord(),代码如下。
<pre>
protected Object readRecord(Object old, Schema expected,
ResolvingDecoder in) throws IOException {
...
for (Field f : in.readFieldOrder()) { //获取record的fields
...
readField(r, f, oldDatum, in, state); //读取field,其中又递归的调用了read()
}
...
}
</pre>

其中,for循环内递归调用了上面的read(),不再细述。这里主要看一下代码是如何获取record的fields的。

readFieldOrder()就一行代码,调用了Parser.advance()。
<pre>return ((Symbol.FieldOrderAction) parser.advance(Symbol.FIELD_ACTION)).fields;</pre>

Parser维护了语法堆栈,advance()顾名思义,也即弹出栈顶,并据此进行一次处理。
advance()代码如下。
<pre>
public final Symbol advance(Symbol input) throws IOException {
for (; ;) {
Symbol top = stack[--pos]; //栈顶出栈
if (top == input) { //如top和input都是一个terminal,像readDouble()、readInt()这些方法,就会执行这个分支
return top; // A common case
}

  Symbol.Kind k = top.kind;
  if (k == Symbol.Kind.IMPLICIT_ACTION) { //一个隐式动作,这里的doAction()其实就是ResolvingDecoder.doAction()
    Symbol result = symbolHandler.doAction(input, top);
    if (result != null) {
      return result;
    }
  } else if (k == Symbol.Kind.TERMINAL) { //如果input是一个terminal,但是栈顶却不是terminal,抛出异常
    throw new AvroTypeException("Attempt to process a "
            + input + " when a "
            + top + " was expected.");
  } else if (k == Symbol.Kind.REPEATER
      && input == ((Symbol.Repeater) top).end) {
    return input;
  } else {
    pushProduction(top); //将栈顶的production压栈
  }
}

}
</pre>

第一个栈顶是Root Symbol,所以执行的是最后一个分支:pushProduction()。pushProdcution将栈顶(刚刚出栈)的production压栈。如果堆栈大小不够,扩展堆栈。
<pre>
public final void pushProduction(Symbol sym) {
Symbol[] p = sym.production;
while (pos + p.length > stack.length) { //大小不够,则扩栈
expandStack();
}
System.arraycopy(p, 0, stack, pos, p.length);
pos += p.length;
}

</pre>

执行完pushProduction()后,堆栈成为了下面这个样子。

pushProduction()后的stack

之后会继续执行for()循环,继续弹出栈顶。
此时的栈顶是一个FieldOrderAction1,FieldOrderAction属于IMPLICIT_ACTION。此时,将会执行symbolHandler.doAction()。
这里的symbolHandler其实就是ResolvingDecoder,从ResolvingDecoder.doAction()中可以看到哪些Symbol属于IMPLICT_ACTION。这里不再深入。
doAction()的开头就是对FieldOrderAction的处理,可以看到直接返回了top(也即返回了FieldOrderAction1)。
<pre>
public Symbol doAction(Symbol input, Symbol top) throws IOException {
if (top instanceof Symbol.FieldOrderAction) {
return input == Symbol.FIELD_ACTION ? top : null;
...
}
</pre>

这就将FieldOrderAction1取出来了。FieldOrderAction1中包含了intSchema、stringSchema和recordSchema,在遍历到recordSchema时,又会递归的调用readRecord()。

left

还有问题没搞清楚,先搁着吧。
RootSymbol的production为何提前铺开了?不铺开行么?
GenericDatumReader.read()返回的是GenericData,GenericData最终是如何映射到类上的?

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 200,961评论 5 473
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 84,444评论 2 377
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 148,009评论 0 333
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,082评论 1 272
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,101评论 5 363
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,271评论 1 278
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,738评论 3 393
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,395评论 0 255
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,539评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,434评论 2 317
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,481评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,160评论 3 317
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,749评论 3 303
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,816评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,038评论 1 256
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,548评论 2 346
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,140评论 2 341

推荐阅读更多精彩内容