transmittable-thread-local源码分析

前言

ThreadLocal解决了在多个线程针对一个变量维护不同值的功能,如果你想在同一个线程内传递一些值,那么就可以用到这个类,它的好处是无侵入性,这样我们就不需要再每个方法内透传这个参数,比如Dubbo的RpcContext。另外我们也可以利用这个类来解决在多线程情况下使用线程不安全的类的问题,比如SimpleDateFormat。ThreadLocal的子类InheritableThreadLocal在ThreadLocal的基础上,解决了和线程相关的副本从父线程向子线程传递的问题。如果不使用InheritableThreadLocal,这个变量在父线程和子线程是两个副本。
但是还有另外一种特殊情况,就是我们比较常用的线程池,线程池中的线程会被复用,线程在创建的时候会把父线程当前的inheritableThreadLocals拷贝过去(如果存在,浅拷贝),之后我们再在代码中设置了InternalThreadLocal后,在线程池中的线程就再也获取不到这个新的InheritableThreadLocal了。影响最大的问题就是,我们调用链跟踪系统的traceid等信息,会在线程池中的线程丢失,我们也会丢失一部分调用信息。阿里开源的transmittable-thread-local框架就正是解决这个问题。

我们先来看下InheritableThreadLocal是怎么实现让子线程能访问到父线程的InheritableThreadLocal变量,并且通过这部分源码,也能看出来为什么线程池中的线程一旦创建完成之后被复用时为什么会丢失InheritableThreadLocal。

首先我们在Thread类的构造函数能发现下面这段代码

if (inheritThreadLocals && parent.inheritableThreadLocals != null)
            this.inheritableThreadLocals =
                ThreadLocal.createInheritedMap(parent.inheritableThreadLocals);

意思就是说父线程的inheritableThreadLocals存在时,子线程的inheritableThreadLocals会浅拷贝父线程的inheritableThreadLocals

然后我们看InheritableThreadLocal的重载方法

ThreadLocalMap getMap(Thread t) {
       return t.inheritableThreadLocals;
    }

getMap中的返回从threadLocals变为了inheritableThreadLocals。

因为线程的复用,所以这个inheritableThreadLocals只能维持在这个线程创建时候的状态。

下面是测试这个问题的测试用例

    @Data
    @AllArgsConstructor
    static class Pet {
        private String name;
    }

    @Test
    public void testThreadLocalInPool() throws InterruptedException {
        final ThreadLocal<Pet> tl1 = new InheritableThreadLocal<>();
        final ThreadLocal<Pet> tl2 = new InheritableThreadLocal<>();


        Pet pet = new Pet("xiaomao");
        ExecutorService executorService = Executors.newFixedThreadPool(2);
        tl1.set(pet);
        for(int i =0 ;i<2;i++) {
            executorService.execute(() -> {
                System.out.println(Thread.currentThread().getName() + " " + tl1.get());
            });
        }

        Thread.sleep(2000L);

        //inheritableThreadLocal是浅拷贝
        pet.setName("xiaogou");
        for(int i =0 ;i<2;i++) {
            executorService.execute(() -> {
                System.out.println(Thread.currentThread().getName() + " " + tl1.get());
            });
        }


        //线程池中线程一旦创建完成,InheritableThreadLocal就再也传不进去
        pet.setName("xiaoji");
        tl2.set(pet);
        executorService.execute(new Runnable() {
            @Override
            public void run() {
                System.out.println(Thread.currentThread().getName()+" "+tl2.get());
            }
        });
        
    }

原理

我们先贴一个不使用ttl框架应该怎么解决线程池传递threadlocal变量的解决方案。

       private static ThreadLocal<Map<String,String>> holder = new InheritableThreadLocal<Map<String,String>>(){
        @Override
        protected Map<String,String> initialValue() {
            return new HashMap<>();
        }
    };


    @Data
    public static class WrapedRunnable implements Runnable{

        private Map<String,String> attachment;

        private Runnable runnable;

        public WrapedRunnable(Runnable runnable) {
            this.runnable=runnable;
            this.attachment = new HashMap<>(holder.get());
        }

        @Override
        public void run() {
            holder.set(this.attachment);
            runnable.run();
        }
    }

    @Test
    public void testMandatoryTTL(){

        Executor executor = Executors.newFixedThreadPool(1);
        executor.execute(()->{
            System.out.println("init");
        });

        HashMap<String,String> attachment = new HashMap<>();
        attachment.put("123","456");
        holder.set(attachment);

        //普通方式
        executor.execute(()->{
            System.out.println(holder.get().containsKey("123"));
        });

        //处理后的方式
        executor.execute(new WrapedRunnable(()->{
            System.out.println(holder.get().containsKey("123"));
        }));



    }

上面的这种方式和ttl的设计思想差不多,但是ttl肯定更加优雅,通用性更高。

下面这张图是ttl核心设计逻辑的时序图。我分析过源码后,大家就能很容易看懂它的设计思想了。


源码讲解

后面出现的 tl=ThreadLocal itl=InheritableThreadLocal ttl=TransmittableThreadLocal

就如上面我自己写的那个传递方式一样,ttl也会把需要传递的threadlocal缓存起来,然后在包装类的run方法内重放,设置到子线程。这个缓存的逻辑封装在TransmittableThreadLocal类中。

TransmittableThreadLocal

TransmittableThreadLocal继承了InheritableThreadLocal,重载了get和set方法

@Override
    public final T get() {
        T value = super.get();
        if (null != value) addValue();
        return value;
    }

    @Override
    public final void set(T value) {
        super.set(value);
        // may set null to remove value
        if (null == value) removeValue();
        else addValue();
    }

可以看到在调用父类的逻辑上,新增了addValue和removeValue的逻辑,这个就是缓存的逻辑

private void addValue() {
        if (!holder.get().containsKey(this)) {
            holder.get().put(this, null); // WeakHashMap supports null value.
        }
    }

    private void removeValue() {
        holder.get().remove(this);
    }

会把当前这个threadlocal缓存到holder上面。

下面介绍下这个很关键的holder

holder

private static InheritableThreadLocal<Map<TransmittableThreadLocal<?>, ?>> holder =
            new InheritableThreadLocal<Map<TransmittableThreadLocal<?>, ?>>() {
                @Override
                protected Map<TransmittableThreadLocal<?>, ?> initialValue() {
                    return new WeakHashMap<TransmittableThreadLocal<?>, Object>();
                }

                @Override
                protected Map<TransmittableThreadLocal<?>, ?> childValue(Map<TransmittableThreadLocal<?>, ?> parentValue) {
                    return new WeakHashMap<TransmittableThreadLocal<?>, Object>(parentValue);
                }
            };

首先这个holder本身是InheritableThreadLocal类型的,所以它也是和线程相关联的。可以在父子线程间传递,但是对于线程池内已经创建的线程肯定是传递不进去的。所以在初始化wrapper类的时候,那个时候还是父线程,在wrapper类构造的时候,要把这些threadlocal捕获出来,这个捕获相关逻辑见下一个Transmitter的分析。其次这个holder内保存的是一个WeakHashMap<TransmittableThreadLocal<?>, Object>,所以这个WeakHashMap的key是在没被强引用的情况下可以被回收的。另外需要注意的是,这个WeakHashMap设计者是为了利用到它的key可以被回收的特性,就是当做set在使用。

Transmitter

Transmitter内有3个核心方法

方法 作用
capture 捕获父线程的ttl
replay 重放父线程ttl
restore 恢复之前子线程的ttl

capture用于捕获父线程的ttl,捕获操作要在父线程执行

public static Object capture() {
            Map<TransmittableThreadLocal<?>, Object> captured = new HashMap<TransmittableThreadLocal<?>, Object>();
            for (TransmittableThreadLocal<?> threadLocal : holder.get().keySet()) {
                captured.put(threadLocal, threadLocal.copyValue());
            }
            return captured;
        }

replay传入capture方法捕获的ttl,然后在子线程重放,也就是调用ttl的set方法,会设置到当前的线程中去,最后会把子线程之前存在的ttl返回

public static Object replay(@Nonnull Object captured) {
            @SuppressWarnings("unchecked")
            Map<TransmittableThreadLocal<?>, Object> capturedMap = (Map<TransmittableThreadLocal<?>, Object>) captured;
            Map<TransmittableThreadLocal<?>, Object> backup = new HashMap<TransmittableThreadLocal<?>, Object>();

            for (Iterator<? extends Map.Entry<TransmittableThreadLocal<?>, ?>> iterator = holder.get().entrySet().iterator();
                 iterator.hasNext(); ) {
                Map.Entry<TransmittableThreadLocal<?>, ?> next = iterator.next();
                TransmittableThreadLocal<?> threadLocal = next.getKey();

                // backup
                backup.put(threadLocal, threadLocal.get());

                // clear the TTL values that is not in captured
                // avoid the extra TTL values after replay when run task
                //清除之前上下文,不在capturedMap中,都清除
                if (!capturedMap.containsKey(threadLocal)) {
                    iterator.remove();
                    threadLocal.superRemove();
                }
            }

            // set values to captured TTL
            //这边是在子线程设置ttl的逻辑
            setTtlValuesTo(capturedMap);

            // call beforeExecute callback
            doExecuteCallback(true);

            return backup;
        }

setTtlValuesTo用于在子线程设置ttl,逻辑如下

private static void setTtlValuesTo(@Nonnull Map<TransmittableThreadLocal<?>, Object> ttlValues) {
            for (Map.Entry<TransmittableThreadLocal<?>, Object> entry : ttlValues.entrySet()) {
                @SuppressWarnings("unchecked")
                TransmittableThreadLocal<Object> threadLocal = (TransmittableThreadLocal<Object>) entry.getKey();
                //这边是设置到当前线程
                threadLocal.set(entry.getValue());
            }
        }

其实就是调用ttl的set方法,看过ThreadLocal源码的你应该懂。

最后就是执行结束,restore之前的上下文,用到replay返回的back。

public static void restore(@Nonnull Object backup) {
            @SuppressWarnings("unchecked")
            Map<TransmittableThreadLocal<?>, Object> backupMap = (Map<TransmittableThreadLocal<?>, Object>) backup;
            // call afterExecute callback
            doExecuteCallback(false);

            for (Iterator<? extends Map.Entry<TransmittableThreadLocal<?>, ?>> iterator = holder.get().entrySet().iterator();
                 iterator.hasNext(); ) {
                Map.Entry<TransmittableThreadLocal<?>, ?> next = iterator.next();
                TransmittableThreadLocal<?> threadLocal = next.getKey();

                // clear the TTL values that is not in backup
                // avoid the extra TTL values after restore
                // 清除之前的上下文,不在backupMap中的都清除了
                if (!backupMap.containsKey(threadLocal)) {
                    iterator.remove();
                    threadLocal.superRemove();
                }
            }

            // restore TTL values
            // 恢复到运行之前的状态
            setTtlValuesTo(backupMap);
        }

要把capture,repaly和restore的逻辑串起来,那么就需要看下面的TtlRunnable类,这个就是我一直说的包装类。

TtlRunnable

我们先看TtlRunnable的构造函数

 private TtlRunnable(@Nonnull Runnable runnable, boolean releaseTtlValueReferenceAfterRun) {
        //捕获父线程ttl
        this.capturedRef = new AtomicReference<Object>(capture());
        this.runnable = runnable;
        this.releaseTtlValueReferenceAfterRun = releaseTtlValueReferenceAfterRun;
    }

在构造函数,也就是父线程,会通过capture捕获父线程的ttl,然后保存在capturedRef中。

在run方法中,replay,restore逻辑一目了然,不多解释。

public void run() {
        Object captured = capturedRef.get();
        if (captured == null || releaseTtlValueReferenceAfterRun && !capturedRef.compareAndSet(captured, null)) {
            throw new IllegalStateException("TTL value reference is released after run!");
        }

        Object backup = replay(captured);
        try {
            runnable.run();
        } finally {
            restore(backup);
        }
    }

所以我们在项目中想用到ttl的时候,可以这么使用

@Data
    @AllArgsConstructor
    static class Pet {
        private String name;
    }


    @Test
    public void compareTLAndTTL() throws InterruptedException {
        Executor executor = Executors.newFixedThreadPool(1);
        executor.execute(()->{
            System.out.println("init");
        });

        ThreadLocal<Pet> tl = new ThreadLocal<>();
        tl.set(new Pet("xiaogou"));

        executor.execute(()->{
            //这边根本拿不到父线程的tl
            System.out.println(tl.get());
        });


        TransmittableThreadLocal<Pet> ttl = new TransmittableThreadLocal<>();
        ttl.set(new Pet("xiaomao"));

        executor.execute(TtlRunnable.get(()->{
            System.out.println(ttl.get());
            //证明ttl是浅拷贝
            ttl.get().setName("xiaogou");
        }));

        Thread.sleep(1000L);

        System.out.println(ttl.get());

    }

输出如下

init
null
SPITest.Pet(name=xiaomao)
SPITest.Pet(name=xiaogou)

但是这样使用起来也太麻烦了,我们需要修改我们的使用方式,有没有无侵入的使用方式?我们可以把上面包装Runnable的逻辑封装到线程池中去。因此用到了ExecutorTtlWrapper。

ExecutorTtlWrapper

class ExecutorTtlWrapper implements Executor, TtlEnhanced {
    private final Executor executor;

    ExecutorTtlWrapper(@Nonnull Executor executor) {
        this.executor = executor;
    }

    @Override
    public void execute(@Nonnull Runnable command) {
        executor.execute(TtlRunnable.get(command));
    }

    @Nonnull
    public Executor unwrap() {
        return executor;
    }
}

代码很简单,不多解释了。
我们可以通过TtlExecutors这个工具类来快捷获取这些包装TtlRunbale逻辑的线程池。但是这样还是比较麻烦的,因此用到下面这个TtlAgent类,它利用了jvm的Instrument机制,可以在编译的时候修改字节码,在jdk的线程池源码中加入TtlRunnable封装的逻辑。

TtlAgent

instrument的原理以及如何配置不是本文重点,大家知道它干了什么就好了,可以在参考贴的链接学习,这个技术在很多中间件用到

public static void premain(String agentArgs, @Nonnull Instrumentation inst) {
        //解析key-value配置
        kvs = splitCommaColonStringToKV(agentArgs);
        //根据kv配置 设置日志打印方式
        Logger.setLoggerImplType(getLogImplTypeFromAgentArgs(kvs));
        final Logger logger = Logger.getLogger(TtlAgent.class);

        try {
            logger.info("[TtlAgent.premain] begin, agentArgs: " + agentArgs + ", Instrumentation: " + inst);
            //获取kv中关于是否禁止向子线程传递ttl的配置
            final boolean disableInheritable = isDisableInheritableForThreadPool();

            final List<JavassistTransformlet> transformletList = new ArrayList<JavassistTransformlet>();
            //修改java.util.concurrent.ThreadPoolExecutor,java.util.concurrent.ScheduledThreadPoolExecutor的代码
            transformletList.add(new TtlExecutorTransformlet(disableInheritable));
            //修改另外一个线程池
            transformletList.add(new TtlForkJoinTransformlet(disableInheritable));
            //根据配置决定是否修改TimeTask源码,阿里规范不建议使用这个类做定时任务
            if (isEnableTimerTask()) transformletList.add(new TtlTimerTaskTransformlet());
            //把我们的转换器设置到inst中去
            final ClassFileTransformer transformer = new TtlTransformer(transformletList);
            inst.addTransformer(transformer, true);
            logger.info("[TtlAgent.premain] addTransformer " + transformer.getClass() + " success");

            logger.info("[TtlAgent.premain] end");

            ttlAgentLoaded = true;
        } catch (Exception e) {
            String msg = "Fail to load TtlAgent , cause: " + e.toString();
            logger.log(Level.SEVERE, msg, e);
            throw new IllegalStateException(msg, e);
        }
    }

premain用于我们向jvm注册我们的转换器,根据转换器内的逻辑,我们可以修改对应的class文件源码。

我们直接来看下TtlExecutorTransformlet中是怎么修改源码的,核心代码如下

PARAM_TYPE_NAME_TO_DECORATE_METHOD_CLASS.put("java.lang.Runnable", "com.alibaba.ttl.TtlRunnable");
        PARAM_TYPE_NAME_TO_DECORATE_METHOD_CLASS.put("java.util.concurrent.Callable", "com.alibaba.ttl.TtlCallable");

CtClass[] parameterTypes = method.getParameterTypes();
        StringBuilder insertCode = new StringBuilder();
        for (int i = 0; i < parameterTypes.length; i++) {
            final String paramTypeName = parameterTypes[i].getName();
            if (PARAM_TYPE_NAME_TO_DECORATE_METHOD_CLASS.containsKey(paramTypeName)) {
                String code = String.format("$%d = %s.get($%d, false, true);", i + 1, PARAM_TYPE_NAME_TO_DECORATE_METHOD_CLASS.get(paramTypeName), i + 1);
                logger.info("insert code before method " + signatureOfMethod(method) + " of class " + method.getDeclaringClass().getName() + ": " + code);
                insertCode.append(code);
            }
        }

大概意思就是,在ThreadPoolExecutor类中,如果方法参数含有Runnable或者Callable,会在方法体第一行,加上一段代码

runnable = com.alibaba.ttl.TtlRunnable.get(runnable,false,true) 
callable   = com.alibaba.ttl.TtlCallable(runnable,false,true) 

这样就实现了无感知包装Runnable的逻辑。

具体如何使用这个agent 我们需要增加如下的jvm启动参数

-javaagent:/path/to/transmittable-thread-local-2.x.x.jar=ttl.agent.logger:STDOUT,ttl.agent.disable.inheritable.for.thread.pool:true

等于号后面的是额外配置参数,具体如何配置可以看TtlAgent类的注释

最后

通过agent相当于无侵入引入了ttl,但是ttl的创建这一步还是需要我们手动的,不可能去改写tl或者itl的字节码,tl,itl,ttl三者在jvm内共存
ttl框架主要还是用于中间件,但是我们还是需要了解的,学习一个知识点需要深入,万一以后遇到这种坑了呢。

参考

ThreadLocal原理及内存泄露预防
transmittable-thread-local github
TransmittableThreadLocal详解
agent官方文档

最后编辑于
©著作权归作者所有,转载或内容合作请联系作者
  • 序言:七十年代末,一起剥皮案震惊了整个滨河市,随后出现的几起案子,更是在滨河造成了极大的恐慌,老刑警刘岩,带你破解...
    沈念sama阅读 199,711评论 5 468
  • 序言:滨河连续发生了三起死亡事件,死亡现场离奇诡异,居然都是意外死亡,警方通过查阅死者的电脑和手机,发现死者居然都...
    沈念sama阅读 83,932评论 2 376
  • 文/潘晓璐 我一进店门,熙熙楼的掌柜王于贵愁眉苦脸地迎上来,“玉大人,你说我怎么就摊上这事。” “怎么了?”我有些...
    开封第一讲书人阅读 146,770评论 0 330
  • 文/不坏的土叔 我叫张陵,是天一观的道长。 经常有香客问我,道长,这世上最难降的妖魔是什么? 我笑而不...
    开封第一讲书人阅读 53,799评论 1 271
  • 正文 为了忘掉前任,我火速办了婚礼,结果婚礼上,老公的妹妹穿的比我还像新娘。我一直安慰自己,他们只是感情好,可当我...
    茶点故事阅读 62,697评论 5 359
  • 文/花漫 我一把揭开白布。 她就那样静静地躺着,像睡着了一般。 火红的嫁衣衬着肌肤如雪。 梳的纹丝不乱的头发上,一...
    开封第一讲书人阅读 48,069评论 1 276
  • 那天,我揣着相机与录音,去河边找鬼。 笑死,一个胖子当着我的面吹牛,可吹牛的内容都是我干的。 我是一名探鬼主播,决...
    沈念sama阅读 37,535评论 3 390
  • 文/苍兰香墨 我猛地睁开眼,长吁一口气:“原来是场噩梦啊……” “哼!你这毒妇竟也来了?” 一声冷哼从身侧响起,我...
    开封第一讲书人阅读 36,200评论 0 254
  • 序言:老挝万荣一对情侣失踪,失踪者是张志新(化名)和其女友刘颖,没想到半个月后,有当地人在树林里发现了一具尸体,经...
    沈念sama阅读 40,353评论 1 294
  • 正文 独居荒郊野岭守林人离奇死亡,尸身上长有42处带血的脓包…… 初始之章·张勋 以下内容为张勋视角 年9月15日...
    茶点故事阅读 35,290评论 2 317
  • 正文 我和宋清朗相恋三年,在试婚纱的时候发现自己被绿了。 大学时的朋友给我发了我未婚夫和他白月光在一起吃饭的照片。...
    茶点故事阅读 37,331评论 1 329
  • 序言:一个原本活蹦乱跳的男人离奇死亡,死状恐怖,灵堂内的尸体忽然破棺而出,到底是诈尸还是另有隐情,我是刑警宁泽,带...
    沈念sama阅读 33,020评论 3 315
  • 正文 年R本政府宣布,位于F岛的核电站,受9级特大地震影响,放射性物质发生泄漏。R本人自食恶果不足惜,却给世界环境...
    茶点故事阅读 38,610评论 3 303
  • 文/蒙蒙 一、第九天 我趴在偏房一处隐蔽的房顶上张望。 院中可真热闹,春花似锦、人声如沸。这庄子的主人今日做“春日...
    开封第一讲书人阅读 29,694评论 0 19
  • 文/苍兰香墨 我抬头看了看天上的太阳。三九已至,却和暖如春,着一层夹袄步出监牢的瞬间,已是汗流浃背。 一阵脚步声响...
    开封第一讲书人阅读 30,927评论 1 255
  • 我被黑心中介骗来泰国打工, 没想到刚下飞机就差点儿被人妖公主榨干…… 1. 我叫王不留,地道东北人。 一个月前我还...
    沈念sama阅读 42,330评论 2 346
  • 正文 我出身青楼,却偏偏与公主长得像,于是被迫代替她去往敌国和亲。 传闻我的和亲对象是个残疾皇子,可洞房花烛夜当晚...
    茶点故事阅读 41,904评论 2 341

推荐阅读更多精彩内容

  • 1. 背景 ThreadLocal源码解读,网上面早已经泛滥了,大多比较浅,甚至有的连基本原理都说的很有问题,包括...
    时之令阅读 634评论 1 5
  • 1. 背景 ThreadLocal源码解读,网上面早已经泛滥了,大多比较浅,甚至有的连基本原理都说的很有问题,包括...
    小陈阿飞阅读 1,351评论 2 56
  • 第一章:Java程序设计概述 Java和C++最大的不同在于Java采用的指针模型可以消除重写内存和损坏数据的可能...
    loneyzhou阅读 1,229评论 1 7
  • 原理 产生线程安全问题的根源在于多线程之间的数据共享。如果没有数据共享,就没有多线程并发安全问题。ThreadLo...
    Java耕耘者阅读 298评论 0 0
  • 今天早上,我发现了一个很奇怪的地方,就是老何通常都会比我早来思维班,但是他今天比我晚来,所以,我觉得老何出现了什么...
    霸气小男生阅读 475评论 0 0