#if run in windows use this
import findspark
findspark.init()
# import
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql import SparkSession
import IPython
# #version
# print("pyspark version:" + str(sc.version))
# print("Ipython version:" + str(IPython.__version__))
# #agg 聚合
# sc = SparkContext('local')
# sqlContext = SQLContext(sc)
# x = sqlContext.createDataFrame([("Alice", "Bob", 0.1), ("Bob", "Carol", 0.2), ("Carol", "Dave", 0.3)], ['from', 'to', 'amt'])
# y = x.agg({"amt": "avg"})
# x.show()
# y.show()
# # alias 返回这个列的新的别名或别名们
# from pyspark.sql.functions import col
# sc = SparkContext('local')
# sqlContext = SQLContext(sc)
# x = sqlContext.createDataFrame(
# [("Alice", "Bob", 0.1), ("Bob", "Carol", 0.2), ("Carol", "Dave", 0.3)], ['from', 'to', 'amt'])
# y = x.alias('transactions')
# x.show()
# y.show()
# y.select(col("transactions.to")).show()
# # cache
# sc = SparkContext('local')
# sqlContext = SQLContext(sc)
# x = sqlContext.createDataFrame(
# [("Alice", "Bob", 0.1), ("Bob", "Carol", 0.2), ("Carol", "Dave", 0.3)], ['from', 'to', 'amt'])
# x.cache()
# print(x.count()) # first action materializes x in memory
# print(x.count()) # later actions avoid IO overhead
# # coalesce 重分区函数
# sc = SparkContext('local')
# sqlContext = SQLContext(sc)
# x_rdd = sc.parallelize(
# [("Alice", "Bob", 0.1), ("Bob", "Carol", 0.2), ("Carol", "Dave", 0.3)], 2)
# x = sqlContext.createDataFrame(x_rdd, ['from', 'to', 'amt'])
# y = x.coalesce(numPartitions=1)
# print(x.rdd.getNumPartitions())
# print(y.rdd.getNumPartitions())
# # collect
# sc = SparkContext('local')
# sqlContext = SQLContext(sc)
# x = sqlContext.createDataFrame(
# [("Alice", "Bob", 0.1), ("Bob", "Carol", 0.2), ("Carol", "Dave", 0.3)], ['from', 'to', 'amt'])
# y = x.collect() # creates list of rows on driver
# x.show()
# print(y)
# # columns
# sc = SparkContext('local')
# sqlContext = SQLContext(sc)
# x = sqlContext.createDataFrame(
# [("Alice", "Bob", 0.1), ("Bob", "Carol", 0.2), ("Carol", "Dave", 0.3)], ['from', 'to', 'amt'])
# y = x.columns # creates list of column names on driver
# x.show()
# print(y)
# # # corr
# sc = SparkContext('local')
# sqlContext = SQLContext(sc)
# x = sqlContext.createDataFrame([("Alice", "Bob", 0.1, 0.001), (
# "Bob", "Carol", 0.2, 0.02), ("Carol", "Dave", 0.3, 0.02)], ['from', 'to', 'amt', 'fee'])
# y = x.corr(col1="amt", col2="fee")
# x.show()
# print(y)
# # count
# sc = SparkContext('local')
# sqlContext = SQLContext(sc)
# x = sqlContext.createDataFrame(
# [("Alice", "Bob", 0.1), ("Bob", "Carol", 0.2), ("Carol", "Dave", 0.3)], ['from', 'to', 'amt'])
# x.show()
# print(x.count())
# # cov
# sc = SparkContext('local')
# sqlContext = SQLContext(sc)
# x = sqlContext.createDataFrame([("Alice", "Bob", 0.1, 0.001), (
# "Bob", "Carol", 0.2, 0.02), ("Carol", "Dave", 0.3, 0.02)], ['from', 'to', 'amt', 'fee'])
# y = x.cov(col1="amt", col2="fee")
# x.show()
# print(y)
sc = SparkContext('local')
sqlContext = SQLContext(sc)
# # crosstab
# x = sqlContext.createDataFrame(
# [("Alice", "Bob", 0.1), ("Bob", "Carol", 0.2), ("Carol", "Dave", 0.3)], ['from', 'to', 'amt'])
# y = x.crosstab(col1='from', col2='to')
# x.show()
# y.show()
# # cube
# x = sqlContext.createDataFrame(
# [("Alice", "Bob", 0.1), ("Alice", "Carol", 0.2)], ['from', 'to', 'amt'])
# y = x.cube('from', 'to')
# x.show()
# print(y) # y is a grouped data object, aggregations will be applied to all numerical columns
# y.sum().show()
# y.max().show()
# # describe
'''计算数值列的统计信息。
包括计数,平均,标准差,最小和最大。如果没有指定任何列,这个函数计算统计所有数值列'''
# x = sqlContext.createDataFrame(
# [("Alice", "Bob", 0.1), ("Bob", "Carol", 0.2), ("Carol", "Dave", 0.3)], ['from', 'to', 'amt'])
# x.show()
# x.describe().show()
# # distinct 返回行去重的新的DataFrame。
# x = sqlContext.createDataFrame([("Alice", "Bob", 0.1), ("Bob", "Carol", 0.2), (
# "Carol", "Dave", 0.3), ("Bob", "Carol", 0.2)], ['from', 'to', 'amt'])
# y = x.distinct()
# x.show()
# y.show()
# # drop
# '''
# 返回删除指定列的新的DataFrame。
# 参数:● col – 要删除列的字符串类型名称,或者要删除的列。
# '''
# x = sqlContext.createDataFrame(
# [("Alice", "Bob", 0.1), ("Bob", "Carol", 0.2), ("Carol", "Dave", 0.3)], ['from', 'to', 'amt'])
# y = x.drop('amt')
# x.show()
# y.show()
# # dropDuplicates / drop_duplicates
# '''
# 返回去掉重复行的一个新的DataFrame,通常只考虑某几列。
# drop_duplicates()和dropDuplicates()类似。
# '''
# x = sqlContext.createDataFrame([("Alice", "Bob", 0.1), ("Bob", "Carol", 0.2), (
# "Bob", "Carol", 0.3), ("Bob", "Carol", 0.2)], ['from', 'to', 'amt'])
# y = x.dropDuplicates(subset=['from', 'to'])
# x.show()
# y.show()
# # dropna
# '''
# 返回一个删除null值行的新的DataFrame。dropna()和dataframenafunctions.drop()类似。
# 参数:● how – 'any'或者'all'。如果'any',删除包含任何空值的行。如果'all',删除所有值为null的行。
# ● thresh – int,默认为None,如果指定这个值,删除小于阈值的非空值的行。这个会重写'how'参数。
# ● subset – 选择的列名称列表。
# '''
# x = sqlContext.createDataFrame([(None, "Bob", 0.1), ("Bob", "Carol", None), (
# "Carol", None, 0.3), ("Bob", "Carol", 0.2)], ['from', 'to', 'amt'])
# y = x.dropna(how='any', subset=['from', 'to'])
# x.show()
# y.show()
# # dtypes
# '''
# 返回所有列名及类型的列表。
# '''
# x = sqlContext.createDataFrame(
# [('Alice', "Bob", 0.1), ("Bob", "Carol", 0.2), ("Carol", "Dave", 0.3)], ['from', 'to', 'amt'])
# y = x.dtypes
# x.show()
# print(y)
# # explain
# '''
# 将(逻辑和物理)计划打印到控制台以进行调试。
# 参数:● extended – boolean类型,默认为False。如果为False,只打印物理计划。
# '''
# x = sqlContext.createDataFrame(
# [('Alice', "Bob", 0.1), ("Bob", "Carol", 0.2), ("Carol", "Dave", 0.3)], ['from', 'to', 'amt'])
# x.show()
# x.agg({"amt": "avg"}).explain(extended=True)
# # fillna
# '''
# 替换空值,和na.fill()类似,DataFrame.fillna()和dataframenafunctions.fill()类似。
# 参数:● value - 要代替空值的值有int,long,float,string或dict.如果值是字典,subset参数将被忽略。值必须是要替换的列的映射,替换值必须是int,long,float或者string.
# ● subset - 要替换的列名列表。在subset指定的列,没有对应数据类型的会被忽略。例如,如果值是字符串,subset包含一个非字符串的列,这个非字符串的值会被忽略。
# '''
# x = sqlContext.createDataFrame(
# [(None, "Bob", 0.1), ("Bob", "Carol", None), ("Carol", None, 0.3)], ['from', 'to', 'amt'])
# y = x.fillna(value='unknown', subset=['from', 'to'])
# x.show()
# y.show()
# # filter
# x = sqlContext.createDataFrame(
# [('Alice', "Bob", 0.1), ("Bob", "Carol", 0.2), ("Carol", "Dave", 0.3)], ['from', 'to', 'amt'])
# y = x.filter("amt > 0.1")
# x.show()
# y.show()
# # first
# x = sqlContext.createDataFrame(
# [('Alice', "Bob", 0.1), ("Bob", "Carol", 0.2), ("Carol", "Dave", 0.3)], ['from', 'to', 'amt'])
# y = x.first()
# x.show()
# print(y)
# flatMap
'''
返回在每行应用F函数后的新的RDD,然后将结果压扁。
是df.rdd.flatMap()的简写。
'''
x = sqlContext.createDataFrame(
[('Alice', "Bob", 0.1), ("Bob", "Carol", 0.2), ("Carol", "Dave", 0.3)], ['from', 'to', 'amt'])
y = x.flatMap(lambda x: (x[0], x[2]))
print(y) # implicit coversion to RDD
y.collect()