深入淺出 IAsyncEnumerable:超越 async/await 的異步流式處理實(shí)戰(zhàn)指南
異步編程在現(xiàn)代 .NET 開發(fā)中無處不在。任務(wù)(Task)、async/await 和并行性是每個(gè)開發(fā)人員每天都要處理的主題。但是,當(dāng)您需要異步流式傳輸數(shù)據(jù)、處理大型數(shù)據(jù)集、使用分頁 API 或處理慢速 I/O 源時(shí),該怎么辦?這就是 IAsyncEnumerable 的用武之地。
如果您和大多數(shù)開發(fā)人員一樣,可能聽說過 IAsyncEnumerable,但還沒有找到合適的理由或正確的解釋將其應(yīng)用到實(shí)際項(xiàng)目中。在本次深度探討中,我將改變這一現(xiàn)狀。
我們將遠(yuǎn)遠(yuǎn)超出基礎(chǔ)示例的范圍。我將解釋:
? 什么是異步流(async streams)。
? 為什么它們不僅僅是另一個(gè)異步噱頭。
? 它們?nèi)绾谓鉀Q現(xiàn)實(shí)世界中的后端問題?
? 以及何時(shí)應(yīng)避免使用它們。
這是多部分系列的第一部分。在本部分中,我們將介紹理解 IAsyncEnumerable 所需的核心概念和實(shí)際使用模式。
之后,我們將深入探討高級(jí)模式、性能注意事項(xiàng)和流式 API。
在本指南結(jié)束時(shí),您將通過實(shí)際可運(yùn)行的示例(而不僅僅是理論)牢固掌握如何使用以及何時(shí)使用異步流。
讓我們開始吧。
簡史
異步流自 .NET Core 3.0 和 C# 8.0 起就成為 .NET 的一部分。引入它們是為了解決一個(gè)簡單但重要的問題:如何在不一次性加載所有內(nèi)容的情況下處理大型或連續(xù)數(shù)據(jù)?
在 IAsyncEnumerable 出現(xiàn)之前,您只有兩個(gè)不好的選擇:
1. 使用 Task<List<T>> 加載所有內(nèi)容,即將整個(gè)數(shù)據(jù)集加載到內(nèi)存中。
2. 使用基于事件或回調(diào)的設(shè)計(jì),這會(huì)使您的代碼復(fù)雜化。
有了 IAsyncEnumerable,.NET 為我們提供了一個(gè)簡潔、惰性、異步的流模型。您可以在數(shù)據(jù)產(chǎn)生時(shí)拉取數(shù)據(jù),而不會(huì)阻塞線程或?qū)е聝?nèi)存爆炸。
什么是 IAsyncEnumerable?
從其核心來說,IAsyncEnumerable<T> 正如其名:是 IEnumerable 的異步版本。但您不是同步拉取項(xiàng),而是使用 await foreach 異步拉取它們。
可以將其視為一個(gè)惰性數(shù)據(jù)管道:
? 您的方法不返回完整的列表。
? 它在一個(gè)項(xiàng)可用時(shí)逐個(gè)返回它們。
? 每個(gè)項(xiàng)都可以異步產(chǎn)生(例如,從文件讀取一行或等待 API 響應(yīng)之后)。
基礎(chǔ)示例:產(chǎn)生一個(gè)異步流
async IAsyncEnumerable<string> GetDataAsync()
{
    yield return await Task.FromResult("Item 1");
    yield return await Task.FromResult("Item 2");
    yield return await Task.FromResult("Item 3");
}每個(gè) yield return 都會(huì)暫停執(zhí)行并發(fā)出一個(gè)新值。async 關(guān)鍵字允許您在產(chǎn)生(yield)之前等待(await)異步操作。
消費(fèi)一個(gè)異步流:await foreach
要讀取異步流,您可以像這樣使用 await foreach:
await foreach (var item in GetDataAsync())
{
    Console.WriteLine(item);
}每個(gè)項(xiàng)一旦可用就會(huì)被處理,無需等待完整的數(shù)據(jù)集。
為什么不直接使用 Task<List>?
因?yàn)?nbsp;Task<List<T>> 迫使您等到整個(gè)數(shù)據(jù)集準(zhǔn)備就緒。異步流讓您能夠:
? 立即開始處理數(shù)據(jù)。
? 在結(jié)果產(chǎn)生時(shí)進(jìn)行流式傳輸。
? 高效處理海量或無限的數(shù)據(jù)集。
這是基礎(chǔ)。接下來,讓我們談?wù)勅绾萎a(chǎn)生和消費(fèi)現(xiàn)實(shí)世界中的異步流。
產(chǎn)生異步流
創(chuàng)建異步流意味著編寫一個(gè)返回 IAsyncEnumerable<T> 的方法,并使用 yield return 一次發(fā)射一個(gè)數(shù)據(jù)片段。
可以將其視為構(gòu)建一個(gè)惰性數(shù)據(jù)工廠,該工廠在請(qǐng)求項(xiàng)時(shí)異步產(chǎn)生它們。
示例:帶延遲的流式傳輸項(xiàng)這是一個(gè)模擬隨時(shí)間推移生成數(shù)據(jù)的簡單示例:
async IAsyncEnumerable<string> GenerateDataAsync()
{
    for (int i = 1; i <= 5; i++)
    {
        await Task.Delay(500); // 模擬異步工作(API 調(diào)用、文件讀取等)
        yield return $"Item {i}";
    }
}每個(gè) yield return 在模擬延遲后發(fā)出下一個(gè)項(xiàng)。
實(shí)際示例:逐行讀取大文件
您可以異步地逐行產(chǎn)生(yield)內(nèi)容,而不是將整個(gè)文件讀入內(nèi)存:
async IAsyncEnumerable<string> ReadLinesAsync(string filePath)
{
    using var reader = new StreamReader(filePath);
    while (!reader.EndOfStream)
    {
        yield return await reader.ReadLineAsync();
    }
}這使得消費(fèi)者可以在每行被讀取時(shí)處理它,而無需等待整個(gè)文件加載完畢。
資源作用域
請(qǐng)始終記?。?/p>
? 在流內(nèi)部使用 using 塊來管理可釋放資源。
? 一旦異步流完成,任何本地資源(如文件句柄)都會(huì)被釋放。
現(xiàn)在您知道了如何產(chǎn)生流,接下來讓我們探索如何有效地消費(fèi)它們。
消費(fèi)異步流
產(chǎn)生異步流只是故事的一半。要利用它,您需要以異步、逐項(xiàng)的方式消費(fèi)流。
這就是 await foreach 發(fā)揮作用的地方。
基礎(chǔ)消費(fèi)示例
await foreach (var item in GenerateDataAsync())
{
    Console.WriteLine($"Received: {item}");
}每個(gè)項(xiàng)一旦被產(chǎn)生(yielded)就會(huì)被立即處理,無需等待整個(gè)數(shù)據(jù)集。
為什么這很重要
與等待完整列表相比:
? 您可以立即開始工作。
? 您可以保持低內(nèi)存使用率。
? 您可以優(yōu)雅地處理連續(xù)或無限的數(shù)據(jù)流。
不要陷入這個(gè)陷阱
這樣做是錯(cuò)誤的:
var list = await GenerateDataAsync().ToListAsync();為什么?您將高效的流重新轉(zhuǎn)換為批量加載的列表,扼殺了它的主要優(yōu)勢。
如果您的目標(biāo)是流式傳輸,請(qǐng)堅(jiān)持使用 await foreach。
實(shí)時(shí)示例:處理大文件使用我們之前的 ReadLinesAsync 方法:
await foreach (var line in ReadLinesAsync(filePath))
{
    Console.WriteLine($"Line: {line}");
}每一行都被實(shí)時(shí)讀取、處理和顯示,非常適合等待會(huì)損害性能的大文件。
既然您已經(jīng)看到了兩個(gè)方面(產(chǎn)生和消費(fèi)),接下來讓我們將異步流應(yīng)用到實(shí)際用例中。
實(shí)際用例
讓我們超越玩具示例。以下是一些異步流可以大放異彩的現(xiàn)實(shí)場景。
1. 大文件處理
代替將整個(gè)文件讀入內(nèi)存:
await foreach (var line in ReadLinesAsync(filePath))
{
    ProcessLine(line);  // 在每行被讀取時(shí)立即處理它。
}非常適合:
? 日志處理。
? 大型 CSV 解析。
? 實(shí)時(shí)文件監(jiān)控。
2. 流式傳輸 API 數(shù)據(jù)
從遠(yuǎn)程 API 逐頁獲取數(shù)據(jù)?不要緩沖整個(gè)響應(yīng)。對(duì)其進(jìn)行流式傳輸。
將分頁 API 包裝為異步流的示例:
async IAsyncEnumerable<Order> FetchOrdersAsync()
{
    int page = 1;
    while (true)
    {
        var orders = await FetchPageAsync(page);
        if (orders.Count == 0)
            yield break;
        foreach (var order in orders)
            yield return order;
        page++;
    }
}這讓您的消費(fèi)者可以在訂單到達(dá)時(shí)處理它們:
await foreach (var order in FetchOrdersAsync())
{
    HandleOrder(order);
}無需等待整個(gè)數(shù)據(jù)集。您的 UI 或后端可以實(shí)時(shí)持續(xù)工作。
文件流和分頁 API 流這兩種模式是最常見的實(shí)際應(yīng)用。它們也是異步流通常優(yōu)于傳統(tǒng)的基于集合的方法的地方。
接下來,讓我們討論同樣重要的事情:何時(shí)不應(yīng)使用異步流。
何時(shí)不應(yīng)使用異步流盡管有諸多好處,但 IAsyncEnumerable 并不總是合適的工具。在以下情況下應(yīng)避免使用它:
小型數(shù)據(jù)集?不必麻煩
如果您正在處理:
? 十幾個(gè)左右的記錄,
? 一個(gè)簡短的項(xiàng)列表,
? 快速的內(nèi)存數(shù)據(jù)……
……就沒有理由引入異步迭代的開銷。一個(gè)簡單的 Task<List<T>> 甚至普通的同步循環(huán)更快、更清晰。
需要立即獲取完整結(jié)果?有時(shí),您需要所有數(shù)據(jù)才能開始處理,可能是因?yàn)椋?/p>
? 您正在聚合總計(jì),
? 應(yīng)用全局排序,或
? 將數(shù)據(jù)傳遞給僅接受列表的庫。
在這些情況下,異步流可能會(huì)在沒有真正收益的情況下使您的代碼復(fù)雜化。
您認(rèn)為它是并行的(劇透:它不是)
異步流默認(rèn)是順序的。它們允許您惰性地、異步地處理數(shù)據(jù),但不是并行地。
如果您的目標(biāo)是并行性,您將需要:
? 手動(dòng)緩沖項(xiàng),
? 啟動(dòng)任務(wù),或
? 使用其他模式,如 Parallel.ForEachAsync。
常見錯(cuò)誤和誤解
即使經(jīng)驗(yàn)豐富的開發(fā)人員也會(huì)誤用 IAsyncEnumerable。讓我們澄清一些最常見的誤解:
錯(cuò)誤 1:認(rèn)為它是并行的
許多開發(fā)人員將異步與并行混淆。IAsyncEnumerable 順序處理項(xiàng),只是是異步的。每次迭代都會(huì)等待前一次迭代完成,除非您顯式引入并行性。
錯(cuò)誤 2:忘記 await foreach
與常規(guī)的 foreach 不同,消費(fèi)異步流需要 await foreach。忘記這一點(diǎn)會(huì)導(dǎo)致令人困惑的編譯錯(cuò)誤。
await foreach (var item in GetDataAsync())
{
    // 處理項(xiàng)
}錯(cuò)誤 3:將流轉(zhuǎn)回列表
最諷刺的錯(cuò)誤之一:
var list = await GetDataAsync().ToListAsync();這破壞了流式傳輸?shù)娜恳饬x。您剛剛強(qiáng)制將流一次性全部放入內(nèi)存。
錯(cuò)誤 4:誤解惰性執(zhí)行異步流是惰性的。
生產(chǎn)者直到您開始迭代時(shí)才會(huì)運(yùn)行。
? 聲明方法時(shí)不會(huì)立即返回任何內(nèi)容。
? 當(dāng)通過 await foreach 請(qǐng)求第一個(gè)項(xiàng)時(shí),執(zhí)行才開始。
注意:異步流關(guān)乎效率,而非復(fù)雜性。除非它們能簡化您的代碼或改善內(nèi)存/資源使用情況,否則不要使用它們。
這就是 IAsyncEnumerable 的真實(shí)面貌。它不是某種理論抽象;而是一種工具,當(dāng)您處理大型或慢速數(shù)據(jù)時(shí),它可以解決實(shí)際問題。我們已經(jīng)探討了如何構(gòu)建異步流、如何正確消費(fèi)它們以及它們在哪些場景下有意義。更重要的是,您現(xiàn)在知道了在哪些場景下它們不適用。在下一部分中,我將帶您更深入地了解實(shí)際場景:流式 API、性能注意事項(xiàng)、錯(cuò)誤處理、取消以及 .NET 10 通過異步 LINQ 帶來的新特性。這才是真正有趣的地方。















 
 
 








 
 
 
 