airflow的使用方法

简介

airflow是airbnb家的基于DAG(有向无环图)的任务管理系统, 最简单的理解就是一个高级版的crontab。它解决了crontab无法解决的任务依赖问题。

类似产品比较

系统 介绍
Apache Oozie 使用XML配置, Oozie任务的资源文件都必须存放在HDFS上. 配置不方便同时也只能用于Hadoop.
Linkedin Azkaban web界面尤其很赞, 使用java properties文件维护任务依赖关系, 任务资源文件需要打包成zip, 部署不是很方便.
airflow 具有自己的web任务管理界面,dag任务创建通过python代码,可以保证其灵活性和适应性

web界面使用介绍

DAGS

启动web任务管理需要执行airflow websever -D命令,默认端口是8080
http://10.191.76.31:8080/admin/

image.png

  • DAG
    dag_id
  • Schedule
    调度时间
  • Owner
    dag拥有者
  • Recent Tasks
    这里包含9个圆圈,每个圆圈代表task的执行状态和次数
    圈1 success:现实成功的task数,基本上就是该tag包含多少个task,这里基本上就显示几。
    圈2 running:正在运行的task数
    圈3 failed:失败的task数
    圈4 unstream_failed:
    圈5 skipped:跳过的task数
    圈6 up_for_retry:执行失败的task,重新执行的task数
    圈7 queued:队列,等待执行的task数
    圈8 :
    圈9 scheduled:刚开始调度dag时,这一次执行总共调度了dag下面多少个task数,并且随着task的执行成功,数值逐渐减少。
  • Last Run
    dag最后执行的时间点
  • DAG Runs
    这里显示dag的执行信息,包括3个圆圈,每个圆圈代表dag的执行状态和次数
    圈1 success:总共执行成功的dag数,执行次数
    圈2 runing:正在执行dag数
    圈3 faild:执行失败的dag数
  • Links
link 说明
Trigger Dag 人为执行触发
Tree View 当dag执行的时候,可以点入,查看每个task的执行状态(基于树状视图),状态:success,running,failed,skipped,retry,queued,no status
Graph View 同上,基于图视图(有向无环图),查看每个task的执行状态,状态:success,running,failed,skipped,retry,queued,no status
Tasks Duration 每个task的执行时间统计,可以选择最近多少次执行(number of runs)
Task Tries 每个task的重试次数
Landing Times
Gantt View 基于甘特图的视图,每个task的执行状态
  • Code View
    查看任务执行代码
  • Logs
    查看执行日志,比如失败原因
  • Refresh
    刷新dag任务
    -Delete Dag
    删除该dag任务

当某dag执行失败,可以通过3个View视图去查看是哪个task执行失败。

Data Profiling 数据分析

image.png
  • Ad Hoc Query:特殊查询
    通过UI界面对一些数据库,数据仓库的进行简单的SQL交互操作.


    Ad Hoc Query

    image.png
  • Charts:图表
    实现数据可视化和图表的工作。通过SQL去源数据库检索一些数据,保存下来,供后续使用。

These charts are basic, but they’re easy to create, modify and share


Charts

Charts.png

You can even use the same templating and macros available when writing airflow pipelines, parameterizing your queries and modifying parameters directly in the URL.


image.png
  • Known Events:已知的事件


    Known Events

Browse 浏览

Browse
  • SLA Misses

  • Task Instances:查看每个task实例执行情况


    Task Instances
  • Logs:查看所有dag下面对应的task的日志,并且包含检索


    image.png
  • Jobs:查看dag的执行状态,开始时间和结束时间等指标


    image.png
  • DAG Runs

Admin:管理员

image.png
  • Pools:

  • Configuration:查看airflow的配置,即:./airflow_home/airflow.cfg

  • Users:查看用户列表,创建用户,删除用户

  • Connections
    我们的Task需要通过Hook访问其他资源, Hook仅仅是一种访问方式, 就像是JDBC driver一样, 要连接DB, 我们还需要DB的IP/Port/User/Pwd等信息. 这些信息不太适合hard code在每个task中, 可以把它们定义成Connection, airflow将这些connection信息存放在后台的connection表中. 我们可以在WebUI的Admin->Connections管理这些连接.

  • Variables
    Variable 没有task_id/dag_id属性, 往往用来定义一些系统级的常量或变量, 我们可以在WebUI或代码中新建/更新/删除Variable. 也可以在WebUI上维护变量.
    Variable 的另一个重要的用途是, 我们为Prod/Dev环境做不同的设置, 详见后面的开发小节.

  • XComs
    XCom和Variable类似, 用于Task之间共享一些信息. XCom 包含task_id/dag_id属性, 适合于Task之间传递数据, XCom使用方法比Variables复杂些. 比如有一个dag, 两个task组成(T1->T2), 可以在T1中使用xcom_push()来推送一个kv, 在T2中使用xcom_pull()来获取这个kv.

Docs

image.png
  • 官方文档
  • Github地址

Dag提交-python配置任务

  • DAG 基本参数配置
default_args = {
    'owner': 'airflow',
    'depends_on_past': False,   # 是否依赖上一个自己的执行状态 
    'start_date': datetime.datetime(2019, 1, 1),
    'email': ['wangzhenjun@gmail.com'], # 需要在airflow.cfg中配置下发件邮箱
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': datetime.timedelta(minutes=5),
    # 'end_date': datetime(2020, 1, 1),   # 结束时间,注释掉也就会一直执行下去
}
  • DAG对象
    设置dag的执行周期:schedule_interval.该参数可以接收cron 表达式和datetime.timedelta对象,另外airflow还预置了一些调度周期。
preset Description cron
None Don’t schedule, use for exclusively “externally triggered” DAGs
@once Schedule once and only once
@hourly Run once an hour at the beginning of the hour 0 * * * *
@daily Run once a day at midnight 0 0 * * *
@weekly Run once a week at midnight on Sunday morning 0 0 * * 0
@monthly Run once a month at midnight of the first day of the month 0 0 1 * *
@yearly Run once a year at midnight of January 1 0 0 1 1 *
dag = DAG(
    'tutorial', 
default_args=default_args, 
schedule_interval='* * * * *' # 执行周期,crontab形式
)
  • 定义任务
    在定义这个任务的过程,就像是在写一个 shell 脚本,只是这个脚本的每个操作可以有依赖。 不同的操作对应了不同的 Operator,比如 shell 就需要用 BashOperator 来执行。
t1 = BashOperator(   #任务类型是bash
    task_id='echoDate', #任务id
    bash_command='echo date > /home/datefile', #任务命令
    dag=dag)
  • 完整样例
# coding: utf-8

from airflow import DAG
from airflow.operators.python_operator import PythonOperator
from datetime import datetime, timedelta



# 定义默认参数
default_args = {
    'owner': 'wangzhenjun',  # 拥有者名称
    'depends_on_past': False,   # 是否依赖上一个自己的执行状态
    'start_date': datetime(2019, 1, 15, 10, 00),  # 第一次开始执行的时间,为格林威治时间,为了方便测试,一般设置为当前时间减去执行周期
    'email': ['wangzhenjun01@corp.netease.com'],  # 接收通知的email列表
    'email_on_failure': True,  # 是否在任务执行失败时接收邮件
    'email_on_retry': True,  # 是否在任务重试时接收邮件
    'retries': 3,  # 失败重试次数
    'retry_delay': timedelta(seconds=5)  # 失败重试间隔
}

# 定义DAG
dag = DAG(
    dag_id='hello_world',  # dag_id
    default_args=default_args,  # 指定默认参数
    # schedule_interval="00, *, *, *, *"  # 执行周期,依次是分,时,天,月,年,此处表示每个整点执行
    schedule_interval=timedelta(minutes=1)  # 执行周期,表示每分钟执行一次
)

"""
1.通过PythonOperator定义执行python函数的任务
"""
# 定义要执行的Python函数1
def hello_world_1():
    current_time = str(datetime.today())
    with open('/root/tmp/hello_world_1.txt', 'a') as f:
        f.write('%s\n' % current_time)
    assert 1 == 1  # 可以在函数中使用assert断言来判断执行是否正常,也可以直接抛出异常
# 定义要执行的Python函数2
def hello_world_2():
    current_time = str(datetime.today())
    with open('/root/tmp/hello_world_2.txt', 'a') as f:
        f.write('%s\n' % current_time)

# 定义要执行的task 1
t1 = PythonOperator(
    task_id='hello_world_1',  # task_id
    python_callable=hello_world_1,  # 指定要执行的函数
    dag=dag,  # 指定归属的dag
    retries=2,  # 重写失败重试次数,如果不写,则默认使用dag类中指定的default_args中的设置
)
# 定义要执行的task 2
t2 = PythonOperator(
    task_id='hello_world_2',  # task_id
    python_callable=hello_world_2,  # 指定要执行的函数
    dag=dag,  # 指定归属的dag
)

t2.set_upstream(t1)  # t2依赖于t1;等价于 t1.set_downstream(t2);同时等价于 dag.set_dependency('hello_world_1', 'hello_world_2')
# 表示t2这个任务只有在t1这个任务执行成功时才执行,
# 或者
t1 >> t2


"""
2.通过BashOperator定义执行bash命令的任务
"""
hello_operator = BashOperator(   #通过BashOperator定义执行bash命令的任务
    task_id='sleep_task',
    depends_on_past=False,
    bash_command='echo `date` >> /home/py/test.txt',
    dag=dag
)
"""
其他任务处理器:
3.EmailOperator : 发送邮件
4.HTTPOperator : 发送 HTTP 请求
5.SqlOperator : 执行 SQL 命令
"""

分布式部署

CeleryExecutor is one of the ways you can scale out the number of workers. For this to work, you need to setup a Celery backend (RabbitMQ, Redis, …) and change your airflow.cfg to point the executor parameter to CeleryExecutor and provide the related Celery settings.

我们的生产环境:
每台机器运行的任务所属应用各不相同,不同应用运行环境也不相同,另外不同应用也希望达到集群隔离的目的。如果要实现这个功能,需要自己提供队列的管理,指定队列的任务节点会被调度到相应队列的机器上,相应队列的机器也只会运行指定队列的任务节点。

大部分都是集中在2-3台机器提交,环境类似,各自提交任务,但是任务通过主节点去随机分发到各结点执行,并不能保证环境的满足。
现在情况:如果是组内使用,各位的环境差异比较大,首先需要保证各环境的统一性

面临的问题:

  • 官方文档+网上的关于分布式的资料不多,官方文档更多是一笔带过。
image.png

image.png
最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 203,271评论 5 476
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 85,275评论 2 380
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 150,151评论 0 336
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 54,550评论 1 273
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 63,553评论 5 365
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,559评论 1 281
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,924评论 3 395
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,580评论 0 257
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,826评论 1 297
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,578评论 2 320
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,661评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,363评论 4 318
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,940评论 3 307
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,926评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 31,156评论 1 259
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,872评论 2 349
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 42,391评论 2 342

推荐阅读更多精彩内容

  • 本文将介绍 Airflow 这一款优秀的调度工具。主要包括 Airflow 的服务构成、Airflow 的 Web...
    a7f00a9019ae阅读 61,838评论 6 42
  • Apache Spark 是专为大规模数据处理而设计的快速通用的计算引擎。Spark是UC Berkeley AM...
    大佛爱读书阅读 2,811评论 0 20
  • 在快速启动部分中设置很简单,构建生产级环境需要更多的工作,下面来了解一下。 1. 设置配置选项 第一次运行Airf...
    路小漫阅读 9,295评论 0 3
  • 阿里妹导读:搜索中台建设过程中,单个系统不再能满足复杂业务的需求,更多时候需要多个子系统互相协作,异步地按照指定流...
    高级java架构师阅读 4,454评论 0 7
  • 2016-05-22《解决力》肖威 改变你命运的第24课 演讲者:肖威 信息整理者:陈安娜 可参考:大前研一——《...
    陈安娜阅读 1,103评论 0 0