Yarn之状态机分析
前言
hadoop2.x.x版本的底层实现中作了很多优化:用状态机对各种对象生命周期和状态转移进行管理;采用事件机制避免线程同步与阻塞。主要分析下yarn中状态机的实现机制
一、事件
YARN中的很多组件之间进行通信,主要借助于事件。为了可读性、可维护性及可扩展性,YARN中的事件由事件名称和事件类型组成。比如JobImpl处理的事件名称为JobEvent,而事件类型为JobEventType.
二、状态
YARN中的很多组件之间进行通信,主要借助于事件。为了可读性、可维护性及可扩展性,YARN中的事件由事件名称和事件类型组成。比如JobImpl处理的事件名称为JobEvent,而事件类型为JobEventType,如下所示
public enum JobStateInternal {
NEW,
SETUP,
INITED,
RUNNING,
COMMITTING,
SUCCEEDED,
FAIL_WAIT,
FAIL_ABORT,
FAILED,
KILL_WAIT,
KILL_ABORT,
KILLED,
ERROR,
REBOOT
}
我们看到JobImpl的内部状态包括新建(NEW)、初始化(INITED)、运行中(RUNNING)、提交中(COMMITTING)、成功(SUCCEEDED)、失败(FAILED)等
三、转化
我们已经了解了事件与状态的基本实现与概念,那么事件与状态有什么关系?一个对象当前处于状态state0,当对象接收到事件Event后,将引发转换动作transition,最终当前对象的状态过渡到state1.
1)Yarn中与状态转化相关的类图如下
2)SingleArcTransition
@Public
@Evolving
public interface SingleArcTransition<OPERAND, EVENT> {
/**
* Transition hook.
*
* @param operand the entity attached to the FSM, whose internal
* state may change.
* @param event causal event
*/
public void transition(OPERAND operand, EVENT event);
}
由于SingleArcTransition的具体实现类只负责接收到事件后的具体操作或行为,并没有包含状态相关的信息,所以在状态机执行状态过渡时,并不是直接调用SingleArcTransition具体实现类的transition方法,而是由接口Transition定义(见代码清单3)真正的转态过渡(包括行为和状态改变)。
- Transition接口
private interface Transition<OPERAND, STATE extends Enum<STATE>,
EVENTTYPE extends Enum<EVENTTYPE>, EVENT> {
STATE doTransition(OPERAND operand, STATE oldState,
EVENT event, EVENTTYPE eventType);
}
- SingleInternalArc接口
SingleInternalArc作为Transition接口的实现类,在代理SingleArcTransition的同时,负责状态变换
private class SingleInternalArc
implements Transition<OPERAND, STATE, EVENTTYPE, EVENT> {
private STATE postState;
private SingleArcTransition<OPERAND, EVENT> hook; // transition hook
SingleInternalArc(STATE postState,
SingleArcTransition<OPERAND, EVENT> hook) {
this.postState = postState;
this.hook = hook;
}
@Override
public STATE doTransition(OPERAND operand, STATE oldState,
EVENT event, EVENTTYPE eventType) {
if (hook != null) {
hook.transition(operand, event);
}
return postState;
}
}
3)MultipleArcTransition
@Public
@Evolving
public interface MultipleArcTransition
<OPERAND, EVENT, STATE extends Enum<STATE>> {
/**
* Transition hook.
* @return the postState. Post state must be one of the
* valid post states registered in StateMachine.
* @param operand the entity attached to the FSM, whose internal
* state may change.
* @param event causal event
*/
public STATE transition(OPERAND operand, EVENT event);
}
由于MultipleArcTransition的具体实现类只负责接收到事件后的具体操作或行为,并没有包含状态相关的信息,所以在状态机执行状态过渡时,并不是直接调用MultipleArcTransition具体实现类的transition方法,而是通过代理类MultipleInternalArc。MultipleInternalArc也实现了Transition接口,并在代理 MultipleArcTransition 的转换行为的同时,负责状态变换.
- MultipleInternalArc
private class MultipleInternalArc
implements Transition<OPERAND, STATE, EVENTTYPE, EVENT>{
// Fields
private Set<STATE> validPostStates;
private MultipleArcTransition<OPERAND, EVENT, STATE> hook; // transition hook
MultipleInternalArc(Set<STATE> postStates,
MultipleArcTransition<OPERAND, EVENT, STATE> hook) {
this.validPostStates = postStates;
this.hook = hook;
}
@Override
public STATE doTransition(OPERAND operand, STATE oldState,
EVENT event, EVENTTYPE eventType)
throws InvalidStateTransitonException {
STATE postState = hook.transition(operand, event);
if (!validPostStates.contains(postState)) {
throw new InvalidStateTransitonException(oldState, eventType);
}
return postState;
}
}
4)ApplicableTransition
为了将所有状态机中的状态过渡与状态建立起映射关系,YARN中提供了ApplicableTransition接口用于将SingleInternalArc和 MultipleInternalArc 添加到状态机的拓扑表中,提高在检索状态对应的过渡实现时的性能,ApplicableTransition的实现类为ApplicableSingleOrMultipleTransition类,其apply方法用于代理SingleInternalArc和MultipleInternalArc ,将它们添加到状态拓扑表中。
- ApplicableTransition接口定义
private interface ApplicableTransition
<OPERAND, STATE extends Enum<STATE>,
EVENTTYPE extends Enum<EVENTTYPE>, EVENT> {
void apply(StateMachineFactory<OPERAND, STATE, EVENTTYPE, EVENT> subject);
}
- ApplicableSingleOrMultipleTransition
static private class ApplicableSingleOrMultipleTransition
<OPERAND, STATE extends Enum<STATE>,
EVENTTYPE extends Enum<EVENTTYPE>, EVENT>
implements ApplicableTransition<OPERAND, STATE, EVENTTYPE, EVENT> {
final STATE preState;
final EVENTTYPE eventType;
final Transition<OPERAND, STATE, EVENTTYPE, EVENT> transition;
ApplicableSingleOrMultipleTransition
(STATE preState, EVENTTYPE eventType,
Transition<OPERAND, STATE, EVENTTYPE, EVENT> transition) {
this.preState = preState;
this.eventType = eventType;
this.transition = transition;
}
@Override
public void apply
(StateMachineFactory<OPERAND, STATE, EVENTTYPE, EVENT> subject) {
Map<EVENTTYPE, Transition<OPERAND, STATE, EVENTTYPE, EVENT>> transitionMap
= subject.stateMachineTable.get(preState);
if (transitionMap == null) {
// I use HashMap here because I would expect most EVENTTYPE's to not
// apply out of a particular state, so FSM sizes would be
// quadratic if I use EnumMap's here as I do at the top level.
transitionMap = new HashMap<EVENTTYPE,
Transition<OPERAND, STATE, EVENTTYPE, EVENT>>();
subject.stateMachineTable.put(preState, transitionMap);
}
transitionMap.put(eventType, transition);
}
}
可以看到ApplicableSingleOrMultipleTransition的apply方法就是为构建状态拓扑表
四、状态机
YARN中状态机的实现类是StateMachineFactory,它主要包含4个属性信息:
- transitionsListNode:就是将状态机的一个个过渡的ApplicableTransition实现串联为一个列表,每个节点包含一个ApplicableTransition实现及指向下一个节点的引用
private class TransitionsListNode {
final ApplicableTransition<OPERAND, STATE, EVENTTYPE, EVENT> transition;
final TransitionsListNode next;
TransitionsListNode
(ApplicableTransition<OPERAND, STATE, EVENTTYPE, EVENT> transition,
TransitionsListNode next) {
this.transition = transition;
this.next = next;
}
}
transitionsListNode形成的过渡列表节点如下:
- stateMachineTable:状态拓扑表,为了提高检索状态对应的过渡map而冗余的数据结构,此结构在optimized为真时,通过对transitionsListNode链表进行处理产生
defaultInitialState:对象创建时,内部有限状态机的默认初始状态。比如:JobImpl的内部状态机默认初始状态是JobStateInternal.NEW。
optimized:布尔类型,用于标记当前状态机是否需要优化性能,即构建状态拓扑表stateMachineTable。
-
公共构造器
StateMachineFactory 的公有构造器只有一个。
public StateMachineFactory(STATE defaultInitialState) { this.transitionsListNode = null; this.defaultInitialState = defaultInitialState; this.optimized = false; this.stateMachineTable = null; }
-
私有构造器
StateMachineFactory 的 私有构造器有两个,其中代码清单11中的构造器在addTransition方法中使用。
从其实现看出,此构造器的主要作用是构建transitionsListNode链表。private StateMachineFactory (StateMachineFactory<OPERAND, STATE, EVENTTYPE, EVENT> that, ApplicableTransition<OPERAND, STATE, EVENTTYPE, EVENT> t) { this.defaultInitialState = that.defaultInitialState; this.transitionsListNode = new TransitionsListNode(t, that.transitionsListNode); this.optimized = false; this.stateMachineTable = null; }
下面的构造器则用于installTopology方法
private StateMachineFactory (StateMachineFactory<OPERAND, STATE, EVENTTYPE, EVENT> that, boolean optimized) { this.defaultInitialState = that.defaultInitialState; this.transitionsListNode = that.transitionsListNode; this.optimized = optimized; if (optimized) { makeStateMachineTable(); } else { stateMachineTable = null; } }
构造器当optimized参数为true时,调用了makeStateMachineTable方法,makeStateMachineTable的实现如下:
private void makeStateMachineTable() { Stack<ApplicableTransition<OPERAND, STATE, EVENTTYPE, EVENT>> stack = new Stack<ApplicableTransition<OPERAND, STATE, EVENTTYPE, EVENT>>(); Map<STATE, Map<EVENTTYPE, Transition<OPERAND, STATE, EVENTTYPE, EVENT>>> prototype = new HashMap<STATE, Map<EVENTTYPE, Transition<OPERAND, STATE, EVENTTYPE, EVENT>>>(); prototype.put(defaultInitialState, null); // I use EnumMap here because it'll be faster and denser. I would // expect most of the states to have at least one transition. stateMachineTable = new EnumMap<STATE, Map<EVENTTYPE, Transition<OPERAND, STATE, EVENTTYPE, EVENT>>>(prototype); for (TransitionsListNode cursor = transitionsListNode; cursor != null; cursor = cursor.next) { stack.push(cursor.transition); } while (!stack.isEmpty()) { stack.pop().apply(this); } }
1.创建堆栈stack,用于将transitionsListNode链表中各个节点持有的ApplicableSingleOrMultipleTransition压入栈中;
2.创建状态拓扑表stateMachineTable,并在此拓扑表中插入一个额外的默认初始状态defaultInitialState与null的映射;
3.迭代访问transitionsListNode链表,并将各个节点持有的ApplicableSingleOrMultipleTransition压入栈中;
4.依次弹出栈顶的ApplicableSingleOrMultipleTransition,并应用其apply方法(已在前面小节介绍),持续不断的构建状态拓扑表stateMachineTable。
五、状态机的构建
为了简化叙述,本节以JobImpl中状态机的构建为例。由于JobImpl的状态机预设的(调用addTransition方法)加入的ApplicableSingleOrMultipleTransition非常多,我们节选其中的2个作为典型进行分析。最后还会分析installTopology方法的实现。
protected static final
StateMachineFactory<JobImpl, JobStateInternal, JobEventType, JobEvent>
stateMachineFactory
= new StateMachineFactory<JobImpl, JobStateInternal, JobEventType, JobEvent>
(JobStateInternal.NEW)
// Transitions from NEW state
.addTransition(JobStateInternal.NEW, JobStateInternal.NEW,
JobEventType.JOB_DIAGNOSTIC_UPDATE,
DIAGNOSTIC_UPDATE_TRANSITION)
.addTransition(JobStateInternal.NEW, JobStateInternal.NEW,
JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
.addTransition
(JobStateInternal.NEW,
EnumSet.of(JobStateInternal.INITED, JobStateInternal.NEW),
JobEventType.JOB_INIT,
new InitTransition())
// 省略其它addTransition调用
// create the topology tables
.installTopology();
构建JobImpl的状态机的步骤如下:
1.调用 StateMachineFactory 构造器创建一个初始的状态机
2.调用addTransition(STATE preState, STATE postState, EVENTTYPE eventType, SingleArcTransition<OPERAND, EVENT> hook)方法添加单弧过渡。从其实现(见代码清单15)可以知道addTransition方法将SingleArcTransition封装为SingleInternalArc,然后将SingleInternalArc封装为ApplicableSingleOrMultipleTransition,最后调用之前说的第一个私有构造器构建transitionsListNode链表
3.调用addTransition(STATE preState, Set<STATE> postStates, EVENTTYPE eventType, MultipleArcTransition<OPERAND, EVENT, STATE> hook)方法添加多弧过渡。从其实现(见代码清单16)可以知道addTransition方法将MultipleArcTransition封装为MultipleInternalArc,然后将MultipleInternalArc封装为ApplicableSingleOrMultipleTransition,最后调用之前说的第一个私有构造器构建transitionsListNode链表;
4.最后调用installTopology方法,其实现见代码清单17。installTopology正是在使用之前说的第二个私有构造器构建状态拓扑表stateMachineTable
public StateMachineFactory
<OPERAND, STATE, EVENTTYPE, EVENT>
addTransition(STATE preState, STATE postState,
EVENTTYPE eventType,
SingleArcTransition<OPERAND, EVENT> hook){
return new StateMachineFactory<OPERAND, STATE, EVENTTYPE, EVENT>
(this, new ApplicableSingleOrMultipleTransition<OPERAND, STATE, EVENTTYPE, EVENT>
(preState, eventType, new SingleInternalArc(postState, hook)));
}
public StateMachineFactory
<OPERAND, STATE, EVENTTYPE, EVENT>
addTransition(STATE preState, Set<STATE> postStates,
EVENTTYPE eventType,
MultipleArcTransition<OPERAND, EVENT, STATE> hook){
return new StateMachineFactory<OPERAND, STATE, EVENTTYPE, EVENT>
(this,
new ApplicableSingleOrMultipleTransition<OPERAND, STATE, EVENTTYPE, EVENT>
(preState, eventType, new MultipleInternalArc(postStates, hook)));
}
public StateMachineFactory
<OPERAND, STATE, EVENTTYPE, EVENT>
installTopology() {
return new StateMachineFactory<OPERAND, STATE, EVENTTYPE, EVENT>(this, true);
}
Stack<ApplicableTransition<OPERAND, STATE, EVENTTYPE, EVENT>> stack =
new Stack<ApplicableTransition<OPERAND, STATE, EVENTTYPE, EVENT>>();
Map<STATE, Map<EVENTTYPE, Transition<OPERAND, STATE, EVENTTYPE, EVENT>>>
prototype = new HashMap<STATE, Map<EVENTTYPE, Transition<OPERAND, STATE, EVENTTYPE, EVENT>>>();
prototype.put(defaultInitialState, null);
// I use EnumMap here because it'll be faster and denser. I would
// expect most of the states to have at least one transition.
stateMachineTable
= new EnumMap<STATE, Map<EVENTTYPE,
Transition<OPERAND, STATE, EVENTTYPE, EVENT>>>(prototype);
for (TransitionsListNode cursor = transitionsListNode;
cursor != null;
cursor = cursor.next) {
stack.push(cursor.transition);
}
//最终在这里,状态路由表都存放到一个StateMachineFactory中了
while (!stack.isEmpty()) {
stack.pop().apply(this);
}