spark配置与dataframe常用操作

from pyspark.sql import SparkSession
from pyspark.sql import HiveContext
from pyspark.sql import functions as F
from pyspark.sql.functions import udf
from pyspark.sql.types import *
from pyspark.sql.window import Window
from pyspark import StorageLevel
import uuid
import pandas as pd

random_name = str(uuid.uuid1()).replace('-', '_')
spark_app_name = 'just_a _test' + random_name

spark = SparkSession.builder \
    .config('spark.executor.memory', '10g') \
    .config('spark.executor.cores', '6') \
    .config('spark.executor.instances', '70') \
    .config('spark.driver.memory', '8g') \
    .config('spark.driver.maxResultSize', '10g') \
    .config('spark.rpc.message.maxSize', '256') \
    .config('spark.default.parallelism', '2000') \
    .config("spark.dynamicAllocation.enabled", "true") \
    .config("spark.dynamicAllocation.executorIdleTimeout", "300s") \
    .config('spark.driver.extraJavaOptions', '-Xss10m') \
    .config('spark.sql.legacy.allowCreatingManagedTableUsingNonemptyLocation', 'true') \
    .config('spark.sql.hive.convertMetastoreOrc', 'true') \
    .config('spark.sql.crossJoin.enabled', 'true') \
    .config('spark.dynamicAllocation.maxExecutors', '300') \
    .appName(spark_app_name) \
    .enableHiveSupport().getOrCreate()

# spark.executor.instances:执行任务使用多少个 executor 实例
# spark.driver.maxResultSize : 过大可能会造成内存溢出
# spark.rpc.message.maxSize : 节点间通信的最大值
# spark.default.parallelism :task数量(多少个线程)
# spark.sql.sources.default :默认数据源
# spark.dynamicAllocation.enabled:动态资源配置
# spark.driver.egtraJavaOptions :配置driver的jvm参数,垃圾回收,gss10m表示为虚拟机开辟了大小为10m大小的空间
# spark.sql.legacy.allowCreatingManagedTableUsingNonemptyLocation:解决写入Hive表时,表已经删除但依然无法写入问题
# spark.sql.hive.convertMetastoreOrc:解决hive和spark元数据混乱无法读表的问题

sc = spark.sparkContext
hiveCtx = HiveContext(sc)
print('spark.version:', spark.version)   # 2.4.4
print('application id:', sc.applicationId)  # application_1627462564975_68133043
# 读取数据
pt = '20221011'
df = spark.sql(f"""   
    select
      can2_id,
      city_name,
      cast(lng as bigint) as lng,
      cast(lat as bigint) as lat
    from
      XXXX
    where
      pt = '{pt}'  
    limit 10
 """).cache() # cache和persist类似,persist可选择存储等级;persist() + show()不能缓存全部数据
# .persist(StorageLevel.MEMORY_ONLY_SER) ,MEMORY_ONLY_SER序列化之后再在内存存储.对应释放 unpersist() 
df.show()
# 基本action操作
df_collect = df.collect() # list
for it in df_collect:
    print(it['can2_id'])
    
df.count() # int: 10
df.describe() # pyspark.sql.dataframe.DataFrame:  DataFrame[summary: string, can2_id: string, city_guid: string, city_name: string, can2_address: string, lng: string, lat: string]
df.describe('lat').show() 
# +-------+--------------------+---------+-----------------+-----------------+
# |summary|             can2_id|city_name|              lng|              lat|
# +-------+--------------------+---------+-----------------+-----------------+
# |  count|                  10|       10|               10|               10|
# |   mean|1.275945804931907...|     null|            115.5|             29.8|
# | stddev|9.647139018018636E19|     null|4.006938426723771|4.211096452627668|
# |    min|20221011111669149...|   上海市|              109|               22|
# |    max|20221011876173328715|   邵阳市|              121|               38|
# +-------+--------------------+---------+-----------------+-----------------+

df.first() # pyspark.sql.types.Row: Row(can2_id='20221011876173328715', city_name='丽水市', lng=120, lat=28)
df.head() # pyspark.sql.types.Row: Row(can2_id='20221011876173328715', city_name='丽水市', lng=120, lat=28)

# 基本描述
df.columns  # list : ['can2_id', 'city_name', 'lng', 'lat']
df.dtypes # list: [('can2_id', 'string'),('city_name', 'string'),('lng', 'bigint'),('lat', 'bigint')],describe返回的 lng 是 string !!?
df.printSchema() # lng 变成long了!!?
df.describe()
# root
#  |-- can2_id: string (nullable = true)
#  |-- city_name: string (nullable = true)
#  |-- lng: long (nullable = true)
#  |-- lat: long (nullable = true)

df.explain('true') # 执行计划流程,调优用,和UI界面的流程图类似
df.createOrReplaceGlobalTempView('tmp_table_view')   # registerTempTable 2.0已弃用,替用版本

#创建
# 一行一行
schema = StructType([StructField('id',LongType(),True),
                    StructField('name',StringType(),True),
                    StructField('age',IntegerType(),True)])
rdd = sc.parallelize([(1,'Karal',19),(2,'Bob',18)])
data = rdd.toDF(['id','name','age'])
# 
# +---+-----+---+
# | id| name|age|
# +---+-----+---+
# |  1|Karal| 19|
# |  2|  Bob| 18|
# +---+-----+---+
data = spark.createDataFrame(rdd, schema = schema)

# +---+-----+---+
# | id| name|age|
# +---+-----+---+
# |  1|Karal| 19|
# |  2|  Bob| 18|
# +---+-----+---+

# 一列一列
ids = [1,2,3]
grades = [3,3,1]
name = ['Karal','Bob','Lily']
age = [19,18,15]
data = spark.createDataFrame(pd.DataFrame({'id':ids,'name':name,'age':age,'grade':grades}))
data.show()

# 一列一列
ids = [5,2,2]
grades = [2,3,3]
name = ['Maria','Bob','Bob']
age = [17,18,18]
data2 = spark.createDataFrame(pd.DataFrame({'id':ids,'name':name,'age':age,'grade':grades}))
data2.show()

# 集成查询
data.groupBy('grade').agg(F.max('age').alias('max_age'),F.max('id').alias('max_id')).show()

# 自定义UDAF1:先聚合成列表再求 2. 通过pandas_udf
@udf(returnType = IntegerType())
def udf_max(l1):
    return max(l1)


# 自定义UDAF1:先聚合成列表再求 2. 通过pandas_udf
# UDF的定义:1.@udf的方式spark外不能单独使用 2. F.udf()的方式要用的时候需要
@udf(returnType = IntegerType())
def udf_max(l1):
    return max(l1)
# udf_max_new = F.udf(udf_max, IntegerType())

data.groupBy('grade').agg(F.collect_list('age').alias('list_info')).withColumn('max_age',udf_max('list_info'))

# sql中的UDF函数
def my_add(a):
    return a+1
spark.udf.register('udf_add',my_add,IntegerType())

data.createOrReplaceTempView('data_view')
sqldf = spark.sql("""
    select 
    *,
    udf_add(age) as age_new
    from data_view
""")
sqldf.show()
# +---+-----+---+-----+-------+
# | id| name|age|grade|age_new|
# +---+-----+---+-----+-------+
# |  1|Karal| 19|    3|     20|
# |  2|  Bob| 18|    3|     19|
# |  3| Lily| 15|    1|     16|
# +---+-----+---+-----+-------+


data.withColumn('gender',F.lit('female')).withColumn('grade_new',F.col('grade')).withColumnRenamed('grade','grade_old').show()
# +---+-----+---+---------+------+---------+
# | id| name|age|grade_old|gender|grade_new|
# +---+-----+---+---------+------+---------+
# |  1|Karal| 19|        3|female|        3|
# |  2|  Bob| 18|        3|female|        3|
# |  3| Lily| 15|        1|female|        1|
# +---+-----+---+---------+------+---------+

data.join(data2,on='id',how='left').show()
# +---+-----+---+-----+----+----+-----+
# | id| name|age|grade|name| age|grade|
# +---+-----+---+-----+----+----+-----+
# |  1|Karal| 19|    3|null|null| null|
# |  3| Lily| 15|    1|null|null| null|
# |  2|  Bob| 18|    3| Bob|  18|    3|
# |  2|  Bob| 18|    3| Bob|  18|    3|
# +---+-----+---+-----+----+----+-----+

data.union(data2).show()
# +---+-----+---+-----+
# | id| name|age|grade|
# +---+-----+---+-----+
# |  1|Karal| 19|    3|
# |  2|  Bob| 18|    3|
# |  3| Lily| 15|    1|
# |  5|Maria| 17|    2|
# |  2|  Bob| 18|    3|
# |  2|  Bob| 18|    3|
# +---+-----+---+-----+


data.where((F.col('age')>=18) & (F.col('age')<19)).show()
data.filter("age == 18").show()
data.withColumn('age_string',F.col('age').cast('string')).show()
data.drop('grade').show()
data.withColumn('is_adult',F.when(F.col("age")<18,0).when(F.col("age")==18,1).otherwise(2)).show()
data.dropDuplicates()
data.withColumn("rank_age",F.row_number().over(Window.partitionBy('grade').orderBy(F.col("age").desc()))).show()

# 聚合与拆分
data.withColumn('info_arr',F.array(['id','name','age','grade'])).show()
data_concat = data.withColumn('info_concat',F.concat_ws('#','id','name','age')).show()
# +---+-----+---+-----+-----------------+
# | id| name|age|grade|         info_arr|
# +---+-----+---+-----+-----------------+
# |  1|Karal| 19|    3|[1, Karal, 19, 3]|
# |  2|  Bob| 18|    3|  [2, Bob, 18, 3]|
# |  3| Lily| 15|    1| [3, Lily, 15, 1]|
# +---+-----+---+-----+-----------------+
# +---+-----+---+-----+-----------+
# | id| name|age|grade|info_concat|
# +---+-----+---+-----+-----------+
# |  1|Karal| 19|    3| 1#Karal#19|
# |  2|  Bob| 18|    3|   2#Bob#18|
# |  3| Lily| 15|    1|  3#Lily#15|
# +---+-----+---+-----+-----------+

data_concat = data.withColumn('info_concat',F.concat_ws('#','id','name','age'))
data_concat.withColumn('info_explode_id',F.explode(F.split('info_concat','#'))).show()

# +---+-----+---+-----+-----------+---------------+
# | id| name|age|grade|info_concat|info_explode_id|
# +---+-----+---+-----+-----------+---------------+
# |  1|Karal| 19|    3| 1#Karal#19|              1|
# |  1|Karal| 19|    3| 1#Karal#19|          Karal|
# |  1|Karal| 19|    3| 1#Karal#19|             19|
# |  2|  Bob| 18|    3|   2#Bob#18|              2|
# |  2|  Bob| 18|    3|   2#Bob#18|            Bob|
# |  2|  Bob| 18|    3|   2#Bob#18|             18|
# |  3| Lily| 15|    1|  3#Lily#15|              3|
# |  3| Lily| 15|    1|  3#Lily#15|           Lily|
# |  3| Lily| 15|    1|  3#Lily#15|             15|
# +---+-----+---+-----+-----------+---------------+
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 202,529评论 5 475
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,015评论 2 379
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 149,409评论 0 335
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,385评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,387评论 5 364
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,466评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,880评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,528评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,727评论 1 295
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,528评论 2 319
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,602评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,302评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,873评论 3 306
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,890评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,132评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,777评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,310评论 2 342

推荐阅读更多精彩内容