来源: 《手把手教你用 .NET Core 搭建高并发、可扩展的 CQRS 与 DDD 架构》
最近有人留言,说讲解一下CQRS和DDD架构,其实DDD有分开讲,在历史文章中有体现,比较好理解。
这几天单独重新回顾一次。
直接开干!
01
第一章:DDD 核心概念与 .NET Core 中的分层架构
1.1 什么是 DDD?
领域驱动设计(Domain-Driven Design)是一种软件开发方法论,强调以业务领域为核心,通过与领域专家的深度合作,构建出能够准确反映业务逻辑的软件模型。其核心思想是将复杂的业务逻辑集中在“领域层”,并通过清晰的边界(如聚合、限界上下文)来管理复杂性。
DDD 适用于业务复杂、规则多变、核心竞争力在于业务逻辑的系统,例如金融、电商、ERP、医疗等系统。
02
1.2 DDD 的核心概念
在 .NET Core 项目中,我们通常会将 DDD 分为以下几个核心概念:
- 实体(Entity)
:具有唯一标识的对象,其状态会随时间变化。例如 Order(订单)、Customer(客户)。 - 值对象(Value Object)
:没有唯一标识,通过属性值来定义的对象。例如 Address(地址)、Money(金额)。 - 聚合(Aggregate)
:一组相关对象的集合,由一个**聚合根(Aggregate Root)**统一管理。聚合根负责维护聚合内部的一致性。例如 Order是聚合根,包含OrderItem等子实体。 - 领域服务(Domain Service)
:当某个操作不属于任何实体或值对象时,使用领域服务。它封装了跨多个实体的业务逻辑。 - 仓储(Repository)
:提供对聚合的持久化访问,屏蔽底层数据访问细节。在 .NET Core 中,通常通过接口定义仓储,由 Entity Framework Core 实现。 - 领域事件(Domain Event)
:表示领域中发生的重要事件,用于解耦和通知。例如 OrderPlacedEvent。 - 限界上下文(Bounded Context)
:一个明确的业务边界,在此边界内,术语、模型和规则具有一致性。在微服务架构中,一个限界上下文通常对应一个微服务。
03
1.3 .NET Core 中的典型分层架构
在 .NET Core 项目中,我们通常采用六边形架构或洋葱架构来实现 DDD。以下是常见的分层结构:
MyApp.Solution├── MyApp.Domain // 领域层:实体、值对象、聚合、领域服务、仓储接口├── MyApp.Application // 应用层:应用服务、DTO、CQRS 命令/查询、中介处理├── MyApp.Infrastructure // 基础设施层:EF Core 实现仓储、事件总线、外部服务调用├── MyApp.WebApi // 表现层:ASP.NET Core Web API,接收请求,返回响应└── MyApp.UnitTests // 单元测试
- Domain 层
:纯业务逻辑,不依赖任何外部框架或基础设施。 - Application 层
:协调领域逻辑与基础设施,定义用例。 - Infrastructure 层
:实现持久化、消息队列、缓存等。 - WebApi 层
:处理 HTTP 请求,调用应用服务。
关键点:依赖关系只能从外向内,即
WebApi → Application → Domain,Infrastructure实现Domain的接口。
04
1.4 实战示例:订单聚合
我们以电商系统中的 Order 聚合为例:
// Domain/Entities/Order.cspublic class Order : AggregateRoot{public Guid CustomerId { get; private set; }public decimal TotalAmount { get; private set; }public OrderStatus Status { get; private set; }private readonly List<OrderItem> _items = new();public IReadOnlyCollection<OrderItem> Items => _items.AsReadOnly();// 工厂方法public static Order Create(Guid customerId, List<OrderItemDto> items){var order = new Order{Id = Guid.NewGuid(),CustomerId = customerId,Status = OrderStatus.Pending};foreach (var item in items){order.AddItem(item.ProductId, item.Quantity, item.UnitPrice);}// 添加领域事件order.AddDomainEvent(new OrderCreatedEvent(order.Id));return order;}private void AddItem(Guid productId, int quantity, decimal unitPrice){// 业务规则校验if (quantity <= 0) throw new DomainException("数量必须大于0");var item = new OrderItem(productId, quantity, unitPrice);_items.Add(item);TotalAmount += item.Amount;}}
// Domain/Entities/OrderItem.cspublic class OrderItem : Entity{public Guid ProductId { get; private set; }public int Quantity { get; private set; }public decimal UnitPrice { get; private set; }public decimal Amount => Quantity * UnitPrice;public OrderItem(Guid productId, int quantity, decimal unitPrice){ProductId = productId;Quantity = quantity;UnitPrice = unitPrice;}}
// Domain/Repositories/IOrderRepository.cspublic interface IOrderRepository : IRepository<Order, Guid>{Task<Order> GetByOrderNumberAsync(string orderNumber);Task<List<Order>> GetByCustomerIdAsync(Guid customerId);}
Order 作为聚合根,封装了创建订单的业务规则,并通过领域事件解耦后续操作。
05
第二章:CQRS 模式详解与在 .NET Core 中的实现
2.1 什么是 CQRS?
CQRS(Command Query Responsibility Segregation,命令查询职责分离)是一种架构模式,其核心思想是将**写操作(命令)和读操作(查询)**分离,使用不同的模型来处理。
- 命令(Command)
:用于修改系统状态的操作,如创建订单、更新用户信息。它不返回数据,只返回操作结果(成功/失败)。 - 查询(Query)
:用于读取数据的操作,如获取订单详情、查询用户列表。它不修改系统状态。
CQRS 的本质:一个系统可以有多个“读模型”,但只有一个“写模型”。
06
2.2 为什么需要 CQRS?
虽然 CRUD 模式简单直接,但在复杂系统中会遇到以下问题:
- 性能瓶颈
:读写共用同一数据库和模型,高并发读或写时互相影响。 - 模型复杂性
:同一个模型既要满足写操作的业务校验,又要满足前端多样化的查询需求,导致模型臃肿。 - 扩展性差
:读写无法独立扩展。 - 数据一致性延迟容忍
:某些场景下,查询数据可以容忍短暂延迟(最终一致性),从而提升读性能。
CQRS 正是为了解决这些问题而生。
07
2.3 CQRS 的基本结构
在 .NET Core 中,CQRS 通常与 MediatR 库结合使用,实现请求的中介模式。
客户端↓Web API Controller↓MediatR (中介者)↙ ↘Command Handler Query Handler↓ ↓Domain Layer (业务逻辑) Read Model (DTO/ViewModel)↓ ↓Write Database (EF Core) Read Database (SQL View / NoSQL / Cache)
- 命令处理器(Command Handler)
:处理写操作,调用领域模型,执行业务逻辑,持久化聚合。 - 查询处理器(Query Handler)
:处理读操作,直接从优化的读模型(如视图、缓存、Elasticsearch)中获取数据,不经过领域层。
08
2.4 在 .NET Core 中集成 MediatR
在 Application 层中安装 MediatR 和 MediatR.Extensions.Microsoft.DependencyInjection 包。
<PackageReference Include="MediatR" Version="12.2.0" /><PackageReference Include="MediatR.Extensions.Microsoft.DependencyInjection" Version="12.1.0" />
Program.cs 中注册 MediatR:
// WebApi/Program.csbuilder.Services.AddMediatR(cfg => cfg.RegisterServicesFromAssembly(typeof(CreateOrderCommand).Assembly));
09
2.5 实现 CQRS:创建订单命令
我们以“创建订单”为例,展示命令的定义与处理。
// Application/Commands/CreateOrderCommand.cspublic record CreateOrderCommand(Guid CustomerId,List<OrderItemDto> Items) : IRequest<Guid>; // 返回订单ID// DTO 用于传输public record OrderItemDto(Guid ProductId, int Quantity, decimal UnitPrice);
// Application/Commands/CreateOrderCommandHandler.cspublic class CreateOrderCommandHandler : IRequestHandler<CreateOrderCommand, Guid>{private readonly IOrderRepository _orderRepository;private readonly IUnitOfWork _unitOfWork;public CreateOrderCommandHandler(IOrderRepository orderRepository, IUnitOfWork unitOfWork){_orderRepository = orderRepository;_unitOfWork = unitOfWork;}public async Task<Guid> Handle(CreateOrderCommand request, CancellationToken ct){// 1. 使用领域模型创建订单var order = Order.Create(request.CustomerId, request.Items);// 2. 持久化await _orderRepository.AddAsync(order, ct);await _unitOfWork.CommitAsync(ct);// 3. 可选:发布领域事件(用于后续处理,如发送邮件、库存扣减)// 事件处理将在后续章节讲解return order.Id;}}
10
2.6 实现 CQRS:查询订单详情
查询操作不经过领域模型,直接从优化的读模型中获取。
// Application/Queries/GetOrderQuery.cspublic record GetOrderQuery(Guid OrderId) : IRequest<OrderDto>;// Application/Queries/GetOrderQueryHandler.cspublic class GetOrderQueryHandler : IRequestHandler<GetOrderQuery, OrderDto>{private readonly DapperContext _dapperContext; // 使用 Dapper 高效查询public GetOrderQueryHandler(DapperContext dapperContext){_dapperContext = dapperContext;}public async Task<OrderDto> Handle(GetOrderQuery request, CancellationToken ct){const string sql = @"SELECT o.Id, o.OrderNumber, o.TotalAmount, o.Status,c.Name as CustomerName,oi.ProductId, oi.Quantity, oi.UnitPriceFROM Orders oJOIN Customers c ON o.CustomerId = c.IdLEFT JOIN OrderItems oi ON o.Id = oi.OrderIdWHERE o.Id = @OrderId";using var connection = _dapperContext.CreateConnection();var lookup = new Dictionary<Guid, OrderDto>();await connection.QueryAsync<OrderDto, OrderItemDto, OrderDto>(sql,(order, item) =>{if (!lookup.TryGetValue(order.Id, out var existingOrder)){existingOrder = order;existingOrder.Items = new List<OrderItemDto>();lookup.Add(order.Id, existingOrder);}if (item != null) existingOrder.Items.Add(item);return existingOrder;},request,splitOn: "ProductId");return lookup.Values.FirstOrDefault();}}
// DTOspublic record OrderDto(Guid Id,string OrderNumber,decimal TotalAmount,OrderStatus Status,string CustomerName,List<OrderItemDto> Items);
11
12
2.7 控制器中使用 CQRS
// WebApi/Controllers/OrdersController.cs[][]public class OrdersController : ControllerBase{private readonly IMediator _mediator;public OrdersController(IMediator mediator){_mediator = mediator;}[]public async Task<IActionResult> CreateOrder([FromBody] CreateOrderCommand command){var orderId = await _mediator.Send(command);return CreatedAtAction(nameof(GetOrder), new { id = orderId }, orderId);}[]public async Task<IActionResult> GetOrder(Guid id){var query = new GetOrderQuery(id);var order = await _mediator.Send(query);return Ok(order);}}
13
2.8 CQRS 的优势总结
-
✅ 职责分离:写和读逻辑清晰分离。 -
✅ 性能优化:读模型可独立优化(视图、缓存、NoSQL)。 -
✅ 可扩展性:读写数据库可独立部署和扩展。 -
✅ 灵活性:前端查询需求变化不影响写模型。
注意:CQRS 增加了系统复杂性,不是所有项目都需要 CQRS。建议在业务复杂、读写负载差异大的系统中使用。
14
第三章:领域事件与事件溯源(Event Sourcing)
3.1 什么是领域事件?
领域事件(Domain Event)是在领域中发生的重要事情,一旦发生就不可变。它代表了系统状态的改变,例如:
OrderPlacedEvent
(订单已创建) PaymentCompletedEvent
(支付已完成) InventoryDeductedEvent
(库存已扣减)
领域事件是实现业务解耦和最终一致性的关键。
15
3.2 领域事件的价值
- 解耦业务逻辑
:将主流程与后续操作分离。例如,创建订单后自动发送邮件、更新积分,这些都可以通过事件触发,而无需在订单服务中硬编码。 - 实现最终一致性
:在分布式系统中,通过事件通知其他服务更新状态。 - 审计与追溯
:所有事件可持久化,用于审计或重建状态。 - 支持事件溯源
:为更高级的架构模式打下基础。
又到一年毕业季 到了说珍重的时候 总说 时光不老,我们不散 毕业遥遥无期 转眼间就各奔东西 毕业,有着说不完的话题 因为那是懵懂的结束 成熟的开始。
16
3.3 在 .NET Core 中实现领域事件
我们通过 MediatR 来发布和处理领域事件。
17
3.3.1 定义领域事件
// Domain/Events/OrderCreatedEvent.cspublic record OrderCreatedEvent(Guid OrderId) : INotification;
INotification 接口,表示这是一个广播事件,可以有多个处理器。
18
3.3.2 在聚合根中发布事件
// Domain/Entities/Order.cspublic class Order : AggregateRoot{// ... 其他代码public static Order Create(Guid customerId, List<OrderItemDto> items){var order = new Order{Id = Guid.NewGuid(),CustomerId = customerId,Status = OrderStatus.Pending};foreach (var item in items){order.AddItem(item.ProductId, item.Quantity, item.UnitPrice);}// 发布事件order.AddDomainEvent(new OrderCreatedEvent(order.Id));return order;}}
19
3.3.3 在仓储中发布事件
事件的发布通常在事务提交后进行。我们可以在 UnitOfWork 提交后,遍历所有聚合的领域事件并发布。
// Infrastructure/Data/UnitOfWork.cspublic class UnitOfWork : IUnitOfWork{private readonly AppDbContext _context;private readonly IPublisher _mediator; // MediatR 的发布者public UnitOfWork(AppDbContext context, IPublisher mediator){_context = context;_mediator = mediator;}public async Task<bool> CommitAsync(CancellationToken ct){// 1. 保存实体变更var result = await _context.SaveChangesAsync(ct);// 2. 发布所有聚合根的领域事件await PublishDomainEvents(ct);return result > 0;}private async Task PublishDomainEvents(CancellationToken ct){var aggregates = _context.ChangeTracker.Entries<IAggregateRoot>().Where(x => x.Entity.DomainEvents.Any()).Select(x => x.Entity).ToList();foreach (var aggregate in aggregates){var events = aggregate.DomainEvents.ToList();aggregate.ClearDomainEvents(); // 清空已发布的事件foreach (var @event in events){await _mediator.Publish(@event, ct);}}}}
20
3.3.4 事件处理器(Event Handler)
// Application/Handlers/OrderCreatedEventHandler.cspublic class OrderCreatedEventHandler : INotificationHandler<OrderCreatedEvent>{private readonly ILogger<OrderCreatedEventHandler> _logger;private readonly IEmailService _emailService;private readonly IInventoryService _inventoryService; // 可能是 gRPC 或 HTTP 客户端public OrderCreatedEventHandler(ILogger<OrderCreatedEventHandler> logger,IEmailService emailService,IInventoryService inventoryService){_logger = logger;_emailService = emailService;_inventoryService = inventoryService;}public async Task Handle(OrderCreatedEvent notification, CancellationToken ct){_logger.LogInformation("订单 {OrderId} 已创建,正在处理后续操作...", notification.OrderId);// 1. 发送订单确认邮件await _emailService.SendOrderConfirmationAsync(notification.OrderId, ct);// 2. 调用库存服务扣减库存(可能是异步消息)await _inventoryService.DeductInventoryAsync(notification.OrderId, ct);// 3. 更新用户积分(领域服务)// await _pointsService.AddPointsAsync(...);}}
21
3.4 事件溯源(Event Sourcing)
事件溯源是一种更激进的持久化模式:不保存实体的当前状态,而是保存导致状态变化的所有事件。实体的状态是通过重放事件来重建的。
22
3.4.1 事件溯源的核心思想
- 状态是事件的投影
: CurrentState = Apply(Events...) - 事件是唯一真相源
(Source of Truth)
23
3.4.2 事件溯源的结构
Command → [Aggregate] → Events → Event Store → (Replay) → Current State↓Projections → Read Models
24
3.4.3 何时使用事件溯源?
-
需要完整审计日志 -
需要时间旅行(查看历史状态) -
系统由事件驱动,状态变化频繁 -
与 CQRS 天然结合
25
26
3.4.4 简单示例:事件存储
// Domain/Events/OrderPlacedEvent.cspublic record OrderPlacedEvent(Guid OrderId,Guid CustomerId,List<OrderItem> Items,DateTime PlacedAt) : IEvent;// Infrastructure/EventStore/IEventStore.cspublic interface IEventStore{Task SaveEventsAsync<T>(Guid aggregateId, IEnumerable<IEvent> events, int expectedVersion);Task<List<IEvent>> GetEventsAsync(Guid aggregateId);}// 简单实现(实际可用 EventStoreDB、Cosmos DB 等)public class SqlEventStore : IEventStore{public async Task SaveEventsAsync<T>(Guid aggregateId, IEnumerable<IEvent> events, int expectedVersion){// 将事件保存到数据库表// 包含 AggregateId, AggregateType, Version, EventType, Data (JSON)}public async Task<List<IEvent>> GetEventsAsync(Guid aggregateId){// 从数据库加载所有事件并反序列化}}
27
3.5 领域事件 vs 事件溯源
|
|
|
|
|---|---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
28
第四章:CQRS 与 DDD 的深度整合与实战模式
在前几章中,我们分别介绍了 DDD 的分层、CQRS 的分离以及领域事件的使用。本章将聚焦于如何将 CQRS 与 DDD 深度整合,并介绍一些在 .NET Core 项目中常见的实战模式和最佳实践。
29
4.1 CQRS + DDD 的典型数据流
理解完整的请求生命周期是掌握架构的关键:
HTTP Request (WebApi)↓MediatR Request (Command / Query)↙ ↘[Command Handler] [Query Handler]↓ ↓Domain Layer: Read-Optimized:- Load Aggregate - Direct SQL/Dapper- Execute Business Logic - Cache (Redis)- Validate Rules - Elasticsearch- Emit Domain Events - Return DTO↓Persist via Repository & UnitOfWork↓Publish Domain Events → Event Handlers (Async)
关键点:
- 写路径
:必须经过领域模型,确保业务规则被强制执行。 - 读路径
:绕过领域模型,直接访问优化的数据源,追求性能。
30
4.2 实战模式一:异步最终一致性
在分布式系统中,强一致性往往带来性能瓶颈。通过领域事件实现最终一致性是常见做法。
场景:用户下单后,需要扣减库存。
传统做法(同步强一致):
// 在命令处理器中直接调用库存服务await _inventoryService.DeductAsync(orderId); // 失败则订单创建失败
改进做法(异步最终一致):
// OrderCreatedEventHandler.cspublic async Task Handle(OrderCreatedEvent notification, CancellationToken ct){// 发布一个集成事件到消息队列(如 RabbitMQ/Kafka)await _messageBus.PublishAsync(new InventoryDeductionRequestedEvent(OrderId: notification.OrderId,Items: orderItems // 可以从仓储加载), ct);}
库存服务监听 InventoryDeductionRequestedEvent,执行扣减。如果失败,可重试或进入死信队列人工处理。
✅ 优势:订单服务不再依赖库存服务,系统更健壮。 ❌ 代价:短暂时间内数据不一致(订单已创建但库存未扣)。
31
4.3 实战模式二:查询模型的优化策略
CQRS 的查询侧有多种优化方式:
|
|
|
|
|---|---|---|
| 数据库视图 |
|
|
| 独立读库 |
|
|
| 缓存(Redis) |
|
|
| 物化视图/Projection |
|
|
| Elasticsearch |
|
|
示例:使用 Redis 缓存订单详情
// GetOrderQueryHandler.cspublic async Task<OrderDto> Handle(GetOrderQuery request, CancellationToken ct){var cacheKey = $"order:{request.OrderId}";// 先查缓存var cached = await _cache.GetStringAsync(cacheKey, ct);if (!string.IsNullOrEmpty(cached)){return JsonSerializer.Deserialize<OrderDto>(cached);}// 缓存未命中,查数据库var order = await LoadFromDatabase(request.OrderId, ct);// 写入缓存(设置过期时间)await _cache.SetStringAsync(cacheKey,JsonSerializer.Serialize(order),TimeSpan.FromMinutes(10),ct);return order;}
32
4.4 实战模式三:Saga 分布式事务管理
当一个业务操作跨越多个限界上下文(微服务)时,需要用 Saga 模式来管理分布式事务。
场景:下单流程涉及 订单服务 → 支付服务 → 库存服务 → 物流服务。
Saga 流程:
Order Service
: 创建订单(初始状态为 “PendingPayment”) -
发布 OrderCreatedEvent Payment Service
: 监听事件,发起支付 -
支付成功,发布 PaymentCompletedEvent Inventory Service
: 扣减库存,发布 InventoryDeductedEventShipping Service
: 创建发货单
补偿机制:如果任一环节失败,触发补偿事务(Compensating Transaction):
-
支付失败 → 订单取消 -
库存不足 → 支付退款
Saga 可以是编排式(Orchestration)或协同式(Choreography)。DDD 中常用协同式(通过事件驱动)。
33
4.5 实战模式四:限界上下文与微服务划分
DDD 的“限界上下文”是划分微服务的理想依据。
电商系统的限界上下文示例:
|
|
|
|
|---|---|---|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
每个上下文可以独立部署为微服务,通过 API 或事件进行通信。
34
4.6 最佳实践总结
- 不要过度设计
:简单 CRUD 场景无需 CQRS 和事件溯源。 - 先做 CQRS,再考虑事件溯源
:事件溯源复杂度高,慎用。 - 领域事件命名
:使用过去时态,如 OrderShippedEvent。 - 事件幂等性
:确保事件处理器可安全重试。 - 监控与重试
:对事件总线、消息队列进行监控,实现失败重试机制。 - 文档化上下文映射
:明确各限界上下文之间的关系(合作关系、防腐层等)。
35
36
第五章:项目结构优化、测试策略与部署考量
在掌握了 DDD 和 CQRS 的核心概念与实战模式后,本章将关注项目的可维护性、可测试性和生产部署的实际考量。一个成功的架构不仅要在设计上合理,更要在工程实践中可持续。
37
5.1 项目结构优化与模块化
随着业务增长,项目可能变得庞大。合理的模块化能提升代码可维护性。
38
5.1.1 按限界上下文组织项目
eShop.Solution├── eShop.Ordering # 订单上下文(微服务)│ ├── Ordering.Domain│ ├── Ordering.Application│ ├── Ordering.Infrastructure│ └── Ordering.WebApi│├── eShop.Payment # 支付上下文│ ├── Payment.Domain│ ├── Payment.Application│ └── ...│├── eShop.SharedKernel # 共享内核(ID、时间、异常基类)├── eShop.EventBus # 事件总线抽象与实现└── eShop.UnitTests # 共享测试基类
39
5.1.2 使用功能切片(Vertical Slice Architecture)
对于单体应用,可采用“功能切片”替代传统分层,减少层间依赖。
// Features/Orders/CreateOrder/├── CreateOrderCommand.cs├── CreateOrderCommandHandler.cs├── CreateOrderValidator.cs // FluentValidation└── CreateOrderResponse.cs// Features/Orders/GetOrder/├── GetOrderQuery.cs├── GetOrderQueryHandler.cs└── GetOrderValidator.cs
40
5.2 测试策略
DDD + CQRS 架构需要分层测试:
|
|
|
|
|
|---|---|---|---|
| 单元测试 |
|
|
|
| 集成测试 |
|
|
|
| 端到端测试 |
|
|
|
| 契约测试 |
|
|
|
41
5.2.1 领域模型单元测试示例
// Ordering.Domain.Tests/OrderTests.cspublic class OrderTests{[]public void CreateOrder_WithValidItems_ShouldSucceed(){// Arrangevar customerId = Guid.NewGuid();var items = new List<OrderItemDto>{new(Guid.NewGuid(), 2, 100m)};// Actvar order = Order.Create(customerId, items);// AssertAssert.Equal(200m, order.TotalAmount);Assert.Single(order.Items);Assert.Contains(order.DomainEvents, e => e is OrderCreatedEvent);}[]public void AddItem_WithInvalidQuantity_ShouldThrow(){// Arrangevar order = Order.Create(Guid.NewGuid(), new List<OrderItemDto>());// Act & AssertAssert.Throws<DomainException>(() =>order.AddItem(Guid.NewGuid(), 0, 100m));}}
42
5.2.2 命令处理器集成测试
// Ordering.Application.Tests/CreateOrderCommandHandlerTests.cspublic class CreateOrderCommandHandlerTests : IClassFixture<DatabaseFixture>{private readonly DatabaseFixture _fixture;public CreateOrderCommandHandlerTests(DatabaseFixture fixture){_fixture = fixture;}[]public async Task Handle_ValidCommand_ShouldPersistOrder(){// Arrangevar handler = _fixture.GetService<IRequestHandler<CreateOrderCommand, Guid>>();var command = new CreateOrderCommand(CustomerId: Guid.NewGuid(),Items: new List<OrderItemDto> { /* ... */ });// Actvar orderId = await handler.Handle(command, CancellationToken.None);// Assertvar order = await _fixture.OrderRepository.GetByIdAsync(orderId);Assert.NotNull(order);Assert.Equal(OrderStatus.Pending, order.Status);}}
Testcontainers 或内存数据库(如 SQLite)隔离测试。
43
5.3 生产部署考量
5.3.1 数据库策略
- 写库
:使用高性能关系型数据库(如 PostgreSQL, SQL Server),确保 ACID。 - 读库:
-
使用物化视图或CQRS 读表,通过事件驱动更新。 -
高频查询使用 Redis 缓存。 -
复杂搜索使用 Elasticsearch。
-
44
45
5.3.2 事件总线选型
|
|
|
|---|---|
In-Memory
MediatR) |
|
| RabbitMQ |
|
| Kafka |
|
| Azure Service Bus |
|
建议:初期使用 RabbitMQ,成熟后根据需求迁移。
46
5.3.3 监控与可观测性
- 日志
:使用 Serilog 结构化日志,记录命令、事件、错误。 - 追踪
:集成 OpenTelemetry,追踪请求链路。 - 指标
:暴露 Prometheus 指标(如命令处理时间、事件发布延迟)。 - 告警
:对事件积压、失败任务设置告警。
47
5.3.4 部署模式
- 蓝绿部署 / 金丝雀发布
:降低发布风险。 - 健康检查
:实现 /health端点,检查数据库、事件总线连接。 - 配置管理
:使用 IConfiguration+ 配置中心(如 Azure App Configuration)。
48
5.4 常见陷阱与规避
- 过度工程
:不是所有项目都需要 CQRS 和事件溯源。从简单开始。 - 事件风暴滥用
:领域事件应代表业务关键决策,而非所有状态变更。 - 事务边界不清晰
:确保 UnitOfWork正确管理事务,避免部分提交。 - 循环依赖
:避免服务间循环发布事件。 - 缺乏文档
:用 上下文映射图(Context Map)记录限界上下文关系。
49
结语
DDD 与 CQRS 是强大的架构工具,但它们不是银弹。成功的关键在于:
- 以业务为核心
:模型必须准确反映领域知识。 - 渐进式演进
:从单体开始,逐步拆分限界上下文。 - 团队共识
:统一术语(通用语言),确保开发、产品、领域专家理解一致。 - 持续重构
:架构随业务发展而演进。
50
第六章:应对分布式挑战——一致性、性能与弹性
当我们将 DDD 和 CQRS 应用于生产级系统,尤其是微服务架构时,会面临一系列分布式系统特有的挑战。本章将聚焦于如何在保持业务清晰的同时,确保系统的数据一致性、高性能和高可用性。
51
6.1 数据一致性:最终一致性 vs 强一致性
在 CQRS 架构中,写模型和读模型是分离的。这意味着,当一个命令成功执行后,查询结果可能不会立即反映最新状态。
问题:在 t2 到 t4 之间,客户端查询可能看到旧数据或“未找到”。
解决方案
- 接受最终一致性:
- 说明
:这是 CQRS 的默认模式。向客户端明确说明数据是“最终一致”的。 - 适用场景
:大多数业务场景(如社交动态、商品列表)可以容忍短暂延迟。
- 说明
- 读写后读取(Read-Your-Writes Consistency):
- 方案 A(简单)
:命令成功后,不立即让客户端查询,而是直接返回完整的创建结果(DTO)。
- 说明
:确保用户能立即看到自己刚刚写入的数据。 - 实现:
- 方案 A(简单)
// Command Handler 返回完整 OrderDtopublic async Task<OrderDto> Handle(CreateOrderCommand request, ...){var order = Order.Create(...);await _repo.AddAsync(order);await _uow.CommitAsync();// 直接返回,避免查询延迟return _mapper.Map<OrderDto>(order);}
-
方案 B(复杂) :在查询时,如果请求的是“自己刚创建的资源”,则回退到从写库查询(牺牲一点性能换取一致性)。 - 会话一致性(Session Consistency):
- 说明
:确保同一个用户在一次会话中看到的数据是单调递增的。 - 实现
:使用时间戳或版本号。查询时带上上次操作的时间戳,只返回该时间戳之后的数据。
- 说明
52
6.2 性能优化:CQRS 的读写优化策略
CQRS 的核心优势之一就是可以独立优化读写路径。
53
6.2.1 写路径优化
- 批量处理
:对于高频写操作(如日志、指标),使用批量提交减少数据库压力。 - 异步持久化
:将事件先写入高速队列(如 Kafka),再由后台消费者持久化到数据库。 - 聚合设计
:避免大聚合。过大的聚合会导致并发冲突(乐观锁失败率高)。合理拆分聚合根。
54
6.2.2 读路径优化(核心)
|
|
|
|
|---|---|---|
| 数据库读写分离 |
|
|
| Redis 缓存 |
|
|
| 物化视图 (Materialized View) |
|
|
| Elasticsearch |
|
|
| CDN |
|
|
示例:用事件驱动更新物化视图
// 当 OrderCreatedEvent 发生时,更新 OrderSummary 表public class OrderCreatedProjection : INotificationHandler<OrderCreatedEvent>{private readonly AppDbContext _readContext; // 专用的读库上下文public async Task Handle(OrderCreatedEvent e, CancellationToken ct){var summary = new OrderSummary{OrderId = e.OrderId,Status = "Pending",CreatedAt = DateTime.UtcNow,// ... 其他预计算字段};await _readContext.OrderSummaries.AddAsync(summary, ct);await _readContext.SaveChangesAsync(ct);}}
55
6.3 弹性与容错:处理失败与重试
在分布式系统中,失败是常态。我们必须设计有弹性的系统。
56
6.3.1 命令处理的幂等性
确保同一个命令被多次处理时,结果一致。
// CreateOrderCommand.cspublic record CreateOrderCommand(Guid CommandId, // 唯一标识Guid CustomerId,List<OrderItemDto> Items) : IRequest<Guid>;// Command Handlerpublic async Task<Guid> Handle(CreateOrderCommand request, CancellationToken ct){// 1. 检查该 CommandId 是否已处理if (await _commandLog.IsProcessedAsync(request.CommandId, ct)){// 返回上次的结果(幂等)return await _commandLog.GetResultAsync<Guid>(request.CommandId, ct);}// 2. 正常处理逻辑var order = Order.Create(request.CustomerId, request.Items);await _repo.AddAsync(order);await _uow.CommitAsync();// 3. 记录命令已处理await _commandLog.LogProcessedAsync(request.CommandId, order.Id, ct);return order.Id;}
57
6.3.2 事件处理的重试机制
领域事件或集成事件发布失败时,必须重试。
- 使用消息队列
:RabbitMQ、Kafka 天然支持消息持久化和重试。 - 死信队列 (DLQ)
:处理多次重试仍失败的消息,便于人工干预。 - 指数退避
:重试间隔逐渐增加,避免雪崩。
// RabbitMQ Consumer 示例try{await HandleEvent(event);await model.BasicAck(deliveryTag, false); // 确认}catch (Exception ex){// 记录日志// 消息将自动重回队列或进入DLQthrow; // 不确认,触发重试}
58
59
6.4 分布式事务与 Saga 模式的进阶
在第五章我们提到了 Saga 模式。这里深入其**编排式(Orchestration)**实现。
场景:订单流程(创建 → 支付 → 发货)
Orchestrator (编排器) 的职责:
-
定义 Saga 的执行流程。 -
发送命令给各个服务。 -
监听各步骤的完成事件。 -
处理失败,触发补偿命令。
优点:流程清晰,易于调试和监控。缺点:编排器可能成为单点。
选择建议:
流程简单、服务少:用协同式(事件驱动)。 流程复杂、需要精确控制:用编排式。
60
6.5 监控、追踪与调试
复杂的分布式系统必须具备强大的可观测性。
- 结构化日志
:使用 Serilog + Seq 或 ELK,记录命令、事件、错误。 - 分布式追踪
:集成 OpenTelemetry,追踪一个请求从 API 到数据库再到其他服务的完整链路。 - 指标监控
:使用 Prometheus + Grafana 监控: -
命令处理延迟 -
事件发布/消费速率 -
数据库连接数 -
缓存命中率
-
- 健康检查
:实现 /health端点,检查数据库、Redis、消息队列等依赖。
61
62
第七章:演进式架构——从单体到微服务的平滑过渡
在前几章中,我们构建了一个基于 DDD 和 CQRS 的强大单体应用。然而,随着业务规模扩大、团队增多,单体架构可能成为瓶颈。本章将探讨如何以领域驱动的方式,将单体应用平滑演进为微服务架构,避免“大爆炸式”重写。
63
7.1 何时拆分微服务?
并非所有系统都需要微服务。以下信号表明可能是时候考虑拆分了:
- 团队协作困难
:多个团队频繁修改同一代码库,导致合并冲突和发布阻塞。 - 技术栈异构需求
:某个功能需要特定技术(如 AI 服务用 Python,核心交易用 .NET)。 - 伸缩性需求差异
:订单服务需要 10 台服务器,而客服系统只需 2 台。 - 发布频率不同
:营销活动需要每日发布,而财务系统每月发布一次。 - DDD 限界上下文清晰
:业务边界明确,上下文映射图已定义。
关键:以业务能力(Bounded Context)为单位拆分,而非技术分层。
64
7.2 演进策略:Strangler Fig 模式
“绞杀者模式”(Strangler Fig Pattern)是一种安全的演进策略:新建的微服务逐步“绞杀”旧的单体功能,直到单体被完全替代。
65
7.2.1 阶段一:识别并隔离限界上下文
在单体应用中,首先通过命名空间或项目来物理隔离不同的限界上下文。
// 单体应用中的模块化
MyApp.Solution
├── MyApp.Ordering // 订单上下文├── MyApp.Customer // 客户上下文├── MyApp.Catalog // 商品目录上下文├── MyApp.SharedKernel
└── MyApp.WebApi // API 网关入口
- 目标:
减少上下文间的耦合,明确依赖方向(如 Ordering依赖Catalog)。 - 实践:
-
使用 InternalsVisibleTo限制程序集访问。 -
通过 领域事件 耦合,而非直接调用服务。
-
66
7.2.2 阶段二:暴露 API 与防腐层(ACL)
为即将拆分的上下文设计稳定的 API,并为外部依赖创建防腐层(Anti-Corruption Layer)。
//Ordering/Infrastructure/CatalogAcl.cs
public class CatalogAcl : ICatalogService{ private readonly HttpClient _client; public async Task<ProductDto> GetProductAsync(GuidproductId) {
// 调用 Catalog 微服务的 HTTP API var response = await _client.GetAsync($"/api/products/{productId}"); // 将外部模型转换为内部模型 var external = await response.Content.ReadFromJsonAsync<ExternalProductDto>(); return _mapper.Map<ProductDto>(external); }}
67
7.2.3 阶段三:逐步迁移流量
- 部署新微服务
:将 Catalog上下文部署为独立的微服务catalog-service。 - 双写或双读:
- 双写
:在单体和新服务中同时写入数据(确保数据同步)。 - 双读
:新服务先从单体数据库读取,验证数据一致性。
- 双写
- 切换流量
:通过 API 网关(如 Ocelot、YARP)将 /api/products/*的流量逐步导向新服务。 - 移除旧代码
:确认新服务稳定后,移除单体中的 Catalog模块和双写逻辑。
68
7.3 微服务间的通信模式
拆分后,服务间通信至关重要。
|
|
|
|
|
|---|---|---|---|
| 同步 HTTP/REST |
|
|
|
| gRPC |
|
|
|
| 异步消息 |
|
|
|
建议:优先使用异步消息保持松耦合,必要时使用同步调用。
69
7.4 数据管理:避免分布式事务
微服务中应尽量避免跨服务的分布式事务(如两阶段提交),因其性能差且复杂。
替代方案:
- Saga 模式
:如第六章所述,用补偿事务管理长流程。 - CQRS + 事件溯源
:通过事件流保证数据一致性。 - API 组合器
:在查询时,由网关或前端服务聚合多个微服务的数据。
70
7.5 运维与治理
微服务带来运维复杂性,需建立配套体系:
- 服务发现
:Consul, Eureka, Kubernetes DNS。 - 配置中心
:Azure App Configuration, Consul KV。 - API 网关
:路由、认证、限流、熔断。 - CI/CD
:每个服务独立的流水线。 - 日志与监控
:集中式日志(ELK)、分布式追踪(Jaeger)、指标(Prometheus)。
71
7.6 案例:电商系统拆分路径
- 第一阶段
:拆分 Catalog(商品目录),因其独立性强、访问量大。 - 第二阶段
:拆分 Ordering(订单),因其业务复杂,需独立伸缩。 - 第三阶段
:拆分 Payment(支付),因其涉及第三方集成和安全要求。 - 第四阶段
: Customer(客户)和Marketing(营销)作为独立服务。
每一步都小步迭代,确保系统始终可用。
72
这种演进式方法最大限度地降低了风险,是大型系统架构演进的推荐实践。
希望这些内容能帮助你在 .NET Core 项目中成功应用 DDD 和 CQRS。如需深入某个主题,请随时提问!
73
Mikel