public void handleEvent(MvcEvent e) {
EventType type = e.getType();
if (type == Dispatcher.BeforeDispatch) {
beforeDispatch(e);
} else if (type == Dispatcher.AfterDispatch) {
afterDispatch(e);
}
}
DispatcherListener处理事件的入口是 handleEvent,里面有两个具体的处理,已在在分发事件之前BeforeDispatch,一个在分发事件之后AfterDispatch;
AddJobListener
如果是新增操作,这里进行处理,添加controller
public void beforeDispatch(MvcEvent mvce) {
if(mvce.getAppEvent() instanceof JobMaintenanceEvent){
JobMaintenanceEvent event=(JobMaintenanceEvent)mvce.getAppEvent();
String jobId=event.getJobId();
boolean exist=false;
for(Controller c:new ArrayList<Controller>(context.getDispatcher().getControllers())){
if(c instanceof JobController){
JobController jc=(JobController)c;
if(jc.getJobId().equals(jobId)){
exist=true;
break;
}
}
}
if(!exist){//新增操作
JobController controller=new JobController(context,master, jobId);
context.getDispatcher().addController(controller);
controller.handleEvent(new AppEvent(Events.Initialize));
mvce.setCancelled(true);
log.error("schedule add job with jobId:"+jobId);
}
}
}
DebugListener
任务失败的监听
当任务失败,需要发送邮件给相关人员
public void beforeDispatch(MvcEvent mvce) {
try {
if(mvce.getAppEvent() instanceof DebugFailEvent){
final DebugFailEvent event=(DebugFailEvent) mvce.getAppEvent();
DebugHistory history=event.getHistory();
FileDescriptor fd=fileManager.getFile(history.getFileId());
String msg="调试任务:"+fd.getName()+" 运行失败";
//此处可以发送IM消息
}else if(mvce.getAppEvent() instanceof DebugSuccessEvent){
final DebugSuccessEvent event=(DebugSuccessEvent) mvce.getAppEvent();
DebugHistory history=event.getHistory();
FileDescriptor fd=fileManager.getFile(history.getFileId());
String msg="调试任务:"+fd.getName()+" 运行成功"; //此处可以发送IM消息
}
} catch (Exception e) {
//处理异常,防止后续的依赖任务受此影响,无法正常执行
log.error("失败任务,发送通知出现异常",e);
}
}
JobFailListener
任务失败的监听
当任务失败,需要发送邮件给相关人员
public void beforeDispatch(MvcEvent mvce) {
try {
if(mvce.getAppEvent() instanceof JobFailedEvent){
final JobFailedEvent event=(JobFailedEvent) mvce.getAppEvent();
final String jobId=event.getJobId();
final String causeJobId=event.getJobException().getCauseJobId();
if(chainLocal.get()==null || !chainLocal.get().getCauseJobId().equals(causeJobId)){
GroupBean gb=readOnlyGroupManager.getGlobeGroupBean();
chainLocal.set(new ChainException(causeJobId, gb));
}
final ChainException chain=chainLocal.get();
final JobBean jobBean=chain.gb.getAllSubJobBeans().get(jobId);
final ZeusUser owner=userManager.findByUid(jobBean.getJobDescriptor().getOwner());
//延迟6秒发送邮件,保证日志已经输出到数据库
new Thread(){
public void run() {
try {
Thread.sleep(6000);
StringBuffer sb=new StringBuffer();
sb.append("Job任务(").append(jobId).append(")").append(jobBean.getJobDescriptor().getName()).append("运行失败");
sb.append("<br/>");
String type="";
if(event.getTriggerType()==TriggerType.MANUAL){
type="手动触发";
}else if(event.getTriggerType()==TriggerType.MANUAL_RECOVER){
type="手动恢复";
}else if(event.getTriggerType()==TriggerType.SCHEDULE){
type="自动调度";
}
sb.append("Job任务的触发类型为:"+type).append("<br/>");
if(event.getHistory()!=null){
sb.append("失败原因:"+jobHistoryManager.findJobHistory(event.getHistory().getId()).getLog().getContent().replaceAll("\\n", "<br/>"));
String msg= "Zeus报警 JobId:"+jobId+" 任务运行失败";
if(!jobBean.getDepender().isEmpty()){
msg+=",影响范围:"+getDependencyJobs(jobBean);
}
if(!causeJobId.equalsIgnoreCase(event.getJobId())){
msg+="(根本原因:job "+causeJobId+"运行失败)";
}
mailAlarm.alarm(event.getHistory().getId(), msg, sb.toString());
}
} catch (Exception e) {
log.error("邮件发送出现异常",e);
}
};
}.start();
String msg="Job任务("+jobId+"-"+owner.getName()+"):"+jobBean.getJobDescriptor().getName()+" 运行失败";
if(!jobBean.getDepender().isEmpty()){
msg+=",影响范围:"+getDependencyJobs(jobBean);
}
if(!causeJobId.equalsIgnoreCase(event.getJobId())){
msg+="(根本原因:job "+causeJobId+"运行失败)";
} //手机报警
//只发送自动调度的报警 并且只在下班时间 或者周末发送
if(event.getHistory().getTriggerType()==TriggerType.SCHEDULE){
Calendar now=Calendar.getInstance();
int hour=now.get(Calendar.HOUR_OF_DAY);
int day=now.get(Calendar.DAY_OF_WEEK);
if(day==Calendar.SATURDAY || day==Calendar.SUNDAY || hour<9 || hour>18){
smsAlarm.alarm(event.getHistory().getId(), "宙斯报警", "宙斯"+msg,chain);
}
}
}
} catch (Exception e) {
//处理异常,防止后续的依赖任务受此影响,无法正常执行
log.error("失败任务,发送通知出现异常",e);
}
}
JobSuccessListener
public void beforeDispatch(MvcEvent mvce) {
try {
if(mvce.getAppEvent() instanceof JobSuccessEvent){
final JobSuccessEvent event=(JobSuccessEvent) mvce.getAppEvent();
if(event.getTriggerType()==TriggerType.SCHEDULE){
return;
}
JobHistory history=jobHistoryManager.findJobHistory(event.getHistoryId());
final JobDescriptor jd=groupManager.getJobDescriptor(history.getJobId()).getX();
if(history.getOperator()!=null){
//此处可以发送IM消息
}
}
} catch (Exception e) {
//处理异常,防止后续的依赖任务受此影响,无法正常执行
log.error("失败任务,发送通知出现异常",e);
}
}
StopScheduleJobListener
阻止Job任务进行自动调度(包含自动调度和手动恢复)
预发环境下使用
预发环境不允许运行自动调度,手动恢复,只能运行手动调度
public void beforeDispatch(MvcEvent mvce) {
if(mvce.getAppEvent().getType()==Events.Initialize){
//取消初始化事件,放置Job进行出错任务重试,以及开启定时器
mvce.setCancelled(true);
}
}