Apache Airflow 数据管道:DAG 调度与监控

FreeGuideOnline 最新 2026-06-17

Apache Airflow 数据管道:DAG 调度与监控完全指南

Apache Airflow 是一个开源工作流编排平台,核心能力是通过有向无环图(DAG)定义、调度和监控复杂的数据管道。即使你是初学者,理解 DAG 的调度机制与内置监控功能,也能立刻让数据任务变得可靠且可观测。

什么是 DAG?为什么它是数据管道的蓝图

在 Airflow 中,所有工作流都被抽象为一个 DAG。DAG 由节点(Task)和边(Task 之间的依赖关系)组成,它描述了任务的执行顺序,但本身不执行任何计算,只负责编排。

  • 节点:一个 Task 代表一个工作单元,例如一段 Python 函数、一个 Bash 命令或一个 SQL 查询。
  • 依赖关系:通过 >><< 运算符设定。task_a >> task_b 表示 task_b 必须在 task_a 成功后才会执行。
  • 无环:DAG 严禁循环依赖,这保证了工作流的有限性和可终止性。

定义 DAG 时需指定核心的调度参数,它们共同决定管道的“心跳”。

调度器如何工作:定时触发与回填

Airflow 的调度器持续检查所有 DAG,并按照你设定的时间表生成任务实例,放入队列等待执行。理解调度周期是避免常见陷阱的关键。

schedule_interval:设置定期运行规则

schedule_interval 支持 Cron 表达式、预设常量(如 @daily)或 datetime.timedelta 对象。

参数 含义 示例
None 不自动调度,仅手动触发 用于事件驱动的管道
@once 仅调度一次 初始化任务
@hourly 每小时整点执行 0 * * * *
@daily 每天午夜执行 0 0 * * *
@weekly 每周日午夜执行 0 0 * * 0
Cron 表达式 '30 8 * * 1-5' 工作日早 8:30 复杂的自定义计划

start_datecatchup:理解调度窗口

  • start_date:DAG 的计划开始日期。Airflow 根据此日期和 schedule_interval 生成第一个数据区间。
  • 数据区间:对于计划时间为 2025-03-24 00:00(假设 @daily),其实际处理的数据区间是 2025-03-23。Airflow 在周期结束时触发该周期的运行。
  • catchup:默认为 True。如果当前日期远晚于 start_date,Airflow 会自动回填所有错过的调度周期。对于增量数据处理管道,务必设置为 False 以从当前时刻开始执行,防止重复处理历史数据。

一个典型的 DAG 定义骨架如下:

from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime

default_args = {
    'owner': 'data_team',
    'retries': 2,
}

with DAG(
    dag_id='etl_pipeline',
    start_date=datetime(2025, 3, 23),
    schedule_interval='@daily',
    catchup=False,
    default_args=default_args,
    description='一个简单的 ETL 管道',
    tags=['example'],
) as dag:

    extract = BashOperator(task_id='extract', bash_command='echo "提取数据"')
    transform = BashOperator(task_id='transform', bash_command='echo "转换数据"')
    load = BashOperator(task_id='load', bash_command='echo "加载数据"')

    extract >> transform >> load

监控 DAG:掌握管道健康状况

Airflow Web UI 为每个 DAG 提供丰富的监控视图,无需额外配置即可快速定位故障。以下是监控中最常用的页面。

DAG 运行视图 (Grid View)

Grid 视图以矩阵形式展示一段时间内所有 DAG 运行与任务实例的状态。每一列是一组 DAG 运行,每一行是一个 Task。状态颜色一目了然:

  • 深绿色:成功
  • 红色:失败
  • 橙色:重试中
  • 品红色:上游失败(由于前面任务失败而跳过)
  • 蓝色:排队中
  • 浅绿色:运行中

单击任意任务实例方块,可查看日志、执行时间、XCom 数据等详细信息。这是日常排错的入口。

甘特图 (Gantt Chart)

为了优化管道执行时间,使用甘特图查看每个任务的实际运行时序。它能暴露瓶颈任务、并行度利用不充分等问题。在 DAG 页面顶部点击“Gantt”即可获得任务条形图。

任务持续时间 (Task Duration) 与着陆时间 (Landing Times)

  • Task Duration 曲线图:展示每个任务实例的执行时长随时间的变化。若出现上升趋势,可能是数据量增长或资源竞争的信号。
  • Landing Times:显示任务实际完成时间与计划时间的偏差,帮助判断是否超出了数据 SLA。

警报与回调

默认情况下,任务失败会在 Web UI 上显示红色。对于生产级别的监控,必须配置主动通知:

  1. 默认参数 email_on_failureemail:在 default_args 中设置,当任务失败时自动发送告警邮件(需配置 SMTP 后端)。
  2. 回调函数:使用 on_failure_callback 参数执行自定义代码,例如发送 Slack 消息、调用 PagerDuty 或记录到外部系统。
def notify_slack(context):
    # 通过 Slack Webhook 发送通知
    pass

task = PythonOperator(
    task_id='risky_job',
    python_callable=my_function,
    on_failure_callback=notify_slack
)

调度器高级控制:让你的管道准时可靠

超时与重试机制

  • retries:任务失败后的重试次数。
  • retry_delay:两次重试之间的等待时间(如 timedelta(minutes=5))。
  • execution_timeout:单个任务的最长运行时间,超时后任务被标记为失败并触发重试。这是防止僵尸任务的关键。
task = PythonOperator(
    task_id='long_running_task',
    python_callable=heavy_processing,
    execution_timeout=timedelta(hours=2),
    retries=1,
    retry_delay=timedelta(minutes=10)
)

依赖规则与触发器规则

默认情况下,Task 只有在所有直接上游任务都成功时才会执行。但你可以通过 trigger_rule 改变这一行为,实现灵活的容错管道:

  • all_success(默认):所有上游成功。
  • all_failed:所有上游失败。
  • one_success:至少一个上游成功。
  • one_failed:至少一个上游失败。
  • all_done:所有上游完成,无论成败。
  • none_failed:所有上游未失败(即成功或跳过)。
cleanup = BashOperator(
    task_id='cleanup_resources',
    bash_command='cleanup.sh',
    trigger_rule='all_done'
)

调度器性能考虑

  • 单个 DAG 不要定义过多任务(>200),复杂逻辑可通过 SubDAG 或 TaskGroup 组织。
  • 避免在 DAG 解析文件中执行密集操作,因为调度器定期加载所有 DAG 文件。
  • 设置合理的 scheduler_heartbeat_sec 和并行度参数(parallelism, dag_concurrency)以平衡吞吐量与资源。

最佳实践总结

  • 始终为生产 DAG 设置 catchup=False,除非明确需要回填历史数据。
  • 固定 start_date 为过去某个时刻,不要使用 datetime.now(),否则每次重新解析文件都会产生不同的开始日期,导致调度异常。
  • 将所有 DAG 置于版本控制,并启用 DAG Serialization 以减少 Web 服务器与调度器之间的同步问题。
  • 充分利用 TaskGroup 组织复杂管道,替代旧版 SubDAG。
  • 主动监控任务持续时间,建立内部 SLA,触发自定义警报。

Airflow 的调度与监控模型成熟且强韧。通过精心设置时间参数、善用 UI 图表和故障转移规则,即使是复杂的数据管道也能做到“启动即监控,失败即响应”。接下来,你可以尝试定义一个自己的 DAG,并在 Grid 视图中观察它的首次心跳。