1. Overview
本文将介绍 Spark SQL 的基本概念和基本使用,并介绍自定义数据源和 catalyst 的基本概念。通过此文可以对 Spark SQL 建立一个初步了解。
2. Basic
2.1 什么是 Spark SQL?
Spark SQL is a Spark module for structured data processing. Unlike the basic Spark RDD API, the interfaces provided by Spark SQL provide Spark with more information about the structure of both the data and the computation being performed. Internally, Spark SQL uses this extra information to perform extra optimizations.
以上内容摘自官网,这里有两个重点:1. structured data processing 2. extra optimizations 即 Spark SQL 是专门处理结构化数据的一个 Spark 模块,通过额外的 schema 信息使得计算更加便捷,框架对计算也可以进行性能上的优化。
2.2 Spark SQL 有哪些接口?
There are several ways to interact with Spark SQL including SQL and the Dataset API.
使用 Spark SQL 有两种方式,一类是通过 SQL ,一类是通过 Dataset API。其中前者更常用一些。
2.3 外部数据源获取 DataFrame/DataSet
DataFrame 可以通过 SparkSession 从已存在的 RDD,Hive 表,或者外部数据源读取。从 RDD 以及从 Hive 表获取 DataFrame 后面进行详细讲解。这里以外部数据源为例获取 DF。首先看一下 Spark 支持哪些外部数据源:
scala> spark.read.
csv format jdbc json load option options orc parquet schema table text textFile
可以看到有 csv,jdbc,json,orc,parquet,text 等。
2.3.1 DataFrame
以 json 为例,people.json 文件内容如下:
{"name":"Michael"}
{"name":"Andy", "age":30}
{"name":"Justin", "age":19}
scala> val df = spark.read.json("examples/src/main/resources/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
scala> df.printSchema
root
|-- age: long (nullable = true)
|-- name: string (nullable = true)
scala> df.show
+----+-------+
| age| name|
+----+-------+
|null|Michael|
| 30| Andy|
| 19| Justin|
+----+-------+
scala> df.select("name").show()
+-------+
| name|
+-------+
|Michael|
| Andy|
| Justin|
+-------+
scala> df.createOrReplaceTempView("people")
scala> spark.sql("select name from people").show
+-------+
| name|
+-------+
|Michael|
| Andy|
| Justin|
+-------+
上面代码演示了 DataFrame 的创建、以及如何使用 SQL 接口和 DataFrame API 的方式进行数据查询。
2.3.2 DataSet
DataSet是具有强类型的数据集合,需要提供对应的类型信息。
scala> case class Person(name: String, age: Long)
defined class Person
scala> val peopleDS = spark.read.json("examples/src/main/resources/people.json").as[Person]
peopleDS: org.apache.spark.sql.Dataset[Person] = [age: bigint, name: string]
scala> peopleDS.map(_.
age canEqual copy equals hashCode name productArity productElement productIterator productPrefix toString
上面代码演示了 DataSet 的创建,可以看到只需要在创建 DataFrame 的基础上加上类型信息即可。在 map 时可以获取到对象结构内的成员变量,SQL 操作和 API 接口和 DataFrame 都是一样的,不再演示。
2.4 RDD 获取 DataFrame/DataSet
RDD 的数据来源 people.txt 内容如下:
Michael, 29
Andy, 30
Justin, 19
2.4.1 反射方式
scala> val txtRDD = sc.textFile("/examples/src/main/resources/people.txt")
txtRDD: org.apache.spark.rdd.RDD[String] = /examples/src/main/resources/people.txt
scala> case class Person(name: String, age: Long)
defined class Person
scala> val peopleDF = txtRDD.map(_.split(",")).map(attrs => Person(attrs(0),attrs(1).trim.toLong)).toDF()
peopleDF: org.apache.spark.sql.DataFrame = [name: string, age: bigint]
scala> peopleDF.show
+-------+---+
| name|age|
+-------+---+
|Michael| 29|
| Andy| 30|
| Justin| 19|
+-------+---+
2.4.2 编程方式
scala> val txtRDD = sc.textFile("/examples/src/main/resources/people.txt")
txtRDD: org.apache.spark.rdd.RDD[String] = /examples/src/main/resources/people.txt
scala> val schema = StructType(Array(StructField("name",StringType,nullable = true),StructField("age",LongType,nullable = true)))
schema: org.apache.spark.sql.types.StructType = StructType(StructField(name,StringType,true), StructField(age,LongType,true))
scala> val rowRDD = txtRDD.map(_.split(",")).map(attrs => Row(attrs(0),attrs(1).trim.toLong))
rowRDD: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row
scala> val peopleDF = spark.createDataFrame(rowRDD,schema)
peopleDF: org.apache.spark.sql.DataFrame = [name: string, age: bigint]
scala> peopleDF.show
+-------+---+
| name|age|
+-------+---+
|Michael| 29|
| Andy| 30|
| Justin| 19|
+-------+---+
通过如上演示可以看到,反射方式的核心是通过 RDD[CaseClass].toDF()
方式实现的,而编程方式的核心是通过 spark.createDataFrame(RDD[Row],schema)
实现的。
2.5 Hive 表获取 DataFrame/DataSet
想使用 hdfs 上的 hive 表,至少需要配置一个文件 hive-site.xml 到 Spark 的 conf 目录下,并在文件中通过spark.sql.warehouse.dir
指明数仓的地址。如果有其他关于 hdfs 的配置,则需要把 core-site.xml、hdfs-site.xml 配置到 conf 目录下。成功配置后直接通过spark.sql(...)
执行 SQL 语句即可。
3. Deep
3.1 自定义外部数据源
Spark 自带了很多外部数据源,如前面章节介绍的 json、csv等等。但有时这些数据格式无法满足我们的需求,此时我们可以通过自定义数据源的方式来解析我们的数据。自定义数据源的本质是使用 RDD 转 DF 中的编程方式,即 spark.createDataFrame(RDD[Row],schema)
,通过继承抽象类、实现接口来提供所需要的参数。
3.1.1 基本使用
class MyRelation(sqlContext:SQLContext,path:String,schema:StructType=null) extends BaseRelation with TableScan
这里通过继承一个抽象类 BaseRelation 来提供 schema,并通过实现一个接口 TableScan 来提供 RDD[Row]
假如我们要解析的数据格式如下:
19.167.29.40 [2018-03-04 21:10:16] (Android o,Meizu note 7) 三国志 104
抽象类 BaseRelation 中的 schema 方法如下实现:
override def schema: StructType = {
if(userSchema != null){
userSchema
}else{
StructType(
Array(
StructField("ip",StringType),
StructField("date",StringType),
StructField("androidVersion",StringType),
StructField("phoneModel",StringType),
StructField("gameName",StringType),
StructField("gameId",IntegerType)
))
}
}
接口 TableScan 的方法 buildScan 方法如下实现:
override def buildScan(): RDD[Row] = {
val rdd = sqlContext.sparkContext.textFile(path)
rdd.map(line => line.split("\t")).map(attrs => Row(
attrs(0),
formatTime(attrs(1)),
attrs(2).split(",")(0).filter(_!='('),
attrs(2).split(",")(1).filter(_!=')'),
attrs(3),
attrs(4).toInt))
}
此时已经通过两个方法提供了spark.createDataFrame(RDD[Row],schema)
所需要的两个参数。最后再通过一个 Provider 关联我们的 MyRelation 即可。
package com.xiaoc024.spark.offline.datasource
class MySource extends RelationProvider {
override def createRelation(sqlContext: SQLContext, parameters: Map[String, String]): BaseRelation = {
val path = parameters.get("path")
path match {
case Some(p) => new CTxtRelation(sqlContext, p)
}
}
}
最后通过 spark.read.format("com.xiaoc024.spark.offline.datasource.MySource").option("path",path).load()
进行使用
3.1.2 进阶使用
上一小节介绍的是自定义外部数据源的基本使用,还有一些进阶的使用方式,比如列裁剪优化、shortName 等等。其中优化查询效率的手段有:
- 实现
PrunedScan
接口实现列裁剪 - 实现
PrunedFilteredScan
接口实现裁剪+谓词下推
上一小节中使用外部数据源,我们传入的是全类名,如果想使用更加简洁的方式可以让 MySource 实现DataSourceRegister
接口,然后实现 shortName 方法。比如 override def shortName(): String = "ctxt"
但是有一点需要注意的是,单单这样做是不够的,我们还需要修改 spark-sql 的源码来注册 MySource,具体位置在 spark-sql jar包中的 META-INF/services/org.apache.spark.sql.sources.DataSourceRegister中,将我们的 MySource 全类名添加到最后一行。此时就可以通过spark.read.format("ctxt").option("path",path).load()
来进行使用了。
3.2 Catalyst 简介
Catalyst 是 Spark SQL 中的一种优化器,执行策略主要分为两个大的方向:基于规则优化(RBO)以及基于代价优化(CBO)。工作流程如下图:
这里只对流程做简要的概述,总共分为 5 个步骤:
-
sql/df --parser---> Unresolved Logical Plan
解析 sql/df 为抽象语法树,不包含表的数据类型等任何信息,类似于建模
-
Unresolved Logical Plan --analyzer--> Resolved Logical Plan
Analyzer 通过 Catalog 元数据信息遍历抽象语法树为每个结点进行数据类型绑定和函数绑定,生成未优化的逻辑执行计划
-
Resolved Logical Plan --Optimizer(RBO)---> Optimized Logical Plan
对未优化的逻辑执行计划进一步使用 RBO 优化,如谓词下推(参与 join 的数据量减少)、常量累加(不用算很多次)、列裁剪(减少读取的数据量)生成优化的逻辑执行计划
Optimized Logical Plan --Query Planner----> Physical Plans
Physical Plans --CBO---> one Physical Plan