from pyspark.sql import SparkSession
from pyspark.sql import HiveContext
from pyspark.sql import functions as F
from pyspark.sql.functions import udf
from pyspark.sql.types import *
from pyspark.sql.window import Window
from pyspark import StorageLevel
import uuid
import pandas as pd
random_name = str(uuid.uuid1()).replace('-', '_')
spark_app_name = 'just_a_test_' + random_name
spark = SparkSession.builder \
.config('spark.executor.memory', '10g') \
.config('spark.executor.cores', '6') \
.config('spark.executor.instances', '70') \
.config('spark.driver.memory', '8g') \
.config('spark.driver.maxResultSize', '10g') \
.config('spark.rpc.message.maxSize', '256') \
.config('spark.default.parallelism', '2000') \
.config("spark.dynamicAllocation.enabled", "true") \
.config("spark.dynamicAllocation.executorIdleTimeout", "300s") \
.config('spark.driver.extraJavaOptions', '-Xss10m') \
.config('spark.sql.legacy.allowCreatingManagedTableUsingNonemptyLocation', 'true') \
.config('spark.sql.hive.convertMetastoreOrc', 'true') \
.config('spark.sql.crossJoin.enabled', 'true') \
.config('spark.dynamicAllocation.maxExecutors', '300') \
.appName(spark_app_name) \
.enableHiveSupport().getOrCreate()
# spark.executor.instances: number of executor instances used to run the job
# spark.driver.maxResultSize: cap on the total size of results collected to the driver; setting it too high risks driver OOM
# spark.rpc.message.maxSize: maximum size (in MB) of RPC messages exchanged between nodes
# spark.default.parallelism: default number of tasks (parallel threads) for shuffles
# spark.sql.sources.default: default data source format
# spark.dynamicAllocation.enabled: enables dynamic resource allocation
# spark.driver.extraJavaOptions: extra JVM options for the driver; -Xss10m sets the driver thread stack size to 10 MB
# spark.sql.legacy.allowCreatingManagedTableUsingNonemptyLocation: allows recreating a managed Hive table whose location is non-empty, e.g. when the table was dropped but writing to the same location still fails
# spark.sql.hive.convertMetastoreOrc: reads Hive ORC tables with Spark's native reader, working around Hive/Spark metadata mismatches that make tables unreadable
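# A small sketch (not in the original): the submitted configuration can be inspected at runtime
# through the SparkConf attached to the context.
conf_snapshot = spark.sparkContext.getConf()
for key in ['spark.executor.memory', 'spark.executor.instances', 'spark.dynamicAllocation.maxExecutors']:
    print(key, '=', conf_snapshot.get(key))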
sc = spark.sparkContext
hiveCtx = HiveContext(sc)  # HiveContext is deprecated since Spark 2.0; the SparkSession above (with enableHiveSupport) already provides Hive access
print('spark.version:', spark.version) # 2.4.4
print('application id:', sc.applicationId) # application_1627462564975_68133043
# Read data
pt = '20221011'
df = spark.sql(f"""
select
can2_id,
city_name,
cast(lng as bigint) as lng,
cast(lat as bigint) as lat
from
XXXX
where
pt = '{pt}'
limit 10
""").cache() # cache() is similar to persist(); persist() additionally lets you choose a storage level. Note that persist() followed only by show() does not materialize the full DataFrame
# .persist(StorageLevel.MEMORY_ONLY) keeps the data in memory (PySpark always stores it serialized); release the cache with unpersist()
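# A minimal sketch (not in the original), assuming some_df is a DataFrame that has not been cached yet:
# persist with an explicit storage level, force full materialization with an action over every
# partition such as count(), then release the cache.
# some_df = some_df.persist(StorageLevel.MEMORY_AND_DISK)
# some_df.count()      # count() touches all partitions, so the cache is fully populated
# some_df.unpersist()  # free the cached blocks when no longer needed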
df.show()
# Basic actions
df_collect = df.collect() # list
for it in df_collect:
    print(it['can2_id'])
df.count() # int: 10
df.describe() # pyspark.sql.dataframe.DataFrame: DataFrame[summary: string, can2_id: string, city_name: string, lng: string, lat: string]
df.describe().show() # a subset of columns can also be passed, e.g. df.describe('lat')
# +-------+--------------------+---------+-----------------+-----------------+
# |summary| can2_id|city_name| lng| lat|
# +-------+--------------------+---------+-----------------+-----------------+
# | count| 10| 10| 10| 10|
# | mean|1.275945804931907...| null| 115.5| 29.8|
# | stddev|9.647139018018636E19| null|4.006938426723771|4.211096452627668|
# | min|20221011111669149...| 上海市| 109| 22|
# | max|20221011876173328715| 邵阳市| 121| 38|
# +-------+--------------------+---------+-----------------+-----------------+
df.first() # pyspark.sql.types.Row: Row(can2_id='20221011876173328715', city_name='丽水市', lng=120, lat=28)
df.head() # pyspark.sql.types.Row: Row(can2_id='20221011876173328715', city_name='丽水市', lng=120, lat=28)
# Basic schema and metadata
df.columns # list : ['can2_id', 'city_name', 'lng', 'lat']
df.dtypes # list: [('can2_id', 'string'),('city_name', 'string'),('lng', 'bigint'),('lat', 'bigint')]; note that describe() reports lng as string, because summary statistics are returned as strings
df.printSchema() # here lng shows up as long, Spark's name for the bigint type
# root
# |-- can2_id: string (nullable = true)
# |-- city_name: string (nullable = true)
# |-- lng: long (nullable = true)
# |-- lat: long (nullable = true)
df.explain(True) # prints the extended execution plan, useful for tuning; comparable to the DAG shown in the Spark UI
df.createOrReplaceGlobalTempView('tmp_table_view') # replacement for registerTempTable, which was deprecated in 2.0; the global variant is registered in the global_temp database
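# A small sketch (not in the original): a global temp view lives in the reserved global_temp
# database and is shared across SparkSessions of the same application, so it is queried with the prefix.
spark.sql("select * from global_temp.tmp_table_view limit 3").show()
# A session-scoped view created with createOrReplaceTempView('...') would be queried without the prefix.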
# Creating DataFrames
# Row by row, from an RDD
schema = StructType([StructField('id', LongType(), True),
                     StructField('name', StringType(), True),
                     StructField('age', IntegerType(), True)])
rdd = sc.parallelize([(1,'Karal',19),(2,'Bob',18)])
data = rdd.toDF(['id','name','age'])
#
# +---+-----+---+
# | id| name|age|
# +---+-----+---+
# | 1|Karal| 19|
# | 2| Bob| 18|
# +---+-----+---+
data = spark.createDataFrame(rdd, schema = schema)
# +---+-----+---+
# | id| name|age|
# +---+-----+---+
# | 1|Karal| 19|
# | 2| Bob| 18|
# +---+-----+---+
# Column by column, from a pandas DataFrame
ids = [1,2,3]
grades = [3,3,1]
name = ['Karal','Bob','Lily']
age = [19,18,15]
data = spark.createDataFrame(pd.DataFrame({'id':ids,'name':name,'age':age,'grade':grades}))
data.show()
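# A small sketch (not in the original): the reverse direction, collecting a (small) Spark DataFrame
# back to the driver as a pandas DataFrame; only suitable for data that fits in driver memory.
pdf = data.toPandas()
print(pdf)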
# A second DataFrame, built the same way
ids = [5,2,2]
grades = [2,3,3]
name = ['Maria','Bob','Bob']
age = [17,18,18]
data2 = spark.createDataFrame(pd.DataFrame({'id':ids,'name':name,'age':age,'grade':grades}))
data2.show()
# Grouped aggregation
data.groupBy('grade').agg(F.max('age').alias('max_age'),F.max('id').alias('max_id')).show()
# Custom UDAF, two options: 1. collect the values into a list with collect_list and apply a UDF; 2. use a pandas_udf (see the sketch below)
# Defining a UDF: 1. with the @udf decorator the function can no longer be called as plain Python outside Spark; 2. wrapping with F.udf() keeps the original function usable and wraps it only where needed
@udf(returnType=IntegerType())
def udf_max(l1):
    return max(l1)
# udf_max_plain = lambda l1: max(l1)
# udf_max_new = F.udf(udf_max_plain, IntegerType())
data.groupBy('grade').agg(F.collect_list('age').alias('list_info')).withColumn('max_age',udf_max('list_info'))
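# A minimal sketch of option 2 (not in the original), assuming pyarrow is installed: a grouped-aggregate
# pandas_udf receives each group's column as a pandas Series and returns a single scalar per group.
from pyspark.sql.functions import pandas_udf, PandasUDFType

@pandas_udf('integer', PandasUDFType.GROUPED_AGG)
def pd_max_age(v):
    # v is a pandas Series holding the 'age' values of one grade group
    return int(v.max())

data.groupBy('grade').agg(pd_max_age('age').alias('max_age')).show()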
# Registering a UDF for use in Spark SQL
def my_add(a):
    return a + 1
spark.udf.register('udf_add', my_add, IntegerType())
data.createOrReplaceTempView('data_view')
sqldf = spark.sql("""
select
*,
udf_add(age) as age_new
from data_view
""")
sqldf.show()
# +---+-----+---+-----+-------+
# | id| name|age|grade|age_new|
# +---+-----+---+-----+-------+
# | 1|Karal| 19| 3| 20|
# | 2| Bob| 18| 3| 19|
# | 3| Lily| 15| 1| 16|
# +---+-----+---+-----+-------+
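# A small note (not in the original): spark.udf.register also returns the registered function,
# so the same UDF can be used directly in the DataFrame API as well.
udf_add_col = spark.udf.register('udf_add', my_add, IntegerType())
data.withColumn('age_new', udf_add_col('age')).show()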
data.withColumn('gender',F.lit('female')).withColumn('grade_new',F.col('grade')).withColumnRenamed('grade','grade_old').show()
# +---+-----+---+---------+------+---------+
# | id| name|age|grade_old|gender|grade_new|
# +---+-----+---+---------+------+---------+
# | 1|Karal| 19| 3|female| 3|
# | 2| Bob| 18| 3|female| 3|
# | 3| Lily| 15| 1|female| 1|
# +---+-----+---+---------+------+---------+
data.join(data2,on='id',how='left').show()
# +---+-----+---+-----+----+----+-----+
# | id| name|age|grade|name| age|grade|
# +---+-----+---+-----+----+----+-----+
# | 1|Karal| 19| 3|null|null| null|
# | 3| Lily| 15| 1|null|null| null|
# | 2| Bob| 18| 3| Bob| 18| 3|
# | 2| Bob| 18| 3| Bob| 18| 3|
# +---+-----+---+-----+----+----+-----+
data.union(data2).show()
# +---+-----+---+-----+
# | id| name|age|grade|
# +---+-----+---+-----+
# | 1|Karal| 19| 3|
# | 2| Bob| 18| 3|
# | 3| Lily| 15| 1|
# | 5|Maria| 17| 2|
# | 2| Bob| 18| 3|
# | 2| Bob| 18| 3|
# +---+-----+---+-----+
data.where((F.col('age')>=18) & (F.col('age')<19)).show()
data.filter("age == 18").show()
data.withColumn('age_string',F.col('age').cast('string')).show()
data.drop('grade').show()
data.withColumn('is_adult',F.when(F.col("age")<18,0).when(F.col("age")==18,1).otherwise(2)).show()
data.dropDuplicates()
data.withColumn("rank_age",F.row_number().over(Window.partitionBy('grade').orderBy(F.col("age").desc()))).show()
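# A small sketch (not in the original): the same Window mechanism works with other window
# functions, e.g. the previous age inside each grade and a per-grade maximum.
w = Window.partitionBy('grade').orderBy(F.col('age').desc())
data.withColumn('prev_age', F.lag('age', 1).over(w)) \
    .withColumn('max_age_in_grade', F.max('age').over(Window.partitionBy('grade'))) \
    .show()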
# Combining and splitting columns
data.withColumn('info_arr',F.array(['id','name','age','grade'])).show()
data.withColumn('info_concat',F.concat_ws('#','id','name','age')).show()  # show() returns None, so the concatenated DataFrame is assigned separately below
# +---+-----+---+-----+-----------------+
# | id| name|age|grade| info_arr|
# +---+-----+---+-----+-----------------+
# | 1|Karal| 19| 3|[1, Karal, 19, 3]|
# | 2| Bob| 18| 3| [2, Bob, 18, 3]|
# | 3| Lily| 15| 1| [3, Lily, 15, 1]|
# +---+-----+---+-----+-----------------+
# +---+-----+---+-----+-----------+
# | id| name|age|grade|info_concat|
# +---+-----+---+-----+-----------+
# | 1|Karal| 19| 3| 1#Karal#19|
# | 2| Bob| 18| 3| 2#Bob#18|
# | 3| Lily| 15| 1| 3#Lily#15|
# +---+-----+---+-----+-----------+
data_concat = data.withColumn('info_concat',F.concat_ws('#','id','name','age'))
data_concat.withColumn('info_explode_id',F.explode(F.split('info_concat','#'))).show()
# +---+-----+---+-----+-----------+---------------+
# | id| name|age|grade|info_concat|info_explode_id|
# +---+-----+---+-----+-----------+---------------+
# | 1|Karal| 19| 3| 1#Karal#19| 1|
# | 1|Karal| 19| 3| 1#Karal#19| Karal|
# | 1|Karal| 19| 3| 1#Karal#19| 19|
# | 2| Bob| 18| 3| 2#Bob#18| 2|
# | 2| Bob| 18| 3| 2#Bob#18| Bob|
# | 2| Bob| 18| 3| 2#Bob#18| 18|
# | 3| Lily| 15| 1| 3#Lily#15| 3|
# | 3| Lily| 15| 1| 3#Lily#15| Lily|
# | 3| Lily| 15| 1| 3#Lily#15| 15|
# +---+-----+---+-----+-----------+---------------+
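# A small sketch (not in the original): the reverse of explode, splitting info_concat back into
# separate columns with split() and getItem().
parts = F.split('info_concat', '#')
data_concat.withColumn('id_part', parts.getItem(0)) \
    .withColumn('name_part', parts.getItem(1)) \
    .withColumn('age_part', parts.getItem(2)) \
    .show()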