您现在的位置是:网站首页> 编程资料编程资料

.NET 6线程池ThreadPool实现概述_自学过程_

2023-05-24 416人已围观

简介 .NET 6线程池ThreadPool实现概述_自学过程_

前言

在即将发布的 .NET 6 runtime 中,默认的线程池实现从 C++ 代码改为了 C#,更方便我们学习线程池的设计了。
https://github.com/dotnet/runtime/tree/release/6.0/src/libraries/System.Threading.ThreadPool

新的线程池实现位于 PortableThreadPool 中,原 ThreadPool 中的对外公开的接口会直接调用 PortableThreadPool 中的实现。

通过设置环境变量 ThreadPool_UsePortableThreadPool 为 0 可以设置成使用老的线程池实现。
https://github.com/dotnet/runtime/pull/43841/commits/b0d47b84a6845a70f011d1b0d3ce5adde9a4d7b7

本文以 .NET 6 runtime 源码作为学习材料,对线程池的设计进行介绍。从目前的理解上来看,其整体的设计与原来 C++ 的实现并没有特别大的出入。

注意:

  • 本文不涉及细节的代码实现,主要为大家介绍其整体设计。所展示的代码并非原封不动的源码,而是为了方便理解的简化版。
  • ThreadPool.SetMaxThreads(int workerThreads, int completionPortThreads) 中的 completionPortThreads 所相关的 IOCP线程池 是 .NET Framework 时代的遗留产物,用于管理 Windows 平台专有的 IOCP 的回调线程池。目前没看到有什么地方在用它了,completionPortThreads 这个参数也已经没有意义,底层IO库是自己维护的IO等待线程池。本文只涉及 worker thread 池的介绍。
  • 本文理解并不完整也不一定完全正确,有异议的地方欢迎留言讨论。
  • 为了解释问题,一部分代码会运行在 .NET 6 之前的环境中。

任务的调度

线程池的待执行任务被存放在一个队列系统中。这个系统包括一个 全局队列,以及绑定在每一个 Worker Thread 上 的 本地队列 。而线程池中的每一个线程都在执行 while(true) 的循环,从这个队列系统中领取并执行任务。

ThreadPool.QueueUserWorkItem 的重载方法 ThreadPool.QueueUserWorkItem(Action callBack, TState state, bool preferLocal) 里有一个 preferLocal 参数。

调用不带 preferLocal 参数的 ThreadPool.QueueUserWorkItem 方法重载,任务会被放到全局队列。

preferLocal 为 true 的时候,如果调用 ThreadPool.QueueUserWorkItem 代码的线程正好是个线程池里的某个线程,则该任务就会进入该线程的本地队列中。除此之外的情况则会被放到全局队列中等待未来被某个 Worker Thread 捡走。

在线程池外的线程中调用,不管 preferLocal 传的是什么,任务都会被放到全局队列。

基本调度单元

本地队列和全局队列的元素类型被定义为 object,实际的任务类型分为两类,在从队列系统取到任务之后会判断类型并执行对应的方法。

IThreadPoolWorkItem 实现类的实例。

 /// Represents a work item that can be executed by the ThreadPool. public interface IThreadPoolWorkItem { void Execute(); }

执行 Execute 方法也就代表着任务的执行。

IThreadPoolWorkItem 的具体实现有很多,例如通过 ThreadPool.QueueUserWorkItem(WaitCallback callBack) 传入的 callBack 委托实例会被包装到一个 QueueUserWorkItemCallback 实例里。QueueUserWorkItemCallbackIThreadPoolWorkItem 的实现类。

Task

 class Task { internal void InnerInvoke(); }

执行 InnerInvoke 会执行 Task 所包含的委托。

全局队列

全局队列 是由 ThreadPoolWorkQueue 维护的,同时它也是整个队列系统的入口,直接被 ThreadPool 所引用。

 public static class ThreadPool { internal static readonly ThreadPoolWorkQueue s_workQueue = new ThreadPoolWorkQueue(); public static bool QueueUserWorkItem(WaitCallback callBack, object state) { object tpcallBack = new QueueUserWorkItemCallback(callBack!, state); s_workQueue.Enqueue(tpcallBack, forceGlobal: true); return true; } } internal sealed class ThreadPoolWorkQueue { // 全局队列 internal readonly ConcurrentQueue workItems = new ConcurrentQueue(); // forceGlobal 为 true 时,push 到全局队列,否则就放到本地队列 public void Enqueue(object callback, bool forceGlobal); }

本地队列

线程池中的每一个线程都会绑定一个 ThreadPoolWorkQueueThreadLocals 实例,在 workStealingQueue 这个字段上保存着本地队列。

 internal sealed class ThreadPoolWorkQueueThreadLocals { // 绑定在线程池线程上 [ThreadStatic] public static ThreadPoolWorkQueueThreadLocals threadLocals; // 持有全局队列的引用,以便能在需要的时候将任务转移到全局队列上 public readonly ThreadPoolWorkQueue workQueue; // 本地队列的直接维护者 public readonly ThreadPoolWorkQueue.WorkStealingQueue workStealingQueue; public readonly Thread currentThread; public ThreadPoolWorkQueueThreadLocals(ThreadPoolWorkQueue tpq) { workQueue = tpq; workStealingQueue = new ThreadPoolWorkQueue.WorkStealingQueue(); // WorkStealingQueueList 会集中管理 workStealingQueue ThreadPoolWorkQueue.WorkStealingQueueList.Add(workStealingQueue); currentThread = Thread.CurrentThread; } // 提供将本地队列中的任务转移到全局队列中去的功能, // 当 ThreadPool 通过后文将会介绍的 HillClimbing 算法判断得出当前线程是多余的线程后, // 会调用此方法对任务进行转移 public void TransferLocalWork() { while (workStealingQueue.LocalPop() is object cb) { workQueue.Enqueue(cb, forceGlobal: true); } } ~ThreadPoolWorkQueueThreadLocals() { if (null != workStealingQueue) { // TransferLocalWork 真正的目的并非是为了在这里被调用,这边只是确保任务不会丢的 fallback 逻辑 TransferLocalWork(); ThreadPoolWorkQueue.WorkStealingQueueList.Remove(workStealingQueue); } } }

偷窃机制

这里思考一个问题,为什么本地队列的名字会被叫做 WorkStealingQueue 呢?

所有 Worker ThreadWorkStealingQueue 都被集中在 WorkStealingQueueList 中。对线程池中其他所有线程可见。

Worker Threadwhile(true) 中优先会从自身的 WorkStealingQueue 中取任务。如果本地队列已经被清空,就会从全局队列中取任务。例如下图的 Thread1 取全局队列中领取了一个任务。

同时 Thread3 也没活干了,但是全局队列中的任务被 Thread1 抢走了。这时候就会去 从 Thread2 的本地队列中抢 Thread2 的活。

Worker Thread 的生命周期管理

接下来我们把格局放大,关注点从 Worker Thread 的打工日常转移到对它们的生命周期管理上来。

为了更方便的解释线程管理的机制,这边使用下面使用一些代码做演示。
代码参考自 https://devblogs.microsoft.com/dotnet/performance-improvements-in-net-6/。

线程注入实验

Task.Run 会将 Task 调度到线程池中执行,下面的示例代码中等效于 ThreadPool.QueueUserWorkItem(WaitCallback callBack),会把 Task 放到队列系统的全局队列中(顺便一提,如果在一个线程池线程中执行 Task.Run 会将 Task 调度到此线程池线程的本地队列中)。

.NET 5 实验一 默认线程池配置

 static void Main(string[] args) { var sw = Stopwatch.StartNew(); var tcs = new TaskCompletionSource(); var tasks = new List(); for (int i = 1; i <= Environment.ProcessorCount * 2; i++) { int id = i; Console.WriteLine($"Loop Id: {id:00} | {sw.Elapsed.TotalSeconds:0.000} | Busy Threads: {GetBusyThreads()}"); tasks.Add(Task.Run(() => { Console.WriteLine($"Task Id: {id:00} | {sw.Elapsed.TotalSeconds:0.000} | Busy Threads: {GetBusyThreads()}"); tcs.Task.Wait(); })); } tasks.Add(Task.Run(() => { Console.WriteLine($"Task SetResult | {sw.Elapsed.TotalSeconds:0.000} | Busy Threads: {GetBusyThreads()}"); tcs.SetResult(); })); Task.WaitAll(tasks.ToArray()); Console.WriteLine($"Done: | {sw.Elapsed.TotalSeconds:0.000}"); } static int GetBusyThreads() { ThreadPool.GetAvailableThreads(out var available, out _); ThreadPool.GetMaxThreads(out var max, out _); return max - available; }

首先在代码在 .NET 5 环境中运行以下代码,CPU 逻辑核心数 12。

 Loop Id: 01 | 0.000 | Busy Threads: 0 Loop Id: 02 | 0.112 | Busy Threads: 1 Loop Id: 03 | 0.112 | Busy Threads: 2 Loop Id: 04 | 0.113 | Busy Threads: 4 Loop Id: 05 | 0.113 | Busy Threads: 7 Loop Id: 06 | 0.113 | Busy Threads: 10 Loop Id: 07 | 0.113 | Busy Threads: 10 Task Id: 01 | 0.113 | Busy Threads: 11 Task Id: 02 | 0.113 | Busy Threads: 12 Task Id