事件驱动与 CQRS 实战:构建反应式系统

FreeGuideOnline 最新 2026-06-18

事件驱动与 CQRS 实战:构建反应式系统

前言

在现代分布式系统中,事件驱动架构CQRS(命令查询职责分离) 是构建高可扩展、高响应性系统的核心模式。本教程将从零开始,带你理解关键概念,并通过一个简化订单系统的实战示例,掌握如何在实际项目中落地这两种模式。适合具备基础后端开发经验、希望进阶架构设计的开发者。


第一章:核心概念扫盲

1.1 什么是事件驱动架构(EDA)

事件驱动架构围绕事件的产生、检测、消费和反应来组织系统。事件是“已发生的事实”,例如 订单已创建支付已完成。系统组件通过事件总线(Message Broker)异步通信,实现松耦合和弹性伸缩。

关键角色

  • 事件生产者:发布事件。
  • 事件消费者:订阅并处理事件。
  • 事件通道:如 RabbitMQ、Kafka、AWS SNS/SQS。

优势

  • 服务独立演进,故障隔离。
  • 天然适合构建实时数据流和反应式系统。
  • 易于扩展,消费者可按需增减。

1.2 CQRS 本质解读

CQRS 将应用的读操作(查询)写操作(命令) 分离到不同的模型中。命令端负责处理业务规则并持久化状态,查询端则维护专为展示优化的只读数据副本。

传统 CRUD 的痛点

  • 用同一个领域模型同时处理复杂验证和多种查询,导致模型臃肿。
  • 读写争抢数据库资源,性能瓶颈明显。

CQRS 如何解决

  • 命令模型:聚焦业务逻辑,确保一致性。
  • 查询模型:可按需建立多种视图(如搜索索引、报表缓存),实现快速读取。
  • 两者可通过事件同步:命令端发布领域事件,查询端订阅事件并更新自己的读模型。

1.3 事件溯源(Event Sourcing)与 CQRS 的关系

事件溯源常与 CQRS 结合,但并非必须。事件溯源将每次状态变更存储为不可变的事件序列,当前状态通过重放事件得到。本教程聚焦于不强制事件溯源的轻量级 CQRS,更易上手。


第二章:架构设计总览

2.1 系统组件图

[ 客户端 ] 
   │
   ├─── 命令 (POST/PUT/DELETE) ──→ [ 命令API ]
   │                                    │
   │                        将命令转化为领域事件
   │                                    │
   │                         ┌──────────▼──────────┐
   │                         │   事件总线 (Kafka)    │
   │                         └──┬───────┬──────────┘
   │                            │       │
   │              ┌─────────────┘       └─────────────┐
   │              ▼                                   ▼
   │    [ 查询模型更新服务 ]                   [ 其他消费者:通知、审计 ]
   │              │                        
   │    更新读数据库 (Elasticsearch / 物化视图)  
   │              │
   └─── 查询 (GET) ──→ [ 查询API ] ──→ 读取优化后的视图

2.2 消息主题设计

  • order-commands:原始命令日志(可选,用于审计)。
  • order-events:领域事件,如 OrderCreated, OrderConfirmed, OrderShipped

第三章:实战准备

3.1 技术选型

  • 语言:Node.js + TypeScript(也可用 Java/Spring Boot 或 Go,思想通用)
  • 消息中间件:Apache Kafka(生产级)或 RabbitMQ,演示用轻量级 BullMQ (基于 Redis)
  • 读存储:PostgreSQL 用于书写侧;Elasticsearch 用于查询侧高性能检索(演示用内存模拟)
  • 依赖库ioredis, bullmq, express, uuid

3.2 项目结构

/src
  /command
    order.command.controller.ts   # 接收HTTP命令
    order.command.handler.ts      # 命令处理逻辑
  /query
    order.query.controller.ts     # 提供HTTP查询
    order.view.updater.ts         # 监听事件更新视图
  /events
    order.event.bus.ts            # 事件发布与订阅封装
  /domain
    order.aggregate.ts            # 订单聚合根(写模型)
  main.ts                         # 启动入口

第四章:写模型实现(命令端)

4.1 定义命令与领域事件

// domain/order.types.ts
export interface CreateOrderCommand {
  orderId: string;
  customerId: string;
  items: { productId: string; quantity: number }[];
}

export interface OrderCreatedEvent {
  type: 'OrderCreated';
  data: {
    orderId: string;
    customerId: string;
    items: { productId: string; quantity: number }[];
    createdAt: Date;
  };
}

4.2 订单聚合根

// domain/order.aggregate.ts
export class OrderAggregate {
  private state: 'draft' | 'confirmed' = 'draft';

  constructor(
    public orderId: string,
    public customerId: string,
    public items: { productId: string; quantity: number }[]
  ) {}

  // 工厂方法:从命令创建并返回事件
  static create(command: CreateOrderCommand): OrderCreatedEvent {
    // 业务规则验证(此处略)
    return {
      type: 'OrderCreated',
      data: {
        orderId: command.orderId,
        customerId: command.customerId,
        items: command.items,
        createdAt: new Date(),
      },
    };
  }
}

4.3 命令处理器与事件发布

// events/order.event.bus.ts
import { Queue, Worker } from 'bullmq';

// 使用 BullMQ 模拟事件总线
const eventQueue = new Queue('order-events');

export async function publishEvent(event: any) {
  await eventQueue.add(event.type, event);
}

// 注册处理器稍后在查询侧实现
export function subscribeToEvents(handler: (event: any) => Promise<void>) {
  const worker = new Worker('order-events', async job => {
    await handler(job.data);
  });
}
// command/order.command.handler.ts
import { OrderAggregate } from '../domain/order.aggregate';
import { publishEvent } from '../events/order.event.bus';
import { v4 as uuid } from 'uuid';
import { CreateOrderCommand } from '../domain/order.types';

export async function handleCreateOrder(cmd: Omit<CreateOrderCommand, 'orderId'>) {
  const orderId = uuid();
  const command: CreateOrderCommand = { ...cmd, orderId };
  const event = OrderAggregate.create(command);
  
  // 此处持久化写模型到 PostgreSQL(示例省略,可调用仓库保存聚合状态)
  await publishEvent(event);
  return orderId;
}

4.4 HTTP 命令接口

// command/order.command.controller.ts
import { Request, Response } from 'express';
import { handleCreateOrder } from './order.command.handler';

export async function createOrder(req: Request, res: Response) {
  try {
    const { customerId, items } = req.body;
    const orderId = await handleCreateOrder({ customerId, items });
    res.status(201).json({ orderId });
  } catch (err) {
    res.status(400).json({ error: err.message });
  }
}

第五章:读模型实现(查询端)

5.1 内存视图存储(生产环境替换为 Elasticsearch)

// query/order.view.store.ts
interface OrderView {
  orderId: string;
  customerId: string;
  itemCount: number;
  status: string;
  createdAt: Date;
}

// 简单内存 Map 模拟读库
const orderViews = new Map<string, OrderView>();

export const saveOrderView = (view: OrderView) => orderViews.set(view.orderId, view);
export const getOrderViewById = (id: string) => orderViews.get(id);
export const getAllOrderViews = () => Array.from(orderViews.values());

5.2 事件驱动的视图更新器

// query/order.view.updater.ts
import { subscribeToEvents } from '../events/order.event.bus';
import { saveOrderView } from './order.view.store';

export function startOrderViewUpdater() {
  subscribeToEvents(async (event) => {
    switch (event.type) {
      case 'OrderCreated': {
        const { orderId, customerId, items, createdAt } = event.data;
        saveOrderView({
          orderId,
          customerId,
          itemCount: items.reduce((sum, i) => sum + i.quantity, 0),
          status: 'CREATED',
          createdAt,
        });
        console.log(`[视图更新] 订单 ${orderId} 已添加`);
        break;
      }
      // 其他事件处理...
    }
  });
}

5.3 查询 API

// query/order.query.controller.ts
import { Request, Response } from 'express';
import { getOrderViewById, getAllOrderViews } from './order.view.store';

export async function getOrder(req: Request, res: Response) {
  const order = getOrderViewById(req.params.id);
  if (!order) return res.status(404).json({ error: 'Not found' });
  res.json(order);
}

export async function listOrders(req: Request, res: Response) {
  const orders = getAllOrderViews();
  res.json(orders);
}

第六章:启动与测试

6.1 服务器组装

// main.ts
import express from 'express';
import { createOrder } from './command/order.command.controller';
import { getOrder, listOrders } from './query/order.query.controller';
import { startOrderViewUpdater } from './query/order.view.updater';

const app = express();
app.use(express.json());

app.post('/orders', createOrder);
app.get('/orders/:id', getOrder);
app.get('/orders', listOrders);

// 启动事件订阅(视图更新器)
startOrderViewUpdater().then(() => {
  console.log('订单视图更新器已启动');
});

app.listen(3000, () => console.log('服务运行在 :3000'));

6.2 功能验证

  1. 启动 Redis(BullMQ 依赖)。
  2. 用 curl 创建订单:
    curl -X POST http://localhost:3000/orders \
      -H "Content-Type: application/json" \
      -d '{"customerId":"cust1","items":[{"productId":"p1","quantity":2}]}'
    
    返回 { "orderId": "xxx-xxx-xxx" }.
  3. 查询所有订单:curl http://localhost:3000/orders 可立即看到视图已包含新订单,证明命令和查询分离,并通过事件最终一致。

第七章:生产注意事项

7.1 最终一致性处理

  • 命令返回成功不代表查询立即可见;客户端需主动轮询或通过 WebSocket 推送更新。
  • 确保事件处理幂等:通过 eventId + 消费者 去重。

7.2 读写数据库选择

  • 写库:关系型数据库保证 ACID。
  • 读库:Elasticsearch、MongoDB、Redis 或物化视图。

7.3 事件顺序与并发

  • 同一聚合的事件需保证顺序(Kafka 分区键根据 orderId 哈希)。
  • 使用乐观锁或版本号处理并发命令。

7.4 监控与追踪

  • 为每个事件附加 correlationIdcausationId,便于链路追踪。

第八章:总结与下一步

你已成功构建了一个事件驱动 CQRS 系统的骨架。核心收获:

  • 命令和查询分离,模型更清晰,性能更优。
  • 事件作为第一公民,系统扩展性极强。

进阶方向

  • 引入事件溯源,实现完整审计与时间旅行。
  • 使用 Kafka Streams 或 Apache Flink 实现实时分析。
  • 将事务发件箱模式(Outbox Pattern)用于可靠的事件发布。

本教程代码仅为演示,生产实现需增加错误处理、重试、死信队列等成熟度。欢迎将这种模式应用于微服务解耦、复杂业务报表、实时仪表盘等场景。