spark 和 pyspark的理解

最近学习了spark 相关的内容,写个笔记记录一下自己目前对于spark的理解,方便以后查阅。在本文的写作过程中,主要参考了1.宽依赖与窄依赖的区别;2.spark中几个概念的梳理;[3.spark shuffle的理解](https://blog.csdn.net/zylove2010/article/details/79067149)这样三篇博客,写的非常好,建议大家都去看看。

1.简介

  Spark可以说是圈内最流行的几个大数据处理框架之一了,类似地位的可能还有storm之类的。其最大的优点就是能够几乎底层透明的完成分布式的计算,非常方便开发。

  Spark是可以搭建在很多平台上的,本文面对的环境就是比较常见的Spark+Hadoop(作为文件系统)+Hive(作为分布式数据库)的配置。

  本文重点想讲解的就是spark整个程序的生命周期(我们能够接触到的几个环节,还没有到太底层的需要看源码的程度)。

2.spark的核心数据结构-RDD

RDD是spark 的核心数据结构,spark 之所以能够做到把分布式做成近乎底层透明,正是依靠了RDD.RDD全称弹性分布式数据集(Resilient Distributed Datasets).

2.1 partition ,task 与 RDD

  抛开一切我们想想一下,假如说现在有一张很大很大(千万级)的数据表格要你处理,我们按传统的思维方式来搞的话,就是:操作表格嘛,那么我就用个dataframe(R语言和python的pandas包中都有此概念)来封装这个数据表呗,然后我不就是各种骚操作嗖嗖嗖,像什么计数用的count函数啦,排序用的sort函数啦,分组用的groupby函数啦。

  现在问题来啦!你好像忘记了我一个很重要的前提,这个表很大很大啊!你用dataframe来封装的前提是得把这些数据全部加载到内存啊。这显然是不现实的。那么我们就要想办法,最直观的办法就是Divide & Conquer。因为我们看看我们想做的这些操作,无论是计数,还是排序,还是分组,都是能够先分成小数据集,并行处理,然后再合并出结果的。因此我们的解决方案来啦,以计数为例,我们首先把这个大数据集分成多个小数据集(知识点,敲黑板!!这就是partition),每个小数据集我们启动一个子任务去让他做计数(知识点,敲黑板!!这就是task),每个子任务执行完毕之后再汇总成最终的结果。

RDD

  其实,spark 就是帮我们把上面的工作完成了。Instead of 手动的分割文件,手动的分配任务,手动的汇总结果,我们只需要把我们的数据封装成RDD数据类型,就能够像是在操作普通小数据集一样的完成常见的那几种数据操作。

2.2 进击的RDD-Dataframe简介

  在RDD的基础之上,spark又提出了升级版的数据结构-Dataframe,不过dataframe注意:这里的dataframe是pyspark中的叫法,在scala等语言中使用的dataset的名称

  那么什么是dataframe呢,简单的说你可以把他想象成数据库里的一张表,他有自己的column,row,还包括一些针对表的操作。如下面盗来的这张图所示:

dataframe示例-盗图自易佰教程

  spark中通过引入dataframe的数据结构带来了很多好处,在这里我们只重点说一说其中的两个:

  • 效率高。dataframe自带的一些操作都是经过优化的,能够以极为高效的方式完成任务。
  • 操作简便。经过又一层的封装,spark中的数据操作变得更加友好,上手很快,相比之下原来的针对rdd的操作可以称得上是非常原始了。

3.spark的生命周期

  spark 作为一个大数据处理的框架,具有自己完整的生命周期。

3.1 全生命周期

  闲言少叙,我在这里一句话串联一下整个第三节的脉络:
  一段程序在spark里叫做一个application,一个application会划分成很多个job(划分条件是action),一个job会划分成很多个stage(划分条件是shuffle),每个stage里所有被处理的数据会划分之后交给很多个子任务去处理(划分条件是partition)

spark生命周期图

3.2 拆解讲解 - job & action

  前文提到了,一个action的产生将会促使application切分Job。那么什么是action?简单来说就是spark中对于rdd的操作可以分成两类:tranformationaction。听到这两个名字,相信很多人已经明白了,transformation就是只是在做一些变形之类的操作,有点类似于hadoop里面的map,比如整体加个1啊什么的。而action是实际需要求值出结果的操作,比如说count什么的。

  这个概念有点像lazy evaluation的操作,估计和spark的正宗语言是scala有关。总之,就是不到万不得已不求值,求值就要切分job.

3.3 拆解讲解 - stage & shuffle

  现在说一说job内部的划分-stage。前面提到了,spark是不到万不得已不求值,求值才划分job,因此在一个job内部就完全是transforamtion的操作。

  但是,即使都是变换操作也是有不一样的,有的变换是一一对应的变换,比如说每个元素都加1;而有的变换则是涉及到整个RDD,比如groupby.这就是窄依赖宽依赖的变换。

宽依赖和窄依赖

  为什么突然整这么一个概念呢,记住一句话:宽依赖引发shuffle 操作,shuffle操作导致stage切分。 想一下,我们现在把每个rdd交给很多个小的task取执行了,大家各自执行各自的(并行),执行完了之后如果没啥问题接着走后面的操作,直到最后汇总,这种就是完全并行的操作,理想的情况。但是总有一些操作搅屎,它是全局的操作(宽依赖),它必须得等待前面分好的所有子任务全部执行完他才能执行,换句话说就是必须得先在他这里汇总一下。

  那么我们现在得出了一个结论:stage是spark种并行处理的最大单位。一个stage以内的各种操作都可以各自搞各自的,互不影响,从而最高的利用并行开发的效率。而出了stage就只能顺序执行所有操作了。

4.如何提高执行效率?——spark并发度的计算

  到这里,主要的内容都差不多讲完了,但是我写这篇文章最大的原因还没有说。其实就是我在使用pyspark的时候遇到的一个问题。简单来说就是程序老是崩,总是提醒我内存不足,我经过好几天的折腾才发现是自己设定的问题。也总结出来了一条经验,就是想提高spark的效率就可以从两个角度出发:

  • 修改配置内容,增加并发度。核心配置excutor.instances是spark处理器的个数(虚拟的可以多分配一些),excutor.cores是spark处理器的核心个数(虚拟的可以多分配一些)。
    spark = SparkSession.builder.enableHiveSupport(). \
        master("yarn"). \
        config('spark.executor.memory', '15g'). \
        config('spark.executor.cores', '10'). \
        config('spark.executor.instances', '20'). \
        config('spark.driver.memory', '20g'). \
        getOrCreate()
  • 修改RDD的partition,划分更小的task。前面提到的我的那个问题的本质原因不是并发度的问题,而是划分之后的任务还是太大了,交给每一个核心去处理内存都会扛不住,所以需要手动的划分(原本采用的是默认的划分,默认划分取决于你数据的输入,比如从hdf来的文件就是和你file split分片保持一致)。

    df.repartition.groupyBy("city").count().sortBy('count')

5.赠送内容:spark的输出进度条怎么解读

进度条

  最后一个问题,spark跑任务的时候的那个进度条里的数字都是啥玩意?相信很多人刚开始的时候都搞不太明白。简单说明一下:

  • stage:就是前面提到的job内部划分的stage,不多说了
  • 进度条尾端(X+Y)/Z:X是已经执行完的任务总数,基本上和数据的partition是保持一致的。Y是活跃的(准备好可以执行的)但是还没执行的的任务数。Z是总任务数。
  • 特殊现象:有时候会出现活跃任务数是负数!!这是什么情况?——这个负数就是执行失败的任务数,需要重新执行的。
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,547评论 6 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,399评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,428评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,599评论 1 274
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,612评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,577评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,941评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,603评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,852评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,605评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,693评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,375评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,955评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,936评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,172评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,970评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,414评论 2 342

推荐阅读更多精彩内容