事件驱动与 CQRS 实战:构建反应式系统
FreeGuideOnline
最新
2026-06-18
事件驱动与 CQRS 实战:构建反应式系统
前言
在现代分布式系统中,事件驱动架构与 CQRS(命令查询职责分离) 是构建高可扩展、高响应性系统的核心模式。本教程将从零开始,带你理解关键概念,并通过一个简化订单系统的实战示例,掌握如何在实际项目中落地这两种模式。适合具备基础后端开发经验、希望进阶架构设计的开发者。
第一章:核心概念扫盲
1.1 什么是事件驱动架构(EDA)
事件驱动架构围绕事件的产生、检测、消费和反应来组织系统。事件是“已发生的事实”,例如 订单已创建、支付已完成。系统组件通过事件总线(Message Broker)异步通信,实现松耦合和弹性伸缩。
关键角色
- 事件生产者:发布事件。
- 事件消费者:订阅并处理事件。
- 事件通道:如 RabbitMQ、Kafka、AWS SNS/SQS。
优势
- 服务独立演进,故障隔离。
- 天然适合构建实时数据流和反应式系统。
- 易于扩展,消费者可按需增减。
1.2 CQRS 本质解读
CQRS 将应用的读操作(查询) 和写操作(命令) 分离到不同的模型中。命令端负责处理业务规则并持久化状态,查询端则维护专为展示优化的只读数据副本。
传统 CRUD 的痛点
- 用同一个领域模型同时处理复杂验证和多种查询,导致模型臃肿。
- 读写争抢数据库资源,性能瓶颈明显。
CQRS 如何解决
- 命令模型:聚焦业务逻辑,确保一致性。
- 查询模型:可按需建立多种视图(如搜索索引、报表缓存),实现快速读取。
- 两者可通过事件同步:命令端发布领域事件,查询端订阅事件并更新自己的读模型。
1.3 事件溯源(Event Sourcing)与 CQRS 的关系
事件溯源常与 CQRS 结合,但并非必须。事件溯源将每次状态变更存储为不可变的事件序列,当前状态通过重放事件得到。本教程聚焦于不强制事件溯源的轻量级 CQRS,更易上手。
第二章:架构设计总览
2.1 系统组件图
[ 客户端 ]
│
├─── 命令 (POST/PUT/DELETE) ──→ [ 命令API ]
│ │
│ 将命令转化为领域事件
│ │
│ ┌──────────▼──────────┐
│ │ 事件总线 (Kafka) │
│ └──┬───────┬──────────┘
│ │ │
│ ┌─────────────┘ └─────────────┐
│ ▼ ▼
│ [ 查询模型更新服务 ] [ 其他消费者:通知、审计 ]
│ │
│ 更新读数据库 (Elasticsearch / 物化视图)
│ │
└─── 查询 (GET) ──→ [ 查询API ] ──→ 读取优化后的视图
2.2 消息主题设计
order-commands:原始命令日志(可选,用于审计)。order-events:领域事件,如OrderCreated,OrderConfirmed,OrderShipped。
第三章:实战准备
3.1 技术选型
- 语言:Node.js + TypeScript(也可用 Java/Spring Boot 或 Go,思想通用)
- 消息中间件:Apache Kafka(生产级)或 RabbitMQ,演示用轻量级 BullMQ (基于 Redis)
- 读存储:PostgreSQL 用于书写侧;Elasticsearch 用于查询侧高性能检索(演示用内存模拟)
- 依赖库:
ioredis,bullmq,express,uuid
3.2 项目结构
/src
/command
order.command.controller.ts # 接收HTTP命令
order.command.handler.ts # 命令处理逻辑
/query
order.query.controller.ts # 提供HTTP查询
order.view.updater.ts # 监听事件更新视图
/events
order.event.bus.ts # 事件发布与订阅封装
/domain
order.aggregate.ts # 订单聚合根(写模型)
main.ts # 启动入口
第四章:写模型实现(命令端)
4.1 定义命令与领域事件
// domain/order.types.ts
export interface CreateOrderCommand {
orderId: string;
customerId: string;
items: { productId: string; quantity: number }[];
}
export interface OrderCreatedEvent {
type: 'OrderCreated';
data: {
orderId: string;
customerId: string;
items: { productId: string; quantity: number }[];
createdAt: Date;
};
}
4.2 订单聚合根
// domain/order.aggregate.ts
export class OrderAggregate {
private state: 'draft' | 'confirmed' = 'draft';
constructor(
public orderId: string,
public customerId: string,
public items: { productId: string; quantity: number }[]
) {}
// 工厂方法:从命令创建并返回事件
static create(command: CreateOrderCommand): OrderCreatedEvent {
// 业务规则验证(此处略)
return {
type: 'OrderCreated',
data: {
orderId: command.orderId,
customerId: command.customerId,
items: command.items,
createdAt: new Date(),
},
};
}
}
4.3 命令处理器与事件发布
// events/order.event.bus.ts
import { Queue, Worker } from 'bullmq';
// 使用 BullMQ 模拟事件总线
const eventQueue = new Queue('order-events');
export async function publishEvent(event: any) {
await eventQueue.add(event.type, event);
}
// 注册处理器稍后在查询侧实现
export function subscribeToEvents(handler: (event: any) => Promise<void>) {
const worker = new Worker('order-events', async job => {
await handler(job.data);
});
}
// command/order.command.handler.ts
import { OrderAggregate } from '../domain/order.aggregate';
import { publishEvent } from '../events/order.event.bus';
import { v4 as uuid } from 'uuid';
import { CreateOrderCommand } from '../domain/order.types';
export async function handleCreateOrder(cmd: Omit<CreateOrderCommand, 'orderId'>) {
const orderId = uuid();
const command: CreateOrderCommand = { ...cmd, orderId };
const event = OrderAggregate.create(command);
// 此处持久化写模型到 PostgreSQL(示例省略,可调用仓库保存聚合状态)
await publishEvent(event);
return orderId;
}
4.4 HTTP 命令接口
// command/order.command.controller.ts
import { Request, Response } from 'express';
import { handleCreateOrder } from './order.command.handler';
export async function createOrder(req: Request, res: Response) {
try {
const { customerId, items } = req.body;
const orderId = await handleCreateOrder({ customerId, items });
res.status(201).json({ orderId });
} catch (err) {
res.status(400).json({ error: err.message });
}
}
第五章:读模型实现(查询端)
5.1 内存视图存储(生产环境替换为 Elasticsearch)
// query/order.view.store.ts
interface OrderView {
orderId: string;
customerId: string;
itemCount: number;
status: string;
createdAt: Date;
}
// 简单内存 Map 模拟读库
const orderViews = new Map<string, OrderView>();
export const saveOrderView = (view: OrderView) => orderViews.set(view.orderId, view);
export const getOrderViewById = (id: string) => orderViews.get(id);
export const getAllOrderViews = () => Array.from(orderViews.values());
5.2 事件驱动的视图更新器
// query/order.view.updater.ts
import { subscribeToEvents } from '../events/order.event.bus';
import { saveOrderView } from './order.view.store';
export function startOrderViewUpdater() {
subscribeToEvents(async (event) => {
switch (event.type) {
case 'OrderCreated': {
const { orderId, customerId, items, createdAt } = event.data;
saveOrderView({
orderId,
customerId,
itemCount: items.reduce((sum, i) => sum + i.quantity, 0),
status: 'CREATED',
createdAt,
});
console.log(`[视图更新] 订单 ${orderId} 已添加`);
break;
}
// 其他事件处理...
}
});
}
5.3 查询 API
// query/order.query.controller.ts
import { Request, Response } from 'express';
import { getOrderViewById, getAllOrderViews } from './order.view.store';
export async function getOrder(req: Request, res: Response) {
const order = getOrderViewById(req.params.id);
if (!order) return res.status(404).json({ error: 'Not found' });
res.json(order);
}
export async function listOrders(req: Request, res: Response) {
const orders = getAllOrderViews();
res.json(orders);
}
第六章:启动与测试
6.1 服务器组装
// main.ts
import express from 'express';
import { createOrder } from './command/order.command.controller';
import { getOrder, listOrders } from './query/order.query.controller';
import { startOrderViewUpdater } from './query/order.view.updater';
const app = express();
app.use(express.json());
app.post('/orders', createOrder);
app.get('/orders/:id', getOrder);
app.get('/orders', listOrders);
// 启动事件订阅(视图更新器)
startOrderViewUpdater().then(() => {
console.log('订单视图更新器已启动');
});
app.listen(3000, () => console.log('服务运行在 :3000'));
6.2 功能验证
- 启动 Redis(BullMQ 依赖)。
- 用 curl 创建订单:
返回curl -X POST http://localhost:3000/orders \ -H "Content-Type: application/json" \ -d '{"customerId":"cust1","items":[{"productId":"p1","quantity":2}]}'{ "orderId": "xxx-xxx-xxx" }. - 查询所有订单:
curl http://localhost:3000/orders可立即看到视图已包含新订单,证明命令和查询分离,并通过事件最终一致。
第七章:生产注意事项
7.1 最终一致性处理
- 命令返回成功不代表查询立即可见;客户端需主动轮询或通过 WebSocket 推送更新。
- 确保事件处理幂等:通过
eventId + 消费者去重。
7.2 读写数据库选择
- 写库:关系型数据库保证 ACID。
- 读库:Elasticsearch、MongoDB、Redis 或物化视图。
7.3 事件顺序与并发
- 同一聚合的事件需保证顺序(Kafka 分区键根据
orderId哈希)。 - 使用乐观锁或版本号处理并发命令。
7.4 监控与追踪
- 为每个事件附加
correlationId和causationId,便于链路追踪。
第八章:总结与下一步
你已成功构建了一个事件驱动 CQRS 系统的骨架。核心收获:
- 命令和查询分离,模型更清晰,性能更优。
- 事件作为第一公民,系统扩展性极强。
进阶方向
- 引入事件溯源,实现完整审计与时间旅行。
- 使用 Kafka Streams 或 Apache Flink 实现实时分析。
- 将事务发件箱模式(Outbox Pattern)用于可靠的事件发布。
本教程代码仅为演示,生产实现需增加错误处理、重试、死信队列等成熟度。欢迎将这种模式应用于微服务解耦、复杂业务报表、实时仪表盘等场景。