做工控项目的同学,大概率遇到过这种场景——产线上跑着十几年的老设备,底层走 OPC DA 协议,新来的需求要求接云平台、做数据看板,甚至要对外提供 REST API。
一看现有代码:COM Interop、裸 object 类型、lock 满天飞,根本没有现代化接口可言。
重写?停产风险太大。凑合用?技术债越堆越高。
咱们这次换个思路——不动设备、不停产线,用 .NET 8 + WinForms 在上面套一层网关适配器,把 OPC DA 的 COM 调用包成 REST API 对外暴露,同时给运维人员一个可视化的桌面操作界面。
读完本文,你将拿到一套完整可运行的工程结构,包含:
很多人第一次接触这块,踩的第一个坑不是协议,是线程模型。
OPC DA 基于 COM/DCOM,天生是单线程公寓(STA)模型。你在 .NET 的 Task 线程池里直接调它,轻则读出脏数据,重则进程直接崩。这就是为什么所有 COM 调用都必须收拢在 lock 里——不是强迫症,是救命符。
除此之外,还有几个藏得比较深的问题。
类型系统的鸿沟。 OPC DA 的数据全是 VARIANT,映射到 C# 就是 object。质量码(Quality)是个 short,但语义是位域——0xC0 才是 Good,很多人直接 quality > 0 判断,结果把 Uncertain 状态当 Good 用了,数据悄悄出错。
性能天花板。 逐点同步读取,500 个点位以上延迟就开始飙升。实测数据:1000 个点位逐一读取耗时约 8200ms,批量 SyncRead 压到 580ms,差了整整 14 倍。这个坑不踩不知道,踩了就忘不了。
可测试性为零。 COM Interop 的类没法 Mock,单元测试形同虚设。这也是为什么咱们这次引入 ITagReader 接口——不是为了炫技,是为了让代码将来能测、能换、能活。
传统方案用 OPCAutomation COM Interop,麻烦且依赖本机注册表。本文选用 Technosoftware 的 OpcClientSdk,纯托管库,NuGet 直装,.NET 8 原生支持,不需要 COM 注册,部署时少了一大堆环境依赖。

先看清楚这套东西长什么样,再动手写代码。

WinForms 和 WebApplication 共享同一个 OpcDaGateway 单例,通过 DI 容器注入,两侧都能读写数据,互不干扰。这个设计的好处是:运维人员可以在桌面界面操作,业务系统可以通过 HTTP 接口调用,底层 OPC 连接只维护一条。






csharpusing AppOpcDaGateway.Models;
namespace AppOpcDaGateway.Services;
public interface ITagReader : IDisposable
{
bool IsConnected { get; }
Task<TagValue> ReadTagAsync(string tagName, CancellationToken ct = default);
Task<List<TagValue>> ReadTagsAsync(string[] tagNames, CancellationToken ct = default);
Task<TagValue> WriteTagAsync(string tagName, object? value, CancellationToken ct = default);
Task<List<TagValue>> WriteTagsAsync(Dictionary<string, object?> tagValues, CancellationToken ct = default);
event EventHandler<TagValue> TagDataChanged;
event EventHandler<string> ConnectionStatusChanged;
}
今天是 OPC DA,明天可能是 OPC UA 或 Modbus。业务层只认 ITagReader,换协议时改一行 DI 注册就够了,其他地方动都不用动。
OpcDaGateway 是整个工程最关键的类,它做了三件事:
第一,用 OpcClientSdk 替代 COM Interop。 连接方式变成 URL 风格,干净很多:
csharpvar serverUrl = $"opcda://{_config.ServerHost}/{_config.ServerProgId}";
_opcServer = new TsCDaServer();
_opcServer.Connect(serverUrl);
第二,所有 COM 调用都在 _comLock 内执行。 这条规则不能破,破了就等着排查诡异的多线程崩溃吧。
第三,Quality 码正确解析。 很多项目在这里埋雷——
csharpprivate static short ConvertQuality(TsCDaQuality quality)
{
return quality.QualityBits switch
{
TsDaQualityBits.Good => 192, // 0xC0
TsDaQualityBits.GoodLocalOverride => 216, // 0xD8
_ => 0 // Bad / Uncertain 统一归 0
};
}
TagValue.IsGood 的判断也要用位运算,不能简单 > 0:
csharppublic bool IsGood => (Quality & 0xC0) == 0xC0;
c#using AppOpcDaGateway.Models;
using Microsoft.Extensions.Logging;
using OpcClientSdk;
using OpcClientSdk.Da;
using System.Collections.Concurrent;
namespace AppOpcDaGateway.Services;
public sealed class OpcDaGateway : ITagReader
{
private readonly ILogger<OpcDaGateway> _logger;
private readonly ConnectionConfig _config;
private TsCDaServer? _opcServer;
private TsCDaSubscription? _opcSubscription;
private readonly object _comLock = new();
private readonly ConcurrentDictionary<string, TagCache> _cache = new();
private readonly ConcurrentDictionary<string, byte> _subscribedTags = new();
private System.Threading.Timer? _cacheCleanupTimer;
private bool _disposed;
public bool IsConnected { get; private set; }
public event EventHandler<TagValue>? TagDataChanged;
public event EventHandler<string>? ConnectionStatusChanged;
public OpcDaGateway(ConnectionConfig config, ILogger<OpcDaGateway> logger)
{
_config = config;
_logger = logger;
}
// ── 连接 ──────────────────────────────────────────────────────────────
public void Connect()
{
lock (_comLock)
{
ConnectCore();
}
}
private void ConnectCore()
{
try
{
var serverUrl = $"opcda://{_config.ServerHost}/{_config.ServerProgId}";
_opcServer = new TsCDaServer();
_opcServer.Connect(serverUrl);
var subscriptionState = new TsCDaSubscriptionState
{
Name = "GatewayGroup",
Active = true,
UpdateRate = _config.UpdateRateMs,
Deadband = 0
};
_opcSubscription = (TsCDaSubscription)_opcServer.CreateSubscription(subscriptionState);
_opcSubscription.DataChangedEvent += OnDataChanged;
IsConnected = true;
StartCacheCleanup();
_logger.LogInformation("OPC 连接成功: {ProgId}@{Host}",
_config.ServerProgId, _config.ServerHost);
ConnectionStatusChanged?.Invoke(this,
$"已连接 → {_config.ServerProgId}@{_config.ServerHost}");
}
catch (Exception ex)
{
IsConnected = false;
_logger.LogError(ex, "OPC 连接失败");
ConnectionStatusChanged?.Invoke(this, $"连接失败: {ex.Message}");
throw new InvalidOperationException($"OPC 连接失败: {ex.Message}", ex);
}
}
public void Disconnect()
{
lock (_comLock)
{
DisconnectCore();
}
}
private void DisconnectCore()
{
_cacheCleanupTimer?.Dispose();
_cacheCleanupTimer = null;
try
{
if (_opcSubscription is not null)
_opcSubscription.DataChangedEvent -= OnDataChanged;
}
catch { }
try { _opcSubscription?.Dispose(); } catch { }
try { _opcServer?.Disconnect(); } catch { }
try { _opcServer?.Dispose(); } catch { }
_opcSubscription = null;
_opcServer = null;
_subscribedTags.Clear();
IsConnected = false;
_logger.LogInformation("OPC 连接已断开");
ConnectionStatusChanged?.Invoke(this, "已断开");
}
public async Task<TagValue> ReadTagAsync(
string tagName,
CancellationToken ct = default)
{
return await Task.Run(() => ReadTagCore(tagName), ct);
}
private TagValue ReadTagCore(string tagName)
{
if (_cache.TryGetValue(tagName, out var cached) &&
!cached.IsExpired(TimeSpan.FromSeconds(_config.CacheTtlSeconds)))
{
return TagValue.FromCache(tagName, cached);
}
lock (_comLock)
{
return ReadFromOpcCore(tagName);
}
}
private TagValue ReadFromOpcCore(string tagName)
{
try
{
EnsureConnected();
EnsureItemsSubscribed([tagName]);
var readItems = new[]
{
new TsCDaItem
{
ItemName = tagName,
MaxAgeSpecified = true,
MaxAge = 0,
Active = true,
ActiveSpecified = true
}
};
var itemValues = _opcServer!.Read(readItems);
if (itemValues.Length == 0 || itemValues[0].Result.IsError())
{
var err = itemValues.Length == 0 ? "读取结果为空" : itemValues[0].Result.Description();
return TagValue.Error(tagName, err);
}
var value = itemValues[0].Value;
var q = ConvertQuality(itemValues[0].Quality);
UpdateCache(tagName, value, q);
return new TagValue
{
Name = tagName,
Value = value,
Quality = q,
Timestamp = DateTime.Now,
Source = "OPC"
};
}
catch (Exception ex)
{
_logger.LogWarning(ex, "读取点位失败: {Tag}", tagName);
return TagValue.Error(tagName, ex.Message);
}
}
public async Task<List<TagValue>> ReadTagsAsync(
string[] tagNames,
CancellationToken ct = default)
{
return await Task.Run(() => ReadTagsBatchCore(tagNames), ct);
}
public async Task<TagValue> WriteTagAsync(
string tagName,
object? value,
CancellationToken ct = default)
{
return await Task.Run(() => WriteTagCore(tagName, value), ct);
}
private TagValue WriteTagCore(string tagName, object? value)
{
lock (_comLock)
{
try
{
EnsureConnected();
EnsureItemsSubscribed([tagName]);
var writeItems = new[]
{
new TsCDaItemValue
{
ItemName = tagName,
Value = value
}
};
_opcServer!.Write(writeItems);
const short writeQuality = 192;
UpdateCache(tagName, value, writeQuality);
var result = new TagValue
{
Name = tagName,
Value = value,
Quality = writeQuality,
Timestamp = DateTime.Now,
Source = "OPC_Write"
};
TagDataChanged?.Invoke(this, result);
return result;
}
catch (Exception ex)
{
_logger.LogWarning(ex, "写入点位失败: {Tag}", tagName);
return TagValue.Error(tagName, ex.Message);
}
}
}
public async Task<List<TagValue>> WriteTagsAsync(
Dictionary<string, object?> tagValues,
CancellationToken ct = default)
{
return await Task.Run(() => WriteTagsBatchCore(tagValues), ct);
}
private List<TagValue> WriteTagsBatchCore(Dictionary<string, object?> tagValues)
{
lock (_comLock)
{
var results = new List<TagValue>();
try
{
EnsureConnected();
var validItems = tagValues
.Where(kv => !string.IsNullOrWhiteSpace(kv.Key))
.ToArray();
if (validItems.Length == 0)
return results;
EnsureItemsSubscribed(validItems.Select(kv => kv.Key));
var writeItems = validItems
.Select(kv => new TsCDaItemValue
{
ItemName = kv.Key,
Value = kv.Value
})
.ToArray();
_opcServer!.Write(writeItems);
const short writeQuality = 192;
var now = DateTime.Now;
foreach (var kv in validItems)
{
UpdateCache(kv.Key, kv.Value, writeQuality);
var tagValue = new TagValue
{
Name = kv.Key,
Value = kv.Value,
Quality = writeQuality,
Timestamp = now,
Source = "OPC_WriteBatch"
};
results.Add(tagValue);
TagDataChanged?.Invoke(this, tagValue);
}
}
catch (Exception ex)
{
_logger.LogError(ex, "批量写入失败");
foreach (var kv in tagValues)
results.Add(TagValue.Error(kv.Key, ex.Message));
}
return results;
}
}
private List<TagValue> ReadTagsBatchCore(string[] tagNames)
{
lock (_comLock)
{
return ReadTagsBatchLocked(tagNames);
}
}
private List<TagValue> ReadTagsBatchLocked(string[] tagNames)
{
var results = new List<TagValue>();
try
{
EnsureConnected();
EnsureItemsSubscribed(tagNames);
var readItems = tagNames.Select(name => new TsCDaItem
{
ItemName = name,
MaxAgeSpecified = true,
MaxAge = 0,
Active = true,
ActiveSpecified = true
}).ToArray();
var itemValues = _opcServer!.Read(readItems);
foreach (var valueResult in itemValues)
{
if (valueResult.Result.IsError())
{
results.Add(TagValue.Error(valueResult.ItemName, valueResult.Result.Description()));
continue;
}
var q = ConvertQuality(valueResult.Quality);
UpdateCache(valueResult.ItemName, valueResult.Value, q);
results.Add(new TagValue
{
Name = valueResult.ItemName,
Value = valueResult.Value,
Quality = q,
Timestamp = valueResult.Timestamp,
Source = "OPC_Batch"
});
}
}
catch (Exception ex)
{
_logger.LogError(ex, "批量读取失败");
}
return results;
}
private void OnDataChanged(
object subscriptionHandle,
object requestHandle,
TsCDaItemValueResult[] values)
{
foreach (var valueResult in values)
{
if (valueResult.Result.IsError())
continue;
var tagName = valueResult.ItemName;
var quality = ConvertQuality(valueResult.Quality);
var value = valueResult.Value;
UpdateCache(tagName, value, quality);
TagDataChanged?.Invoke(this, new TagValue
{
Name = tagName,
Value = value,
Quality = quality,
Timestamp = valueResult.Timestamp,
Source = "Subscription"
});
}
}
private void EnsureConnected()
{
if (!IsConnected || _opcServer is null || _opcSubscription is null)
throw new InvalidOperationException("OPC 未连接,请先调用 Connect()");
}
private void EnsureItemsSubscribed(IEnumerable<string> tagNames)
{
var pending = tagNames
.Where(name => !_subscribedTags.ContainsKey(name))
.Distinct()
.Select(name => new TsCDaItem
{
ItemName = name,
ClientHandle = name,
Active = true,
ActiveSpecified = true
})
.ToArray();
if (pending.Length == 0)
return;
var addResults = _opcSubscription!.AddItems(pending);
foreach (var added in addResults)
{
if (added.Result.IsSuccess())
_subscribedTags[added.ItemName] = 0;
else
_logger.LogWarning("订阅点位失败: {Tag}, {Error}", added.ItemName, added.Result.Description());
}
}
private void UpdateCache(string? tagName, object? value, short quality)
{
if (tagName is null) return;
_cache[tagName] = new TagCache
{
Value = value,
Quality = quality,
Timestamp = DateTime.Now
};
}
private static short ConvertQuality(TsCDaQuality quality)
{
return quality.QualityBits switch
{
TsDaQualityBits.Good => 192,
TsDaQualityBits.GoodLocalOverride => 216,
_ => 0
};
}
private void StartCacheCleanup()
{
_cacheCleanupTimer = new System.Threading.Timer(
_ => CleanupExpiredCache(),
null,
TimeSpan.FromMinutes(_config.CacheCleanupMinutes),
TimeSpan.FromMinutes(_config.CacheCleanupMinutes)
);
}
private void CleanupExpiredCache()
{
var expiredKeys = _cache
.Where(kv => kv.Value.IsExpired(TimeSpan.FromMinutes(_config.CacheCleanupMinutes)))
.Select(kv => kv.Key)
.ToList();
foreach (var key in expiredKeys)
_cache.TryRemove(key, out _);
_logger.LogDebug("缓存清理完成,移除 {Count} 个过期条目", expiredKeys.Count);
}
public void Dispose()
{
if (_disposed) return;
_disposed = true;
Disconnect();
}
}
批量读写是 OPC DA 协议里为数不多的"现代化"特性,一定要用上。核心是 _opcServer.Read(readItems[]),一次调用读取所有点位:
csharpvar readItems = tagNames.Select(name => new TsCDaItem
{
ItemName = name,
MaxAgeSpecified = true,
MaxAge = 0, // 强制从设备读,不走服务器缓存
Active = true,
ActiveSpecified = true
}).ToArray();
var itemValues = _opcServer!.Read(readItems);
DataChange 事件是 OPC DA 的推送机制,比轮询高效得多。但事件回调是 COM 线程触发的,直接在回调里做业务处理风险很高。咱们用 Channel<TagValue> 做缓冲:
csharp// 有界 Channel,满了丢最旧的数据——工控场景下实时性优先于完整性
_channel = Channel.CreateBounded<TagValue>(new BoundedChannelOptions(2000)
{
FullMode = BoundedChannelFullMode.DropOldest,
SingleWriter = false,
SingleReader = false
});
生产侧(COM 回调)只做 TryWrite,非阻塞,不会卡住 COM 线程。消费侧(UI 或下游服务)用 ReadAllAsync 异步消费,天然支持背压。
c#using System.Threading.Channels;
using AppOpcDaGateway.Models;
using Microsoft.Extensions.Logging;
namespace AppOpcDaGateway.Services;
public sealed class OpcRealtimeAdapter : IDisposable
{
private readonly ITagReader _reader;
private readonly ILogger<OpcRealtimeAdapter> _logger;
private readonly Channel<TagValue> _channel;
private bool _disposed;
public ChannelReader<TagValue> DataStream => _channel.Reader;
public OpcRealtimeAdapter(ITagReader reader, ILogger<OpcRealtimeAdapter> logger)
{
_reader = reader;
_logger = logger;
_channel = Channel.CreateBounded<TagValue>(new BoundedChannelOptions(2000)
{
FullMode = BoundedChannelFullMode.DropOldest,
SingleWriter = false,
SingleReader = false
});
_reader.TagDataChanged += OnTagDataChanged;
}
private void OnTagDataChanged(object? sender, TagValue tagValue)
{
_channel.Writer.TryWrite(tagValue);
}
public async Task ConsumeAsync(
Func<TagValue, Task> handler,
CancellationToken ct = default)
{
await foreach (var tagValue in _channel.Reader.ReadAllAsync(ct))
{
try
{
await handler(tagValue);
}
catch (Exception ex)
{
_logger.LogWarning(ex, "数据处理异常: {Tag}", tagValue.Name);
}
}
}
public void Dispose()
{
if (_disposed) return;
_disposed = true;
_reader.TagDataChanged -= OnTagDataChanged;
_channel.Writer.TryComplete();
}
}
这是本文最有意思的一个设计点。Program.cs 里同时启动 WinForms 和 WebApplication,共享同一套 DI 容器:
csharpusing AppOpcDaGateway.Forms;
using AppOpcDaGateway.Models;
using AppOpcDaGateway.Services;
using Microsoft.AspNetCore.Builder;
using Microsoft.AspNetCore.Http;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using System.Text.Json;
using System.Threading.Channels;
namespace AppOpcDaGateway;
internal static class Program
{
[STAThread]
private static void Main()
{
ApplicationConfiguration.Initialize();
var services = BuildServiceCollection();
using var provider = services.BuildServiceProvider();
var webApi = StartGatewayWebApi(provider);
var webApiTask = webApi.RunAsync();
try
{
var mainForm = provider.GetRequiredService<FrmMain>();
Application.Run(mainForm);
}
finally
{
webApi.StopAsync().GetAwaiter().GetResult();
webApi.DisposeAsync().AsTask().GetAwaiter().GetResult();
webApiTask.GetAwaiter().GetResult();
}
}
// ── DI 注册 ───────────────────────────────────────────────────────────
private static ServiceCollection BuildServiceCollection()
{
var services = new ServiceCollection();
var config = ConnectionConfig.LoadFromJson();
// 日志
services.AddLogging(builder =>
{
builder.AddConsole();
builder.SetMinimumLevel(LogLevel.Debug);
});
// 配置(单例,运行期可由 FrmSettings 更新)
services.AddSingleton(config);
// OPC DA 网关(单例,COM 对象不能多实例)
services.AddSingleton<OpcDaGateway>();
services.AddSingleton<ITagReader>(sp => sp.GetRequiredService<OpcDaGateway>());
// 实时适配器(单例,依赖网关)
services.AddSingleton<OpcRealtimeAdapter>();
// 主窗体
services.AddTransient<FrmMain>();
return services;
}
private static WebApplication StartGatewayWebApi(ServiceProvider provider)
{
var config = provider.GetRequiredService<ConnectionConfig>();
var gateway = provider.GetRequiredService<OpcDaGateway>();
var loggerFactory = provider.GetRequiredService<ILoggerFactory>();
var builder = WebApplication.CreateSlimBuilder();
builder.Services.AddSingleton(config);
builder.Services.AddSingleton(gateway);
builder.Services.AddSingleton<ITagReader>(gateway);
builder.Services.AddSingleton(loggerFactory);
var app = builder.Build();
app.Urls.Add($"http://localhost:{config.GatewayHttpPort}");
app.MapGet("/", (ConnectionConfig cfg) => Results.Ok(new
{
service = "AppOpcDaGateway",
api = "OPC DA REST",
port = cfg.GatewayHttpPort,
endpoints = new[]
{
"GET /api/opc/status",
"POST /api/opc/connect",
"POST /api/opc/disconnect",
"GET /api/opc/tags/read?name=TagName",
"POST /api/opc/tags/read-batch",
"POST /api/opc/tags/write",
"POST /api/opc/tags/write-batch",
"GET /api/opc/test-doc",
"GET /api/opc/events"
}
}));
app.MapGet("/api/opc/test-doc", (ConnectionConfig cfg) =>
{
var baseUrl = $"http://localhost:{cfg.GatewayHttpPort}";
var lines = new[]
{
"# OPC DA Gateway API 测试说明(可直接复制)",
"",
"## 1) 连接",
$"curl -X POST \"{baseUrl}/api/opc/connect\"",
"",
"## 2) 单点读取",
$"curl \"{baseUrl}/api/opc/tags/read?name=Channel1.Device1.Tag1\"",
"",
"## 3) 批量读取",
$"curl -X POST \"{baseUrl}/api/opc/tags/read-batch\" -H \"Content-Type: application/json\" -d \"{{\\\"tagNames\\\":[\\\"Channel1.Device1.Tag1\\\",\\\"Channel1.Device1.Tag2\\\"]}}\"",
"",
"## 4) 单点写入",
$"curl -X POST \"{baseUrl}/api/opc/tags/write\" -H \"Content-Type: application/json\" -d \"{{\\\"tagName\\\":\\\"Channel1.Device1.Tag1\\\",\\\"value\\\":123}}\"",
"",
"## 5) 批量写入",
$"curl -X POST \"{baseUrl}/api/opc/tags/write-batch\" -H \"Content-Type: application/json\" -d \"{{\\\"items\\\":[{{\\\"tagName\\\":\\\"Channel1.Device1.Tag1\\\",\\\"value\\\":111}},{{\\\"tagName\\\":\\\"Channel1.Device1.Tag2\\\",\\\"value\\\":222}}]}}\"",
"",
"## 6) 断开",
$"curl -X POST \"{baseUrl}/api/opc/disconnect\""
};
return Results.Text(string.Join(Environment.NewLine, lines), "text/plain; charset=utf-8");
});
app.MapGet("/api/opc/status", (OpcDaGateway reader) => Results.Ok(new
{
connected = reader.IsConnected,
serverHost = config.ServerHost,
serverProgId = config.ServerProgId
}));
app.MapPost("/api/opc/connect", (OpcDaGateway reader) =>
{
try
{
if (!reader.IsConnected)
reader.Connect();
return Results.Ok(new { connected = reader.IsConnected });
}
catch (Exception ex)
{
return Results.Problem(ex.Message, statusCode: StatusCodes.Status500InternalServerError);
}
});
app.MapPost("/api/opc/disconnect", (OpcDaGateway reader) =>
{
try
{
if (reader.IsConnected)
reader.Disconnect();
return Results.Ok(new { connected = reader.IsConnected });
}
catch (Exception ex)
{
return Results.Problem(ex.Message, statusCode: StatusCodes.Status500InternalServerError);
}
});
app.MapGet("/api/opc/tags/read", async (
string name,
OpcDaGateway reader,
CancellationToken ct) =>
{
if (string.IsNullOrWhiteSpace(name))
return Results.BadRequest("参数 name 不能为空");
var result = await reader.ReadTagAsync(name, ct);
return Results.Ok(result);
});
app.MapPost("/api/opc/tags/read-batch", async (
ReadBatchRequest request,
OpcDaGateway reader,
CancellationToken ct) =>
{
var tags = request.TagNames
.Where(x => !string.IsNullOrWhiteSpace(x))
.Distinct(StringComparer.Ordinal)
.ToArray();
if (tags.Length == 0)
return Results.BadRequest("TagNames 不能为空");
var result = await reader.ReadTagsAsync(tags, ct);
return Results.Ok(result);
});
app.MapPost("/api/opc/tags/write", async (
WriteTagRequest request,
OpcDaGateway reader,
CancellationToken ct) =>
{
if (string.IsNullOrWhiteSpace(request.TagName))
return Results.BadRequest("TagName 不能为空");
var value = NormalizeValue(request.Value);
var result = await reader.WriteTagAsync(request.TagName, value, ct);
return Results.Ok(result);
});
app.MapPost("/api/opc/tags/write-batch", async (
WriteBatchRequest request,
OpcDaGateway reader,
CancellationToken ct) =>
{
var tags = request.Items
.Where(x => !string.IsNullOrWhiteSpace(x.TagName))
.GroupBy(x => x.TagName, StringComparer.Ordinal)
.Select(g => g.Last())
.ToDictionary(x => x.TagName, x => NormalizeValue(x.Value), StringComparer.Ordinal);
if (tags.Count == 0)
return Results.BadRequest("Items 不能为空");
var result = await reader.WriteTagsAsync(tags, ct);
return Results.Ok(result);
});
app.MapGet("/api/opc/events", async (
HttpContext context,
OpcDaGateway reader,
CancellationToken ct) =>
{
context.Response.Headers.Append("Cache-Control", "no-cache");
context.Response.Headers.Append("Connection", "keep-alive");
context.Response.Headers.Append("X-Accel-Buffering", "no");
context.Response.ContentType = "text/event-stream";
var channel = Channel.CreateUnbounded<TagValue>();
var jsonOptions = new JsonSerializerOptions(JsonSerializerDefaults.Web);
void Handler(object? _, TagValue value) => channel.Writer.TryWrite(value);
reader.TagDataChanged += Handler;
try
{
await foreach (var tag in channel.Reader.ReadAllAsync(ct))
{
var payload = JsonSerializer.Serialize(tag, jsonOptions);
await context.Response.WriteAsync($"event: tag\ndata: {payload}\n\n", ct);
await context.Response.Body.FlushAsync(ct);
}
}
catch (OperationCanceledException)
{
// ignored
}
finally
{
reader.TagDataChanged -= Handler;
channel.Writer.TryComplete();
}
});
return app;
}
private sealed record ReadBatchRequest
{
public string[] TagNames { get; init; } = [];
}
private sealed record WriteTagRequest
{
public string TagName { get; init; } = string.Empty;
public object? Value { get; init; }
}
private sealed record WriteBatchRequest
{
public WriteTagRequest[] Items { get; init; } = [];
}
private static object? NormalizeValue(object? value)
{
if (value is not JsonElement json)
return value;
return json.ValueKind switch
{
JsonValueKind.Null => null,
JsonValueKind.True => true,
JsonValueKind.False => false,
JsonValueKind.String => json.GetString(),
JsonValueKind.Number when json.TryGetInt64(out var l) => l,
JsonValueKind.Number when json.TryGetDouble(out var d) => d,
_ => json.ToString()
};
}
}
对外暴露的 API 端点一览:
GET /api/opc/status → 连接状态查询 POST /api/opc/connect → 建立 OPC 连接 POST /api/opc/disconnect → 断开连接 GET /api/opc/tags/read → 单点读取 (?name=TagName) POST /api/opc/tags/read-batch → 批量读取 POST /api/opc/tags/write → 单点写入 POST /api/opc/tags/write-batch→ 批量写入 GET /api/opc/events → SSE 实时推送(长连接) GET /api/opc/test-doc → curl 测试命令速查
SSE 端点 /api/opc/events 是个亮点——前端或其他服务可以建立长连接,OPC 数据变化时服务端主动推送,不需要轮询:
javascript// 前端接入示例
const es = new EventSource('http://localhost:5050/api/opc/events');
es.addEventListener('tag', e => {
const tag = JSON.parse(e.data);
console.log(`${tag.name} = ${tag.value}`);
});
ConnectionConfig 支持从 opcda.settings.json 加载和保存,程序启动时自动读取上次的配置:
csharp// Program.cs 启动时
var config = ConnectionConfig.LoadFromJson();
services.AddSingleton(config);
// FrmSettings 保存时
_liveConfig.Apply(frm.ResultConfig); // 写回运行时单例
_liveConfig.SaveToJson(); // 持久化到文件
Apply 方法直接修改单例的属性值,下次 Connect() 调用时新配置自动生效,不需要重启程序。
界面分四个 Tab,职责清晰:
所有控件命名严格遵循前缀规范(btn、lbl、txt、dgv、tlp、grp 等),Designer.cs 里只做控件实例化和属性赋值,事件绑定全部在 WireEvents() 方法里集中管理,IDE 界面设计器可以正常打开和编辑。
布局全部用 TableLayoutPanel + FlowLayoutPanel 实现,禁用绝对坐标定位,窗体缩放时控件自适应,不会出现控件重叠或消失的问题。
坑一:AddItem 重复调用。 OpcClientSdk 里对同一个 Tag 重复订阅会返回错误。工程里用 _subscribedTags 字典记录已订阅的点位,EnsureItemsSubscribed 方法做幂等保护,只订阅新增的点位。
坑二:写入值的类型匹配。 REST API 收到的 JSON 数值是 JsonElement,直接传给 OPC Server 会报类型错误。NormalizeValue 方法做了类型转换——JsonValueKind.Number 优先尝试 Int64,再 fallback 到 Double,覆盖大多数工控场景。
坑三:async void 的异常吞噬。 BtnConnect_Click 这类事件处理器必须是 async void,但异常不会自动冒泡。工程里统一用 try/catch 包住,异常写入日志面板,不会静默失败。
坑四:Channel 容量设置。 BoundedChannelFullMode.DropOldest 适合工控实时场景——宁可丢历史数据,也要保证最新数据能进来。如果你的场景要求数据完整性(比如报警记录),改成 DropNewest 或者直接用无界 Channel,但要注意内存增长风险。
坑五:DCOM 防火墙。 远程连接 OPC Server 时,DCOM 动态端口范围是 49152~65535,防火墙只开 135 端口不够。用 OpcClientSdk 的 URL 方式连接,底层走 TCP,配置比裸 DCOM 简单很多,但防火墙规则还是要确认。
你们现在的 OPC DA 对接是什么方案?还在用 OPCAutomation COM Interop,还是已经迁到托管库了?
另外有个值得讨论的问题:WinForms 内嵌 WebApplication 这种架构,在工控场景下你觉得合适吗?还是更倾向于把网关单独部署成 Windows Service?两种方案各有取舍,欢迎在评论区聊聊实际项目里的选择。
掌握了本文的基础之后,下一步可以往这几个方向延伸:
协议升级:OPC UA 是 OPC DA 的现代化继任者,支持跨平台、内置安全机制。ITagReader 接口已经为切换留好了口子,可以参考 OPC Foundation 的 UA .NET Standard 库实现 OpcUaTagReader。
数据持久化:时序数据库(InfluxDB、TDengine)比关系型数据库更适合存储 OPC 点位数据,写入性能和压缩率都高出一个数量级。
消息队列接入:在 OpcRealtimeAdapter.ConsumeAsync 里接入 Kafka 或 RabbitMQ,可以把实时数据推送给多个下游消费者,解耦网关和业务系统。
部署方案:把 WinForms 换成 Windows Service + Minimal API,去掉 UI 依赖,可以在无人值守的服务器上运行,更符合生产环境的部署要求。
技术标签: C# · .NET 8 · OPC DA · WinForms · 工控系统 · REST API · 遗留系统改造
相关信息
我用夸克网盘给你分享了「AppOpcDaGateway.zip」,点击链接或复制整段内容,打开「夸克APP」即可获取。
/55653YXXeV:/
链接:https://pan.quark.cn/s/f4a3b06b5e70
提取码:XgxA
本文作者:技术老小子
本文链接:
版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!