《手把手教你用 .NET Core 搭建高并发、可扩展的 CQRS 与 DDD 架构》

来源: 《手把手教你用 .NET Core 搭建高并发、可扩展的 CQRS 与 DDD 架构》

趣事:

最近有人留言,说讲解一下CQRS和DDD架构,其实DDD有分开讲,在历史文章中有体现,比较好理解。

这几天单独重新回顾一次。

直接开干!

01

第一章:DDD 核心概念与 .NET Core 中的分层架构

1.1 什么是 DDD?

领域驱动设计(Domain-Driven Design)是一种软件开发方法论,强调以业务领域为核心,通过与领域专家的深度合作,构建出能够准确反映业务逻辑的软件模型。其核心思想是将复杂的业务逻辑集中在“领域层”,并通过清晰的边界(如聚合、限界上下文)来管理复杂性。

DDD 适用于业务复杂、规则多变、核心竞争力在于业务逻辑的系统,例如金融、电商、ERP、医疗等系统。

02

1.2 DDD 的核心概念

在 .NET Core 项目中,我们通常会将 DDD 分为以下几个核心概念:

  • 实体(Entity)
    :具有唯一标识的对象,其状态会随时间变化。例如 Order(订单)、Customer(客户)。
  • 值对象(Value Object)
    :没有唯一标识,通过属性值来定义的对象。例如 Address(地址)、Money(金额)。
  • 聚合(Aggregate)
    :一组相关对象的集合,由一个**聚合根(Aggregate Root)**统一管理。聚合根负责维护聚合内部的一致性。例如 Order 是聚合根,包含 OrderItem 等子实体。
  • 领域服务(Domain Service)
    :当某个操作不属于任何实体或值对象时,使用领域服务。它封装了跨多个实体的业务逻辑。
  • 仓储(Repository)
    :提供对聚合的持久化访问,屏蔽底层数据访问细节。在 .NET Core 中,通常通过接口定义仓储,由 Entity Framework Core 实现。
  • 领域事件(Domain Event)
    :表示领域中发生的重要事件,用于解耦和通知。例如 OrderPlacedEvent
  • 限界上下文(Bounded Context)
    :一个明确的业务边界,在此边界内,术语、模型和规则具有一致性。在微服务架构中,一个限界上下文通常对应一个微服务。

03

1.3 .NET Core 中的典型分层架构

在 .NET Core 项目中,我们通常采用六边形架构洋葱架构来实现 DDD。以下是常见的分层结构:

MyApp.Solution├── MyApp.Domain         // 领域层:实体、值对象、聚合、领域服务、仓储接口├── MyApp.Application    // 应用层:应用服务、DTO、CQRS 命令/查询、中介处理├── MyApp.Infrastructure // 基础设施层:EF Core 实现仓储、事件总线、外部服务调用├── MyApp.WebApi         // 表现层:ASP.NET Core Web API,接收请求,返回响应└── MyApp.UnitTests      // 单元测试
  • Domain 层
    :纯业务逻辑,不依赖任何外部框架或基础设施。
  • Application 层
    :协调领域逻辑与基础设施,定义用例。
  • Infrastructure 层
    :实现持久化、消息队列、缓存等。
  • WebApi 层
    :处理 HTTP 请求,调用应用服务。

关键点:依赖关系只能从外向内,即 WebApi → Application → DomainInfrastructure 实现 Domain 的接口。

04

1.4 实战示例:订单聚合

我们以电商系统中的 Order 聚合为例:

// Domain/Entities/Order.cspublic class Order : AggregateRoot{    public Guid CustomerId { getprivate set; }    public decimal TotalAmount { getprivate set; }    public OrderStatus Status { getprivate set; }    private readonly List<OrderItem> _items = new();    public IReadOnlyCollection<OrderItem> Items => _items.AsReadOnly();    // 工厂方法    public static Order Create(Guid customerId, List<OrderItemDto> items)    {        var order = new Order        {            Id = Guid.NewGuid(),            CustomerId = customerId,            Status = OrderStatus.Pending        };        foreach (var item in items)        {            order.AddItem(item.ProductId, item.Quantity, item.UnitPrice);        }        // 添加领域事件        order.AddDomainEvent(new OrderCreatedEvent(order.Id));
        return order;    }    private void AddItem(Guid productId, int quantity, decimal unitPrice)    {        // 业务规则校验        if (quantity <= 0throw new DomainException("数量必须大于0");        var item = new OrderItem(productId, quantity, unitPrice);        _items.Add(item);        TotalAmount += item.Amount;    }}
// Domain/Entities/OrderItem.cspublic class OrderItem : Entity{    public Guid ProductId { getprivate set; }    public int Quantity { getprivate set; }    public decimal UnitPrice { getprivate set; }    public decimal Amount => Quantity * UnitPrice;    public OrderItem(Guid productId, int quantity, decimal unitPrice)    {        ProductId = productId;        Quantity = quantity;        UnitPrice = unitPrice;    }}
// Domain/Repositories/IOrderRepository.cspublic interface IOrderRepository : IRepository<OrderGuid>{    Task<Order> GetByOrderNumberAsync(string orderNumber);    Task<List<Order>> GetByCustomerIdAsync(Guid customerId);}
这一章我们建立了 DDD 的基础模型和分层结构。Order 作为聚合根,封装了创建订单的业务规则,并通过领域事件解耦后续操作。

05

第二章:CQRS 模式详解与在 .NET Core 中的实现

2.1 什么是 CQRS?

CQRS(Command Query Responsibility Segregation,命令查询职责分离)是一种架构模式,其核心思想是将**写操作(命令)读操作(查询)**分离,使用不同的模型来处理。

  • 命令(Command)
    :用于修改系统状态的操作,如创建订单、更新用户信息。它不返回数据,只返回操作结果(成功/失败)。
  • 查询(Query)
    :用于读取数据的操作,如获取订单详情、查询用户列表。它不修改系统状态。

CQRS 的本质:一个系统可以有多个“读模型”,但只有一个“写模型”。

06

2.2 为什么需要 CQRS?

虽然 CRUD 模式简单直接,但在复杂系统中会遇到以下问题:

  1. 性能瓶颈
    :读写共用同一数据库和模型,高并发读或写时互相影响。
  2. 模型复杂性
    :同一个模型既要满足写操作的业务校验,又要满足前端多样化的查询需求,导致模型臃肿。
  3. 扩展性差
    :读写无法独立扩展。
  4. 数据一致性延迟容忍
    :某些场景下,查询数据可以容忍短暂延迟(最终一致性),从而提升读性能。

CQRS 正是为了解决这些问题而生

07

2.3 CQRS 的基本结构

在 .NET Core 中,CQRS 通常与 MediatR 库结合使用,实现请求的中介模式。

客户端   ↓Web API Controller   ↓MediatR (中介者)   ↙       ↘Command Handler          Query Handler   ↓                       ↓Domain Layer (业务逻辑)     Read Model (DTO/ViewModel)   ↓                       ↓Write Database (EF Core)   Read Database (SQL View / NoSQL / Cache)
  • 命令处理器(Command Handler)
    :处理写操作,调用领域模型,执行业务逻辑,持久化聚合。
  • 查询处理器(Query Handler)
    :处理读操作,直接从优化的读模型(如视图、缓存、Elasticsearch)中获取数据,不经过领域层。

08

2.4 在 .NET Core 中集成 MediatR

在 Application 层中安装 MediatR 和 MediatR.Extensions.Microsoft.DependencyInjection 包。

<PackageReference Include="MediatR" Version="12.2.0" /><PackageReference Include="MediatR.Extensions.Microsoft.DependencyInjection" Version="12.1.0" />
在 Program.cs 中注册 MediatR:

// WebApi/Program.csbuilder.Services.AddMediatR(cfg => cfg.RegisterServicesFromAssembly(typeof(CreateOrderCommand).Assembly));

09

2.5 实现 CQRS:创建订单命令

我们以“创建订单”为例,展示命令的定义与处理。

// Application/Commands/CreateOrderCommand.cspublic record CreateOrderCommand(    Guid CustomerId,    List<OrderItemDto> Items) : IRequest<Guid>; // 返回订单ID// DTO 用于传输public record OrderItemDto(Guid ProductId, int Quantity, decimal UnitPrice);
// Application/Commands/CreateOrderCommandHandler.cspublic class CreateOrderCommandHandler : IRequestHandler<CreateOrderCommandGuid>{    private readonly IOrderRepository _orderRepository;    private readonly IUnitOfWork _unitOfWork;    public CreateOrderCommandHandler(IOrderRepository orderRepository, IUnitOfWork unitOfWork)    {        _orderRepository = orderRepository;        _unitOfWork = unitOfWork;    }    public async Task<Guid> Handle(CreateOrderCommand request, CancellationToken ct)    {        // 1. 使用领域模型创建订单        var order = Order.Create(request.CustomerId, request.Items);        // 2. 持久化        await _orderRepository.AddAsync(order, ct);        await _unitOfWork.CommitAsync(ct);        // 3. 可选:发布领域事件(用于后续处理,如发送邮件、库存扣减)        // 事件处理将在后续章节讲解        return order.Id;    }}

10

2.6 实现 CQRS:查询订单详情

查询操作不经过领域模型,直接从优化的读模型中获取。

// Application/Queries/GetOrderQuery.cspublic record GetOrderQuery(Guid OrderId) : IRequest<OrderDto>;// Application/Queries/GetOrderQueryHandler.cspublic class GetOrderQueryHandler : IRequestHandler<GetOrderQueryOrderDto>{    private readonly DapperContext _dapperContext; // 使用 Dapper 高效查询    public GetOrderQueryHandler(DapperContext dapperContext)    {        _dapperContext = dapperContext;    }    public async Task<OrderDto> Handle(GetOrderQuery request, CancellationToken ct)    {        const string sql = @"            SELECT o.Id, o.OrderNumber, o.TotalAmount, o.Status,                   c.Name as CustomerName,                   oi.ProductId, oi.Quantity, oi.UnitPrice            FROM Orders o            JOIN Customers c ON o.CustomerId = c.Id            LEFT JOIN OrderItems oi ON o.Id = oi.OrderId            WHERE o.Id = @OrderId";        using var connection = _dapperContext.CreateConnection();        var lookup = new Dictionary<Guid, OrderDto>();        await connection.QueryAsync<OrderDto, OrderItemDto, OrderDto>(            sql,            (order, item) =>            {                if (!lookup.TryGetValue(order.Id, out var existingOrder))                {                    existingOrder = order;                    existingOrder.Items = new List<OrderItemDto>();                    lookup.Add(order.Id, existingOrder);                }                if (item != null) existingOrder.Items.Add(item);                return existingOrder;            },            request,            splitOn: "ProductId"        );        return lookup.Values.FirstOrDefault();    }}
// DTOspublic record OrderDto(    Guid Id,    string OrderNumber,    decimal TotalAmount,    OrderStatus Status,    string CustomerName,    List<OrderItemDto> Items);

11

广告

12

2.7 控制器中使用 CQRS

// WebApi/Controllers/OrdersController.cs[ApiController][Route("api/[controller]")]public class OrdersController : ControllerBase{    private readonly IMediator _mediator;    public OrdersController(IMediator mediator)    {        _mediator = mediator;    }    [HttpPost]    public async Task<IActionResult> CreateOrder([FromBody] CreateOrderCommand command)    {        var orderId = await _mediator.Send(command);        return CreatedAtAction(nameof(GetOrder), new { id = orderId }, orderId);    }    [HttpGet("{id}")]    public async Task<IActionResult> GetOrder(Guid id)    {        var query = new GetOrderQuery(id);        var order = await _mediator.Send(query);        return Ok(order);    }}

13

2.8 CQRS 的优势总结

  • ✅ 职责分离:写和读逻辑清晰分离。
  • ✅ 性能优化:读模型可独立优化(视图、缓存、NoSQL)。
  • ✅ 可扩展性:读写数据库可独立部署和扩展。
  • ✅ 灵活性:前端查询需求变化不影响写模型。

注意:CQRS 增加了系统复杂性,不是所有项目都需要 CQRS。建议在业务复杂、读写负载差异大的系统中使用。

14

第三章:领域事件与事件溯源(Event Sourcing)

3.1 什么是领域事件?

领域事件(Domain Event)是在领域中发生的重要事情,一旦发生就不可变。它代表了系统状态的改变,例如:

  • OrderPlacedEvent
    (订单已创建)
  • PaymentCompletedEvent
    (支付已完成)
  • InventoryDeductedEvent
    (库存已扣减)

领域事件是实现业务解耦最终一致性的关键。

15

3.2 领域事件的价值

  1. 解耦业务逻辑
    :将主流程与后续操作分离。例如,创建订单后自动发送邮件、更新积分,这些都可以通过事件触发,而无需在订单服务中硬编码。
  2. 实现最终一致性
    :在分布式系统中,通过事件通知其他服务更新状态。
  3. 审计与追溯
    :所有事件可持久化,用于审计或重建状态。
  4. 支持事件溯源
    :为更高级的架构模式打下基础。

又到一年毕业季  到了说珍重的时候  总说  时光不老,我们不散  毕业遥遥无期 转眼间就各奔东西  毕业,有着说不完的话题  因为那是懵懂的结束  成熟的开始。

16

3.3 在 .NET Core 中实现领域事件

我们通过 MediatR 来发布和处理领域事件。

17

3.3.1 定义领域事件
// Domain/Events/OrderCreatedEvent.cspublic record OrderCreatedEvent(Guid OrderId) : INotification;
实现 INotification 接口,表示这是一个广播事件,可以有多个处理器。

18

3.3.2 在聚合根中发布事件
// Domain/Entities/Order.cspublic class Order : AggregateRoot{    // ... 其他代码    public static Order Create(Guid customerId, List<OrderItemDto> items)    {        var order = new Order        {            Id = Guid.NewGuid(),            CustomerId = customerId,            Status = OrderStatus.Pending        };        foreach (var item in items)        {            order.AddItem(item.ProductId, item.Quantity, item.UnitPrice);        }        // 发布事件        order.AddDomainEvent(new OrderCreatedEvent(order.Id));
        return order;    }}

19

3.3.3 在仓储中发布事件

事件的发布通常在事务提交后进行。我们可以在 UnitOfWork 提交后,遍历所有聚合的领域事件并发布。

// Infrastructure/Data/UnitOfWork.cspublic class UnitOfWork : IUnitOfWork{    private readonly AppDbContext _context;    private readonly IPublisher _mediator; // MediatR 的发布者    public UnitOfWork(AppDbContext context, IPublisher mediator)    {        _context = context;        _mediator = mediator;    }    public async Task<boolCommitAsync(CancellationToken ct)    {        // 1. 保存实体变更        var result = await _context.SaveChangesAsync(ct);        // 2. 发布所有聚合根的领域事件        await PublishDomainEvents(ct);        return result > 0;    }    private async Task PublishDomainEvents(CancellationToken ct)    {        var aggregates = _context.ChangeTracker            .Entries<IAggregateRoot>()            .Where(x => x.Entity.DomainEvents.Any())            .Select(x => x.Entity)            .ToList();        foreach (var aggregate in aggregates)        {            var events = aggregate.DomainEvents.ToList();            aggregate.ClearDomainEvents(); // 清空已发布的事件            foreach (var @event in events)            {                await _mediator.Publish(@event, ct);            }        }    }}

20

3.3.4 事件处理器(Event Handler)
// Application/Handlers/OrderCreatedEventHandler.cspublic class OrderCreatedEventHandler : INotificationHandler<OrderCreatedEvent>{    private readonly ILogger<OrderCreatedEventHandler> _logger;    private readonly IEmailService _emailService;    private readonly IInventoryService _inventoryService; // 可能是 gRPC 或 HTTP 客户端    public OrderCreatedEventHandler(        ILogger<OrderCreatedEventHandler> logger,        IEmailService emailService,        IInventoryService inventoryService)    {        _logger = logger;        _emailService = emailService;        _inventoryService = inventoryService;    }    public async Task Handle(OrderCreatedEvent notification, CancellationToken ct)    {        _logger.LogInformation("订单 {OrderId} 已创建,正在处理后续操作...", notification.OrderId);        // 1. 发送订单确认邮件        await _emailService.SendOrderConfirmationAsync(notification.OrderId, ct);        // 2. 调用库存服务扣减库存(可能是异步消息)        await _inventoryService.DeductInventoryAsync(notification.OrderId, ct);        // 3. 更新用户积分(领域服务)        // await _pointsService.AddPointsAsync(...);    }}
事件处理器可以有多个,每个关注不同的业务。

21

3.4 事件溯源(Event Sourcing)

事件溯源是一种更激进的持久化模式:不保存实体的当前状态,而是保存导致状态变化的所有事件。实体的状态是通过重放事件来重建的。

22

3.4.1 事件溯源的核心思想
  • 状态是事件的投影
    CurrentState = Apply(Events...)
  • 事件是唯一真相源
    (Source of Truth)

23

3.4.2 事件溯源的结构
Command → [Aggregate] → Events → Event Store → (Replay) → Current State                                 ↓                             Projections → Read Models

24

3.4.3 何时使用事件溯源?
  • 需要完整审计日志
  • 需要时间旅行(查看历史状态)
  • 系统由事件驱动,状态变化频繁
  • 与 CQRS 天然结合

25

26

3.4.4 简单示例:事件存储
// Domain/Events/OrderPlacedEvent.cspublic record OrderPlacedEvent(    Guid OrderId,    Guid CustomerId,    List<OrderItem> Items,    DateTime PlacedAt) : IEvent;// Infrastructure/EventStore/IEventStore.cspublic interface IEventStore{    Task SaveEventsAsync<T>(Guid aggregateId, IEnumerable<IEvent> events, int expectedVersion);    Task<List<IEvent>> GetEventsAsync(Guid aggregateId);}// 简单实现(实际可用 EventStoreDB、Cosmos DB 等)public class SqlEventStore : IEventStore{    public async Task SaveEventsAsync<T>(Guid aggregateId, IEnumerable<IEvent> events, int expectedVersion)    {        // 将事件保存到数据库表        // 包含 AggregateId, AggregateType, Version, EventType, Data (JSON)    }    public async Task<List<IEvent>> GetEventsAsync(Guid aggregateId)    {        // 从数据库加载所有事件并反序列化    }}
事件溯源会显著增加复杂性,建议在有明确需求时再引入。

27

3.5 领域事件 vs 事件溯源

特性
领域事件
事件溯源
持久化
事件可选持久化
事件是唯一持久化形式
状态存储
保存当前状态
不保存状态,通过事件重建
复杂度
中等
适用场景
解耦、通知
审计、历史追溯、CQRS 组合

28

第四章:CQRS 与 DDD 的深度整合与实战模式

在前几章中,我们分别介绍了 DDD 的分层、CQRS 的分离以及领域事件的使用。本章将聚焦于如何将 CQRS 与 DDD 深度整合,并介绍一些在 .NET Core 项目中常见的实战模式和最佳实践。

29

4.1 CQRS + DDD 的典型数据流

理解完整的请求生命周期是掌握架构的关键:

HTTP Request (WebApi)       ↓MediatR Request (Command / Query)       ↙                    ↘[Command Handler]        [Query Handler]       ↓                         ↓Domain Layer:              Read-Optimized:- Load Aggregate           - Direct SQL/Dapper- Execute Business Logic   - Cache (Redis)- Validate Rules           - Elasticsearch- Emit Domain Events       - Return DTO       ↓Persist via Repository & UnitOfWork       ↓Publish Domain Events → Event Handlers (Async)

关键点

  • 写路径
    :必须经过领域模型,确保业务规则被强制执行。
  • 读路径
    :绕过领域模型,直接访问优化的数据源,追求性能。

30

4.2 实战模式一:异步最终一致性

在分布式系统中,强一致性往往带来性能瓶颈。通过领域事件实现最终一致性是常见做法。

场景:用户下单后,需要扣减库存。

传统做法(同步强一致)

// 在命令处理器中直接调用库存服务await _inventoryService.DeductAsync(orderId); // 失败则订单创建失败
问题:库存服务不可用会导致订单无法创建。

改进做法(异步最终一致)

// OrderCreatedEventHandler.cspublic async Task Handle(OrderCreatedEvent notification, CancellationToken ct){    // 发布一个集成事件到消息队列(如 RabbitMQ/Kafka)    await _messageBus.PublishAsync(new InventoryDeductionRequestedEvent(        OrderId: notification.OrderId,        Items: orderItems // 可以从仓储加载    ), ct);}

库存服务监听 InventoryDeductionRequestedEvent,执行扣减。如果失败,可重试或进入死信队列人工处理。

✅ 优势:订单服务不再依赖库存服务,系统更健壮。 ❌ 代价:短暂时间内数据不一致(订单已创建但库存未扣)。

31

4.3 实战模式二:查询模型的优化策略

CQRS 的查询侧有多种优化方式:

策略
描述
适用场景
数据库视图
创建 SQL 视图,预计算关联数据
查询结构稳定,数据量不大
独立读库
主库写,从库读(读写分离)
读远多于写
缓存(Redis)
将查询结果缓存
高频访问、低频更新的数据
物化视图/Projection
用事件驱动的方式维护一个专用的读表
复杂聚合查询
Elasticsearch
全文搜索、复杂过滤
商品搜索、日志分析

示例:使用 Redis 缓存订单详情

// GetOrderQueryHandler.cspublic async Task<OrderDto> Handle(GetOrderQuery request, CancellationToken ct){    var cacheKey = $"order:{request.OrderId}";
    // 先查缓存    var cached = await _cache.GetStringAsync(cacheKey, ct);    if (!string.IsNullOrEmpty(cached))    {        return JsonSerializer.Deserialize<OrderDto>(cached);    }    // 缓存未命中,查数据库    var order = await LoadFromDatabase(request.OrderId, ct);
    // 写入缓存(设置过期时间)    await _cache.SetStringAsync(        cacheKey,         JsonSerializer.Serialize(order),         TimeSpan.FromMinutes(10),         ct);    return order;}
注意:当订单状态更新时,需清除或更新缓存,保证一致性。

32

4.4 实战模式三:Saga 分布式事务管理

当一个业务操作跨越多个限界上下文(微服务)时,需要用 Saga 模式来管理分布式事务。

场景:下单流程涉及 订单服务 → 支付服务 → 库存服务 → 物流服务。

Saga 流程

  1. Order Service
    : 创建订单(初始状态为 “PendingPayment”)
  2. 发布 OrderCreatedEvent
  3. Payment Service
    : 监听事件,发起支付
  4. 支付成功,发布 PaymentCompletedEvent
  5. Inventory Service
    : 扣减库存,发布 InventoryDeductedEvent
  6. Shipping Service
    : 创建发货单

补偿机制:如果任一环节失败,触发补偿事务(Compensating Transaction):

  • 支付失败 → 订单取消
  • 库存不足 → 支付退款

Saga 可以是编排式(Orchestration)或协同式(Choreography)。DDD 中常用协同式(通过事件驱动)。

33

4.5 实战模式四:限界上下文与微服务划分

DDD 的“限界上下文”是划分微服务的理想依据。

电商系统的限界上下文示例

限界上下文
聚合根
职责
订单上下文
Order, OrderItem
订单生命周期管理
支付上下文
Payment, Refund
支付、退款
库存上下文
ProductStock
库存扣减、回滚
客户上下文
Customer, Address
客户信息管理
营销上下文
Coupon, Promotion
优惠券、促销活动

每个上下文可以独立部署为微服务,通过 API 或事件进行通信。

34

4.6 最佳实践总结

  1. 不要过度设计
    :简单 CRUD 场景无需 CQRS 和事件溯源。
  2. 先做 CQRS,再考虑事件溯源
    :事件溯源复杂度高,慎用。
  3. 领域事件命名
    :使用过去时态,如 OrderShippedEvent
  4. 事件幂等性
    :确保事件处理器可安全重试。
  5. 监控与重试
    :对事件总线、消息队列进行监控,实现失败重试机制。
  6. 文档化上下文映射
    :明确各限界上下文之间的关系(合作关系、防腐层等)。

35

本章小结:我们探讨了 CQRS 与 DDD 在实际项目中的整合方式,包括最终一致性、查询优化、Saga 事务和微服务划分。这些模式帮助我们在保持业务清晰的同时,提升系统的可扩展性和健壮性。

36

第五章:项目结构优化、测试策略与部署考量

在掌握了 DDD 和 CQRS 的核心概念与实战模式后,本章将关注项目的可维护性、可测试性生产部署的实际考量。一个成功的架构不仅要在设计上合理,更要在工程实践中可持续。

37

5.1 项目结构优化与模块化

随着业务增长,项目可能变得庞大。合理的模块化能提升代码可维护性。

38

5.1.1 按限界上下文组织项目
eShop.Solution├── eShop.Ordering                # 订单上下文(微服务)│   ├── Ordering.Domain│   ├── Ordering.Application│   ├── Ordering.Infrastructure│   └── Ordering.WebApi├── eShop.Payment                 # 支付上下文│   ├── Payment.Domain│   ├── Payment.Application│   └── ...├── eShop.SharedKernel            # 共享内核(ID、时间、异常基类)├── eShop.EventBus                # 事件总线抽象与实现└── eShop.UnitTests               # 共享测试基类
优点:每个上下文独立开发、部署、扩展。

39

5.1.2 使用功能切片(Vertical Slice Architecture)

对于单体应用,可采用“功能切片”替代传统分层,减少层间依赖。

// Features/Orders/CreateOrder/├── CreateOrderCommand.cs├── CreateOrderCommandHandler.cs├── CreateOrderValidator.cs      // FluentValidation└── CreateOrderResponse.cs// Features/Orders/GetOrder/├── GetOrderQuery.cs├── GetOrderQueryHandler.cs└── GetOrderValidator.cs
优点:功能高度内聚,新增功能无需跨多层修改。

40

5.2 测试策略

DDD + CQRS 架构需要分层测试:

测试类型
范围
工具
说明
单元测试
领域模型、服务
xUnit, Moq
测试实体行为、业务规则
集成测试
命令/查询处理器
xUnit, TestServer
测试应用层与基础设施集成
端到端测试
API 接口
xUnit, SpecFlow
模拟用户场景
契约测试
微服务接口
Pact
确保服务间兼容

41

5.2.1 领域模型单元测试示例
// Ordering.Domain.Tests/OrderTests.cspublic class OrderTests{    [Fact]    public void CreateOrder_WithValidItems_ShouldSucceed()    {        // Arrange        var customerId = Guid.NewGuid();        var items = new List<OrderItemDto>        {            new(Guid.NewGuid(), 2100m)        };        // Act        var order = Order.Create(customerId, items);        // Assert        Assert.Equal(200m, order.TotalAmount);        Assert.Single(order.Items);        Assert.Contains(order.DomainEvents, e => e is OrderCreatedEvent);    }    [Fact]    public void AddItem_WithInvalidQuantity_ShouldThrow()    {        // Arrange        var order = Order.Create(Guid.NewGuid(), new List<OrderItemDto>());        // Act & Assert        Assert.Throws<DomainException>(() =>             order.AddItem(Guid.NewGuid(), 0100m));    }}

42

5.2.2 命令处理器集成测试
// Ordering.Application.Tests/CreateOrderCommandHandlerTests.cspublic class CreateOrderCommandHandlerTests : IClassFixture<DatabaseFixture>{    private readonly DatabaseFixture _fixture;    public CreateOrderCommandHandlerTests(DatabaseFixture fixture)    {        _fixture = fixture;    }    [Fact]    public async Task Handle_ValidCommand_ShouldPersistOrder()    {        // Arrange        var handler = _fixture.GetService<IRequestHandler<CreateOrderCommand, Guid>>();        var command = new CreateOrderCommand(            CustomerId: Guid.NewGuid(),            Items: new List<OrderItemDto> { /* ... */ }        );        // Act        var orderId = await handler.Handle(command, CancellationToken.None);        // Assert        var order = await _fixture.OrderRepository.GetByIdAsync(orderId);        Assert.NotNull(order);        Assert.Equal(OrderStatus.Pending, order.Status);    }}
DatabaseFixture 使用 Testcontainers 或内存数据库(如 SQLite)隔离测试。

43

5.3 生产部署考量

5.3.1 数据库策略
  • 写库
    :使用高性能关系型数据库(如 PostgreSQL, SQL Server),确保 ACID。
  • 读库:
    • 使用物化视图CQRS 读表,通过事件驱动更新。
    • 高频查询使用 Redis 缓存。
    • 复杂搜索使用 Elasticsearch

44

45

5.3.2 事件总线选型
方案
适用场景
In-Memory

 (MediatR)
单体应用,事件处理器在同一进程
RabbitMQ
微服务,需要可靠传递、重试
Kafka
高吞吐、事件溯源、流处理
Azure Service Bus
Azure 生态

建议:初期使用 RabbitMQ,成熟后根据需求迁移。

46

5.3.3 监控与可观测性
  • 日志
    :使用 Serilog 结构化日志,记录命令、事件、错误。
  • 追踪
    :集成 OpenTelemetry,追踪请求链路。
  • 指标
    :暴露 Prometheus 指标(如命令处理时间、事件发布延迟)。
  • 告警
    :对事件积压、失败任务设置告警。

47

5.3.4 部署模式
  • 蓝绿部署 / 金丝雀发布
    :降低发布风险。
  • 健康检查
    :实现 /health 端点,检查数据库、事件总线连接。
  • 配置管理
    :使用 IConfiguration + 配置中心(如 Azure App Configuration)。

48

5.4 常见陷阱与规避

  1. 过度工程
    :不是所有项目都需要 CQRS 和事件溯源。从简单开始。
  2. 事件风暴滥用
    :领域事件应代表业务关键决策,而非所有状态变更。
  3. 事务边界不清晰
    :确保 UnitOfWork 正确管理事务,避免部分提交。
  4. 循环依赖
    :避免服务间循环发布事件。
  5. 缺乏文档
    :用 上下文映射图(Context Map)记录限界上下文关系。

49

结语

DDD 与 CQRS 是强大的架构工具,但它们不是银弹。成功的关键在于:

  • 以业务为核心
    :模型必须准确反映领域知识。
  • 渐进式演进
    :从单体开始,逐步拆分限界上下文。
  • 团队共识
    :统一术语(通用语言),确保开发、产品、领域专家理解一致。
  • 持续重构
    :架构随业务发展而演进。

50

第六章:应对分布式挑战——一致性、性能与弹性

当我们将 DDD 和 CQRS 应用于生产级系统,尤其是微服务架构时,会面临一系列分布式系统特有的挑战。本章将聚焦于如何在保持业务清晰的同时,确保系统的数据一致性、高性能和高可用性

51

6.1 数据一致性:最终一致性 vs 强一致性

在 CQRS 架构中,写模型读模型是分离的。这意味着,当一个命令成功执行后,查询结果可能不会立即反映最新状态。

图片

问题:在 t2 到 t4 之间,客户端查询可能看到旧数据或“未找到”。

解决方案
  1. 接受最终一致性
    • 说明
      :这是 CQRS 的默认模式。向客户端明确说明数据是“最终一致”的。
    • 适用场景
      :大多数业务场景(如社交动态、商品列表)可以容忍短暂延迟。
  2. 读写后读取(Read-Your-Writes Consistency)
    • 方案 A(简单)
      :命令成功后,不立即让客户端查询,而是直接返回完整的创建结果(DTO)。
    • 说明
      :确保用户能立即看到自己刚刚写入的数据。
    • 实现:
// Command Handler 返回完整 OrderDtopublic async Task<OrderDtoHandle(CreateOrderCommand request, ...){    var order = Order.Create(...);    await _repo.AddAsync(order);    await _uow.CommitAsync();    // 直接返回,避免查询延迟    return _mapper.Map<OrderDto>(order);}
  1. 方案 B(复杂)

    :在查询时,如果请求的是“自己刚创建的资源”,则回退到从写库查询(牺牲一点性能换取一致性)。
  2. 会话一致性(Session Consistency)
    • 说明
      :确保同一个用户在一次会话中看到的数据是单调递增的。
    • 实现
      :使用时间戳版本号。查询时带上上次操作的时间戳,只返回该时间戳之后的数据。

52

6.2 性能优化:CQRS 的读写优化策略

CQRS 的核心优势之一就是可以独立优化读写路径。

53

6.2.1 写路径优化
  • 批量处理
    :对于高频写操作(如日志、指标),使用批量提交减少数据库压力。
  • 异步持久化
    :将事件先写入高速队列(如 Kafka),再由后台消费者持久化到数据库。
  • 聚合设计
    :避免大聚合。过大的聚合会导致并发冲突(乐观锁失败率高)。合理拆分聚合根。

54

6.2.2 读路径优化(核心)
技术
描述
适用场景
数据库读写分离
主库写,多个从库读
读远多于写的场景
Redis 缓存
将热点数据(如商品详情)缓存
高频访问、低频更新
物化视图 (Materialized View)
预计算复杂查询结果,存为物理表
报表、聚合统计
Elasticsearch
全文检索、复杂过滤、高亮
商品搜索、日志分析
CDN
缓存静态资源或 API 响应
全球用户访问

示例:用事件驱动更新物化视图

// 当 OrderCreatedEvent 发生时,更新 OrderSummary 表
public class OrderCreatedProjection : INotificationHandler<OrderCreatedEvent>{    private readonly AppDbContext _readContext; // 专用的读库上下文    public async Task Handle(OrderCreatedEvent e, CancellationToken ct)    {        var summary = new OrderSummary        {            OrderId = e.OrderId,            Status = "Pending",            CreatedAt = DateTime.UtcNow,            // ... 其他预计算字段        };        await _readContext.OrderSummaries.AddAsync(summary, ct);        await _readContext.SaveChangesAsync(ct);    }}
关键:投影(Projection)是幂等的,可以安全重放。

55

6.3 弹性与容错:处理失败与重试

在分布式系统中,失败是常态。我们必须设计有弹性的系统。

56

6.3.1 命令处理的幂等性

确保同一个命令被多次处理时,结果一致。

// CreateOrderCommand.cs
public record CreateOrderCommand(
    Guid CommandId, // 唯一标识
    Guid CustomerId,
    List<OrderItemDto> Items) : IRequest<Guid>;
// Command Handler
public async Task<GuidHandle(CreateOrderCommand request, CancellationToken ct){
    // 1. 检查该 CommandId 是否已处理
    if (await _commandLog.IsProcessedAsync(request.CommandId, ct))    {        // 返回上次的结果(幂等)        return await _commandLog.GetResultAsync<Guid>(request.CommandId, ct);    }    // 2. 正常处理逻辑    var order = Order.Create(request.CustomerId, request.Items);    await _repo.AddAsync(order);    await _uow.CommitAsync();    
// 3. 记录命令已处理
    await _commandLog.LogProcessedAsync(request.CommandId, order.Id, ct);
    return order.Id;}

57

6.3.2 事件处理的重试机制

领域事件或集成事件发布失败时,必须重试。

  • 使用消息队列
    :RabbitMQ、Kafka 天然支持消息持久化和重试。
  • 死信队列 (DLQ)
    :处理多次重试仍失败的消息,便于人工干预。
  • 指数退避
    :重试间隔逐渐增加,避免雪崩。
// RabbitMQ Consumer 示例
try{
    await HandleEvent(event);
    await model.BasicAck(deliveryTag, false);
 // 确认}
catch (Exception ex){
    
// 记录日志    
// 消息将自动重回队列或进入DLQ
    throw// 不确认,触发重试}

58

59

6.4 分布式事务与 Saga 模式的进阶

在第五章我们提到了 Saga 模式。这里深入其**编排式(Orchestration)**实现。

场景:订单流程(创建 → 支付 → 发货)

图片

Orchestrator (编排器) 的职责

  1. 定义 Saga 的执行流程。
  2. 发送命令给各个服务。
  3. 监听各步骤的完成事件。
  4. 处理失败,触发补偿命令。

优点:流程清晰,易于调试和监控。缺点:编排器可能成为单点。

选择建议

  • 流程简单、服务少:用协同式(事件驱动)。
  • 流程复杂、需要精确控制:用编排式

60

6.5 监控、追踪与调试

复杂的分布式系统必须具备强大的可观测性。

  • 结构化日志
    :使用 Serilog + Seq 或 ELK,记录命令、事件、错误。
  • 分布式追踪
    :集成 OpenTelemetry,追踪一个请求从 API 到数据库再到其他服务的完整链路。
  • 指标监控
    :使用 Prometheus + Grafana 监控:
    • 命令处理延迟
    • 事件发布/消费速率
    • 数据库连接数
    • 缓存命中率
  • 健康检查
    :实现 /health 端点,检查数据库、Redis、消息队列等依赖。

61

本章小结:我们探讨了在真实生产环境中,如何应对 CQRS 和 DDD 带来的分布式挑战。核心思想是:接受最终一致性、通过独立优化读写路径提升性能、设计幂等和重试机制保证弹性

62

第七章:演进式架构——从单体到微服务的平滑过渡

在前几章中,我们构建了一个基于 DDD 和 CQRS 的强大单体应用。然而,随着业务规模扩大、团队增多,单体架构可能成为瓶颈。本章将探讨如何以领域驱动的方式,将单体应用平滑演进为微服务架构,避免“大爆炸式”重写。

63

7.1 何时拆分微服务?

并非所有系统都需要微服务。以下信号表明可能是时候考虑拆分了:

  1. 团队协作困难
    :多个团队频繁修改同一代码库,导致合并冲突和发布阻塞。
  2. 技术栈异构需求
    :某个功能需要特定技术(如 AI 服务用 Python,核心交易用 .NET)。
  3. 伸缩性需求差异
    :订单服务需要 10 台服务器,而客服系统只需 2 台。
  4. 发布频率不同
    :营销活动需要每日发布,而财务系统每月发布一次。
  5. DDD 限界上下文清晰
    :业务边界明确,上下文映射图已定义。

关键以业务能力(Bounded Context)为单位拆分,而非技术分层。

64

7.2 演进策略:Strangler Fig 模式

“绞杀者模式”(Strangler Fig Pattern)是一种安全的演进策略:新建的微服务逐步“绞杀”旧的单体功能,直到单体被完全替代

65

7.2.1 阶段一:识别并隔离限界上下文

在单体应用中,首先通过命名空间或项目来物理隔离不同的限界上下文。

// 单体应用中的模块化

MyApp.Solution

├── MyApp.Ordering          // 订单上下文├── MyApp.Customer          // 客户上下文├── MyApp.Catalog           // 商品目录上下文├── MyApp.SharedKernel

└── MyApp.WebApi            // API 网关入口

  • 目标:
    减少上下文间的耦合,明确依赖方向(如 Ordering 依赖 Catalog)。
  • 实践:
    • 使用 InternalsVisibleTo 限制程序集访问。
    • 通过 领域事件 耦合,而非直接调用服务。

66

7.2.2 阶段二:暴露 API 与防腐层(ACL)

为即将拆分的上下文设计稳定的 API,并为外部依赖创建防腐层(Anti-Corruption Layer)

//Ordering/Infrastructure/CatalogAcl.cs

public class CatalogAcl : ICatalogService{    private readonly HttpClient _client;    public async Task<ProductDto> GetProductAsync(GuidproductId)    {

        // 调用 Catalog 微服务的 HTTP API        var response = await _client.GetAsync($"/api/products/{productId}");        // 将外部模型转换为内部模型        var external = await response.Content.ReadFromJsonAsync<ExternalProductDto>();        return _mapper.Map<ProductDto>(external);    }}

防腐层的作用:防止外部服务的模型变化污染内部领域模型。

67

7.2.3 阶段三:逐步迁移流量
  1. 部署新微服务
    :将 Catalog 上下文部署为独立的微服务 catalog-service
  2. 双写或双读:
    • 双写
      :在单体和新服务中同时写入数据(确保数据同步)。
    • 双读
      :新服务先从单体数据库读取,验证数据一致性。
  3. 切换流量
    :通过 API 网关(如 Ocelot、YARP)将 /api/products/* 的流量逐步导向新服务。
  4. 移除旧代码
    :确认新服务稳定后,移除单体中的 Catalog 模块和双写逻辑。
图片

68

7.3 微服务间的通信模式

拆分后,服务间通信至关重要。

模式
描述
工具
适用场景
同步 HTTP/REST
请求-响应模式
HttpClient, Refit
简单查询、强一致性要求
gRPC
高性能 RPC,强类型
gRPC .NET
高频调用、低延迟
异步消息
通过消息队列通信
RabbitMQ, Kafka
解耦、最终一致性、事件驱动

建议:优先使用异步消息保持松耦合,必要时使用同步调用。

69

7.4 数据管理:避免分布式事务

微服务中应尽量避免跨服务的分布式事务(如两阶段提交),因其性能差且复杂。

替代方案

  1. Saga 模式
    :如第六章所述,用补偿事务管理长流程。
  2. CQRS + 事件溯源
    :通过事件流保证数据一致性。
  3. API 组合器
    :在查询时,由网关或前端服务聚合多个微服务的数据。

70

7.5 运维与治理

微服务带来运维复杂性,需建立配套体系:

  • 服务发现
    :Consul, Eureka, Kubernetes DNS。
  • 配置中心
    :Azure App Configuration, Consul KV。
  • API 网关
    :路由、认证、限流、熔断。
  • CI/CD
    :每个服务独立的流水线。
  • 日志与监控
    :集中式日志(ELK)、分布式追踪(Jaeger)、指标(Prometheus)。

71

7.6 案例:电商系统拆分路径

  1. 第一阶段
    :拆分 Catalog(商品目录),因其独立性强、访问量大。
  2. 第二阶段
    :拆分 Ordering(订单),因其业务复杂,需独立伸缩。
  3. 第三阶段
    :拆分 Payment(支付),因其涉及第三方集成和安全要求。
  4. 第四阶段
    Customer(客户)和 Marketing(营销)作为独立服务。

每一步都小步迭代,确保系统始终可用

72

本章小结:我们学习了如何以 Strangler Fig 模式,将一个 DDD + CQRS 的单体应用,安全、可控地演进为微服务架构。关键在于以限界上下文为拆分单元,使用防腐层隔离变化,并通过 API 网关逐步切换流量

这种演进式方法最大限度地降低了风险,是大型系统架构演进的推荐实践。

希望这些内容能帮助你在 .NET Core 项目中成功应用 DDD 和 CQRS。如需深入某个主题,请随时提问!

73

赞(0) 打赏
分享到: 更多 (0)

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

微信扫一扫打赏