MapReduce作业运行机制

调用方法运行MapReduce作业

  • Job对象的submit()方法:封装处理细节

  • waitForCompletion()方法:用于提交以前没有提交过的作业,并等待作为完成。

Hadoop运行MapReduce作业的工作原理

MapReduce工作机制.png

独立实体

  • 客户端:提交作业。

  • YARN资源管理器:负责协调集群上计算机资源的分配。

  • YARN节点管理器:负责启动和监视集群中机器上的计算容器(container)

  • MapReduce的application master :负责协调运作MapReduce作业的任务,它和MapReduce任务在容器中运行,这些容器由资源管理器分配,并由节点管理器进行管理。

  • 分布式文件系统(HDFS):用来与其他实体间共享作业文件。

作业的提交

1.Job的submit方法创建一个内部的JobSummiter实例,调用submitJobInternal()方法(步骤1)。提交作业后,waitForCompletion()每秒轮询作业的进度,如果发现自上次报告后有改变,则把进度报告到控制台;作业完成后,如果成功,则显示作业计数器;如果失败,则导致作业失败的错误被记录到控制台。

2.obSummiter实现的作业提交过程如下:

  • 向资源管理器请求一个新应用ID,用于MapReduce作业ID。(步骤2)

  • 检测作业的输出说明。例如,若没有指定输出目录或者输出目录已经存在,作业就不提交,错误抛给MapReduce程序

  • 计算作业的输入分片。如果分片无法计算,比如因为输入路径不存在,作业则不提交,错误抛给MapReduce程序。

  • 将运行作业所需资源:作业JAR文件、配置文件、计算所得的输入分片,复制到一个以作业ID命名的目录下的共享文件系统(HDFS)。(步骤3)

  • 通过调用资源管理器的submitApplication()方法提交作业。(步骤4)

作业的初始化

1.资源管理器(ResourceManager)收到调用它的submitApplication()消息后,将请求传递给YARN调度器(scheduler),调度器分配一个容器,然后资源管理器在节点管理器的管理下,在容器中启动application master进程。(步骤5)

2.MapReduce程序的application master是一个Java程序,其主类是MRAppMaster,application master对作业的初始化时通过创建多个簿记对象以保持对作业的进度跟踪。(步骤6)

3.接受来自HDFS的在客户端计算的输入分片,对每个分片创建一个map任务对象,以及由mapreduce.job.reduces属性(通过作业的setNumReduceTasks()方法设置)确定多少个reduce任务对象,任务ID在此时分配。(步骤7)

4.application master决定如何运行构成MapReduce作业的各个任务,如果作业很小,就选择和自己在同一个JVM上运行任务,与在一个节点上顺序运行这些任务相比,当application master判断在新的容器中分配任务和运行任务的开销大于并行运行它们的开销时,就好发生这一情况(在当前节点运行任务),这样的任务称为uber任务。

5.默认情况下,小作业是少于10个mapper且只有1个reducer且输入大小小于一个HDFS块的作业。

6.最后,在任何任务运行之前,application master调用setupJob()方法,设置OutputCommitter。

任务的分配

1.如果作业不适合作为uber任务运行,application master则为该作业中的所有map任务和reduce任务,向资源管理器请求新容器。(步骤8)

2.首先为Map任务发出请求,map请求优先级高于reduce请求,因为所有map任务需要在reduce任务运行前完成。直到有5%的map任务已经完成时,reduce任务请求才会发出。请求包括内存和CPU数

任务的执行

1.当资源管理器的调度器为任务分配了一个特定节点上的容器时,application master就通过与节点管理器通信(RPC)来启动容器,该任务由主类为YarnChild的一个Java程序执行。(步骤9)

2.在运行任务之前,首先将任务需要的资源本地化,包括作业的配置、JAR文件和所有来自分布式缓存的文件(步骤10)

3.最后,运行map任务或者reduce任务。(步骤11)

进度和状态的更新

每个作业及其每个任务都有一个状态(status),包括:作业或者任务的状态(运行中、成功完成、失败)、map和reduce进度、作业计数器的值、状态消息或者描述。

状态更新流程.png

1.当map任务或者reduce任务运行时,子进程和自己的父 application master通过umbilical接口通信。每隔3秒,任务通过这个umbilical接口向自己的application master报告进度和状态(包括计数器),application master会形成一个作业的汇聚视图(aggregate view)

2.在作业期间,客户端每秒钟轮询一次application master以接收最新状态(轮询隔间可通过mapreduce.client.progressmonitor.pollinterval设置)。客户端也可以使用Job的getStatus()方法得到一个JobStatus的实例

作业的完成

1.当application master收到作业最后一个任务已完成的通知后,便把作业状态设置为成功。然后在Job轮询状态时,便知道任务已经完成,Job打印一条消息告诉用户,然后waitForCompletion()方法返回。

2.最后,作业完成时,application master 和任务容器清理其工作状态,OutputCommitter的commitJob方法会被调用。作业信息由作业历史服务器存档,以便日后用户需要时可以查询。

失败

MapReduce程序失败主要考虑以下实体的失败:任务、application master、节点管理器、资源管理器

任务运行失败

  • 任务失败:当map任务或reduce任务抛出异常时,任务JVM在退出前向其父application master发送错误报告。错误报告最后被记入用户日志。application master将此次任务尝试标记为failed,并释放容器以便资源可以为其他任务使用。

  • 任务挂起:application master注意到已经有一段时间没收到进度更新,便会将任务标记为失败,此后,任务JVM进程将被自动杀死。任务被任务失败的超时时隔为10分钟。

  • 任务尝试机制(task attempt):application master被告知一个人任务尝试失败后,将重新调度该任务的执行,application master会试图避免在一起失败过的节点管理器上重新调度该任务。此外,如果一个任务失败过4次,将不会再重试。该值可以设置:

    • 对于map任务:运行任务的最多尝试次数由mapreduce.map.maxattempts属性控制。

    • 对于reduce任务:由mapreduce.reduce.maxattempts属性控制。

    • 默认情况下,如果任何任务失败次数大于4,整个作业都会失败。但可以为作业设置在不触发作业失败的情况下允许任务失败的最大百分比。

      • map任务:通过mapreduce.map.failures.maxpercent属性确定。

      • reduce任务:通过mapreduce.reduce.failures.maxpercent属性确定。

application master运行失败

  • 尝试机制:运行MapReduce application master的最多尝试次数由mapreduce.am.max-attempts属性控制,默认值为2,即MapReduce application master失败两次,便不会进行再尝试,作业失败。

  • 恢复过程:application master向资源管理器发送周期性心跳,当application master失败时,资源管理器将检测到该失败并在一个新的容器中开始一个新的master实例。对于MapReduce application master,将使用作业历史来恢复失败的应用程序所运行任务的状态,而不必重新运行。

  • MapReduce客户端向application master轮询进度报告,如果它的application master运行失败,客户端则需要重新定位新的实例。

节点管理器运行失败

如果节点管理器由于崩溃或者运行非常缓慢而失败,则会停止向资源管理器发送心跳包,如果10分钟内没有收到心跳消息,资源管理器将会通知停止发送心跳信息的节点管理器,并且将其从自己的节点池重移除调度启用容器。如果应用程序运行失败次数过高,节点管理器则可能会被拉黑。application master管理黑名单,对于MapReduce任务,如果一个节点管理器上有超过3次任务失败,application master就好尽量将任务调度到不同节点上。

资源管理器运行失败

该问题非常严重,直接导致作业和任务容器无法启动,且不能被恢复。因此需要保证其高可用。所以运行一对资源管理器是有必要的,如果主资源管理器失败了,那么备份资源管理器能够接替,且客户端不会感到明显的中断。

资源管理器从备机到主机的切换有故障转移控制器(failover controller)处理,默认故障转移控制器使用zookeeper的leader选举制度以确保同一时刻只有一个资源管理器。

shuffle和排序

MapReduce确保每个reducer的输入都是按键排序的,系统执行排序,将map输出作为输入传给reducer的过程称为shuffle。

map端

map函数产生输出的过程并非简单地将它写到磁盘,它利用缓存方式写到内存并处于效率的考虑进行预排序。过程如下:

shuffle排序.png

1.每个map都有一个环形内存缓存区用于存储任务输出,默认大小为100MB,当缓存内容达到阈值0.8时,后台线程便开始把内容溢出(spill)到磁盘(disk)。在溢出写入磁盘过程中,map输出继续写到缓冲区,如果此间缓冲区满了,map会被阻塞直到写磁盘过程完成。溢出写过程按轮询方式将缓冲区中的内容写到mapreduce.cluster.local.dir属性在作业特定子目录下指定的目录中。

2.在写磁盘前,线程首先根据数据最终要传的reducer把数据划分成相应的分许(partition),在每个分区中,后台线程按键进行内存中排序,如果有combiner函数,则会在排序后的输出上运行。

reduce端

1.map输出文件位于运行map任务的tasktracker的本地磁盘,但reduce输出不是这样。

2.当每个map任务完成时,reduce任务开始复制其输出,这是reduce任务的复制阶段,reduce任务有若干复制线程,因此能够并行复制map输出,默认是5个线程,可以通过mapreduce.reduce.shuffle.parallelcopies属性确定

3.reducer如何知道在哪台机器上取得map输出?

map任务成功完成后,会使用心跳机制通知他们的application master。因此,对于指定作业,application master拥有map输出和主机位置之间的映射关系,reducer中的一个线程定期询问master以便获取map输出主机的位置,直到获取所有输出位置。

4.如果map输出相当小,则会被复制到reduce任务JVM的内存上,否则,map输出会被复制到磁盘。

5.复制完所有map输出后,reduce任务进入排序阶段(合并阶段,排序在map端进行),合并map输出,维持其顺序排序。默认合并因子为10,即10个map输出合并为1个。

6.最后,进入reduce阶段,直接把数据输入到reduce函数,没有把中间文件合并为1个文件作为最后一趟合并。在该阶段,对已排序输出中的每个键调用reduce函数,此阶段的输出直接写到输出系统(HDFS)

配置调优

调优原则:尽可能给shuffle过程分配多内存空间,减少磁盘写入与map阻塞。

在map端:避免多次溢出写磁盘来获取最佳性能,内存大小通过mapred.child.java.opts属性确定

在reduce端:中间数据全部驻留内存时,能获取最佳性能。

任务的执行

推测执行

MapReduce模型将作业分解成任务,然后并行地运行任务以使得作业的整体执行时间少于各个任务顺序执行时间。

Hadoop在一个任务运行比预期慢的时候,会尽量检测,并启动另一个相同的任务作为备份,这就是任务的推测执行

OutputCommitters

Hadoop MapReduce使用一个提交协议来确保作业和任务都完全成功或失败。这个行为通过对作业使用OutputCommitter来实现。


参考资料:《Hadoop权威指南》

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 202,723评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,080评论 2 379
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 149,604评论 0 335
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,440评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,431评论 5 364
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,499评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,893评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,541评论 0 256
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,751评论 1 296
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,547评论 2 319
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,619评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,320评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,890评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,896评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,137评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,796评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,335评论 2 342

推荐阅读更多精彩内容

  • 昨天2016的第一场雪,愉悦
    美阳羊阅读 342评论 0 0
  • 昨晚上,校本部王主任开启的别开生面破冰之旅,已是让我很惊奇,很震撼,我当了20多年的班主任,原来第一节课介绍学生也...
    我本善良_1034阅读 498评论 0 2
  • 今天给会员上了两节课,早上的阿斯汤加和晚上的哈他,早上的如沐春风,而晚上却是布满阵云,无非是心境的变化,无非是自己...
    Rachel_8阅读 174评论 0 3
  • 一个普通的下午一颗普通的树 坐着我没有说话风也没有说话只有灵魂在低声哭泣
    心花园子阅读 144评论 0 0