最近在技术群里看到一位朋友抱怨:"用BlockingCollection做任务队列,停止后再启动就不工作了,任务加进去但不执行,这是什么鬼?"
相信很多C#开发者都遇到过类似问题。BlockingCollection作为.NET提供的线程安全集合,看似简单易用,但在实际项目中却暗藏不少陷阱。停止重启失效就是其中最典型的坑点之一。
今天就通过一个完整的WinForm实战案例,彻底解决这个问题,让你的多线程编程更加稳定可靠!
当我们调用BlockingCollection的停止方法后,底层发生了什么?
C#public void Stop()
{
_collection.CompleteAdding(); // 标记不再接受新任务
_processingTask.Wait(); // 等待处理线程结束
}
问题就在这里! _processingTask一旦结束,就无法重新启动。很多开发者以为重新调用EnqueueTask就能恢复工作,实际上处理线程已经"死"了。
C#// ❌ 错误:以为重新入队就能工作
private void btnStart_Click(object sender, EventArgs e)
{
if (_processor?.IsRunning != true)
{
// 只是重新入队,但处理线程已经结束!
foreach (var task in pendingTasks)
{
_processor.EnqueueTask(task);
}
}
}
要解决这个问题,需要实现一个完整的重启机制:
C#using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace AppBlockingCollectionDemo
{
public class BlockingTaskProcessor : IDisposable
{
private BlockingCollection<TaskItem> _collection;
private Task _processingTask;
private volatile bool _isRunning = false;
private volatile bool _isDisposed = false;
// 事件:任务状态变化通知
public event Action<TaskItem> TaskStatusChanged;
public event Action<string> LogMessage;
public BlockingTaskProcessor()
{
Initialize();
}
private void Initialize()
{
if (_isDisposed) return;
_collection = new BlockingCollection<TaskItem>();
_isRunning = true;
_processingTask = Task.Run(ProcessTasks);
LogMessage?.Invoke("任务处理器已启动");
}
public void EnqueueTask(TaskItem task)
{
if (_isDisposed || _collection?.IsAddingCompleted == true || !_isRunning)
{
LogMessage?.Invoke("处理器已停止,无法添加新任务");
return;
}
try
{
_collection.Add(task);
LogMessage?.Invoke($"任务已入队: {task.Name}");
}
catch (InvalidOperationException)
{
LogMessage?.Invoke("处理器已停止,无法添加新任务");
}
}
private void ProcessTasks()
{
try
{
LogMessage?.Invoke("处理线程已启动,开始监听任务队列...");
foreach (var task in _collection.GetConsumingEnumerable())
{
if (!_isRunning)
{
LogMessage?.Invoke("收到停止信号,退出处理循环");
break;
}
ProcessSingleTask(task);
}
}
catch (InvalidOperationException)
{
LogMessage?.Invoke("任务队列已关闭");
}
catch (Exception ex)
{
LogError(ex);
}
finally
{
LogMessage?.Invoke("任务处理线程已结束");
}
}
private void ProcessSingleTask(TaskItem task)
{
try
{
// 更新状态为处理中
task.Status = TaskStatus.Processing;
TaskStatusChanged?.Invoke(task);
LogMessage?.Invoke($"开始处理任务: {task.Name}");
// 模拟任务处理时间(1-3秒)
var random = new Random();
var processingTime = random.Next(1000, 3000);
// 使用 CancellationToken 来支持优雅停止
var startTime = DateTime.Now;
while ((DateTime.Now - startTime).TotalMilliseconds < processingTime)
{
if (!_isRunning) return; // 如果被停止,直接返回
Thread.Sleep(100); // 小间隔检查
}
// 模拟任务可能失败(10%概率)
if (random.Next(1, 11) == 1)
{
task.Status = TaskStatus.Failed;
LogMessage?.Invoke($"任务处理失败: {task.Name}");
}
else
{
task.Status = TaskStatus.Completed;
LogMessage?.Invoke($"任务处理完成: {task.Name} (耗时: {processingTime / 1000.0:F1}秒)");
}
TaskStatusChanged?.Invoke(task);
}
catch (Exception ex)
{
task.Status = TaskStatus.Failed;
TaskStatusChanged?.Invoke(task);
LogError(ex);
}
}
public void Stop()
{
if (!_isRunning)
{
LogMessage?.Invoke("处理器已经停止");
return;
}
LogMessage?.Invoke("正在停止任务处理器...");
_isRunning = false;
try
{
_collection?.CompleteAdding();
if (_processingTask?.Wait(TimeSpan.FromSeconds(10)) == true)
{
LogMessage?.Invoke("任务处理器已正常停止");
}
else
{
LogMessage?.Invoke("任务处理器停止超时");
}
}
catch (Exception ex)
{
LogMessage?.Invoke($"停止处理器时发生错误: {ex.Message}");
}
}
public void Restart()
{
LogMessage?.Invoke("重启任务处理器...");
Stop();
// 等待一小段时间确保资源释放
Thread.Sleep(100);
Initialize();
}
public int PendingTaskCount => _collection?.Count ?? 0;
public bool IsRunning => _isRunning && !(_collection?.IsAddingCompleted ?? true) && !_isDisposed;
private void LogError(Exception ex)
{
LogMessage?.Invoke($"错误: {ex.Message}");
}
public void Dispose()
{
if (_isDisposed) return;
_isDisposed = true;
Stop();
_collection?.Dispose();
_processingTask?.Dispose();
LogMessage?.Invoke("任务处理器已释放资源");
}
}
}
C#private void btnStart_Click(object sender, EventArgs e)
{
try
{
if (_processor?.IsRunning != true)
{
// 🔥 关键:使用重启机制而不是简单的重新入队
if (_processor == null)
{
StartProcessor();
}
else
{
_processor.Restart(); // 重启处理器
}
// 重新处理未完成的任务
var pendingTasks = _allTasks.Where(t =>
t.Status == TaskStatus.Pending ||
t.Status == TaskStatus.Processing).ToList();
foreach (var task in pendingTasks)
{
task.Status = TaskStatus.Pending;
_processor.EnqueueTask(task);
}
OnLogMessage($"处理器已启动,重新入队 {pendingTasks.Count} 个任务");
RefreshTaskList();
}
}
catch (Exception ex)
{
OnLogMessage($"启动失败: {ex.Message}");
}
finally
{
UpdateUIStatus();
}
}

C#// ❌ 危险:可能导致死锁
private void ProcessTask(TaskItem task)
{
var result = SomeAsyncMethod().Wait(); // 危险操作
}
// ✅ 正确:使用GetAwaiter().GetResult()
private void ProcessTask(TaskItem task)
{
var result = SomeAsyncMethod().GetAwaiter().GetResult();
}
C#// ❌ 错误:没有正确释放资源
public void Dispose()
{
_collection.CompleteAdding(); // 不够!
}
// ✅ 正确:完整的资源释放
public void Dispose()
{
Stop();
_collection?.Dispose();
_processingTask?.Dispose();
}
C#// ❌ 错误:状态判断过于简单
public bool IsRunning => _isRunning;
// ✅ 正确:综合判断多个状态
public bool IsRunning => _isRunning &&
!(_collection?.IsAddingCompleted ?? true) &&
!_isDisposed;
C#private void ProcessSingleTask(TaskItem task)
{
var startTime = DateTime.Now;
while ((DateTime.Now - startTime).TotalMilliseconds < processingTime)
{
if (!_isRunning) return; // 支持优雅停止
Thread.Sleep(100);
}
}
C#// 限制队列容量
private readonly BlockingCollection<TaskItem> _collection =
new BlockingCollection<TaskItem>(boundedCapacity: 1000);
虽然本文讲解BlockingCollection的使用,但值得注意的是:
System.Threading.ChannelsC#// 现代替代方案预览
var channel = Channel.CreateUnbounded<TaskItem>();
await foreach (var task in channel.Reader.ReadAllAsync())
{
await ProcessTaskAsync(task);
}
收藏级代码模板:本文提供的BlockingTaskProcessor可以直接用于生产环境,解决了停止重启、优雅关闭、异常处理等核心问题。
技术金句:"多线程编程的难点不在于启动,而在于如何优雅地停止和重启"
💬 互动时间:
觉得这篇文章对你有帮助,请转发给更多需要的同行!让我们一起提升C#开发技能,避免踩坑!
关注我,获取更多实战性强的C#编程技巧和最佳实践!
相关信息
通过网盘分享的文件:AppBlockingCollectionDemo.zip 链接: https://pan.baidu.com/s/17WmJS-6wCPNEugyeIpIP5Q?pwd=wj3m 提取码: wj3m --来自百度网盘超级会员v9的分享
本文作者:技术老小子
本文链接:
版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!