1. 背景
Spark在的Dataframe在使用的过程中或涉及到schema的问题,schema就是这个Row的数据结构(StructType),在代码中就是这个类的定义。如果你想解析一个json或者csv文件成dataframe,那么就需要知道他的StructType。
徒手写一个复杂类的StructType是个吃力不讨好的事情,所以Spark默认是支持自动推断schema的。但是如果使用流处理(Streaming)的话,他的支持力度是很受限的,最近在做Streaming处理的时候,遇到一些schema inference的问题,所以借机学习整理下Spark源码是如何实现的。
2. Spark版本
以下的代码基于spark的版本:
项目 | version |
---|---|
scala | 2.11 |
spark-core | 2.4.0 |
spark-sql | 2.4.0 |
mongo-spark-connector | 2.11 |
gradle的配置:
providedRuntime group: 'org.apache.spark', name: 'spark-core_2.11', version: '2.4.0'
providedRuntime group: 'org.apache.spark', name: 'spark-sql_2.11', version: '2.4.0'
providedRuntime group: 'org.mongodb.spark', name: 'mongo-spark-connector_2.11', version: '2.3.1'
3. Schema inference
3.1 spark的Schema inference
3.1.1 通过DDL来解析Schema
DDL的格式类似于:"a INT, b STRING, c DOUBLE"
,
StructType提供了接口直接通过解析DDL来识别StructType
this.userSpecifiedSchema = Option(StructType.fromDDL(schemaString))
先把DDL string解析成SqlBaseLexer
val lexer = new SqlBaseLexer(new UpperCaseCharStream(CharStreams.fromString(command)))
然后, 然后...就看的不太懂了...
3.1.2 解析一个Json的Schema
Spark中Dataframe的文件读取是通过DataFrameReader来完成的.
都是通过DataSet的ofRows(sparkSession: SparkSession, logicalPlan: LogicalPlan)
方法转为DataFrame
def ofRows(sparkSession: SparkSession, logicalPlan: LogicalPlan): DataFrame = {
val qe = sparkSession.sessionState.executePlan(logicalPlan)
qe.assertAnalyzed()
new Dataset[Row](sparkSession, qe, RowEncoder(qe.analyzed.schema))
}
schema是由QueryExecution得到的
def ofRows(sparkSession: SparkSession, logicalPlan: LogicalPlan): DataFrame = {
val qe = sparkSession.sessionState.executePlan(logicalPlan)
qe.assertAnalyzed()
new Dataset[Row](sparkSession, qe, RowEncoder(qe.analyzed.schema))
}
其中的qe.analyzed.schema
这句就是QueryExecution先分析生成LogicPlan,分析的源码在CheckAnalysis.scala
中的def checkAnalysis(plan: LogicalPlan): Unit
def checkAnalysis(plan: LogicalPlan): Unit = {
// We transform up and order the rules so as to catch the first possible failure instead
// of the result of cascading resolution failures.
plan.foreachUp {
case p if p.analyzed => // Skip already analyzed sub-plans
case u: UnresolvedRelation =>
u.failAnalysis(s"Table or view not found: ${u.tableIdentifier}")
case operator: LogicalPlan =>
// Check argument data types of higher-order functions downwards first.
// If the arguments of the higher-order functions are resolved but the type check fails,
// the argument functions will not get resolved, but we should report the argument type
// check failure instead of claiming the argument functions are unresolved.
operator transformExpressionsDown {
case hof: HigherOrderFunction
if hof.argumentsResolved && hof.checkArgumentDataTypes().isFailure =>
hof.checkArgumentDataTypes() match {
case TypeCheckResult.TypeCheckFailure(message) =>
hof.failAnalysis(
s"cannot resolve '${hof.sql}' due to argument data type mismatch: $message")
}
...
...
}
最终由Logic的output: Seq[Attribute]
转为StructType
:
lazy val schema: StructType = StructType.fromAttributes(output)
具体每个Attribute
转你为StructType
的代码如下:
private[sql] def fromAttributes(attributes: Seq[Attribute]): StructType =
StructType(attributes.map(a => StructField(a.name, a.dataType, a.nullable, a.metadata)))
3.1.3 Kafka的Schema
在使用Kafka的Streaming的时候,自动推断只能推断到固定的几个StructField
, 如果value是Json的话,也不会进一步解析出来。
这个是因为Kafka和json的dataSource是不一样的
DataFrame在load的时候,会有DataSource基于provider name
来找到这个provider
的data source的类定义
// DataSource.scala line 613
def lookupDataSource(provider: String, conf: SQLConf): Class[_] = {
val provider1 = backwardCompatibilityMap.getOrElse(provider, provider) match {
case name if name.equalsIgnoreCase("orc") &&
conf.getConf(SQLConf.ORC_IMPLEMENTATION) == "native" =>
classOf[OrcFileFormat].getCanonicalName
case name if name.equalsIgnoreCase("orc") &&
conf.getConf(SQLConf.ORC_IMPLEMENTATION) == "hive" =>
"org.apache.spark.sql.hive.orc.OrcFileFormat"
case "com.databricks.spark.avro" if conf.replaceDatabricksSparkAvroEnabled =>
"org.apache.spark.sql.avro.AvroFileFormat"
case name => name
}
val provider2 = s"$provider1.DefaultSource"
...
}
- 如果输入
provider name
是"json"返回的是JsonFileFormat
- 如果是”kafka“返回的是
KafkaSourceProvider
而KafkaSourceProvider
的sourceSchema
是kafkaSchema
override def sourceSchema(
sqlContext: SQLContext,
schema: Option[StructType],
providerName: String,
parameters: Map[String, String]): (String, StructType) = {
validateStreamOptions(parameters)
require(schema.isEmpty, "Kafka source has a fixed schema and cannot be set with a custom one")
(shortName(), KafkaOffsetReader.kafkaSchema)
}
具体kafkaSchema
的定义如下:
def kafkaSchema: StructType = StructType(Seq(
StructField("key", BinaryType),
StructField("value", BinaryType),
StructField("topic", StringType),
StructField("partition", IntegerType),
StructField("offset", LongType),
StructField("timestamp", TimestampType),
StructField("timestampType", IntegerType)
))
3.2 mongo-spark的Schema inference
3.2.1 MongoInferSchema源码分析
看mongoSpark源码的时候,意外从一个toDF
的方法里发现了有个MongoInferSchema
实现了类型推断.
/**
* Creates a `DataFrame` based on the schema derived from the optional type.
*
* '''Note:''' Prefer [[toDS[T<:Product]()*]] as computations will be more efficient.
* The rdd must contain an `_id` for MongoDB versions < 3.2.
*
* @tparam T The optional type of the data from MongoDB, if not provided the schema will be inferred from the collection
* @return a DataFrame
*/
def toDF[T <: Product: TypeTag](): DataFrame = {
val schema: StructType = MongoInferSchema.reflectSchema[T]() match {
case Some(reflectedSchema) => reflectedSchema
case None => MongoInferSchema(toBsonDocumentRDD)
}
toDF(schema)
}
于是研究了下,发现MongoInferSchema
的实现分两种情况:
- 给定了要解析的class类型
如果是给定了要解析的class类型,那就很好办,直接基于Spark的ScalaReflection
的 schemaFor
函数将class转为Schema
:
case class Schema(dataType: DataType, nullable: Boolean)
这个Schema
是ScalaReflection
中定义的一个case class,本质是个catalyst DataType
所以可以再进一步直接转为StructType
, 所以代码实现很简单:
ScalaReflection.schemaFor[T].dataType.asInstanceOf[StructType]
- 未给定要解析的class类型
如果没有给定要解析的class类型,那就直接根据从mongo里读取的RDD来推断Schema. 这个具体的实现方式是对RDD进行采样,采样数可以在readConfig中设置,默认值是1000(private val DefaultSampleSize: Int = 1000
).
因为从mongo读取出来的格式就是BsonDocument
, 所以采样的过程就是将每个BsonDocument
转为StructType
private def getSchemaFromDocument(document: BsonDocument, readConfig: ReadConfig): StructType = {
val fields = new util.ArrayList[StructField]()
document.entrySet.asScala.foreach(kv => fields.add(DataTypes.createStructField(kv.getKey, getDataType(kv.getValue, readConfig), true)))
DataTypes.createStructType(fields)
}
然后将采样的1000个集合进行两两merge,获取兼容的类型,最终得到RootType,即为所需的Schema:
// perform schema inference on each row and merge afterwards
val rootType: DataType = sampleData
.map(getSchemaFromDocument(_, mongoRDD.readConfig))
.treeAggregate[DataType](StructType(Seq()))(
compatibleType(_, _, mongoRDD.readConfig, nested = false),
compatibleType(_, _, mongoRDD.readConfig, nested = false)
)
3.2.2 MongoInferSchema存在的问题
3.2.2.1 Java兼容性问题
虽然scala脱胎于java,但是在类型和结构上也逐渐出现了很多的不同点,包括部分基础结构和各种各样的复杂结构。所以如果要推断的类是java类,MongoInferSchema
也提供了MongoInferSchemaJava
实现类型反射:
/**
* A helper for inferring the schema from Java
*
* In Spark 2.2.0 calling this method from Scala 2.10 caused compilation errors with the shadowed library in
* `JavaTypeInference`. Moving it into Java stops Scala falling over and allows it to continue to work.
*
* See: SPARK-126
*/
final class MongoInferSchemaJava {
@SuppressWarnings("unchecked")
public static <T> StructType reflectSchema(final Class<T> beanClass) {
return (StructType) JavaTypeInference.inferDataType(beanClass)._1();
}
}
具体的推断实现在def inferDataType(typeToken: TypeToken[_], seenTypeSet: Set[Class[_]]
函数中,代码如下,这里就不详细展开了。
/**
* Infers the corresponding SQL data type of a Java type.
* @param typeToken Java type
* @return (SQL data type, nullable)
*/
private def inferDataType(typeToken: TypeToken[_], seenTypeSet: Set[Class[_]] = Set.empty)
: (DataType, Boolean) = {
typeToken.getRawType match {
case c: Class[_] if c.isAnnotationPresent(classOf[SQLUserDefinedType]) =>
(c.getAnnotation(classOf[SQLUserDefinedType]).udt().newInstance(), true)
case c: Class[_] if UDTRegistration.exists(c.getName) =>
val udt = UDTRegistration.getUDTFor(c.getName).get.newInstance()
.asInstanceOf[UserDefinedType[_ >: Null]]
(udt, true)
...
...
...
val properties = getJavaBeanReadableProperties(other)
val fields = properties.map { property =>
val returnType = typeToken.method(property.getReadMethod).getReturnType
val (dataType, nullable) = inferDataType(returnType, seenTypeSet + other)
new StructField(property.getName, dataType, nullable)
}
(new StructType(fields), true)
}
}
所以如果大家要使用mongo-spark的类型推断,那么可以基于scala和java封装2个接口函数用于Schema Infer, 下面是我自己封装的2个函数:
/**
* @see [[MongoInferSchema.apply]]
*/
protected def inferSchemaScala[T <: Product : TypeTag](): StructType = {
MongoInferSchema.reflectSchema[T]() match {
case Some(reflectedSchema) => reflectedSchema
// canonicalizeType erases all empty structs, including the only one we want to keep
case None => StructType(Seq())
}
}
/**
* @see [[MongoInferSchema.apply]]
*/
protected def inferSchemaJava[T](beanClass: Class[T]): StructType = {
MongoInferSchema.reflectSchema(beanClass)
}
3.2.2.2 采样推断不准确问题
产生不准确的原因在于:
- 采用点不完整
毕竟是采样,如果某个字段在采样点没出现,则会导致最终推断的不准确 - 集合类结构泛型推断错误
另外一个问题是,比如字段里有个Map[String , String]
类型,可能会把其中的key推断成不同的StrutType
,而不是统一推断成String
。我自己做过测试,会一定程度上依赖某些key是否会高频出现,所以说这种infer schema具有不确定性。
解决方案:
- 使用的数据结构尽量简单,不要有嵌套或者复杂结构
但这种情况,真正的生产环境很难,大部分公司的代码结构,迭代了那么久,怎么会那么简单呢,对吧? - 给定要解析的class类型
这个是个很好的方案,可以确保没有错误,同时,如果类的字段或者结构发生变化了,可以确保无缝兼容,不用重新修改代码。
4. 总结
以上介绍了几种spark内部实现 schema inference 源码和使用方式。在日常大部分工作中这些东西都是被spark隐藏的,而且如果没有特殊场景,也是不需要涉及到这里的东西。我是因为刚好遇到Spark Streaming读写Kafka的Topic,但发现读到的RDD/DataFrame没有很好的解析Schema,于是研究了下相关的实现。
最终基于项目选择了MongoInferSchema
的实现方式,友好的解决了问题。