在日常C#开发中,你是不是经常遇到这样的场景:用户在搜索框疯狂输入,每次输入都触发一次API调用;或者多个异步操作同时进行,结果却乱序返回,界面显示的数据"驴唇不对马嘴"?更糟糕的是,你写了一堆嵌套回调、状态机、线程同步代码,最后自己都看不懂了。
使用传统异步模式处理这类场景,代码量往往会膨胀3-5倍。但如果用Rx.NET,同样的功能只需要几行优雅的代码就能搞定。
读完这篇文章,你将掌握:
✅ Rx.NET的核心思想与适用场景
✅ 3个立即可用的实战案例(从入门到进阶)
✅ 规避常见陷阱的最佳实践
咱们直接上干货,用最简单的Console应用展示Rx.NET的魔力。
Rx.NET就是把异步数据源当作"可观察的集合"来处理,就像你用LINQ查询数据库一样自然。
传统的异步编程就像"被动接电话"——事件来了你得赶紧处理,代码分散在各个回调里。而Rx.NET则是"主动管理数据流"——你定义好规则,数据自动按你的要求流转。
✅ 适用场景:
❌ 不适用场景:
用户在搜索框输入时,每次按键都触发API调用,服务器压力山大,用户体验也差。传统做法需要手动管理Timer、清理旧请求,代码容易出错。
csharpusing System.Reactive.Linq;
using System.Reactive.Subjects;
namespace AppRxNet
{
internal class Program
{
static void Main(string[] args)
{
Console.OutputEncoding = System.Text.Encoding.UTF8;
Console.WriteLine("🔍 模拟搜索框输入(输入'exit'退出)\n");
// 创建一个Subject作为用户输入的数据源
var searchInput = new Subject<string>();
// 核心逻辑:防抖 + 去重 + 过滤
var searchStream = searchInput
.Throttle(TimeSpan.FromMilliseconds(500)) // 500ms内无新输入才触发
.DistinctUntilChanged() // 过滤连续重复值
.Where(term => !string.IsNullOrWhiteSpace(term) && term.Length >= 2);
// 订阅处理结果
searchStream.Subscribe(term =>
{
Console.WriteLine($"✅ 发起搜索请求: '{term}'");
// 这里可以调用真实API
});
// 模拟用户输入
while (true)
{
var input = Console.ReadLine();
if (input == "exit") break;
searchInput.OnNext(input);
}
Console.WriteLine("👋 程序结束");
}
}
}

| 传统方式 | Rx.NET方式 |
|---|---|
| 需要Timer管理 | 一行Throttle搞定 |
| 手动记录上次值 | DistinctUntilChanged自动处理 |
| 代码30-50行 | 核心逻辑5行 |
注意:这里看到是不是是事件订阅与发布。
你需要同时调用3个API获取数据,传统方式要么用Task.WhenAll(无法控制顺序),要么手写状态机(复杂且易错)。
csharpusing System.Reactive.Linq;
using System.Reactive.Subjects;
namespace AppRxNet
{
internal class Program
{
static void Main(string[] args)
{
Console.OutputEncoding = System.Text.Encoding.UTF8;
Console.WriteLine("🌐 并发请求示例\n");
// 模拟三个异步API调用
var api1 = SimulateApiCall("用户信息", 1000);
var api2 = SimulateApiCall("订单列表", 1500);
var api3 = SimulateApiCall("推荐商品", 800);
// 方案1:等待所有结果(类似Task.WhenAll)
Console.WriteLine("📦 方案1: 等待所有结果");
Observable.Zip(api1, api2, api3, (u, o, r) => new { User = u, Orders = o, Recommendations = r })
.Subscribe(
result => Console.WriteLine($"✅ 全部完成: {result.User}, {result.Orders}, {result.Recommendations}"),
error => Console.WriteLine($"❌ 错误: {error.Message}"),
() => Console.WriteLine("🏁 所有请求完成\n")
);
Console.ReadLine();
// 方案2:谁快用谁(最快响应优先)
Console.WriteLine("📦 方案2: 最快响应优先");
Observable.Amb(api1, api2, api3)
.Subscribe(
result => Console.WriteLine($"⚡ 最快返回: {result}"),
() => Console.WriteLine("🏁 完成\n")
);
Console.ReadLine();
// 方案3:按顺序逐个处理(Concat)
Console.WriteLine("📦 方案3: 按顺序执行");
Observable.Concat(api1, api2, api3)
.Subscribe(
result => Console.WriteLine($"📝 按序返回: {result}"),
() => Console.WriteLine("🏁 全部完成")
);
Console.ReadLine();
}
// 模拟异步API调用
static IObservable<string> SimulateApiCall(string name, int delayMs)
{
return Observable.FromAsync(async () =>
{
await Task.Delay(delayMs);
return $"{name}(耗时{delayMs}ms)";
});
}
}
}

| 操作符 | 场景 | 特点 |
|---|---|---|
| Zip | 需要所有结果才能继续 | 等待最慢的,结果配对 |
| Amb | 谁快用谁 | 只要最快的,其他取消 |
| Concat | 严格顺序执行 | 前一个完成才开始下一个 |
在实际项目中,我经常这样用:
注意:这个是不是有点Task的感觉!
用户快速切换搜索词时,旧的请求还没返回,新的请求又发出了。如果不处理,可能导致界面显示的是旧结果。
csharpinternal class Program
{
static void Main()
{
Console.OutputEncoding = System.Text.Encoding.UTF8;
Console.WriteLine("🔍 智能搜索(自动取消过期请求)\n");
Console.WriteLine("提示:快速输入多个词,观察只有最后一个请求返回结果\n");
var searchInput = new Subject<string>();
// 核心逻辑:Switch会自动取消未完成的旧请求
var intelligentSearch = searchInput
.Throttle(TimeSpan.FromMilliseconds(300))
.DistinctUntilChanged()
.Select(term =>
{
Console.WriteLine($"🚀 发起请求: '{term}'");
return SearchApi(term); // 返回Observable
})
.Switch() // 🔥 关键:自动取消旧请求,只保留最新的
.Subscribe(
result => Console.WriteLine($"✅ 收到结果: {result}"),
error => Console.WriteLine($"❌ 错误: {error.Message}")
);
// 模拟用户快速输入
while (true)
{
var input = Console.ReadLine();
if (input == "exit") break;
searchInput.OnNext(input);
}
intelligentSearch.Dispose();
Console.WriteLine("👋 程序结束");
}
// 模拟搜索API(随机延迟)
static IObservable<string> SearchApi(string term)
{
return Observable.FromAsync(async () =>
{
var delay = new Random().Next(500, 2000);
Console.WriteLine($" ⏳ '{term}' 查询中... (预计{delay}ms)");
await Task.Delay(delay);
return $"'{term}' 的搜索结果";
});
}
}
注意:Switch确保只有最后一个请求的结果会被处理,这个应用比较神奇。
Switch的工作机制:
csharp// 使用TakeUntil的替代方案
var searchStream = searchInput
.Throttle(TimeSpan.FromMilliseconds(300))
.DistinctUntilChanged()
.SelectMany(term =>
SearchApi(term).TakeUntil(searchInput) // 新输入到来时取消当前请求
);
Switch vs TakeUntil:
Switch:更简洁,语义清晰TakeUntil:更灵活,可以指定任意"取消信号"| 操作符 | 用途 | 典型场景 |
|---|---|---|
| Throttle | 防抖动 | 搜索框输入 |
| DistinctUntilChanged | 去除连续重复 | 过滤无效事件 |
| Where | 过滤 | 条件筛选 |
| Select | 转换 | 数据映射 |
| SelectMany | 展平嵌套 | 异步操作串联 |
| Switch | 切换最新 | 取消过期请求 |
| Zip | 配对组合 | 多数据源合并 |
| Merge | 合并流 | 多事件源统一处理 |
csharpusing (var subscription = observable.Subscribe(...))
{
// 业务逻辑
} // 自动清理
csharpobservable
.ObserveOn(SynchronizationContext.Current) // 切换到UI线程
.Subscribe(data => UpdateUI(data));
csharp// 错误示范
observable.Subscribe(x => Console.WriteLine(x));
// 正确做法
observable.Subscribe(
x => Console.WriteLine(x),
ex => Console.WriteLine($"错误: {ex.Message}"),
() => Console.WriteLine("完成")
);
Subject滥用
Subject既是Observer又是Observable,容易造成混乱。优先使用Observable.Create或FromEvent。
在Subscribe中执行重逻辑
应该把复杂逻辑放在操作符链中,保持Subscribe的简洁。
✅ Rx.NET把异步当数据流:用LINQ的思维处理异步,代码简洁3-5倍
✅ 组合操作符威力巨大:Throttle、Switch、Zip等解决90%的异步场景
✅ 记得资源管理:Dispose订阅、处理OnError、注意线程上下文
问题1:你在项目中遇到过哪些"异步地狱"场景?用传统方式是怎么解决的?
问题2:对比async/await,你觉得Rx.NET的优势和劣势分别是什么?
欢迎在评论区分享你的实战经验!如果这篇文章帮到你了,记得点赞+收藏,下次遇到异步问题时随时翻出来参考。
标签:#CSharp #响应式编程 #Rx.NET #异步编程 #性能优化
💡 一句话金句:
- "异步编程不是回调地狱,而是优雅的数据流"
- "Switch操作符:让过期请求自动消失的魔法"
- "Rx.NET = LINQ + 时间维度 + 异步能力"
本文作者:技术老小子
本文链接:
版权声明:本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!