Bull 任务队列:Redis 驱动的后台作业处理

FreeGuideOnline 最新 2026-06-16

Bull 任务队列:Redis 驱动的后台作业处理

1. 什么是 Bull?

Bull 是一个基于 Redis 的 Node.js 库,用于实现可靠、高性能的任务队列系统。它允许你将耗时或异步的任务(如发送邮件、图片处理、数据同步)放入队列,交由后台 Worker 逐一或并发处理,从而保持应用主线程的快速响应。

核心特点:

  • 基于 Redis,无需额外依赖
  • 支持延迟任务、重复任务、优先级队列
  • 提供完善的任务生命周期事件
  • 内置重试机制和失败处理
  • 支持并发控制与速率限制
  • 可搭配可视化面板(如 Bull Board)进行监控

2. 核心概念速览

在开始编码前,先理解 Bull 的四个关键要素:

  • Queue(队列):任务的存储单元,通过 Redis 持久化。一个应用可以有多个队列。
  • Job(任务):队列中的实际工作单元,包含数据和执行选项。
  • Producer(生产者):任何向队列中添加任务的代码。
  • Processor(处理器):定义如何处理某个任务类型的函数,运行在 Worker 进程或线程中。

3. 环境准备与安装

3.1 前置要求

  • Node.js (v14 及以上)
  • 运行中的 Redis 服务器(本地或远程均可)

3.2 安装 Bull

在项目根目录执行:

npm install bull

如果你的项目中需要同时使用 TypeScript 类型支持,可安装:

npm install --save-dev @types/bull

4. 创建第一个队列

Bull 队列拥有一个唯一的名称(name),并与 Redis 连接绑定。

const Queue = require('bull');

// 创建邮件队列,使用默认本地 Redis 配置
const emailQueue = new Queue('email', 'redis://127.0.0.1:6379');

// 也可传入配置对象(支持 Redis 集群、哨兵等)
const imageQueue = new Queue('image processing', {
  redis: { port: 6379, host: '127.0.0.1', password: 'yourpassword' }
});

注意:队列名称会在 Redis 中作为键名前缀使用,避免命名冲突。

5. 向队列添加任务(生产任务)

使用 add() 方法将一个任务投递到队列。可以传递数据和配置选项。

// 添加一个简单任务
emailQueue.add({
  to: 'user@example.com',
  subject: 'Welcome!',
  body: 'Thanks for signing up.'
});

// 添加带选项的任务:延迟执行、优先级、重试等
emailQueue.add(
  { to: 'admin@example.com', subject: 'Daily Report' },
  {
    delay: 60000,           // 延迟 60 秒执行
    attempts: 3,            // 失败后最多重试 3 次
    priority: 1,            // 高优先级 (1 最高, 数字越大优先级越低)
    backoff: 5000,          // 重试间隔 5 秒
    removeOnComplete: true  // 完成后自动删除任务
  }
);

add() 方法返回一个 Promise,解析为保存后的 Job 对象,你可以用其获取任务 ID 或监听状态。

6. 处理任务(Worker 端)

使用 process() 方法定义任务处理逻辑。每个队列可以绑定一个或多个处理器。

6.1 单处理器

emailQueue.process(async (job) => {
  // job.data 包含了添加任务时传入的数据
  const { to, subject, body } = job.data;

  // 模拟发送邮件
  await someEmailService.send(to, subject, body);

  // 可以任意返回值,会被存储到任务结果中
  return `Email sent to ${to}`;
});

6.2 基于任务类型的分处理器(命名处理器)

如果队列需要处理多种不同类型的任务,可以为每个任务指定 name

// 添加任务时指定名称
emailQueue.add('welcome', { to: 'user@x.com' });
emailQueue.add('reset-password', { to: 'user@x.com', token: 'abc' });

// 注册不同名称的处理器
emailQueue.process('welcome', async (job) => { /* ... */ });
emailQueue.process('reset-password', async (job) => { /* ... */ });

// 设置默认处理器(处理未匹配到名称的任务)
emailQueue.process(async (job) => { /* ... */ });

7. 监听任务事件

任务在其生命周期中会发出多种事件,可用于日志记录、监控或业务触发。

// 队列级别事件(所有任务的全局状态变化)
emailQueue.on('completed', (job, result) => {
  console.log(`Job ${job.id} completed with result ${result}`);
});

emailQueue.on('failed', (job, err) => {
  console.error(`Job ${job.id} failed: ${err.message}`);
});

// 单个任务的事件(必须在任务创建时保存 job 实例)
const job = await emailQueue.add(data);
job.on('progress', (progress) => {
  console.log(`Job ${job.id} is ${progress}% done`);
});
// 在处理器内部通过 job.progress(50) 来更新进度

常用事件:waitingactivecompletedfailedstalledprogress

8. 高级任务选项与特性

8.1 延迟任务

通过 delay 选项(毫秒)让任务在将来某个时间点执行。

emailQueue.add(data, { delay: 60000 }); // 1 分钟后执行

8.2 重复任务(Cron 风格)

使用 repeat 选项定义基于 Cron 表达式的定时任务。

emailQueue.add(
  { type: 'report' },
  {
    repeat: {
      cron: '0 8 * * *',   // 每天上午 8 点
      tz: 'Asia/Shanghai'   // 时区
    }
  }
);

注意:重复任务通过重复的 Job ID 管理,默认 ID 由数据+选项生成。如需更新重复任务,确保使用相同的 jobId。

8.3 任务优先级

priority 数值越小优先级越高,高优先级任务会优先出队。

8.4 并发控制

通过 process(concurrency, handler) 限制同时处理的任务数量。

// 最多同时处理 5 个任务
emailQueue.process(5, async (job) => { /* ... */ });

8.5 速率限制(Rate Limiter)

限制任务处理速率,避免下游服务过载。

const rateLimitedQueue = new Queue('api-calls', 'redis://127.0.0.1:6379', {
  limiter: {
    max: 10,      // 最大处理数量
    duration: 5000 // 每 5000 毫秒内只能处理 max 个任务
  }
});

9. 错误处理与重试

Bull 内置强大的重试机制。当处理器抛出异常时,任务自动进入 failed 状态并根据配置重试。

emailQueue.add(data, {
  attempts: 5,
  backoff: {
    type: 'exponential', // 固定延迟 'fixed' 或指数退避 'exponential'
    delay: 2000
  }
});

在处理器内部,你也可以手动将任务标记为完成但附带错误结果,或主动抛出错误。

10. 管理队列与任务

10.1 获取任务信息

const job = await emailQueue.getJob(jobId);
console.log(job.data, job.progress(), job.returnvalue);

10.2 暂停/恢复队列

await emailQueue.pause();
// ... 稍后恢复
await emailQueue.resume();

10.3 清理旧任务

// 清空已完成的任务
await emailQueue.clean(1000, 'completed');
// 清空所有失败的任务
await emailQueue.clean(0, 'failed');

10.4 移除任务

const job = await emailQueue.getJob(jobId);
if (job) {
  await job.remove();
}

11. 监控与可视化:Bull Board

为了直观查看队列状态、任务数量、失败原因等,可搭配 bull-board 使用。

安装:

npm install bull-board @bull-board/express

集成到 Express 应用:

const Queue = require('bull');
const { createBullBoard } = require('@bull-board/api');
const { BullAdapter } = require('@bull-board/api/bullAdapter');
const { ExpressAdapter } = require('@bull-board/express');

const emailQueue = new Queue('email', redis);
const serverAdapter = new ExpressAdapter();
serverAdapter.setBasePath('/admin/queues');

const { addQueue, removeQueue } = createBullBoard({
  queues: [new BullAdapter(emailQueue)],
  serverAdapter: serverAdapter,
});

app.use('/admin/queues', serverAdapter.getRouter());

访问 http://localhost:3000/admin/queues 即可看到仪表盘。

12. 多 Worker 与分布式部署

Bull 通过 Redis 天然支持多个 Worker 进程或分布式机器。只需在多个进程中创建同名队列并注册相同的处理器,Bull 会自动轮询分发任务,实现水平扩展。

注意:

  • 确保所有 Worker 可连接到同一 Redis 实例。
  • 避免在不同 Worker 中注册同名但逻辑不同的处理器,否则结果不可预期。

13. 最佳实践

  • 连接管理:如 Redis 连接断开,Bull 默认会自动重连,但在应用关闭时建议调用 queue.close()worker.close() 优雅退出。
  • 任务数据序列化:数据会被 JSON 序列化保存,避免传递 Binary 大对象,可改为传递 ID 并让 Worker 从数据库读取。
  • 避免重复注册事件:事件监听器请放在模块顶层,防止多次注册导致内存泄漏。
  • 监控停滞任务:设置 stalledInterval(默认 30000ms)检查被锁死的心跳,及时重试。
  • 性能调优:根据负载调整 Redis 的 maxmemory-policy 以及 Bull 的 lockDuration 等高级选项。

14. 完整示例

将上述各部分组合为一个最小可运行示例:

producer.js

const Queue = require('bull');
const q = new Queue('math');

async function main() {
  await q.add({ x: 2, y: 3 });
  await q.add({ x: 10, y: 5 }, { delay: 5000 });
}
main();

worker.js

const Queue = require('bull');
const q = new Queue('math');

q.process(async (job) => {
  const { x, y } = job.data;
  return x + y;
});

q.on('completed', (job, result) => {
  console.log(`Job ${job.id} completed, result: ${result}`);
});

运行:

# 启动 Worker
node worker.js
# 启动 Producer
node producer.js

15. 常见问题排查

现象 可能原因 解决方案
任务一直处于 waiting 状态 Worker 未启动或未注册对应名称的处理器 检查 worker 是否运行并注册了正确的处理器
Job stalled repeatedly 处理器执行超时(超过 lockDuration)或被阻塞 延长 lockDuration,优化处理器,避免同步阻塞代码
Redis 内存持续增长 已完成/失败的任务未清理 使用 removeOnCompleteremoveOnFail 选项或调用 clean()
重复任务不按计划执行 cron 表达式错误或时区未设置 验证 cron 表达式,明确设置 tz 选项

通过以上内容,你已经掌握了 Bull 任务队列的核心用法,可以开始构建健壮的后台作业系统了。结合你的业务场景,灵活运用优先级、并发、重试等机制,能够显著提升应用的稳定性和性能。