Apache Airflow 数据管道:DAG 调度与监控
Apache Airflow 数据管道:DAG 调度与监控完全指南
Apache Airflow 是一个开源工作流编排平台,核心能力是通过有向无环图(DAG)定义、调度和监控复杂的数据管道。即使你是初学者,理解 DAG 的调度机制与内置监控功能,也能立刻让数据任务变得可靠且可观测。
什么是 DAG?为什么它是数据管道的蓝图
在 Airflow 中,所有工作流都被抽象为一个 DAG。DAG 由节点(Task)和边(Task 之间的依赖关系)组成,它描述了任务的执行顺序,但本身不执行任何计算,只负责编排。
- 节点:一个 Task 代表一个工作单元,例如一段 Python 函数、一个 Bash 命令或一个 SQL 查询。
- 依赖关系:通过
>>或<<运算符设定。task_a >> task_b表示 task_b 必须在 task_a 成功后才会执行。 - 无环:DAG 严禁循环依赖,这保证了工作流的有限性和可终止性。
定义 DAG 时需指定核心的调度参数,它们共同决定管道的“心跳”。
调度器如何工作:定时触发与回填
Airflow 的调度器持续检查所有 DAG,并按照你设定的时间表生成任务实例,放入队列等待执行。理解调度周期是避免常见陷阱的关键。
schedule_interval:设置定期运行规则
schedule_interval 支持 Cron 表达式、预设常量(如 @daily)或 datetime.timedelta 对象。
| 参数 | 含义 | 示例 |
|---|---|---|
None |
不自动调度,仅手动触发 | 用于事件驱动的管道 |
@once |
仅调度一次 | 初始化任务 |
@hourly |
每小时整点执行 | 0 * * * * |
@daily |
每天午夜执行 | 0 0 * * * |
@weekly |
每周日午夜执行 | 0 0 * * 0 |
Cron 表达式 '30 8 * * 1-5' |
工作日早 8:30 | 复杂的自定义计划 |
start_date 与 catchup:理解调度窗口
start_date:DAG 的计划开始日期。Airflow 根据此日期和schedule_interval生成第一个数据区间。- 数据区间:对于计划时间为
2025-03-24 00:00(假设@daily),其实际处理的数据区间是2025-03-23。Airflow 在周期结束时触发该周期的运行。 catchup:默认为True。如果当前日期远晚于start_date,Airflow 会自动回填所有错过的调度周期。对于增量数据处理管道,务必设置为False以从当前时刻开始执行,防止重复处理历史数据。
一个典型的 DAG 定义骨架如下:
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime
default_args = {
'owner': 'data_team',
'retries': 2,
}
with DAG(
dag_id='etl_pipeline',
start_date=datetime(2025, 3, 23),
schedule_interval='@daily',
catchup=False,
default_args=default_args,
description='一个简单的 ETL 管道',
tags=['example'],
) as dag:
extract = BashOperator(task_id='extract', bash_command='echo "提取数据"')
transform = BashOperator(task_id='transform', bash_command='echo "转换数据"')
load = BashOperator(task_id='load', bash_command='echo "加载数据"')
extract >> transform >> load
监控 DAG:掌握管道健康状况
Airflow Web UI 为每个 DAG 提供丰富的监控视图,无需额外配置即可快速定位故障。以下是监控中最常用的页面。
DAG 运行视图 (Grid View)
Grid 视图以矩阵形式展示一段时间内所有 DAG 运行与任务实例的状态。每一列是一组 DAG 运行,每一行是一个 Task。状态颜色一目了然:
- 深绿色:成功
- 红色:失败
- 橙色:重试中
- 品红色:上游失败(由于前面任务失败而跳过)
- 蓝色:排队中
- 浅绿色:运行中
单击任意任务实例方块,可查看日志、执行时间、XCom 数据等详细信息。这是日常排错的入口。
甘特图 (Gantt Chart)
为了优化管道执行时间,使用甘特图查看每个任务的实际运行时序。它能暴露瓶颈任务、并行度利用不充分等问题。在 DAG 页面顶部点击“Gantt”即可获得任务条形图。
任务持续时间 (Task Duration) 与着陆时间 (Landing Times)
- Task Duration 曲线图:展示每个任务实例的执行时长随时间的变化。若出现上升趋势,可能是数据量增长或资源竞争的信号。
- Landing Times:显示任务实际完成时间与计划时间的偏差,帮助判断是否超出了数据 SLA。
警报与回调
默认情况下,任务失败会在 Web UI 上显示红色。对于生产级别的监控,必须配置主动通知:
- 默认参数
email_on_failure和email:在default_args中设置,当任务失败时自动发送告警邮件(需配置 SMTP 后端)。 - 回调函数:使用
on_failure_callback参数执行自定义代码,例如发送 Slack 消息、调用 PagerDuty 或记录到外部系统。
def notify_slack(context):
# 通过 Slack Webhook 发送通知
pass
task = PythonOperator(
task_id='risky_job',
python_callable=my_function,
on_failure_callback=notify_slack
)
调度器高级控制:让你的管道准时可靠
超时与重试机制
retries:任务失败后的重试次数。retry_delay:两次重试之间的等待时间(如timedelta(minutes=5))。execution_timeout:单个任务的最长运行时间,超时后任务被标记为失败并触发重试。这是防止僵尸任务的关键。
task = PythonOperator(
task_id='long_running_task',
python_callable=heavy_processing,
execution_timeout=timedelta(hours=2),
retries=1,
retry_delay=timedelta(minutes=10)
)
依赖规则与触发器规则
默认情况下,Task 只有在所有直接上游任务都成功时才会执行。但你可以通过 trigger_rule 改变这一行为,实现灵活的容错管道:
all_success(默认):所有上游成功。all_failed:所有上游失败。one_success:至少一个上游成功。one_failed:至少一个上游失败。all_done:所有上游完成,无论成败。none_failed:所有上游未失败(即成功或跳过)。
cleanup = BashOperator(
task_id='cleanup_resources',
bash_command='cleanup.sh',
trigger_rule='all_done'
)
调度器性能考虑
- 单个 DAG 不要定义过多任务(>200),复杂逻辑可通过 SubDAG 或 TaskGroup 组织。
- 避免在 DAG 解析文件中执行密集操作,因为调度器定期加载所有 DAG 文件。
- 设置合理的
scheduler_heartbeat_sec和并行度参数(parallelism,dag_concurrency)以平衡吞吐量与资源。
最佳实践总结
- 始终为生产 DAG 设置
catchup=False,除非明确需要回填历史数据。 - 固定
start_date为过去某个时刻,不要使用datetime.now(),否则每次重新解析文件都会产生不同的开始日期,导致调度异常。 - 将所有 DAG 置于版本控制,并启用
DAG Serialization以减少 Web 服务器与调度器之间的同步问题。 - 充分利用 TaskGroup 组织复杂管道,替代旧版 SubDAG。
- 主动监控任务持续时间,建立内部 SLA,触发自定义警报。
Airflow 的调度与监控模型成熟且强韧。通过精心设置时间参数、善用 UI 图表和故障转移规则,即使是复杂的数据管道也能做到“启动即监控,失败即响应”。接下来,你可以尝试定义一个自己的 DAG,并在 Grid 视图中观察它的首次心跳。