你有没有遇到过这样的困境?系统越做越复杂,服务间通信变得千丝万缕,一个小改动就牵一发而动全身。更糟糕的是——当你想按不同维度过滤消息时,传统的队列模式显得力不从心。
想象一下:你负责一个工业物联网平台,需要处理来自全国各地工厂的设备数据。有时候只想监控北方工厂的温度数据,有时候需要收集所有机器人的状态信息,有时候又要分析特定生产线的振动数据...
这就是今天要解决的核心问题:如何在复杂的分布式系统中实现灵活、可扩展的消息路由机制?
直连模式?太简单粗暴。扇出模式?缺乏精细控制。路由模式?只能单维度匹配。
而Topic模式(主题模式)——它就像一个超级智能的邮递员。不仅能按地址送信,还能根据信件类型、紧急程度、收件人属性等多个维度进行精准投递。
text通配符匹配规则: * (星号):匹配一个单词 # (井号):匹配零个或多个单词
想要北方工厂的所有数据?用factory_north.*.*.*.*
只关心温度相关的信息?试试*.*.*.*temperature
需要监控所有传感器设备?来个*.*.*.sensor_*.*
这种灵活性——简直是为复杂业务场景量身定制的!

让我们构建一个真实的工业数据采集系统。这个系统需要处理多层级的工厂结构:工厂.车间.生产线.设备.数据类型
c#public class DeviceData
{
public string Factory { get; set; }
public string Workshop { get; set; }
public string Line { get; set; }
public string Device { get; set; }
public string DataType { get; set; }
public double Value { get; set; }
public string Unit { get; set; }
public DateTime Timestamp { get; set; }
public string Status { get; set; }
public string GetRoutingKey()
{
return $"{Factory}.{Workshop}.{Line}.{Device}.{DataType}";
}
}
这个设计有什么巧妙之处?
c#public class RabbitMQManager : IDisposable
{
private IConnection _connection;
private IChannel _channel;
private readonly string _exchangeName = "industrial_data_exchange";
public async Task InitializeAsync()
{
_connection = await _factory.CreateConnectionAsync();
_channel = await _connection.CreateChannelAsync();
// 关键:声明Topic Exchange
await _channel.ExchangeDeclareAsync(
exchange: _exchangeName,
type: ExchangeType.Topic, // 这里是核心!
durable: true,
autoDelete: false);
}
public async Task PublishDataAsync(DeviceData data)
{
var routingKey = data.GetRoutingKey();
var message = JsonConvert.SerializeObject(data);
var body = Encoding.UTF8.GetBytes(message);
var properties = new BasicProperties
{
Persistent = true, // 消息持久化
ContentType = "application/json",
DeliveryMode = DeliveryModes.Persistent
};
await _channel.BasicPublishAsync(
exchange: _exchangeName,
routingKey: routingKey,
mandatory: false,
basicProperties: properties,
body: body);
}
}
注意这几个关键点:
ExchangeType.Topic:这是实现灵活路由的基础 Persistent = true:确保重要的工业数据不会丢失 c#public async Task<string> StartConsumerAsync(string bindingPattern)
{
var queueName = $"consumer_queue_{Guid.NewGuid():N}";
var queueResult = await _channel.QueueDeclareAsync(
queue: queueName,
durable: true,
exclusive: false,
autoDelete: false);
// 核心:队列与Exchange的绑定
await _channel.QueueBindAsync(
queue: queueResult.QueueName,
exchange: _exchangeName,
routingKey: bindingPattern); // 这里使用通配符模式
// 设置QoS避免消息堆积
await _channel.BasicQosAsync(0, 10, false);
var consumer = new AsyncEventingBasicConsumer(_channel);
consumer.ReceivedAsync += async (model, ea) =>
{
var message = Encoding.UTF8.GetString(ea.Body.ToArray());
var deviceData = DeviceData.FromJson(message);
// 处理业务逻辑...
// 手动确认,确保消息被正确处理
await _channel.BasicAckAsync(ea.DeliveryTag, false);
};
await _channel.BasicConsumeAsync(queueResult.QueueName, false, consumer);
return queueResult.QueueName;
}
为了让这个系统更加直观,我们设计了一个完整的WinForms应用。界面分为四个主要部分:
连接配置页:管理RabbitMQ连接参数
数据发送页:模拟设备数据生成和发送
数据接收页:配置消费者和查看接收到的数据
系统日志页:实时监控系统运行状态
c#
using Timer = System.Threading.Timer;
namespace AppDataCollectionMQ
{
public partial class FrmMain : Form
{
private RabbitMQManager _rabbitMQManager;
private Timer _simulationTimer;
private Random _random = new Random();
private int _sentMessagesCount = 0;
private int _receivedMessagesCount = 0;
private bool _isConnected = false;
private bool _isConsumerRunning = false;
private List<string> _activeConsumers = new List<string>();
// 工业数据结构定义
private readonly Dictionary<string, string[]> _industrialHierarchy = new Dictionary<string, string[]>
{
{ "factories", new[] { "factory_north", "factory_south", "factory_east" } },
{ "factory_north", new[] { "workshop_assembly", "workshop_painting", "workshop_testing" } },
{ "factory_south", new[] { "workshop_welding", "workshop_machining", "workshop_packaging" } },
{ "factory_east", new[] { "workshop_quality", "workshop_logistics", "workshop_maintenance" } },
{ "workshop_assembly", new[] { "line_a1", "line_a2", "line_a3" } },
{ "workshop_painting", new[] { "line_p1", "line_p2" } },
{ "workshop_testing", new[] { "line_t1", "line_t2", "line_t3" } },
{ "workshop_welding", new[] { "line_w1", "line_w2" } },
{ "workshop_machining", new[] { "line_m1", "line_m2", "line_m3" } },
{ "workshop_packaging", new[] { "line_pk1", "line_pk2" } },
{ "workshop_quality", new[] { "line_q1", "line_q2" } },
{ "workshop_logistics", new[] { "line_l1", "line_l2" } },
{ "workshop_maintenance", new[] { "line_mt1", "line_mt2" } }
};
private readonly Dictionary<string, string[]> _deviceTypes = new Dictionary<string, string[]>
{
{ "line_a", new[] { "robot_01", "robot_02", "conveyor_01", "sensor_temp_01" } },
{ "line_p", new[] { "spray_gun_01", "oven_01", "sensor_humidity_01" } },
{ "line_t", new[] { "tester_01", "scanner_01", "sensor_pressure_01" } },
{ "line_w", new[] { "welder_01", "welder_02", "gas_monitor_01" } },
{ "line_m", new[] { "cnc_01", "lathe_01", "mill_01", "sensor_vibration_01" } },
{ "line_pk", new[] { "packer_01", "labeler_01", "scale_01" } },
{ "line_q", new[] { "inspector_01", "camera_01", "gauge_01" } },
{ "line_l", new[] { "agv_01", "crane_01", "sorter_01" } },
{ "line_mt", new[] { "compressor_01", "pump_01", "valve_01" } }
};
private readonly Dictionary<string, string[]> _dataTypes = new Dictionary<string, string[]>
{
{ "robot", new[] { "position", "speed", "load", "status", "error" } },
{ "conveyor", new[] { "speed", "load", "temperature", "vibration" } },
{ "sensor", new[] { "value", "status", "calibration", "alarm" } },
{ "spray_gun", new[] { "pressure", "flow_rate", "temperature", "status" } },
{ "oven", new[] { "temperature", "humidity", "power", "status" } },
{ "welder", new[] { "current", "voltage", "temperature", "arc_time" } },
{ "cnc", new[] { "spindle_speed", "feed_rate", "tool_wear", "vibration" } },
{ "packer", new[] { "pack_count", "speed", "material_level", "status" } },
{ "inspector", new[] { "pass_count", "fail_count", "accuracy", "status" } },
{ "agv", new[] { "battery_level", "position", "speed", "load" } }
};
public FrmMain()
{
InitializeComponent();
}
private void FrmMain_Load(object sender, EventArgs e)
{
InitializeControls();
InitializePredefinedPatterns();
InitializeDataGridView();
}
private void InitializeControls()
{
// 初始化工厂下拉框
cmbFactory.Items.AddRange(_industrialHierarchy["factories"]);
cmbFactory.SelectedIndex = 0;
// 初始化状态下拉框
cmbStatus.SelectedIndex = 0;
// 初始化数据类型
cmbDataType.Items.AddRange(new[] { "temperature", "pressure", "flow_rate", "speed", "position", "status", "vibration", "power" });
cmbDataType.SelectedIndex = 0;
UpdateRoutingKey();
}
private void InitializePredefinedPatterns()
{
var patterns = new[]
{
"*.*.*.*.* # 所有消息",
"factory_north.*.*.*.* # 北厂所有数据",
"factory_south.*.*.*.* # 南厂所有数据",
"*.workshop_assembly.*.*.* # 所有装配车间数据",
"*.workshop_welding.*.*.* # 所有焊接车间数据",
"*.*.*.*temperature # 所有温度数据",
"*.*.*.*pressure # 所有压力数据",
"*.*.*.*status # 所有状态数据",
"factory_north.workshop_*.line_*.robot_*.* # 北厂所有机器人",
"*.*.line_a*.cnc_*.* # 所有A线数控设备",
"factory_*.workshop_quality.*.*.* # 所有质检车间",
"*.*.*.sensor_*.* # 所有传感器数据"
};
lstPredefinedPatterns.Items.AddRange(patterns);
}
private void InitializeDataGridView()
{
dgvReceivedData.Columns.Add("Timestamp", "时间戳");
dgvReceivedData.Columns.Add("RoutingKey", "路由键");
dgvReceivedData.Columns.Add("Factory", "工厂");
dgvReceivedData.Columns.Add("Workshop", "车间");
dgvReceivedData.Columns.Add("Line", "生产线");
dgvReceivedData.Columns.Add("Device", "设备");
dgvReceivedData.Columns.Add("DataType", "数据类型");
dgvReceivedData.Columns.Add("Value", "数值");
dgvReceivedData.Columns.Add("Unit", "单位");
dgvReceivedData.Columns.Add("Status", "状态");
dgvReceivedData.Columns["Timestamp"].Width = 120;
dgvReceivedData.Columns["RoutingKey"].Width = 200;
}
private async void btnConnect_Click(object sender, EventArgs e)
{
try
{
btnConnect.Enabled = false;
_rabbitMQManager = new RabbitMQManager(
txtHost.Text,
(int)nudPort.Value,
txtUsername.Text,
txtPassword.Text);
_rabbitMQManager.LogMessage += OnLogMessage;
_rabbitMQManager.MessageReceived += OnMessageReceived;
await _rabbitMQManager.InitializeAsync();
_isConnected = true;
lblConnectionStatus.Text = "已连接";
lblConnectionStatus.ForeColor = Color.Green;
lblStatusConnection.Text = "连接状态: 已连接";
btnConnect.Enabled = false;
btnDisconnect.Enabled = true;
// 启用其他控件
EnablePublisherControls(true);
EnableConsumerControls(true);
LogMessage("RabbitMQ连接成功建立", Color.Green);
}
catch (Exception ex)
{
LogMessage($"连接失败: {ex.Message}", Color.Red);
btnConnect.Enabled = true;
lblConnectionStatus.Text = "连接失败";
lblConnectionStatus.ForeColor = Color.Red;
}
}
private void btnDisconnect_Click(object sender, EventArgs e)
{
DisconnectRabbitMQ();
}
private void DisconnectRabbitMQ()
{
try
{
if (_simulationTimer != null)
{
_simulationTimer.Change(Timeout.Infinite, Timeout.Infinite);
_simulationTimer.Dispose();
_simulationTimer = null;
btnStartSimulation.Enabled = true;
btnStopSimulation.Enabled = false;
}
_rabbitMQManager?.Dispose();
_rabbitMQManager = null;
_isConnected = false;
_isConsumerRunning = false;
_activeConsumers.Clear();
lblConnectionStatus.Text = "未连接";
lblConnectionStatus.ForeColor = Color.Red;
lblStatusConnection.Text = "连接状态: 未连接";
btnConnect.Enabled = true;
btnDisconnect.Enabled = false;
EnablePublisherControls(false);
EnableConsumerControls(false);
LogMessage("RabbitMQ连接已断开", Color.Orange);
}
catch (Exception ex)
{
LogMessage($"断开连接时发生错误: {ex.Message}", Color.Red);
}
}
private void EnablePublisherControls(bool enabled)
{
btnSendData.Enabled = enabled;
btnStartSimulation.Enabled = enabled;
}
private void EnableConsumerControls(bool enabled)
{
btnStartConsumer.Enabled = enabled && !_isConsumerRunning;
btnStopConsumer.Enabled = enabled && _isConsumerRunning;
}
private void cmb_SelectedIndexChanged(object sender, EventArgs e)
{
var comboBox = sender as ComboBox;
if (comboBox == cmbFactory)
{
// 更新车间列表
var selectedFactory = cmbFactory.SelectedItem?.ToString();
if (!string.IsNullOrEmpty(selectedFactory) && _industrialHierarchy.ContainsKey(selectedFactory))
{
cmbWorkshop.Items.Clear();
cmbWorkshop.Items.AddRange(_industrialHierarchy[selectedFactory]);
cmbWorkshop.SelectedIndex = 0;
}
}
else if (comboBox == cmbWorkshop)
{
// 更新生产线列表
var selectedWorkshop = cmbWorkshop.SelectedItem?.ToString();
if (!string.IsNullOrEmpty(selectedWorkshop) && _industrialHierarchy.ContainsKey(selectedWorkshop))
{
cmbLine.Items.Clear();
cmbLine.Items.AddRange(_industrialHierarchy[selectedWorkshop]);
cmbLine.SelectedIndex = 0;
}
}
else if (comboBox == cmbLine)
{
// 更新设备列表
var selectedLine = cmbLine.SelectedItem?.ToString();
if (!string.IsNullOrEmpty(selectedLine))
{
var linePrefix = selectedLine.Substring(0, selectedLine.LastIndexOf('_') + 1);
var deviceKey = _deviceTypes.Keys.FirstOrDefault(k => linePrefix.StartsWith(k));
cmbDevice.Items.Clear();
if (!string.IsNullOrEmpty(deviceKey))
{
cmbDevice.Items.AddRange(_deviceTypes[deviceKey]);
cmbDevice.SelectedIndex = 0;
}
}
}
else if (comboBox == cmbDevice)
{
// 更新数据类型(基于设备类型)
var selectedDevice = cmbDevice.SelectedItem?.ToString();
if (!string.IsNullOrEmpty(selectedDevice))
{
var devicePrefix = selectedDevice.Split('_')[0];
if (_dataTypes.ContainsKey(devicePrefix))
{
cmbDataType.Items.Clear();
cmbDataType.Items.AddRange(_dataTypes[devicePrefix]);
cmbDataType.SelectedIndex = 0;
}
}
}
UpdateRoutingKey();
UpdateUnitAndValue();
}
private void UpdateRoutingKey()
{
if (cmbFactory.SelectedItem != null && cmbWorkshop.SelectedItem != null &&
cmbLine.SelectedItem != null && cmbDevice.SelectedItem != null &&
cmbDataType.SelectedItem != null)
{
var routingKey = $"{cmbFactory.SelectedItem}.{cmbWorkshop.SelectedItem}.{cmbLine.SelectedItem}.{cmbDevice.SelectedItem}.{cmbDataType.SelectedItem}";
txtRoutingKey.Text = routingKey;
}
}
private void UpdateUnitAndValue()
{
var dataType = cmbDataType.SelectedItem?.ToString();
if (!string.IsNullOrEmpty(dataType))
{
switch (dataType.ToLower())
{
case "temperature":
txtUnit.Text = "°C";
txtValue.Text = _random.Next(20, 80).ToString();
break;
case "pressure":
txtUnit.Text = "bar";
txtValue.Text = (_random.NextDouble() * 10).ToString("F2");
break;
case "speed":
txtUnit.Text = "rpm";
txtValue.Text = _random.Next(100, 3000).ToString();
break;
case "position":
txtUnit.Text = "mm";
txtValue.Text = (_random.NextDouble() * 1000).ToString("F1");
break;
case "power":
txtUnit.Text = "kW";
txtValue.Text = (_random.NextDouble() * 50).ToString("F1");
break;
case "vibration":
txtUnit.Text = "mm/s";
txtValue.Text = (_random.NextDouble() * 5).ToString("F2");
break;
case "flow_rate":
txtUnit.Text = "L/min";
txtValue.Text = (_random.NextDouble() * 100).ToString("F1");
break;
default:
txtUnit.Text = "";
txtValue.Text = _random.Next(0, 100).ToString();
break;
}
}
}
private async void btnSendData_Click(object sender, EventArgs e)
{
if (!_isConnected || _rabbitMQManager == null)
{
MessageBox.Show("请先连接到RabbitMQ服务器", "提示", MessageBoxButtons.OK, MessageBoxIcon.Warning);
return;
}
try
{
var deviceData = CreateDeviceData();
await _rabbitMQManager.PublishDataAsync(deviceData);
_sentMessagesCount++;
UpdateMessageCount();
LogMessage($"数据发送成功: {deviceData.GetRoutingKey()}", Color.Blue);
}
catch (Exception ex)
{
LogMessage($"发送数据失败: {ex.Message}", Color.Red);
}
}
private DeviceData CreateDeviceData()
{
return new DeviceData
{
Factory = cmbFactory.SelectedItem?.ToString() ?? "",
Workshop = cmbWorkshop.SelectedItem?.ToString() ?? "",
Line = cmbLine.SelectedItem?.ToString() ?? "",
Device = cmbDevice.SelectedItem?.ToString() ?? "",
DataType = cmbDataType.SelectedItem?.ToString() ?? "",
Value = double.TryParse(txtValue.Text, out var value) ? value : 0,
Unit = txtUnit.Text,
Status = cmbStatus.SelectedItem?.ToString() ?? "",
Timestamp = DateTime.Now
};
}
private async void btnStartSimulation_Click(object sender, EventArgs e)
{
if (!_isConnected || _rabbitMQManager == null)
{
MessageBox.Show("请先连接到RabbitMQ服务器", "提示", MessageBoxButtons.OK, MessageBoxIcon.Warning);
return;
}
btnStartSimulation.Enabled = false;
btnStopSimulation.Enabled = true;
_simulationTimer = new Timer(async _ => await SimulateDeviceData(), null, TimeSpan.Zero, TimeSpan.FromSeconds(2));
LogMessage("数据模拟已启动,每2秒发送一次随机设备数据", Color.Green);
}
private void btnStopSimulation_Click(object sender, EventArgs e)
{
if (_simulationTimer != null)
{
_simulationTimer.Change(Timeout.Infinite, Timeout.Infinite);
_simulationTimer.Dispose();
_simulationTimer = null;
}
btnStartSimulation.Enabled = true;
btnStopSimulation.Enabled = false;
LogMessage("数据模拟已停止", Color.Orange);
}
private async Task SimulateDeviceData()
{
try
{
// 随机选择工厂结构
var factories = _industrialHierarchy["factories"];
var factory = factories[_random.Next(factories.Length)];
var workshops = _industrialHierarchy[factory];
var workshop = workshops[_random.Next(workshops.Length)];
var lines = _industrialHierarchy[workshop];
var line = lines[_random.Next(lines.Length)];
// 从 "line_a1" 提取 "line_a"
var linePrefixMatch = System.Text.RegularExpressions.Regex.Match(line, @"^(line_[a-z]+)");
string deviceKey = null;
if (linePrefixMatch.Success)
{
var lineTypePrefix = linePrefixMatch.Groups[1].Value; // "line_a"
deviceKey = _deviceTypes.Keys.FirstOrDefault(k => k == lineTypePrefix);
}
if (!string.IsNullOrEmpty(deviceKey))
{
var devices = _deviceTypes[deviceKey];
var device = devices[_random.Next(devices.Length)];
var devicePrefix = device.Split('_')[0];
if (_dataTypes.ContainsKey(devicePrefix))
{
var dataTypes = _dataTypes[devicePrefix];
var dataType = dataTypes[_random.Next(dataTypes.Length)];
var deviceData = new DeviceData
{
Factory = factory,
Workshop = workshop,
Line = line,
Device = device,
DataType = dataType,
Value = GenerateRandomValue(dataType),
Unit = GetUnitForDataType(dataType),
Status = GetRandomStatus(),
Timestamp = DateTime.Now
};
await _rabbitMQManager.PublishDataAsync(deviceData);
Invoke(new Action(() =>
{
_sentMessagesCount++;
UpdateMessageCount();
}));
}
}
else
{
// 添加日志来调试
Invoke(new Action(() => LogMessage($"无法找到生产线 {line} 对应的设备类型", Color.Yellow)));
}
}
catch (Exception ex)
{
Invoke(new Action(() => LogMessage($"模拟数据发送失败: {ex.Message}", Color.Red)));
}
}
private double GenerateRandomValue(string dataType)
{
switch (dataType.ToLower())
{
case "temperature": return _random.Next(15, 85) + _random.NextDouble();
case "pressure": return _random.NextDouble() * 15;
case "speed": return _random.Next(50, 4000);
case "position": return _random.NextDouble() * 2000;
case "power": return _random.NextDouble() * 100;
case "vibration": return _random.NextDouble() * 8;
case "flow_rate": return _random.NextDouble() * 150;
case "current": return _random.NextDouble() * 200;
case "voltage": return 200 + _random.NextDouble() * 50;
default: return _random.NextDouble() * 100;
}
}
private string GetUnitForDataType(string dataType)
{
switch (dataType.ToLower())
{
case "temperature": return "°C";
case "pressure": return "bar";
case "speed": return "rpm";
case "position": return "mm";
case "power": return "kW";
case "vibration": return "mm/s";
case "flow_rate": return "L/min";
case "current": return "A";
case "voltage": return "V";
default: return "";
}
}
private string GetRandomStatus()
{
var statuses = new[] { "正常", "正常", "正常", "警告", "故障" }; // 权重倾向正常
return statuses[_random.Next(statuses.Length)];
}
private async void btnStartConsumer_Click(object sender, EventArgs e)
{
if (!_isConnected || _rabbitMQManager == null)
{
MessageBox.Show("请先连接到RabbitMQ服务器", "提示", MessageBoxButtons.OK, MessageBoxIcon.Warning);
return;
}
var bindingPattern = txtBindingPattern.Text.Trim();
if (string.IsNullOrEmpty(bindingPattern))
{
MessageBox.Show("请输入绑定模式", "提示", MessageBoxButtons.OK, MessageBoxIcon.Warning);
return;
}
try
{
var queueName = await _rabbitMQManager.StartConsumerAsync(bindingPattern);
_activeConsumers.Add(queueName);
_isConsumerRunning = true;
btnStartConsumer.Enabled = false;
btnStopConsumer.Enabled = true;
LogMessage($"消费者启动成功,绑定模式: {bindingPattern}", Color.Green);
}
catch (Exception ex)
{
LogMessage($"启动消费者失败: {ex.Message}", Color.Red);
}
}
private void btnStopConsumer_Click(object sender, EventArgs e)
{
// 注意:在实际应用中,你可能需要更复杂的消费者管理机制
_isConsumerRunning = false;
_activeConsumers.Clear();
btnStartConsumer.Enabled = true;
btnStopConsumer.Enabled = false;
LogMessage("消费者已停止", Color.Orange);
}
private void lstPredefinedPatterns_DoubleClick(object sender, EventArgs e)
{
if (lstPredefinedPatterns.SelectedItem != null)
{
var selectedPattern = lstPredefinedPatterns.SelectedItem.ToString();
var pattern = selectedPattern.Split('#')[0].Trim();
txtBindingPattern.Text = pattern;
}
}
private void btnClearLogs_Click(object sender, EventArgs e)
{
rtbLogs.Clear();
}
private void OnLogMessage(string message)
{
Invoke(new Action(() => LogMessage(message, Color.White)));
}
private void OnMessageReceived(string routingKey, DeviceData deviceData)
{
Invoke(new Action(() =>
{
_receivedMessagesCount++;
UpdateMessageCount();
// 添加到DataGridView
var row = new object[]
{
deviceData.Timestamp.ToString("HH:mm:ss.fff"),
routingKey,
deviceData.Factory,
deviceData.Workshop,
deviceData.Line,
deviceData.Device,
deviceData.DataType,
deviceData.Value.ToString("F2"),
deviceData.Unit,
deviceData.Status
};
dgvReceivedData.Rows.Insert(0, row);
// 保持最多1000行
if (dgvReceivedData.Rows.Count > 1000)
{
dgvReceivedData.Rows.RemoveAt(dgvReceivedData.Rows.Count - 1);
}
LogMessage($"接收消息: {routingKey} -> {deviceData.DataType}={deviceData.Value}{deviceData.Unit}", Color.Cyan);
}));
}
private void LogMessage(string message, Color color)
{
if (rtbLogs.InvokeRequired)
{
rtbLogs.Invoke(new Action(() => LogMessage(message, color)));
return;
}
var timestamp = DateTime.Now.ToString("yyyy-MM-dd HH:mm:ss.fff");
var logLine = $"[{timestamp}] {message}\n";
rtbLogs.SelectionStart = rtbLogs.TextLength;
rtbLogs.SelectionLength = 0;
rtbLogs.SelectionColor = color;
rtbLogs.AppendText(logLine);
rtbLogs.ScrollToCaret();
// 保持最多10000行
if (rtbLogs.Lines.Length > 10000)
{
var lines = rtbLogs.Lines.Skip(1000).ToArray();
rtbLogs.Lines = lines;
}
}
private void UpdateMessageCount()
{
lblStatusMessages.Text = $"消息计数: 发送{_sentMessagesCount} 接收{_receivedMessagesCount}";
}
private void FrmMain_FormClosing(object sender, FormClosingEventArgs e)
{
DisconnectRabbitMQ();
}
}
}



text// ❌ 错误做法:每次操作都创建新连接 var factory = new ConnectionFactory(); using var connection = await factory.CreateConnectionAsync(); // ✅ 正确做法:复用连接,多个Channel private IConnection _sharedConnection; private readonly ConcurrentQueue<IChannel> _channelPool;
text// 根据实际情况调整prefetch值 await _channel.BasicQosAsync( prefetchSize: 0, // 不限制消息大小 prefetchCount: 10, // 限制未确认消息数量 global: false); // 仅应用于当前channel
经验法则: prefetchCount = 总往返时间 / 单个消息处理时间
text// 发布者确认模式 await _channel.ConfirmSelectAsync(); await _channel.BasicPublishAsync(/* 参数 */); await _channel.WaitForConfirmsAsync(TimeSpan.FromSeconds(5)); // 消费者手动确认 await _channel.BasicAckAsync(deliveryTag, multiple: false);
问题: 使用固定分隔符,缺乏层次结构
解决: 采用点分层次结构,便于通配符匹配
问题: 消费者处理速度跟不上生产速度
解决: 合理设置QoS,监控队列长度,必要时启用懒队列
问题: 频繁创建销毁连接,影响性能
解决: 使用连接池,合理复用Channel
这套Topic模式不仅适用于工业场景,还可以应用于:
电商系统:region.category.brand.product.event
east.mobile.*.*.promotion *.*.apple.*.* 金融交易:market.instrument.currency.action.level
us.*.*.*.* *.*.*.risk.high 游戏系统:server.zone.player.action.type
server01.*.*.pvp.* *.*.*.*payment 随着云原生技术的发展,RabbitMQ也在不断进化。Quorum队列、Streams等新特性为高可用性和大吞吐量场景提供了更好的支持。
但无论技术如何发展,Topic模式的核心优势——灵活的路由控制——始终是构建复杂分布式系统的重要工具。
记住这几个要诀:
最后,抛给大家几个思考题:
如果你在使用RabbitMQ时遇到过什么有趣的问题,或者有什么独特的应用场景,欢迎在评论区分享!
技术要点回顾:
收藏级代码模板: 本文提供的RabbitMQManager类可以直接应用到你的项目中,记得根据实际需求调整参数配置!
觉得这篇文章对你有帮助吗?请转发给更多需要的同行,让更多C#开发者掌握这个强大的消息路由技巧!
---
关注我的公众号,获取更多实用的C#开发技巧和最佳实践!
🚀 C#工程师必看:RabbitMQ主题模式实战指南
相关信息
通过网盘分享的文件:AppDataCollectionMQ.zip 链接: https://pan.baidu.com/s/1gLvxjCeLi2gIrHuduLFT3Q?pwd=tbnv 提取码: tbnv --来自百度网盘超级会员v9的分享
本文作者:技术老小子
本文链接:
版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!