Spark SQL中Kryo反序列化问题分析

1 问题描述

当使用Spark-sql执行 Hive UDF时会发生NullPointerException(NPE),从而导致作业异常终止。NPE具体堆栈信息如下:

Serialization trace:
fields (com.xiaoju.dataservice.api.hive.udf.LoadFromDataServiceMetricSetUDTF)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:144)
    at com.esotericsoftware.kryo.serializers.FieldSerializer.read(FieldSerializer.java:551)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:686)
    at org.apache.spark.sql.hive.HiveShim$HiveFunctionWrapper.deserializeObjectByKryo(HiveShim.scala:155)
    at org.apache.spark.sql.hive.HiveShim$HiveFunctionWrapper.deserializePlan(HiveShim.scala:171)
    at org.apache.spark.sql.hive.HiveShim$HiveFunctionWrapper.readExternal(HiveShim.scala:210)
    at java.io.ObjectInputStream.readExternalData(ObjectInputStream.java:1842)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1799)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
    at scala.collection.immutable.List$SerializationProxy.readObject(List.scala:479)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadObject(ObjectStreamClass.java:1058)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1900)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2000)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:1924)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:1801)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1351)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:371)
    at org.apache.spark.serializer.JavaDeserializationStream.readObject(JavaSerializer.scala:75)
    at org.apache.spark.serializer.JavaSerializerInstance.deserialize(JavaSerializer.scala:114)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:80)
    at org.apache.spark.scheduler.Task.run(Task.scala:108)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.lang.NullPointerException
    at java.util.ArrayList.ensureExplicitCapacity(ArrayList.java:234)
    at java.util.ArrayList.ensureCapacity(ArrayList.java:218)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:114)
    at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:40)
    at com.esotericsoftware.kryo.Kryo.readObject(Kryo.java:708)
    at com.esotericsoftware.kryo.serializers.ObjectField.read(ObjectField.java:125)

2 问题分析

2.1 NPE直接原因分析

从上述堆栈信息可知,NPE发生在Kryo反序列化ArrayList对象时。

Kryo是一个快速高效的序列化框架,它不强制使用某种模式或具有特殊操作特点的数据,所有的规范都交由Serializers自己来处理。不同的数据类型采用的Serializers进行处理,同时也允许用户自定义Serializers来处理数据。而针对ArrayList类型的集合类型的数据,Kryo默认提供了CollectionSerializer.

at java.util.ArrayList.ensureExplicitCapacity(ArrayList.java:234)
at java.util.ArrayList.ensureCapacity(ArrayList.java:218)
at com.esotericsoftware.kryo.serializers.CollectionSerializer.read(CollectionSerializer.java:114)

结合上述堆栈信息,通过源码调试,我们发现CollectionSerializer#read中会反序列化生成ArrayList对象,在调用ensureCapacity设置ArrayList容量时发生NPE异常. 通过试信息发现生成的ArrayList中elementData属性未初始化,调试信息如下:


image

而通过查看ArrayList的各个构造函数,均对ArrayList@elementData进行了初始化。为什么调试结果显示elementData为NULL呢,除非创建对象时未调用任何构造函数,于是问题的分析方向转移到了ArrayList的创建方式上。

 /**
     * Constructs an empty list with an initial capacity of ten.
     */
    public ArrayList() {
        this.elementData = DEFAULTCAPACITY_EMPTY_ELEMENTDATA;
    }
    
    //其它构造函数也均对elementData进行了初始化
     

2.2 ArrayList对象的创建方式

上文提到,创建的ArrayList对象的elementData属性为NULL,而ArrayList的各个构造方法中都对elementData进行了初始化,出现此结果的原因可能是由于创建对象时未使用任何构造方法。带着此假设,再次对程序进行调试。

    //创建ArrayList对象的方法

    /** Creates a new instance of a class using {@link Registration#getInstantiator()}. If the registration's instantiator is null,
     * a new one is set using {@link #newInstantiator(Class)}. */
    public <T> T newInstance (Class<T> type) {
        Registration registration = getRegistration(type);
        ObjectInstantiator instantiator = registration.getInstantiator();
        if (instantiator == null) {
            instantiator = newInstantiator(type);
            registration.setInstantiator(instantiator);
        }
        return (T)instantiator.newInstance();

ArrayList对象由Kryo#newInstance方法进行实例化,而具体采用的实例化器(创建对象采用的构造器),类型向Kryo注册Registration时指定的实例器,若注册时未指定,则会依据Class Type按设置的InstantiatorStrategy创建实例化器。实现如下:

/** Returns a new instantiator for creating new instances of the specified type. By default, an instantiator is returned that
     * uses reflection if the class has a zero argument constructor, an exception is thrown. If a
     * {@link #setInstantiatorStrategy(InstantiatorStrategy) strategy} is set, it will be used instead of throwing an exception. */
    protected ObjectInstantiator newInstantiator (final Class type) {
        // InstantiatorStrategy.
        return strategy.newInstantiatorOf(type);
    }

SparkSql在序列化及反序列化Hive UDF时默认采用的Kryo实例由Hive代码定义的,其采用的实例化器策略为StdInstantiatorStrategy(若注册的Registration未设置instantiator,则使用该策略创建instantiator),具体实现如下:


  // Kryo is not thread-safe,
  // Also new Kryo() is expensive, so we want to do it just once.
  public static ThreadLocal<Kryo> runtimeSerializationKryo = new ThreadLocal<Kryo>() {
    @Override
    protected synchronized Kryo initialValue() {
      Kryo kryo = new Kryo();
      kryo.setClassLoader(Thread.currentThread().getContextClassLoader());
      kryo.register(java.sql.Date.class, new SqlDateSerializer());
      kryo.register(java.sql.Timestamp.class, new TimestampSerializer());
      kryo.register(Path.class, new PathSerializer());
      kryo.setInstantiatorStrategy(new StdInstantiatorStrategy());
      ......
      return kryo;
    };
  };

而StdInstantiatorStrategy在创建对象时是依据JVM version信息及JVM vendor信息进行的,而不是依据Class的具体实现,
其可以不调用对象的任何构造方法创建对象。

// StdInstantiatorStrategy的描述信息
/**
 * Guess the best instantiator for a given class. The instantiator will instantiate the class
 * without calling any constructor. Currently, the selection doesn't depend on the class. It relies
 * on the
 * <ul>
 * <li>JVM version</li>
 * <li>JVM vendor</li>
 * <li>JVM vendor version</li>
 * </ul>
 * However, instantiators are stateful and so dedicated to their class.
 * 
 * @author Henri Tremblay
 * @see ObjectInstantiator
 */
public class StdInstantiatorStrategy extends BaseInstantiatorStrategy {

而我们发现Kryo在注册各类型Class的Registration对象时都未显式设置instantiator,因此都会采用StdInstantiatorStrategy策略构造对象。
至此,我们的假设成立,NPE的原因是由于生成ArrayList对象时未调用任何构造方法,从而使其elementData属性未初始化所致。

3 部分Spark版本可以正常执行的原因

同样的用户程序,在公司较早期的Spark中可以正常执行,而在最新提供的Spark版本中会出现上述Bug,为什么会出现这样的问题呢,我们的第一反应是可能Kryo的版本不同,通过查看IDE的External Libraries 观查到老版本Spark采用的是Kryo 2, 而最新版本中依赖的是Kryo 3。

通过分析两个版本的Kryo代码实现,并没有发现对ArrayList的操作行为有何不同。于是重新进行排查,因问题发生于Hive UDF的反序列化过程,因此排查了两个版本Spark 依赖的Hive版本信息。

公司老版本Spark依赖的Hive信息(Spark官方的依赖版本,即:阉割版):

 <hive.group>org.spark-project.hive</hive.group>
    <!-- Version used in Maven Hive dependency -->
<hive.version>1.2.1.spark</hive.version>

公司新版本Spark依赖的Hive信息(本质为社区版Hive):

 <hive.group>com.my corporation.hive</hive.group>
    <!-- Version used in Maven Hive dependency -->
<hive.version>1.2.1-200-spark</hive.version>

显然,公司使用的新老版本的Spark依赖的Hive是不同的。通过调研发现Spark社区版的Hive依赖“org.spark-project.hive” 系在原版Hive基础上修改过的独立的工程,其中存在自己定义的Kryo的组件(即对Hive社区版进行了阉割,并自己实现了Kryo)。 而公司新版Spark中依赖的Hive是社区版Hive, Hive中使用的Kryo组件为第三方依赖(Kryo官方版,并通过maven-shade-plugin的relocation将包路径重定义到了hive-exec中)。

通过对比分析发现:

公司老版本依赖的Hive(即Spark社区版中依赖的Hive)中对Kryo的newInstantiator方法进行了改造,其并未设置实例化器策略(InstantiatorStrategy),而是直接通过获取Class的默认构造函数来创建对象,即其创建的对象是被实例化的。因此,创建ArrayList时,elementData属性可以被初始化。

对该问题存在影响的不同实现:

  • 公司老版本Spark依赖Hive(即社区版Spark中阉割的Hive)中使用的Kryo

    protected ObjectInstantiator newInstantiator(final Class type) {
        if (!Util.isAndroid) {
            Class enclosingType = type.getEnclosingClass();
            boolean isNonStaticMemberClass = enclosingType != null && type.isMemberClass() && !Modifier.isStatic(type.getModifiers());
            if (!isNonStaticMemberClass) {
                try {
                    // 获取无参构造方法
                    final ConstructorAccess access = ConstructorAccess.get(type);
                    return new ObjectInstantiator() {
                        public Object newInstance() {
                            try {
                                return access.newInstance();
                            } catch (Exception var2) {
                                throw new KryoException("Error constructing instance of class: " + Util.className(type), var2);
                            }
                        }
                    };
                } catch (Exception var7) {
                    ;
                }
            }
        }
    ......
    }


  • 公司新版本Spark依赖的Hive(实为社区版Hive)中使用的Kryo,是依据InstantiatorStrategy选取不同的策略进行创建对象,在本文2.2节已进行描述,不再赘述。
/** Returns a new instantiator for creating new instances of the specified type. By default, an instantiator is returned that
     * uses reflection if the class has a zero argument constructor, an exception is thrown. If a
     * {@link #setInstantiatorStrategy(InstantiatorStrategy) strategy} is set, it will be used instead of throwing an exception. */
    protected ObjectInstantiator newInstantiator (final Class type) {
        // InstantiatorStrategy.
        return strategy.newInstantiatorOf(type);
    }
    

4 解决方案

经过以上分析,可知NPE的主要原因是由于Spark调用了Hive中设置了StdInstantiatorStrategy的Kryo对象对ArrayList对象反序列化时未调用其任何构造函数,从而使用创建的对象未实例化所致。

因此,可以在Spark、Hive、Kryo三者中任一中修复。目前,该问题只在Spark引擎中出现,故选择在Spark中进行修复。主要思想是首先使用默认无参构造策略DefaultInstantiatorStrategy,若创建对象失败则采用StdInstantiatorStrategy

@transient
def deserializeObjectByKryo[T: ClassTag](
    kryo: Kryo,
    in: InputStream,
    clazz: Class[_]): T = {
  val inp = new Input(in)
  // 显式设置instantiator
kryo.setInstantiatorStrategy(new Kryo.DefaultInstantiatorStrategy(new StdInstantiatorStrategy))

  val t: T = kryo.readObject(inp, clazz).asInstanceOf[T]
  inp.close()
  t
}

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,761评论 5 460
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,953评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,998评论 0 320
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,248评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,130评论 4 356
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,145评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,550评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,236评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,510评论 1 291
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,601评论 2 310
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,376评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,247评论 3 313
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,613评论 3 299
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,911评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,191评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,532评论 2 342
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,739评论 2 335

推荐阅读更多精彩内容