Coordinator Overview
用户在数据处理的过程中可能会使用到map-reduce, hadoop-streaming, hdfs 以及 Pig任务,这些多个任务可以被组合成为一个任务流。一般情况下,任务流运行可能依赖于时间周期、数据依赖、又或者是某些特定的事件条件。任务流和任务流之间也存在着某种关系,特别是不同频率之间的任务流,Coordinator组件作用就是允许用户来定义任务流的执行周期,依赖数据和依赖事件,以及任务流和任务流之间的关系。
概念定义
- Actual time:任务流实际真正允许的时间点;
- Nominal time:通过逻辑计算,任务流理应跑的时间点,因为上层依赖,或者是系统性能问题可能会导致Actual time比Nominal time迟;
- Dataset:数据集,数据集中会有一个url代表;
-
Synchronous Dataset:同步数据集,数据集实例通过Nominal time来标识,比如一个基本的hdfs上的数据集url可以标识:hdfs://foo:8020/usr/logs/2009/04/15/23/30.
一个基本的HCatalog表分区数据集url可以标识hcat://bar:8020/mydb/mytable/year=2009;month=04;dt=15;region=us. - Coordinator Action: 代表饿了满足了启动了的任务流;
- Coordinator Application: Coordinator 应用,定义了Coordinator 应用的开始时间和结束时间,定义了 coordinator actions的依赖条件,Coordinator 应用以xml的形式展现出来,信息可参数化;
- Coordinator Job: 是一个可以执行的Coordinator 应用实例,任务实例根据生成的条件已经将参数具体化;
- Data pipeline: 多个 Coordinator应用通过消费和产生相关的数据集;
- Coordinator Engine: 用来执行Coordinator 任务的引擎;
Datetime, Frequency and Time-Period Representation
Datetime:
Examples:
| EL Constant| Value| Example|
| ------------- |:-------------:|: -----|
| ${coord:minutes(int n)} | n | ${coord:minutes(45)}--> 45 |
| ${coord:hours(int n)} | n * 60 | ${coord:hours(3)}--> 180 |
| ${coord:days(int n)} | variable | ${coord:days(2)}--> minutes in 2 full days from the current date |
| ${coord:months(int n)} | variable | ${coord:months(1)}--> minutes in a 1 full month from the current date |
| ${cron syntax} | variable | ${0,10 15 * * 2-6}--> a job that runs every weekday at 3:00pm and 3:10pm UTC time |
Note 1:这里的天和月的计算使用的简单的步长,天按照24个小时来计算,月按照30天来计算。比如天,在夏令时开始和结束的时候一天不是24个小时。
Note 2:${coord:days(int n)}和${coord:endOfDays(int n)}的使用场景:
如果频率暗含的比如每几天,没几周,这样子,不建议使用这个表示方法。考虑到的是夏令时的问题。
Demo:
<coordinator-app name="hello-coord" frequency="${coord:days(1)}"
start="2009-01-02T08:00Z" end="2009-01-04T08:00Z" timezone="America/Los_Angeles"
xmlns="uri:oozie:coordinator:0.1">
<controls>
<timeout>10</timeout>
<concurrency>${concurrency_level}</concurrency>
<execution>${execution_order}</execution>
<throttle>${materialization_throttle}</throttle>
</controls>
<datasets>
<dataset name="din" frequency="${coord:endOfDays(1)}"
initial-instance="2009-01-02T08:00Z" timezone="America/Los_Angeles">
<uri-template>${baseFsURI}/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}</uri-template>
</dataset>
<dataset name="dout" frequency="${coord:minutes(30)}"
initial-instance="2009-01-02T08:00Z" timezone="UTC">
<uri-template>${baseFsURI}/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}</uri-template>
</dataset>
</datasets>
<input-events>
<data-in name="input" dataset="din">
<instance>${coord:current(0)}</instance>
</data-in>
</input-events>
<output-events>
<data-out name="output" dataset="dout">
<instance>${coord:current(1)}</instance>
</data-out>
</output-events>
<action>
<workflow>
<app-path>${wf_app_path}</app-path>
<configuration>
<property>
<name>wfInput</name>
<value>${coord:dataIn('input')}</value>
</property>
<property>
<name>wfOutput</name>
<value>${coord:dataOut('output')}</value>
</property>
</configuration>
</workflow>
</action>
</coordinator-app>
Note 3:coord:months(int n) 和 coord:endOfMonths(int n)
Demo:
<coordinator-app name="hello-coord" frequency="${coord:months(1)}"
start="2009-01-02T08:00Z" end="2009-04-02T08:00Z" timezone="America/Los_Angeles"
xmlns="uri:oozie:coordinator:0.1">
<controls>
<timeout>10</timeout>
<concurrency>${concurrency_level}</concurrency>
<execution>${execution_order}</execution>
<throttle>${materialization_throttle}</throttle>
</controls>
<datasets>
<dataset name="din" frequency="${coord:endOfMonths(1)}"
initial-instance="2009-01-02T08:00Z" timezone="America/Los_Angeles">
<uri-template>${baseFsURI}/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}</uri-template>
</dataset>
<dataset name="dout" frequency="${coord:minutes(30)}"
initial-instance="2009-01-02T08:00Z" timezone="UTC">
<uri-template>${baseFsURI}/${YEAR}/${MONTH}/${DAY}/${HOUR}/${MINUTE}</uri-template>
</dataset>
</datasets>
<input-events>
<data-in name="input" dataset="din">
<instance>${coord:current(0)}</instance>
</data-in>
</input-events>
<output-events>
<data-out name="output" dataset="dout">
<instance>${coord:current(1)}</instance>
</data-out>
</output-events>
<action>
<workflow>
<app-path>${wf_app_path}</app-path>
<configuration>
<property>
<name>wfInput</name>
<value>${coord:dataIn('input')}</value>
</property>
<property>
<name>wfOutput</name>
<value>${coord:dataOut('output')}</value>
</property>
</configuration>
</workflow>
</action>
</coordinator-app>
Note4:Cron syntax
Field name | Allowed Values | Allowed Special Characters |
---|---|---|
Minutes | 0-59 | , - * / |
Hours | 0-23 | , - * / |
Day-of-month | 1-31 | , - * ? / L W |
Month | 1-12 or JAN-DEC | , - * / |
Day-of-Week | 1-7 or SUN-SAT | , - * ? / L # |
'':代表所有;
'?':应用在Day-of-month字段和Day-of-Week字段 ,指具体数值;
'-':标识范围;
',':用于枚举;
'/':表示增长的步长; e.g:"0/15" 表示 0,15,30,45; "5/15" 表示 5,20,35,50;"/15" 表示 "0/15"
'L' :用在 day-of-month 和day-of-week,标识倒数第几天;
'W':用在day-of-month,标识最近的星期几;
'#' :用在 day-of-week,表示第几个星期几;
Demo:
<coordinator-app name="cron-coord" frequency="0/10 1/2 * * *" start="${start}" end="${end}" timezone="UTC"
xmlns="uri:oozie:coordinator:0.2">
<action>
<workflow>
<app-path>${workflowAppUri}</app-path>
<configuration>
<property>
<name>jobTracker</name>
<value>${jobTracker}</value>
</property>
<property>
<name>nameNode</name>
<value>${nameNode}</value>
</property>
<property>
<name>queueName</name>
<value>${queueName}</value>
</property>
</configuration>
</workflow>
</action>
</coordinator-app>
Dataset
Synchronous Datasets:
同步数据集一般以一定的时间间隔,一定的频率规则产生,同步数据集实例以创建时间相互分辨,创建时间通常体现在URI中。
一个标准的同步数据集在xml中定义可能包含以下内容:
- name: 数据集的名称;
- frequency: 频率;
- **initial-instance: ** 初始化数据集实例的时间UTC时区,也使用这个时间做为基准时间来做频率计算;
- timezone: 时区;
- uri-template:
- constants
- **variables **
- done-flag:标识数据集准备好了可以被消费;
- If the done-flag is omitted the coordinator will wait for the presence of a _SUCCESS file in the directory (Note: MapReduce jobs create this on successful completion automatically).
- If the done-flag is present but empty, then the existence of the directory itself indicates that the dataset is ready.
- If the done-flag is present but non-empty, Oozie will check for the presence of the named file within the directory, and will be considered ready (done) when the file exists.
EL Constant | Resulting Format | Comments |
---|---|---|
YEAR | YYYY | 年份的四位数表示 |
MONTH | DD | 月份的两位数表示,一月份是1 |
DAY | DD | 天份的两位数表示 |
HOUR | HH | 小时 0-23 |
MINUTE | mm | 分钟 0-59 |
Demo:
<dataset name="[NAME]" frequency="[FREQUENCY]"
initial-instance="[DATETIME]" timezone="[TIMEZONE]">
<uri-template>[URI TEMPLATE]</uri-template>
<done-flag>[FILE NAME]</done-flag>
</dataset>
Examples:
1、数据集每天的00:15产生,完成标识被设置成空:
<dataset name="logs" frequency="${coord:days(1)}"
initial-instance="2009-02-15T08:15Z" timezone="America/Los_Angeles">
<uri-template>
hdfs://foo:8020/app/logs/${market}/${YEAR}${MONTH}/${DAY}/data
</uri-template>
<done-flag></done-flag>
</dataset>
hdfs://foo:8020/usr/app/[market]/2009/02/15/data
hdfs://foo:8020/usr/app/[market]/2009/02/16/data
hdfs://foo:8020/usr/app/[market]/2009/02/17/data
2、数据集在每个月的第十天可以使用,完成标识符是默认的'_SUCCESS':
<dataset name="stats" frequency="${coord:months(1)}"
initial-instance="2009-01-10T10:00Z" timezone="America/Los_Angeles">
<uri-template>hdfs://foo:8020/usr/app/stats/${YEAR}/${MONTH}/data</uri-template>
</dataset>
hdfs://foo:8020/usr/app/stats/2009/01/data
hdfs://foo:8020/usr/app/stats/2009/02/data
hdfs://foo:8020/usr/app/stats/2009/03/data
当在路径下面出现'_SUCCESS':标识,则说明数据集可以使用: