工作流管理平台 Airflow 入门

环境

CentOS Linux release 7.5.1804

Python 3.6.4/2.7.14

简介

Airflow 是 Airbnb 开源的一个用 Python 编写的工作流管理平台,自带 web UI 和调度,目前在Apache下做孵化。


Airflow 管理页面

Airflow 中有两个基本概念,DAG和task。
DAG是多个task的集合,定义在一个Python文件中,包含了task之间的依赖关系,如task A在task B之后执行,task C可以单独执行等等。

安装并运行

# 默认目录在~/airflow,也可以使用以下命令来指定目录
export AIRFLOW_HOME=~/airflow

pip install apache-airflow

# 初始化数据库
airflow initdb

# 启动web服务,默认端口为8080,也可以通过`-p`来指定
airflow webserver -p 8080

# 启动 scheduler
airflow scheduler

定义第一个DAG

$AIRFLOW_HOME目录下新建dags文件夹,后面的所有dag文件都要存储在这个目录。

新建dag文件hello_world.py,语句含义见注释

# coding: utf-8

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta


# 定义默认参数
default_args = {
    'owner': 'airflow',  # 拥有者名称
    'start_date': datetime(2018, 6, 6, 20, 00),  # 第一次开始执行的时间,为格林威治时间,为了方便测试,一般设置为当前时间减去执行周期
    'email': ['shelmingsong@gmail.com'],  # 接收通知的email列表
    'email_on_failure': True,  # 是否在任务执行失败时接收邮件
    'email_on_retry': True,  # 是否在任务重试时接收邮件
    'retries': 3,  # 失败重试次数
    'retry_delay': timedelta(seconds=5)  # 失败重试间隔
}

# 定义DAG
dag = DAG(
    dag_id='hello_world',  # dag_id
    default_args=default_args,  # 指定默认参数
    # schedule_interval="00, *, *, *, *"  # 执行周期,依次是分,时,天,月,年,此处表示每个整点执行
    schedule_interval=timedelta(minutes=1)  # 执行周期,表示每分钟执行一次
)


# 定义要执行的Python函数1
def hello_world_1():
    current_time = str(datetime.today())
    with open('/root/tmp/hello_world_1.txt', 'a') as f:
        f.write('%s\n' % current_time)
    assert 1 == 1  # 可以在函数中使用assert断言来判断执行是否正常,也可以直接抛出异常


# 定义要执行的Python函数2
def hello_world_2():
    current_time = str(datetime.today())
    with open('/root/tmp/hello_world_2.txt', 'a') as f:
        f.write('%s\n' % current_time)


# 定义要执行的Python函数3
def hello_world_3():
    current_time = str(datetime.today())
    with open('/root/tmp/hello_world_3.txt', 'a') as f:
        f.write('%s\n' % current_time)

# 定义要执行的task 1
t1 = PythonOperator(
    task_id='hello_world_1',  # task_id
    python_callable=hello_world_1,  # 指定要执行的函数
    dag=dag,  # 指定归属的dag
    retries=2,  # 重写失败重试次数,如果不写,则默认使用dag类中指定的default_args中的设置
)

# 定义要执行的task 2
t2 = PythonOperator(
    task_id='hello_world_2',  # task_id
    python_callable=hello_world_2,  # 指定要执行的函数
    dag=dag,  # 指定归属的dag
)

# 定义要执行的task 3
t3 = PythonOperator(
    task_id='hello_world_3',  # task_id
    python_callable=hello_world_3,  # 指定要执行的函数
    dag=dag,  # 指定归属的dag
)

t2.set_upstream(t1)
# 表示t2这个任务只有在t1这个任务执行成功时才执行,
# 等价于 t1.set_downstream(t2)
# 同时等价于 dag.set_dependency('hello_world_1', 'hello_world_2')

t3.set_upstream(t1)  # 同理

写完后执行它检查是否有错误,如果命令行没有报错,就表示没问题。

python $AIRFLOW_HOME/dags/hello_world.py

通过以下命令查看生效的dags

[root@localhost dags]# airflow list_dags
[2018-06-06 21:03:25,808] {__init__.py:45} INFO - Using executor SequentialExecutor
[2018-06-06 21:03:25,877] {models.py:189} INFO - Filling up the DagBag from /root/airflow/dags


-------------------------------------------------------------------
DAGS
-------------------------------------------------------------------
hello_world

查看hello_world这个dag下面的tasks

[root@localhost dags]# airflow list_tasks hello_world
[2018-06-06 21:04:45,736] {__init__.py:45} INFO - Using executor SequentialExecutor
[2018-06-06 21:04:45,805] {models.py:189} INFO - Filling up the DagBag from /root/airflow/dags
hello_world_1
hello_world_2
hello_world_3

查看hello_world这个dag下面tasks的层级关系

[root@localhost dags]# airflow list_tasks hello_world --tree
[2018-06-06 21:05:42,956] {__init__.py:45} INFO - Using executor SequentialExecutor
[2018-06-06 21:05:43,020] {models.py:189} INFO - Filling up the DagBag from /root/airflow/dags
<Task(PythonOperator): hello_world_2>
    <Task(PythonOperator): hello_world_1>
<Task(PythonOperator): hello_world_3>
    <Task(PythonOperator): hello_world_1>

如果按照以上步骤启动了schedule,则DAG已经开始定时执行了,我们设置了每分钟执行一次,可以访问your_domain:8080来查看任务的执行情况。
也可以查看/root/tmp/hello_world_1.txt/root/tmp/hello_world_2.txt/root/tmp/hello_world_3.txt文件中的内容来检查任务是否执行成功。

执行失败时email通知

如果需要在任务执行失败(执行过程中有异常抛出)的时候邮件通知,除了在DAG文件中指定接收email列表外,还需要在配置文件中指定发送邮箱的信息,打开配置文件$AIRFLOW_HOME/airflow.cfg,修改以下配置项,修改完需要重启webserver和schedule

smtp_host = smtp.163.com  # smtp邮箱地址
smtp_starttls = True  # 是否tls加密
smtp_mail_from = demo@163.com  # 发件人邮箱地址,需开通smtp服务
smtp_ssl = False  # 是否ssl加密
smtp_port = 25  # smtp端口号

使用位位移来指定执行顺序

以下四行的作用是相同的

op1 >> op2
op1.set_downstream(op2)

op2 << op1
op2.set_upstream(op1)

也可以连续使用位位移

op1 >> op2 >> op3 << op4

以上等价于

op1.set_downstream(op2)
op2.set_downstream(op3)
op3.set_upstream(op4)

使用变量(Variables)

变量的value可以在UI界面的Admin > Variables里面进行增删改查

可以在代码中这样使用变量

from airflow.models import Variable
foo = Variable.get("foo", default_var='a')  # 设置当获取不到时使用的默认值
bar = Variable.get("bar", deserialize_json=True)  # 对json数据进行反序列化

更多

Airflow doc

博客更新地址

©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,456评论 5 477
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,370评论 2 381
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,337评论 0 337
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,583评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,596评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,572评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,936评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,595评论 0 258
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,850评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,601评论 2 321
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,685评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,371评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,951评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,934评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,167评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 43,636评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,411评论 2 342

推荐阅读更多精彩内容

  • Spring Cloud为开发人员提供了快速构建分布式系统中一些常见模式的工具(例如配置管理,服务发现,断路器,智...
    卡卡罗2017阅读 134,585评论 18 139
  • .bat脚本基本命令语法 目录 批处理的常见命令(未列举的命令还比较多,请查阅帮助信息) 1、REM 和 :: 2...
    庆庆庆庆庆阅读 8,036评论 1 19
  • 什么是shell shell 是一个命令语言解释器(command-language interpreter)。 ...
    Arteezy_Xie阅读 1,050评论 0 0
  • 本文将介绍 Airflow 这一款优秀的调度工具。主要包括 Airflow 的服务构成、Airflow 的 Web...
    a7f00a9019ae阅读 61,846评论 6 42
  • 样式新闻,公安部召开警察规范执法的会议视频--我要叫好。这段视频,就是被大家普遍诟病的,给中国抹黑的,让大家极度反...
    寻找无双zxl阅读 267评论 3 0