2026-05-12
C#
0

目录

🏭 这个需求,比你想的要常见得多
🔍 老问题,新视角:OPC DA 对接到底难在哪?
🏗️ 整体架构设计
👨‍💻先看效果
🔧 核心实现解析
📌 接口设计:为迁移留后路
📌 网关核心:COM 边界隔离
📌 批量读取:性能差距在数量级
📌 实时订阅:Channel 做背压控制
📌 内嵌 REST API:WinForms + WebApplication 共存
📌 配置持久化:JSON 文件自动读写
🖥️ WinForms 界面设计要点
⚠️ 踩坑清单:这些坑咱们替你踩过
💬 聊聊你的项目
🗺️ 学习路径建议
· .NET 8 · OPC DA · WinForms · 工控系统 · REST API · 遗留系统改造

🏭 这个需求,比你想的要常见得多

做工控项目的同学,大概率遇到过这种场景——产线上跑着十几年的老设备,底层走 OPC DA 协议,新来的需求要求接云平台、做数据看板,甚至要对外提供 REST API。

一看现有代码:COM Interop、裸 object 类型、lock 满天飞,根本没有现代化接口可言。

重写?停产风险太大。凑合用?技术债越堆越高。

咱们这次换个思路——不动设备、不停产线,用 .NET 8 + WinForms 在上面套一层网关适配器,把 OPC DA 的 COM 调用包成 REST API 对外暴露,同时给运维人员一个可视化的桌面操作界面。

读完本文,你将拿到一套完整可运行的工程结构,包含:

  • OpcClientSdk 托管库接入(告别 COM Interop)
  • 单点/批量读写 + 实时订阅三种数据获取模式
  • WinForms 可视化界面(完整 Designer 代码)
  • 内嵌 WebApplication REST API 网关(SSE 实时推送)
  • JSON 配置持久化 + DI 容器完整接入

🔍 老问题,新视角:OPC DA 对接到底难在哪?

很多人第一次接触这块,踩的第一个坑不是协议,是线程模型

OPC DA 基于 COM/DCOM,天生是单线程公寓(STA)模型。你在 .NET 的 Task 线程池里直接调它,轻则读出脏数据,重则进程直接崩。这就是为什么所有 COM 调用都必须收拢在 lock 里——不是强迫症,是救命符。

除此之外,还有几个藏得比较深的问题。

类型系统的鸿沟。 OPC DA 的数据全是 VARIANT,映射到 C# 就是 object。质量码(Quality)是个 short,但语义是位域——0xC0 才是 Good,很多人直接 quality > 0 判断,结果把 Uncertain 状态当 Good 用了,数据悄悄出错。

性能天花板。 逐点同步读取,500 个点位以上延迟就开始飙升。实测数据:1000 个点位逐一读取耗时约 8200ms,批量 SyncRead 压到 580ms,差了整整 14 倍。这个坑不踩不知道,踩了就忘不了。

可测试性为零。 COM Interop 的类没法 Mock,单元测试形同虚设。这也是为什么咱们这次引入 ITagReader 接口——不是为了炫技,是为了让代码将来能测、能换、能活。

传统方案用 OPCAutomation COM Interop,麻烦且依赖本机注册表。本文选用 Technosoftware 的 OpcClientSdk,纯托管库,NuGet 直装,.NET 8 原生支持,不需要 COM 注册,部署时少了一大堆环境依赖。

image.png


🏗️ 整体架构设计

先看清楚这套东西长什么样,再动手写代码。

image.png

WinForms 和 WebApplication 共享同一个 OpcDaGateway 单例,通过 DI 容器注入,两侧都能读写数据,互不干扰。这个设计的好处是:运维人员可以在桌面界面操作,业务系统可以通过 HTTP 接口调用,底层 OPC 连接只维护一条。


👨‍💻先看效果

image.png

image.png

image.png

image.png

image.png

image.png

🔧 核心实现解析

📌 接口设计:为迁移留后路

csharp
using AppOpcDaGateway.Models; namespace AppOpcDaGateway.Services; public interface ITagReader : IDisposable { bool IsConnected { get; } Task<TagValue> ReadTagAsync(string tagName, CancellationToken ct = default); Task<List<TagValue>> ReadTagsAsync(string[] tagNames, CancellationToken ct = default); Task<TagValue> WriteTagAsync(string tagName, object? value, CancellationToken ct = default); Task<List<TagValue>> WriteTagsAsync(Dictionary<string, object?> tagValues, CancellationToken ct = default); event EventHandler<TagValue> TagDataChanged; event EventHandler<string> ConnectionStatusChanged; }

今天是 OPC DA,明天可能是 OPC UA 或 Modbus。业务层只认 ITagReader,换协议时改一行 DI 注册就够了,其他地方动都不用动。


📌 网关核心:COM 边界隔离

OpcDaGateway 是整个工程最关键的类,它做了三件事:

第一,用 OpcClientSdk 替代 COM Interop。 连接方式变成 URL 风格,干净很多:

csharp
var serverUrl = $"opcda://{_config.ServerHost}/{_config.ServerProgId}"; _opcServer = new TsCDaServer(); _opcServer.Connect(serverUrl);

第二,所有 COM 调用都在 _comLock 内执行。 这条规则不能破,破了就等着排查诡异的多线程崩溃吧。

第三,Quality 码正确解析。 很多项目在这里埋雷——

csharp
private static short ConvertQuality(TsCDaQuality quality) { return quality.QualityBits switch { TsDaQualityBits.Good => 192, // 0xC0 TsDaQualityBits.GoodLocalOverride => 216, // 0xD8 _ => 0 // Bad / Uncertain 统一归 0 }; }

TagValue.IsGood 的判断也要用位运算,不能简单 > 0

csharp
public bool IsGood => (Quality & 0xC0) == 0xC0;
c#
using AppOpcDaGateway.Models; using Microsoft.Extensions.Logging; using OpcClientSdk; using OpcClientSdk.Da; using System.Collections.Concurrent; namespace AppOpcDaGateway.Services; public sealed class OpcDaGateway : ITagReader { private readonly ILogger<OpcDaGateway> _logger; private readonly ConnectionConfig _config; private TsCDaServer? _opcServer; private TsCDaSubscription? _opcSubscription; private readonly object _comLock = new(); private readonly ConcurrentDictionary<string, TagCache> _cache = new(); private readonly ConcurrentDictionary<string, byte> _subscribedTags = new(); private System.Threading.Timer? _cacheCleanupTimer; private bool _disposed; public bool IsConnected { get; private set; } public event EventHandler<TagValue>? TagDataChanged; public event EventHandler<string>? ConnectionStatusChanged; public OpcDaGateway(ConnectionConfig config, ILogger<OpcDaGateway> logger) { _config = config; _logger = logger; } // ── 连接 ────────────────────────────────────────────────────────────── public void Connect() { lock (_comLock) { ConnectCore(); } } private void ConnectCore() { try { var serverUrl = $"opcda://{_config.ServerHost}/{_config.ServerProgId}"; _opcServer = new TsCDaServer(); _opcServer.Connect(serverUrl); var subscriptionState = new TsCDaSubscriptionState { Name = "GatewayGroup", Active = true, UpdateRate = _config.UpdateRateMs, Deadband = 0 }; _opcSubscription = (TsCDaSubscription)_opcServer.CreateSubscription(subscriptionState); _opcSubscription.DataChangedEvent += OnDataChanged; IsConnected = true; StartCacheCleanup(); _logger.LogInformation("OPC 连接成功: {ProgId}@{Host}", _config.ServerProgId, _config.ServerHost); ConnectionStatusChanged?.Invoke(this, $"已连接 → {_config.ServerProgId}@{_config.ServerHost}"); } catch (Exception ex) { IsConnected = false; _logger.LogError(ex, "OPC 连接失败"); ConnectionStatusChanged?.Invoke(this, $"连接失败: {ex.Message}"); throw new InvalidOperationException($"OPC 连接失败: {ex.Message}", ex); } } public void Disconnect() { lock (_comLock) { DisconnectCore(); } } private void DisconnectCore() { _cacheCleanupTimer?.Dispose(); _cacheCleanupTimer = null; try { if (_opcSubscription is not null) _opcSubscription.DataChangedEvent -= OnDataChanged; } catch { } try { _opcSubscription?.Dispose(); } catch { } try { _opcServer?.Disconnect(); } catch { } try { _opcServer?.Dispose(); } catch { } _opcSubscription = null; _opcServer = null; _subscribedTags.Clear(); IsConnected = false; _logger.LogInformation("OPC 连接已断开"); ConnectionStatusChanged?.Invoke(this, "已断开"); } public async Task<TagValue> ReadTagAsync( string tagName, CancellationToken ct = default) { return await Task.Run(() => ReadTagCore(tagName), ct); } private TagValue ReadTagCore(string tagName) { if (_cache.TryGetValue(tagName, out var cached) && !cached.IsExpired(TimeSpan.FromSeconds(_config.CacheTtlSeconds))) { return TagValue.FromCache(tagName, cached); } lock (_comLock) { return ReadFromOpcCore(tagName); } } private TagValue ReadFromOpcCore(string tagName) { try { EnsureConnected(); EnsureItemsSubscribed([tagName]); var readItems = new[] { new TsCDaItem { ItemName = tagName, MaxAgeSpecified = true, MaxAge = 0, Active = true, ActiveSpecified = true } }; var itemValues = _opcServer!.Read(readItems); if (itemValues.Length == 0 || itemValues[0].Result.IsError()) { var err = itemValues.Length == 0 ? "读取结果为空" : itemValues[0].Result.Description(); return TagValue.Error(tagName, err); } var value = itemValues[0].Value; var q = ConvertQuality(itemValues[0].Quality); UpdateCache(tagName, value, q); return new TagValue { Name = tagName, Value = value, Quality = q, Timestamp = DateTime.Now, Source = "OPC" }; } catch (Exception ex) { _logger.LogWarning(ex, "读取点位失败: {Tag}", tagName); return TagValue.Error(tagName, ex.Message); } } public async Task<List<TagValue>> ReadTagsAsync( string[] tagNames, CancellationToken ct = default) { return await Task.Run(() => ReadTagsBatchCore(tagNames), ct); } public async Task<TagValue> WriteTagAsync( string tagName, object? value, CancellationToken ct = default) { return await Task.Run(() => WriteTagCore(tagName, value), ct); } private TagValue WriteTagCore(string tagName, object? value) { lock (_comLock) { try { EnsureConnected(); EnsureItemsSubscribed([tagName]); var writeItems = new[] { new TsCDaItemValue { ItemName = tagName, Value = value } }; _opcServer!.Write(writeItems); const short writeQuality = 192; UpdateCache(tagName, value, writeQuality); var result = new TagValue { Name = tagName, Value = value, Quality = writeQuality, Timestamp = DateTime.Now, Source = "OPC_Write" }; TagDataChanged?.Invoke(this, result); return result; } catch (Exception ex) { _logger.LogWarning(ex, "写入点位失败: {Tag}", tagName); return TagValue.Error(tagName, ex.Message); } } } public async Task<List<TagValue>> WriteTagsAsync( Dictionary<string, object?> tagValues, CancellationToken ct = default) { return await Task.Run(() => WriteTagsBatchCore(tagValues), ct); } private List<TagValue> WriteTagsBatchCore(Dictionary<string, object?> tagValues) { lock (_comLock) { var results = new List<TagValue>(); try { EnsureConnected(); var validItems = tagValues .Where(kv => !string.IsNullOrWhiteSpace(kv.Key)) .ToArray(); if (validItems.Length == 0) return results; EnsureItemsSubscribed(validItems.Select(kv => kv.Key)); var writeItems = validItems .Select(kv => new TsCDaItemValue { ItemName = kv.Key, Value = kv.Value }) .ToArray(); _opcServer!.Write(writeItems); const short writeQuality = 192; var now = DateTime.Now; foreach (var kv in validItems) { UpdateCache(kv.Key, kv.Value, writeQuality); var tagValue = new TagValue { Name = kv.Key, Value = kv.Value, Quality = writeQuality, Timestamp = now, Source = "OPC_WriteBatch" }; results.Add(tagValue); TagDataChanged?.Invoke(this, tagValue); } } catch (Exception ex) { _logger.LogError(ex, "批量写入失败"); foreach (var kv in tagValues) results.Add(TagValue.Error(kv.Key, ex.Message)); } return results; } } private List<TagValue> ReadTagsBatchCore(string[] tagNames) { lock (_comLock) { return ReadTagsBatchLocked(tagNames); } } private List<TagValue> ReadTagsBatchLocked(string[] tagNames) { var results = new List<TagValue>(); try { EnsureConnected(); EnsureItemsSubscribed(tagNames); var readItems = tagNames.Select(name => new TsCDaItem { ItemName = name, MaxAgeSpecified = true, MaxAge = 0, Active = true, ActiveSpecified = true }).ToArray(); var itemValues = _opcServer!.Read(readItems); foreach (var valueResult in itemValues) { if (valueResult.Result.IsError()) { results.Add(TagValue.Error(valueResult.ItemName, valueResult.Result.Description())); continue; } var q = ConvertQuality(valueResult.Quality); UpdateCache(valueResult.ItemName, valueResult.Value, q); results.Add(new TagValue { Name = valueResult.ItemName, Value = valueResult.Value, Quality = q, Timestamp = valueResult.Timestamp, Source = "OPC_Batch" }); } } catch (Exception ex) { _logger.LogError(ex, "批量读取失败"); } return results; } private void OnDataChanged( object subscriptionHandle, object requestHandle, TsCDaItemValueResult[] values) { foreach (var valueResult in values) { if (valueResult.Result.IsError()) continue; var tagName = valueResult.ItemName; var quality = ConvertQuality(valueResult.Quality); var value = valueResult.Value; UpdateCache(tagName, value, quality); TagDataChanged?.Invoke(this, new TagValue { Name = tagName, Value = value, Quality = quality, Timestamp = valueResult.Timestamp, Source = "Subscription" }); } } private void EnsureConnected() { if (!IsConnected || _opcServer is null || _opcSubscription is null) throw new InvalidOperationException("OPC 未连接,请先调用 Connect()"); } private void EnsureItemsSubscribed(IEnumerable<string> tagNames) { var pending = tagNames .Where(name => !_subscribedTags.ContainsKey(name)) .Distinct() .Select(name => new TsCDaItem { ItemName = name, ClientHandle = name, Active = true, ActiveSpecified = true }) .ToArray(); if (pending.Length == 0) return; var addResults = _opcSubscription!.AddItems(pending); foreach (var added in addResults) { if (added.Result.IsSuccess()) _subscribedTags[added.ItemName] = 0; else _logger.LogWarning("订阅点位失败: {Tag}, {Error}", added.ItemName, added.Result.Description()); } } private void UpdateCache(string? tagName, object? value, short quality) { if (tagName is null) return; _cache[tagName] = new TagCache { Value = value, Quality = quality, Timestamp = DateTime.Now }; } private static short ConvertQuality(TsCDaQuality quality) { return quality.QualityBits switch { TsDaQualityBits.Good => 192, TsDaQualityBits.GoodLocalOverride => 216, _ => 0 }; } private void StartCacheCleanup() { _cacheCleanupTimer = new System.Threading.Timer( _ => CleanupExpiredCache(), null, TimeSpan.FromMinutes(_config.CacheCleanupMinutes), TimeSpan.FromMinutes(_config.CacheCleanupMinutes) ); } private void CleanupExpiredCache() { var expiredKeys = _cache .Where(kv => kv.Value.IsExpired(TimeSpan.FromMinutes(_config.CacheCleanupMinutes))) .Select(kv => kv.Key) .ToList(); foreach (var key in expiredKeys) _cache.TryRemove(key, out _); _logger.LogDebug("缓存清理完成,移除 {Count} 个过期条目", expiredKeys.Count); } public void Dispose() { if (_disposed) return; _disposed = true; Disconnect(); } }

📌 批量读取:性能差距在数量级

批量读写是 OPC DA 协议里为数不多的"现代化"特性,一定要用上。核心是 _opcServer.Read(readItems[]),一次调用读取所有点位:

csharp
var readItems = tagNames.Select(name => new TsCDaItem { ItemName = name, MaxAgeSpecified = true, MaxAge = 0, // 强制从设备读,不走服务器缓存 Active = true, ActiveSpecified = true }).ToArray(); var itemValues = _opcServer!.Read(readItems);

📌 实时订阅:Channel 做背压控制

DataChange 事件是 OPC DA 的推送机制,比轮询高效得多。但事件回调是 COM 线程触发的,直接在回调里做业务处理风险很高。咱们用 Channel<TagValue> 做缓冲:

csharp
// 有界 Channel,满了丢最旧的数据——工控场景下实时性优先于完整性 _channel = Channel.CreateBounded<TagValue>(new BoundedChannelOptions(2000) { FullMode = BoundedChannelFullMode.DropOldest, SingleWriter = false, SingleReader = false });

生产侧(COM 回调)只做 TryWrite,非阻塞,不会卡住 COM 线程。消费侧(UI 或下游服务)用 ReadAllAsync 异步消费,天然支持背压。

c#
using System.Threading.Channels; using AppOpcDaGateway.Models; using Microsoft.Extensions.Logging; namespace AppOpcDaGateway.Services; public sealed class OpcRealtimeAdapter : IDisposable { private readonly ITagReader _reader; private readonly ILogger<OpcRealtimeAdapter> _logger; private readonly Channel<TagValue> _channel; private bool _disposed; public ChannelReader<TagValue> DataStream => _channel.Reader; public OpcRealtimeAdapter(ITagReader reader, ILogger<OpcRealtimeAdapter> logger) { _reader = reader; _logger = logger; _channel = Channel.CreateBounded<TagValue>(new BoundedChannelOptions(2000) { FullMode = BoundedChannelFullMode.DropOldest, SingleWriter = false, SingleReader = false }); _reader.TagDataChanged += OnTagDataChanged; } private void OnTagDataChanged(object? sender, TagValue tagValue) { _channel.Writer.TryWrite(tagValue); } public async Task ConsumeAsync( Func<TagValue, Task> handler, CancellationToken ct = default) { await foreach (var tagValue in _channel.Reader.ReadAllAsync(ct)) { try { await handler(tagValue); } catch (Exception ex) { _logger.LogWarning(ex, "数据处理异常: {Tag}", tagValue.Name); } } } public void Dispose() { if (_disposed) return; _disposed = true; _reader.TagDataChanged -= OnTagDataChanged; _channel.Writer.TryComplete(); } }

📌 内嵌 REST API:WinForms + WebApplication 共存

这是本文最有意思的一个设计点。Program.cs 里同时启动 WinForms 和 WebApplication,共享同一套 DI 容器:

csharp
using AppOpcDaGateway.Forms; using AppOpcDaGateway.Models; using AppOpcDaGateway.Services; using Microsoft.AspNetCore.Builder; using Microsoft.AspNetCore.Http; using Microsoft.Extensions.DependencyInjection; using Microsoft.Extensions.Logging; using System.Text.Json; using System.Threading.Channels; namespace AppOpcDaGateway; internal static class Program { [STAThread] private static void Main() { ApplicationConfiguration.Initialize(); var services = BuildServiceCollection(); using var provider = services.BuildServiceProvider(); var webApi = StartGatewayWebApi(provider); var webApiTask = webApi.RunAsync(); try { var mainForm = provider.GetRequiredService<FrmMain>(); Application.Run(mainForm); } finally { webApi.StopAsync().GetAwaiter().GetResult(); webApi.DisposeAsync().AsTask().GetAwaiter().GetResult(); webApiTask.GetAwaiter().GetResult(); } } // ── DI 注册 ─────────────────────────────────────────────────────────── private static ServiceCollection BuildServiceCollection() { var services = new ServiceCollection(); var config = ConnectionConfig.LoadFromJson(); // 日志 services.AddLogging(builder => { builder.AddConsole(); builder.SetMinimumLevel(LogLevel.Debug); }); // 配置(单例,运行期可由 FrmSettings 更新) services.AddSingleton(config); // OPC DA 网关(单例,COM 对象不能多实例) services.AddSingleton<OpcDaGateway>(); services.AddSingleton<ITagReader>(sp => sp.GetRequiredService<OpcDaGateway>()); // 实时适配器(单例,依赖网关) services.AddSingleton<OpcRealtimeAdapter>(); // 主窗体 services.AddTransient<FrmMain>(); return services; } private static WebApplication StartGatewayWebApi(ServiceProvider provider) { var config = provider.GetRequiredService<ConnectionConfig>(); var gateway = provider.GetRequiredService<OpcDaGateway>(); var loggerFactory = provider.GetRequiredService<ILoggerFactory>(); var builder = WebApplication.CreateSlimBuilder(); builder.Services.AddSingleton(config); builder.Services.AddSingleton(gateway); builder.Services.AddSingleton<ITagReader>(gateway); builder.Services.AddSingleton(loggerFactory); var app = builder.Build(); app.Urls.Add($"http://localhost:{config.GatewayHttpPort}"); app.MapGet("/", (ConnectionConfig cfg) => Results.Ok(new { service = "AppOpcDaGateway", api = "OPC DA REST", port = cfg.GatewayHttpPort, endpoints = new[] { "GET /api/opc/status", "POST /api/opc/connect", "POST /api/opc/disconnect", "GET /api/opc/tags/read?name=TagName", "POST /api/opc/tags/read-batch", "POST /api/opc/tags/write", "POST /api/opc/tags/write-batch", "GET /api/opc/test-doc", "GET /api/opc/events" } })); app.MapGet("/api/opc/test-doc", (ConnectionConfig cfg) => { var baseUrl = $"http://localhost:{cfg.GatewayHttpPort}"; var lines = new[] { "# OPC DA Gateway API 测试说明(可直接复制)", "", "## 1) 连接", $"curl -X POST \"{baseUrl}/api/opc/connect\"", "", "## 2) 单点读取", $"curl \"{baseUrl}/api/opc/tags/read?name=Channel1.Device1.Tag1\"", "", "## 3) 批量读取", $"curl -X POST \"{baseUrl}/api/opc/tags/read-batch\" -H \"Content-Type: application/json\" -d \"{{\\\"tagNames\\\":[\\\"Channel1.Device1.Tag1\\\",\\\"Channel1.Device1.Tag2\\\"]}}\"", "", "## 4) 单点写入", $"curl -X POST \"{baseUrl}/api/opc/tags/write\" -H \"Content-Type: application/json\" -d \"{{\\\"tagName\\\":\\\"Channel1.Device1.Tag1\\\",\\\"value\\\":123}}\"", "", "## 5) 批量写入", $"curl -X POST \"{baseUrl}/api/opc/tags/write-batch\" -H \"Content-Type: application/json\" -d \"{{\\\"items\\\":[{{\\\"tagName\\\":\\\"Channel1.Device1.Tag1\\\",\\\"value\\\":111}},{{\\\"tagName\\\":\\\"Channel1.Device1.Tag2\\\",\\\"value\\\":222}}]}}\"", "", "## 6) 断开", $"curl -X POST \"{baseUrl}/api/opc/disconnect\"" }; return Results.Text(string.Join(Environment.NewLine, lines), "text/plain; charset=utf-8"); }); app.MapGet("/api/opc/status", (OpcDaGateway reader) => Results.Ok(new { connected = reader.IsConnected, serverHost = config.ServerHost, serverProgId = config.ServerProgId })); app.MapPost("/api/opc/connect", (OpcDaGateway reader) => { try { if (!reader.IsConnected) reader.Connect(); return Results.Ok(new { connected = reader.IsConnected }); } catch (Exception ex) { return Results.Problem(ex.Message, statusCode: StatusCodes.Status500InternalServerError); } }); app.MapPost("/api/opc/disconnect", (OpcDaGateway reader) => { try { if (reader.IsConnected) reader.Disconnect(); return Results.Ok(new { connected = reader.IsConnected }); } catch (Exception ex) { return Results.Problem(ex.Message, statusCode: StatusCodes.Status500InternalServerError); } }); app.MapGet("/api/opc/tags/read", async ( string name, OpcDaGateway reader, CancellationToken ct) => { if (string.IsNullOrWhiteSpace(name)) return Results.BadRequest("参数 name 不能为空"); var result = await reader.ReadTagAsync(name, ct); return Results.Ok(result); }); app.MapPost("/api/opc/tags/read-batch", async ( ReadBatchRequest request, OpcDaGateway reader, CancellationToken ct) => { var tags = request.TagNames .Where(x => !string.IsNullOrWhiteSpace(x)) .Distinct(StringComparer.Ordinal) .ToArray(); if (tags.Length == 0) return Results.BadRequest("TagNames 不能为空"); var result = await reader.ReadTagsAsync(tags, ct); return Results.Ok(result); }); app.MapPost("/api/opc/tags/write", async ( WriteTagRequest request, OpcDaGateway reader, CancellationToken ct) => { if (string.IsNullOrWhiteSpace(request.TagName)) return Results.BadRequest("TagName 不能为空"); var value = NormalizeValue(request.Value); var result = await reader.WriteTagAsync(request.TagName, value, ct); return Results.Ok(result); }); app.MapPost("/api/opc/tags/write-batch", async ( WriteBatchRequest request, OpcDaGateway reader, CancellationToken ct) => { var tags = request.Items .Where(x => !string.IsNullOrWhiteSpace(x.TagName)) .GroupBy(x => x.TagName, StringComparer.Ordinal) .Select(g => g.Last()) .ToDictionary(x => x.TagName, x => NormalizeValue(x.Value), StringComparer.Ordinal); if (tags.Count == 0) return Results.BadRequest("Items 不能为空"); var result = await reader.WriteTagsAsync(tags, ct); return Results.Ok(result); }); app.MapGet("/api/opc/events", async ( HttpContext context, OpcDaGateway reader, CancellationToken ct) => { context.Response.Headers.Append("Cache-Control", "no-cache"); context.Response.Headers.Append("Connection", "keep-alive"); context.Response.Headers.Append("X-Accel-Buffering", "no"); context.Response.ContentType = "text/event-stream"; var channel = Channel.CreateUnbounded<TagValue>(); var jsonOptions = new JsonSerializerOptions(JsonSerializerDefaults.Web); void Handler(object? _, TagValue value) => channel.Writer.TryWrite(value); reader.TagDataChanged += Handler; try { await foreach (var tag in channel.Reader.ReadAllAsync(ct)) { var payload = JsonSerializer.Serialize(tag, jsonOptions); await context.Response.WriteAsync($"event: tag\ndata: {payload}\n\n", ct); await context.Response.Body.FlushAsync(ct); } } catch (OperationCanceledException) { // ignored } finally { reader.TagDataChanged -= Handler; channel.Writer.TryComplete(); } }); return app; } private sealed record ReadBatchRequest { public string[] TagNames { get; init; } = []; } private sealed record WriteTagRequest { public string TagName { get; init; } = string.Empty; public object? Value { get; init; } } private sealed record WriteBatchRequest { public WriteTagRequest[] Items { get; init; } = []; } private static object? NormalizeValue(object? value) { if (value is not JsonElement json) return value; return json.ValueKind switch { JsonValueKind.Null => null, JsonValueKind.True => true, JsonValueKind.False => false, JsonValueKind.String => json.GetString(), JsonValueKind.Number when json.TryGetInt64(out var l) => l, JsonValueKind.Number when json.TryGetDouble(out var d) => d, _ => json.ToString() }; } }

对外暴露的 API 端点一览:

GET /api/opc/status → 连接状态查询 POST /api/opc/connect → 建立 OPC 连接 POST /api/opc/disconnect → 断开连接 GET /api/opc/tags/read → 单点读取 (?name=TagName) POST /api/opc/tags/read-batch → 批量读取 POST /api/opc/tags/write → 单点写入 POST /api/opc/tags/write-batch→ 批量写入 GET /api/opc/events → SSE 实时推送(长连接) GET /api/opc/test-doc → curl 测试命令速查

SSE 端点 /api/opc/events 是个亮点——前端或其他服务可以建立长连接,OPC 数据变化时服务端主动推送,不需要轮询:

javascript
// 前端接入示例 const es = new EventSource('http://localhost:5050/api/opc/events'); es.addEventListener('tag', e => { const tag = JSON.parse(e.data); console.log(`${tag.name} = ${tag.value}`); });

📌 配置持久化:JSON 文件自动读写

ConnectionConfig 支持从 opcda.settings.json 加载和保存,程序启动时自动读取上次的配置:

csharp
// Program.cs 启动时 var config = ConnectionConfig.LoadFromJson(); services.AddSingleton(config); // FrmSettings 保存时 _liveConfig.Apply(frm.ResultConfig); // 写回运行时单例 _liveConfig.SaveToJson(); // 持久化到文件

Apply 方法直接修改单例的属性值,下次 Connect() 调用时新配置自动生效,不需要重启程序。


🖥️ WinForms 界面设计要点

界面分四个 Tab,职责清晰:

  • 单点读取:输入点位名 → 点击读取 → 展示值/质量码/来源
  • 批量读取:文本框每行一个点位 → 批量读取 → DataGridView 展示
  • 实时订阅:连接后自动接收 DataChange 推送,实时刷新 Grid
  • 运行日志:黑底绿字,Consolas 字体,所有操作记录在案

所有控件命名严格遵循前缀规范(btnlbltxtdgvtlpgrp 等),Designer.cs 里只做控件实例化和属性赋值,事件绑定全部在 WireEvents() 方法里集中管理,IDE 界面设计器可以正常打开和编辑。

布局全部用 TableLayoutPanel + FlowLayoutPanel 实现,禁用绝对坐标定位,窗体缩放时控件自适应,不会出现控件重叠或消失的问题。


⚠️ 踩坑清单:这些坑咱们替你踩过

坑一:AddItem 重复调用。 OpcClientSdk 里对同一个 Tag 重复订阅会返回错误。工程里用 _subscribedTags 字典记录已订阅的点位,EnsureItemsSubscribed 方法做幂等保护,只订阅新增的点位。

坑二:写入值的类型匹配。 REST API 收到的 JSON 数值是 JsonElement,直接传给 OPC Server 会报类型错误。NormalizeValue 方法做了类型转换——JsonValueKind.Number 优先尝试 Int64,再 fallback 到 Double,覆盖大多数工控场景。

坑三:async void 的异常吞噬。 BtnConnect_Click 这类事件处理器必须是 async void,但异常不会自动冒泡。工程里统一用 try/catch 包住,异常写入日志面板,不会静默失败。

坑四:Channel 容量设置。 BoundedChannelFullMode.DropOldest 适合工控实时场景——宁可丢历史数据,也要保证最新数据能进来。如果你的场景要求数据完整性(比如报警记录),改成 DropNewest 或者直接用无界 Channel,但要注意内存增长风险。

坑五:DCOM 防火墙。 远程连接 OPC Server 时,DCOM 动态端口范围是 49152~65535,防火墙只开 135 端口不够。用 OpcClientSdk 的 URL 方式连接,底层走 TCP,配置比裸 DCOM 简单很多,但防火墙规则还是要确认。


💬 聊聊你的项目

你们现在的 OPC DA 对接是什么方案?还在用 OPCAutomation COM Interop,还是已经迁到托管库了?

另外有个值得讨论的问题:WinForms 内嵌 WebApplication 这种架构,在工控场景下你觉得合适吗?还是更倾向于把网关单独部署成 Windows Service?两种方案各有取舍,欢迎在评论区聊聊实际项目里的选择。


🗺️ 学习路径建议

掌握了本文的基础之后,下一步可以往这几个方向延伸:

协议升级:OPC UA 是 OPC DA 的现代化继任者,支持跨平台、内置安全机制。ITagReader 接口已经为切换留好了口子,可以参考 OPC Foundation 的 UA .NET Standard 库实现 OpcUaTagReader

数据持久化:时序数据库(InfluxDB、TDengine)比关系型数据库更适合存储 OPC 点位数据,写入性能和压缩率都高出一个数量级。

消息队列接入:在 OpcRealtimeAdapter.ConsumeAsync 里接入 Kafka 或 RabbitMQ,可以把实时数据推送给多个下游消费者,解耦网关和业务系统。

部署方案:把 WinForms 换成 Windows Service + Minimal API,去掉 UI 依赖,可以在无人值守的服务器上运行,更符合生产环境的部署要求。


技术标签: C# · .NET 8 · OPC DA · WinForms · 工控系统 · REST API · 遗留系统改造

相关信息

我用夸克网盘给你分享了「AppOpcDaGateway.zip」,点击链接或复制整段内容,打开「夸克APP」即可获取。 /55653YXXeV:/ 链接:https://pan.quark.cn/s/f4a3b06b5e70 提取码:XgxA

本文作者:技术老小子

本文链接:

版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!