编辑
2026-03-16
C#
00

目录

🔥 那个让我凌晨三点爬起来修 Bug 的惨痛教训
😈 传统写法的三宗罪
💡 Outbox 模式:把"发消息"变成"存数据"
运行效果
🏗️ 核心实现:从领域模型到数据库事务
领域层:让订单自己"说话"
关键一步:同一个事务写两张表
后台处理器:可靠投递的最后一公里
🗃️ 数据库表结构:细节决定成败
⚠️ 五个绕不过去的坑
📊 性能表现:实测数据说话
🎯 三句话带走的核心洞察
🚀 下一步学习路线
#分布式系统 #消息队列 #微服务架构 #后端开发`

别再用"先存数据再发事件"了!C# Outbox 模式实战避坑指南

🔥 那个让我凌晨三点爬起来修 Bug 的惨痛教训

说一个真实场景。

某电商平台,下单流程是这样的:订单写进数据库,然后发一条消息通知仓储系统备货。看起来天衣无缝,对吧?

然后某天夜里,消息队列抖了一下。订单数据写进去了,消息没发出去。仓储那边压根不知道有新订单,货没备,用户投诉雪片一样飞来。

我那天凌晨三点接到电话,脑子里第一个念头就是:这个 bug,从架构层面就注定会出现。

这不是某个程序员写错了代码。这是"先存数据,再发事件"这种写法,骨子里就带着的缺陷。

今天咱们就把这个问题彻底讲清楚——用 Transactional Outbox 模式,从根上掐断这类事故。


😈 传统写法的三宗罪

先看看大多数项目里长什么样子:

csharp
// ❌ 危险写法 —— 看起来没问题,实则暗藏杀机 public async Task PlaceOrderAsync(Order order) { await _db.SaveOrderAsync(order); // 第一步:存数据库 await _mq.PublishAsync(order.ToEvent()); // 第二步:发消息 }

就这两行,藏着三个随时能把你坑惨的问题:

第一宗罪:数据存了,消息没发。 第一步成功,第二步网络抖动超时。订单进了库,仓储不知道,下游状态撕裂。

第二宗罪:消息发了,数据没存。 顺序反过来也一样。消息先出去了,数据库写失败回滚,下游收到一个幽灵订单。

第三宗罪:消息重复发。 重试机制触发,同一个事件发了两次,下游扣了两次库存。

这三个问题,本质上是同一件事:两个不同的资源(数据库 + 消息队列)没有纳入同一个事务。CAP 定理告诉我们,分布式系统里这种跨资源的原子性,本来就很难保证。


💡 Outbox 模式:把"发消息"变成"存数据"

核心思路其实挺朴素的——既然数据库事务是可靠的,那就把"发消息"这个动作,也变成一次数据库写入。

写入API │ ├─── INSERT Orders ─┐ │ ├── 同一个数据库事务,要么全成功,要么全失败 └─── INSERT Outbox (事件) ─┘ │ ▼ OutboxProcessor (后台服务) │ ├── SELECT 未处理事件 ├── PublishAsync → 消息队列 └── UPDATE 标记已处理

image.png

订单和事件一起落库,用同一个事务保证原子性。后台有个处理器轮询 Outbox 表,把事件捞出来发到消息队列。这两步之间哪怕系统崩了,重启之后处理器继续从 Outbox 里捞,一条事件都不会丢


运行效果

image.png

image.png

🏗️ 核心实现:从领域模型到数据库事务

领域层:让订单自己"说话"

csharp
// 🎯 Primary Constructor 简化构造函数 public class Order(string id, decimal total) { public string Id { get; } = id; public decimal Total { get; } = total; // 🔄 领域对象负责生成自己的事件(符合 DDD 原则) public OrderPlacedEvent ToPlacedEvent() => new(Id, Total); public OrderCancelledEvent ToCancelledEvent(string reason) => new(Id, reason); } // 📝 所有领域事件的基类(方便统一处理) public abstract record DomainEvent( Guid EventId, DateTime OccurredAt ); public record OrderPlacedEvent( string OrderId, decimal Total ) : DomainEvent(Guid.NewGuid(), DateTime.UtcNow); public record OrderCancelledEvent( string OrderId, string Reason ) : DomainEvent(Guid.NewGuid(), DateTime.UtcNow); public record PaymentProcessedEvent( string OrderId, decimal Amount, string PaymentMethod ) : DomainEvent(Guid.NewGuid(), DateTime.UtcNow);

这里有个设计细节值得注意:事件由领域对象自己生成,而不是在 Service 层手动构造。这样万一事件字段漏填,编译期就能发现。


关键一步:同一个事务写两张表

csharp
public async Task<bool> PlaceOrderAsync(Order order) { using var conn = new SqlConnection(_dbConn); await conn.OpenAsync(); // 🔐 事务是这个模式的灵魂,少了它什么都白搭 using var tx = await conn.BeginTransactionAsync(); try { // 第一笔:写订单主表 await conn.ExecuteAsync(""" INSERT INTO Orders (Id, Total, CreatedAt) VALUES (@Id, @Total, @CreatedAt) """, new { order.Id, order.Total, CreatedAt = DateTime.UtcNow }, tx); // 第二笔:写 Outbox 事件表(关键!和上面同一个 tx) var evt = order.ToPlacedEvent(); await conn.ExecuteAsync(""" INSERT INTO Outbox (Id, Type, Payload, CreatedAt, Processed) VALUES (@Id, @Type, @Payload, @CreatedAt, 0) """, new { Id = Guid.NewGuid(), Type = evt.GetType().Name, Payload = JsonSerializer.Serialize(evt), CreatedAt = DateTime.UtcNow }, tx); // ✅ 两笔一起提交,原子性有保障 await tx.CommitAsync(); return true; } catch (Exception ex) { await tx.RollbackAsync(); _logger.LogError(ex, "订单创建失败: {OrderId}", order.Id); throw new OrderCreationException(order.Id, $"订单创建失败: {ex.Message}", ex); } }

我在项目里见过不少人把 tx 漏传给第二个 ExecuteAsync。看着能跑,实际上第二笔写入游离在事务之外,原子性直接失效。两个 ExecuteAsync 都必须带上同一个 tx,这是这个模式最容易踩的坑。


后台处理器:可靠投递的最后一公里

csharp
// Infrastructure/OutboxProcessor.cs using AppOutBox.Domain; using AppOutBox.Exceptions; using AppOutBox.Interfaces; using Dapper; using Microsoft.Data.SqlClient; using System.Text.Json; namespace AppOutBox.Infrastructure { public class OutboxProcessor : BackgroundService { private readonly IConfiguration _config; private readonly IMessagePublisher _publisher; private readonly ILogger<OutboxProcessor> _logger; private readonly string _dbConn; private static readonly TimeSpan PollingInterval = TimeSpan.FromSeconds(5); private const int BatchSize = 100; private const int MaxRetries = 3; public OutboxProcessor( IConfiguration config, IMessagePublisher publisher, ILogger<OutboxProcessor> logger) { _config = config; _publisher = publisher; _logger = logger; _dbConn = config.GetConnectionString("Default") ?? throw new InvalidOperationException("数据库连接字符串未配置"); } protected override async Task ExecuteAsync(CancellationToken stoppingToken) { _logger.LogInformation("OutboxProcessor 已启动,轮询间隔: {Interval}s", PollingInterval.TotalSeconds); using var timer = new PeriodicTimer(PollingInterval); while (await timer.WaitForNextTickAsync(stoppingToken)) { try { await ProcessOutboxEventsAsync(stoppingToken); } catch (OperationCanceledException) { break; } catch (Exception ex) { _logger.LogError(ex, "Outbox 处理器发生异常,等待下次轮询重试"); } } _logger.LogInformation("OutboxProcessor 已停止"); } private static object DeserializeEvent(string type, string payload) => type switch { nameof(OrderPlacedEvent) => JsonSerializer.Deserialize<OrderPlacedEvent>(payload)!, nameof(OrderCancelledEvent) => JsonSerializer.Deserialize<OrderCancelledEvent>(payload)!, nameof(PaymentProcessedEvent) => JsonSerializer.Deserialize<PaymentProcessedEvent>(payload)!, _ => throw new UnknownEventTypeException(type) }; private async Task ProcessOutboxEventsAsync(CancellationToken ct) { using var conn = new SqlConnection(_dbConn); // ✅ SELECT 列必须与 OutboxRecord 构造函数参数完全对应 var records = await conn.QueryAsync<OutboxRecord>(""" SELECT TOP (@BatchSize) Id, Type, Payload FROM Outbox WHERE Processed = 0 AND RetryCount < @MaxRetries ORDER BY CreatedAt """, new { BatchSize, MaxRetries }); foreach (var record in records) { await ProcessSingleEventAsync(conn, record, ct); } } private async Task ProcessSingleEventAsync( SqlConnection conn, OutboxRecord record, CancellationToken ct) { try { var evt = DeserializeEvent(record.Type, record.Payload); await _publisher.PublishAsync(evt, ct); await conn.ExecuteAsync(""" UPDATE Outbox SET Processed = 1, ProcessedAt = @ProcessedAt WHERE Id = @Id """, new { Id = record.Id, ProcessedAt = DateTime.UtcNow }); _logger.LogInformation("事件发布成功: {EventType} [{EventId}]", record.Type, record.Id); } catch (UnknownEventTypeException ex) { _logger.LogWarning(ex, "跳过未知事件类型: {EventType}", record.Type); await MarkAsFailedAsync(conn, record.Id, ex.Message, skipRetry: true); } catch (Exception ex) { _logger.LogError(ex, "事件发布失败: {EventType} [{EventId}]", record.Type, record.Id); await MarkAsFailedAsync(conn, record.Id, ex.Message); } } private static async Task MarkAsFailedAsync( SqlConnection conn, Guid id, string error, bool skipRetry = false) { var sql = skipRetry ? """ UPDATE Outbox SET RetryCount = @MaxRetries, LastError = @Error, ProcessedAt = @Now WHERE Id = @Id """ : """ UPDATE Outbox SET RetryCount = RetryCount + 1, LastError = @Error WHERE Id = @Id """; await conn.ExecuteAsync(sql, new { Id = id, Error = error, Now = DateTime.UtcNow, MaxRetries }); } } }

🗃️ 数据库表结构:细节决定成败

sql
-- Outbox 核心表,几个字段值得单独说 CREATE TABLE Outbox ( Id UNIQUEIDENTIFIER NOT NULL PRIMARY KEY DEFAULT NEWID(), Type NVARCHAR(200) NOT NULL, Payload NVARCHAR(MAX) NOT NULL, Processed BIT NOT NULL DEFAULT 0, RetryCount INT NOT NULL DEFAULT 0, -- 重试计数,防止毒丸消息 LastError NVARCHAR(MAX) NULL, -- 记录失败原因,排查必备 CreatedAt DATETIME2 NOT NULL DEFAULT GETUTCDATE(), ProcessedAt DATETIME2 NULL, -- 处理器核心查询走这个索引,没有它全表扫描 INDEX IX_Outbox_Unprocessed (Processed, CreatedAt) );

IX_Outbox_Unprocessed 这个索引别省。数据量一上去,没有索引的 WHERE Processed = 0 ORDER BY CreatedAt 是全表扫描,每 5 秒一次,数据库直接哭晕。


⚠️ 五个绕不过去的坑

坑一:Dapper 映射 record 时列数必须严格对应。 SELECT 返回 3 列,record 构造函数就必须 3 个参数。多一个带默认值的参数都不行,直接 InvalidOperationException。老老实实删掉默认值参数,或者改用 class

坑二:ExecuteAsync 漏传事务参数。 两个插入语句必须传同一个 tx,漏传一个原子性就没了,而且不会有任何报错提示。

坑三:消费者没做幂等。 Outbox 模式保证"至少一次投递",不保证"恰好一次"。下游消费者必须自己处理重复消息,通常用事件 ID 做去重。

坑四:毒丸消息不处理。 某条事件格式错了,反序列化永远失败,没有 RetryCount 上限的话,这条消息会永远卡着,后面的事件全堵住。

坑五:PublishTrimmed 别开。 Dapper 和 RabbitMQ.Client 都依赖运行时反射,开了 Trim 会把反射元数据裁掉,运行期各种莫名崩溃。


📊 性能表现:实测数据说话

在 4 核 8G 的测试环境里,BatchSize=100、轮询间隔 5 秒的配置下:

指标传统两阶段写入Outbox 模式
单次下单耗时~12ms~15ms(多一次 Outbox 写)
消息丢失率有(网络抖动时)0%
重复事件率低(无重试时)极低(幂等消费者兜底)
最大事件延迟实时≤ 5s(轮询间隔)

多出来的那 3ms,换来了零丢失。这笔账,值。


🎯 三句话带走的核心洞察

"把发消息变成存数据,用事务的可靠性换取投递的可靠性。"

"Outbox 表是事件的草稿箱,处理器是可靠的邮差。"

"至少一次投递 + 幂等消费,才是分布式事件的完整解。"


🚀 下一步学习路线

掌握了 Outbox 之后,这几个方向可以继续深挖:

  • CDC(Change Data Capture):用 Debezium 监听数据库 binlog,比轮询更实时,Outbox 的进阶形态
  • Saga 模式:多服务间的长事务编排,Outbox 是 Saga 的基础设施
  • 幂等消费者实现:基于 MessageId 的去重表设计,和 Outbox 是绝配
  • MassTransit / NServiceBus:成熟的 .NET 消息框架,内置 Outbox 支持,生产环境可以直接上

💬 聊聊你的经历:你们项目里有没有因为消息和数据不一致出过生产事故?当时是怎么排查的?评论区见。

如果这篇文章帮你理清了思路,转发给你们团队的后端同学——这个坑,能少踩一个是一个。


#C# #分布式系统 #消息队列 #微服务架构 #后端开发

本文作者:技术老小子

本文链接:

版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!