Celery 任务队列:异步、定时与分布式
Celery 任务队列:异步、定时与分布式
Celery 是一个简单、灵活且可靠的分布式任务队列系统,专注于实时处理,同时也支持任务调度。它能够将耗时的操作从应用主线程中剥离,通过消息中间件将任务分发给多个工作进程(Worker)异步执行。本教程将从零开始,带你掌握 Celery 的核心概念、安装配置、异步任务、定时任务以及分布式部署。
1. 理解 Celery 的核心角色
在开始编码前,需要先了解 Celery 架构中的几个重要角色:
- 生产者(Producer):你的应用代码,负责创建并发送任务到消息队列。
- 消息中间件(Broker):负责接收生产者发送的任务,并将其分发给消费者。Celery 支持 RabbitMQ、Redis、Amazon SQS 等。
- 消费者(Worker):Celery 的工作进程,负责从 Broker 中取出任务并执行。
- 结果后端(Result Backend):可选组件,用于存储任务的执行结果,以便生产者查询。支持 Redis、数据库、MongoDB 等。
简单流程:Producer → Broker → Worker → Result Backend(可选)。
2. 快速安装与环境准备
我们将使用 Redis 同时作为 Broker 和 Result Backend,因为它轻量且适合大多数场景。
pip install celery redis
确保你本地已经安装并启动了 Redis 服务(下载地址:https://redis.io/download)。默认情况下,Celery 连接本地 Redis 的 6379 端口。
3. 创建第一个 Celery 实例
首先创建一个 tasks.py 文件,用于定义 Celery 应用和任务。
from celery import Celery
# 创建 Celery 实例,'my_app' 为应用名
app = Celery('my_app',
broker='redis://localhost:6379/0', # 使用 Redis 0 号数据库作为 Broker
backend='redis://localhost:6379/1') # 使用 Redis 1 号数据库存储结果
# 定义一个简单的异步任务
@app.task
def add(x, y):
return x + y
@app.task 装饰器将普通函数转换成 Celery 任务,该任务就能被发送到队列并异步执行了。
4. 启动 Worker 并执行异步任务
打开终端,进入 tasks.py 所在目录,启动一个 Worker:
celery -A tasks worker --loglevel=info
-A tasks 表示 Celery 应用实例所在的模块。看到 celery@xxx ready 即表示 Worker 启动成功。
现在,另开一个 Python 交互环境,来调用任务:
>>> from tasks import add
# 调用 delay 方法将任务发送到队列
>>> result = add.delay(4, 4)
# result 是一个 AsyncResult 对象,可以检查任务状态
>>> result.ready()
True
>>> result.get() # 获取返回值(阻塞直到结果就绪)
8
delay() 是调用任务最简便的方式,也可以使用 apply_async() 并传入额外参数(如倒计时、队列路由等)。
5. 定时任务:利用 Celery Beat 实现周期调度
Celery 内置了周期任务调度器 celery beat。我们需要在配置中定义任务执行的节奏。
修改 tasks.py,添加定时设置:
from celery import Celery
from celery.schedules import crontab
app = Celery('my_app', broker='redis://localhost:6379/0', backend='redis://localhost:6379/1')
# 配置时区与调度项
app.conf.timezone = 'Asia/Shanghai'
app.conf.beat_schedule = {
'add-every-30-seconds': {
'task': 'tasks.add',
'schedule': 30.0, # 每30秒执行一次
'args': (16, 16)
},
'print-every-minute': {
'task': 'tasks.print_msg',
'schedule': crontab(minute='*/1'), # 使用 crontab 每分钟执行
},
}
@app.task
def add(x, y):
return x + y
@app.task
def print_msg():
print('定时任务每分钟打印一次')
启动 Beat 调度器(需要单独进程):
celery -A tasks beat --loglevel=info
同时保持 Worker 运行(另一个终端):
celery -A tasks worker --loglevel=info
Beat 会按照 beat_schedule 将任务发送到 Broker,Worker 接收并执行。你可以在 Worker 日志中看见定时触发的任务。
6. 任务高级配置:重试、超时与错误处理
Celery 任务支持自动重试、结果过期、软硬超时等配置,极大增强了稳健性。
@app.task(bind=True, max_retries=3, default_retry_delay=60)
def fetch_url(self, url):
import requests
try:
response = requests.get(url, timeout=5)
return response.status_code
except Exception as exc:
# 触发重试,exc 会被记录
raise self.retry(exc=exc)
常用参数解释:
bind=True:任务函数的第一个参数将成为任务实例本身(通常命名为self),这样才能调用self.retry()。max_retries:最大重试次数。default_retry_delay:重试等待的默认秒数。soft_time_limit/time_limit:软/硬超时,抛出SoftTimeLimitExceeded并可进行清理操作。
7. 任务路由与多队列
在生产环境中,常常需要将不同优先级的任务发送到不同队列,并由不同的 Worker 处理。
定义任务时指定队列名称:
@app.task(queue='high_priority')
def important_task():
pass
@app.task(queue='low_priority')
def bulk_task():
pass
启动 Worker 时指定只消费特定队列:
celery -A tasks worker -Q high_priority --loglevel=info
celery -A tasks worker -Q low_priority --concurrency=2 --loglevel=info
调用任务时可以显式指定路由:
important_task.apply_async(queue='high_priority')
bulk_task.apply_async(queue='low_priority')
8. 分布式部署:多机协同工作
Celery 天然支持分布式。只需确保所有机器的 Celery 应用代码一致,并且连接到同一个 Broker 和 Result Backend。然后,在多台机器上分别启动 Worker 即可。
假设你有两台机器 A 和 B,都部署了相同版本的 tasks.py,且都能访问 Redis 服务器。在 A 上运行:
celery -A tasks worker --hostname=worker_A@%h
在 B 上运行:
celery -A tasks worker --hostname=worker_B@%h
任务将在这两个 Worker 间自动负载均衡。你可以通过 --concurrency 参数控制每个 Worker 的并发进程数。
9. 监控与管理:Flower 与命令行工具
9.1 Celery 命令行监控
- 查看活跃 Worker:
celery -A tasks status - 列出已注册的任务:
celery -A tasks inspect registered - 查看队列中的任务数量:
celery -A tasks inspect active_queues
9.2 使用 Flower 实时监控
Flower 是一个基于 Web 的 Celery 实时监控工具。
安装:
pip install flower
启动(默认在 http://localhost:5555):
celery -A tasks flower
你可以在浏览器中查看 Worker 状态、任务历史、执行时间、重试情况等,并支持远程控制。
10. 生产环境配置建议
- 选择可靠的 Broker:RabbitMQ 比 Redis 更适合高压力、任务不能丢失的场景。Redis 简单且性能高,但在 Broker 崩溃时可能丢失少量未处理任务。
- 结果持久化:根据需求选择是否保留任务结果,大量结果可能撑爆 Redis 内存。可以设置
result_expires自动过期。 - Worker 部署:使用
supervisor、systemd或容器化(Docker)管理 Worker 进程,保证崩溃自动重启。 - 任务幂等性:由于网络或 Worker 重启,任务可能重复执行,确保任务实现幂等逻辑。
- 日志与错误追踪:配置集中日志系统(如 ELK),并利用 Celery 的错误信号发送告警。
总结
Celery 凭借其异步、定时和易于分布式扩展的特性,成为 Python 生态中处理后台任务的首选方案。掌握 Celery 的核心流程、任务定义、调度、路由与监控,就能轻松构建高可用、可伸缩的分布式系统。动手试验不同的配置,逐步加深理解,你很快就能在生产环境中游刃有余地使用 Celery。