侧边栏壁纸
博主头像
博主等级

  • 累计撰写 19 篇文章
  • 累计创建 34 个标签
  • 累计收到 4 条评论

目 录CONTENT

文章目录

使用Pipeline模式拆解电商等复杂场景业务

前尘一梦
2023-01-28 / 0 评论 / 2 点赞 / 70 阅读 / 9141 字

前言

在电商,保险,餐饮等领域我们经常会遇到比较复杂的业务场景,比如在电商领域的下单业务中,需要根据用户的身份(是否会员),身份等级,商品属性,用户所处的位置属性等走不同的业务分支,不同业务分支的逻辑处理可能各不相同,每个分支的逻辑处理特别多且不固定,在后续产品完善,需求迭代过程中,需要频繁对各条业务线增减或修改。这时候如果我们只是在原有业务代码的基础上堆积木,业务类可能会越来越膨胀,越来越复杂,终成shi山,以至于后期难以维护,难以及时响应业务方需求,这是一个技术人所不能忍受的。 在此分享一下如何使用pipeline模式拆分,解耦这种复杂业务。

Spring源码中用到的责任链

Pipeline模式是责任链模式的一个变种,首先从责任链模式说起,看过Spring源码的同学都大概看到过在核心Http请求处理类DispatcherServlet中,在获取handler的时候,具体获取到的其实是一个被包装过的HandlerExecutionChain

2a460c40276444b0924cfeeb2a327fa6~tplv-k3u1fbpfcp-jj-mark_3024_0_0_0_q75 (1).jpg

16b58ff97af4482195ae5e980acabbe6~tplv-k3u1fbpfcp-jj-mark_3024_0_0_0_q75.jpg

c27ed382df4d4893980cb01970abb817~tplv-k3u1fbpfcp-jj-mark_3024_0_0_0_q75.jpg

HandlerExecutionChain中除了handler以外,还包含一堆实现了HandlerInterceptor接口的拦截器

bea385dce569423883267aaaa074f62d~tplv-k3u1fbpfcp-jj-mark_3024_0_0_0_q75.jpg

里面有preHandle()、postHandle()、afterCompletion()三个方法,实现这三个方法可以分别在调用"Controller"方法之前,调用"Controller"方法之后渲染"ModelAndView"之前,以及渲染"ModelAndView"之后执行,每个方法都对应着HandlerExecutionChain中的applyxxx方法,真正执行时通过 转化为数组用指针遍历执行

838707d7aba2422aa95350b311203000~tplv-k3u1fbpfcp-jj-mark_3024_0_0_0_q75.jpg

此处的处理链类似于责任链

34a2389cd5924190946a689c1a2393e3~tplv-k3u1fbpfcp-jj-mark_3024_0_0_0_q75.jpg

在aop模块的ReflectiveMethodInvocation类中,执行增强逻辑时,也是通过递归调用形成责任链来执行

public Object proceed() throws Throwable {
   // We start with an index of -1 and increment early.
   // 执行完所有增强后执行切点方法
   if (this.currentInterceptorIndex == this.interceptorsAndDynamicMethodMatchers.size() - 1) {
      return invokeJoinpoint();
   }

   // 获取下一个要执行的拦截器
   Object interceptorOrInterceptionAdvice =
         this.interceptorsAndDynamicMethodMatchers.get(++this.currentInterceptorIndex);
   if (interceptorOrInterceptionAdvice instanceof InterceptorAndDynamicMethodMatcher) {
      // Evaluate dynamic method matcher here: static part will already have
      // been evaluated and found to match.
      // 动态匹配
      InterceptorAndDynamicMethodMatcher dm =
            (InterceptorAndDynamicMethodMatcher) interceptorOrInterceptionAdvice;
      Class<?> targetClass = (this.targetClass != null ? this.targetClass : this.method.getDeclaringClass());
      if (dm.methodMatcher.matches(this.method, targetClass, this.arguments)) {
         return dm.interceptor.invoke(this);
      }
      else {
         // Dynamic matching failed.
         // Skip this interceptor and invoke the next in the chain.
         // 不匹配则不执行拦截器
         return proceed();
      }
   }
   else {
      // 普通拦截器,直接调用拦截器,比如 ExposeInvocationInterceptor,AspectJAfterAdvice
      // It's an interceptor, so we just invoke it: The pointcut will have
      // been evaluated statically before this object was constructed.
      // 传入this,形成调用链条,如MethodBeforeAdviceInterceptor,会先执行增强方法,再执行后面逻辑
      return ((MethodInterceptor) interceptorOrInterceptionAdvice).invoke(this);
   }
}

0b4ddd3c4d5e43279e73bad4558df268~tplv-k3u1fbpfcp-jj-mark_3024_0_0_0_q75.jpg

上面第一种是通过index下标去循环调用,无法终止处理,第二种通过递归,耦合比较严重,容易相互影响而无法调试,两者都不适用于业务编排。

Pipeline模式

上面我们说到Pipeline模式是责任链模式的一个变种,下面看看Pipeline的结构

a7a8f77935ae4414938da4a91e9dba69~tplv-k3u1fbpfcp-jj-mark_3024_0_0_0_q75.jpg

Pipeline为链表结构,包含了头尾节点,当前上下文, 开始-结束方法,对节点的操作方法

 public class DefaultOrderPipeLine implements OrderPipeLine {
//    public static TransmittableThreadLocal<StopWatch> pipeLineStopWatchTTL = new TransmittableThreadLocal<>();

    public OrderContext context;
    public OrderHandlerNode head;
    public OrderHandlerNode tail;

    public DefaultOrderPipeLine(OrderContext context) {
        this.context = context;
        head = new OrderHandlerNode();
        tail = head;
//      pipeLineStopWatchTTL.set(new StopWatch("DefaultCastPipeLine"));
    }
    
        @Override
    public void addFirst(OrderHandler... handlers) {
        OrderHandlerNode handlerNode;
        for (OrderHandler handler : handlers) {
            OrderHandlerNode preNode = head.getNextNode();
            handlerNode = new OrderHandlerNode(handler);
            handlerNode.setNextNode(preNode);
            head.setNextNode(handlerNode);
        }
    }

每个handler-node包含了对下一个节点的引用,部分代码如下

public class OrderHandlerNode {
    private OrderHandler handler;

    private OrderHandlerNode nextNode;

    public OrderHandlerNode(OrderHandler handler) {
        this.handler = handler;
    }
    
    public void execute(OrderContext) {
    }
   }

每个node包含了handler,各个handler之间通过统一上下文handlerContext串联通信

public class OrderContext {
    private String skuId;
    private String goodsName;
    private BigDecimal amount;
}

当触发pipeline的start方法时,handlerContext便会像一艘小船一样在水流中流动执行。 上面举例我们只是创建了订单处理的pipeline,而实际真是业务场景在订单创建前可能需要作很多前置性校验,在订单创建后也可能需要作很多后续工作,如此我们可以起多条pipeline来分别处理不同业务。

Tomcat和Netty中用到的Pipeline模式

Tomcat中的核心模块也用到了这种模式,和上面我们说的这种很相似,它里面包含了4层管道,上层管道会调用下层管道,而每层管道都有一个特定的尾节点,每个尾部节点继承于ValveBase。

99b5378df8694e82b229870513b79470~tplv-k3u1fbpfcp-jj-mark_3024_0_0_0_q75.jpg

Netty中的Pipeline使用有些不太一样,他包含了出站事件,入站事件。

ee2db57775194b2580f0cf2cb0c5a6fb~tplv-k3u1fbpfcp-jj-mark_3024_0_0_0_q75.jpg

以上下文DefaultChannelHandlerContext为节点保存handler, 每个节点都有自己的执行器EventExecutor(其继承了java.util包的ScheduledExecutorService类),存储在DefaultChannelPipeline中,并且结构为双向链表,pipeline通过外界传入不同事件触发不同操作,如果是bind操作时, 通过尾节点tail的prev指针从后往前一个个执行,如果是fireChannelRegistered则通过头节点head的next指针从前往后执行

3d778588b1114b449ba6d9b713fbb7f0~tplv-k3u1fbpfcp-jj-mark_3024_0_0_0_q75.jpg
我们实际使用的时候也可以借鉴这种结构,在追求性能场景下,可以一次性初始化好所有业务线的pipeline,后面根据参数来选择具体使用哪一种,在业务入口和出口都需要处理的情况下可以使用双向链表,在需要做不同业务线程池隔离或者执行方式隔离的情况下,给每个node添加一个executor。

0c4d5842c75246dbbda400851b5c08c3~tplv-k3u1fbpfcp-jj-mark_3024_0_0_0_q75.jpg

总结

关于此篇核心介绍的Pipeline完整模版代码已经放到Github上,需要的同学可以前往

获取,拉下来即可测试,觉得有用的话,点个赞吧。

2

评论区