前言
TransmittableThreadLocal 简称TTL,是阿里的一个开源工具包,用于解决线程池场景中的上下文变量传递,GitHub链接
和ThreadLocal,InheritableThreadLocal的区别
ThreadLocal
: 当前线程上下文值传递
InheritableThreadLocal:
父子线程上下文值传递
TTL
:外部线程与线程池之间的值传递
应用场景
分布式链路追踪/全链路测试/压测-流量打标
日志收集记录系统上下文
SaaS场景无感知传递数据源标识
使用方式
引入依赖
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>transmittable-thread-local</artifactId>
<version>2.14.4</version>
</dependency>
常规使用,因为继承了InheritableThreadLocal类,所以包含了InheritableThreadLocal的功能
TransmittableThreadLocal<String> context = new TransmittableThreadLocal<>();
// 在父线程中设置
context.set("value-set-in-parent");
// 在子线程中可以读取,值是"value-set-in-parent"
String value = context.get();
使用
TtlRunnable
和TtlCallable
修饰传入线程池的Runnable和Callable(即使是同一个Runnable任务多次提交到线程池时,每次提交时都需要通过修饰操作(即TtlRunnable.get(task))以抓取这次提交时的TTL上下文的值;即如果同一个任务下一次提交时不执行修饰而仍然使用上一次的TtlRunnable,则提交的任务运行时会是之前修饰操作所抓取的上下文)。
TransmittableThreadLocal<String> context = new TransmittableThreadLocal<>();
// 在父线程中设置
context.set("value-set-in-parent");
Runnable task = new RunnableTask();
// 额外的处理,生成修饰了的对象ttlRunnable
Runnable ttlRunnable = TtlRunnable.get(task);
executorService.submit(ttlRunnable);
// Task中可以读取,值是"value-set-in-parent"
String value = context.get();
// ======================================================================
// 修饰Callable。
TransmittableThreadLocal<String> context = new TransmittableThreadLocal<>();
// 在父线程中设置
context.set("value-set-in-parent");
Callable call = new CallableTask();
// 额外的处理,生成修饰了的对象ttlCallable
Callable ttlCallable = TtlCallable.get(call);
executorService.submit(ttlCallable);
// Call中可以读取,值是"value-set-in-parent"
String value = context.get();
修饰线程池
ExecutorService executorService = ...
// 额外的处理,生成修饰了的对象executorService
executorService = TtlExecutors.getTtlExecutorService(executorService);
TransmittableThreadLocal<String> context = new TransmittableThreadLocal<>();
// =====================================================
// 在父线程中设置
context.set("value-set-in-parent");
Runnable task = new RunnableTask();
Callable call = new CallableTask();
executorService.submit(task);
executorService.submit(call);
// =====================================================
// Task或是Call中可以读取,值是"value-set-in-parent"
String value = context.get();
无侵入
Java Agent
方式,在应用启动命令后添加参数,需参照官方配置。
原理解析
假如我们不依赖外部组件,自己要实现从线程到线程池的值传递,我们需要做以下几步工作
用户线程把任务提交到线程池时,拷贝一份用户线程ThreadLocal副本,将副本和任务作映射;
线程池开始任务调度时,备份当前执行线程内ThreadLocal值,根据映射关系,将当前任务对应的ThreadLocal副本拷进去;
为了保持线程隔离和状态一致性,任务调度完成后,需要根据备份还原执行线程原来的ThreadLocal值,并清除新增的TTL值,避免内存泄漏。
接下来截取部分核心代码,看看TTL是如何做的
通过 TtlRunnable.get(task) 方法追踪到TtlRunnable类里
public final class TtlRunnable implements Runnable, TtlWrapper<Runnable>, TtlEnhanced, TtlAttachments {
private final AtomicReference<Object> capturedRef = new AtomicReference(Transmitter.capture());
private final Runnable runnable;
private final boolean releaseTtlValueReferenceAfterRun;
// 存储额外信息,内部使用ConcurrentHashMap
private final TtlAttachmentsDelegate ttlAttachment = new TtlAttachmentsDelegate();
private TtlRunnable(@NonNull Runnable runnable, boolean releaseTtlValueReferenceAfterRun) {
this.runnable = runnable;
this.releaseTtlValueReferenceAfterRun = releaseTtlValueReferenceAfterRun;
}
/**
* capturedRef:该对象由当前创建TtlRunnable对象的线程也就是主线程初始化,相对于后面执行任务的线程来说就是父线程,所以此处记录的是父线程的值副本。
* releaseTtlValueReferenceAfterRun: 是否在任务执行完后释放capturedRef,默认为false,在某些追求高性能而且没有后续值依赖的场景中用到,也能防止TTL对象被重复使用,释放时使用CAS保证并发情况下的安全和效率。
*/
public void run() {
Object captured = this.capturedRef.get();
if (captured != null && (!this.releaseTtlValueReferenceAfterRun || this.capturedRef.compareAndSet(captured, (Object)null))) {
// 将captured的值放入子线程中, 并返回子线程已经存在的ThreadLocal变量备份。
Object backup = Transmitter.replay(captured);
try {
// 执行任务
this.runnable.run();
} finally {
// 根据备份恢子线程的值
Transmitter.restore(backup);
}
} else {
throw new IllegalStateException("TTL value reference is released after run!");
}
}
// 使用装饰器模式,返回包装后的Runnable对象
@Nullable
public static TtlRunnable get(@Nullable Runnable runnable) {
return get(runnable, false, false);
}
@Nullable
public static TtlRunnable get(@Nullable Runnable runnable, boolean releaseTtlValueReferenceAfterRun) {
return get(runnable, releaseTtlValueReferenceAfterRun, false);
}
@Nullable
public static TtlRunnable get(@Nullable Runnable runnable, boolean releaseTtlValueReferenceAfterRun, boolean idempotent) {
if (null == runnable) {
return null;
// 判断是否已包装过
} else if (runnable instanceof TtlEnhanced) {
if (idempotent) {
return (TtlRunnable)runnable;
} else {
throw new IllegalStateException("Already TtlRunnable!");
}
} else {
// 返回包装过的对象
return new TtlRunnable(runnable, releaseTtlValueReferenceAfterRun);
}
}
}
分析以上代码理出了主干,接下来看看实现细节
capturedRef的初始化
private final AtomicReference<Object> capturedRef = new AtomicReference(Transmitter.capture());
Transmitter是TransmittableThreadLocal中的静态内部类
replay方法也在Transmitter中,放到一起看
public static class Transmitter {
private static volatile WeakHashMap<ThreadLocal<Object>, TtlCopier<Object>> threadLocalHolder = new WeakHashMap();
private static final Object threadLocalHolderUpdateLock = new Object();
private static final Object threadLocalClearMark = new Object();
private static final TtlCopier<Object> shadowCopier = new TtlCopier<Object>() {
public Object copy(Object parentValue) {
return parentValue;
}
};
// 拷贝父线程副本,包括TTL副本、 普通ThreadLocal副本
@NonNull
public static Object capture() {
return new Snapshot(captureTtlValues(), captureThreadLocalValues());
}
// 从TransmittableThreadLocal类的holder对象获取TTL副本,后面会看看这个holder的结构及其值的放入时机
private static HashMap<TransmittableThreadLocal<Object>, Object> captureTtlValues() {
HashMap<TransmittableThreadLocal<Object>, Object> ttl2Value = new HashMap();
Iterator var1 = ((WeakHashMap)TransmittableThreadLocal.holder.get()).keySet().iterator();
while(var1.hasNext()) {
TransmittableThreadLocal<Object> threadLocal = (TransmittableThreadLocal)var1.next();
ttl2Value.put(threadLocal, threadLocal.copyValue());
}
return ttl2Value;
}
// 从TransmittableThreadLocal类的threadLocalHolder对象获取ThreadLocal副本,后面会看看这个holder的结构及其值的放入时机
private static HashMap<ThreadLocal<Object>, Object> captureThreadLocalValues() {
HashMap<ThreadLocal<Object>, Object> threadLocal2Value = new HashMap();
Iterator var1 = threadLocalHolder.entrySet().iterator();
while(var1.hasNext()) {
Map.Entry<ThreadLocal<Object>, TtlCopier<Object>> entry = (Map.Entry)var1.next();
ThreadLocal<Object> threadLocal = (ThreadLocal)entry.getKey();
TtlCopier<Object> copier = (TtlCopier)entry.getValue();
threadLocal2Value.put(threadLocal, copier.copy(threadLocal.get()));
}
return threadLocal2Value;
}
// 把上面拷贝好的的父线程副本重放到当前被装饰过后的线程(也可称为执行线程)中。
@NonNull
public static Object replay(@NonNull Object captured) {
Snapshot capturedSnapshot = (Snapshot)captured;
return new Snapshot(replayTtlValues(capturedSnapshot.ttl2Value), replayThreadLocalValues(capturedSnapshot.threadLocal2Value));
}
// 从holder对象中备份线程原来的TTL值,并执行重放
@NonNull
private static HashMap<TransmittableThreadLocal<Object>, Object> replayTtlValues(@NonNull HashMap<TransmittableThreadLocal<Object>, Object> captured) {
HashMap<TransmittableThreadLocal<Object>, Object> backup = new HashMap();
Iterator<TransmittableThreadLocal<Object>> iterator = ((WeakHashMap)TransmittableThreadLocal.holder.get()).keySet().iterator();
while(iterator.hasNext()) {
TransmittableThreadLocal<Object> threadLocal = (TransmittableThreadLocal)iterator.next();
// 线程本身初始化时的值,运行中因为业务调度需要放进去的值
backup.put(threadLocal, threadLocal.get());
// 清除在捕获阶段没拿到的值,防止在任务开始过后拿到由于其他因素导致父线程增加的TTL值,保证状态一致 性,明确TTL传递的值都是由业务代码显示设置。
if (!captured.containsKey(threadLocal)) {
iterator.remove();
threadLocal.superRemove();
}
}
// 遍历每一个捕获的TTL实例,调用其set方法(实际是调用了ThreadLocal类的set方法),将值设置到当前线程TTL实例中
setTtlValuesTo(captured);
TransmittableThreadLocal.doExecuteCallback(true);
return backup;
}
// 备份线程原来的ThreadLocal值,并执行重放
private static HashMap<ThreadLocal<Object>, Object> replayThreadLocalValues(@NonNull HashMap<ThreadLocal<Object>, Object> captured) {
HashMap<ThreadLocal<Object>, Object> backup = new HashMap();
Iterator var2 = captured.entrySet().iterator();
while(var2.hasNext()) {
Map.Entry<ThreadLocal<Object>, Object> entry = (Map.Entry)var2.next();
ThreadLocal<Object> threadLocal = (ThreadLocal)entry.getKey();
backup.put(threadLocal, threadLocal.get());
Object value = entry.getValue();
// 清除被标识为删除状态的值,当外部显示调用clear方法时会存在这种情况
if (value == threadLocalClearMark) {
threadLocal.remove();
} else {
threadLocal.set(value);
}
}
return backup;
}
@NonNull
public static Object clear() {
HashMap<TransmittableThreadLocal<Object>, Object> ttl2Value = new HashMap();
HashMap<ThreadLocal<Object>, Object> threadLocal2Value = new HashMap();
Iterator var2 = threadLocalHolder.entrySet().iterator();
while(var2.hasNext()) {
Map.Entry<ThreadLocal<Object>, TtlCopier<Object>> entry = (Map.Entry)var2.next();
ThreadLocal<Object> threadLocal = (ThreadLocal)entry.getKey();
threadLocal2Value.put(threadLocal, threadLocalClearMark);
}
return replay(new Snapshot(ttl2Value, threadLocal2Value));
}
// 恢复前面备份的ThreadLocal快照和TTL快照
public static void restore(@NonNull Object backup) {
Snapshot backupSnapshot = (Snapshot)backup;
restoreTtlValues(backupSnapshot.ttl2Value);
restoreThreadLocalValues(backupSnapshot.threadLocal2Value);
}
// 移除新增的TTL值,将备份的快照还原回去
private static void restoreTtlValues(@NonNull HashMap<TransmittableThreadLocal<Object>, Object> backup) {
TransmittableThreadLocal.doExecuteCallback(false);
Iterator<TransmittableThreadLocal<Object>> iterator = ((WeakHashMap)TransmittableThreadLocal.holder.get()).keySet().iterator();
while(iterator.hasNext()) {
TransmittableThreadLocal<Object> threadLocal = (TransmittableThreadLocal)iterator.next();
// 移除本次手动注册的的,移除不在备份里的
if (!backup.containsKey(threadLocal)) {
iterator.remove();
threadLocal.superRemove();
}
}
// 把备份的值恢复到当前线程中
setTtlValuesTo(backup);
}
private static void setTtlValuesTo(@NonNull HashMap<TransmittableThreadLocal<Object>, Object> ttlValues) {
Iterator var1 = ttlValues.entrySet().iterator();
while(var1.hasNext()) {
Map.Entry<TransmittableThreadLocal<Object>, Object> entry = (Map.Entry)var1.next();
TransmittableThreadLocal<Object> threadLocal = (TransmittableThreadLocal)entry.getKey();
// 实际是调用了ThreadLocal类的set方法
threadLocal.set(entry.getValue());
}
}
private static void restoreThreadLocalValues(@NonNull HashMap<ThreadLocal<Object>, Object> backup) {
Iterator var1 = backup.entrySet().iterator();
while(var1.hasNext()) {
Map.Entry<ThreadLocal<Object>, Object> entry = (Map.Entry)var1.next();
ThreadLocal<Object> threadLocal = (ThreadLocal)entry.getKey();
threadLocal.set(entry.getValue());
}
}
从以上分析我们可以看出,TTL是依靠TransmittableThreadLocal类的holder、TransmittableThreadLocal中静态内部类Transmitter的threadLocalHolder来实现TTL线程run前后的set和还原,可以把它俩理解为线程级别的缓存,接下来看他们的值是什么时候拷贝进去的。
holder值的放入, TransmittableThreadLocal类里
// holder使用WeakHashMap弱引用,为了避免内存泄漏,内存不足时弱引用自动被回收
// 使用InheritableThreadLocal,当父线程显示设置TTL值进去后,子线程也能拿到
private static final InheritableThreadLocal<WeakHashMap<TransmittableThreadLocal<Object>, ?>> holder = new InheritableThreadLocal<WeakHashMap<TransmittableThreadLocal<Object>, ?>>() {
protected WeakHashMap<TransmittableThreadLocal<Object>, ?> initialValue() {
// 初始化的时候会调用initialValue返回一个WeekHashMap
return new WeakHashMap();
}
protected WeakHashMap<TransmittableThreadLocal<Object>, ?> childValue(WeakHashMap<TransmittableThreadLocal<Object>, ?> parentValue) {
// 返回的是子线程在第一次get的时候的初始值,如果不重写,默认就是返回父线程的值
return new WeakHashMap(parentValue);
}
};
// 主线程业务代码显示调用set方法时,会把当前TTL对象往holder中存一份
public final void set(T value) {
if (!this.disableIgnoreNullValueSemantics && null == value) {
this.remove();
} else {
super.set(value);
this.addThisToHolder();
}
}
private void addThisToHolder() {
if (!((WeakHashMap)holder.get()).containsKey(this)) {
((WeakHashMap)holder.get()).put(this, (Object)null);
}
}
threadLocalHolder值的放入,静态内部类Transmitter里
// 该方法内部没有地方调用,应该是设计出来方便外部扩展的,允许外部手动注册ThreadLocal实例。所以在没有手动调用registerThreadLocal方法的情况下,captureThreadLocalValues方法捕获不到任何值。
public static <T> boolean registerThreadLocal(@NonNull ThreadLocal<T> threadLocal, @NonNull TtlCopier<T> copier, boolean force) {
if (threadLocal instanceof TransmittableThreadLocal) {
TransmittableThreadLocal.logger.warning("register a TransmittableThreadLocal instance, this is unnecessary!");
return true;
} else {
synchronized(threadLocalHolderUpdateLock) {
if (!force && threadLocalHolder.containsKey(threadLocal)) {
return false;
} else {
WeakHashMap<ThreadLocal<Object>, TtlCopier<Object>> newHolder = new WeakHashMap(threadLocalHolder);
newHolder.put(threadLocal, copier);
threadLocalHolder = newHolder;
return true;
}
}
}
}
我们再看一下从holder中拷贝的过程
// 拷贝父线程副本,包括TTL副本、 普通ThreadLocal副本
@NonNull
public static Object capture() {
return new Snapshot(captureTtlValues(), captureThreadLocalValues());
}
// 从TransmittableThreadLocal类的holder对象获取TTL副本,主线程显示set值的时候会加一份到holder中,子线程也就是执行线程共享
private static HashMap<TransmittableThreadLocal<Object>, Object> captureTtlValues() {
HashMap<TransmittableThreadLocal<Object>, Object> ttl2Value = new HashMap();
Iterator var1 = ((WeakHashMap)TransmittableThreadLocal.holder.get()).keySet().iterator();
while(var1.hasNext()) {
TransmittableThreadLocal<Object> threadLocal = (TransmittableThreadLocal)var1.next();
// TTL实现了TtlCopier接口,copy时拷贝的是值的引用
ttl2Value.put(threadLocal, threadLocal.copyValue());
}
return ttl2Value;
}
// 从TransmittableThreadLocal类的threadLocalHolder对象获取ThreadLocal副本,只有外部显示注册才会有值
private static HashMap<ThreadLocal<Object>, Object> captureThreadLocalValues() {
HashMap<ThreadLocal<Object>, Object> threadLocal2Value = new HashMap();
Iterator var1 = threadLocalHolder.entrySet().iterator();
while(var1.hasNext()) {
Map.Entry<ThreadLocal<Object>, TtlCopier<Object>> entry = (Map.Entry)var1.next();
ThreadLocal<Object> threadLocal = (ThreadLocal)entry.getKey();
TtlCopier<Object> copier = (TtlCopier)entry.getValue();
// 拷贝值的引用
threadLocal2Value.put(threadLocal, copier.copy(threadLocal.get()));
}
return threadLocal2Value;
}
以上我们通过TtlRunnable.get(task) 方法为入口分析了TTl的整个实现流程,再看看包装线程池的方式,其内部也是在使用submit或execute方法的时候把线程包装一遍
public void execute(@NonNull Runnable command) {
this.executor.execute(TtlRunnable.get(command, false, this.idempotent));
}
@NonNull
public <T> Future<T> submit(@NonNull Callable<T> task) {
return this.executorService.submit(TtlCallable.get(task, false, this.idempotent));
}
总结
通过以上分析,总结一下TTL的实现原理:
通过装饰器模式包装线程池或线程(最终都是包装线程),增强线程的run方法,在run执行前拷贝父线程值重放到当前线程,并对当前线程值做一个备份,在run结束之后将备份还原到当前线程。
通过TransmittableThreadLocal类的holder、TransmittableThreadLocal中静态内部类Transmitter的threadLocalHolder来实现TTL线程run前后的set和还原,可以把它俩理解为线程级别的缓存,Transmitter专门用于操作TTL本地线程缓存的重放、恢复备份、清除等操作。
整个时序图如下
其实现和我们一开始自己想的流程差不多,不过其实现的严谨性比如重放前和还原前的值对比,扩展性上比如
ttlAttachment,registerThreadLocal的提供值得学习和研究。
注意事项
run之后执行restore,一方面是为了保持线程状态一致性,防止潜在的业务隐患,其次在restore里面会显示调用remove回收,避免内存泄漏;
因为拷贝类型为引用拷贝,如果子线程修改了数据,主线程也有感知;
TTL的缓存对象holder为static类型,加上上面restore里面的主动释放,正常来说不会出现内存泄露问题,不过我们在项目中待使用结束后最好显示调用clear方法清空,以防万一,也是一个好习惯。
评论区