pySpark API实操(3)

#if run in windows use this

import findspark

findspark.init()

# import

from pyspark import SparkContext

from pyspark.sql import SQLContext

from pyspark.sql import SparkSession

import IPython

# #version

# print("pyspark version:" + str(sc.version))

# print("Ipython version:" + str(IPython.__version__))

# #agg 聚合

# sc = SparkContext('local')

# sqlContext = SQLContext(sc)

# x = sqlContext.createDataFrame([("Alice", "Bob", 0.1), ("Bob", "Carol", 0.2), ("Carol", "Dave", 0.3)], ['from', 'to', 'amt'])

# y = x.agg({"amt": "avg"})

# x.show()

# y.show()

# # alias 返回这个列的新的别名或别名们

# from pyspark.sql.functions import col

# sc = SparkContext('local')

# sqlContext = SQLContext(sc)

# x = sqlContext.createDataFrame(

#    [("Alice", "Bob", 0.1), ("Bob", "Carol", 0.2), ("Carol", "Dave", 0.3)], ['from', 'to', 'amt'])

# y = x.alias('transactions')

# x.show()

# y.show()

# y.select(col("transactions.to")).show()

# # cache

# sc = SparkContext('local')

# sqlContext = SQLContext(sc)

# x = sqlContext.createDataFrame(

#    [("Alice", "Bob", 0.1), ("Bob", "Carol", 0.2), ("Carol", "Dave", 0.3)], ['from', 'to', 'amt'])

# x.cache()

# print(x.count())  # first action materializes x in memory

# print(x.count())  # later actions avoid IO overhead

# # coalesce 重分区函数

# sc = SparkContext('local')

# sqlContext = SQLContext(sc)

# x_rdd = sc.parallelize(

#    [("Alice", "Bob", 0.1), ("Bob", "Carol", 0.2), ("Carol", "Dave", 0.3)], 2)

# x = sqlContext.createDataFrame(x_rdd, ['from', 'to', 'amt'])

# y = x.coalesce(numPartitions=1)

# print(x.rdd.getNumPartitions())

# print(y.rdd.getNumPartitions())

# # collect

# sc = SparkContext('local')

# sqlContext = SQLContext(sc)

# x = sqlContext.createDataFrame(

#    [("Alice", "Bob", 0.1), ("Bob", "Carol", 0.2), ("Carol", "Dave", 0.3)], ['from', 'to', 'amt'])

# y = x.collect()  # creates list of rows on driver

# x.show()

# print(y)

# # columns

# sc = SparkContext('local')

# sqlContext = SQLContext(sc)

# x = sqlContext.createDataFrame(

#    [("Alice", "Bob", 0.1), ("Bob", "Carol", 0.2), ("Carol", "Dave", 0.3)], ['from', 'to', 'amt'])

# y = x.columns  # creates list of column names on driver

# x.show()

# print(y)

# # # corr

# sc = SparkContext('local')

# sqlContext = SQLContext(sc)

# x = sqlContext.createDataFrame([("Alice", "Bob", 0.1, 0.001), (

#    "Bob", "Carol", 0.2, 0.02), ("Carol", "Dave", 0.3, 0.02)], ['from', 'to', 'amt', 'fee'])

# y = x.corr(col1="amt", col2="fee")

# x.show()

# print(y)

# # count

# sc = SparkContext('local')

# sqlContext = SQLContext(sc)

# x = sqlContext.createDataFrame(

#    [("Alice", "Bob", 0.1), ("Bob", "Carol", 0.2), ("Carol", "Dave", 0.3)], ['from', 'to', 'amt'])

# x.show()

# print(x.count())

# # cov

# sc = SparkContext('local')

# sqlContext = SQLContext(sc)

# x = sqlContext.createDataFrame([("Alice", "Bob", 0.1, 0.001), (

#    "Bob", "Carol", 0.2, 0.02), ("Carol", "Dave", 0.3, 0.02)], ['from', 'to', 'amt', 'fee'])

# y = x.cov(col1="amt", col2="fee")

# x.show()

# print(y)

sc = SparkContext('local')

sqlContext = SQLContext(sc)

# # crosstab

# x = sqlContext.createDataFrame(

#    [("Alice", "Bob", 0.1), ("Bob", "Carol", 0.2), ("Carol", "Dave", 0.3)], ['from', 'to', 'amt'])

# y = x.crosstab(col1='from', col2='to')

# x.show()

# y.show()

# # cube

# x = sqlContext.createDataFrame(

#    [("Alice", "Bob", 0.1), ("Alice", "Carol", 0.2)], ['from', 'to', 'amt'])

# y = x.cube('from', 'to')

# x.show()

# print(y)  # y is a grouped data object, aggregations will be applied to all numerical columns

# y.sum().show()

# y.max().show()

# # describe

'''计算数值列的统计信息。

包括计数,平均,标准差,最小和最大。如果没有指定任何列,这个函数计算统计所有数值列'''

# x = sqlContext.createDataFrame(

#    [("Alice", "Bob", 0.1), ("Bob", "Carol", 0.2), ("Carol", "Dave", 0.3)], ['from', 'to', 'amt'])

# x.show()

# x.describe().show()

# # distinct 返回行去重的新的DataFrame。

# x = sqlContext.createDataFrame([("Alice", "Bob", 0.1), ("Bob", "Carol", 0.2), (

#    "Carol", "Dave", 0.3), ("Bob", "Carol", 0.2)], ['from', 'to', 'amt'])

# y = x.distinct()

# x.show()

# y.show()

# # drop

# '''

# 返回删除指定列的新的DataFrame。

# 参数:●  col – 要删除列的字符串类型名称,或者要删除的列。

# '''

# x = sqlContext.createDataFrame(

#    [("Alice", "Bob", 0.1), ("Bob", "Carol", 0.2), ("Carol", "Dave", 0.3)], ['from', 'to', 'amt'])

# y = x.drop('amt')

# x.show()

# y.show()

# # dropDuplicates / drop_duplicates

# '''

# 返回去掉重复行的一个新的DataFrame,通常只考虑某几列。

# drop_duplicates()和dropDuplicates()类似。

# '''

# x = sqlContext.createDataFrame([("Alice", "Bob", 0.1), ("Bob", "Carol", 0.2), (

#    "Bob", "Carol", 0.3), ("Bob", "Carol", 0.2)], ['from', 'to', 'amt'])

# y = x.dropDuplicates(subset=['from', 'to'])

# x.show()

# y.show()

# # dropna

# '''

# 返回一个删除null值行的新的DataFrame。dropna()和dataframenafunctions.drop()类似。

# 参数:●  how – 'any'或者'all'。如果'any',删除包含任何空值的行。如果'all',删除所有值为null的行。

#    ●  thresh – int,默认为None,如果指定这个值,删除小于阈值的非空值的行。这个会重写'how'参数。

#    ●  subset – 选择的列名称列表。

# '''

# x = sqlContext.createDataFrame([(None, "Bob", 0.1), ("Bob", "Carol", None), (

#    "Carol", None, 0.3), ("Bob", "Carol", 0.2)], ['from', 'to', 'amt'])

# y = x.dropna(how='any', subset=['from', 'to'])

# x.show()

# y.show()

# # dtypes

# '''

# 返回所有列名及类型的列表。

# '''

# x = sqlContext.createDataFrame(

#    [('Alice', "Bob", 0.1), ("Bob", "Carol", 0.2), ("Carol", "Dave", 0.3)], ['from', 'to', 'amt'])

# y = x.dtypes

# x.show()

# print(y)

# # explain

# '''

# 将(逻辑和物理)计划打印到控制台以进行调试。

# 参数:●  extended – boolean类型,默认为False。如果为False,只打印物理计划。

# '''

# x = sqlContext.createDataFrame(

#    [('Alice', "Bob", 0.1), ("Bob", "Carol", 0.2), ("Carol", "Dave", 0.3)], ['from', 'to', 'amt'])

# x.show()

# x.agg({"amt": "avg"}).explain(extended=True)

# # fillna

# '''

# 替换空值,和na.fill()类似,DataFrame.fillna()和dataframenafunctions.fill()类似。

# 参数:●  value - 要代替空值的值有int,long,float,string或dict.如果值是字典,subset参数将被忽略。值必须是要替换的列的映射,替换值必须是int,long,float或者string.

#      ●  subset - 要替换的列名列表。在subset指定的列,没有对应数据类型的会被忽略。例如,如果值是字符串,subset包含一个非字符串的列,这个非字符串的值会被忽略。

# '''

# x = sqlContext.createDataFrame(

#    [(None, "Bob", 0.1), ("Bob", "Carol", None), ("Carol", None, 0.3)], ['from', 'to', 'amt'])

# y = x.fillna(value='unknown', subset=['from', 'to'])

# x.show()

# y.show()

# # filter

# x = sqlContext.createDataFrame(

#    [('Alice', "Bob", 0.1), ("Bob", "Carol", 0.2), ("Carol", "Dave", 0.3)], ['from', 'to', 'amt'])

# y = x.filter("amt > 0.1")

# x.show()

# y.show()

# # first

# x = sqlContext.createDataFrame(

#    [('Alice', "Bob", 0.1), ("Bob", "Carol", 0.2), ("Carol", "Dave", 0.3)], ['from', 'to', 'amt'])

# y = x.first()

# x.show()

# print(y)

# flatMap

'''

返回在每行应用F函数后的新的RDD,然后将结果压扁。

是df.rdd.flatMap()的简写。

'''

x = sqlContext.createDataFrame(

    [('Alice', "Bob", 0.1), ("Bob", "Carol", 0.2), ("Carol", "Dave", 0.3)], ['from', 'to', 'amt'])

y = x.flatMap(lambda x: (x[0], x[2]))

print(y)  # implicit coversion to RDD

y.collect()

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 202,056评论 5 474
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 84,842评论 2 378
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 148,938评论 0 335
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,296评论 1 272
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,292评论 5 363
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,413评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,824评论 3 393
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,493评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,686评论 1 295
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,502评论 2 318
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,553评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,281评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,820评论 3 305
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,873评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,109评论 1 258
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,699评论 2 348
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,257评论 2 341

推荐阅读更多精彩内容

  • 案来案去又年终,岁考分低早震惊。 老朽常怀昔日景,青春难与旧时同。 卧思乡下民还苦,醒怒官场路不平。 莫怨红尘乘野...
    飞哥判案阅读 657评论 0 5
  • 罗兴 律师 知名律师 埃孚欧学院 联合创始人、上海律师协会保险业务研究委员会委员、上海律师协会民事法律业务研究会委...
    刘丽敏A阅读 1,911评论 0 0
  • 摄影大多讲究光线和角度,那么,阴天又该如何拍摄? 鲜花和人像的光线要求较高,故不在我的考虑范围,最终决定拍拍山水,...
    木旸阅读 858评论 9 11
  • “喂,你不知道你这样说话会吓死人啊!有病的真是。”夏微微顿时火了。 她最讨厌他们在谈论诡秘事件的时候忽然有人在背后...
    蔷薇下的阳光阅读 245评论 0 1