浅析 .NET 中 AsyncLocal 的实现原理

2021-04-23 01:27

阅读:768

标签:另一个   delegate   基本   基础   blob   tor   dstar   静态类   sed   

目录

  • 前言
  • 1、线程本地存储
  • 2、AsyncLocal 实现
    • 2.1、主体 AsyncLocal
    • 2.2、AsyncLocal 在 ExecutionContext 中的数据存取实现
      • 2.2.1、 ExecutionContext 与 线程的绑定关系
      • 2.2.2、ExecutionContext 的私有变量
      • 2.2.3、IAsyncLocalValueMap 接口及其实现
      • 2.2.4、ExecutionContext - SetLocalValue
      • 2.2.5、ExecutionContext - GetLocalValue
  • 3、ExecutionContext 的流动
    • 3.1、流动的禁止和恢复
    • 3.2、ExcutionContext 的流动实现
      • 3.2.1、new Thread(ThreadStart start).Start() 为例说明 ExecutionContext 的流动
  • 4、总结
  • 5、参考

前言

对于写过 ASP.NET Core 的童鞋来说,可以通过 HttpContextAccessor 在 Controller 之外的地方获取到HttpContext,而它实现的关键其实是在于一个AsyncLocal 类型的静态字段。接下来就和大家来一起探讨下这个 AsyncLocal 的具体实现原理。如果有讲得不清晰或不准确的地方,还望指出。

public class HttpContextAccessor : IHttpContextAccessor
{
    private static AsyncLocal _httpContextCurrent = new AsyncLocal();

    // 其他代码这里不展示
}

本文源码参考为发文时间点为止最新的 github 开源代码,和之前实现有些许不同,但设计思想基本一致。

代码库地址:https://github.com/dotnet/runtime

1、线程本地存储

如果想要整个.NET程序中共享一个变量,我们可以将想要共享的变量放在某个类的静态属性上来实现。

而在多线程的运行环境中,则可能会希望能将这个变量的共享范围缩小到单个线程内。例如在web应用中,服务器为每个同时访问的请求分配一个独立的线程,我们要在这些独立的线程中维护自己的当前访问用户的信息时,就需要需要线程本地存储了。

例如下面这样一个例子。

class Program
{
    [ThreadStatic]
    private static string _value;
    static void Main(string[] args)
    {
        Parallel.For(0, 4, _ =>
        {
            var threadId = Thread.CurrentThread.ManagedThreadId;

            _value ??= $"这是来自线程{threadId}的数据";
            Console.WriteLine($"Thread:{threadId}; Value:{_value}");
        });
    }
}

输出结果:

Thread:4; Value:这是来自线程4的数据
Thread:1; Value:这是来自线程1的数据
Thread:5; Value:这是来自线程5的数据
Thread:6; Value:这是来自线程6的数据

除了可以使用 ThreadStaticAttribute 外,我们还可以使用 ThreadLocalCallContextAsyncLocal 来实现一样的功能。由于 .NET Core 不再实现 CallContext,所以下列代码只能在 .NET Framework 中执行。

class Program
{
    [ThreadStatic]
    private static string _threadStatic;
    private static ThreadLocal _threadLocal = new ThreadLocal();
    private static AsyncLocal _asyncLocal = new AsyncLocal();
    static void Main(string[] args)
    {
        Parallel.For(0, 4, _ =>
        {
            var threadId = Thread.CurrentThread.ManagedThreadId;

            var value = $"这是来自线程{threadId}的数据";
            _threadStatic ??= value;
            CallContext.SetData("value", value);
            _threadLocal.Value ??= value;
            _asyncLocal.Value ??= value;
            Console.WriteLine($"Use ThreadStaticAttribute; Thread:{threadId}; Value:{_threadStatic}");
            Console.WriteLine($"Use CallContext;           Thread:{threadId}; Value:{CallContext.GetData("value")}");
            Console.WriteLine($"Use ThreadLocal;           Thread:{threadId}; Value:{_threadLocal.Value}");
            Console.WriteLine($"Use AsyncLocal;            Thread:{threadId}; Value:{_asyncLocal.Value}");
        });

        Console.Read();
    }
}

输出结果:

Use ThreadStaticAttribute; Thread:3; Value:这是来自线程3的数据
Use ThreadStaticAttribute; Thread:4; Value:这是来自线程4的数据
Use ThreadStaticAttribute; Thread:1; Value:这是来自线程1的数据
Use CallContext; Thread:1; Value:这是来自线程1的数据
Use ThreadLocal; Thread:1; Value:这是来自线程1的数据
Use AsyncLocal; Thread:1; Value:这是来自线程1的数据
Use ThreadStaticAttribute; Thread:5; Value:这是来自线程5的数据
Use CallContext; Thread:5; Value:这是来自线程5的数据
Use ThreadLocal; Thread:5; Value:这是来自线程5的数据
Use AsyncLocal; Thread:5; Value:这是来自线程5的数据
Use CallContext; Thread:3; Value:这是来自线程3的数据
Use CallContext; Thread:4; Value:这是来自线程4的数据
Use ThreadLocal; Thread:4; Value:这是来自线程4的数据
Use AsyncLocal; Thread:4; Value:这是来自线程4的数据
Use ThreadLocal; Thread:3; Value:这是来自线程3的数据
Use AsyncLocal; Thread:3; Value:这是来自线程3的数据

上面的例子都只是在同一个线程中对线程进行存和取,但日常开发的过程中,我们会有很多异步的场景,这些场景可能会导致执行代码的线程发生切换。

比如下面的例子

class Program
{
    [ThreadStatic]
    private static string _threadStatic;
    private static ThreadLocal _threadLocal = new ThreadLocal();
    private static AsyncLocal _asyncLocal = new AsyncLocal();
    static void Main(string[] args)
    {
        _threadStatic = "ThreadStatic保存的数据";
        _threadLocal.Value = "ThreadLocal保存的数据";
        _asyncLocal.Value = "AsyncLocal保存的数据";
        PrintValuesInAnotherThread();
        Console.ReadKey();
    }

    private static void PrintValuesInAnotherThread()
    {
        Task.Run(() =>
        {
            Console.WriteLine($"ThreadStatic: {_threadStatic}");
            Console.WriteLine($"ThreadLocal: {_threadLocal.Value}");
            Console.WriteLine($"AsyncLocal: {_asyncLocal.Value}");
        });
    }
}

输出结果:

ThreadStatic:
ThreadLocal:
AsyncLocal: AsyncLocal保存的数据

在线程发生了切换之后,只有 AsyncLocal 还能够保留原来的值,当然,.NET Framework 中的 CallContext 也可以实现这个需求,下面给出一个相对完整的总结。

实现方式 .NET FrameWork 可用 .NET Core 可用 是否支持数据向辅助线程的
ThreadStaticAttribute
ThreadLocal
CallContext.SetData(string name, object data) 仅当参数 data 对应的类型实现了 ILogicalThreadAffinative 接口时支持
CallContext.LogicalSetData(string name, object data)
AsyncLocal

2、AsyncLocal 实现

我们主要对照 .NET Core 源码进行学习,源码地址:https://github.com/dotnet/runtime/blob/master/src/libraries/System.Private.CoreLib/src/System/Threading/AsyncLocal.cs

2.1、主体 AsyncLocal

AsyncLocal 为我们提供了两个功能

  • 通过 Value 属性存取值
  • 通过构造函数注册回调函数监听任意线程中对值做出的改动,需记着这个功能,后面介绍源码的时候会有很多地方涉及

其内部代码相对简单

public sealed class AsyncLocal : IAsyncLocal
{
    private readonly Action>? m_valueChangedHandler;
    
    // 无参构造
    public AsyncLocal()
    {
    }
    
    // 可以注册回调的构造函数,当 Value 在任意线程被改动,将调用回调
    public AsyncLocal(Action>? valueChangedHandler)
    {
        m_valueChangedHandler = valueChangedHandler;
    }
    
    [MaybeNull]
    public T Value
    {
        get
        {
            // 从 ExecutionContext 中以自身为 Key 获取值
            object? obj = ExecutionContext.GetLocalValue(this);
            return (obj == null) ? default : (T)obj;
        }
        // 是否注册回调将回影响到 ExecutionContext 是否保存其引用
        set => ExecutionContext.SetLocalValue(this, value, m_valueChangedHandler != null);
    }
    
    // 在 ExecutionContext 如果判断到值发生了变化,此方法将被调用
    void IAsyncLocal.OnValueChanged(object? previousValueObj, object? currentValueObj, bool contextChanged)
    {
        Debug.Assert(m_valueChangedHandler != null);
        T previousValue = previousValueObj == null ? default! : (T)previousValueObj;
        T currentValue = currentValueObj == null ? default! : (T)currentValueObj;
        m_valueChangedHandler(new AsyncLocalValueChangedArgs(previousValue, currentValue, contextChanged));
    }
}

internal interface IAsyncLocal
{
    void OnValueChanged(object? previousValue, object? currentValue, bool contextChanged);
}

真正的数据存取是通过 ExecutionContext.GetLocalValueExecutionContext.SetLocalValue 实现的。

public class ExecutionContext
{
    internal static object? GetLocalValue(IAsyncLocal local);
    internal static void SetLocalValue(
        IAsyncLocal local,
        object? newValue,
        bool needChangeNotifications);
}

需要注意的是这边通过 IAsyncLocal 这一接口实现了 AsyncLocalExcutionContext 的解耦。 ExcutionContext 只关注数据的存取本身,接口定义的类型都是 object,而不关心具体的类型 T

2.2、AsyncLocal 在 ExecutionContext 中的数据存取实现

在.NET 中,每个线程都关联着一个 执行上下文(execution context) 。 可以通过Thread.CurrentThread.ExecutionContext 属性进行访问,或者通过 ExecutionContext.Capture() 获取(前者的实现) 。

AsyncLocal 最终就是把数据保存在 ExecutionContext 上的,为了更深入地理解 AsyncLocal 我们需要先理解一下它。

源码地址:https://github.com/dotnet/runtime/blob/master/src/libraries/System.Private.CoreLib/src/System/Threading/ExecutionContext.cs

2.2.1、 ExecutionContext 与 线程的绑定关系

ExecutionContext 被保存 Thread 的 internal 修饰的 _executionContext 字段上。但Thread.CurrentThread.ExecutionContext 并不直接暴露 _executionContext 而与 ExecutionContext.Capture() 共用一套逻辑。

class ExecutionContext
{
    public static ExecutionContext? Capture()
    {
        ExecutionContext? executionContext = Thread.CurrentThread._executionContext;
        if (executionContext == null)
        {
            executionContext = Default;
        }
        else if (executionContext.m_isFlowSuppressed)
        {
            executionContext = null;
        }

        return executionContext;
    }
}

下面是经过整理的 Thread 的与 ExecutionContext 相关的部分,Thread 属于部分类,_executionContext 字段定义在 Thread.CoreCLR.cs 文件中

class Thread
{
    // 保存当前线程所关联的 执行上下文
    internal ExecutionContext? _executionContext;

    [ThreadStatic]
    private static Thread? t_currentThread;
    
    public static Thread CurrentThread => t_currentThread ?? InitializeCurrentThread();
    
    public ExecutionContext? ExecutionContext => ExecutionContext.Capture();
}

2.2.2、ExecutionContext 的私有变量

public sealed class ExecutionContext : IDisposable, ISerializable
{
    // 默认执行上下文
    internal static readonly ExecutionContext Default = new ExecutionContext(isDefault: true);
    // 执行上下文禁止流动后的默认上下文
    internal static readonly ExecutionContext DefaultFlowSuppressed = new ExecutionContext(AsyncLocalValueMap.Empty, Array.Empty(), isFlowSuppressed: true);
    // 保存所有注册了修改回调的 AsyncLocal 的 Value 值,本文暂不涉及对此字段的具体讨论
    private readonly IAsyncLocalValueMap? m_localValues;
    // 保存所有注册了回调的 AsyncLocal 的对象引用
    private readonly IAsyncLocal[]? m_localChangeNotifications;
    // 当前线程是否禁止上下文流动
    private readonly bool m_isFlowSuppressed;
    // 当前上下文是否是默认上下文
    private readonly bool m_isDefault;
}

2.2.3、IAsyncLocalValueMap 接口及其实现

在同一个线程中,所有 AsyncLocal 所保存的 Value 都保存在 ExecutionContextm_localValues 字段上。

public class ExecutionContext
{
    private readonly IAsyncLocalValueMap m_localValues;
}

为了优化查找值时的性能,微软为 IAsyncLocalValueMap 提供了6个实现

类型 元素个数
EmptyAsyncLocalValueMap 0
OneElementAsyncLocalValueMap 1
TwoElementAsyncLocalValueMap 2
ThreeElementAsyncLocalValueMap 3
MultiElementAsyncLocalValueMap 4 ~ 16
ManyElementAsyncLocalValueMap > 16

随着 ExecutionContext 所关联的 AsyncLocal 数量的增加,IAsyncLocalValueMap 的实现将会在ExecutionContext的SetLocalValue方法中被不断替换。查询的时间复杂度和空间复杂度依次递增。代码的实现与 AsyncLocal 同属于 一个文件。当然元素数量减少时也会替换成之前的实现。

// 这个接口是用来在 ExecutionContext 中保存 IAsyncLocal => object 的映射关系。
// 其实现被设定为不可变的(immutable),随着元素的数量增加而变化,空间复杂度和时间复杂度也随之增加。
internal interface IAsyncLocalValueMap
{
    bool TryGetValue(IAsyncLocal key, out object? value);
    // 通过此方法新增 AsyncLocal 或修改现有的 AsyncLocal
    // 如果数量无变化,返回同类型的 IAsyncLocalValueMap 实现类实例
    // 如果数量发生变化(增加或减少,将value设值为null时会减少),则可能返回不同类型的 IAsyncLocalValueMap 实现类实例
    IAsyncLocalValueMap Set(IAsyncLocal key, object? value, bool treatNullValueAsNonexistent);
}

Map 的创建是以静态类 AsyncLocalValueMap 的 Create 方法作为创建的入口的。

internal static class AsyncLocalValueMap
{
    // EmptyAsyncLocalValueMap 设计上只在这边实例化,其他地方当作常量使用
    public static IAsyncLocalValueMap Empty { get; } = new EmptyAsyncLocalValueMap();

    public static bool IsEmpty(IAsyncLocalValueMap asyncLocalValueMap)
    {
        Debug.Assert(asyncLocalValueMap != null);
        Debug.Assert(asyncLocalValueMap == Empty || asyncLocalValueMap.GetType() != typeof(EmptyAsyncLocalValueMap));

        return asyncLocalValueMap == Empty;
    }

    public static IAsyncLocalValueMap Create(IAsyncLocal key, object? value, bool treatNullValueAsNonexistent)
    {
        // 创建最初的实例
        // 如果 AsyncLocal 注册了回调,则需要保存 null 的 Value,以便下次设置非null的值时因为值发生变化而触发回调
        return value != null || !treatNullValueAsNonexistent ?
            new OneElementAsyncLocalValueMap(key, value) :
            Empty;
    }
}

此后每次更新元素时都必须调用 IAsyncLocalValueMap 实现类的 Set 方法,原实例是不会发生变化的,需保存 Set 的返回值。

接下来以 ThreeElementAsyncLocalValueMap 为例进行解释

private sealed class ThreeElementAsyncLocalValueMap : IAsyncLocalValueMap
{
    // 申明三个私有字段保存 key
    private readonly IAsyncLocal _key1, _key2, _key3;
    // 申明三个私有字段保存
    private readonly object? _value1, _value2, _value3;

    public ThreeElementAsyncLocalValueMap(IAsyncLocal key1, object? value1, IAsyncLocal key2, object? value2, IAsyncLocal key3, object? value3)
    {
        _key1 = key1; _value1 = value1;
        _key2 = key2; _value2 = value2;
        _key3 = key3; _value3 = value3;
    }

    public IAsyncLocalValueMap Set(IAsyncLocal key, object? value, bool treatNullValueAsNonexistent)
    {
        // 如果 AsyncLocal 注册过回调,treatNullValueAsNonexistent 的值是 false,
        // 意思是就算 value 是 null,也认为它是有效的
        if (value != null || !treatNullValueAsNonexistent)
        {
            // 如果现在的 map 已经保存过传入的 key ,则返回一个更新了 value 值的新 map 实例
            if (ReferenceEquals(key, _key1)) return new ThreeElementAsyncLocalValueMap(key, value, _key2, _value2, _key3, _value3);
            if (ReferenceEquals(key, _key2)) return new ThreeElementAsyncLocalValueMap(_key1, _value1, key, value, _key3, _value3);
            if (ReferenceEquals(key, _key3)) return new ThreeElementAsyncLocalValueMap(_key1, _value1, _key2, _value2, key, value);

            // 如果当前Key不存在map里,则需要一个能存放第四个key的map
            var multi = new MultiElementAsyncLocalValueMap(4);
            multi.UnsafeStore(0, _key1, _value1);
            multi.UnsafeStore(1, _key2, _value2);
            multi.UnsafeStore(2, _key3, _value3);
            multi.UnsafeStore(3, key, value);
            return multi;
        }
        else
        {
            // value 是 null,对应的 key 会被忽略或者从 map 中去除,这边会有两种情况
            // 1、如果当前的 key 存在于 map 当中,则将这个 key 去除,map 类型降级为 TwoElementAsyncLocalValueMap
            return
                ReferenceEquals(key, _key1) ? new TwoElementAsyncLocalValueMap(_key2, _value2, _key3, _value3) :
                ReferenceEquals(key, _key2) ? new TwoElementAsyncLocalValueMap(_key1, _value1, _key3, _value3) :
                ReferenceEquals(key, _key3) ? new TwoElementAsyncLocalValueMap(_key1, _value1, _key2, _value2) :
                // 2、当前 key 不存在于 map 中,则会被直接忽略
                (IAsyncLocalValueMap)this;
        }
    }

    // 至多对比三次就能找到对应的 value
    public bool TryGetValue(IAsyncLocal key, out object? value)
    {
        if (ReferenceEquals(key, _key1))
        {
            value = _value1;
            return true;
        }
        else if (ReferenceEquals(key, _key2))
        {
            value = _value2;
            return true;
        }
        else if (ReferenceEquals(key, _key3))
        {
            value = _value3;
            return true;
        }
        else
        {
            value = null;
            return false;
        }
    }
}

2.2.4、ExecutionContext - SetLocalValue

需要注意的是这边会涉及到两个 Immutable 结构,一个是 ExecutionContext 本身,另一个是 IAsyncLocalValueMap 的实现类。同一个 key 前后两次 value 发生变化后,会产生新的 ExecutionContext 的实例和 IAsyncLocalMap 实现类实例(在 IAsyncLocalValueMap 实现类的 Set 方法中完成)。

internal static void SetLocalValue(IAsyncLocal local, object? newValue, bool needChangeNotifications)
{
    // 获取当前执行上下文
    ExecutionContext? current = Thread.CurrentThread._executionContext;

    object? previousValue = null;
    bool hadPreviousValue = false;
    if (current != null)
    {
        Debug.Assert(!current.IsDefault);
        Debug.Assert(current.m_localValues != null, "Only the default context should have null, and we shouldn't be here on the default context");
        
        // 判断当前作为 Key 的 AsyncLocal 是否已经有对应的 Value 
        hadPreviousValue = current.m_localValues.TryGetValue(local, out previousValue);
    }

    // 如果前后两次 Value 没有发生变化,则继续处理
    if (previousValue == newValue)
    {
        return;
    }

    // 对于 treatNullValueAsNonexistent: !needChangeNotifications 的说明
    // 如果 AsyncLocal 注册了回调,则 needChangeNotifications 为 ture,m_localValues 会保存 null 值以便下次触发change回调
    IAsyncLocal[]? newChangeNotifications = null;
    IAsyncLocalValueMap newValues;
    bool isFlowSuppressed = false;
    if (current != null)
    {
        Debug.Assert(!current.IsDefault);
        Debug.Assert(current.m_localValues != null, "Only the default context should have null, and we shouldn't be here on the default context");

        isFlowSuppressed = current.m_isFlowSuppressed;
        // 这一步很关键,通过调用 m_localValues.Set 对 map 进行修改,这会产生一个新的 map 实例。
        newValues = current.m_localValues.Set(local, newValue, treatNullValueAsNonexistent: !needChangeNotifications);
        newChangeNotifications = current.m_localChangeNotifications;
    }
    else
    {
        // 如果当前上下文不存在,创建第一个 IAsyncLocalValueMap 实例
        newValues = AsyncLocalValueMap.Create(local, newValue, treatNullValueAsNonexistent: !needChangeNotifications);
    }

    // 如果 AsyncLocal 注册了回调,则需要保存 AsyncLocal 的引用
    // 这边会有两种情况,一个是数组未创建过,一个是数组已存在
    if (needChangeNotifications)
    {
        if (hadPreviousValue)
        {
            Debug.Assert(newChangeNotifications != null);
            Debug.Assert(Array.IndexOf(newChangeNotifications, local) >= 0);
        }
        else if (newChangeNotifications == null)
        {
            newChangeNotifications = new IAsyncLocal[1] { local };
        }
        else
        {
            int newNotificationIndex = newChangeNotifications.Length;
            // 这个方法会创建一个新数组并将原来的元素拷贝过去
            Array.Resize(ref newChangeNotifications, newNotificationIndex + 1);
            newChangeNotifications[newNotificationIndex] = local;
        }
    }

    // 如果 AsyncLocal 存在有效值,且允许执行上下文流动,则创建新的 ExecutionContext实例,新实例会保存所有的AsyncLocal的值和所有需要通知的 AsyncLocal 引用。
    Thread.CurrentThread._executionContext =
        (!isFlowSuppressed && AsyncLocalValueMap.IsEmpty(newValues)) ?
        null : // No values, return to Default context
        new ExecutionContext(newValues, newChangeNotifications, isFlowSuppressed);

    if (needChangeNotifications)
    {
        // 调用先前注册好的委托
        local.OnValueChanged(previousValue, newValue, contextChanged: false);
    }
}

2.2.5、ExecutionContext - GetLocalValue

值的获取实现相对简单

internal static object? GetLocalValue(IAsyncLocal local)
{
    ExecutionContext? current = Thread.CurrentThread._executionContext;
    if (current == null)
    {
        return null;
    }

    Debug.Assert(!current.IsDefault);
    Debug.Assert(current.m_localValues != null, "Only the default context should have null, and we shouldn't be here on the default context");
    current.m_localValues.TryGetValue(local, out object? value);
    return value;
}

3、ExecutionContext 的流动

在线程发生切换的时候,ExecutionContext 会在前一个线程中被默认捕获,流向下一个线程,它所保存的数据也就随之流动。

在所有会发生线程切换的地方,基础类库(BCL) 都为我们封装好了对执行上下文的捕获。

例如:

  • new Thread(ThreadStart start).Start()
  • Task.Run(Action action)
  • ThreadPool.QueueUserWorkItem(WaitCallback callBack)
  • await 语法糖
class Program
{
    static AsyncLocal _asyncLocal = new AsyncLocal();

    static async Task Main(string[] args)
    {
        _asyncLocal.Value = "AsyncLocal保存的数据";

        new Thread(() =>
        {
            Console.WriteLine($"new Thread: {_asyncLocal.Value}");
        })
        {
            IsBackground = true
        }.Start();

        ThreadPool.QueueUserWorkItem(_ =>
        {
            Console.WriteLine($"ThreadPool.QueueUserWorkItem: {_asyncLocal.Value}");
        });

        Task.Run(() =>
        {
            Console.WriteLine($"Task.Run: {_asyncLocal.Value}");
        });

        await Task.Delay(100);
        Console.WriteLine($"after await: {_asyncLocal.Value}");
    }
}

输出结果:

new Thread: AsyncLocal保存的数据
ThreadPool.QueueUserWorkItem: AsyncLocal保存的数据
Task.Run: AsyncLocal保存的数据
after await: AsyncLocal保存的数据

3.1、流动的禁止和恢复

ExecutionContext 为我们提供了 SuppressFlow(禁止流动) 和 RestoreFlow (恢复流动)这两个静态方法来控制当前线程的执行上下文是否像辅助线程流动。并可以通过 IsFlowSuppressed 静态方法来进行判断。

class Program
{
    static AsyncLocal _asyncLocal = new AsyncLocal();

    static async Task Main(string[] args)
    {
        _asyncLocal.Value = "AsyncLocal保存的数据";

        Console.WriteLine("默认:");
        PrintAsync(); // 不 await,后面的线程不会发生切换

        Thread.Sleep(1000); // 确保上面的方法内的所有线程都执行完

        ExecutionContext.SuppressFlow();
        Console.WriteLine("SuppressFlow:");
        PrintAsync();

        Thread.Sleep(1000);

        Console.WriteLine("RestoreFlow:");

        ExecutionContext.RestoreFlow();
        await PrintAsync();

        Console.Read();
    }

    static async ValueTask PrintAsync()
    {
        new Thread(() =>
        {
            Console.WriteLine($"    new Thread: {_asyncLocal.Value}");
        })
        {
            IsBackground = true
        }.Start();

        Thread.Sleep(100); // 保证输出顺序

        ThreadPool.QueueUserWorkItem(_ =>
        {
            Console.WriteLine($"    ThreadPool.QueueUserWorkItem: {_asyncLocal.Value}");
        });

        Thread.Sleep(100);

        Task.Run(() =>
        {
            Console.WriteLine($"    Task.Run: {_asyncLocal.Value}");
        });

        await Task.Delay(100);
        Console.WriteLine($"    after await: {_asyncLocal.Value}");

        Console.WriteLine();
    }
}

输出结果:

默认:
new Thread: AsyncLocal保存的数据
ThreadPool.QueueUserWorkItem: AsyncLocal保存的数据
Task.Run: AsyncLocal保存的数据
after await: AsyncLocal保存的数据

SuppressFlow:
new Thread:
ThreadPool.QueueUserWorkItem:
Task.Run:
after await:

RestoreFlow:
new Thread: AsyncLocal保存的数据
ThreadPool.QueueUserWorkItem: AsyncLocal保存的数据
Task.Run: AsyncLocal保存的数据
after await: AsyncLocal保存的数据

需要注意的是,在线程A中创建线程B之前调用 ExecutionContext.SuppressFlow 只会影响 ExecutionContext 从线程A => 线程B的传递,线程B => 线程C 不受影响。

class Program
{
    static AsyncLocal _asyncLocal = new AsyncLocal();
    static void Main(string[] args)
    {
        _asyncLocal.Value = "A => B";
        ExecutionContext.SuppressFlow();
        new Thread((() =>
        {
            Console.WriteLine($"线程B:{_asyncLocal.Value}"); // 输出线程B:

            _asyncLocal.Value = "B => C";
            new Thread((() =>
            {
                Console.WriteLine($"线程C:{_asyncLocal.Value}"); // 输出线程C:B => C
            }))
            {
                IsBackground = true
            }.Start();
        }))
        {
            IsBackground = true
        }.Start();

        Console.Read();
    }
}

3.2、ExcutionContext 的流动实现

上面举例了四种场景,由于每一种场景的传递过程都比较复杂,目前先介绍其中一个。

但不管什么场景,都会涉及到 ExcutionContext 的 Run 方法。在Run 方法中会调用 RunInternal 方法,

public static void Run(ExecutionContext executionContext, ContextCallback callback, object? state)
{
    if (executionContext == null)
    {
        ThrowNullContext();
    }

    // 内部会调用 RestoreChangedContextToThread 方法
    RunInternal(executionContext, callback, state);
}

RunInternal 调用下面一个 RestoreChangedContextToThread 方法将 ExcutionContext.Run 方法传入的 ExcutionContext 赋值给当前线程的 _executionContext 字段。

internal static void RestoreChangedContextToThread(Thread currentThread, ExecutionContext? contextToRestore, ExecutionContext? currentContext)
{
    Debug.Assert(currentThread == Thread.CurrentThread);
    Debug.Assert(contextToRestore != currentContext);

    // 在这边把之前的 ExecutionContext 赋值给了当前线程
    currentThread._executionContext = contextToRestore;
    if ((currentContext != null && currentContext.HasChangeNotifications) ||
        (contextToRestore != null && contextToRestore.HasChangeNotifications))
    {
        OnValuesChanged(currentContext, contextToRestore);
    }
}

3.2.1、new Thread(ThreadStart start).Start() 为例说明 ExecutionContext 的流动

这边可以分为三个步骤:

在 Thread 的 Start 方法中捕获当前的 ExecutionContext,将其传递给 Thread 的构造函数中实例化的 ThreadHelper 实例,ExecutionContext 会暂存在 ThreadHelper 的实例字段中,线程创建完成后会调用ExecutionContext.RunInternal 将其赋值给新创建的线程。

代码位置:

https://github.com/dotnet/runtime/blob/5fca04171171f118bca0f93aa9741f205b8cdc29/src/coreclr/src/System.Private.CoreLib/src/System/Threading/Thread.CoreCLR.cs#L200

        public void Start()
        {
#if FEATURE_COMINTEROP_APARTMENT_SUPPORT
            // Eagerly initialize the COM Apartment state of the thread if we're allowed to.
            StartupSetApartmentStateInternal();
#endif // FEATURE_COMINTEROP_APARTMENT_SUPPORT

            // Attach current thread's security principal object to the new
            // thread. Be careful not to bind the current thread to a principal
            // if it's not already bound.
            if (_delegate != null)
            {
                // If we reach here with a null delegate, something is broken. But we'll let the StartInternal method take care of
                // reporting an error. Just make sure we don't try to dereference a null delegate.
                Debug.Assert(_delegate.Target is ThreadHelper);
                // 由于 _delegate 指向 ThreadHelper 的实例方法,所以 _delegate.Target 指向 ThreadHelper 实例。
                var t = (ThreadHelper)_delegate.Target;

                ExecutionContext? ec = ExecutionContext.Capture();
                t.SetExecutionContextHelper(ec);
            }

            StartInternal();
        }

https://github.com/dotnet/runtime/blob/5fca04171171f118bca0f93aa9741f205b8cdc29/src/coreclr/src/System.Private.CoreLib/src/System/Threading/Thread.CoreCLR.cs#L26

class ThreadHelper
{
    internal ThreadHelper(Delegate start)
    {
        _start = start;
    }

    internal void SetExecutionContextHelper(ExecutionContext? ec)
    {
        _executionContext = ec;
    }

    // 这个方法是对 Thread 构造函数传入的委托的包装
    internal void ThreadStart()
    {
        Debug.Assert(_start is ThreadStart);

        ExecutionContext? context = _executionContext;
        if (context != null)
        {
            // 将 ExecutionContext 与 CurrentThread 进行绑定
            ExecutionContext.RunInternal(context, s_threadStartContextCallback, this);
        }
        else
        {
            InitializeCulture();
            ((ThreadStart)_start)();
        }
    }
}

4、总结

  1. AsyncLocal 本身不保存数据,数据保存在 ExecutionContext 实例的 m_localValues 的私有字段上,字段类型定义是 IAsyncLocalMap ,以 IAsyncLocal => object 的 Map 结构进行保存,且实现类型随着元素数量的变化而变化。

  2. ExecutionContext 实例 保存在 Thread.CurrentThread._executionContext 上,实现与当前线程的关联。

  3. 对于 IAsyncLocalMap 的实现类,如果 AsyncLocal 注册了回调,value 传 null 不会被忽略。

    没注册回调时分为两种情况:如果 key 存在,则做删除处理,map 类型可能出现降级。如果 key 不存在,则直接忽略。

  4. ExecutionContext 和 IAsyncLocalMap 的实现类都被设计成不可变(immutable)。同一个 key 前后两次 value 发生变化后,会产生新的 ExecutionContext 的实例和 IAsyncLocalMap 实现类实例。

  5. ExecutionContext 与当前线程绑定,默认流动到辅助线程,可以禁止流动和恢复流动,且禁止流动仅影响当前线程向其辅助线程的传递,不影响后续。

5、参考

  1. https://devblogs.microsoft.com/pfxteam/executioncontext-vs-synchronizationcontext/
  2. 《CLR via C#》27.3 章节
  3. github 代码库 https://github.com/dotnet/runtime

浅析 .NET 中 AsyncLocal 的实现原理

标签:另一个   delegate   基本   基础   blob   tor   dstar   静态类   sed   

原文地址:https://www.cnblogs.com/blurhkh/p/12240767.html


评论


亲,登录后才可以留言!