1.Spark简述
Spark通过内存计算能力,急剧的提高大数据处理速度。解决了Hadoop只适合于离线的高吞吐量、批量处理的业务场景的弊端,提出了实时计算的解决方法。
1.1 Spark特点
a.快速处理能力:Hadoop的MapReduce中间数据采用磁盘存储,而Spark优先使用内存避免大量的磁盘IO,极大的提高了计算速度;
b.支持性强:Spark支持Java、Scala、Python等;
c.可查询:Spark SQL支持SQL;
d.支持流计算:Spark支持实时的流计算Spark Streaming;
e.可用性高:Spark的Standalone模式支持多Master,避免了单点故障;
f.Spark支持的数据源多:Spark支持HDFS、Hbase、Hive、Cassandra等,方便数据迁移。
1.2 Spark术语
a.RDD(Resillient Distributed Dataset) :弹性分布式数据集,程序可以根据需要,将RDD的Partition的个数进行增加和减少(一个Partition对应一个task),以提高执行效率。而且RDD支持容错,如果有失效的RDD,那么Spark可以从父RDD重新生成子RDD;
RDD特征:
1)不可变性: RDD只能通过转换生成需要的新的RDD;
2)分区性: RDD由多个partition构成,每个partition独立的存储于各自的机器内存+磁盘上;
3)内存优先: 可以全部或部分缓存在内存中,当内存不够时再使用磁盘存储;
4)弹性存储: RDD在运行时内存不够时,它会进行内存和磁盘进行数据交换,这对开发者是透明的;
5)容错性: 当RDD数据被删除或者丢失,可使用父RDD重新计算重新获取RDD,而且用户无感知。
6)本地计算: 当程序和数据块不在同一台机器时,优先迁移程序而不是优先迁移数据。
b.Task:执行任务,有ShuffleMapTask(对应Hadoop的Map)和ResultTask(对应Hadoop的Reduce);
c.Job:程序提交的作业,由task构成;
d.Stage:Job的分段,一个Job划分成多个Stage;
e.Partition: 数据分区,一个RDD可以后多个分区构成;
f.Narrow依赖:即窄依赖,子RDD的Partition依赖父RDD中固定的一个Partition,例如map、filter、union等操作会产生窄依赖;
g.Shuffle依赖:即宽依赖,子RDD的Partition依赖父RDD中的所有Partition,例如groupByKey、reduceByKey、sortByKey等操作会产生宽依赖,会产生shuffle;
h.DAG(Directed Acycle Graph): 记录RDD之前依赖关系的有向无环图。
i.算子:是指对RDD的运算,可以理解为Spark处理RDD 的函数
算子的分类:
i.1:Value型Transformation算子:
针对处理的数据项是value,例如一对一型(map,flatMap,mapPartition,glom),多对一型(union,cartesian),多对多型(groupBy),输出分区为输入分区子集型(filter,distinct,subtract,sample,takeSample),Cache型(cache:将RDD从磁盘缓存到内存,persist:将RDD从进行缓存,可以缓存到内存或磁盘)。
i.2:Key-Value型Transformation算子:
针对处理的数据项是Key-Value形式的算子。例如一对一型(mapValues),聚集(combineByKey,reduceByKey,partitionBy,cogroup),连接(join,leftOutJoin,rigthOutJoin)。
i.3:Action算子:
Action算子是指通过SparkContext执行提交作业的runJob操作,触发RDD的DAG的执行。例如无输出(foreach),HDFS(saveAsTextFile,saveAsObjectFile),Scala集合型(collect,collectAsMap,reduceByKeyLocally,lookup,count,top,reduce,fold,aggregate)。
2.Spark的基本架构
过程说明:
Client 提交应用(由Spark Action算子触发),Cluster Manager找到一个Worker启动Driver,Driver向Cluster Manager申请资源,然后将应用转换为RDD DAG,再有DAG Scheduler 将RDD DAG 转化为Stage DAG(一个Stage由一组相同的task集合构成),然后提交给TaskScheduler,由TaskScheduler将task(每个task对应一个Partition)交给Executor执行。
a.Cluster Manager:
Spark集群管理器,主要负责资源的分配和管理,负责Worker上的内存,CPU等资源的分配,不负责Executor的资源分配和管理。常用的Spark资源管理器有Standalone,YARN,Mesos,EC2等。
b.Driver:
是spark的驱动节点,执行spark中的main方法,负责代码的执行工作。主要任务如下:
1.将用户查询转换为任务;
2.在Executor之间调度任务;
3.跟踪Executor的执行情况;
4.支持UI展示运行情况。
c.Worker:
Spark中的工作节点,得到Cluster Manager的分配到资源的Worker的工作有:创建Executor,将自己的资源和任务分配给Executor,同步资源信息给Cluster Manager。
d.Executor:
Executor是一个JVM进程:
1.负责执行Spark中的task执行,并将结果返回给Driver;
2.通过自己的Block Manager为RDD提供基于内存的存储(如果需要),因此RDD是可以直接缓存在Executor进程内的,故执行效率高。
各组件之间通信采用Netty通信服务,每个组件有一个InBox负责接收数据,N个OutBox(N取决于要发送数据的接收组件的个数)
2.1 Spark运行模式:
模式 | 说明 |
---|---|
local | 本地运行,可以指定CPU的核心数量 |
Standalone | Spark自带的集群运行模式 |
Mesos | 在Mesos集群上运行,Driver和Worker都运行在Mesos上 |
yarn-client | Driver运行在本地,Worker运行在YARN上 |
yarn-cluster | Driver运行在YARN上,Worker运行在YARN上,线上环境基本都是这种模式 |
2.2 Spark的task提交过程
a.spark根据transaction操作,将RDD构建血缘关系图,即DAG,然后由action算子触发job调度执行;
b.DAGScheduler负责Stage的调度,将job划分成多个stage,然后将stage打包成taskSet交给TaskScheduler调度;
c.TaskScheduler负责task的调度,将从DAGScheduler获取的TaskSet按照指定的调度策略(FIFO使用先进先出的TaskSetManger队列或者FAIR)分发到Executor上执行;
d.SchedulerBackend负责提供可用资源,接收Executor的注册信息,并维护Executor的状态,资源情况的上报等。
调度策略:
FIFO:使用先进先出的队列,TaskSetManger为一个节点。
FAIR:对TaskSetManger的runningTasks个数,minShare值,weight值;
主要原则有:
runningTasks < minShare的先执行;
minShare使用率低的先执行;
weight使用率低的先执行;
最后将排序后的TaskSetManger放入缓存队列中,然后依次交给Executor执行。
2.3 Job,Stage,task的划分
说明:
每个Action算子是一个job,一个job由shuffle(宽依赖)分割成多个Stage,一个Stage内有多少个Partition就产生多少个task。故一个job中task的数量 = stage数量 * 每个stage的task数量。
由shuffle宽依赖划分Stage的原因:
shuffle宽依赖中子RDD的Partition会依赖父RDD的多个Partition,这样就会出现一些父Partition没有准备好数据,导致不能继续计算,直到父RDD的所有Partition都准备好了,才能够执行将父RDD转换为子RDD的计算,而且往往需要跨节点数据传输。而窄依赖是父RDD的一个Partition决定了子RDD的一个Partition,直接计算就可以了。另外,在数据恢复时,窄依赖只需要重新执行丢失子RDD的Partition的父RDD的Partition即可,而shuffle宽依赖需要考虑通过恢复所有父RDD的Partition,然后通过计算再获得子RDD的Partition。
Task的本地化等级:
名称 | 说明 |
---|---|
PROCESS_LOCAL | 进程本地化,即Task和对应的Partition在一个Executor中,性能最好 |
NODE_LOCAL | 节点本地化,即Task和对应的Partition在一个Worker中,但是不在一个Executor中,数据需要进程间传输 |
RACK_LOCAL | 机架本地化,即Task和对应的Partition在一个机架的两个节点上,数据需要通过网络在节点间传输 |
NO_PREF | 对于Task来说,从哪里去都一样,没有优劣之分 |
ANY | Task和数据不在一个机架上,性能最差。 |
Spark在调度时,尽可能的让每个task以最高级别的本地性级别执行,但是当本地性级别对应的所有节点都没有空闲资源时,那么Spark会等待重试,如果超过阈值时间,那么将会降低本地性级别启动。
Shuffle的任务个数:
map task个数:
当Spark从HDFS读取数据时,task的个数由HDFS的split个数相同;
当Spark的Shuffle之前执行了repartition或者coalesce操作,那么task的个数和Partition个数相同。
reduce task个数:
如果配置了spark.default.parallelism=N,那么reduce的task的个数为N;
如果没有配置,那么task的个数和Partition个数相同。
3.Spark的Shuffle过程
3.1 我们先来看看Hadoop的Shuffle过程:
说明:
Reduce获得Map的中间输出结果后,会对这些数据在磁盘上进行merge sort,需要大量的IO。
3.2 Spark基于Hash的Shuffle:
说明:
在Hash Shuffle的时候,每个task会根据Reduce的个数创建相同个数的bucket,故bucket的总个数是task个数 * Reduce的个数。相比Hadoop的Shuffle,Hash Shuffle避免了不必要的排序。
缺点:
task个数和Reduce的个数比较大时,该Shuffle会生成大量的bucket文件,不但对系统产生很大压力,也影响了IO吞吐量。另外,Map的中间结果是首先保存到内存中的,然后再写入磁盘,对内存容量要求比较高。
3.3 Spark基于Hash的Shuffle的优化
说明:
使用Consolidation机制(spark.shuffle.consolidation=true开启),将一个CPU core的所有Task的输出到一个ShuffleBlockFile文件中,不同的Task输出到不同的ShuffleBlockFile的Segment中。即只有CPU core中第一个执行的task要创建磁盘文件,后面该CPU core再执行后面的task时,复用之前第一个task创建的磁盘文件。
3.4 Spark基于sort的Shuffle
sort的Shuffle有两种运行模式
a.普通运行模式
当执行shuffle时,数据会首先写入内存数据结构中(reduceByKey使用map结构,join使用Array结构),每一条数据写入内存数据结构后,会判断是否达到了阈值,如果达到了阈值,那么根据key将内存结构中的数据进行排序,排序之后,那么将内存数据结构中的数据使用Java的BufferedOutputStream分批写入到磁盘中(默认batch=10000),然后清空内存数据结构。多次flush内存结构数据,会产生多个临时文件,最后将这些临时文件都进行合并,即merge。
b.ByPass运行模式
当Shuffle read task的数量小于等于spark.shuffle.sort.bypassMergeThreshold参数时(默认为200)且不是聚合类的shuffle算子,则启用ByPass模式。
其过程大致和未经优化的Hash Shuffle相同,只是后面多了一步文件merge的过程。
好处:
1.不会进行排序;
2.向磁盘写数据的方式不同;
4.Spark的内存管理
4.1 堆内内存(on-heap memory):
指Executor中的堆内存,即JVM中的heap中的空间,由JVM统一管理。通过-executor-memory或spark.executor.memory参数可指定大小。
1.storage内存: 主要保存RDD的缓存,广播变量的内存部分为;
2.Execution内存:主要保存shuffle时产生的数据;
3.other内存:保存Spark内部的对象实例,或者用户自定义的对象实例。
由于Spark的堆内内存,交由JVM管理,故会出现Spark标记为释放的实例,在JVM方面还没有被垃圾回收器回收,因此JVM实际可用的堆内存小于Spark记录的可用内存,即Spark记录的可用堆内内存会和JVM实际的堆内内存不一致,故有可能会导致OMM。
4.2 堆外内存(off-heap memory):
Spark从操作系统获取的内存,存储序列化的二进制对象。Spark使用JDK Unsafe API管理堆外内存,跳过了JVM,精准的申请和释放内存,减少不必要的堆内存开销,以及JVM GC带来的性能问题,也提高了内存空间使用情况计算的精度。
默认情况下不开启堆外内存,可通过spark.memory.offHeap.enabled开启,并设置堆外内存大小,堆外空间有storage内存和Execution内存,没有other内存空间。
4.3 内存分配:
a.静态内存分配:
早期的Spark采用静态内存分配,即程序运行期间的存储内存,执行内存和other内存大小都是固定的,都是在Spark启动时设置好的。
缺点:
需要对Spark深入了解,对任务逻辑很清楚的情况下,才可以分配好存储空间,否则很容易空间分布不均衡。故目前很少使用这种分配方式。
b.统一内存管理:
Spark1.6之后引入,即Storage和Execution存储空间在同一块区域,可以动态占用另一方的空闲区域。
a.Spark启动时,设置Storage和Execution空间大小;
b.当一个空间不够(放不下一个Block),另外一个空闲时,则借用空闲空间; 当Storage和Execution空间都不够时,将数据存储到磁盘;
c.当自己Execution内存不够时,可以要求另一方归还借用的空间;
d.当自己Storage内存不够时,无法可以要求另一方归还借用的空间,主要因为Shuffle的过程比较复杂。
优点:
提高内存使用率,降低Spark内存维护难度,要防止缓存数据过多导致full GC带来的性能问题。
4.4 RDD的持久化:
持久化级别 | 说明 |
---|---|
MEMORY_ONLY(默认级别) | 以非序列化的JAVA对象的方式存储在JVM的内存中,当内存不够存储所有Partition时,没有存储到JVM内存中的Partition会在需要他的时候重新计算。 |
MEMORY_AND_DISK | 以非序列化的JAVA对象的方式存储在JVM的内存中,当JVM内存不够时,会将超出的Partition持久化到磁盘 |
MEMORY_ONLY_SER | 与MEMORY_ONLY相同,但是会将Java对象进行序列化,减少了内存开销,但是增大了CPU开销 |
MEMORY_AND_DISK_SER | 与MEMORY_AND_DISK相同,但是会将Java对象进行序列化,减少了内存开销,但是增大了CPU开销 |
*_2(MEMORY_ONLY_2,MEMORY_AND_DISK_2,MEMORY_ONLY_SER_2,MEMORY_ONLY_SER_2 ) | 表示持久化数据要有一个副本,保存到其他节点上,实现容错,避免因为一个RDD分区意外丢失导致所有数据重新计算; |
4.5 RDD的缓存过程:
Unroll过程:
RDD在缓存到Storage之前,Partition以Iterator数据结构来访问,通过Iterator可以获得每一条数据记录(Record,序列化使用SerializedMemoryEntry数据结构定义,使用字节缓冲区ByteBuffer存储二进制数据;非序列化使用DeserializedMemoryEntry数据结构定义,使用数组存储),这些Record在逻辑上占用了Spark堆内内存的other空间,注意:同一个Partition内的Record并不连续。
当将RDD缓存到Storage内存之后,Partition被转换成Block,Record在堆内或者堆外的Storage内存中占用一块连续的空间,这样讲Partition由不连续的存储空间转换为连续的存储空间的过程,Spark称之为Unroll。
每个Executor的Storage使用一个LinkedHashMap来管理所有堆内内存和堆外内存中所有的Block实例,对LinkedHashMap中Block的新增和删除,伴随着内存的申请和释放。
Block的淘汰和落盘具体规则如下:
a.旧的Block和新的Block要同属堆内内存或者同属堆内内存;
b.新旧Block不能同属一个RDD,避免循环淘汰;
c.旧的Block对应的RDD不能处于被读状态,避免发生一致性问题;
d.LinkedHashMap的淘汰策略使用LRU方式,采用遍历的方式,找到淘汰的Block;
e.只有存储级别中包含了DISK,Block才进行落盘,否则直接删除该Block,需要的时候再重新计算得到被删除的Block。
由于Storage空间容纳的Iterator是有限的,当前计算任务,向Memory Manager申请足够的Uroll空间(空间的估算方式,序列化的Partition直接累加计算即可,而且计算准确;非序列化的Partition,使用遍历Record的方式采样估算所需要的Unroll空间,估算会精度会有偏差,当程序执行Unroll时遇到空间不足时,Unroll会释放已占有的空间)来临时占位,当空间不足时,unroll失败,直到空间足够时才能够执行Unroll。
4.6 Execution内存管理:
Execution主要保存Shuffle执行时占用的内存。
4.6.1 Shuffle Write
a.若在map端使用普通的排序方式,则会使用ExeternalSorter进行外排,使用Spark堆内内存空间;
b.若在map端使用tungsten排序(钨丝排序),则会使用ShuffleExeternalSorter直接对序列化形式的对象进行排序,该方式可以使用Spark堆内内存空间和Spark堆外内存空间(当堆外内存开启时)。
4.6.2 Shuffle Read
a.在对reduce端数据进行聚合时,数据由Aggregator处理,使用Spark堆内内存空间;
b.如果最终结果需要排序,那么数据还需要ExeternalSorter进行外排,使用Spark堆内内存空间。
ExeternalSorter和Aggregator使用AppendOnlyMap哈希表保存数据(当内存不够时,会保存到磁盘中)。
4.7 Block的管理:
a.BlockManagerMaster:
负责对各个节点上的BlockManager内部数据的元数据进行维护(主要是Block的增删改等);
b.BlockManager:
每个BlockManager创建之后,首先将自己注册到BlockManagerMaster,这时候BlockManagerMaster就会有对应的BlockManagerInfo;
BlockManager进行数据读写的时候,首先从本地读取(使用DiskStore或者MemoryStore),如果本地没有对应数据的话,通过TransferService与有数据的BlockManager建立连接,然后从远程拉取数据。
4.8 Spark共享变量:
默认情况下,每个Spark的算子,使用了某个外部变量,那么这个变量就会复制到每个task中,这样每个task读取各自的外部变量。
4.8.1 Spark广播变量(Broadcast Variable)
广播变量在Spark开始时是在Driver端,当task使用广播变量时,先从自己所在Executor中BlockManager中获取,则从Driver端或者其他节点的BlockManager获取,该Executor后续的task使用广播变量时,自己从自己所在Executor中BlockManager中获取。为每个Executor复制一份,避免了每个task复制一份的性能问题,减少了IO和内存消耗。广播变量不能被task修改。
4.8.2 Spark累加器(Accumulator)
Spark Accumulator保存于Driver端,task对Accumulator的累加操作后,会把值发送到Driver端,Driver对值进行汇总。task只能对Accumulator只能进行累加操作,不能够读取。
5.Spark性能调优
5.1 资源的配置:
在资源允许的情况下:
a.增加Executor个数,提高task并行度;
b.增加每个Executor的CPU core的个数,提高task并行度;
c.增加每个Executor的内存,这样可以缓存更多的数据,为Shuffle提供更多的内存,每个task也获取了更多的内存。
5.2 RDD优化:
a.RDD复用,避免重复计算带来的资源消耗;
b.RDD持久化,通过持久化,将多次使用的RDD缓存到内存或磁盘中。
5.3 RDD并行度控制:
尽量让并行度和资源匹配,Spark官方推荐,Stage中的task数量为Spark的总CPU core数量的2-3倍,避免不同CPU不同的性能,导致先执行完task的CPU空闲。
5.4 广播大变量:
广播变量为每个Executor复制一份,避免了每个task复制一份的性能问题,减少了IO和内存消耗。
5.5 kyro序列化:
Spark默认使用Java的序列化,但是Java的序列化性能和效率不高,使用kyro序列化的性能是Java序列化的性能高很多(官方说是10倍)。
5.6 本地化等待时间:
Spark的希望task能够运行在它要计算的数据所在节点上(数据本地化思想,避免网络传输),当该节点的计算资源不够时,spark会等待(默认等待3秒),如果超过了等待时间,那么task的本地化等级会降级,然后再等待一段时间,否则再次降级。
本地化等待时间太长,会导致大量的等待时间过长,反而使Spark任务性能下降。
5.7 算子调优:
a.MapPartitions的使用场景;
b.foreachPartitions的使用场景;
c.filter和coalesce配合,过滤数据之后,考虑数据重新分区的问题;
d.repartition重新分区提高并行度,Spark SQL的并行度不允许用户指定,如读取hive数据时,HDFS的split个数,即使所在Stage的并行度,即RDD的个数。但是在读取到数据之后,可以立即使用repartition算子,进行合理的分区;
e.reduceByKey本地聚合,在map端对本地的数据进行聚合。
5.8 Shuffle调优:
a.调整map端缓冲区(默认32KB),避免因为缓存区不够导致的刷盘IO操作;
b.调整reduce端缓冲区(默认48MB),避免因为缓存区不够导致的刷盘IO操作,减少数据拉去的次数;
c.调整reduce端拉去数据重试次数(默认3次),避免网络不稳定或者full gc导致的数据拉去失败问题;
d.调整reduce端拉去数据等待时间的阈值(默认5秒),避免网络不稳定或者full gc导致的数据拉去失败问题;
e.调整SortShuffle排序操作阈值,即bypass操作的阈值,避免不必要的排序操作;
5.9 JVM调优:
a.降低cache操作的内存占比;
b.调节Executor的堆外内存;
c.调节连接等待时长;