Celery 任务队列:异步、定时与分布式

FreeGuideOnline 最新 2026-06-16

Celery 任务队列:异步、定时与分布式

Celery 是一个简单、灵活且可靠的分布式任务队列系统,专注于实时处理,同时也支持任务调度。它能够将耗时的操作从应用主线程中剥离,通过消息中间件将任务分发给多个工作进程(Worker)异步执行。本教程将从零开始,带你掌握 Celery 的核心概念、安装配置、异步任务、定时任务以及分布式部署。

1. 理解 Celery 的核心角色

在开始编码前,需要先了解 Celery 架构中的几个重要角色:

  • 生产者(Producer):你的应用代码,负责创建并发送任务到消息队列。
  • 消息中间件(Broker):负责接收生产者发送的任务,并将其分发给消费者。Celery 支持 RabbitMQ、Redis、Amazon SQS 等。
  • 消费者(Worker):Celery 的工作进程,负责从 Broker 中取出任务并执行。
  • 结果后端(Result Backend):可选组件,用于存储任务的执行结果,以便生产者查询。支持 Redis、数据库、MongoDB 等。

简单流程:Producer → Broker → Worker → Result Backend(可选)

2. 快速安装与环境准备

我们将使用 Redis 同时作为 Broker 和 Result Backend,因为它轻量且适合大多数场景。

pip install celery redis

确保你本地已经安装并启动了 Redis 服务(下载地址:https://redis.io/download)。默认情况下,Celery 连接本地 Redis 的 6379 端口。

3. 创建第一个 Celery 实例

首先创建一个 tasks.py 文件,用于定义 Celery 应用和任务。

from celery import Celery

# 创建 Celery 实例,'my_app' 为应用名
app = Celery('my_app',
             broker='redis://localhost:6379/0',      # 使用 Redis 0 号数据库作为 Broker
             backend='redis://localhost:6379/1')     # 使用 Redis 1 号数据库存储结果

# 定义一个简单的异步任务
@app.task
def add(x, y):
    return x + y

@app.task 装饰器将普通函数转换成 Celery 任务,该任务就能被发送到队列并异步执行了。

4. 启动 Worker 并执行异步任务

打开终端,进入 tasks.py 所在目录,启动一个 Worker:

celery -A tasks worker --loglevel=info

-A tasks 表示 Celery 应用实例所在的模块。看到 celery@xxx ready 即表示 Worker 启动成功。

现在,另开一个 Python 交互环境,来调用任务:

>>> from tasks import add

# 调用 delay 方法将任务发送到队列
>>> result = add.delay(4, 4)

# result 是一个 AsyncResult 对象,可以检查任务状态
>>> result.ready()
True
>>> result.get()        # 获取返回值(阻塞直到结果就绪)
8

delay() 是调用任务最简便的方式,也可以使用 apply_async() 并传入额外参数(如倒计时、队列路由等)。

5. 定时任务:利用 Celery Beat 实现周期调度

Celery 内置了周期任务调度器 celery beat。我们需要在配置中定义任务执行的节奏。

修改 tasks.py,添加定时设置:

from celery import Celery
from celery.schedules import crontab

app = Celery('my_app', broker='redis://localhost:6379/0', backend='redis://localhost:6379/1')

# 配置时区与调度项
app.conf.timezone = 'Asia/Shanghai'
app.conf.beat_schedule = {
    'add-every-30-seconds': {
        'task': 'tasks.add',
        'schedule': 30.0,                     # 每30秒执行一次
        'args': (16, 16)
    },
    'print-every-minute': {
        'task': 'tasks.print_msg',
        'schedule': crontab(minute='*/1'),    # 使用 crontab 每分钟执行
    },
}

@app.task
def add(x, y):
    return x + y

@app.task
def print_msg():
    print('定时任务每分钟打印一次')

启动 Beat 调度器(需要单独进程):

celery -A tasks beat --loglevel=info

同时保持 Worker 运行(另一个终端):

celery -A tasks worker --loglevel=info

Beat 会按照 beat_schedule 将任务发送到 Broker,Worker 接收并执行。你可以在 Worker 日志中看见定时触发的任务。

6. 任务高级配置:重试、超时与错误处理

Celery 任务支持自动重试、结果过期、软硬超时等配置,极大增强了稳健性。

@app.task(bind=True, max_retries=3, default_retry_delay=60)
def fetch_url(self, url):
    import requests
    try:
        response = requests.get(url, timeout=5)
        return response.status_code
    except Exception as exc:
        # 触发重试,exc 会被记录
        raise self.retry(exc=exc)

常用参数解释:

  • bind=True:任务函数的第一个参数将成为任务实例本身(通常命名为 self),这样才能调用 self.retry()
  • max_retries:最大重试次数。
  • default_retry_delay:重试等待的默认秒数。
  • soft_time_limit / time_limit:软/硬超时,抛出 SoftTimeLimitExceeded 并可进行清理操作。

7. 任务路由与多队列

在生产环境中,常常需要将不同优先级的任务发送到不同队列,并由不同的 Worker 处理。

定义任务时指定队列名称:

@app.task(queue='high_priority')
def important_task():
    pass

@app.task(queue='low_priority')
def bulk_task():
    pass

启动 Worker 时指定只消费特定队列:

celery -A tasks worker -Q high_priority --loglevel=info
celery -A tasks worker -Q low_priority --concurrency=2 --loglevel=info

调用任务时可以显式指定路由:

important_task.apply_async(queue='high_priority')
bulk_task.apply_async(queue='low_priority')

8. 分布式部署:多机协同工作

Celery 天然支持分布式。只需确保所有机器的 Celery 应用代码一致,并且连接到同一个 Broker 和 Result Backend。然后,在多台机器上分别启动 Worker 即可。

假设你有两台机器 A 和 B,都部署了相同版本的 tasks.py,且都能访问 Redis 服务器。在 A 上运行:

celery -A tasks worker --hostname=worker_A@%h

在 B 上运行:

celery -A tasks worker --hostname=worker_B@%h

任务将在这两个 Worker 间自动负载均衡。你可以通过 --concurrency 参数控制每个 Worker 的并发进程数。

9. 监控与管理:Flower 与命令行工具

9.1 Celery 命令行监控

  • 查看活跃 Worker:
    celery -A tasks status
    
  • 列出已注册的任务:
    celery -A tasks inspect registered
    
  • 查看队列中的任务数量:
    celery -A tasks inspect active_queues
    

9.2 使用 Flower 实时监控

Flower 是一个基于 Web 的 Celery 实时监控工具。

安装:

pip install flower

启动(默认在 http://localhost:5555):

celery -A tasks flower

你可以在浏览器中查看 Worker 状态、任务历史、执行时间、重试情况等,并支持远程控制。

10. 生产环境配置建议

  • 选择可靠的 Broker:RabbitMQ 比 Redis 更适合高压力、任务不能丢失的场景。Redis 简单且性能高,但在 Broker 崩溃时可能丢失少量未处理任务。
  • 结果持久化:根据需求选择是否保留任务结果,大量结果可能撑爆 Redis 内存。可以设置 result_expires 自动过期。
  • Worker 部署:使用 supervisorsystemd 或容器化(Docker)管理 Worker 进程,保证崩溃自动重启。
  • 任务幂等性:由于网络或 Worker 重启,任务可能重复执行,确保任务实现幂等逻辑。
  • 日志与错误追踪:配置集中日志系统(如 ELK),并利用 Celery 的错误信号发送告警。

总结

Celery 凭借其异步、定时和易于分布式扩展的特性,成为 Python 生态中处理后台任务的首选方案。掌握 Celery 的核心流程、任务定义、调度、路由与监控,就能轻松构建高可用、可伸缩的分布式系统。动手试验不同的配置,逐步加深理解,你很快就能在生产环境中游刃有余地使用 Celery。