spark 资源大小分配与并行处理

起因

写这篇博客的起因我在跑一个spark job时,有时能跑通,有时跑不通。程序的需求是对比两个hbase表。程序逻辑很简单,分别从两个hbase表读取全量数据,然后以cogroup二者,对比同一个rowkey下每个列是否一致。

跑不通的错误日志如下:

17/02/25 21:24:20 INFO collection.ExternalAppendOnlyMap: Thread 1896 spilling in-memory map of 83.6 MB to disk (46 times so far)

17/02/25 21:24:22 WARN server.TransportChannelHandler: Exception in connection from /10.110.1.57:57832

java.io.IOException: Connection reset by peer

at sun.nio.ch.FileDispatcherImpl.read0(Native Method)

……

17/02/25 21:24:22 ERROR server.TransportRequestHandler: Error sending result ChunkFetchSuccess{streamChunkId=StreamChunkId{streamId=1258210057016, chunkIndex=0}, buffer=FileSegmentManagedBuffer{file=/data-g/hadoop/yarn/local-dir/usercache/test/appcache/application_1466564207556_1562806/blockmgr-ebe23f0d-5a9e-4a37-952b-73bfab6cceed/3f/shuffle_0_6_0.data, offset=474965639, length=95049579}} to /10.130.1.27:53263; closing connection

java.nio.channels.ClosedChannelException

17/02/25 21:24:22 ERROR client.TransportResponseHandler: Still have 1 requests outstanding when connection from c1-hd-dn8.bdp.idc/10.130.1.27:50014 is closed

17/02/25 21:24:22 INFO shuffle.RetryingBlockFetcher: Retrying fetch (1/3) for 1 outstanding blocks after 5000 ms

17/02/25 21:24:22 INFO collection.ExternalAppendOnlyMap: Thread 65 spilling in-memory map of 84.1 MB to disk (44 times so far)

17/02/25 21:24:23 INFO collection.ExternalAppendOnlyMap: Thread 1895 spilling in-memory map of 83.9 MB to disk (47 times so far)

 17/02/25 21:24:27 ERROR shuffle.RetryingBlockFetcher: Exception while beginning fetch of 1 outstanding blocks (after 1 retries)

java.io.IOException: Failed to connect to someHost/someIP:50004

……

Caused by: java.net.ConnectException: Connection refused: someHost/someIP:50004

……


stage0读取第一个hbase表的数据;stage1读取第二个hbase表的数据;stage2 cogroup两表并做数据对比

关注点

针对上面这个问题,做了相关的尝试,解决了以下几个问题:

(1)运行spark job该分配多少资源,即我们该分配多少个executor?每个executor分配多少内存、多少个core?

(2)spark job 的并行度由什么因素决定?

(3)为什么yarn UI也的executor显示的used memory内存大小比配置的内存小?


(1)运行spark job该分配多少资源,即我们该分配多少个executor?每个executor分配多少内存、多少个core?

该分配多少资源主要看输入量的大小、资源计算的复杂度。一般瓶颈会在shuffle阶段,如果执行某个shuffle的task内存不足,那很可能会跑不下去,程序挂掉。

spark中的计算任务都是一个个task单独执行,executor内存越多,单个task执行时内存越足,执行越顺利。 executor越多,core越多,可并行执行的task数目也就越多。假如总共100个task,5个executor,4个core,那么平均需要执行100/(5*4) = 5个批次;如果是2个executor,4个core,那么需要执行100/(2*4) = 13个批次。

core的数量一般根据内存大小和机器物理核数来定。最好不要超过物理核数。如果executor内存是4G,分配了4个core,那么每个core只有4G/4 = 1G内存。所以core不宜太大,如果太大,每个task执行时的内存将会变小,影响正常执行。

举个例子,我们的输入是两个hbase表,均为3.5G。shuffle阶段两个表会根据rowkey 做join,会产生几十G的shuffle数据。我们这样设置资源:

--driver-memory 1g \

--executor-memory 4g \

--num-executors 6 \

--executor-cores 4 \

(2)spark job 的并行度由什么因素决定?

并行度分为理论上最大的并行度和实际执行的并行度两种,“理论上”指的是总共的partition数目,一个partition对应一个task执行,如果数据有100个partition,那么理论上并行度最高可以达到100。“实际执行”指的是这些task实际分到executor各个core执行时的并行度。加入有100个partition,但是分配的资源只有10个executor,每个executor2个core,那么他们的并行度是10*2=40, 实际执行时会分批执行,分为100/(10*2) = 5批。我们一般讨论的并行度是理论上的并行度。

并行度(partition数目)由初始数据大小、初始数据类型,程序中设定的numPartitions大小,分配资源的executor、core数目共同决定。并行度一般在shuffle时发生改变,如果未设定,则默认取上一个stage中最大的partition数目作为当前stage的并行度。所以如果不做设定,那么并行度与初始数据的并行度紧密相关。

1.初始数据文件类型因素

如果读入的数据为hdfs文件,那么默认的并行度是block数量。block大小默认是64MB或128MB。

如果读入的数据是hbase表,那么默认的并行度是表的region数目。

2.人为设定numPartitions

如果人为的在读取数据或者在shuffle类算子中设定numPartitions,那么整体的并行度将会以人为设定的为准。

3.人为设定spark.default.parallelism

spark.default.parallelism参数是全局的,优先级低于人为设定的numPartiton。在shuffle时,如果没有设定numPartiton,那么将为以spark.default.parallelism设定的数目作为并行度。

4.系统默认的spark.default.parallelism

系统默认的spark.default.parallelism = executor数目*core数目

以上4个因素的优先级:

1.numPartitions参数 > 2. spark.default.parallelism参数 > 3. 读取初始文件产生的并行度

(3)为什么yarn UI也的executor显示的used memory内存大小比配置的内存小?

spark中的内存分为多个部分,UI页面上显示的只是缓存RDD用的storage memory,大约是(总内存 - 300M) * 60% * 50% 的量,所以会偏小。具体内存分配如下图:

以上。

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 194,761评论 5 460
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 81,953评论 2 371
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 141,998评论 0 320
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 52,248评论 1 263
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 61,130评论 4 356
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 46,145评论 1 272
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 36,550评论 3 381
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 35,236评论 0 253
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 39,510评论 1 291
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 34,601评论 2 310
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 36,376评论 1 326
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 32,247评论 3 313
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 37,613评论 3 299
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 28,911评论 0 17
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,191评论 1 250
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 41,532评论 2 342
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 40,739评论 2 335

推荐阅读更多精彩内容