Spark 基础

Resilient Distributed Datasets (RDDs)

Spark revolves around the concept of a resilient distributed dataset (RDD), which is a fault-tolerant collection of elements that can be operated on in parallel. There are two ways to create RDDs: parallelizing an existing collection in your driver program, or referencing a dataset in an external storage system, such as a shared filesystem, HDFS, HBase, or any data source offering a Hadoop InputFormat.

  • Parallelized Collections

Parallelized collections are created by calling SparkContext’s parallelize method on an existing collection in your driver program (a Scala Seq). The elements of the collection are copied to form a distributed dataset that can be operated on in parallel. For example, here is how to create a parallelized collection holding the numbers 1 to 5:

val data = Array(1, 2, 3, 4, 5)
val distData = sc.parallelize(data)
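
Once created, distData can be operated on in parallel. As a minimal sketch, the elements can be added up with reduce:

// Sum the elements of the parallelized collection in parallel; returns 15 for the data above
val sum = distData.reduce((a, b) => a + b)
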
  • External Datasets

Spark can create distributed datasets from any storage source supported by Hadoop, including your local file system, HDFS, Cassandra, HBase, Amazon S3, etc. Spark supports text files, SequenceFiles, and any other Hadoop InputFormat.

val distFile = sc.textFile("data.txt")

When using a local filesystem path, the file must be copied to the same path on all cluster nodes, or a shared filesystem must be used.

If using a path on the local filesystem, the file must also be accessible at the same path on worker nodes. Either copy the file to all workers or use a network-mounted shared file system.

All of Spark's file-based input methods support text files, directories, compressed files, and wildcards.

All of Spark’s file-based input methods, including textFile, support running on directories, compressed files, and wildcards as well. For example, you can use textFile("/my/directory"), textFile("/my/directory/*.txt"), and textFile("/my/directory/*.gz").

The textFile method takes an optional second argument that controls the number of partitions of the file.

The textFile method also takes an optional second argument for controlling the number of partitions of the file. By default, Spark creates one partition for each block of the file (blocks being 128MB by default in HDFS), but you can also ask for a higher number of partitions by passing a larger value. Note that you cannot have fewer partitions than blocks.
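
For illustration, a small sketch that requests more partitions (the value 8 here is an arbitrary example):

// Ask for at least 8 partitions instead of the default of one per HDFS block
val moreParts = sc.textFile("data.txt", 8)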

RDD Operations

RDDs support two types of operations: transformations (RDD -> RDD) and actions (RDD -> a computed result).

RDDs support two types of operations: transformations, which create a new dataset from an existing one, and actions, which return a value to the driver program after running a computation on the dataset. For example, map is a transformation that passes each dataset element through a function and returns a new RDD representing the results. On the other hand, reduce is an action that aggregates all the elements of the RDD using some function and returns the final result to the driver program (although there is also a parallel reduceByKey that returns a distributed dataset).
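
A small sketch of both kinds of operations, reusing the data.txt file from the earlier example:

val lines = sc.textFile("data.txt")
// map is a transformation: it only describes how to derive a new RDD
val lineLengths = lines.map(s => s.length)
// reduce is an action: it runs the computation and returns a value to the driver
val totalLength = lineLengths.reduce((a, b) => a + b)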

All transformations are lazy; they are only executed when an action needs a result returned.

All transformations in Spark are lazy, in that they do not compute their results right away. Instead, they just remember the transformations applied to some base dataset (e.g. a file). The transformations are only computed when an action requires a result to be returned to the driver program. This design enables Spark to run more efficiently. For example, we can realize that a dataset created through map will be used in a reduce and return only the result of the reduce to the driver, rather than the larger mapped dataset.

By default, a transformed RDD is recomputed each time an action runs on it; it can be kept in memory with the cache method, persisted on disk, or replicated across multiple nodes.

By default, each transformed RDD may be recomputed each time you run an action on it. However, you may also persist an RDD in memory using the persist (or cache) method, in which case Spark will keep the elements around on the cluster for much faster access the next time you query it. There is also support for persisting RDDs on disk, or replicated across multiple nodes.
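
For example, a brief sketch that caches the lineLengths RDD from the sketch above so it is not recomputed by every action:

lineLengths.persist()   // equivalent to lineLengths.cache(); default level is MEMORY_ONLY
// The next action populates the cache; any further actions reuse it instead of recomputing
val maxLength = lineLengths.reduce((a, b) => math.max(a, b))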

Printing elements of an RDD

In cluster mode, rdd.foreach(println) or rdd.map(println) will not print the data on the driver. rdd.collect().foreach(println) works, but collect pulls the entire RDD onto a single machine and may run out of memory; rdd.take(100).foreach(println) is a safer alternative.

Another common idiom is attempting to print out the elements of an RDD using rdd.foreach(println) or rdd.map(println). On a single machine, this will generate the expected output and print all the RDD’s elements. However, in cluster mode, the output to stdout being called by the executors is now writing to the executor’s stdout instead, not the one on the driver, so stdout on the driver won’t show these! To print all elements on the driver, one can use the collect() method to first bring the RDD to the driver node thus: rdd.collect().foreach(println). This can cause the driver to run out of memory, though, because collect() fetches the entire RDD to a single machine; if you only need to print a few elements of the RDD, a safer approach is to use the take(): rdd.take(100).foreach(println).

Shuffle operations

Certain Spark operations trigger a shuffle, Spark's mechanism for re-distributing data so that it is grouped differently across partitions; it typically involves copying data across executors and machines.

Certain operations within Spark trigger an event known as the shuffle. The shuffle is Spark’s mechanism for re-distributing data so that it’s grouped differently across partitions. This typically involves copying data across executors and machines, making the shuffle a complex and costly operation.
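
For instance, reduceByKey triggers a shuffle, because values for the same key can live in different partitions and must be brought together. A small sketch with made-up pairs:

// Build a pair RDD whose keys may be spread across partitions
val pairs = sc.parallelize(Seq(("a", 1), ("b", 1), ("a", 2)))
// Combining all values per key forces Spark to redistribute (shuffle) the data
val counts = pairs.reduceByKey(_ + _)
counts.collect()   // Array((a,3), (b,1)); key order is not guaranteed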

RDD Persistence
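
Beyond cache(), persist() accepts an explicit storage level. A brief sketch, assuming an existing RDD named rdd:

import org.apache.spark.storage.StorageLevel

// Spill partitions that do not fit in memory to disk instead of recomputing them
rdd.persist(StorageLevel.MEMORY_AND_DISK)
// Other levels include MEMORY_ONLY_SER, DISK_ONLY, and replicated variants such as MEMORY_ONLY_2;
// an RDD's storage level can only be set once, so choose it before the first action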

DataFrame

Add the scala-library and spark-sql dependencies:

    <!--scala-->
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>${scala.version}</version>
    </dependency>

    <!--spark-->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.11</artifactId>
      <version>${spark.version}</version>
    </dependency>
package com.test

import org.apache.spark.sql.SparkSession

object DataFrameApp {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("test DataFrame").master("local[2]").getOrCreate()

    //Needed to use the $ column syntax
    import spark.implicits._

    val df = spark.read.json("hdfs://192.168.247.100:9000/data/spark/people.json")

    //Print the schema of the DataFrame
    df.printSchema()

    //Show the records of the dataset (first 20 rows by default)
    df.show()

    //Select all values of specific columns
    df.select("name","age").show()

    //Select columns and apply an expression to a column
    df.select($"name", ($"age"+2).as("age2")).show()

    //Filter rows based on a column condition
    df.filter($"age">25).show()
    //Built-in functions can also be used; run the SQL "show functions" to list them all
    df.filter("SUBSTR(name,0,1)='M'").show()

    //Sort by columns
    df.sort($"age".desc,$"name".asc).show()

    //Join operation
    val df1 = df
    df.join(df1,df("name")===df1("name"),"inner").show()

    //Group by columns, then aggregate
    df.groupBy("age","name").count().show()


    //Register the DataFrame as a global temporary view (visible across sessions under global_temp)
    df.createGlobalTempView("people")
    df.head()
    spark.sql("SELECT * FROM global_temp.people").show()
    spark.newSession().sql("SELECT * FROM global_temp.people").show()

    spark.stop()
  }
}

Interoperating with RDDs
package com.test

import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}

object DataFrameRDDApp {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("DataFrameRDDApp").master("local[2]").getOrCreate()

    inferReflection(spark)

    program(spark)

    spark.stop()
  }

  private def inferReflection(spark: SparkSession) = {
    //Implicit conversions are needed to convert an RDD to a DataFrame
    import spark.implicits._

    //Reflection approach: the case class parameter names are read via reflection and become the column names; the schema is known in advance
    val rdd = spark.sparkContext.textFile("hdfs://192.168.247.100:9000/data/spark/infos.txt")
    val infoDF = rdd.map(_.split(",")).map(line => Info(line(0).toInt, line(1), line(2).toInt)).toDF()

    infoDF.show()
  }

  private def program(spark: SparkSession) = {

    //Programmatic approach: used when a case class cannot be defined ahead of time
    val rdd = spark.sparkContext.textFile("hdfs://192.168.247.100:9000/data/spark/infos.txt")
    val infoRDD = rdd.map(_.split(",")).map(line => Row(line(0).toInt, line(1), line(2).toInt))
    val structType = StructType(Array(StructField("id",IntegerType,true),
      StructField("name", StringType),
      StructField("age",IntegerType, true)))

    val infoDF = spark.createDataFrame(infoRDD,structType)

    infoDF.printSchema()
    infoDF.show()
  }

  case class Info(Id:Int, name:String, age:Int)
}
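
The conversion also works in the other direction: a DataFrame exposes its rows through .rdd. A minimal sketch, assuming one of the infoDF DataFrames built above is in scope:

// Back from DataFrame to RDD[Row]; columns can be read by name or position
val infoRowRDD = infoDF.rdd
infoRowRDD.map(row => row.getAs[String]("name")).collect().foreach(println)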

DataSet

package com.test

import org.apache.spark.sql.SparkSession

object DataSetApp {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("test DataFrame").master("local[2]").getOrCreate()

    import spark.implicits._

    //Encoders are created for case classes
    val caseClassDS = Seq(Person("Andy", 18)).toDS()
    caseClassDS.show()

    //Encoders for most common types are provided automatically by importing spark.implicits._
    val primitiveDS = Seq(1,2,3).toDS()
    primitiveDS.map(_+1).collect().take(10)
    primitiveDS.show()

    //A DataFrame can be converted to a Dataset by providing a class; mapping is done by name
    val df = spark.read.json("hdfs://192.168.247.100:9000/data/spark/people.json")
    val personDS = df.as[Person]
    personDS.show()
    

    //Parse a CSV file with Spark
    val csvDF = spark.read.option("header","true").option("inferSchema","true").csv("hdfs://192.168.247.100:9000/data/spark/sales.csv")
    csvDF.show()

    spark.stop()
  }

  case class Person(name:String, age:Long)
}
Data Sources
  • Parquet Files
package com.test

import org.apache.spark.sql.SparkSession

object ParquetApp {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("test DataFrame").master("local[2]").getOrCreate()

    //Equivalent to spark.read.format("parquet").load(path); load() alone also defaults to the parquet format
    val df = spark.read.parquet("hdfs://192.168.247.100:9000/data/spark/users.parquet")

    df.printSchema()
    df.show()

    //Equivalent to write.format("json").save(path)
    /**
      * def mode(saveMode: String): DataFrameWriter[T] = {
      *     this.mode = saveMode.toLowerCase match {
      * case "overwrite" => SaveMode.Overwrite
      * case "append" => SaveMode.Append
      * case "ignore" => SaveMode.Ignore
      * case "error" | "default" => SaveMode.ErrorIfExists
      * case _ => throw new IllegalArgumentException(s"Unknown save mode: $saveMode. " +
      * "Accepted save modes are 'overwrite', 'append', 'ignore', 'error'.")
      * }
      * this
      * }
      */
    df.select("name","favorite_color").write.mode("overwrite").json("hdfs://192.168.247.100:9000/data/spark/userJson.json")

    spark.stop()
  }
}
  • Hive
package com.test

import org.apache.spark.sql.SparkSession

object HiveJoinJdbs {

  /**
    * hive-site.xml (with the MySQL metastore configuration) must be added under resources;
    * otherwise a new warehouse directory is created on the local system.
    * hosts mapping file: master--->
    */
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("HiveJoinJdbs")
      .master("local[2]")
      //This line does not seem to take effect on Windows
      .config("spark.sql.warehouse.dir", "hdfs://192.168.247.100:9000/user/hive/warehouse")
      .enableHiveSupport()
      .getOrCreate()

    spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) " +
          "ROW FORMAT DELIMITED FIELDS TERMINATED BY '\\t' " +
          "LINES TERMINATED BY '\\n'")
    spark.sql("LOAD DATA LOCAL INPATH 'file://F:/src.txt' INTO TABLE src")

    val hiveDF = spark.sql("select * from src")
    hiveDF.show()
    
    //Save the data as a Hive table
    import spark.implicits._
    hiveDF.filter($"key">2).write.saveAsTable("hive_src1")
    //Partition by key
    hiveDF.write.partitionBy("key").saveAsTable("hive_src2")

    spark.sql("show tables").show()

    //Configure the number of partitions to use for joins or shuffles
    spark.sqlContext.setConf("spark.sql.shuffle.partitions","10")

    spark.stop()

  }
}

  • MySQL
package com.test

import java.util.Properties

import org.apache.spark.sql.SparkSession

object JDBCDataFrameApp {

  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder().appName("test DataFrame").master("local[2]").getOrCreate()

    //Using options
    val jdbcDF = spark.read.format("jdbc")
      .option("url","jdbc:mysql://192.168.247.100:3306/hive")
      .option("dbtable","hive.VERSION")
      .option("user","root")
      .option("password","123456")
      .load()

    //Using a Properties object
    val properties = new Properties()
    properties.put("user","root")
    properties.put("password","123456")
    val jdbcDF1 = spark.read
      .jdbc("jdbc:mysql://192.168.247.100:3306/hive","hive.TBLS",properties)

    jdbcDF.show()
    jdbcDF1.show()

    //Writing data also supports the same two approaches
    jdbcDF.write.format("jdbc")
      .option("url","jdbc:mysql://192.168.247.100:3306/test")
      .option("dbtable","test.VERSION_copy")
      .option("user","root")
      .option("password","123456")
      .option("driver","com.mysql.jdbc.Driver").mode("overwrite").save()

    jdbcDF1.write.mode("overwrite").jdbc("jdbc:mysql://192.168.247.100:3306/test","test.TBLS_copy",properties)
    spark.stop()
  }
}
  • Partitioning and schema merging
package com.test

import org.apache.spark.sql.SparkSession

object SparkSqlOther {

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("sparkOther").master("local[2]").getOrCreate()

    import spark.implicits._
    /**
      * Schema merging and partitioning
      */

    val df1 = spark.sparkContext.makeRDD((1 to 10))
      .map(i => (i, i*i))
      .toDF("value","square")
    //A partition can be specified through the save path when writing
    df1.write.mode("overwrite").parquet("hdfs://192.168.247.100:9000/data/spark/parquet/key=1")

    val df2 = spark.sparkContext.makeRDD((11 to 20))
      .map(i => (i, i * i * i))
      .toDF("value", "cube")
    df2.write.mode("overwrite").parquet("hdfs://192.168.247.100:9000/data/spark/parquet/key=2")


    /**
      * If schema merging is not enabled, the schema is taken from the first partition's schema
      */
    //Enable schema merging globally
    spark.sqlContext.setConf("spark.sql.parquet.mergeSchema","true")
    
    //Enable schema merging when reading the data
    val mergeDF = spark.read.option("mergeSchema","true").parquet("hdfs://192.168.247.100:9000/data/spark/parquet")
    mergeDF.printSchema()
    mergeDF.show()

    spark.stop()
  }
}