本文为 《Hadoop The Definitive Guide 4th Edition》的读书笔记,仅限交流使用。
剖析 MapReuce Job 的运行
我们能够在 JOb 对象上调用一个submit()方法来运行一个 MapReduce job,你也能调用 waitFormCompletion() -- 总是提交一了一个job后等待job的完成。这个方法隐藏了许多的执行细节。我们现在开看看他底层运行的步骤:
完整的运行流程见 Figure 7-1 :
- 客户端提交 MapReduce job
- YARN 资源管理器里,在集群里定位计算资源在哪个节点上开辟的。
- YARN node manager 中,运行在并且管理在集群机器上的计算容器,每一个容器有自己的nodeManager。
- MapReduce 应用程序管理员(master),定位 MapReduce job 的 task 手下们。应用程序 master 和 MapReduce task 运行在 由资源管理器调度、由 node manager 管理 的 containers 中,
- 分布式文件系统,用于在其他实体中分享 job 文件。
Job 提交过程
submit() 方法在 Job 上创建了一个内置的 JobSubmitter 实例化并且调用 submitJobInternal() 方法。 当调用了一个job, waitForCompletion() 将会每一秒去检测job的运行,当运行成功后,就展示出 job counters。如果因为job任务有错误而失败了,也会打印在控制台上。
job的提交过程是通过 JobSubmitter 来实现的:
- 向 resource manager 请求一个新的 应用程序 ID,使用在 MapReduce job 的ID上 (第2步)。
- 检查job指定的输出路径,如果路径没有指定或者已经存在,job就不会被提交,并且会向 MapReduce 抛出一个 error.
- 计算job的 input splits 。 如果无法计算splits(比如文件不存在),job就不会被提交,并且会向 MapReduce 抛出一个 error.
- 拷贝 job 所需要的资源,包括job的jar包,配置文件,计算后的 input splits,送给在共享文件系统中使用job的ID来命名的目录中(第3步)。 job 的jar包会根据配置的备份数来进行备份,默认是10.这样就可以有许多的拷贝在集群中传输给 node manager去读取,好为job运行 task。
- 通过调用在 resource manager 上调用 submitApplication()来提交任务(第4步)。
Job 初始化
当 resource manager 接到了 submitApplication() 方法的调用,他就将请求转送给 YARN 调度器。调度器就分配一个容器,随后 resource manager 开始启动在nodemanager的管理下的应用程序 master的进程(5a 5b)。
Mapreduce job的应用程序master是一个主类为 MRAPPMaster 的Java应用程序。他通过创建一些 bookkeeping(清单,账本)来初始化job,它用来追踪job的运行,也会接收从task发来的关于task 运行 和 完成 的报告(第6步)。然后,他从分享的文件系统中检索在客户端中计算好的 input splits(第7步)。然后就在每一个 split 上创建一个 map task,而 reduce task 对象的数目根据
mapreduce.job.reduces
属性来设定。
或者
setNumReduceTasks()
方法来设定。
task的ID在此时就被给予。
应用程序 master 必须决定怎样运行由 MapReduce 生成的 task 。如果 job 很小,应用程序 master 可能在他自己运行所在的 JVM 上运行这个 job。 这是通过比较分配和运行一个新的容器并且不得不并行运行的他们的开销和在一个node一个队列中运行他们的开销,来判断。使用这种方式运行的 task 叫 最好的(uber)task。
少于10个mapper和只有一个reducer,并且输入数据小于块大小的job就叫做一个小job(uber),允许使用uber方式运行。这些界线可以使用下面两个属性来设置:
mapreduce.job.ubertask.maxmaps
mapreduce.job.ubertask.maxreduces
mapreduce.job.ubertask.maxbytes
Uber task 必须要激活,即将属性
mapreduce.job.ubertask.enable
设置为 true
最后,在任何一个 task 能够运行之前,应用程序 master 就调用OutputCommitter 的 setupJob() 方法。默认的是 FileOutputCommitter,他会为整个job创建一个不可修改的输出路径,而为task输出创建一个临时的工作目录。详细有空讲。
task assignment 任务分配
如果一个 job 没有资格作为一个uber task运行,那么应用程序 master 将为一个job中所有的map和reduce task请求容器(第8步)。
为 map tasks 的所发出的请求会先进行,因为map tasks的优先级比 redeuce tasks 高很多,所以在 reduce 的排序阶段开始之前,必须要等他之前的 map task们 运行结束。在map tasks 完成了5%后,AppMaster 才能为 reduce task 发出资源(向 resource manager)请求。
reduce 能运行在集群的任何地方。但是需要 应用程序master 为 map tasks 发送的请求要遵守由调度器努力维持的数据本地化约束。
- task是数据本地化的,即 task 和 split 在一个 node 上。
- task 与 split 是在同一个的机架上,但不在一个node中。
- 一些 task 和他的 split 也有可能是在不同的机架中。
对于分布式的运行,通过查看job的counters,你能够查明运行在各个 本地化水平(上面三者)上的数目。
请求也会为task指定内存需求和CPU数目。默认的每一个map和reduce任务被分配有1024MB内存个一个虚拟CPU内核。这些值当然可以设置:
mapreduce.map.memory.mb
mapreduce.reduce.memory.mb
mapreduce.map.cpu.vcores
mapreduce.map.cpu.vcores
Task Execution 任务执行
一旦为一个 task 通过resource manager的调度器在分离节点上的容器上分配好了资源。应用程序 master 就开始联系 node manager,让他去启动容器(9a,9b)。task 是通过一个叫做 YarnChild 的主类Java应用程序执行的。在执行 task 之前,他会去定位 task 需要的资源,包括 job 配置文件,jar包和任何在分布式缓存中的文件(第10步)。最后,就开始运行 map task 或者 reduce task(第11步)。
YarnChild 需要专用的 JVM,以至于由用户编写定义map和reduce上的bug,不会影响到 node manager 的运行,造成崩溃或者中止。
流
流运行特殊的map和reduce task 为了运行用户自己的可执行程序并且与之交互。
流通过使用标准输入和标准输出与程序(可能使用其他程序编写的)进行交互,在执行 task 之间,Java程序将输入键值对送进额外的程序中,然后使用用户自定义的map或者reduce 方法处理输入,然后把输出键值对送回Java程序中。在 node manager 的立场上,他并不知道是一个子程序自己运行的map或reduce代码。
程序和状态更新
MapReduce job 是一个长期运行的并行批处理 job,运行时间由几秒到几小时。因为job的运行时间很重要,得到job执行时的反馈对用来说是非常重要的。一个 job 和他的每一个 task 都有一个 status(状态),包括job或者task的执行状态(比如:正运行,成功,完成,失败),还有 map 和 reduce 的完成进度,job conunter 的值,一个状态消息或者描述(可能用户代码设置的)。这些状态在job的执行过程中会不断的变化。
当 task 正运行,task能够计算他自身处理的进度(完成百分比)。对于 map task 来说,就是输入被处理的比例。 对于 reduce task, 就复杂了点,但是系统仍然会有输入进度的比例。他是将整个进度分成三个部分,对应于 shuffle 的三个阶段。比如,copy 和 sort 阶段(每一个占 1/3)已经都完成,而 task 的输入阶段完成了一半,那 task 的当前完成的进度就是 5/6 (1/3 + 1/3 + 1/3 * 1/2 = 5/6)。
task 中还会有许多的计数器(counter)去记录任务的运行情况,有些是框架中自带内建的,就像 ma 输出的记录数,当然用户能够自定义一个计数器。
当 map 和 reduce 正在运行的时候, 子进程会与他们的爸爸 AppMater 通过 umbilical ()接口进行交互。 task 汇报他的完成程度和状态信息给 appMaster,然后据聚合成 job 的运行视图。每三秒汇报一次。
resourse manager 网页界面将会展示出所有正在运行的 应用程序,是通过连接界面和 应用程序master 来得到的信息。
在Job 运行期间,客户端会通过每秒的轮询获得 应用程序master 中记录最近的状态信息(通过 mapreduce.client.progressmonitor.pollinterval来设置)。客户端也能够使用 Job 的 getStatus() 来获得包含了所有状态信息的JobStatus对象。
Job 完成
当 应用程序master 收到最后一个 task 发来的通知,说明最后一个 task 已经完成,他就将job的状态改为“成功”。当 Job(类)查询到这个状态,他就知道了这个 job 已经完成,然后他就打印一条信息给用户,并且返回 waitForCompletion() 方法。最后Job的统计数据与计数会打印到控制台输出。
最后, 应用程序master 和 task容器 清理他的工作状态(所以中间文件也会被删除),然后 OutputCommitter 的 commitJob() 方法被调用。 Job 的信息被归档在历史job服务中,以备用户去查看。
恩,就这些。