顯示具有 Task 標籤的文章。 顯示所有文章
顯示具有 Task 標籤的文章。 顯示所有文章

2025年3月5日 星期三

使用新執行緒,而不是透過執行緒集區執行緒完成非同步工作

使用新執行緒,而不是透過執行緒集區執行緒完成非同步工作

當在進行多執行緒專案設計的時候,為了要能夠發揮最大硬體效能與執行效率,通常會使用多個執行緒來進行設計,這樣可以讓多個工作同時進行,而不是等待前一個工作完成後,再進行下一個工作。在 .NET 中,可以透過 Task 類別來建立新的執行緒,這樣可以讓多個工作同時進行,而不是等待前一個工作完成後,再進行下一個工作。

不過,有些時候,並不希望能夠使用大數量的多個執行緒來進行工作處理,這可能是因為硬體效能無法處理這樣需求,或者是有執行上的瓶頸在如 I/O 上,大量的使用執行緒反而會無法滿足專案的需求,因此,這裡需要有個方法來做到最多可以使用 N 個執行緒,若其中一個執行緒執行完畢後,便可以讓另外一個請求來取得執行緒繼續工作,限制執行緒的使用數量。

在 .NET / C# 中,面對這樣的需求,會有多個方式來處理,在這篇文章中,將會使用 SemaphoreSlim 類別來限制執行緒的使用數量,這樣可以讓多個工作同時進行,而不是等待前一個工作完成後,再進行下一個工作。

建立測試專案

請依照底下的操作,建立起這篇文章需要用到的練習專案

  • 打開 Visual Studio 2022 IDE 應用程式
  • 從 [Visual Studio 2022] 對話窗中,點選右下方的 [建立新的專案] 按鈕
  • 在 [建立新專案] 對話窗右半部
    • 切換 [所有語言 (L)] 下拉選單控制項為 [C#]
    • 切換 [所有專案類型 (T)] 下拉選單控制項為 [主控台]
  • 在中間的專案範本清單中,找到並且點選 [主控台應用程式] 專案範本選項

    專案,用於建立可在 Windows、Linux 及 macOS 於 .NET 執行的命令列應用程式

  • 點選右下角的 [下一步] 按鈕
  • 在 [設定新的專案] 對話窗
  • 找到 [專案名稱] 欄位,輸入 csSemaphoreSlimExcessiveRelease 作為專案名稱
  • 在剛剛輸入的 [專案名稱] 欄位下方,確認沒有勾選 [將解決方案與專案至於相同目錄中] 這個檢查盒控制項
  • 點選右下角的 [下一步] 按鈕
  • 現在將會看到 [其他資訊] 對話窗
  • 在 [架構] 欄位中,請選擇最新的開發框架,這裡選擇的 [架構] 是 : .NET 8.0 (長期支援)
  • 在這個練習中,需要去勾選 [不要使用最上層陳述式(T)] 這個檢查盒控制項

    這裡的這個操作,可以由讀者自行決定是否要勾選這個檢查盒控制項

  • 請點選右下角的 [建立] 按鈕

稍微等候一下,這個 背景工作服務 專案將會建立完成

修改 Program.cs 類別內容

在這篇文章中,將會把會用到的新類別與程式碼,都寫入到 [Program.cs] 這個檔案中,請依照底下的操作,修改 [Program.cs] 這個檔案的內容

  • 在專案中找到並且打開 [Program.cs] 檔案
  • 將底下的程式碼取代掉 Program.cs 檔案中內容
namespace csSemaphoreSlimExcessiveRelease;

internal class Program
{
    static void Main(string[] args)
    {
        SemaphoreSlim semaphoreSlim1 = new SemaphoreSlim(3, 6);

        List<Task> allTasks = new();

        Execute10Tasks(semaphoreSlim1, allTasks);

        semaphoreSlim1.Release();
        semaphoreSlim1.Release();
        semaphoreSlim1.Release();
        Execute10Tasks(semaphoreSlim1, allTasks);
    }

    private static void Execute10Tasks(SemaphoreSlim semaphoreSlim1, List<Task> allTasks)
    {
        allTasks.Clear();
        for (int i = 0; i < 10; i++)
        {
            int index = i;
            Console.WriteLine($"Index{index} " +
                $"semaphore 剩下 {semaphoreSlim1.CurrentCount}");

            semaphoreSlim1.Wait();
            var task = Task.Run(() =>
            {
                Console.WriteLine($"{DateTime.Now.TimeOfDay} 工作 {index} " +
                    $"獲得 semaphore({semaphoreSlim1.CurrentCount})");
                Thread.Sleep(3000);
                semaphoreSlim1.Release();
                Console.WriteLine($"{DateTime.Now.TimeOfDay} 工作 {index} " +
                    $"釋放 semaphore({semaphoreSlim1.CurrentCount})");
            });
            allTasks.Add(task);
        }

        Task.WaitAll(allTasks.ToArray());
        Console.WriteLine();
        Console.WriteLine();
        Console.WriteLine();
    }
}

在 [Main] 方法內,首先使用敘述 SemaphoreSlim semaphoreSlim1 = new SemaphoreSlim(3, 6); 來建立一個 SemaphoreSlim 類別的物件,這個物件會限制最多可以有 3 個執行緒可以同時執行,而最多可以有 6 個執行緒可以等待執行。對於這個 SemaphoreSlim 類別的建構子,有兩個參數,第一個參數是 initialCount,這個參數是指定初始的 semaphore 數量,第二個參數是 maxCount,這個參數是指定最大的 semaphore 數量。

而對於 [SemaphoreSlim] 物件的用法將會是有兩個主要的方法,一個是 Wait 方法,這個方法是用來等待 semaphore 的釋放,當 semaphore 數量為 0 時,這個方法會將目前的執行緒暫停,直到 semaphore 數量大於 0 時,才會繼續執行。另外一個方法是 Release 方法,這個方法是用來釋放 semaphore,當 semaphore 數量增加時,等待的執行緒將會繼續執行。該物件還會有個 [CurrentCount] 屬性,這個屬性是用來取得目前 semaphore 的數量。也就是說,若成功呼叫 [Wait] 方法,[CurrentCount] 屬性的值將會減少,若成功呼叫 [Release] 方法,[CurrentCount] 屬性的值將會增加。

接下來將會建立一個集合工作物件,型別為 List<Task>,這個集合工作物件將會用來存放所有的工作物件。接著呼叫 Execute10Tasks 方法,這個方法會執行 10 個工作,這個方法會接收兩個參數,第一個參數是 SemaphoreSlim 類別的物件,第二個參數是 List<Task> 集合工作物件。

當進入到 [Execute10Tasks] 方法內,首先會把 allTasks 集合工作物件清空,接著進入到迴圈內,這個迴圈會執行 10 次,每次迴圈都會執行一個工作(只要沒有超過限制的執行緒數量)。

在迴圈內,會先印出目前 semaphore 的數量,接著呼叫 Wait 方法,這個方法會等待 semaphore 還有有效的旗標數量,這個方法才算執行完畢,可以繼續往下執行,而在此時,semaphore 的 [CurrentCount] 將會減一,當 semaphore 的 [CurrentCount] 數量為 0 時,這個方法會將目前的執行緒暫停,直到 semaphore 數量大於 0 時,才會繼續執行。

接著呼叫 Task.Run 方法,這個方法會建立一個工作,這個工作會執行一個匿名方法,這個匿名方法會印出目前時間與工作索引,接著暫停 3 秒,最後呼叫 Release 方法,這個方法是用來釋放 semaphore,也就是當這個方法成功執行完成後,semaphore 的 [CurrentCount]將會加一,不過,一旦 semaphore 的 [CurrentCount] 數量抵達建構式中指定的 maxCount (這個參數是指定最大的 semaphore 數量),若再次呼叫 [Release] 方法,將會造成 System.Threading.SemaphoreFullException: 'Adding the specified count to the semaphore would cause it to exceed its maximum count.' 例外異常拋出。

在下圖中,可以看到當建立 [SemaphoreSlim] 物件時候,建構子,第一個參數是 initialCount,這個參數是指定初始的 semaphore 數量,這裡宣告的4,代表初始的 semaphore 數量為 4。第二個參數是 maxCount,這個參數是指定最大的 semaphore 數量,這裡宣告的6,代表最大的 semaphore 數量為 6。所以,當 [Execute10Tasks] 執行完後,若執行到第三次的 semaphoreSlim1.Release(),需要將[CurrentCount] 數量加一,這樣將會造成超過最大可用的 semaphore 數量,這時候將會拋出 System.Threading.SemaphoreFullException: 'Adding the specified count to the semaphore would cause it to exceed its maximum count.' 例外異常拋出。

當 semaphore 數量增加時,等待的執行緒將會繼續執行。最後將這個工作加入到 allTasks 集合工作物件中。

執行程式

  • 按下 F5 鍵,開始執行這個程式
  • 程式將會開始執行,並且在主控台視窗內,將會看到類似下圖的輸出結果

從底下的執行結果可以看出

Index0 semaphore 剩下 3
Index1 semaphore 剩下 2
Index2 semaphore 剩下 1
Index3 semaphore 剩下 0
08:54:30.1628019 工作 2 獲得 semaphore(0)
08:54:30.1620930 工作 1 獲得 semaphore(0)
08:54:30.1628072 工作 0 獲得 semaphore(0)
08:54:33.1838500 工作 0 釋放 semaphore(2)
08:54:33.1838557 工作 1 釋放 semaphore(2)
08:54:33.1838680 工作 2 釋放 semaphore(2)
Index4 semaphore 剩下 2
08:54:33.1846819 工作 3 獲得 semaphore(2)
Index5 semaphore 剩下 1
08:54:33.1847788 工作 4 獲得 semaphore(1)
Index6 semaphore 剩下 0
08:54:33.1850070 工作 5 獲得 semaphore(0)
08:54:36.1931672 工作 3 釋放 semaphore(0)
08:54:36.1931882 工作 4 釋放 semaphore(1)
08:54:36.1932071 工作 5 釋放 semaphore(2)
Index7 semaphore 剩下 2
08:54:36.1936082 工作 6 獲得 semaphore(2)
Index8 semaphore 剩下 1
08:54:36.1938060 工作 7 獲得 semaphore(1)
Index9 semaphore 剩下 0
08:54:36.1938699 工作 8 獲得 semaphore(0)
08:54:39.2087783 工作 8 釋放 semaphore(2)
08:54:39.2087766 工作 6 釋放 semaphore(2)
08:54:39.2087766 工作 7 釋放 semaphore(2)
08:54:39.2092105 工作 9 獲得 semaphore(2)
08:54:42.2177655 工作 9 釋放 semaphore(3)



Index0 semaphore 剩下 6
Index1 semaphore 剩下 5
08:54:42.2186507 工作 0 獲得 semaphore(5)
Index2 semaphore 剩下 4
08:54:42.2187209 工作 1 獲得 semaphore(4)
Index3 semaphore 剩下 3
08:54:42.2188507 工作 2 獲得 semaphore(3)
Index4 semaphore 剩下 2
08:54:42.2189604 工作 3 獲得 semaphore(2)
Index5 semaphore 剩下 1
Index6 semaphore 剩下 0
08:54:42.2194051 工作 4 獲得 semaphore(0)
08:54:42.2266215 工作 5 獲得 semaphore(0)
08:54:45.2192551 工作 3 釋放 semaphore(3)
08:54:45.2192551 工作 2 釋放 semaphore(3)
Index7 semaphore 剩下 3
08:54:45.2192735 工作 0 釋放 semaphore(3)
08:54:45.2192551 工作 1 釋放 semaphore(3)
Index8 semaphore 剩下 2
08:54:45.2196008 工作 7 獲得 semaphore(2)
08:54:45.2197678 工作 8 獲得 semaphore(1)
Index9 semaphore 剩下 1
08:54:45.2196802 工作 6 獲得 semaphore(2)
08:54:45.2198453 工作 4 釋放 semaphore(2)
08:54:45.2236722 工作 9 獲得 semaphore(1)
08:54:45.2342956 工作 5 釋放 semaphore(2)
08:54:48.2335085 工作 7 釋放 semaphore(6)
08:54:48.2335037 工作 6 釋放 semaphore(6)
08:54:48.2335039 工作 8 釋放 semaphore(6) 

08:54:48.2335037 工作 9 釋放 semaphore(6) 




2022年7月27日 星期三

C# 非同步 : Task.Run 要在哪裡傳入與檢查 CancellationToken

Task.Run 要在哪裡傳入與檢查 CancellationToken

這篇文章將會說明,對於一開始學習 Task.Run 方法與想要透過 CancellationToken 取消權杖來取消工作的開發者,將會有個迷失點,這個問題在於,當查看 Task.Run 方法 文件的時候,將會看到 Run(Action, CancellationToken) 這個函式簽章,而這個方法將透過執行緒集區佇列內進行排隊,取得一個可用執行緒,並且將 Action 這個委派方法交由這個執行緒來執行,並傳回代表該工作的 Task 物件;這個方法的第二個參數,則是一個 [CancellationToken] 型別的參數,代表一個取消權杖可用來在工作尚未開始之前取消工作。

若沒有看到上面最後一段說明,絕大部分的開發者都會以為,只要在呼叫 Task.Run 方法的時候,將取消權杖傳入進去,在任何時候就可以透過產生該取消權證的 CancellationTokenSource 這個型別的物件,發出呼叫 CancellationTokenSource.Cancel 方法 , 送出與傳遞取消要求,此時,這個 Task.Run 這個非同步工作,就會取消執行了。

從上面的說明可以看出不是這樣的運作的,當呼叫 Task.Run 方法所傳入的取消權杖,僅會在要呼叫 Task.Run 方法前,這個取消權杖就已將發出取消請求了,所以,一旦呼叫了 Task.Run 的方法,就不會去取得一個非同步的執行緒,並且開始執行所指定的委派方法,如此,可以節省系統資源的使用。

現在來測試這樣的說明,首先,建立一個 主控台應用程式,其程式碼如下:

static void Main(string[] args)
{
    CancellationTokenSource cts = new CancellationTokenSource();
    CancellationToken token = cts.Token;
    cts.Cancel();
    Console.WriteLine($"1 建立非同步工作");
    var task = Task.Run(() =>
    {
        Console.WriteLine($"  2 非同步工作已經開始執行");
        Thread.Sleep(3000);
        Console.WriteLine($"  3 非同步工作已經結束執行");
    }, token);
    Console.WriteLine($"4 主執行緒休息 2 秒");
    Thread.Sleep(2000);
    Console.WriteLine($"5 主執行緒即將結束執行");
    Console.WriteLine("Press any key for continuing...");
    Console.ReadKey();
}

這是一個非常經典的取消權杖的程式設計準則,在程式要開始執行之前,首先建立一個 CancellationTokenSource 物件,這個物件可用於取得取消權杖與可以發出取消請求的方法,接著,將取消權杖透過 CancellationToken token = cts.Token 敘述來取得,有了取消權杖,之後面可以將取消權杖傳遞到其他的非同步方法或者非同工作內。

在這個時候,將會呼叫 cts.Cancel() 方法,對於該取消權杖發出取消工作的請求,此時,該取消權杖處於已經取消的狀態下,接著,透過 Task.Run 方法建立一個非同步工作,這個非同步工作所指定的委派方法將會休息三秒鐘後,就會結束執行。

同時,在主執行緒端,一旦非同步工作建立與開始執行後,將會休息 2 秒鐘

現在,執行這個專案,將會得到底下的執行結果

1 建立非同步工作
4 主執行緒休息 2 秒
5 主執行緒即將結束執行
Press any key for continuing...

從執行結果的輸出內容可以看出,這個非同步工作物件所指定的委派方法,是沒有被執行的,也就是說,雖然呼叫了 Task.Run 方法,系統並沒有從執行緒集區內取得一個執行緒來執行指定的委派方法程式碼。

現在將測試程式碼修改成為如下:

static void Main(string[] args)
{
    CancellationTokenSource cts = new CancellationTokenSource();
    CancellationToken token = cts.Token;
    Console.WriteLine($"1 建立非同步工作");
    var task = Task.Run(() =>
    {
        Console.WriteLine($"  2 非同步工作已經開始執行");
        Thread.Sleep(3000);
        Console.WriteLine($"  3 非同步工作已經結束執行");
    }, token);
    Console.WriteLine($"4 主執行緒休息 1 秒");
    cts.CancelAfter(2000);
    Thread.Sleep(1000);
    Console.WriteLine($"5 主執行緒即將結束執行");
    Console.WriteLine("Press any key for continuing...");
    Console.ReadKey();
}

前半段幾乎沒有變動,除了將一開始就發出取消權杖的呼叫,將其程式碼移除了。

在非同步工作建立之後,使用了 cts.CancelAfter(2000); 敘述,將會於 2 秒鐘之後,對取消權杖發出取消的請求通知,這個敘述執行完後,並不會暫停2秒鐘,而會繼續往下執行,現在,在主執行緒內,將會休息一秒鐘後,準備要結束執行。

底下是執行這個專案的螢幕輸出結果

1 建立非同步工作
4 主執行緒休息 1 秒
  2 非同步工作已經開始執行
5 主執行緒即將結束執行
Press any key for continuing...
  3 非同步工作已經結束執行

從執行結果可以看出,雖然在呼叫 Task.Run 方法的時候,有傳入取消權杖,不過,請求取消的動作,是在呼叫 Task.Run 之後,因此,在 Task.Run 所傳入的取消權杖是沒有效用的,結論就是,想要建立一個非同步工作,讓這個非同步工作具有取消權杖的效果,建議如下:

  • 在 Task.Run 方法內,傳入取消權杖
  • 在 Task.Run 的委派方法內,也要輪詢檢查取消權杖是否已經發出取消請求

接著,將測試程式碼修改如下:

static void Main(string[] args)
{
    CancellationTokenSource cts = new CancellationTokenSource();
    CancellationToken token = cts.Token;
    Console.WriteLine($"1 建立非同步工作");
    var task = Task.Run(() =>
    {
        Console.WriteLine($"  2 非同步工作已經開始執行");
        token.ThrowIfCancellationRequested();
        Thread.Sleep(3000);
        token.ThrowIfCancellationRequested();
        Console.WriteLine($"  3 非同步工作已經結束執行");
    }, token);
    Console.WriteLine($"4 主執行緒休息 1 秒");
    cts.CancelAfter(2000);
    Thread.Sleep(1000);
    Console.WriteLine($"5 主執行緒即將結束執行");
    Console.WriteLine("Press any key for continuing...");
    Console.ReadKey();
}

在這裡將會在非同步工作內的委派方法中,使用輪詢 Polling 方式,檢查取消權杖的狀態,這裡使用了 token.ThrowIfCancellationRequested(); 敘述,一旦有取消請求發出,則將會拋出例外異常,結束這個非同步工作的執行。

底下是執行結果

1 建立非同步工作
4 主執行緒休息 1 秒
  2 非同步工作已經開始執行
5 主執行緒即將結束執行
Press any key for continuing...

從執行結果可以看出,當發出取消請求之後,這個取消權杖被設定為取消狀態,而這個非同的工作也就拋出例外異常,而中止了。

最後,再來將程式碼改成底下內容:

static async Task Main(string[] args)
{
    CancellationTokenSource cts = new CancellationTokenSource();
    CancellationToken token = cts.Token;
    Console.WriteLine($"1 建立非同步工作 {DateTime.Now.TimeOfDay}");
    var task = Task.Run(async () =>
    {
        Console.WriteLine($"  2 非同步工作已經開始執行 {DateTime.Now.TimeOfDay}");
        token.ThrowIfCancellationRequested();
        await Task.Delay(3000, token);
        token.ThrowIfCancellationRequested();
        Console.WriteLine($"  3 非同步工作已經結束執行 {DateTime.Now.TimeOfDay}");
    }, token);
    Console.WriteLine($"4 主執行緒休息 1 秒 {DateTime.Now.TimeOfDay}");
    cts.CancelAfter(2000);
    try
    {
        await task;
    }
    catch (Exception ex)
    {
        Console.WriteLine($"5 非同步工作取消了 {ex.GetType().ToString()} : {DateTime.Now.TimeOfDay}");
    }
    Console.WriteLine($"6 主執行緒即將結束執行 {DateTime.Now.TimeOfDay}");
    Console.WriteLine("Press any key for continuing...");
    Console.ReadKey();
}

在上面的程式碼,對 Console.WriteLine 方法內,將會加入當時呼叫的時間點內容,這裡是要來判斷在非同步工作內委派方法,在等待 3 秒鐘的時候,是否不會在 3 秒內的任何時間點,若接收到取消通知,都會立即中止等候,直接結束這個非同步工作。

對於在非同步工作內的委派方法,原先使用 Thread.Sleep(3000) 的封鎖等待的呼叫,將會改成 await Task.Delay(3000, token); 非封鎖式的呼叫,並且在 Delay 方法的後面,傳入取消權杖,要 Task.Delay 這個方法要能夠關注取消權杖是否發出請求。

在主執行緒端,也將休息2秒的敘述,改成 await task; 這個敘述,並且將其包裹在 try catch 敘述內,因為,一旦這個非同步工作有取消動作產生,將會拋出例外異常,所以,將會在此捕捉這個例外異常,查看非同步工作內的委派方法,何時觸發了取消動作。

現在,可以來執行這個專案,得到底下的執行結果

1 建立非同步工作 16:22:37.3960409
4 主執行緒休息 1 秒 16:22:37.4027808
  2 非同步工作已經開始執行 16:22:37.4033487
5 非同步工作取消了 System.Threading.Tasks.TaskCanceledException : 16:22:39.4463615
6 主執行緒即將結束執行 16:22:39.4464650
Press any key for continuing...

從執行結果可以看出,非同步工作內的委派方法於 37 秒的時候開始執行,接著要休息 3 秒鐘,因此,這個非同步工作應該會於 40 秒的時候結束執行。

不過,在主執行緒端,將會於 2 秒鐘之後發出取消請求,因此,這個非同步工作將會於 39 秒的時候拋出例外異常,這裡看到的例外異常是 TaskCanceledException ,這代表表示用來傳達工作取消的例外狀況;從捕捉到例外異常的時間點,也正好是 39 秒,符合這個程式的設計。

結論

當進行非同步工作設計的時候,若有取消請求的設計需求,還是需要在非同步工作內的委派方法裡面,輪詢取消權杖,適時做出處置,這點功夫是不能缺少的。