2019年7月1日 星期一

C# 非同步作業中執行緒集區使用到大量的執行緒的設計考量

C# 非同步作業中執行緒集區使用到大量的執行緒的設計考量

不論是直接使用執行緒或者透過執行緒集區取得執行緒,甚至透過 C# TPL 的 Task 工作物件來設計非同步的應用程式,在許多時候會有可能遇到突然間需要用到大量的執行緒來處理相關的作業需求,不過,卻會造成執行上產生許多問題,例如,執行上會變得更加緩慢等等。
在這篇文章中,將會模擬要執行 200 非同步的作業,這些非同步的作業內都是相同的,首先,顯示出現在的執行 ID 是多少,接著會模擬一個非同步作業,在這裡會使用 Thread.Sleep 方法來模擬休息 2 秒鐘,使用同步的方式來直接等候這個作業完成。最後,當所有的作業都完成後,將會計算出總共花費了多少時間以及這次計算過程總共使用到多少的執行緒集區內的背景執行緒。

了解更多關於 [使用 async 和 await 進行非同步程式設計] 的使用方式
了解更多關於 [Thread Class] 的使用方式
了解更多關於 [Task Class] 的使用方式




理論上,整個程式若使用同步方式來執行,將會花費 200 x 2 = 400 秒的時間,現在,將會使用執行緒集區的背景執行緒、Task.Run方法、Task.Run方法 並且增加執行緒集區的預設執行緒數量、Task.Factory.StartNew 並指定產生新的執行緒方式、與 await 等候非同步的工作方式,比較看看這些設計方式哪個比較好。
為了要能夠知道這些執行緒是否都已經正常結束,在這裡將會透過 CountdownEvent 同步處理原始物件,一開始執行,將會設定該 CountdownEvent 的數量為 200 (也就是要執行背景執行緒的數量,當所有的執行緒都已將建立完成後,將會透過 CountdownEvent.Wait() 來等候所有的執行緒執行完成,當計數到達零時收到訊號,CountdownEvent.Wait() 將會解除封鎖,繼續執行,此時,就會列印出總共花費時間與使用到多少的背景執行緒數量。
在這篇文章中的範例程式碼,可以透過 GitHub 取得

使用執行緒集區的背景執行緒

首先,將會透過迴圈,執行 200 工作,使用 ThreadPool.QueueUserWorkItem 方法,透過執行緒集區 Thread Pool 取得一個背景執行緒,用來執行所需要的作業,底下為此方法的執行結果:
執行緒集區中的背景工作執行緒最大數目 : 2047 / 執行緒集區中的非同步 I/O 執行緒最大數目 : 1000
需要建立的背景工作執行緒最小數目 : 8 / 需要建立的非同步 I/O 執行緒最小數目 : 8

此次使用到 29 個背景執行緒
此次共花費 00:00:23.0056464 時間
將會發現到雖然透過 ThreadPool 物件,送出了 200 要求背景執行緒的請求,可是,卻僅使用到 29 個執行緒來執行這個程式,這是因為這台電腦為四核心的 CPU,總共的邏輯處理器共有 8 個邏輯處理器,因此,在程式啟動的時候,將會預設在執行緒集區內建立 8 個可用的執行緒。
而這個 ThreadPool.QueueUserWorkItem 方法並不表示要立即啟動一個執行緒,來執行所指定的委派 delegate 方法,而是先將這個請求放到執行緒集區 Thread Pool 佇列 Queue 中;若 ThreadPool 發現到集區內有可用的執行緒,將會指派該執行緒來執行從佇列中取得的委派物件,並且開始執行該委派方法。
然而,現在執行緒集區內僅有最多 8 個可用執行緒可以使用,很快地就會被分派完了,並且每個執行緒將會需要 2 秒鐘的時間才會執行完畢,這個時候,對於其他的 196 在執行緒集區佇列內等候執行的委派物件該怎麼辦呢?這樣的情況,可以稱作為 執行緒執行緒飢餓 thread starvation
執行緒集區會動態的增加集區內可用的執行緒數量,智慧型的因應這樣的情況,不過,Thread Pool 並不是立即、馬上的產生出所有佇列內需要用到的執行緒數量,而是大約每 500ms 來產生出一個新的背景執行緒出來,直到把佇列等候者都消化完畢為止。會有這樣的設計,這是因為要避免 執行緒醒來風暴 thread wakeup storms 情況的產生。所以,在執行 Case1() 方法的時候,將會看到執行緒是陸陸續續的產生出來,而不是一次有 200 個執行緒來執行這個程式的需求。
C Sharp / C#
class Program
{
    public static int MaxThreads;
    static int MaxLoop = 200;
    static int SimulateTaskTime = 2000;
    static CountdownEvent  countdownEvent = new CountdownEvent(MaxLoop);
    static void Main(string[] args)
    {
        Case1();
    }

    private static void Case1()
    {
        int CurrentAvailableThreads = GetAvailableThreads();
        List<Task> allTask = new List<Task>();
        Stopwatch stopwatch = new Stopwatch();
        stopwatch.Start();
        for (int i = 0; i < MaxLoop; i++)
        {
            ThreadPool.QueueUserWorkItem(x =>
            {
                int tmpThreadCC = CurrentAvailableThreads - GetAvailableThreads();
                if (MaxThreads < tmpThreadCC) MaxThreads = tmpThreadCC;
                Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId}");
                Thread.Sleep(SimulateTaskTime);
                tmpThreadCC = CurrentAvailableThreads - GetAvailableThreads();
                if (MaxThreads < tmpThreadCC) MaxThreads = tmpThreadCC;
                countdownEvent.Signal();
            });
        }

        countdownEvent.Wait();
        stopwatch.Stop();
        Console.WriteLine($"此次使用到 {MaxThreads} 個背景執行緒");
        Console.WriteLine($"此次共花費 {stopwatch.Elapsed} 時間");
        Console.WriteLine("Press any key for continuing...");
        Console.ReadKey();
    }

    public static int GetAvailableThreads()
    {
        int workerThreads;
        int completionPortThreads;
        ThreadPool.GetAvailableThreads(out workerThreads, out completionPortThreads);
        return workerThreads;
    }
    public static void PrintThreadPoolInformation()
    {
        int workerThreads;
        int completionPortThreads;
        ThreadPool.GetMaxThreads(out workerThreads, out completionPortThreads);
        Console.WriteLine($"執行緒集區中的背景工作執行緒最大數目 : {workerThreads} / 執行緒集區中的非同步 I/O 執行緒最大數目 : { completionPortThreads}");
        ThreadPool.GetMinThreads(out workerThreads, out completionPortThreads);
        Console.WriteLine($"需要建立的背景工作執行緒最小數目 : {workerThreads} / 需要建立的非同步 I/O 執行緒最小數目 : { completionPortThreads}");
        Console.WriteLine($"");
    }
}

使用 Task.Run方法

在這個測試中,將會把測試過程寫在 Case2 方法內。
在這裡將會把剛剛用的 ThreadPool.QueueUserWorkItem 方法,改寫成為 Task.Run 方法,使用非同步工作的方式來執行同樣的需求,底下是執行結果
執行緒集區中的背景工作執行緒最大數目 : 2047 / 執行緒集區中的非同步 I/O 執行緒最大數目 : 1000
需要建立的背景工作執行緒最小數目 : 8 / 需要建立的非同步 I/O 執行緒最小數目 : 8

此次使用到 29 個背景執行緒
此次共花費 00:00:23.0241135 時間
你將會發現到 Case2 的方法與 Case1 的方法執行起來結果是差不多的,當然,這樣的結果一點都不意外,因為,當使用 Task.Run 方法來指派一個委派方法,其實會透過 執行緒集區 來取得一個背景執行緒,用來執行 Task.Run 所指定的委派方法,所以,整體執行過程與會遇到的問題,將會與使用 ThreadPool.QueueUserWorkItem 的情況下相同的。
但是要如何解決這樣的問題呢?因為理論上同時要讓 200 作業進行,這些作業都只要 2 秒鐘就會執行完畢,應該是只需要 2 秒種就會執行完成了,可是,剛剛的兩個測試,卻花費了 23 秒鐘的時間。
C Sharp / C#
private static void Case2()
{
    int CurrentAvailableThreads = GetAvailableThreads();
    List<Task> allTask = new List<Task>();
    Stopwatch stopwatch = new Stopwatch();
    stopwatch.Start();
    for (int i = 0; i < MaxLoop; i++)
    {
        allTask.Add(Task.Run(() =>
        {
            int tmpThreadCC = CurrentAvailableThreads - GetAvailableThreads();
            if (MaxThreads < tmpThreadCC) MaxThreads = tmpThreadCC;
            Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId}");
            Thread.Sleep(SimulateTaskTime);
            tmpThreadCC = CurrentAvailableThreads - GetAvailableThreads();
            if (MaxThreads < tmpThreadCC) MaxThreads = tmpThreadCC;
            countdownEvent.Signal();
        }));
    }

    countdownEvent.Wait();
    stopwatch.Stop();

    PrintThreadPoolInformation();
    Console.WriteLine($"此次使用到 {MaxThreads} 個背景執行緒");
    Console.WriteLine($"此次共花費 {stopwatch.Elapsed} 時間");
    Console.WriteLine("Press any key for continuing...");
    Console.ReadKey();
}

使用 Task.Run方法 並且增加執行緒集區的預設執行緒數量

既然在這台測試電腦上,預設執行緒集區建立好的執行緒數量為 8 個,那麼,是否可以把它加大呢?(在此,先不考量將預設執行緒數量加大後所帶來的後遺症與副作用)這裡可以使用 ThreadPool.SetMinThreads(16, 16); 方法來增加執行緒集區內預設可用執行緒的最小數量,現在修正為 16 個預設執行緒,底下為執行結果:
執行緒集區中的背景工作執行緒最大數目 : 2047 / 執行緒集區中的非同步 I/O 執行緒最大數目 : 1000
需要建立的背景工作執行緒最小數目 : 16 / 需要建立的非同步 I/O 執行緒最小數目 : 16

此次使用到 32 個背景執行緒
此次共花費 00:00:18.0309260 時間
從執行結果中,可以看到當查詢 Thread Pool,確實已經有 16 個預設執行緒隨時可以使用,不過,這樣的執行結果卻發現到這使將會使用到做多 32 個執行緒,比起沒有設定 ThreadPool.SetMinThreads(16, 16); 之前多了三個執行緒,而且執行完成時間也降為 18 秒。
現在,再來把可用執行緒數量調整到 32 個可用執行緒,現在來看看會有甚麼結果產生。從底下的執行結果,發現到這次將會總共使用到 42 個執行緒,比起前一次又多了 10 個執行緒,而且,完成時間降低到 12 秒。
執行緒集區中的背景工作執行緒最大數目 : 2047 / 執行緒集區中的非同步 I/O 執行緒最大數目 : 1000
需要建立的背景工作執行緒最小數目 : 32 / 需要建立的非同步 I/O 執行緒最小數目 : 32

此次使用到 42 個背景執行緒
此次共花費 00:00:12.0712832 時間
那麼,若將可用執行緒數量調整成為 200 的話,會有甚麼執行結果呢?哇,採用預設 200 執行緒的方式,整體執行結果將僅需要 2.84 秒 就可以執行完成了,真是太神奇了。
執行緒集區中的背景工作執行緒最大數目 : 2047 / 執行緒集區中的非同步 I/O 執行緒最大數目 : 1000
需要建立的背景工作執行緒最小數目 : 200 / 需要建立的非同步 I/O 執行緒最小數目 : 200

此次使用到 200 個背景執行緒
此次共花費 00:00:02.8351719 時間
C Sharp / C#
private static void Case3()
{
    ThreadPool.SetMinThreads(16, 16);
    int CurrentAvailableThreads = GetAvailableThreads();
    List<Task> allTask = new List<Task>();
    Stopwatch stopwatch = new Stopwatch();
    stopwatch.Start();
    for (int i = 0; i < MaxLoop; i++)
    {
        allTask.Add(Task.Run(() =>
        {
            int tmpThreadCC = CurrentAvailableThreads - GetAvailableThreads();
            if (MaxThreads < tmpThreadCC) MaxThreads = tmpThreadCC;
            Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId}");
            Thread.Sleep(SimulateTaskTime);
            tmpThreadCC = CurrentAvailableThreads - GetAvailableThreads();
            if (MaxThreads < tmpThreadCC) MaxThreads = tmpThreadCC;
            countdownEvent.Signal();
        }));
    }

    countdownEvent.Wait();
    stopwatch.Stop();

    PrintThreadPoolInformation();
    Console.WriteLine($"此次使用到 {MaxThreads} 個背景執行緒");
    Console.WriteLine($"此次共花費 {stopwatch.Elapsed} 時間");
    Console.WriteLine("Press any key for continuing...");
    Console.ReadKey();
}

使用 Task.Factory.StartNew 並指定產生新的執行緒方式

既然加大執行緒數量會有這麼大的明顯執行效能的改善,那麼,為什麼不把每一個 Task 都配備一個專屬執行緒,而不是透過 ThreadPool 內來取得,這樣不是會跑得更快嗎?在這裡將原先的 Task.Run 方法,修改使用 Task.Factory.StartNew 這個工廠方法來產生出一個 Task 物件,不過,在這裡將會加入了 TaskCreationOptions.LongRunning 參數,告知 StartNew 方法,幫這個工作產生一個獨立、專屬的背景執行緒。底下是這樣修改方式的執行結果:
執行緒集區中的背景工作執行緒最大數目 : 2047 / 執行緒集區中的非同步 I/O 執行緒最大數目 : 1000
需要建立的背景工作執行緒最小數目 : 8 / 需要建立的非同步 I/O 執行緒最小數目 : 8

此次使用到 0 個背景執行緒
此次共花費 00:00:03.1899241 時間
雖然看到了 [此次使用到 0 個背景執行緒] 文字,不用太緊張,這裡顯示的是量代表是有使用到執行緒集區內的執行緒數量,不過,在這裡將會是直接產生出一個背景執行緒來處理 Task 物件所需要的事情;另外,將會看到這樣的修正將會使用到約 3 秒鐘的時間就完成了所有 200 作業,可謂效能極其優異呀~~
C Sharp / C#
private static void Case4()
{
    int CurrentAvailableThreads = GetAvailableThreads();
    List<Task> allTask = new List<Task>();
    Stopwatch stopwatch = new Stopwatch();
    stopwatch.Start();
    for (int i = 0; i < MaxLoop; i++)
    {
        allTask.Add(Task.Factory.StartNew(() =>
        {
            int tmpThreadCC = CurrentAvailableThreads - GetAvailableThreads();
            if (MaxThreads < tmpThreadCC) MaxThreads = tmpThreadCC;
            Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId}");
            Thread.Sleep(SimulateTaskTime);
            tmpThreadCC = CurrentAvailableThreads - GetAvailableThreads();
            if (MaxThreads < tmpThreadCC) MaxThreads = tmpThreadCC;
            countdownEvent.Signal();
        }, TaskCreationOptions.LongRunning));
    }

    countdownEvent.Wait();
    stopwatch.Stop();

    PrintThreadPoolInformation();
    Console.WriteLine($"此次使用到 {MaxThreads} 個背景執行緒");
    Console.WriteLine($"此次共花費 {stopwatch.Elapsed} 時間");
    Console.WriteLine("Press any key for continuing...");
    Console.ReadKey();
}

使用 await 等候非同步的工作

最好的解決方式那就是妥善使用執行緒集區內的執行緒,並且不要長期把佔執行緒集區的背景執行緒,當借用從執行緒集區內借用了一個執行緒,若需要一個等候長期時間的方法,那麼,就使用 awaite 關鍵字,當要等候一個非同步工作的時候,立即歸還該執行緒,當非同步工作完成之後,再從執行緒集區內取回一個執行緒,繼續來執行後續的工作。底下是將原先 Task.Run 內的委派方法,改寫成為使用 async await 的做法的執行結果:
執行緒集區中的背景工作執行緒最大數目 : 2047 / 執行緒集區中的非同步 I/O 執行緒最大數目 : 1000
需要建立的背景工作執行緒最小數目 : 8 / 需要建立的非同步 I/O 執行緒最小數目 : 8

此次使用到 9 個背景執行緒
此次共花費 00:00:02.2039664 時間
從執行結果中可以看到,這裡總共用到了最多 9 個執行緒,比起原先執行緒集區內的預設 8 個執行緒,僅僅多出一個執行緒。不過,整體執行時間將僅需要 2.2 秒鐘的時間。
還記得甚麼時候要將方法改寫成為一個非同步作業方法嗎?原則上,當一個方法執行時間超過了 50ms 時間以上,建議將這個方法改寫成為非同步作業處理方式,因此,在這個範例中,執行緒內需要花費 2 秒鐘的時間來使用同步方式等候執行結果,所以,把原先使用同步作業方法 Thread.Sleep 改寫成為非同步工作的 Task.Delay 方法,並且加上 await 來等候這個非同步工作;因此,當執行非同步工作的時候,因為該執行緒沒有任何事情可以繼續來處理 (因為要等候非同步工作完成之後,才能繼續處理) ,所以,使用 await 關鍵字,讓非同步工作使用非同步的方式來執行,並且立即 return,也就是把這個執行緒歸還給執行緒集區,當非同步工作完成之後,在從執行緒集區內取得一個執行緒,完成非同步工作後的方法。
那麼,為什麼這樣的設計方式會比較好呢?原因在於這個方法不會使用過多的執行緒數量來完成所需要的工作,因為,當有大量的執行緒產生的時候,將會造成記憶體需求增加 (每個執行緒預設需要 1MB 的記憶體空間),而且這些大量的執行緒,也會造成系統的內容交換 Content Switch 次數增加,當然,也會影響到該系統內的其他程式的運作與效能的降低。
C Sharp / C#
private static void Case5()
{
    int CurrentAvailableThreads = GetAvailableThreads();
    List<Task> allTask = new List<Task>();
    Stopwatch stopwatch = new Stopwatch();
    stopwatch.Start();
    for (int i = 0; i < MaxLoop; i++)
    {
        allTask.Add(Task.Run(async () =>
        {
            int tmpThreadCC = CurrentAvailableThreads - GetAvailableThreads();
            if (MaxThreads < tmpThreadCC) MaxThreads = tmpThreadCC;
            Console.WriteLine($"{Thread.CurrentThread.ManagedThreadId}");
            await Task.Delay(SimulateTaskTime);
            tmpThreadCC = CurrentAvailableThreads - GetAvailableThreads();
            if (MaxThreads < tmpThreadCC) MaxThreads = tmpThreadCC;
            countdownEvent.Signal();
        }));
    }

    countdownEvent.Wait();
    stopwatch.Stop();

    PrintThreadPoolInformation();
    Console.WriteLine($"此次使用到 {MaxThreads} 個背景執行緒");
    Console.WriteLine($"此次共花費 {stopwatch.Elapsed} 時間");
    Console.WriteLine("Press any key for continuing...");
    Console.ReadKey();
}


了解更多關於 [使用 async 和 await 進行非同步程式設計] 的使用方式
了解更多關於 [Thread Class] 的使用方式
了解更多關於 [Task Class] 的使用方式






沒有留言:

張貼留言