侧边栏壁纸
博主头像
博主等级

  • 累计撰写 19 篇文章
  • 累计创建 34 个标签
  • 累计收到 4 条评论

目 录CONTENT

文章目录

CompletableFuture原理与应用

前尘一梦
2024-03-09 / 1 评论 / 5 点赞 / 102 阅读 / 41415 字

前言

保险中台上线以后,随着业务量的持续提升,系统面临的压力也越来越大。保险端为中台中的核心系统,对外承接经过各渠道清洗,初步处理后的全国数十个地区,机构的投保业务流量,对内调度各个下游服务获取数据进行聚合,属于典型的长流程, I/O密集型业务,在业务流程越来越复杂,业务量越来越大的情况下,使用同步调用的数据加载方式无法满足业务方对接口的响应速度要求,于是考虑如何将同步加载改为并行加载。

并行调用的业务场景

保险端业务具有以下特点:

  • 单次投保涉及多达几十张表,上游需要将数据推送给保险公司,下游需要推给其他业务团队

  • 不同渠道,不同产品处理逻辑极具个性化,总体流程越来越复杂,于是采用变种责任链模式Pipeline把流程拆成多个节点,在Pipeline的开头依赖调用多个RPC服务返回数据组装上下文,供后面多个节点使用,PipeLine模式可看这篇

同步和并行的区别

同步

按顺序依次从各个服务获取数据

ad46bb8baa4e79e727ee5bd7af0b175c38212.png

在同步调用的场景下,接口耗时长、性能差,接口响应时长T > T1+T2+T3+……+Tn,这种情况会导致CPU浪费大量时间在阻塞等待上,无法充分利用硬件资源,接口响应速度会随着调用的RPC接口变多而变得越来越慢,系统吞吐量很容易达到瓶颈。

并行

7451722785287_.pic.jpg

此时主要通过以下两种方式来减少线程池的调度开销和阻塞时间:

  • 合并业务属性相近的RPC接口,减少调用次数

  • 通过引入CompletableFuture(下文简称CF)对业务流程进行编排,降低依赖之间的阻塞

CompletableFuture对比Future、RxJava、Reactor

对比一下Future、CompletableFuture、RxJava、Reactor的特性:

Future

CompletableFuture

RxJava

Reactor

Composable(可组合)

✔️

✔️

✔️

Asynchronous(异步)

✔️

✔️

✔️

✔️

Operator fusion(操作融合)

✔️

✔️

Lazy(延迟执行)

✔️

✔️

Backpressure(背压)

✔️

✔️

  • 可组合:可以将多个依赖操作通过不同的方式进行编排,例如CompletableFuture提供thenCompose、thenCombine等各种then开头的方法,这些方法就是对“可组合”特性的支持。

  • 操作融合:将数据流中使用的多个操作符以某种方式结合起来,进而降低开销(时间、内存)。

  • 延迟执行:操作不会立即执行,当收到明确指示时操作才会触发。例如Reactor只有当有订阅者订阅时,才会触发操作。

  • 背压:某些异步阶段的处理速度跟不上,直接失败会导致大量数据的丢失,对业务来说是不能接受的,这时需要反馈上游生产者降低调用量。

RxJava与Reactor显然更加强大,它们提供了更多的函数调用方式,支持更多特性,但同时也更复杂,项目最需要的特性是“异步”、“可编排”,所以选择CompletableFuture。

CompletableFuture

CompletableFuture的定义

CompletableFuture是由Java 8引入的,类结构图如下

75a9710d2053b2fa0654c67cd7f35a0c18774.png

CompletableFuture实现了两个接口(如上图所示):Future、CompletionStage。Future表示异步计算的结果,CompletionStage用于表示异步执行过程中的一个步骤(Stage),这个步骤可能是由另外一个CompletionStage触发的,随着当前步骤的完成,也可能会触发其他一系列CompletionStage的执行。从而我们可以根据实际业务对这些步骤进行多样化的编排组合,CompletionStage接口正是定义了这样的能力,我们可以通过其提供的thenAppy、thenCompose等函数式编程方法来组合编排这些步骤。

CompletableFuture的使用

在编排多个不同的任务的时候,任务之间可以存在依赖关系,按照依赖数量不同可分为一元依赖,二元依赖,多元依赖

ExecutorService executor = Executors.newFixedThreadPool(5);

//=====================================零依赖=====================================
//1、使用runAsync或supplyAsync发起异步调用
CompletableFuture<String> cf1 = CompletableFuture.supplyAsync(() -> {
  return "result1";
}, executor);

//2、CompletableFuture.completedFuture()直接创建一个已完成状态的CompletableFuture
CompletableFuture<String> cf2 = CompletableFuture.completedFuture("result2");

//3、先初始化一个未完成的CompletableFuture,然后通过complete()、completeExceptionally(),完成该CompletableFuture
CompletableFuture<String> cf = new CompletableFuture<>();
cf.complete("success");
//=====================================依赖一个=====================================
// 适用场景:
//   想要在一个异步操作完成后对结果进行转换或进一步处理
//   后续操作依赖于前一个异步操作的结果
CompletableFuture<String> cf3 = cf1.thenApply(result1 -> {
  //result1为CF1的结果
  //......
  return "result3";
});
CompletableFuture<String> cf5 = cf2.thenApply(result2 -> {
  //result2为CF2的结果
  //......
  return "result5";
});
//=====================================依赖两个=====================================
// 适用场景:
//   合并两个独立的 CompletableFuture,并对它们的结果进行进一步的处理
//   有两个独立的异步操作,想要在它们都完成后对它们的结果进行合并或处理
//   需要组合多个异步操作的结果来进行进一步计算
CompletableFuture<String> cf4 = cf1.thenCombine(cf2, (result1, result2) -> {
  //result1和result2分别为cf1和cf2的结果
  return "result4";
});
//=====================================依赖多个=====================================
CompletableFuture<Void> cf6 = CompletableFuture.allOf(cf3, cf4, cf5);
CompletableFuture<String> result = cf6.thenApply(v -> {
  //这里的join并不会阻塞,因为传给thenApply的函数是在CF3、CF4、CF5全部完成时,才会执行 。
  result3 = cf3.join();
  result4 = cf4.join();
  result5 = cf5.join();
  //根据result3、result4、result5组装最终result;
  return "result";
});
//=====================================组合使用=====================================
CompletableFuture<ProductSpu> future1 = CompletableFuture.supplyAsync(() -> {
//            printThread("future1");
            // 调用产品rpc
            HashMap<String, String> productSpuParam = Maps.newHashMap();
            productSpuParam.put("id", castInsuranceBO.getSpuId());
            ProductSpu productSpu = rpcComponent.rpcQuery(productSpuParam, product_spu, ProductSpu.class);
            context.setProductSpu(productSpu);
            return productSpu;
        }, wrappedCasTaskExecutor);

        CompletableFuture<Object> future1Son1 = future1.thenApplyAsync((productSpu) -> {
//            printThread("future1-1");
            // 调用保险企业rpc
            HashMap<String, String> insuranceEnterpriseParam = Maps.newHashMap();
            insuranceEnterpriseParam.put("id", productSpu.getInsuranceEnterpriseId());
            InsuranceEnterprise insuranceEnterprise = rpcComponent.rpcQuery(insuranceEnterpriseParam, insurance_enterprise, InsuranceEnterprise.class);
            context.setInsuranceEnterprise(insuranceEnterprise);
            return insuranceEnterprise;
        }, wrappedCasTaskExecutor);

        CompletableFuture<ProductSpu> future1Son2 = future1.thenApplyAsync((productSpu) -> {
//            printThread("future1-2");
            // 调用险种rpc
            HashMap<String, String> productDangerPlantedParam = Maps.newHashMap();
            productDangerPlantedParam.put("id", productSpu.getDangerPlantedId());
            ProductDangerPlanted productDangerPlanted = rpcComponent.rpcQuery(productDangerPlantedParam, product_danger_planted, ProductDangerPlanted.class);
            context.setProductDangerPlanted(productDangerPlanted);
            return productSpu;
        }, wrappedCasTaskExecutor);

        CompletableFuture<ProductDangerPlantedCateGory> future2 = CompletableFuture.supplyAsync(() -> {
//            printThread("future2");
            // 调用险种分类rpc
            HashMap<String, String> productDangerPlantedCateGoryParam = Maps.newHashMap();
            productDangerPlantedCateGoryParam.put("id", castInsuranceBO.getDangerPlantedCategoryId());
            ProductDangerPlantedCateGory productDangerPlantedCateGory = rpcComponent.rpcQuery(productDangerPlantedCateGoryParam, product_danger_planted_category, ProductDangerPlantedCateGory.class);
            context.setProductDangerPlantedCateGory(productDangerPlantedCateGory);
            return productDangerPlantedCateGory;
        }, wrappedCasTaskExecutor);

        CompletableFuture<ProductSpuExtension> future3 = CompletableFuture.supplyAsync(() -> {
//            printThread("future3");
            // 调用险种分类rpc
            // 产品扩展rpc
            HashMap<String, String> productSpuExtensionParam = Maps.newHashMap();
            productSpuExtensionParam.put("spuId", castInsuranceBO.getSpuId());
            ProductSpuExtension productSpuExtension = rpcComponent.rpcQuery(productSpuExtensionParam, product_spu_extension, ProductSpuExtension.class);
            context.setProductSpuExtension(productSpuExtension);
            return productSpuExtension;
        }, wrappedCasTaskExecutor);

        CompletableFuture<ProductSku> future4 = CompletableFuture.supplyAsync(() -> {
//            printThread("future4");
            // 调用产品sku rpc
            HashMap<String, String> productSkuParam = Maps.newHashMap();
            productSkuParam.put("id", castInsuranceBO.getSkuId());
            ProductSku productSku = rpcComponent.rpcQuery(productSkuParam, product_sku, ProductSku.class);
            context.setProductSku(productSku);
            return productSku;
        }, wrappedCasTaskExecutor);

        CompletableFuture<ProductSpuScope> future5 = CompletableFuture.supplyAsync(() -> {
//            printThread("future5");
            // 产品范围rpc
            HashMap<String, String> productSpuScopeParam = Maps.newHashMap();
            productSpuScopeParam.put("id", castInsuranceBO.getSpuScopeId());
            ProductSpuScope productSpuScope = rpcComponent.rpcQuery(productSpuScopeParam, product_spu_scope, ProductSpuScope.class);
            context.setProductSpuScope(productSpuScope);
            castInsuranceBO.setSupplementReview(productSpuScope.getSupplementReview());
            return productSpuScope;
        }, wrappedCasTaskExecutor);

        CompletableFuture<SplitCastInsuranceRules> future6 = CompletableFuture.supplyAsync(() -> {
//            printThread("future6");
            // 拆单规则
            SplitCastInsuranceRules splitCastInsuranceRules = splitCastInsuranceRulesMapper.selectOne(new LambdaQueryWrapper<SplitCastInsuranceRules>().eq(SplitCastInsuranceRules::getInsuranceEnterpriseId, castInsuranceBO.getInsuranceEnterpriseId())
                    .eq(SplitCastInsuranceRules::getInsuranceEnterpriseInstitutionsId, castInsuranceBO.getInsuranceEnterpriseInstitutionsId()));
            context.setSplitCastInsuranceRules(splitCastInsuranceRules);
            return splitCastInsuranceRules;
        }, wrappedCasTaskExecutor);
        // 等待任务全部执行完成
        CompletableFuture.allOf(future1, future1Son1, future1Son2, future2, future3, future4, future5, future6).join();
// 适用场景:
//   需要依赖外部多个RPC接口或者服务提供数据才能进行下一步操作


CompletableFuture.anyOf(future1, future1Son1, future1Son2, future2, future3, future4, future5, future6).join();
// 适用场景:
//   存在多个外部备选接口,只要比如查询快递信息,我们只要其中一个效率最高的接口返回信息即可

CompletableFuture的原理

CompletableFuture中包含两个字段:resultstack。result用于存储当前CF的结果,stack(Completion)表示当前CF完成后需要触发的依赖动作(Dependency Actions),去触发依赖它的CF的计算,依赖动作可以有多个(表示有多个依赖它的CF),以栈(Treiber stack)(使用无锁的 CAS 操作来实现线程安全,在高并发环境下性能更优,避免了锁的开销和死锁问题)的形式存储,stack表示栈顶元素。

7501722856154_.pic.jpg

7521722856691_.pic.jpg

这种方式类似“观察者模式”,依赖动作(Dependency Action)都封装在一个单独Completion子类中。下面是Completion类关系结构图。CompletableFuture中的每个方法都对应了图中的一个Completion的子类,Completion本身是观察者的基类。

  • UniCompletion继承了Completion,是一元依赖的基类,例如thenApply的实现类UniApply就继承自UniCompletion。

  • BiCompletion继承了UniCompletion,是二元依赖的基类,也是多元依赖的基类。例如thenCombine的实现类BiRelay就继承自BiCompletion。

5a889b90d0f2c2a0f6a4f294b9094194112106.png

f45b271b656f3ae243875fcb2af36a1141224.png

被观察者
  1. 每个CompletableFuture都可以被看作一个被观察者,其内部有一个Completion类型的链表成员变量stack,用来存储注册到其中的所有观察者。当被观察者执行完成后会弹栈stack属性,依次通知注册到其中的观察者。上面例子中步骤fn2就是作为观察者被封装在UniApply中。

  2. 被观察者CF中的result属性,用来存储返回结果数据。这里可能是一次RPC调用的返回值,也可能是任意对象,在上面的例子中对应步骤fn1的执行结果。

被观察者

CompletableFuture支持很多回调方法,例如thenAccept、thenApply、exceptionally等,这些方法接收一个函数类型的参数f,生成一个Completion类型的对象(即观察者),并将入参函数f赋值给Completion对应操作子类的成员变量fn,然后检查当前CF是否已处于完成状态(即result != null),如果已完成直接触发fn,否则将观察者Completion加入到CF的观察者链stack中,再次尝试触发,如果被观察者未执行完则其执行完毕之后通知触发。

  1. 观察者中的dep属性:指向其对应的CompletableFuture,在上面的例子中dep指向CF2。

  2. 观察者中的src属性:指向其依赖的CompletableFuture,在上面的例子中src指向CF1。

    @SuppressWarnings("serial")
    abstract static class UniCompletion<T,V> extends Completion {
        Executor executor;                 // executor to use (null if none)
        CompletableFuture<V> dep;          // the dependent to complete 最终要完成的任务
        CompletableFuture<T> src;          // source for action  输入,也就是需要依赖完成的cf

        UniCompletion(Executor executor, CompletableFuture<V> dep,
                      CompletableFuture<T> src) {
            this.executor = executor; this.dep = dep; this.src = src;
        }

        /**
         * Returns true if action can be run. Call only when known to
         * be triggerable. Uses FJ tag bit to ensure that only one
         * thread claims ownership.  If async, starts as task -- a
         * later call to tryFire will run action.
         */
        final boolean claim() {
            Executor e = executor;
            if (compareAndSetForkJoinTaskTag((short)0, (short)1)) {
                if (e == null)
                    return true;
                executor = null; // disable
                e.execute(this);
            }
            return false;
        }

        final boolean isLive() { return dep != null; }
    }
  1. 观察者Completion中的fn属性:用来存储具体的等待被回调的函数。这里需要注意的是不同的回调方法(thenAccept、thenApply、exceptionally等)接收的函数类型也不同,即fn的类型有很多种,在上面的例子中fn指向fn2。

@SuppressWarnings("serial")
static final class UniApply<T,V> extends UniCompletion<T,V> {
    Function<? super T,? extends V> fn;
    UniApply(Executor executor, CompletableFuture<V> dep,
             CompletableFuture<T> src,
             Function<? super T,? extends V> fn) {
        super(executor, dep, src); this.fn = fn;
    }
    final CompletableFuture<V> tryFire(int mode) {
        CompletableFuture<V> d; CompletableFuture<T> a;
        if ((d = dep) == null ||
            !d.uniApply(a = src, fn, mode > 0 ? null : this))
            return null;
        dep = null; src = null; fn = null;
        return d.postFire(a, mode);
    }
}
整体流程
一元依赖

举例分析一下thenApply方法

 public <U> CompletableFuture<U> thenApply(
        Function<? super T,? extends U> fn) {
      // 传入依赖任务完成后的函数操作,委托给uniApplyStage方法, 此处不是async,所以Executor为null
        return uniApplyStage(null, fn);
    }

   
    private <V> CompletableFuture<V> uniApplyStage(
        Executor e, Function<? super T,? extends V> f) {
        if (f == null) throw new NullPointerException();
      // d:目标cf,thenApply操作完成后生成的新的cf。
        CompletableFuture<V> d =  new CompletableFuture<V>();
     // e不为null,即需要异步执行,否则判断源cf是否完成,尝试同步执行函数f
        if (e != null || !d.uniApply(this, f, null)) {
      // 创建一个新的UniApply对象,封装目标cf`d`, 当前cf、 `this`、 函数f,  d依赖this的完成
            UniApply<T,V> c = new UniApply<T,V>(e, d, this, f);
      // 将其推送到当前cf的依赖链中
            push(c);
      // 再次判断源cf是否完成, 尝试同步执行函数f,防止因为时间差导致任务执行完成,依赖回调无法被触发
            c.tryFire(SYNC);
        }
        return d;
    }


    final <S> boolean uniApply(CompletableFuture<S> a,
                               Function<? super S,? extends T> f,
                               UniApply<S,T> c) {
        Object r; Throwable x;
        // 检查院源cf、cf的结果、执行函数,任何一个为空则返回false
        if (a == null || (r = a.result) == null || f == null)
            return false;
       // 检查当前cf的result是否为null,如果不为null,说明当前cf已经完成,不需要再处理,直接返回true
        tryComplete: if (result == null) {
       // 异常处理
            if (r instanceof AltResult) {
                if ((x = ((AltResult)r).ex) != null) {
                    completeThrowable(x, r);
                    break tryComplete;
                }
                r = null;
            }
            try {
              // 执行任务
                if (c != null && !c.claim())
                    return false;
                @SuppressWarnings("unchecked") S s = (S) r;
                // 执行任务
                completeValue(f.apply(s));
            } catch (Throwable ex) {
                completeThrowable(ex);
            }
        }
        return true;
    }


    final void push(UniCompletion<?,?> c) {
        if (c != null) {
        // 如果当前任务还没完成,循环尝试将c压入依赖链
            while (result == null && !tryPushStack(c))
                lazySetNext(c, null); // clear on failure 失败时将c的next设为null,清除设置,以准备重试
        }
    }

    final boolean tryPushStack(Completion c) {
      // 获取依赖链头部
        Completion h = stack;
      // 设置c的next为当前链表头
        lazySetNext(c, h);
      // 使用CAS把c设置为当前链表头
        return UNSAFE.compareAndSwapObject(this, STACK, h, c);
    }

    static void lazySetNext(Completion c, Completion next) {
        // 使用UNSAFE类的putOrderedObject方法以有序写的方式把c的next设置为next,
  // //有序、延迟版本的putObjectVolatile方法,不保证值的改变被其他线程立即看到(这里不需要保证)
  // 使用轻量级内存屏障,这种方式比普通的volatile 写操作性能更好
        UNSAFE.putOrderedObject(c, NEXT, next);
    }
  

        final CompletableFuture<V> tryFire(int mode) {
            CompletableFuture<V> d; CompletableFuture<T> a;
        // 这里再次判断源任务是否已经完成
            if ((d = dep) == null ||
                !d.uniApply(a = src, fn, mode > 0 ? null : this))
                return null;
            dep = null; src = null; fn = null;
            // 尝试执行源cf的回调和当前cf的回调
            return d.postFire(a, mode);
        }


    final CompletableFuture<T> postFire(CompletableFuture<?> a, int mode) {
      // a为当前cf
        if (a != null && a.stack != null) {
            if (mode < 0 || a.result == null)
       // 异常情况,清理依赖栈,防止内存泄漏
                a.cleanStack();
            else
       // 触发源cf回调
                a.postComplete();
        }
      // result为当前cf的结果
        if (result != null && stack != null) {
            if (mode < 0)
                return this;
            else
       // 触发当前cf回调
                postComplete();
        }
        return null;
    }

    final void postComplete() {
        /*
         * On each step, variable f holds current dependents to pop
         * and run.  It is extended along only one path at a time,
         * pushing others to avoid unbounded recursion.
         */
      // f为当前cf, h为当前栈顶的Completion
        CompletableFuture<?> f = this; Completion h;
      // 如果当前栈顶不为空 或 遍历到其他cf对象且其栈顶为空的时候,将f重新指向this继续获取其栈顶对象
        while ((h = f.stack) != null ||
               (f != this && (h = (f = this).stack) != null)) {
            CompletableFuture<?> d; Completion t;
      //使用cas修改栈顶为链表中下一个Completion
            if (f.casStack(h, t = h.next)) {
      // 如果f不是当前cf,且f的栈顶还有下一个元素,也就是链表节点超过1个时,重新压入栈中不处理,避免栈溢出
      // 可以理解成只处理当前cf的回调和cf依赖中链表依赖中只有一个节点的回调
                if (t != null) {
                    if (f != this) {
                        pushStack(h);
                        continue;
                    }
                    h.next = null;    // detach
                }
        // 回调执行
                f = (d = h.tryFire(NESTED)) == null ? this : d;
            }
        }
    }

整体流程图如下

606323a07fb7e31cb91f46c879d99b8d735272.gif

二元依赖

大致数据结构


b969e49a7eedbd52b014f86e86dcd3fc49634.png

thenCombine操作表示依赖两个CompletableFuture。其观察者实现类为BiApply,如上图所示,BiApply通过src和snd两个属性关联被依赖的两个CF,fn属性的类型为BiFunction。与单个依赖不同的是,在依赖的CF未完成的情况下,thenCombine会尝试将BiApply压入这两个被依赖的CF的栈中,每个被依赖的CF完成时都会尝试触发观察者BiApply,BiApply会检查两个依赖是否都完成,如果完成则开始执行。这里为了解决重复触发的问题,同样用的是上一章节提到的CAS操作,执行时会先通过CAS设置状态位,避免重复触发。

代码分析:

 public <U,V> CompletableFuture<V> thenCombine(
        CompletionStage<? extends U> other,
        BiFunction<? super T,? super U,? extends V> fn) {
      // 委托给biApplyStage方法,观察者实现类为BiApply
        return biApplyStage(null, other, fn);
    }

    private <U,V> CompletableFuture<V> biApplyStage(
        Executor e, CompletionStage<U> o,
        BiFunction<? super T,? super U,? extends V> f) {
        CompletableFuture<U> b;
        if (f == null || (b = o.toCompletableFuture()) == null)
            throw new NullPointerException();
        CompletableFuture<V> d = new CompletableFuture<V>();
      // 和thenApply同样的逻辑,判断任务是否完成,尝试执行函数
        if (e != null || !d.biApply(this, b, f, null)) {
            BiApply<T,U,V> c = new BiApply<T,U,V>(e, d, this, b, f);

            bipush(b, c);
      // 再次尝试
            c.tryFire(SYNC);
        }
        return d;
    }

    final void bipush(CompletableFuture<?> b, BiCompletion<?,?,?> c) {
        if (c != null) {
            Object r;
          // 将外面传入的BiApply压入当前cf依赖栈中
            while ((r = result) == null && !tryPushStack(c))
                lazySetNext(c, null); // clear on failure
            if (b != null && b != this && b.result == null) {
          // 将外面传入的BiApply压入另一个cf依赖栈中,如果当前cf未完成,使用CoCompletion包转BiCompletion确保
          // 组合操作在组合都完成以后才被触发,因为触发的时候会调用到CoCompletion的tryFire方法
                Completion q = (r != null) ? c : new CoCompletion(c);
                while (b.result == null && !b.tryPushStack(q))
                    lazySetNext(q, null); // clear on failure
            }
        }
    }

    @SuppressWarnings("serial")
    static final class CoCompletion extends Completion {
        // 持有组合引用
        BiCompletion<?,?,?> base;
        CoCompletion(BiCompletion<?,?,?> base) { this.base = base; }
        final CompletableFuture<?> tryFire(int mode) {
            BiCompletion<?,?,?> c; CompletableFuture<?> d;
        // 触发组合操作,tryFire最终调用到BiApply中的biApply方法
            if ((c = base) == null || (d = c.tryFire(mode)) == null)
                return null;
            base = null; // detach
            return d;
        }
        // 检查当前组合是否有效
        final boolean isLive() {
            BiCompletion<?,?,?> c;
            return (c = base) != null && c.dep != null;
        }
    }

final <R,S> boolean biApply(CompletableFuture<R> a,
                                CompletableFuture<S> b,
                                BiFunction<? super R,? super S,? extends T> f,
                                BiApply<R,S,T> c) {
        Object r, s; Throwable x;
        // 判断组合中的cf是否都已完成, 
        if (a == null || (r = a.result) == null ||
            b == null || (s = b.result) == null || f == null)
            return false;
            ...
         // 通过CAS判断任务是否已经回调执行过
         if (c != null && !c.claim())
                    return false;
         completeValue(f.apply(rr, ss));
         return true;
    }
多元依赖

依赖多个CompletableFuture的回调方法包括allOfanyOf,区别在于allOf观察者实现类为BiRelay,需要所有被依赖的CF完成后才会执行回调;而anyOf观察者实现类为OrRelay,任意一个被依赖的CF完成后就会触发。二者的实现方式都是将多个被依赖的CF构建成一棵平衡二叉树,执行结果层层通知,直到根节点,触发回调监听。

cef5469b5ec2e67ecca1b99a07260e4e22003.png

异步执行回调时机

如果是源任务即单任务使用supplyAsync/runAsync方法执行,任务会被包装成一个AsyncSupply Runnable对象放到线程池中执行,触发回调自然在run方法中

static final class AsyncSupply<T> extends ForkJoinTask<Void>
            implements Runnable, AsynchronousCompletionTask {
        CompletableFuture<T> dep; Supplier<T> fn;
        AsyncSupply(CompletableFuture<T> dep, Supplier<T> fn) {
            this.dep = dep; this.fn = fn;
        }

        public final Void getRawResult() { return null; }
        public final void setRawResult(Void v) {}
        public final boolean exec() { run(); return true; }

        public void run() {
            CompletableFuture<T> d; Supplier<T> f;
            if ((d = dep) != null && (f = fn) != null) {
                dep = null; fn = null;
                if (d.result == null) {
                    try {
                        d.completeValue(f.get());
                    } catch (Throwable ex) {
                        d.completeThrowable(ex);
                    }
                }
                // 触发回调
                d.postComplete();
            }
        }
    }

如果是回调任务则在postFire触发时通过UniCompletion中的claim方法执行

        final boolean claim() {
            Executor e = executor;
            if (compareAndSetForkJoinTaskTag((short)0, (short)1)) {
                if (e == null)
                    return true;
                executor = null; // disable
                e.execute(this);
            }
            return false;
        }

使用注意点

代码执行在哪个线程上

同步方法(即不带Async后缀的方法)有两种情况。

  • 如果注册时被依赖的操作已经执行完成,则直接由当前线程执行。

  • 如果注册时被依赖的操作还未执行完,则由回调线程执行。

异步方法:由显示传递的线程池执行,如果不传是由默认线程池执行

异步任务显示传递线程池

异步任务:当不传递Executor时,会使用ForkJoinPool中的共用线程池CommonPool(CommonPool的大小是CPU核数-1,如果是IO密集的应用,线程数可能成为瓶颈),正确姿势是自定义线程池显示传递,不同业务线程池隔离开,避免相互影响,可以使用动态线程池方便动态调节参数。

存在父子任务耗尽线程池导致死锁

public Object doGet() {
  ExecutorService threadPool1 = new ThreadPoolExecutor(10, 10, 0L, TimeUnit.MILLISECONDS, new ArrayBlockingQueue<>(100))
  CompletableFuture cf1 = CompletableFuture.supplyAsync(() -> {
  //do sth
    return CompletableFuture.supplyAsync(() -> {
        System.out.println("child");
        return "child";
      }, threadPool1).join();//子任务
    }, threadPool1);
  return cf1.join();
}

如上代码块所示,doGet方法第三行通过supplyAsync向threadPool1请求线程,并且内部子任务又向threadPool1请求线程。threadPool1大小为10,当同一时刻有10个请求到达,则threadPool1被打满,子任务请求线程时进入阻塞队列排队,但是父任务的完成又依赖于子任务,这时由于子任务得不到线程,父任务无法完成。主线程执行cf1.join()进入阻塞状态,并且永远无法恢复。

异步RPC调用注意不要阻塞IO线程池

服务异步化后很多步骤都会依赖于异步RPC调用的结果,这时需要特别注意一点,如果是使用基于NIO(比如Netty)的异步RPC,如Dubbo异步调用,则返回结果是由IO线程负责设置的,即回调方法由IO线程触发,CompletableFuture同步回调(如thenApply、thenAccept等无Async后缀的方法)如果依赖的异步RPC调用的返回结果,那么这些同步回调将运行在IO线程上,而整个服务只有一个IO线程池,这时需要保证同步回调中不能有阻塞等耗时过长的逻辑,否则在这些逻辑执行完成前,IO线程将一直被占用,影响整个服务的响应。

异常处理

由于异步执行的任务在其他线程上执行,而异常信息存储在线程栈中,因此当前线程除非阻塞等待返回结果,否则无法通过try\catch捕获异常。CompletableFuture提供了异常捕获回调exceptionally,相当于同步调用中的try\catch。当我们需要显示捕获异常,或者在某些服务暂不可用的情况下想使用默认值(类似于降级操作)以不影响主流程继续执行,可以显示处理。

    @Test
    void testEx() {
        ExecutorService executor = Executors.newFixedThreadPool(5);
        try {
            CompletableFuture<Object> future1 = CompletableFuture.supplyAsync(() -> {
                log.info ("curr-thread-------" + Thread.currentThread().getName());
                throw new RuntimeException("Oops!");
            }, executor).exceptionally(err -> {
                log.error("future1 error", err);
                // 返回默认值
                return 0;
            });

            CompletableFuture<Integer> future2 = CompletableFuture.supplyAsync(() -> {
//                throw new RuntimeException("Oops!");
                return 0;
            }, executor);

            CompletableFuture.allOf(future1, future2).join();
            log.info("completed");
        } catch (Exception e) {
            log.error("completed error", e);
        }
    }

但是返回的异常会存在被包装成CompletionException的情况,真正的异常存储在cause属性中,需要调用返回对象的getCause方法提取

原始

7531723172411_.pic.jpg

提取后

总结

本篇介绍了CompletableFuture的使用及其大概原理,其巧妙的回调机制实现,基于CAS的无锁操作,及其他设计思路值得学习和研究。

5

评论区