Spark 结构化API DataFrames

结构化API

结构化API是处理各种数据的工具,从非结构化log文件到半结构化CSV文件和高度结构化的Parquet文件,spark中三种主要的三类结构化API为:

  • Datasets
  • DataFrames
  • SQL tables and views

App dag stage task

DataFrame Dataset

DataFrame、 Dataset是(分布式)1-。4类表集合具有定义良好的行和列
DataFrame无类型(runtime check)
Dataset有类型 (compile time)

对于Spark来说, DataFrame是类型为 RowDataset
特点:

  • immutable
  • lazily evaluated plans

.Scala type reference

Schemas

模式定义了一个DataFrame的列名和类型。
可以手动定义schema或者schema on read
Spark类型直接映射到Spark维护的不同语言api,在Scala、Java、Python、SQLR中,每种api都有一个查询表,简单的说最终代码 使用纯spark执行( Spark’s internal Catalyst representation)

结构化```API的执行流程

  1. 编写DataFrame/Dataset/SQL 代码.
  2. 如果代码是正确的spark将其转化为Logical Plan.
  3. Logical Plan 转为 Physical Plan
    4.Spark 在集群上执行Physical Plan (RDD 操作)

Logical Planning

catalogy是一个包含所有tableDataFrame信息的仓库,用于check代码是否有问题 (eg:table column 不存在)
check 通过的plan 通过Catalyst Optimizer优化
用户可以扩展Catalyst自定义优化规则

Physical Planning

DataFrams 相关操作

加载数据

df=spark.read.json("file:///usr/local/xldw/2015-summary.json")

或者

df = spark.read.format("json").load("file:///usr/local/xldw/2015-summary.json")

查看schema

df.schema
df.printSchema()
org.apache.spark.sql.types.StructType = ...
StructType(StructField(DEST_COUNTRY_NAME,StringType,true),
StructField(ORIGIN_COUNTRY_NAME,StringType,true),
StructField(count,LongType,true))
  • spark 可以根据文件的前几行推断出schema(schema on read)
  • schemaStructType 实例
  • StructType 由一个StructFields 构成
  • Boolean代表这个列是否可以为空

手动指定Schema

// in Scala
import org.apache.spark.sql.types.{StructField, StructType, StringType, LongType}
import org.apache.spark.sql.types.Metadata
val myManualSchema = StructType(Array(StructField("DEST_COUNTRY_NAME", StringType,true),
  StructField("ORIGIN_COUNTRY_NAME", StringType, true),
  StructField("count", LongType, false,
  Metadata.fromJson("{\"hello\":\"world\"}"))
))
val df = spark.read.format("json").schema(myManualSchema).load("/data/flight-data/json/2015-summary.json")

Columns

spark来说, 列是一种逻辑结构,它仅表示通过表达式按每个记录计算的值. 这意味着要为列赋实值,我们需要有一行;为了得到一行,我们需要一个DataFrame

构造和引用列的两种最基本的方式:
colcolumn 方法

Column作为表达式

expr函数实际上可以解析字符串中的转换和列引用,然后可以将它们传递到进一步的转换中,下面三者等价:

col("someCol") - 5
expr("someCol - 5")
expr("someCol") - 5

Columns 只是表达式
这些列和这些列的转换编译成与解析表达式相同的逻辑计划

这意味着您可以将表达式编写为DataFrame代码或SQL表达式,并获得完全相同的性能特征

from pyspark.sql.functions import expr, col, column
df.select(
expr("DEST_COUNTRY_NAME"),
col("DEST_COUNTRY_NAME"),
column("DEST_COUNTRY_NAME"))\
.show(2)

Records and Rows

spark中一行使用Row对象表示,Spark使用列表达式操作行对象,以生成可用的值,行对象在内部表示字节数组。字节数组接口永远不会显示给用户,因为我们只使用列表达式来操作它们

展示

df.show(2)

+-----------------+-------------------+-----+
|DEST_COUNTRY_NAME|ORIGIN_COUNTRY_NAME|count|
+-----------------+-------------------+-----+
|    United States|            Romania|   15|
|    United States|            Croatia|    1|
+-----------------+-------------------+-----+

select and selectExpr

df.select("DEST_COUNTRY_NAME").show(2)
-- in SQL
SELECT DEST_COUNTRY_NAME FROM dfTable LIMIT 2

# select muti columns
df.select("DEST_COUNTRY_NAME", "ORIGIN_COUNTRY_NAME").show(2)

# select function favor
df.select(expr("DEST_COUNTRY_NAME").alias("dest")).show(2)

# select sql favor
df.select(expr("DEST_COUNTRY_NAME AS destination")).show(2)

# selectExpr
df.selectExpr("DEST_COUNTRY_NAME as dest", "ORIGIN_COUNTRY_NAME").show(2)
# selectExpr  opens up the true power of Spark
df.selectExpr("*",
              "(DEST_COUNTRY_NAME=ORIGIN_COUNTRY_NAME) as withinCountry").show(2)


df.selectExpr("avg(count)", "count(distinct(DEST_COUNTRY_NAME))").show(1)

字面量

df.select(expr("*"), lit(1).alias("One")).show(2)

添加列

df.withColumn("numberOne", lit(1)).show(2)
-- in SQL
SELECT *, 1 as numberOne FROM dfTable LIMIT 2

df.withColumn("withinCountry", expr("ORIGIN_COUNTRY_NAME == DEST_COUNTRY_NAME"))
.show(2)
//rename
df.withColumn("Destination", expr("DEST_COUNTRY_NAME")).columns

重命名列

df.withColumnRenamed("DEST_COUNTRY_NAME", "dest").columns

保留字符和关键字

withColumn不用特殊处理
selectExpr需要加上`

dfWithLongColumnName = df.withColumn("this is a long column", expr("ORIGIN_COUNTRY_NAME"))
dfWithLongColumnName.selectExpr("`this is a long column` as `long column`").show(2)

大小写敏感性

默认spark大小写不敏感,可以通过配置开启大小写敏感

set spark.sql.caseSensitive true

删除列

df.drop("ORIGIN_COUNTRY_NAME").columns

改变列的类型cast

df.withColumn("count2", col("count").cast("long"))
-- in SQL
SELECT *, cast(count as long) AS count2 FROM dfTabl

过滤行

wherefilter有一样的过滤功能

df.filter(col("count") < 2).sho
df.where("count < 2").show(2)

spark自动在同一时间执行所有过滤操作,而不管过滤器的顺序如何。这意味着,如果您想指定多个过滤器,只需按顺序将它们链接起来,其余的由Spark处理

df.where(col("count") < 2).where(col("ORIGIN_COUNTRY_NAME") != "Croatia")\
.show(2)

Unique

df.select("ORIGIN_COUNTRY_NAME", "DEST_COUNTRY_NAME").distinct().count()
-- in SQL
SELECT COUNT(DISTINCT(ORIGIN_COUNTRY_NAME, DEST_COUNTRY_NAME)) FROM dfTable

随机分片

dataFrames = df.randomSplit([0.25, 0.75], seed)
dataFrames[0].count() > dataFrames[1].count() # False

Union

from pyspark.sql import Row
schema = df.schema
newRows = [
Row("New Country", "Other Country", 5L),
Row("New Country 2", "Other Country 3", 1L)
]
parallelizedRows = spark.sparkContext.parallelize(newRows)
newDF = spark.createDataFrame(parallelizedRows, schema)

df.union(newDF)\
.where("count = 1")\
.where(col("ORIGIN_COUNTRY_NAME") != "United States")\
.show()

sort

sortorderBy 有相同的排序功能

df.sort("count").show(5)
df.orderBy("count", "DEST_COUNTRY_NAME").show(5)
df.orderBy(col("count"), col("DEST_COUNTRY_NAME")).show(5)

from pyspark.sql.functions import desc, asc
df.orderBy(expr("count desc")).show(2)
df.orderBy(col("count").desc(), col("DEST_COUNTRY_NAME").asc()).show(2)
-- in SQL
SELECT * FROM dfTable ORDER BY count DESC, DEST_COUNTRY_NAME ASC LIMIT 2

一个高级技巧是使用asc_nulls_firstdesc_nulls_first、asc_nulls_lastdesc_nulls_last来指定希望DataFrame空值按顺序出现在哪里

出于优化目的,有时建议先对每个分区排序然后执行之后的transformations

spark.read.format("json").load("/data/flight-data/json/*-summary.json")\
.sortWithinPartitions("count")

Limit

df.limit(5).show()
-- in SQL
SELECT * FROM dfTable LIMIT 6

df.orderBy(expr("count desc")).limit(6).show()
-- in SQL
SELECT * FROM dfTable ORDER BY count desc LIMIT 6

重新分区和合并

Sparkdriver程序中维护集群的状态。有时您需要向driver程序收集一些数据,以便在本地机器上对其进行操作
到目前为止,我们还没有明确定义这个操作。然而,我们使用了几种不同的方法来实现这一点,它们实际上都是一样的
collect从整个DataFrame获取所有数据,take获取前N行,show以表格样式打印

collectDF = df.limit(10)
collectDF.take(5) # take works with an Integer count
collectDF.show() # this prints it out nicely
collectDF.show(5, False)
collectDF.collect()
collectDF.toLocalIterator()
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 199,340评论 5 467
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 83,762评论 2 376
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 146,329评论 0 329
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 53,678评论 1 270
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 62,583评论 5 359
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 47,995评论 1 275
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,493评论 3 390
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,145评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,293评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,250评论 2 317
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,267评论 1 328
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,973评论 3 316
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,556评论 3 303
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,648评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,873评论 1 255
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,257评论 2 345
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 41,809评论 2 339

推荐阅读更多精彩内容