浅析 .NET 中 AsyncLocal 的实现原理
2021-04-23 01:27
标签:另一个 delegate 基本 基础 blob tor dstar 静态类 sed 目录 对于写过 ASP.NET Core 的童鞋来说,可以通过 HttpContextAccessor 在 Controller 之外的地方获取到HttpContext,而它实现的关键其实是在于一个AsyncLocal 本文源码参考为发文时间点为止最新的 github 开源代码,和之前实现有些许不同,但设计思想基本一致。 代码库地址:https://github.com/dotnet/runtime 如果想要整个.NET程序中共享一个变量,我们可以将想要共享的变量放在某个类的静态属性上来实现。 而在多线程的运行环境中,则可能会希望能将这个变量的共享范围缩小到单个线程内。例如在web应用中,服务器为每个同时访问的请求分配一个独立的线程,我们要在这些独立的线程中维护自己的当前访问用户的信息时,就需要需要线程本地存储了。 例如下面这样一个例子。 输出结果: Thread:4; Value:这是来自线程4的数据 除了可以使用 ThreadStaticAttribute 外,我们还可以使用 ThreadLocal 输出结果: Use ThreadStaticAttribute; Thread:3; Value:这是来自线程3的数据 上面的例子都只是在同一个线程中对线程进行存和取,但日常开发的过程中,我们会有很多异步的场景,这些场景可能会导致执行代码的线程发生切换。 比如下面的例子 输出结果: ThreadStatic: 在线程发生了切换之后,只有 AsyncLocal 还能够保留原来的值,当然,.NET Framework 中的 CallContext 也可以实现这个需求,下面给出一个相对完整的总结。 我们主要对照 .NET Core 源码进行学习,源码地址:https://github.com/dotnet/runtime/blob/master/src/libraries/System.Private.CoreLib/src/System/Threading/AsyncLocal.cs AsyncLocal 其内部代码相对简单 真正的数据存取是通过 需要注意的是这边通过 IAsyncLocal 这一接口实现了 AsyncLocal 与 ExcutionContext 的解耦。 ExcutionContext 只关注数据的存取本身,接口定义的类型都是 object,而不关心具体的类型 T。 在.NET 中,每个线程都关联着一个 执行上下文(execution context) 。 可以通过Thread.CurrentThread.ExecutionContext 属性进行访问,或者通过 ExecutionContext.Capture() 获取(前者的实现) 。 AsyncLocal 最终就是把数据保存在 ExecutionContext 上的,为了更深入地理解 AsyncLocal 我们需要先理解一下它。 源码地址:https://github.com/dotnet/runtime/blob/master/src/libraries/System.Private.CoreLib/src/System/Threading/ExecutionContext.cs ExecutionContext 被保存 Thread 的 internal 修饰的 _executionContext 字段上。但Thread.CurrentThread.ExecutionContext 并不直接暴露 _executionContext 而与 ExecutionContext.Capture() 共用一套逻辑。 下面是经过整理的 Thread 的与 ExecutionContext 相关的部分,Thread 属于部分类,_executionContext 字段定义在 Thread.CoreCLR.cs 文件中 在同一个线程中,所有 AsyncLocal 所保存的 Value 都保存在 ExecutionContext 的 m_localValues 字段上。 为了优化查找值时的性能,微软为 IAsyncLocalValueMap 提供了6个实现 随着 ExecutionContext 所关联的 AsyncLocal 数量的增加,IAsyncLocalValueMap 的实现将会在ExecutionContext的SetLocalValue方法中被不断替换。查询的时间复杂度和空间复杂度依次递增。代码的实现与 AsyncLocal 同属于 一个文件。当然元素数量减少时也会替换成之前的实现。 Map 的创建是以静态类 AsyncLocalValueMap 的 Create 方法作为创建的入口的。 此后每次更新元素时都必须调用 IAsyncLocalValueMap 实现类的 Set 方法,原实例是不会发生变化的,需保存 Set 的返回值。 接下来以 ThreeElementAsyncLocalValueMap 为例进行解释 需要注意的是这边会涉及到两个 Immutable 结构,一个是 ExecutionContext 本身,另一个是 IAsyncLocalValueMap 的实现类。同一个 key 前后两次 value 发生变化后,会产生新的 ExecutionContext 的实例和 IAsyncLocalMap 实现类实例(在 IAsyncLocalValueMap 实现类的 Set 方法中完成)。 值的获取实现相对简单 在线程发生切换的时候,ExecutionContext 会在前一个线程中被默认捕获,流向下一个线程,它所保存的数据也就随之流动。 在所有会发生线程切换的地方,基础类库(BCL) 都为我们封装好了对执行上下文的捕获。 例如: 输出结果: new Thread: AsyncLocal保存的数据 ExecutionContext 为我们提供了 SuppressFlow(禁止流动) 和 RestoreFlow (恢复流动)这两个静态方法来控制当前线程的执行上下文是否像辅助线程流动。并可以通过 IsFlowSuppressed 静态方法来进行判断。 输出结果: 默认: SuppressFlow: RestoreFlow: 需要注意的是,在线程A中创建线程B之前调用 ExecutionContext.SuppressFlow 只会影响 ExecutionContext 从线程A => 线程B的传递,线程B => 线程C 不受影响。 上面举例了四种场景,由于每一种场景的传递过程都比较复杂,目前先介绍其中一个。 但不管什么场景,都会涉及到 ExcutionContext 的 Run 方法。在Run 方法中会调用 RunInternal 方法, RunInternal 调用下面一个 RestoreChangedContextToThread 方法将 ExcutionContext.Run 方法传入的 ExcutionContext 赋值给当前线程的 _executionContext 字段。 这边可以分为三个步骤: 在 Thread 的 Start 方法中捕获当前的 ExecutionContext,将其传递给 Thread 的构造函数中实例化的 ThreadHelper 实例,ExecutionContext 会暂存在 ThreadHelper 的实例字段中,线程创建完成后会调用ExecutionContext.RunInternal 将其赋值给新创建的线程。 代码位置: https://github.com/dotnet/runtime/blob/5fca04171171f118bca0f93aa9741f205b8cdc29/src/coreclr/src/System.Private.CoreLib/src/System/Threading/Thread.CoreCLR.cs#L200 https://github.com/dotnet/runtime/blob/5fca04171171f118bca0f93aa9741f205b8cdc29/src/coreclr/src/System.Private.CoreLib/src/System/Threading/Thread.CoreCLR.cs#L26 AsyncLocal 本身不保存数据,数据保存在 ExecutionContext 实例的 m_localValues 的私有字段上,字段类型定义是 IAsyncLocalMap ,以 IAsyncLocal => object 的 Map 结构进行保存,且实现类型随着元素数量的变化而变化。 ExecutionContext 实例 保存在 Thread.CurrentThread._executionContext 上,实现与当前线程的关联。 对于 IAsyncLocalMap 的实现类,如果 AsyncLocal 注册了回调,value 传 null 不会被忽略。 没注册回调时分为两种情况:如果 key 存在,则做删除处理,map 类型可能出现降级。如果 key 不存在,则直接忽略。 ExecutionContext 和 IAsyncLocalMap 的实现类都被设计成不可变(immutable)。同一个 key 前后两次 value 发生变化后,会产生新的 ExecutionContext 的实例和 IAsyncLocalMap 实现类实例。 ExecutionContext 与当前线程绑定,默认流动到辅助线程,可以禁止流动和恢复流动,且禁止流动仅影响当前线程向其辅助线程的传递,不影响后续。 浅析 .NET 中 AsyncLocal 的实现原理 标签:另一个 delegate 基本 基础 blob tor dstar 静态类 sed 原文地址:https://www.cnblogs.com/blurhkh/p/12240767.html
前言
public class HttpContextAccessor : IHttpContextAccessor
{
private static AsyncLocal
1、线程本地存储
class Program
{
[ThreadStatic]
private static string _value;
static void Main(string[] args)
{
Parallel.For(0, 4, _ =>
{
var threadId = Thread.CurrentThread.ManagedThreadId;
_value ??= $"这是来自线程{threadId}的数据";
Console.WriteLine($"Thread:{threadId}; Value:{_value}");
});
}
}
Thread:1; Value:这是来自线程1的数据
Thread:5; Value:这是来自线程5的数据
Thread:6; Value:这是来自线程6的数据class Program
{
[ThreadStatic]
private static string _threadStatic;
private static ThreadLocal
Use ThreadStaticAttribute; Thread:4; Value:这是来自线程4的数据
Use ThreadStaticAttribute; Thread:1; Value:这是来自线程1的数据
Use CallContext; Thread:1; Value:这是来自线程1的数据
Use ThreadLocal; Thread:1; Value:这是来自线程1的数据
Use AsyncLocal; Thread:1; Value:这是来自线程1的数据
Use ThreadStaticAttribute; Thread:5; Value:这是来自线程5的数据
Use CallContext; Thread:5; Value:这是来自线程5的数据
Use ThreadLocal; Thread:5; Value:这是来自线程5的数据
Use AsyncLocal; Thread:5; Value:这是来自线程5的数据
Use CallContext; Thread:3; Value:这是来自线程3的数据
Use CallContext; Thread:4; Value:这是来自线程4的数据
Use ThreadLocal; Thread:4; Value:这是来自线程4的数据
Use AsyncLocal; Thread:4; Value:这是来自线程4的数据
Use ThreadLocal; Thread:3; Value:这是来自线程3的数据
Use AsyncLocal; Thread:3; Value:这是来自线程3的数据class Program
{
[ThreadStatic]
private static string _threadStatic;
private static ThreadLocal
ThreadLocal:
AsyncLocal: AsyncLocal保存的数据
实现方式
.NET FrameWork 可用
.NET Core 可用
是否支持数据向辅助线程的
ThreadStaticAttribute
是
是
否
ThreadLocal
是
是
否
CallContext.SetData(string name, object data)
是
否
仅当参数 data 对应的类型实现了 ILogicalThreadAffinative 接口时支持
CallContext.LogicalSetData(string name, object data)
是
否
是
AsyncLocal
是
是
是
2、AsyncLocal 实现
2.1、主体 AsyncLocal
public sealed class AsyncLocal
ExecutionContext.GetLocalValue
和 ExecutionContext.SetLocalValue
实现的。public class ExecutionContext
{
internal static object? GetLocalValue(IAsyncLocal local);
internal static void SetLocalValue(
IAsyncLocal local,
object? newValue,
bool needChangeNotifications);
}
2.2、AsyncLocal
2.2.1、 ExecutionContext 与 线程的绑定关系
class ExecutionContext
{
public static ExecutionContext? Capture()
{
ExecutionContext? executionContext = Thread.CurrentThread._executionContext;
if (executionContext == null)
{
executionContext = Default;
}
else if (executionContext.m_isFlowSuppressed)
{
executionContext = null;
}
return executionContext;
}
}
class Thread
{
// 保存当前线程所关联的 执行上下文
internal ExecutionContext? _executionContext;
[ThreadStatic]
private static Thread? t_currentThread;
public static Thread CurrentThread => t_currentThread ?? InitializeCurrentThread();
public ExecutionContext? ExecutionContext => ExecutionContext.Capture();
}
2.2.2、ExecutionContext 的私有变量
public sealed class ExecutionContext : IDisposable, ISerializable
{
// 默认执行上下文
internal static readonly ExecutionContext Default = new ExecutionContext(isDefault: true);
// 执行上下文禁止流动后的默认上下文
internal static readonly ExecutionContext DefaultFlowSuppressed = new ExecutionContext(AsyncLocalValueMap.Empty, Array.Empty
2.2.3、IAsyncLocalValueMap 接口及其实现
public class ExecutionContext
{
private readonly IAsyncLocalValueMap m_localValues;
}
类型
元素个数
EmptyAsyncLocalValueMap
0
OneElementAsyncLocalValueMap
1
TwoElementAsyncLocalValueMap
2
ThreeElementAsyncLocalValueMap
3
MultiElementAsyncLocalValueMap
4 ~ 16
ManyElementAsyncLocalValueMap
> 16
// 这个接口是用来在 ExecutionContext 中保存 IAsyncLocal => object 的映射关系。
// 其实现被设定为不可变的(immutable),随着元素的数量增加而变化,空间复杂度和时间复杂度也随之增加。
internal interface IAsyncLocalValueMap
{
bool TryGetValue(IAsyncLocal key, out object? value);
// 通过此方法新增 AsyncLocal 或修改现有的 AsyncLocal
// 如果数量无变化,返回同类型的 IAsyncLocalValueMap 实现类实例
// 如果数量发生变化(增加或减少,将value设值为null时会减少),则可能返回不同类型的 IAsyncLocalValueMap 实现类实例
IAsyncLocalValueMap Set(IAsyncLocal key, object? value, bool treatNullValueAsNonexistent);
}
internal static class AsyncLocalValueMap
{
// EmptyAsyncLocalValueMap 设计上只在这边实例化,其他地方当作常量使用
public static IAsyncLocalValueMap Empty { get; } = new EmptyAsyncLocalValueMap();
public static bool IsEmpty(IAsyncLocalValueMap asyncLocalValueMap)
{
Debug.Assert(asyncLocalValueMap != null);
Debug.Assert(asyncLocalValueMap == Empty || asyncLocalValueMap.GetType() != typeof(EmptyAsyncLocalValueMap));
return asyncLocalValueMap == Empty;
}
public static IAsyncLocalValueMap Create(IAsyncLocal key, object? value, bool treatNullValueAsNonexistent)
{
// 创建最初的实例
// 如果 AsyncLocal 注册了回调,则需要保存 null 的 Value,以便下次设置非null的值时因为值发生变化而触发回调
return value != null || !treatNullValueAsNonexistent ?
new OneElementAsyncLocalValueMap(key, value) :
Empty;
}
}
private sealed class ThreeElementAsyncLocalValueMap : IAsyncLocalValueMap
{
// 申明三个私有字段保存 key
private readonly IAsyncLocal _key1, _key2, _key3;
// 申明三个私有字段保存
private readonly object? _value1, _value2, _value3;
public ThreeElementAsyncLocalValueMap(IAsyncLocal key1, object? value1, IAsyncLocal key2, object? value2, IAsyncLocal key3, object? value3)
{
_key1 = key1; _value1 = value1;
_key2 = key2; _value2 = value2;
_key3 = key3; _value3 = value3;
}
public IAsyncLocalValueMap Set(IAsyncLocal key, object? value, bool treatNullValueAsNonexistent)
{
// 如果 AsyncLocal 注册过回调,treatNullValueAsNonexistent 的值是 false,
// 意思是就算 value 是 null,也认为它是有效的
if (value != null || !treatNullValueAsNonexistent)
{
// 如果现在的 map 已经保存过传入的 key ,则返回一个更新了 value 值的新 map 实例
if (ReferenceEquals(key, _key1)) return new ThreeElementAsyncLocalValueMap(key, value, _key2, _value2, _key3, _value3);
if (ReferenceEquals(key, _key2)) return new ThreeElementAsyncLocalValueMap(_key1, _value1, key, value, _key3, _value3);
if (ReferenceEquals(key, _key3)) return new ThreeElementAsyncLocalValueMap(_key1, _value1, _key2, _value2, key, value);
// 如果当前Key不存在map里,则需要一个能存放第四个key的map
var multi = new MultiElementAsyncLocalValueMap(4);
multi.UnsafeStore(0, _key1, _value1);
multi.UnsafeStore(1, _key2, _value2);
multi.UnsafeStore(2, _key3, _value3);
multi.UnsafeStore(3, key, value);
return multi;
}
else
{
// value 是 null,对应的 key 会被忽略或者从 map 中去除,这边会有两种情况
// 1、如果当前的 key 存在于 map 当中,则将这个 key 去除,map 类型降级为 TwoElementAsyncLocalValueMap
return
ReferenceEquals(key, _key1) ? new TwoElementAsyncLocalValueMap(_key2, _value2, _key3, _value3) :
ReferenceEquals(key, _key2) ? new TwoElementAsyncLocalValueMap(_key1, _value1, _key3, _value3) :
ReferenceEquals(key, _key3) ? new TwoElementAsyncLocalValueMap(_key1, _value1, _key2, _value2) :
// 2、当前 key 不存在于 map 中,则会被直接忽略
(IAsyncLocalValueMap)this;
}
}
// 至多对比三次就能找到对应的 value
public bool TryGetValue(IAsyncLocal key, out object? value)
{
if (ReferenceEquals(key, _key1))
{
value = _value1;
return true;
}
else if (ReferenceEquals(key, _key2))
{
value = _value2;
return true;
}
else if (ReferenceEquals(key, _key3))
{
value = _value3;
return true;
}
else
{
value = null;
return false;
}
}
}
2.2.4、ExecutionContext - SetLocalValue
internal static void SetLocalValue(IAsyncLocal local, object? newValue, bool needChangeNotifications)
{
// 获取当前执行上下文
ExecutionContext? current = Thread.CurrentThread._executionContext;
object? previousValue = null;
bool hadPreviousValue = false;
if (current != null)
{
Debug.Assert(!current.IsDefault);
Debug.Assert(current.m_localValues != null, "Only the default context should have null, and we shouldn't be here on the default context");
// 判断当前作为 Key 的 AsyncLocal 是否已经有对应的 Value
hadPreviousValue = current.m_localValues.TryGetValue(local, out previousValue);
}
// 如果前后两次 Value 没有发生变化,则继续处理
if (previousValue == newValue)
{
return;
}
// 对于 treatNullValueAsNonexistent: !needChangeNotifications 的说明
// 如果 AsyncLocal 注册了回调,则 needChangeNotifications 为 ture,m_localValues 会保存 null 值以便下次触发change回调
IAsyncLocal[]? newChangeNotifications = null;
IAsyncLocalValueMap newValues;
bool isFlowSuppressed = false;
if (current != null)
{
Debug.Assert(!current.IsDefault);
Debug.Assert(current.m_localValues != null, "Only the default context should have null, and we shouldn't be here on the default context");
isFlowSuppressed = current.m_isFlowSuppressed;
// 这一步很关键,通过调用 m_localValues.Set 对 map 进行修改,这会产生一个新的 map 实例。
newValues = current.m_localValues.Set(local, newValue, treatNullValueAsNonexistent: !needChangeNotifications);
newChangeNotifications = current.m_localChangeNotifications;
}
else
{
// 如果当前上下文不存在,创建第一个 IAsyncLocalValueMap 实例
newValues = AsyncLocalValueMap.Create(local, newValue, treatNullValueAsNonexistent: !needChangeNotifications);
}
// 如果 AsyncLocal 注册了回调,则需要保存 AsyncLocal 的引用
// 这边会有两种情况,一个是数组未创建过,一个是数组已存在
if (needChangeNotifications)
{
if (hadPreviousValue)
{
Debug.Assert(newChangeNotifications != null);
Debug.Assert(Array.IndexOf(newChangeNotifications, local) >= 0);
}
else if (newChangeNotifications == null)
{
newChangeNotifications = new IAsyncLocal[1] { local };
}
else
{
int newNotificationIndex = newChangeNotifications.Length;
// 这个方法会创建一个新数组并将原来的元素拷贝过去
Array.Resize(ref newChangeNotifications, newNotificationIndex + 1);
newChangeNotifications[newNotificationIndex] = local;
}
}
// 如果 AsyncLocal 存在有效值,且允许执行上下文流动,则创建新的 ExecutionContext实例,新实例会保存所有的AsyncLocal的值和所有需要通知的 AsyncLocal 引用。
Thread.CurrentThread._executionContext =
(!isFlowSuppressed && AsyncLocalValueMap.IsEmpty(newValues)) ?
null : // No values, return to Default context
new ExecutionContext(newValues, newChangeNotifications, isFlowSuppressed);
if (needChangeNotifications)
{
// 调用先前注册好的委托
local.OnValueChanged(previousValue, newValue, contextChanged: false);
}
}
2.2.5、ExecutionContext - GetLocalValue
internal static object? GetLocalValue(IAsyncLocal local)
{
ExecutionContext? current = Thread.CurrentThread._executionContext;
if (current == null)
{
return null;
}
Debug.Assert(!current.IsDefault);
Debug.Assert(current.m_localValues != null, "Only the default context should have null, and we shouldn't be here on the default context");
current.m_localValues.TryGetValue(local, out object? value);
return value;
}
3、ExecutionContext 的流动
class Program
{
static AsyncLocal
ThreadPool.QueueUserWorkItem: AsyncLocal保存的数据
Task.Run: AsyncLocal保存的数据
after await: AsyncLocal保存的数据3.1、流动的禁止和恢复
class Program
{
static AsyncLocal
new Thread: AsyncLocal保存的数据
ThreadPool.QueueUserWorkItem: AsyncLocal保存的数据
Task.Run: AsyncLocal保存的数据
after await: AsyncLocal保存的数据
new Thread:
ThreadPool.QueueUserWorkItem:
Task.Run:
after await:
new Thread: AsyncLocal保存的数据
ThreadPool.QueueUserWorkItem: AsyncLocal保存的数据
Task.Run: AsyncLocal保存的数据
after await: AsyncLocal保存的数据class Program
{
static AsyncLocal
3.2、ExcutionContext 的流动实现
public static void Run(ExecutionContext executionContext, ContextCallback callback, object? state)
{
if (executionContext == null)
{
ThrowNullContext();
}
// 内部会调用 RestoreChangedContextToThread 方法
RunInternal(executionContext, callback, state);
}
internal static void RestoreChangedContextToThread(Thread currentThread, ExecutionContext? contextToRestore, ExecutionContext? currentContext)
{
Debug.Assert(currentThread == Thread.CurrentThread);
Debug.Assert(contextToRestore != currentContext);
// 在这边把之前的 ExecutionContext 赋值给了当前线程
currentThread._executionContext = contextToRestore;
if ((currentContext != null && currentContext.HasChangeNotifications) ||
(contextToRestore != null && contextToRestore.HasChangeNotifications))
{
OnValuesChanged(currentContext, contextToRestore);
}
}
3.2.1、new Thread(ThreadStart start).Start() 为例说明 ExecutionContext 的流动
public void Start()
{
#if FEATURE_COMINTEROP_APARTMENT_SUPPORT
// Eagerly initialize the COM Apartment state of the thread if we're allowed to.
StartupSetApartmentStateInternal();
#endif // FEATURE_COMINTEROP_APARTMENT_SUPPORT
// Attach current thread's security principal object to the new
// thread. Be careful not to bind the current thread to a principal
// if it's not already bound.
if (_delegate != null)
{
// If we reach here with a null delegate, something is broken. But we'll let the StartInternal method take care of
// reporting an error. Just make sure we don't try to dereference a null delegate.
Debug.Assert(_delegate.Target is ThreadHelper);
// 由于 _delegate 指向 ThreadHelper 的实例方法,所以 _delegate.Target 指向 ThreadHelper 实例。
var t = (ThreadHelper)_delegate.Target;
ExecutionContext? ec = ExecutionContext.Capture();
t.SetExecutionContextHelper(ec);
}
StartInternal();
}
class ThreadHelper
{
internal ThreadHelper(Delegate start)
{
_start = start;
}
internal void SetExecutionContextHelper(ExecutionContext? ec)
{
_executionContext = ec;
}
// 这个方法是对 Thread 构造函数传入的委托的包装
internal void ThreadStart()
{
Debug.Assert(_start is ThreadStart);
ExecutionContext? context = _executionContext;
if (context != null)
{
// 将 ExecutionContext 与 CurrentThread 进行绑定
ExecutionContext.RunInternal(context, s_threadStartContextCallback, this);
}
else
{
InitializeCulture();
((ThreadStart)_start)();
}
}
}
4、总结
5、参考
文章标题:浅析 .NET 中 AsyncLocal 的实现原理
文章链接:http://soscw.com/index.php/essay/78324.html