Caused by: java.lang.NullPointerException
at org.apache.spark.sql.catalyst.ScalaReflection$class.localTypeOf(ScalaReflection.scala:839)
at org.apache.spark.sql.catalyst.ScalaReflection$.localTypeOf(ScalaReflection.scala:39)
at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:751)
at org.apache.spark.sql.catalyst.ScalaReflection$$anonfun$schemaFor$1.apply(ScalaReflection.scala:715)
at scala.reflect.internal.tpe.TypeConstraints$UndoLog.undo(TypeConstraints.scala:56)
at org.apache.spark.sql.catalyst.ScalaReflection$class.cleanUpReflectionObjects(ScalaReflection.scala:824)
at org.apache.spark.sql.catalyst.ScalaReflection$.cleanUpReflectionObjects(ScalaReflection.scala:39)
at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:714)
at org.apache.spark.sql.catalyst.ScalaReflection$.schemaFor(ScalaReflection.scala:711)
at org.apache.spark.sql.functions$.udf(functions.scala:3398)
...omitted...
at scala.concurrent.impl.Future$PromiseCompletingRunnable.liftedTree1$1(Future.scala:24)
at scala.concurrent.impl.Future$PromiseCompletingRunnable.run(Future.scala:24)
at scala.concurrent.impl.ExecutionContextImpl$AdaptedForkJoinTask.exec(ExecutionContextImpl.scala:121)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
)
目前怀疑是scala bug所致,https://github.com/scala/bug/issues/10766
Spark在注册用户的UDF的时候会根据UDF的输入类型和返回类型通过Scala反射的方式映射到Spark SQL内部的类型表示,比如输入类型是Int会得到IntegerType,返回类型是String亦可得到StringType类型。
这个过程通过 localTypeOf[某个类型]和预设的localTypeOf[java.lang.Integer]等的规则匹配判断,而这里用的scala的操作符 <:< 由于https://github.com/scala/bug/issues/10766的bug并不是线程安全的,可能触发这个诡异问题。
udf注册加同步,看看是否能够规避这个问题