Bull 任务队列:Redis 驱动的后台作业处理
Bull 任务队列:Redis 驱动的后台作业处理
1. 什么是 Bull?
Bull 是一个基于 Redis 的 Node.js 库,用于实现可靠、高性能的任务队列系统。它允许你将耗时或异步的任务(如发送邮件、图片处理、数据同步)放入队列,交由后台 Worker 逐一或并发处理,从而保持应用主线程的快速响应。
核心特点:
- 基于 Redis,无需额外依赖
- 支持延迟任务、重复任务、优先级队列
- 提供完善的任务生命周期事件
- 内置重试机制和失败处理
- 支持并发控制与速率限制
- 可搭配可视化面板(如 Bull Board)进行监控
2. 核心概念速览
在开始编码前,先理解 Bull 的四个关键要素:
- Queue(队列):任务的存储单元,通过 Redis 持久化。一个应用可以有多个队列。
- Job(任务):队列中的实际工作单元,包含数据和执行选项。
- Producer(生产者):任何向队列中添加任务的代码。
- Processor(处理器):定义如何处理某个任务类型的函数,运行在 Worker 进程或线程中。
3. 环境准备与安装
3.1 前置要求
- Node.js (v14 及以上)
- 运行中的 Redis 服务器(本地或远程均可)
3.2 安装 Bull
在项目根目录执行:
npm install bull
如果你的项目中需要同时使用 TypeScript 类型支持,可安装:
npm install --save-dev @types/bull
4. 创建第一个队列
Bull 队列拥有一个唯一的名称(name),并与 Redis 连接绑定。
const Queue = require('bull');
// 创建邮件队列,使用默认本地 Redis 配置
const emailQueue = new Queue('email', 'redis://127.0.0.1:6379');
// 也可传入配置对象(支持 Redis 集群、哨兵等)
const imageQueue = new Queue('image processing', {
redis: { port: 6379, host: '127.0.0.1', password: 'yourpassword' }
});
注意:队列名称会在 Redis 中作为键名前缀使用,避免命名冲突。
5. 向队列添加任务(生产任务)
使用 add() 方法将一个任务投递到队列。可以传递数据和配置选项。
// 添加一个简单任务
emailQueue.add({
to: 'user@example.com',
subject: 'Welcome!',
body: 'Thanks for signing up.'
});
// 添加带选项的任务:延迟执行、优先级、重试等
emailQueue.add(
{ to: 'admin@example.com', subject: 'Daily Report' },
{
delay: 60000, // 延迟 60 秒执行
attempts: 3, // 失败后最多重试 3 次
priority: 1, // 高优先级 (1 最高, 数字越大优先级越低)
backoff: 5000, // 重试间隔 5 秒
removeOnComplete: true // 完成后自动删除任务
}
);
add() 方法返回一个 Promise,解析为保存后的 Job 对象,你可以用其获取任务 ID 或监听状态。
6. 处理任务(Worker 端)
使用 process() 方法定义任务处理逻辑。每个队列可以绑定一个或多个处理器。
6.1 单处理器
emailQueue.process(async (job) => {
// job.data 包含了添加任务时传入的数据
const { to, subject, body } = job.data;
// 模拟发送邮件
await someEmailService.send(to, subject, body);
// 可以任意返回值,会被存储到任务结果中
return `Email sent to ${to}`;
});
6.2 基于任务类型的分处理器(命名处理器)
如果队列需要处理多种不同类型的任务,可以为每个任务指定 name。
// 添加任务时指定名称
emailQueue.add('welcome', { to: 'user@x.com' });
emailQueue.add('reset-password', { to: 'user@x.com', token: 'abc' });
// 注册不同名称的处理器
emailQueue.process('welcome', async (job) => { /* ... */ });
emailQueue.process('reset-password', async (job) => { /* ... */ });
// 设置默认处理器(处理未匹配到名称的任务)
emailQueue.process(async (job) => { /* ... */ });
7. 监听任务事件
任务在其生命周期中会发出多种事件,可用于日志记录、监控或业务触发。
// 队列级别事件(所有任务的全局状态变化)
emailQueue.on('completed', (job, result) => {
console.log(`Job ${job.id} completed with result ${result}`);
});
emailQueue.on('failed', (job, err) => {
console.error(`Job ${job.id} failed: ${err.message}`);
});
// 单个任务的事件(必须在任务创建时保存 job 实例)
const job = await emailQueue.add(data);
job.on('progress', (progress) => {
console.log(`Job ${job.id} is ${progress}% done`);
});
// 在处理器内部通过 job.progress(50) 来更新进度
常用事件:waiting、active、completed、failed、stalled、progress。
8. 高级任务选项与特性
8.1 延迟任务
通过 delay 选项(毫秒)让任务在将来某个时间点执行。
emailQueue.add(data, { delay: 60000 }); // 1 分钟后执行
8.2 重复任务(Cron 风格)
使用 repeat 选项定义基于 Cron 表达式的定时任务。
emailQueue.add(
{ type: 'report' },
{
repeat: {
cron: '0 8 * * *', // 每天上午 8 点
tz: 'Asia/Shanghai' // 时区
}
}
);
注意:重复任务通过重复的 Job ID 管理,默认 ID 由数据+选项生成。如需更新重复任务,确保使用相同的 jobId。
8.3 任务优先级
priority 数值越小优先级越高,高优先级任务会优先出队。
8.4 并发控制
通过 process(concurrency, handler) 限制同时处理的任务数量。
// 最多同时处理 5 个任务
emailQueue.process(5, async (job) => { /* ... */ });
8.5 速率限制(Rate Limiter)
限制任务处理速率,避免下游服务过载。
const rateLimitedQueue = new Queue('api-calls', 'redis://127.0.0.1:6379', {
limiter: {
max: 10, // 最大处理数量
duration: 5000 // 每 5000 毫秒内只能处理 max 个任务
}
});
9. 错误处理与重试
Bull 内置强大的重试机制。当处理器抛出异常时,任务自动进入 failed 状态并根据配置重试。
emailQueue.add(data, {
attempts: 5,
backoff: {
type: 'exponential', // 固定延迟 'fixed' 或指数退避 'exponential'
delay: 2000
}
});
在处理器内部,你也可以手动将任务标记为完成但附带错误结果,或主动抛出错误。
10. 管理队列与任务
10.1 获取任务信息
const job = await emailQueue.getJob(jobId);
console.log(job.data, job.progress(), job.returnvalue);
10.2 暂停/恢复队列
await emailQueue.pause();
// ... 稍后恢复
await emailQueue.resume();
10.3 清理旧任务
// 清空已完成的任务
await emailQueue.clean(1000, 'completed');
// 清空所有失败的任务
await emailQueue.clean(0, 'failed');
10.4 移除任务
const job = await emailQueue.getJob(jobId);
if (job) {
await job.remove();
}
11. 监控与可视化:Bull Board
为了直观查看队列状态、任务数量、失败原因等,可搭配 bull-board 使用。
安装:
npm install bull-board @bull-board/express
集成到 Express 应用:
const Queue = require('bull');
const { createBullBoard } = require('@bull-board/api');
const { BullAdapter } = require('@bull-board/api/bullAdapter');
const { ExpressAdapter } = require('@bull-board/express');
const emailQueue = new Queue('email', redis);
const serverAdapter = new ExpressAdapter();
serverAdapter.setBasePath('/admin/queues');
const { addQueue, removeQueue } = createBullBoard({
queues: [new BullAdapter(emailQueue)],
serverAdapter: serverAdapter,
});
app.use('/admin/queues', serverAdapter.getRouter());
访问 http://localhost:3000/admin/queues 即可看到仪表盘。
12. 多 Worker 与分布式部署
Bull 通过 Redis 天然支持多个 Worker 进程或分布式机器。只需在多个进程中创建同名队列并注册相同的处理器,Bull 会自动轮询分发任务,实现水平扩展。
注意:
- 确保所有 Worker 可连接到同一 Redis 实例。
- 避免在不同 Worker 中注册同名但逻辑不同的处理器,否则结果不可预期。
13. 最佳实践
- 连接管理:如 Redis 连接断开,Bull 默认会自动重连,但在应用关闭时建议调用
queue.close()或worker.close()优雅退出。 - 任务数据序列化:数据会被 JSON 序列化保存,避免传递 Binary 大对象,可改为传递 ID 并让 Worker 从数据库读取。
- 避免重复注册事件:事件监听器请放在模块顶层,防止多次注册导致内存泄漏。
- 监控停滞任务:设置
stalledInterval(默认 30000ms)检查被锁死的心跳,及时重试。 - 性能调优:根据负载调整 Redis 的
maxmemory-policy以及 Bull 的lockDuration等高级选项。
14. 完整示例
将上述各部分组合为一个最小可运行示例:
producer.js
const Queue = require('bull');
const q = new Queue('math');
async function main() {
await q.add({ x: 2, y: 3 });
await q.add({ x: 10, y: 5 }, { delay: 5000 });
}
main();
worker.js
const Queue = require('bull');
const q = new Queue('math');
q.process(async (job) => {
const { x, y } = job.data;
return x + y;
});
q.on('completed', (job, result) => {
console.log(`Job ${job.id} completed, result: ${result}`);
});
运行:
# 启动 Worker
node worker.js
# 启动 Producer
node producer.js
15. 常见问题排查
| 现象 | 可能原因 | 解决方案 |
|---|---|---|
| 任务一直处于 waiting 状态 | Worker 未启动或未注册对应名称的处理器 | 检查 worker 是否运行并注册了正确的处理器 |
| Job stalled repeatedly | 处理器执行超时(超过 lockDuration)或被阻塞 |
延长 lockDuration,优化处理器,避免同步阻塞代码 |
| Redis 内存持续增长 | 已完成/失败的任务未清理 | 使用 removeOnComplete、removeOnFail 选项或调用 clean() |
| 重复任务不按计划执行 | cron 表达式错误或时区未设置 | 验证 cron 表达式,明确设置 tz 选项 |
通过以上内容,你已经掌握了 Bull 任务队列的核心用法,可以开始构建健壮的后台作业系统了。结合你的业务场景,灵活运用优先级、并发、重试等机制,能够显著提升应用的稳定性和性能。