SparkSQL温习笔记-1

 一、介绍

   Shark是SparkSQL(其完全脱离了 Hive 的限制)的前身,Shark的性能比 MapReduce 的 Hive 普遍快 2 倍以上,当数据全部 load 在内存的话,将快 10 倍以上,因此 Shark 可以作为交互式查询应用服务来使用。

    SparkSQL具有很多特性(官网):

    1.Integrated 

    Seamlessly mix SQL and Spark programs,Apply functions to results of SQL queries.我的理解是SQL能和RDD完美结合使用,可视为RDD一样去操作,各种算子可以直接使用。

    2.Uniform Data Access -- Connect to any data source the same way.

    整合各种数据比较方便,读取json的数据注册成表,读取hive的表,两张表可以直接join,其实底层都是转换成RDD去操作。

     DataFrames and SQL provide a common way to access a variety of data sources, including Hive, Avro, Parquet, ORC, JSON, and JDBC. You can even join data across these sources.

    常见的是数据源Hive(ETL后的数据),JSON,JDBC(mysql等存放分析参数),还可以读取hdfs,S3(Amazon),Postgresql(gp、hawq直接读取表?还是先放在hdfs中,有空调研下)。

    3.Hive Integration 和hive兼容性很好,很好配置就可以读取hive metaStore

    Spark core的算子拼接的操作,可以用Sparksql去实现。。有时候会sql实现会更简单一些。

    DataFrame存储是列式存储,不需要的字段不加载到内存中,这样查询、聚合速度快。

二、创建 DataFrame

    1.动态创建 schema (scala/python)

--scala

    val sc = new SparkContext(conf)

    val sqlContext = new SQLContext(sc)

    val people = sc.textFile("scores.txt")

    val schemaString = "cla:String sc:Integer"

    //如果schema中制定了除String以外别的类型  在构建rowRDD的时候要注意指定类型    例如: p(2).toInt

    val rowRDD = people.map(_.split("\t")).map(p => Row(p(0), p(1).toInt))

    val schema =

      StructType(schemaString.split(" ").map(fieldName => StructField(fieldName.split(":")(0), if (fieldName.split(":")(1).equals("String")) StringType else IntegerType, true)))

//    val structFields = Array(StructField("clazz",StringType,true),StructField("score",IntegerType))

//    val schema = StructType(structFields)

    //  val arr = Array(StructField("name",StringType,true),StructField("age",IntegerType,true))

    //  val schema = StructType.apply(arr)

    val peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema)

    peopleDataFrame.printSchema()

    peopleDataFrame.show()

--python

sqlContext = SQLContext(sc)

#读取hdfs

data = sc.textFile("/user/nixm/czrk_data.txt")

#切分字符串

data_noheader = data.map(lambda x:x.split(";"))

#创建schema

schema = StructType([StructField("hh",StringType(),True),

StructField("sfzjhm",StringType(),True),

StructField("yhzgx",StringType(),True)

])

#创建DF

df = sqlContext.createDataFrame(data_noheader,schema)

#DF注册成临时表

df.registerTempTable('test')

#执行sql(返回DF)并持久化

df_dis = sqlContext.sql(sqlQuery="select hh,sfzjhm from test where yhzgx='户主' group by hh,sfzjhm").cache()

#将读取的结果(DF)注册成临时表

df_dis.registerTempTable('df_dis')

#执行sql并持久化

df_all= sqlContext.sql(sqlQuery="select * from test where yhzgx!='户主'").cache()

#注册成临时表

df_all.registerTempTable('df_all')

#表与表的join

result=sqlContext.sql(sqlQuery="select a.sfzjhm as a_sfzjhm, a.yhzgx, b.sfzjhm as b_sfzjhm from df_all a inner join df_dis b on a.hh=b.hh ").cache()

#对dataFrame使用map算子后,返回类型是RDD

hdfsRDD=result.rdd.map(lambda p:p. a_sfzjhm+";"+p.yhzgx+";"+p.b_sfzjhm)

#重新分区,将DF最终存放到hdfs

hdfsRDD.repartition(1).saveAsTextFile("/user/nixm/jlout/czrk.txt")


    2.通过反射将RDD转换成DF[比较死板]

peoples.txt     1,lucy,18    /n    2,jim,11

主要代码

case class Person(name:String, age: Int)

def main(args: Array[String]): Unit = {

val conf =new SparkConf()//创建sparkConf对象

conf.setAppName("Spark App")//设置应用程序的名称

conf.setMaster("local")

val sqlContext =new SQLContext(sc)

import sqlContext.implicits._ //隐式转换

//使用反射方法将RDD转换成DF

val people = sc.textFile("peoples.txt").map(_.split(",")).map(p =>Person(p(1), p(2).trim.toInt)).toDF()

people.registerTempTable("people") //注册成临时表

//执行sql查询

val teenagers = sqlContext.sql("SELECT name, age FROM people WHERE age >= 6 AND age <= 19")

//对dataFrame使用map算子后,返回类型是RDD

teenagers.map(t =>"Name: " + t(0)).foreach(println)

// or by field name:

teenagers.map(t =>"Name: " + t.getAs[String]("name")).foreach(println)

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 199,636评论 5 468
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 83,890评论 2 376
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 146,680评论 0 330
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 53,766评论 1 271
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 62,665评论 5 359
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,045评论 1 276
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,515评论 3 390
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,182评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,334评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,274评论 2 317
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,319评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,002评论 3 315
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,599评论 3 303
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,675评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,917评论 1 255
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,309评论 2 345
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 41,885评论 2 341

推荐阅读更多精彩内容