一、为什么要学习Spark SQL
我们已经学习了Hive,它是将Hive SQL转换成MapReduce然后提交到集群上执行,大大简化了编写MapReduce的程序的复杂性,由于MapReduce这种计算模型执行效率比较慢。所以Spark SQL的应运而生,它是将Spark SQL转换成RDD,然后提交到集群执行,执行效率非常快!所以我们类比的理解:Hive---SQL-->MapReduce,Spark SQL---SQL-->RDD。都是一种解析传统SQL到大数据运算模型的引擎,属于数据分析的范围。
二、什么是DataFrame和DataSet
首先,最简单的理解我们可以认为DataFrame就是Spark中的数据表(类比传统数据库),DataFrame的结构如下:
DataFrame(表)= Schema(表结构) + Data(表数据)
总结:DataFrame(表)是Spark SQL对结构化数据的抽象。可以将DataFrame看做RDD。
DataFrame
DataFrame是组织成命名列的数据集。它在概念上等同于关系数据库中的表,但在底层具有更丰富的优化。DataFrames可以从各种来源构建,
例如:
结构化数据文件(JSON)
外部数据库或现有RDDs
DataFrame API支持的语言有Scala,Java,Python和R。
DataSet
Dataset是数据的分布式集合。Dataset是在Spark 1.6中添加的一个新接口,是DataFrame之上更高一级的抽象。它提供了RDD的优点(强类型化)以及Spark SQL优化后的执行引擎的优点。一个Dataset 可以从JVM对象构造,然后使用函数转换(map, flatMap,filter等)去操作。 Dataset API 支持Scala和Java。 Python不支持Dataset API。
三、读取json、csv、mysql案例
from pyspark import SparkContext,SparkConf
from pyspark.sql import SQLContext,SparkSession,Row
from pyspark.sql.types import StructField,StructType,StringType
conf = SparkConf().setMaster("local[2]").setAppName("sqldemo1")
sc = SparkContext(conf=conf)
sqlContext = SQLContext(sc)
3.1 读取json
jsons = sqlContext.read.json("file:///Users//test.json")
print(jsons.collect())
print(jsons.printSchema())
print(jsons.toPandas())
print(jsons.show())
print(jsons.select("age").show())
print(jsons.select(jsons["lovesPandas"],jsons["age"]+100).show())
jsons.createOrReplaceTempView("people")
sqlDF = sqlContext.sql("select * from people")
print(sqlDF.show())
print(jsons.filter(jsons.age>20).show())
3.2 读取csv
csvs = sqlContext.read.csv("file:///Users//test.csv")
csvs.show(2)
csvs.show()
print(csvs.collect())
print(csvs.columns)
3.3 读取mysql
session = SparkSession.builder.appName("spark.mysql").master("local[2]").getOrCreate()
ctx = SQLContext(session)
jdbcDF = ctx.read.format("jdbc").options(url="jdbc:mysql://1.1.1.1:3306/test",driver="com.mysql.jdbc.Driver",dbtable="(SELECT * FROM yourtable) tmp",user="username",password="password").load()
print(jdbcDF.printSchema())
print(jdbcDF.show())
print(jdbcDF.head(3))
print(jdbcDF.take(4))
print(jdbcDF.columns)
print(jdbcDF.country)
print(jdbcDF["country"])
四、RDD和sparkSQL相互转换
4.1 方式一
lines = sc.textFile("file:///Users/people.txt")
parts = lines.map(lambda line:line.split(","))
people = parts.map(lambda p:Row(name=p[0],age=int(p[1])))
print(parts.collect())
peopleSchema = sqlContext.createDataFrame(people)
peopleSchema.createOrReplaceTempView("people")
teenagers = sqlContext.sql("select * from people WHERE age >= 13 AND age <= 50")
print(teenagers.show())
teenNames = teenagers.rdd.map(lambda p:"Name : " + p.name).collect()
for teenNamein teenNames:
print(teenName)
4.2 方式二
dicts = [{'col1':'a','col2':1}, {'col1':'b','col2':2}]
dict_dataframe = sqlContext.createDataFrame(dicts)
print(dict_dataframe.show())
print("-----dict end-----")
lists = [['a',1], ['b',2]]
list_dataframe = sqlContext.createDataFrame(lists, ['col1','col2'])
print(list_dataframe.show())
print("-----list end-----")
rows = [Row(col1='a',col2=1), Row(col1='b',col2=2)]
row_dataframe = sqlContext.createDataFrame(rows)
print(row_dataframe.show())
print("-----row end-----")
4.3 方式三
lines = sc.textFile("file:///Users/people.txt")
parts = lines.map(lambda l:l.split(","))
people = parts.map(lambda p:(p[0],p[1].strip()))
schemaString ="name age"
fields = [StructField(field_name,StringType(),True)for field_namein schemaString.split()]
schema = StructType(fields)
schemaPeople = sqlContext.createDataFrame(people,schema)
schemaPeople.createOrReplaceTempView("people")
result = sqlContext.sql("select * from people")
result.show()