最近学习了spark 相关的内容,写个笔记记录一下自己目前对于spark的理解,方便以后查阅。在本文的写作过程中,主要参考了1.宽依赖与窄依赖的区别;2.spark中几个概念的梳理;[3.spark shuffle的理解](https://blog.csdn.net/zylove2010/article/details/79067149)这样三篇博客,写的非常好,建议大家都去看看。
1.简介
Spark可以说是圈内最流行的几个大数据处理框架之一了,类似地位的可能还有storm之类的。其最大的优点就是能够几乎底层透明的完成分布式的计算,非常方便开发。
Spark是可以搭建在很多平台上的,本文面对的环境就是比较常见的Spark+Hadoop(作为文件系统)+Hive(作为分布式数据库)的配置。
本文重点想讲解的就是spark整个程序的生命周期(我们能够接触到的几个环节,还没有到太底层的需要看源码的程度)。
2.spark的核心数据结构-RDD
RDD是spark 的核心数据结构,spark 之所以能够做到把分布式做成近乎底层透明,正是依靠了RDD.RDD全称弹性分布式数据集(Resilient Distributed Datasets).
2.1 partition ,task 与 RDD
抛开一切我们想想一下,假如说现在有一张很大很大(千万级)的数据表格要你处理,我们按传统的思维方式来搞的话,就是:操作表格嘛,那么我就用个dataframe(R语言和python的pandas包中都有此概念)来封装这个数据表呗,然后我不就是各种骚操作嗖嗖嗖,像什么计数用的count
函数啦,排序用的sort
函数啦,分组用的groupby
函数啦。
现在问题来啦!你好像忘记了我一个很重要的前提,这个表很大很大啊!你用dataframe来封装的前提是得把这些数据全部加载到内存啊。这显然是不现实的。那么我们就要想办法,最直观的办法就是Divide & Conquer
。因为我们看看我们想做的这些操作,无论是计数,还是排序,还是分组,都是能够先分成小数据集,并行处理,然后再合并出结果的。因此我们的解决方案来啦,以计数为例,我们首先把这个大数据集分成多个小数据集(知识点,敲黑板!!这就是partition),每个小数据集我们启动一个子任务去让他做计数(知识点,敲黑板!!这就是task),每个子任务执行完毕之后再汇总成最终的结果。
其实,spark 就是帮我们把上面的工作完成了。Instead of 手动的分割文件,手动的分配任务,手动的汇总结果,我们只需要把我们的数据封装成RDD数据类型,就能够像是在操作普通小数据集一样的完成常见的那几种数据操作。
2.2 进击的RDD-Dataframe简介
在RDD的基础之上,spark又提出了升级版的数据结构-Dataframe,不过dataframe注意:这里的dataframe是pyspark中的叫法,在scala等语言中使用的dataset的名称。
那么什么是dataframe呢,简单的说你可以把他想象成数据库里的一张表,他有自己的column,row,还包括一些针对表的操作。如下面盗来的这张图所示:
spark中通过引入dataframe的数据结构带来了很多好处,在这里我们只重点说一说其中的两个:
- 效率高。dataframe自带的一些操作都是经过优化的,能够以极为高效的方式完成任务。
- 操作简便。经过又一层的封装,spark中的数据操作变得更加友好,上手很快,相比之下原来的针对rdd的操作可以称得上是非常原始了。
3.spark的生命周期
spark 作为一个大数据处理的框架,具有自己完整的生命周期。
3.1 全生命周期
闲言少叙,我在这里一句话串联一下整个第三节的脉络:
一段程序在spark里叫做一个application,一个application会划分成很多个job(划分条件是action),一个job会划分成很多个stage(划分条件是shuffle),每个stage里所有被处理的数据会划分之后交给很多个子任务去处理(划分条件是partition)
3.2 拆解讲解 - job & action
前文提到了,一个action的产生将会促使application切分Job。那么什么是action?简单来说就是spark中对于rdd的操作可以分成两类:tranformation和action。听到这两个名字,相信很多人已经明白了,transformation就是只是在做一些变形之类的操作,有点类似于hadoop里面的map,比如整体加个1啊什么的。而action是实际需要求值出结果的操作,比如说count什么的。
这个概念有点像lazy evaluation的操作,估计和spark的正宗语言是scala有关。总之,就是不到万不得已不求值,求值就要切分job.
3.3 拆解讲解 - stage & shuffle
现在说一说job内部的划分-stage。前面提到了,spark是不到万不得已不求值,求值才划分job,因此在一个job内部就完全是transforamtion的操作。
但是,即使都是变换操作也是有不一样的,有的变换是一一对应的变换,比如说每个元素都加1;而有的变换则是涉及到整个RDD,比如groupby.这就是窄依赖和宽依赖的变换。
为什么突然整这么一个概念呢,记住一句话:宽依赖引发shuffle 操作,shuffle操作导致stage切分。 想一下,我们现在把每个rdd交给很多个小的task取执行了,大家各自执行各自的(并行),执行完了之后如果没啥问题接着走后面的操作,直到最后汇总,这种就是完全并行的操作,理想的情况。但是总有一些操作搅屎,它是全局的操作(宽依赖),它必须得等待前面分好的所有子任务全部执行完他才能执行,换句话说就是必须得先在他这里汇总一下。
那么我们现在得出了一个结论:stage是spark种并行处理的最大单位。一个stage以内的各种操作都可以各自搞各自的,互不影响,从而最高的利用并行开发的效率。而出了stage就只能顺序执行所有操作了。
4.如何提高执行效率?——spark并发度的计算
到这里,主要的内容都差不多讲完了,但是我写这篇文章最大的原因还没有说。其实就是我在使用pyspark的时候遇到的一个问题。简单来说就是程序老是崩,总是提醒我内存不足,我经过好几天的折腾才发现是自己设定的问题。也总结出来了一条经验,就是想提高spark的效率就可以从两个角度出发:
- 修改配置内容,增加并发度。核心配置excutor.instances是spark处理器的个数(虚拟的可以多分配一些),excutor.cores是spark处理器的核心个数(虚拟的可以多分配一些)。
spark = SparkSession.builder.enableHiveSupport(). \
master("yarn"). \
config('spark.executor.memory', '15g'). \
config('spark.executor.cores', '10'). \
config('spark.executor.instances', '20'). \
config('spark.driver.memory', '20g'). \
getOrCreate()
- 修改RDD的partition,划分更小的task。前面提到的我的那个问题的本质原因不是并发度的问题,而是划分之后的任务还是太大了,交给每一个核心去处理内存都会扛不住,所以需要手动的划分(原本采用的是默认的划分,默认划分取决于你数据的输入,比如从hdf来的文件就是和你file split分片保持一致)。
df.repartition.groupyBy("city").count().sortBy('count')
5.赠送内容:spark的输出进度条怎么解读
最后一个问题,spark跑任务的时候的那个进度条里的数字都是啥玩意?相信很多人刚开始的时候都搞不太明白。简单说明一下:
- stage:就是前面提到的job内部划分的stage,不多说了
- 进度条尾端(X+Y)/Z:X是已经执行完的任务总数,基本上和数据的partition是保持一致的。Y是活跃的(准备好可以执行的)但是还没执行的的任务数。Z是总任务数。
- 特殊现象:有时候会出现活跃任务数是负数!!这是什么情况?——这个负数就是执行失败的任务数,需要重新执行的。