
TransmittableThreadLocal: How It Works and Where to Use It

前尘一梦
2022-09-30

Introduction

TransmittableThreadLocal, TTL for short, is an open-source library from Alibaba that passes context variables into thread pools. The project is hosted on GitHub.

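Differences between ThreadLocal, InheritableThreadLocal, and TTL

ThreadLocal: values are visible only within the current thread

InheritableThreadLocal: values pass from a parent thread to a child thread when the child thread is created

TTL: values pass from the submitting thread to the thread-pool threads that run its tasks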
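The following JDK-only sketch shows why InheritableThreadLocal is not enough for pools. The class name and values are made up for illustration. InheritableThreadLocal copies values only when a thread is constructed. A pooled worker is created once and then reused, so it keeps whatever it inherited at creation time:

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class InheritableVsPool {
    // Hypothetical context holder, e.g. a request or trace id
    static final InheritableThreadLocal<String> CONTEXT = new InheritableThreadLocal<>();

    // Submits two tasks to a one-thread pool and returns what each task saw
    static String demo() {
        ExecutorService pool = Executors.newFixedThreadPool(1);
        try {
            CONTEXT.set("request-1");
            // The first submit creates the only worker thread from this thread,
            // so the worker inherits "request-1" while it is being constructed
            String first = pool.submit(() -> CONTEXT.get()).get();

            CONTEXT.set("request-2");
            // The worker is reused; inheritance does not happen again
            String second = pool.submit(() -> CONTEXT.get()).get();
            return first + "," + second;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
            CONTEXT.remove();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // request-1,request-1
    }
}
```

The second task still sees "request-1". This stale value is the gap TTL fills.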

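Use cases

  • Distributed tracing, end-to-end testing, and load testing (traffic tagging)

  • Carrying system context for log collection

  • Passing the data-source identifier transparently in SaaS (multi-tenant) scenarios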

Usage

  • Add the dependency

<dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>transmittable-thread-local</artifactId>
    <version>2.14.4</version>
</dependency>
  1. Basic usage. TransmittableThreadLocal extends InheritableThreadLocal, so it provides everything InheritableThreadLocal does:

TransmittableThreadLocal<String> context = new TransmittableThreadLocal<>();

// Set the value in the parent thread
context.set("value-set-in-parent");

// A child thread created afterwards reads "value-set-in-parent"
new Thread(() -> {
    String value = context.get();
}).start();
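  2. Wrap the Runnable or Callable passed to the thread pool with TtlRunnable or TtlCallable. Even if the same Runnable is submitted to a pool several times, wrap it again on every submission by calling TtlRunnable.get(task) each time. This captures the TTL context as it is at that submission. If a later submission reuses the TtlRunnable from an earlier one, the task runs with the context captured by that earlier wrapping.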

TransmittableThreadLocal<String> context = new TransmittableThreadLocal<>();

// Set the value in the parent thread
context.set("value-set-in-parent");

Runnable task = new RunnableTask();
// Extra step: wrap the task to get the decorated ttlRunnable
Runnable ttlRunnable = TtlRunnable.get(task);
executorService.submit(ttlRunnable);

// Inside the task, context.get() returns "value-set-in-parent"

// ======================================================================
// Wrapping a Callable (same context as above)
Callable<String> call = new CallableTask();
// Extra step: wrap the callable to get the decorated ttlCallable
Callable<String> ttlCallable = TtlCallable.get(call);
executorService.submit(ttlCallable);

// Inside the callable, context.get() returns "value-set-in-parent"
  3. Wrap the thread pool

ExecutorService executorService = ...
// Extra step: wrap the pool to get a decorated executorService
executorService = TtlExecutors.getTtlExecutorService(executorService);

TransmittableThreadLocal<String> context = new TransmittableThreadLocal<>();

// =====================================================

// Set the value in the parent thread
context.set("value-set-in-parent");

Runnable task = new RunnableTask();
Callable<String> call = new CallableTask();
executorService.submit(task);
executorService.submit(call);

// =====================================================

// Inside the task or the callable, context.get() returns "value-set-in-parent"
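  4. Non-intrusive Java Agent: add a parameter to the application's startup command. See the official documentation for the exact configuration.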
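Why must a task be wrapped again on every submission (item 2)? The context is captured when the wrapper is created, not when it runs. The JDK-only sketch below imitates this for a single ThreadLocal. The `wrap` helper is hypothetical and is not TtlRunnable itself:

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class WrapPerSubmission {
    static final ThreadLocal<String> CONTEXT = new ThreadLocal<>();

    // Hypothetical stand-in for TtlRunnable.get(task): the value is captured here, at wrap time
    static Runnable wrap(Runnable task) {
        String captured = CONTEXT.get();
        return () -> {
            String backup = CONTEXT.get();   // back up the worker thread's own value
            CONTEXT.set(captured);           // replay the captured value
            try {
                task.run();
            } finally {
                if (backup == null) CONTEXT.remove(); else CONTEXT.set(backup); // restore
            }
        };
    }

    // Returns the context value seen by each of three runs
    static List<String> demo() {
        List<String> seen = new CopyOnWriteArrayList<>();
        Runnable task = () -> seen.add(CONTEXT.get());
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            CONTEXT.set("submit-1");
            Runnable wrappedOnce = wrap(task);
            pool.submit(wrappedOnce).get();  // sees "submit-1"

            CONTEXT.set("submit-2");
            pool.submit(wrappedOnce).get();  // reused wrapper: still "submit-1"
            pool.submit(wrap(task)).get();   // wrapped again: "submit-2"
            return seen;
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
            CONTEXT.remove();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [submit-1, submit-1, submit-2]
    }
}
```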

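How it works

Suppose we wanted to pass values from a thread to a thread pool ourselves, without any external library. We would need to do the following:

  1. When the user thread submits a task to the pool, copy the user thread's ThreadLocal values and associate the copy with the task;

  2. When the pool starts running the task, back up the ThreadLocal values the worker thread currently holds, then install the copy associated with this task into the worker thread;

  3. Once the task finishes, restore the worker thread's original ThreadLocal values from the backup and remove any TTL values that were added. This keeps threads isolated, keeps their state consistent, and avoids memory leaks.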
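The three steps above can be sketched with plain JDK classes. This is a minimal illustration, not TTL's code. The explicit `REGISTRY` list and the `MiniTransmitter`, `capture`, `replay`, `restore`, and `wrap` names are hypothetical, because TTL tracks its instances automatically:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class MiniTransmitter {
    // Hypothetical explicit registry of the thread-locals to transmit
    static final List<ThreadLocal<Object>> REGISTRY = new CopyOnWriteArrayList<>();
    static final ThreadLocal<Object> TRACE_ID = register(new ThreadLocal<>());
    static final ThreadLocal<Object> TENANT = register(new ThreadLocal<>());

    static ThreadLocal<Object> register(ThreadLocal<Object> threadLocal) {
        REGISTRY.add(threadLocal);
        return threadLocal;
    }

    // Step 1: copy the submitting thread's values
    static Map<ThreadLocal<Object>, Object> capture() {
        Map<ThreadLocal<Object>, Object> snapshot = new HashMap<>();
        for (ThreadLocal<Object> tl : REGISTRY) snapshot.put(tl, tl.get());
        return snapshot;
    }

    // Step 2: back up the worker's values, then install the snapshot
    static Map<ThreadLocal<Object>, Object> replay(Map<ThreadLocal<Object>, Object> snapshot) {
        Map<ThreadLocal<Object>, Object> backup = new HashMap<>();
        snapshot.forEach((tl, value) -> {
            backup.put(tl, tl.get());
            setOrRemove(tl, value);
        });
        return backup;
    }

    // Step 3: put the worker's original values back
    static void restore(Map<ThreadLocal<Object>, Object> backup) {
        backup.forEach(MiniTransmitter::setOrRemove);
    }

    static void setOrRemove(ThreadLocal<Object> tl, Object value) {
        if (value == null) tl.remove(); else tl.set(value);
    }

    // Capture at wrap time, replay and restore around run()
    static Runnable wrap(Runnable task) {
        Map<ThreadLocal<Object>, Object> snapshot = capture();
        return () -> {
            Map<ThreadLocal<Object>, Object> backup = replay(snapshot);
            try {
                task.run();
            } finally {
                restore(backup);
            }
        };
    }

    static String demo() {
        List<String> seen = new CopyOnWriteArrayList<>();
        Runnable report = () -> seen.add(TRACE_ID.get() + "/" + TENANT.get());
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            TRACE_ID.set("t-1");
            TENANT.set("tenant-a");
            pool.submit(wrap(report)).get(); // sees the submitter's values
            pool.submit(report).get();       // unwrapped: the worker was restored to empty
            return String.join(" ", seen);
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
            TRACE_ID.remove();
            TENANT.remove();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // t-1/tenant-a null/null
    }
}
```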

Below are excerpts of the core code, showing how TTL does this.

Following TtlRunnable.get(task) leads to the TtlRunnable class:

public final class TtlRunnable implements Runnable, TtlWrapper<Runnable>, TtlEnhanced, TtlAttachments {
    private final AtomicReference<Object> capturedRef = new AtomicReference(Transmitter.capture());
    private final Runnable runnable;
    private final boolean releaseTtlValueReferenceAfterRun;
    // Stores extra attachments; backed internally by a ConcurrentHashMap
    private final TtlAttachmentsDelegate ttlAttachment = new TtlAttachmentsDelegate();

    private TtlRunnable(@NonNull Runnable runnable, boolean releaseTtlValueReferenceAfterRun) {
        this.runnable = runnable;
        this.releaseTtlValueReferenceAfterRun = releaseTtlValueReferenceAfterRun;
    }

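    /**
     * capturedRef: initialized by the thread that creates the TtlRunnable, i.e. the main (submitting) thread. That thread is the parent of the thread that later runs the task, so capturedRef holds a snapshot of the parent thread's values.
     * releaseTtlValueReferenceAfterRun: whether to release capturedRef after the task runs; defaults to false. Use it in performance-sensitive scenarios where nothing depends on the values afterwards. It also prevents the same TtlRunnable from being run again. The release uses CAS, which is safe and cheap under concurrency.
     */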
    public void run() {
        Object captured = this.capturedRef.get();
        if (captured != null && (!this.releaseTtlValueReferenceAfterRun || this.capturedRef.compareAndSet(captured, (Object)null))) {
            // Replay the captured values into the worker thread and return a backup of the worker's existing values
            Object backup = Transmitter.replay(captured);

            try {
                // Run the task
                this.runnable.run();
            } finally {
                // Restore the worker thread's values from the backup
                Transmitter.restore(backup);
            }

        } else {
            throw new IllegalStateException("TTL value reference is released after run!");
        }
    }

    // Decorator pattern: returns the wrapped Runnable
    @Nullable
    public static TtlRunnable get(@Nullable Runnable runnable) {
        return get(runnable, false, false);
    }

    @Nullable
    public static TtlRunnable get(@Nullable Runnable runnable, boolean releaseTtlValueReferenceAfterRun) {
        return get(runnable, releaseTtlValueReferenceAfterRun, false);
    }


    @Nullable
    public static TtlRunnable get(@Nullable Runnable runnable, boolean releaseTtlValueReferenceAfterRun, boolean idempotent) {
        if (null == runnable) {
            return null;
        // Is it already wrapped?
        } else if (runnable instanceof TtlEnhanced) {
            if (idempotent) {
                return (TtlRunnable)runnable;
            } else {
                throw new IllegalStateException("Already TtlRunnable!");
            }
        } else {
            // Return the wrapped object
            return new TtlRunnable(runnable, releaseTtlValueReferenceAfterRun);
        }
    }
}
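The releaseTtlValueReferenceAfterRun branch in run() is a one-shot pattern. compareAndSet(captured, null) lets exactly one run consume the captured snapshot, and any later run finds null and fails. Below is a JDK-only sketch of that pattern. `OneShotTask` is a hypothetical name, and unlike TtlRunnable it always releases:

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

public class OneShotTask implements Runnable {
    private final AtomicReference<Object> capturedRef;
    private final Consumer<Object> body;

    OneShotTask(Object captured, Consumer<Object> body) {
        this.capturedRef = new AtomicReference<>(captured);
        this.body = body;
    }

    @Override
    public void run() {
        Object captured = capturedRef.get();
        // Only the caller whose CAS succeeds may use the snapshot; the CAS also releases it (sets null)
        if (captured == null || !capturedRef.compareAndSet(captured, null)) {
            throw new IllegalStateException("captured value already released");
        }
        body.accept(captured);
    }

    static String demo() {
        StringBuilder log = new StringBuilder();
        OneShotTask task = new OneShotTask("snapshot", v -> log.append("ran with ").append(v));
        task.run();
        try {
            task.run(); // the reference was released by the first run
        } catch (IllegalStateException e) {
            log.append("; second run rejected");
        }
        return log.toString();
    }

    public static void main(String[] args) {
        System.out.println(demo()); // ran with snapshot; second run rejected
    }
}
```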

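That covers the main flow. Now let's look at the implementation details.

  • Initialization of capturedRef

    • private final AtomicReference<Object> capturedRef = new AtomicReference(Transmitter.capture());

  • Transmitter is a static nested class of TransmittableThreadLocal

  • The replay method also lives in Transmitter, so we look at both together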

    public static class Transmitter {
        private static volatile WeakHashMap<ThreadLocal<Object>, TtlCopier<Object>> threadLocalHolder = new WeakHashMap();
        private static final Object threadLocalHolderUpdateLock = new Object();
        private static final Object threadLocalClearMark = new Object();
        private static final TtlCopier<Object> shadowCopier = new TtlCopier<Object>() {
            public Object copy(Object parentValue) {
                return parentValue;
            }
        };

        // Capture a snapshot of the parent thread: TTL values plus registered plain ThreadLocal values
        @NonNull
        public static Object capture() {
            return new Snapshot(captureTtlValues(), captureThreadLocalValues());
        }

        // Copy TTL values from TransmittableThreadLocal.holder (holder's structure and when entries are added are covered later)
        private static HashMap<TransmittableThreadLocal<Object>, Object> captureTtlValues() {
            HashMap<TransmittableThreadLocal<Object>, Object> ttl2Value = new HashMap();
            Iterator var1 = ((WeakHashMap)TransmittableThreadLocal.holder.get()).keySet().iterator();

            while(var1.hasNext()) {
                TransmittableThreadLocal<Object> threadLocal = (TransmittableThreadLocal)var1.next();
                ttl2Value.put(threadLocal, threadLocal.copyValue());
            }

            return ttl2Value;
        }
        // Copy plain ThreadLocal values from Transmitter.threadLocalHolder (also covered later)
        private static HashMap<ThreadLocal<Object>, Object> captureThreadLocalValues() {
            HashMap<ThreadLocal<Object>, Object> threadLocal2Value = new HashMap();
            Iterator var1 = threadLocalHolder.entrySet().iterator();

            while(var1.hasNext()) {
                Map.Entry<ThreadLocal<Object>, TtlCopier<Object>> entry = (Map.Entry)var1.next();
                ThreadLocal<Object> threadLocal = (ThreadLocal)entry.getKey();
                TtlCopier<Object> copier = (TtlCopier)entry.getValue();
                threadLocal2Value.put(threadLocal, copier.copy(threadLocal.get()));
            }

            return threadLocal2Value;
        }
        // Replay the captured parent snapshot into the current thread, i.e. the thread executing the wrapped task
        @NonNull
        public static Object replay(@NonNull Object captured) {
            Snapshot capturedSnapshot = (Snapshot)captured;
            return new Snapshot(replayTtlValues(capturedSnapshot.ttl2Value), replayThreadLocalValues(capturedSnapshot.threadLocal2Value));
        }

        // Back up the thread's original TTL values from holder, then replay
        @NonNull
        private static HashMap<TransmittableThreadLocal<Object>, Object> replayTtlValues(@NonNull HashMap<TransmittableThreadLocal<Object>, Object> captured) {
            HashMap<TransmittableThreadLocal<Object>, Object> backup = new HashMap();
            Iterator<TransmittableThreadLocal<Object>> iterator = ((WeakHashMap)TransmittableThreadLocal.holder.get()).keySet().iterator();

            while(iterator.hasNext()) {
                TransmittableThreadLocal<Object> threadLocal = (TransmittableThreadLocal)iterator.next();
                // Back up what the worker already holds: values inherited at thread creation or set by earlier business logic
                backup.put(threadLocal, threadLocal.get());
                
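                // Remove TTL values the worker holds that are not in the captured snapshot, so the task sees exactly the context captured at submission and every transmitted value is one business code set explicitly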
                if (!captured.containsKey(threadLocal)) {
                    iterator.remove();
                    threadLocal.superRemove();
                }
            }
            // For each captured TTL instance, call set (which calls ThreadLocal.set) to put its value into the current thread
            setTtlValuesTo(captured);
            TransmittableThreadLocal.doExecuteCallback(true);
            return backup;
        }

        // Back up the thread's original plain ThreadLocal values, then replay
        private static HashMap<ThreadLocal<Object>, Object> replayThreadLocalValues(@NonNull HashMap<ThreadLocal<Object>, Object> captured) {
            HashMap<ThreadLocal<Object>, Object> backup = new HashMap();
            Iterator var2 = captured.entrySet().iterator();

            while(var2.hasNext()) {
                Map.Entry<ThreadLocal<Object>, Object> entry = (Map.Entry)var2.next();
                ThreadLocal<Object> threadLocal = (ThreadLocal)entry.getKey();
                backup.put(threadLocal, threadLocal.get());
                Object value = entry.getValue();
                // threadLocalClearMark means "remove this value"; it appears when Transmitter.clear() is called explicitly
                if (value == threadLocalClearMark) {
                    threadLocal.remove();
                } else {
                    threadLocal.set(value);
                }
            }

            return backup;
        }

        @NonNull
        public static Object clear() {
            HashMap<TransmittableThreadLocal<Object>, Object> ttl2Value = new HashMap();
            HashMap<ThreadLocal<Object>, Object> threadLocal2Value = new HashMap();
            Iterator var2 = threadLocalHolder.entrySet().iterator();

            while(var2.hasNext()) {
                Map.Entry<ThreadLocal<Object>, TtlCopier<Object>> entry = (Map.Entry)var2.next();
                ThreadLocal<Object> threadLocal = (ThreadLocal)entry.getKey();
                threadLocal2Value.put(threadLocal, threadLocalClearMark);
            }

            return replay(new Snapshot(ttl2Value, threadLocal2Value));
        }
      
        // Restore the TTL and ThreadLocal snapshots backed up earlier
        public static void restore(@NonNull Object backup) {
            Snapshot backupSnapshot = (Snapshot)backup;
            restoreTtlValues(backupSnapshot.ttl2Value);
            restoreThreadLocalValues(backupSnapshot.threadLocal2Value);
        }
        
        // Remove TTL values added during the run, then put the backup back
        private static void restoreTtlValues(@NonNull HashMap<TransmittableThreadLocal<Object>, Object> backup) {
            TransmittableThreadLocal.doExecuteCallback(false);
            Iterator<TransmittableThreadLocal<Object>> iterator = ((WeakHashMap)TransmittableThreadLocal.holder.get()).keySet().iterator();

            while(iterator.hasNext()) {
                TransmittableThreadLocal<Object> threadLocal = (TransmittableThreadLocal)iterator.next();
                // Remove TTL instances that are not in the backup, i.e. ones added during this run
                if (!backup.containsKey(threadLocal)) {
                    iterator.remove();
                    threadLocal.superRemove();
                }
            }
            // Put the backed-up values back into the current thread
            setTtlValuesTo(backup);
        }

        private static void setTtlValuesTo(@NonNull HashMap<TransmittableThreadLocal<Object>, Object> ttlValues) {
            Iterator var1 = ttlValues.entrySet().iterator();

            while(var1.hasNext()) {
                Map.Entry<TransmittableThreadLocal<Object>, Object> entry = (Map.Entry)var1.next();
                TransmittableThreadLocal<Object> threadLocal = (TransmittableThreadLocal)entry.getKey();
                // TransmittableThreadLocal.set, which calls ThreadLocal.set under the hood
                threadLocal.set(entry.getValue());
            }

        }

        private static void restoreThreadLocalValues(@NonNull HashMap<ThreadLocal<Object>, Object> backup) {
            Iterator var1 = backup.entrySet().iterator();

            while(var1.hasNext()) {
                Map.Entry<ThreadLocal<Object>, Object> entry = (Map.Entry)var1.next();
                ThreadLocal<Object> threadLocal = (ThreadLocal)entry.getKey();
                threadLocal.set(entry.getValue());
            }

        }
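replayTtlValues and restoreTtlValues do more than overwrite values. Replay also drops TTL entries that the worker holds but the snapshot lacks, and restore drops anything that is not in the backup. The simplified model below stands in a plain Map for one thread's TTL values, with string keys in place of TTL instances. `ReplayRestoreModel` is a hypothetical name. The model shows that the task sees exactly the captured context and that the worker gets exactly its old state back:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeMap;

public class ReplayRestoreModel {
    // Stand-in for the worker thread's TTL values (key = TTL instance name)
    final Map<String, Object> threadState = new HashMap<>();

    // Mirrors replayTtlValues: back up everything, drop keys not captured, then apply the captured values
    Map<String, Object> replay(Map<String, Object> captured) {
        Map<String, Object> backup = new HashMap<>(threadState);
        threadState.keySet().retainAll(captured.keySet());
        threadState.putAll(captured);
        return backup;
    }

    // Mirrors restoreTtlValues: drop keys not in the backup, then apply the backup
    void restore(Map<String, Object> backup) {
        threadState.keySet().retainAll(backup.keySet());
        threadState.putAll(backup);
    }

    static String demo() {
        ReplayRestoreModel worker = new ReplayRestoreModel();
        worker.threadState.put("traceId", "stale-trace");
        worker.threadState.put("inheritedFlag", "from-thread-creation");

        Map<String, Object> captured = new HashMap<>();
        captured.put("traceId", "t-42");

        Map<String, Object> backup = worker.replay(captured);
        String duringTask = new TreeMap<>(worker.threadState).toString();
        worker.threadState.put("addedByTask", "x"); // the task sets a new TTL
        worker.restore(backup);
        String afterTask = new TreeMap<>(worker.threadState).toString();
        return duringTask + " -> " + afterTask;
    }

    public static void main(String[] args) {
        // {traceId=t-42} -> {inheritedFlag=from-thread-creation, traceId=stale-trace}
        System.out.println(demo());
    }
}
```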

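This analysis shows that TTL sets and restores values around run with the help of two structures:

  • holder in TransmittableThreadLocal: a per-thread (InheritableThreadLocal) registry of the TTL instances that thread has set.
  • threadLocalHolder in the static nested class Transmitter: a global registry of plain ThreadLocal instances, each registered together with a TtlCopier.

Next, let's see when entries are put into them.

How entries are added to holder, in the TransmittableThreadLocal class: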

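// holder uses a WeakHashMap, so a TTL instance that is no longer referenced elsewhere can be garbage-collected (weak keys are cleared once the key is no longer strongly reachable), avoiding memory leaks
// holder itself is an InheritableThreadLocal, so a child thread created after the parent has set TTL values also sees them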
private static final InheritableThreadLocal<WeakHashMap<TransmittableThreadLocal<Object>, ?>> holder = new InheritableThreadLocal<WeakHashMap<TransmittableThreadLocal<Object>, ?>>() {
        protected WeakHashMap<TransmittableThreadLocal<Object>, ?> initialValue() {
        // On first access, initialValue returns an empty WeakHashMap
            return new WeakHashMap();
        }

        protected WeakHashMap<TransmittableThreadLocal<Object>, ?> childValue(WeakHashMap<TransmittableThreadLocal<Object>, ?> parentValue) {
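        // childValue runs when a child thread is created and computes the child's initial value; the default returns the parent's value itself, whereas this override gives the child its own copy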
            return new WeakHashMap(parentValue);
        }
    };


// When business code in the main thread calls set explicitly, the TTL instance also registers itself in holder
    public final void set(T value) {
        if (!this.disableIgnoreNullValueSemantics && null == value) {
            this.remove();
        } else {
            super.set(value);
            this.addThisToHolder();
        }

    }

    private void addThisToHolder() {
        if (!((WeakHashMap)holder.get()).containsKey(this)) {
            ((WeakHashMap)holder.get()).put(this, (Object)null);
        }

    }
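holder's childValue override keeps the parent and child registries independent. childValue runs while the child Thread is being constructed, and here it gives the child its own copy of the map. The JDK-only sketch below applies the same idea to a HashMap of strings. `ChildValueCopy` and the keys are illustrative names:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.TreeSet;

public class ChildValueCopy {
    static final InheritableThreadLocal<Map<String, String>> HOLDER =
            new InheritableThreadLocal<Map<String, String>>() {
                @Override
                protected Map<String, String> initialValue() {
                    return new HashMap<>();
                }

                @Override
                protected Map<String, String> childValue(Map<String, String> parentValue) {
                    // Called in the parent thread while the child Thread is constructed
                    return new HashMap<>(parentValue);
                }
            };

    static String demo() {
        HOLDER.get().put("a", "set-before-child");
        String[] childKeys = new String[1];
        // The child's copy of HOLDER is made here, inside the Thread constructor
        Thread child = new Thread(() -> {
            HOLDER.get().put("b", "set-in-child"); // changes only the child's copy
            childKeys[0] = new TreeSet<>(HOLDER.get().keySet()).toString();
        });
        HOLDER.get().put("c", "set-after-child-created"); // not visible to the child
        child.start();
        try {
            child.join();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
        String parentKeys = new TreeSet<>(HOLDER.get().keySet()).toString();
        HOLDER.remove();
        return "child=" + childKeys[0] + " parent=" + parentKeys;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // child=[a, b] parent=[a, c]
    }
}
```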

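How entries are added to threadLocalHolder, in the static nested class Transmitter:

      // This method is not called anywhere inside the library. It appears to be an extension point that lets callers register plain ThreadLocal instances manually, so unless registerThreadLocal is called, captureThreadLocalValues captures nothing.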
        public static <T> boolean registerThreadLocal(@NonNull ThreadLocal<T> threadLocal, @NonNull TtlCopier<T> copier, boolean force) {
            if (threadLocal instanceof TransmittableThreadLocal) {
                TransmittableThreadLocal.logger.warning("register a TransmittableThreadLocal instance, this is unnecessary!");
                return true;
            } else {
                synchronized(threadLocalHolderUpdateLock) {
                    if (!force && threadLocalHolder.containsKey(threadLocal)) {
                        return false;
                    } else {
                        WeakHashMap<ThreadLocal<Object>, TtlCopier<Object>> newHolder = new WeakHashMap(threadLocalHolder);
                        newHolder.put(threadLocal, copier);
                        threadLocalHolder = newHolder;
                        return true;
                    }
                }
            }
        }
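registerThreadLocal updates threadLocalHolder copy-on-write. Writers synchronize on a lock, build a new map, and publish it through the volatile field. capture, meanwhile, iterates whichever map it read, without locking. Below is a generic JDK-only sketch of that pattern. `CowRegistry` is a hypothetical name, and it uses a HashMap where TTL uses a WeakHashMap:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class CowRegistry<K, V> {
    // Readers see either the old map or the new one, never a half-updated map
    private volatile Map<K, V> entries = new HashMap<>();
    private final Object updateLock = new Object();

    // Mirrors the force/duplicate check in registerThreadLocal
    public boolean register(K key, V value, boolean force) {
        synchronized (updateLock) {
            if (!force && entries.containsKey(key)) {
                return false;
            }
            Map<K, V> copy = new HashMap<>(entries);
            copy.put(key, value);
            entries = copy; // publish through the volatile write
            return true;
        }
    }

    // Lock-free read path: a published map is never mutated afterwards
    public Map<K, V> snapshot() {
        return Collections.unmodifiableMap(entries);
    }

    static String demo() {
        CowRegistry<String, String> registry = new CowRegistry<>();
        Map<String, String> before = registry.snapshot();
        boolean first = registry.register("traceId", "copierA", false);
        boolean duplicate = registry.register("traceId", "copierB", false); // rejected: no force
        boolean forced = registry.register("traceId", "copierB", true);     // replaced
        // "before" still wraps the original, empty map
        return first + " " + duplicate + " " + forced + " " + before.size()
                + " " + registry.snapshot().get("traceId");
    }

    public static void main(String[] args) {
        System.out.println(demo()); // true false true 0 copierB
    }
}
```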

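Now let's look again at how values are copied out of the two holders:

        // Capture a snapshot of the parent thread: TTL values plus registered plain ThreadLocal values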
        @NonNull
        public static Object capture() {
            return new Snapshot(captureTtlValues(), captureThreadLocalValues());
        }

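        // Copy TTL values from TransmittableThreadLocal.holder. An instance is added there whenever the main thread calls set explicitly, and a child thread receives its own copy of holder when it is created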
        private static HashMap<TransmittableThreadLocal<Object>, Object> captureTtlValues() {
            HashMap<TransmittableThreadLocal<Object>, Object> ttl2Value = new HashMap();
            Iterator var1 = ((WeakHashMap)TransmittableThreadLocal.holder.get()).keySet().iterator();

            while(var1.hasNext()) {
                TransmittableThreadLocal<Object> threadLocal = (TransmittableThreadLocal)var1.next();
                // TTL implements TtlCopier; copyValue copies the value's reference
                ttl2Value.put(threadLocal, threadLocal.copyValue());
            }

            return ttl2Value;
        }
        // Copy plain ThreadLocal values from Transmitter.threadLocalHolder; it only has entries that were registered explicitly
        private static HashMap<ThreadLocal<Object>, Object> captureThreadLocalValues() {
            HashMap<ThreadLocal<Object>, Object> threadLocal2Value = new HashMap();
            Iterator var1 = threadLocalHolder.entrySet().iterator();

            while(var1.hasNext()) {
                Map.Entry<ThreadLocal<Object>, TtlCopier<Object>> entry = (Map.Entry)var1.next();
                ThreadLocal<Object> threadLocal = (ThreadLocal)entry.getKey();
                TtlCopier<Object> copier = (TtlCopier)entry.getValue();
                // Copy via the registered copier; shadowCopier simply returns the same reference
                threadLocal2Value.put(threadLocal, copier.copy(threadLocal.get()));
            }

            return threadLocal2Value;
        }

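Starting from TtlRunnable.get(task), we have now traced TTL's whole implementation flow. Wrapping a thread pool works the same way internally: each call to submit or execute wraps the task before handing it to the underlying pool.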

public void execute(@NonNull Runnable command) {
        this.executor.execute(TtlRunnable.get(command, false, this.idempotent));
    }

@NonNull
    public <T> Future<T> submit(@NonNull Callable<T> task) {
        return this.executorService.submit(TtlCallable.get(task, false, this.idempotent));
    }
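Wrapping an executor is therefore just a decorator around execute. Below is a minimal JDK-only sketch for a single ThreadLocal. `ContextExecutors.wrap` is a hypothetical helper, not TtlExecutors. Each command is wrapped inside execute(), in the submitting thread, so capture happens per submission:

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

public class ContextExecutors {
    static final ThreadLocal<String> CONTEXT = new ThreadLocal<>();

    // Decorates an Executor: every command is wrapped at execute() time, in the submitting thread
    static Executor wrap(Executor delegate) {
        return command -> {
            String captured = CONTEXT.get();        // capture in the submitting thread
            delegate.execute(() -> {
                String backup = CONTEXT.get();      // back up the worker's value
                CONTEXT.set(captured);              // replay
                try {
                    command.run();
                } finally {
                    if (backup == null) CONTEXT.remove(); else CONTEXT.set(backup); // restore
                }
            });
        };
    }

    static String demo() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        Executor executor = wrap(pool);
        AtomicReference<String> seen = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);
        try {
            CONTEXT.set("from-caller");
            executor.execute(() -> {
                seen.set(CONTEXT.get());
                done.countDown();
            });
            if (!done.await(5, TimeUnit.SECONDS)) throw new IllegalStateException("timed out");
            return seen.get();
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
            CONTEXT.remove();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // from-caller
    }
}
```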

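Summary

To sum up how TTL works:

  1. It uses the decorator pattern to wrap either the thread pool or the task. Either way, what ultimately gets wrapped is the Runnable/Callable, whose run method is enhanced:
    • The parent thread's values are captured when the wrapper is created.
    • Before run, the current thread's values are backed up and the captured values are replayed into it.
    • After run, the backup is restored.

  2. It sets and restores values around run using two structures: holder in TransmittableThreadLocal (a per-thread registry of TTL instances) and threadLocalHolder in its static nested class Transmitter (a global registry of manually registered ThreadLocals). Transmitter is dedicated to capturing, replaying, restoring, and clearing these values.

  3. The overall sequence diagram: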

[Figure: TTL sequence diagram]

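  4. The implementation is close to the flow we sketched ourselves at the start. Its rigor (such as comparing values before replay and before restore) and its extensibility (such as ttlAttachment and registerThreadLocal) are well worth studying.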

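Caveats

  1. restore runs after run for two reasons. It keeps the worker thread's state consistent, which avoids hidden business bugs, and it explicitly removes the values that were added (superRemove), which avoids memory leaks;

  2. Values are copied by reference, so if a task mutates a captured object, the main thread sees the change too. Calling set inside the task, by contrast, is undone by restore;

  3. holder is static, and together with the explicit cleanup in restore, memory leaks should not occur under normal use. Still, it is a good habit to clear the values explicitly when you are done with them (for example by calling remove()), just in case.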
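Caveat 2 can be seen with a JDK-only sketch. Names such as `ReferenceCopyCaveat` and the `copier` parameter are hypothetical, and the copier plays the role of TTL's copy step. With an identity copier, as in the default reference copy, the task's mutation leaks back to the submitter. With a defensive copy, it does not:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.UnaryOperator;

public class ReferenceCopyCaveat {
    static final ThreadLocal<List<String>> TAGS = new ThreadLocal<>();

    // Captures TAGS through the given copier, then replays and restores it around the task
    static Runnable wrap(Runnable task, UnaryOperator<List<String>> copier) {
        List<String> captured = copier.apply(TAGS.get());
        return () -> {
            List<String> backup = TAGS.get();
            TAGS.set(captured);
            try {
                task.run();
            } finally {
                if (backup == null) TAGS.remove(); else TAGS.set(backup);
            }
        };
    }

    // Returns the size of the submitter's list after a task appends one tag
    static int parentSizeAfterTask(UnaryOperator<List<String>> copier) {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            List<String> tags = new ArrayList<>();
            tags.add("parent");
            TAGS.set(tags);
            pool.submit(wrap(() -> TAGS.get().add("added-by-task"), copier)).get();
            return TAGS.get().size();
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
            TAGS.remove();
        }
    }

    public static void main(String[] args) {
        System.out.println(parentSizeAfterTask(list -> list));                  // reference copy: 2
        System.out.println(parentSizeAfterTask(list -> new ArrayList<>(list))); // defensive copy: 1
    }
}
```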
