2026年2月8日 星期日

使用非同步數據流 Asynchronous Stream 來處理迭代工作

使用非同步數據流 Asynchronous Stream 來處理迭代工作

為什麼現代系統需要非同步串流設計?在早期的系統設計中,只要把耗時工作包進非同步方法,等結果全部準備好再一次回傳,幾乎是理所當然的做法。然而,隨著系統面對的資料量越來越大、處理時間越來越長,這種「整包回傳」的思維,開始暴露出結構性的問題。這並不是效能調校的問題,而是資料交付模型本身就不符合現代系統的運作方式。

傳統非同步設計的限制,在沒有非同步串流的設計下,系統往往隱含一個假設:所有資料必須準備完成,才能開始被使用。,這會導致幾個常見現象,結果就是:系統感覺慢、重、且缺乏彈性。:

  • 明明資料已經陸續產生,卻必須等到最後一刻才能開始處理
  • 使用者長時間看到「等待中」,即使系統早已有部分結果
  • 大量資料被集中在記憶體中,造成不必要的壓力
  • 呼叫端無法中途停止、取消或根據前段結果調整行為

非同步串流解決的不是「速度」,而是「時間感」,非同步串流的核心價值,不在於跑得比較快,而在於讓結果可以「準備好就交付」。這種設計方式,讓系統行為更接近現實世界,換句話說,非同步串流讓「等待」被拆解,而不是被累積。:

  • 資料是逐步產生的,而不是瞬間完成的
  • 使用者可以邊接收、邊處理,而不是被迫等待
  • 系統可以在過程中決定是否繼續,而不是一次性賭到底

當資料變成「流」,系統就會變得不一樣,一旦資料被視為流動的過程,而不是一次性的結果,整個系統的特性會出現質變,這種設計特別適合長時間處理、大量資料、或即時回饋的場景。:

  • 記憶體使用更穩定,不再需要一次承載全部資料
  • 使用者體驗更即時,能更早感知系統正在工作
  • 系統更容易被取消、暫停或提前結束
  • 設計更貼近實際資料來源的行為模式

為什麼這在現代系統中特別重要?在醫療資料、搜尋系統、資料匯出、即時儀表板、AI 回應等場景中,資料本來就不是一次性存在的。若仍沿用「等全部完成再回傳」的模型,只會讓系統變得笨重、延遲放大,並且難以擴充。非同步串流不是進階技巧,而是讓系統與資料本質對齊的設計選擇。

C# 異步數據流(Async Streams)是 C# 8.0 引入的強大特性,利用 IAsyncEnumerable 和 await foreach 語句,允許以非阻塞方式產生和消費異步序列。它非常適合處理分頁 API、感測器數據或長時間運行的任務,能在資料產生時即時處理,而無需等待全部資料載入。 關鍵概念與實作

  • 返回類型 (IAsyncEnumerable):異步數據流的方法返回 IAsyncEnumerable,這允許方法使用 yield return 異步產生資料。
  • 產生資料 (await foreach):使用 async 和 yield return 的方法可逐步產生數據。
  • 消耗資料 (await foreach):呼叫者使用 await foreach 遍歷序列,在讀取下一個元素時暫停執行。

建立測試專案

請依照底下的操作,建立起這篇文章需要用到的練習專案

  • 打開 Visual Studio 2026 IDE 應用程式
  • 從 [Visual Studio 2026] 對話窗中,點選右下方的 [建立新的專案] 按鈕
  • 在 [建立新專案] 對話窗右半部
    • 切換 [所有語言 (L)] 下拉選單控制項為 [C#]
    • 切換 [所有專案類型 (T)] 下拉選單控制項為 [主控台]
  • 在中間的專案範本清單中,找到並且點選 [主控台應用程式] 專案範本選項

    專案,用於建立可在 Windows、Linux 及 macOS 於 .NET 執行的命令列應用程式

  • 點選右下角的 [下一步] 按鈕
  • 在 [設定新的專案] 對話窗
  • 找到 [專案名稱] 欄位,輸入 csAsynchronousStream 作為專案名稱
  • 在剛剛輸入的 [專案名稱] 欄位下方,確認沒有勾選 [將解決方案與專案至於相同目錄中] 這個檢查盒控制項
  • 點選右下角的 [下一步] 按鈕
  • 現在將會看到 [其他資訊] 對話窗
  • 在 [架構] 欄位中,請選擇最新的開發框架,這裡選擇的 [架構] 是 : .NET 10.0 (長期支援)
  • 在這個練習中,需要去勾選 [不要使用最上層陳述式(T)] 這個檢查盒控制項

    這裡的這個操作,可以由讀者自行決定是否要勾選這個檢查盒控制項

  • 請點選右下角的 [建立] 按鈕

稍微等候一下,這個 背景工作服務 專案將會建立完成

修改 Program.cs 類別內容

  • 在專案中找到並且打開 [Program.cs] 檔案
  • 將底下的程式碼取代掉 Program.cs 檔案中內容
using System.Diagnostics;

namespace csAsynchronousStream;

internal class Program
{
    static async Task Main(string[] args)
    {
        var sw = Stopwatch.StartNew();

        Log("=== 非同步串流 Asynchronous Stream 的比較展示 ===");

        await Demo_Stream(sw);
        Console.WriteLine();
        Console.WriteLine();
        Console.WriteLine();
        await Demo_Batch(sw);

        Log("=== 展示結束 ===");
    }

    static void Log(string msg) => Console.WriteLine(msg);

    static void Log(Stopwatch sw, string msg)
        => Console.WriteLine($"{sw.ElapsedMilliseconds,5}ms | {msg}");

    static async Task Demo_Stream(Stopwatch sw)
    {
        Log(sw, "[Stream 串流] 使用 IAsyncEnumerable");

        await foreach (var i in RangeAsync(start: 1, count: 5, delayMs: 300, sw))
        {
            Log(sw, $"[Stream 串流] 接收到 {i} 迭代請求 -> 開始進行處理 ...");
            await Task.Delay(200); // 模擬呼叫端處理耗時
            Log(sw, $"[Stream 串流] 已經處理完成 {i} 請求 -> 請求下一筆");
        }

        Log(sw, "[Stream 串流] 結束");
    }

    static async Task Demo_Batch(Stopwatch sw)
    {
        Log(sw, "[Batch 批次] 開始進行等候 Task<List<int>> (需要等待所有的迭代都完成後,才會繼續往下處理)");

        var list = await RangeTaskAsync(start: 1, count: 5, delayMs: 300, sw);

        Log(sw, $"[Batch 批次] 準備進行處理所有的迭代工作 (count={list.Count}) -> 開始進行處理所有工作...");
        foreach (var i in list)
        {
            Log(sw, $"[Batch 批次] 正在處理 {i} 個工作...");
            await Task.Delay(200); // 模擬呼叫端處理耗時
            Log(sw, $"[Batch 批次] 已經處理完成 {i} 個工作");
        }

        Log(sw, "[Batch 批次] 結束");
    }

    static async IAsyncEnumerable<int> RangeAsync(int start, int count, int delayMs, Stopwatch sw)
    {
        for (int i = start; i < start + count; i++)
        {
            Log(sw, $"[Stream 串流] 準備要產生迭代工作 {i}...");
            await Task.Delay(delayMs);                 // 模擬「取得下一筆資料」耗時
            Log(sw, $"[Stream 串流] 產生出結果給呼叫端 {i}");
            yield return i;                             // 交付給呼叫端,呼叫端可立刻處理
        }
    }

    static async Task<List<int>> RangeTaskAsync(int start, int count, int delayMs, Stopwatch sw)
    {
        var data = new List<int>();

        for (int i = start; i < start + count; i++)
        {
            Log(sw, $"[Batch 批次] 準備要產生迭代工作 {i}...");
            await Task.Delay(delayMs);                 // 模擬「取得下一筆資料」耗時
            data.Add(i);                               // 先收集起來
            Log(sw, $"[Batch 批次] 產生結果集合 {i} (此時將還不會回傳)");
        }

        Log(sw, "[Batch 批次] 全部都處理完成,並且回傳結果");
        return data;
    }
}

在這段程式碼中,將會透過兩個方法 [Demo_Stream] 和 [Demo_Batch],分別展示了使用非同步串流和傳統批次處理的方式來產生和消費資料。RangeAsync 方法使用 IAsyncEnumerable<int> 來逐步產生資料,而 RangeTaskAsync 方法則是一次性產生所有資料並回傳一個 List<int>。另外還有設計其他的方法,這些方法將會是用來模擬資料產生的過程,以及在主控台上輸出相關的日誌訊息,如此可以清楚地看到兩種不同設計模式的行為差異。

對於批次的作法,將會透過 [RangeTaskAsync] 方法來模擬產生資料的過程,每次產生一筆資料都會有一段延遲,直到所有資料都產生完成後才會回傳給呼叫端。呼叫端在收到結果後才開始進行處理,這裡會透過 [foreach] 迴圈來處理每一筆資料,這樣使用者就必須等待整個過程完成才能看到任何結果,整體的使用體驗也會比較差。另外對於非同步串流的作法,將會透過 [RangeAsync] 方法來模擬產生資料的過程,每次產生一筆資料都會有一段延遲,當每筆資料準備好後就會立即回傳給呼叫端,呼叫端在收到每筆資料後就可以立刻開始處理。

首先先來看看在非同步串流出現之前的做法與其執行結果,也就是 [Demo_Batch] 方法的行為,這裡的設計是先把所有資料都準備好,然後一次性回傳給呼叫端,呼叫端在收到結果後才開始進行處理。這種設計模式的問題在於,呼叫端必須等到所有資料都準備完成後才能開始處理,這會導致使用者感覺到系統很慢,因為他們必須等待整個過程完成才能看到任何結果。

首先將會透過 var list = await RangeTaskAsync(start: 1, count: 5, delayMs: 300, sw); 這行程式碼來呼叫 RangeTaskAsync 方法,這個方法會模擬產生資料的過程,每次產生一筆資料都會有一段延遲,直到所有資料都產生完成後才會回傳給呼叫端。呼叫端在收到結果後才開始進行處理,這裡會透過 [foreach] 迴圈來處理每一筆資料。

從後面的執行結果可以看到在螢幕輸出的結果文字中,會看到在 [Batch 批次] 的部分,直到最後一筆資料都產生完成後才會回傳給呼叫端,呼叫端在收到結果後才開始進行處理,這裡會透過 [foreach] 迴圈來處理每一筆資料,從執行結果便可以知道其執行特性。

相對地,在 [Demo_Stream] 方法中,使用了非同步串流的設計,資料是逐步產生的,呼叫端可以在每次收到新的資料時就開始處理,這樣使用者就能更快地看到系統正在工作的反饋,整體的使用體驗也會更好。

在此,一開始將會使用 await foreach (var i in RangeAsync(start: 1, count: 5, delayMs: 300, sw)) 這行程式碼來呼叫 RangeAsync 方法,這個方法會模擬產生資料的過程,每次產生一筆資料都會有一段延遲,當每筆資料準備好後就會立即回傳給呼叫端,呼叫端在收到每筆資料後就可以立刻開始處理,這樣使用者就能更快地看到系統正在工作的反饋,整體的使用體驗也會更好。

這裡也可以從執行結果的文字輸出來觀察到在 [Stream 串流] 的部分,當每筆資料準備好後就會立即回傳給呼叫端,呼叫端在收到每筆資料後就可以立刻開始處理,其帶來的好處將會可想而知。

由此可以看到兩個方法 [Demo_Stream] 和 [Demo_Batch] 的設計作法差異,前者使用了 [await foreach] 程式碼來設計,後者將會使用 var list = await RangeTaskAsync 程式碼來一次性取得所有結果,對於行為差異,前者使用非同步串流的設計,資料是逐步產生的,呼叫端可以在每次收到新的資料時就開始處理,這樣使用者就能更快地看到系統正在工作的反饋,整體的使用體驗也會更好;而後者則是一次性產生所有資料並回傳給呼叫端,呼叫端在收到結果後才開始進行處理。

現在來分析下這兩種做法的優缺點: 非同步串流設計的優點為:

  • 資料是逐步產生的,而不是瞬間完成的
  • 使用者可以邊接收、邊處理,而不是被迫等待
  • 系統可以在過程中決定是否繼續,而不是一次性賭到底
  • 記憶體使用更穩定,不再需要一次承載全部資料
  • 使用者體驗更即時,能更早感知系統正在工作
  • 系統更容易被取消、暫停或提前結束
  • 設計更貼近實際資料來源的行為模式 傳統批次設計的缺點為:
  • 呼叫端必須等到所有資料都準備完成後才能開始處理
  • 使用者長時間看到「等待中」,即使系統早已有部分結果
  • 大量資料被集中在記憶體中,造成不必要的壓力
  • 呼叫端無法中途停止、取消或根據前段結果調整行為

執行程式碼

  • 按下 F5 鍵,開始執行這個程式
  • 程式將會開始執行,並且在主控台視窗內,將會看到類似下圖的輸出結果
=== 非同步串流 Asynchronous Stream 的比較展示 ===
    5ms | [Stream 串流] 使用 IAsyncEnumerable
    6ms | [Stream 串流] 準備要產生迭代工作 1...
  316ms | [Stream 串流] 產生出結果給呼叫端 1
  316ms | [Stream 串流] 接收到 1 迭代請求 -> 開始進行處理 ...
  517ms | [Stream 串流] 已經處理完成 1 請求 -> 請求下一筆
  517ms | [Stream 串流] 準備要產生迭代工作 2...
  818ms | [Stream 串流] 產生出結果給呼叫端 2
  818ms | [Stream 串流] 接收到 2 迭代請求 -> 開始進行處理 ...
 1018ms | [Stream 串流] 已經處理完成 2 請求 -> 請求下一筆
 1018ms | [Stream 串流] 準備要產生迭代工作 3...
 1318ms | [Stream 串流] 產生出結果給呼叫端 3
 1318ms | [Stream 串流] 接收到 3 迭代請求 -> 開始進行處理 ...
 1534ms | [Stream 串流] 已經處理完成 3 請求 -> 請求下一筆
 1534ms | [Stream 串流] 準備要產生迭代工作 4...
 1835ms | [Stream 串流] 產生出結果給呼叫端 4
 1836ms | [Stream 串流] 接收到 4 迭代請求 -> 開始進行處理 ...
 2036ms | [Stream 串流] 已經處理完成 4 請求 -> 請求下一筆
 2036ms | [Stream 串流] 準備要產生迭代工作 5...
 2352ms | [Stream 串流] 產生出結果給呼叫端 5
 2352ms | [Stream 串流] 接收到 5 迭代請求 -> 開始進行處理 ...
 2569ms | [Stream 串流] 已經處理完成 5 請求 -> 請求下一筆
 2571ms | [Stream 串流] 結束



 2574ms | [Batch 批次] 開始進行等候 Task<List<int>> (需要等待所有的迭代都完成後,才會繼續往下處理)
 2575ms | [Batch 批次] 準備要產生迭代工作 1...
 2886ms | [Batch 批次] 產生結果集合 1 (此時將還不會回傳)
 2886ms | [Batch 批次] 準備要產生迭代工作 2...
 3187ms | [Batch 批次] 產生結果集合 2 (此時將還不會回傳)
 3187ms | [Batch 批次] 準備要產生迭代工作 3...
 3503ms | [Batch 批次] 產生結果集合 3 (此時將還不會回傳)
 3503ms | [Batch 批次] 準備要產生迭代工作 4...
 3805ms | [Batch 批次] 產生結果集合 4 (此時將還不會回傳)
 3805ms | [Batch 批次] 準備要產生迭代工作 5...
 4122ms | [Batch 批次] 產生結果集合 5 (此時將還不會回傳)
 4123ms | [Batch 批次] 全部都處理完成,並且回傳結果
 4125ms | [Batch 批次] 準備進行處理所有的迭代工作 (count=5) -> 開始進行處理所有工作...
 4126ms | [Batch 批次] 正在處理 1 個工作...
 4337ms | [Batch 批次] 已經處理完成 1 個工作
 4337ms | [Batch 批次] 正在處理 2 個工作...
 4538ms | [Batch 批次] 已經處理完成 2 個工作
 4539ms | [Batch 批次] 正在處理 3 個工作...
 4751ms | [Batch 批次] 已經處理完成 3 個工作
 4752ms | [Batch 批次] 正在處理 4 個工作...
 4953ms | [Batch 批次] 已經處理完成 4 個工作
 4954ms | [Batch 批次] 正在處理 5 個工作...
 5156ms | [Batch 批次] 已經處理完成 5 個工作
 5157ms | [Batch 批次] 結束 

=== 展示結束 === 




沒有留言:

張貼留言