说一个真实场景。
某电商平台,下单流程是这样的:订单写进数据库,然后发一条消息通知仓储系统备货。看起来天衣无缝,对吧?
然后某天夜里,消息队列抖了一下。订单数据写进去了,消息没发出去。仓储那边压根不知道有新订单,货没备,用户投诉雪片一样飞来。
我那天凌晨三点接到电话,脑子里第一个念头就是:这个 bug,从架构层面就注定会出现。
这不是某个程序员写错了代码。这是"先存数据,再发事件"这种写法,骨子里就带着的缺陷。
今天咱们就把这个问题彻底讲清楚——用 Transactional Outbox 模式,从根上掐断这类事故。
先看看大多数项目里长什么样子:
csharp// ❌ 危险写法 —— 看起来没问题,实则暗藏杀机
public async Task PlaceOrderAsync(Order order)
{
await _db.SaveOrderAsync(order); // 第一步:存数据库
await _mq.PublishAsync(order.ToEvent()); // 第二步:发消息
}
就这两行,藏着三个随时能把你坑惨的问题:
第一宗罪:数据存了,消息没发。 第一步成功,第二步网络抖动超时。订单进了库,仓储不知道,下游状态撕裂。
第二宗罪:消息发了,数据没存。 顺序反过来也一样。消息先出去了,数据库写失败回滚,下游收到一个幽灵订单。
第三宗罪:消息重复发。 重试机制触发,同一个事件发了两次,下游扣了两次库存。
这三个问题,本质上是同一件事:两个不同的资源(数据库 + 消息队列)没有纳入同一个事务。CAP 定理告诉我们,分布式系统里这种跨资源的原子性,本来就很难保证。
核心思路其实挺朴素的——既然数据库事务是可靠的,那就把"发消息"这个动作,也变成一次数据库写入。
写入API │ ├─── INSERT Orders ─┐ │ ├── 同一个数据库事务,要么全成功,要么全失败 └─── INSERT Outbox (事件) ─┘ │ ▼ OutboxProcessor (后台服务) │ ├── SELECT 未处理事件 ├── PublishAsync → 消息队列 └── UPDATE 标记已处理

订单和事件一起落库,用同一个事务保证原子性。后台有个处理器轮询 Outbox 表,把事件捞出来发到消息队列。这两步之间哪怕系统崩了,重启之后处理器继续从 Outbox 里捞,一条事件都不会丢。


csharp
// 🎯 Primary Constructor 简化构造函数
public class Order(string id, decimal total)
{
public string Id { get; } = id;
public decimal Total { get; } = total;
// 🔄 领域对象负责生成自己的事件(符合 DDD 原则)
public OrderPlacedEvent ToPlacedEvent() => new(Id, Total);
public OrderCancelledEvent ToCancelledEvent(string reason) => new(Id, reason);
}
// 📝 所有领域事件的基类(方便统一处理)
public abstract record DomainEvent(
Guid EventId,
DateTime OccurredAt
);
public record OrderPlacedEvent(
string OrderId,
decimal Total
) : DomainEvent(Guid.NewGuid(), DateTime.UtcNow);
public record OrderCancelledEvent(
string OrderId,
string Reason
) : DomainEvent(Guid.NewGuid(), DateTime.UtcNow);
public record PaymentProcessedEvent(
string OrderId,
decimal Amount,
string PaymentMethod
) : DomainEvent(Guid.NewGuid(), DateTime.UtcNow);
这里有个设计细节值得注意:事件由领域对象自己生成,而不是在 Service 层手动构造。这样万一事件字段漏填,编译期就能发现。
csharppublic async Task<bool> PlaceOrderAsync(Order order)
{
using var conn = new SqlConnection(_dbConn);
await conn.OpenAsync();
// 🔐 事务是这个模式的灵魂,少了它什么都白搭
using var tx = await conn.BeginTransactionAsync();
try
{
// 第一笔:写订单主表
await conn.ExecuteAsync("""
INSERT INTO Orders (Id, Total, CreatedAt)
VALUES (@Id, @Total, @CreatedAt)
""",
new { order.Id, order.Total, CreatedAt = DateTime.UtcNow },
tx);
// 第二笔:写 Outbox 事件表(关键!和上面同一个 tx)
var evt = order.ToPlacedEvent();
await conn.ExecuteAsync("""
INSERT INTO Outbox (Id, Type, Payload, CreatedAt, Processed)
VALUES (@Id, @Type, @Payload, @CreatedAt, 0)
""",
new
{
Id = Guid.NewGuid(),
Type = evt.GetType().Name,
Payload = JsonSerializer.Serialize(evt),
CreatedAt = DateTime.UtcNow
},
tx);
// ✅ 两笔一起提交,原子性有保障
await tx.CommitAsync();
return true;
}
catch (Exception ex)
{
await tx.RollbackAsync();
_logger.LogError(ex, "订单创建失败: {OrderId}", order.Id);
throw new OrderCreationException(order.Id, $"订单创建失败: {ex.Message}", ex);
}
}
我在项目里见过不少人把 tx 漏传给第二个 ExecuteAsync。看着能跑,实际上第二笔写入游离在事务之外,原子性直接失效。两个 ExecuteAsync 都必须带上同一个 tx,这是这个模式最容易踩的坑。
csharp// Infrastructure/OutboxProcessor.cs
using AppOutBox.Domain;
using AppOutBox.Exceptions;
using AppOutBox.Interfaces;
using Dapper;
using Microsoft.Data.SqlClient;
using System.Text.Json;
namespace AppOutBox.Infrastructure
{
public class OutboxProcessor : BackgroundService
{
private readonly IConfiguration _config;
private readonly IMessagePublisher _publisher;
private readonly ILogger<OutboxProcessor> _logger;
private readonly string _dbConn;
private static readonly TimeSpan PollingInterval = TimeSpan.FromSeconds(5);
private const int BatchSize = 100;
private const int MaxRetries = 3;
public OutboxProcessor(
IConfiguration config,
IMessagePublisher publisher,
ILogger<OutboxProcessor> logger)
{
_config = config;
_publisher = publisher;
_logger = logger;
_dbConn = config.GetConnectionString("Default")
?? throw new InvalidOperationException("数据库连接字符串未配置");
}
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
{
_logger.LogInformation("OutboxProcessor 已启动,轮询间隔: {Interval}s",
PollingInterval.TotalSeconds);
using var timer = new PeriodicTimer(PollingInterval);
while (await timer.WaitForNextTickAsync(stoppingToken))
{
try
{
await ProcessOutboxEventsAsync(stoppingToken);
}
catch (OperationCanceledException)
{
break;
}
catch (Exception ex)
{
_logger.LogError(ex, "Outbox 处理器发生异常,等待下次轮询重试");
}
}
_logger.LogInformation("OutboxProcessor 已停止");
}
private static object DeserializeEvent(string type, string payload) =>
type switch
{
nameof(OrderPlacedEvent) =>
JsonSerializer.Deserialize<OrderPlacedEvent>(payload)!,
nameof(OrderCancelledEvent) =>
JsonSerializer.Deserialize<OrderCancelledEvent>(payload)!,
nameof(PaymentProcessedEvent) =>
JsonSerializer.Deserialize<PaymentProcessedEvent>(payload)!,
_ => throw new UnknownEventTypeException(type)
};
private async Task ProcessOutboxEventsAsync(CancellationToken ct)
{
using var conn = new SqlConnection(_dbConn);
// ✅ SELECT 列必须与 OutboxRecord 构造函数参数完全对应
var records = await conn.QueryAsync<OutboxRecord>("""
SELECT TOP (@BatchSize) Id, Type, Payload
FROM Outbox
WHERE Processed = 0
AND RetryCount < @MaxRetries
ORDER BY CreatedAt
""", new { BatchSize, MaxRetries });
foreach (var record in records)
{
await ProcessSingleEventAsync(conn, record, ct);
}
}
private async Task ProcessSingleEventAsync(
SqlConnection conn,
OutboxRecord record,
CancellationToken ct)
{
try
{
var evt = DeserializeEvent(record.Type, record.Payload);
await _publisher.PublishAsync(evt, ct);
await conn.ExecuteAsync("""
UPDATE Outbox
SET Processed = 1,
ProcessedAt = @ProcessedAt
WHERE Id = @Id
""", new { Id = record.Id, ProcessedAt = DateTime.UtcNow });
_logger.LogInformation("事件发布成功: {EventType} [{EventId}]",
record.Type, record.Id);
}
catch (UnknownEventTypeException ex)
{
_logger.LogWarning(ex, "跳过未知事件类型: {EventType}", record.Type);
await MarkAsFailedAsync(conn, record.Id, ex.Message, skipRetry: true);
}
catch (Exception ex)
{
_logger.LogError(ex, "事件发布失败: {EventType} [{EventId}]",
record.Type, record.Id);
await MarkAsFailedAsync(conn, record.Id, ex.Message);
}
}
private static async Task MarkAsFailedAsync(
SqlConnection conn,
Guid id,
string error,
bool skipRetry = false)
{
var sql = skipRetry
? """
UPDATE Outbox
SET RetryCount = @MaxRetries,
LastError = @Error,
ProcessedAt = @Now
WHERE Id = @Id
"""
: """
UPDATE Outbox
SET RetryCount = RetryCount + 1,
LastError = @Error
WHERE Id = @Id
""";
await conn.ExecuteAsync(sql, new
{
Id = id,
Error = error,
Now = DateTime.UtcNow,
MaxRetries
});
}
}
}
sql-- Outbox 核心表,几个字段值得单独说
CREATE TABLE Outbox (
Id UNIQUEIDENTIFIER NOT NULL PRIMARY KEY DEFAULT NEWID(),
Type NVARCHAR(200) NOT NULL,
Payload NVARCHAR(MAX) NOT NULL,
Processed BIT NOT NULL DEFAULT 0,
RetryCount INT NOT NULL DEFAULT 0, -- 重试计数,防止毒丸消息
LastError NVARCHAR(MAX) NULL, -- 记录失败原因,排查必备
CreatedAt DATETIME2 NOT NULL DEFAULT GETUTCDATE(),
ProcessedAt DATETIME2 NULL,
-- 处理器核心查询走这个索引,没有它全表扫描
INDEX IX_Outbox_Unprocessed (Processed, CreatedAt)
);
IX_Outbox_Unprocessed 这个索引别省。数据量一上去,没有索引的 WHERE Processed = 0 ORDER BY CreatedAt 是全表扫描,每 5 秒一次,数据库直接哭晕。
坑一:Dapper 映射 record 时列数必须严格对应。 SELECT 返回 3 列,record 构造函数就必须 3 个参数。多一个带默认值的参数都不行,直接 InvalidOperationException。老老实实删掉默认值参数,或者改用 class。
坑二:ExecuteAsync 漏传事务参数。 两个插入语句必须传同一个 tx,漏传一个原子性就没了,而且不会有任何报错提示。
坑三:消费者没做幂等。 Outbox 模式保证"至少一次投递",不保证"恰好一次"。下游消费者必须自己处理重复消息,通常用事件 ID 做去重。
坑四:毒丸消息不处理。 某条事件格式错了,反序列化永远失败,没有 RetryCount 上限的话,这条消息会永远卡着,后面的事件全堵住。
坑五:PublishTrimmed 别开。 Dapper 和 RabbitMQ.Client 都依赖运行时反射,开了 Trim 会把反射元数据裁掉,运行期各种莫名崩溃。
在 4 核 8G 的测试环境里,BatchSize=100、轮询间隔 5 秒的配置下:
| 指标 | 传统两阶段写入 | Outbox 模式 |
|---|---|---|
| 单次下单耗时 | ~12ms | ~15ms(多一次 Outbox 写) |
| 消息丢失率 | 有(网络抖动时) | 0% |
| 重复事件率 | 低(无重试时) | 极低(幂等消费者兜底) |
| 最大事件延迟 | 实时 | ≤ 5s(轮询间隔) |
多出来的那 3ms,换来了零丢失。这笔账,值。
"把发消息变成存数据,用事务的可靠性换取投递的可靠性。"
"Outbox 表是事件的草稿箱,处理器是可靠的邮差。"
"至少一次投递 + 幂等消费,才是分布式事件的完整解。"
掌握了 Outbox 之后,这几个方向可以继续深挖:
MessageId 的去重表设计,和 Outbox 是绝配💬 聊聊你的经历:你们项目里有没有因为消息和数据不一致出过生产事故?当时是怎么排查的?评论区见。
如果这篇文章帮你理清了思路,转发给你们团队的后端同学——这个坑,能少踩一个是一个。
#C# #分布式系统 #消息队列 #微服务架构 #后端开发
本文作者:技术老小子
本文链接:
版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!