ICode9

精准搜索请尝试: 精确搜索
首页 > 编程语言> 文章详细

Netty之责任链模式的过滤链实现源码分析(二)

2021-11-19 23:59:24  阅读:308  来源: 互联网

标签:Netty COMPLETE ctx next write 过滤 promise msg 源码


2021SC@SDUSC

下面分析一下出站数据传播的细节。我们从ChannelOutboundHandlerAdapter的write方法开始分析:


    public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
       ctx.write(msg, promise);
   }

如果想要实现自己的业务处理逻辑,需要继承ChannelOutboundHandlerAdapter并且重写write方法。处理完数据之后,还需要继续向下传递数据,也就是需要调用ctx.write(msg, promise)方法,它会调用AbstractChannelHandlerContext的等效方法,下面是一个它调用链路上的方法:


    private void write(Object msg, boolean flush, ChannelPromise promise) {
       AbstractChannelHandlerContext next = findContextOutbound();
       final Object m = pipeline.touch(msg, next);
       EventExecutor executor = next.executor();
       if (executor.inEventLoop()) {
           if (flush) {
               next.invokeWriteAndFlush(m, promise);
           } else {
               next.invokeWrite(m, promise);
           }
       } else {
           AbstractWriteTask task;
           if (flush) {
               task = WriteAndFlushTask.newInstance(next, m, promise);
           }  else {
               task = WriteTask.newInstance(next, m, promise);
           }
           safeExecute(executor, task, promise, m);
       }
   }

 

这里最关键的是通过findContextOutbound方法来寻找下一个符合要求的handler:

   private AbstractChannelHandlerContext findContextOutbound() {
       AbstractChannelHandlerContext ctx = this;
       do {
           ctx = ctx.prev;
       } while (!ctx.outbound);
       return ctx;
   }

这里从当前的handler,顺着prev往回找,找下一个outband为真的handler。看到这个方法可能会觉得很眼熟,因为和上一次博客中分析的findContextInbound(MASK_CHANNEL_READ)方法很相似,这个方法在入站事件传播时用于寻找下一个符合要求的ChannelHandler:

    private AbstractChannelHandlerContext findContextInbound(int mask) {
        AbstractChannelHandlerContext ctx = this;
        EventExecutor currentExecutor = executor();
        do {
            ctx = ctx.next;
        } while (skipContext(ctx, currentExecutor, mask, MASK_ONLY_INBOUND));
        return ctx;
    }

其实write事件原理和上次分析的read事件类似,区别就是方向和目标不同。入站事件的方向是next方向,目标是InboundHandler。出站事件的方向与入站事件相反,是prev方向,也就是往回传播,目标是OutboundHandler。所以从代码中我们可以看到,这个寻找是顺着prev的方向往回找,也就是出站事件的传播方向为从后往前。

到这里我们明白了ChannelPipeline的事件传播方向。现在来考虑对于入站数据,如果传递到了tail节点,到头了,会有怎样的处理?对于出站数据,如果传递到了head节点,会有怎样的处理?

来看一下 invokeWriteAndFlush方法,它有一个判断 invokeHandler()方法,下面是它的细节:

    private boolean invokeHandler() {
       int handlerState = this.handlerState;
       return handlerState == ADD_COMPLETE || (!ordered && handlerState == ADD_PENDING);
   }

我们重点关注如果 handlerState == ADD_COMPLETE这个条件成立的时候,会返回true,那么就会执行下面的方法:


           invokeWrite0(msg, promise);


  
private void invokeWrite0(Object msg, ChannelPromise promise) {
       try {
           ((ChannelOutboundHandler) handler()).write(this, msg, promise);
       } catch (Throwable t) {
           notifyOutboundHandlerException(t, promise);
       }
   }

我们先来考虑这个判断条件什么时候会成立。这里需要回到我们 HeadContext和TailContext的构造函数上:


        HeadContext(DefaultChannelPipeline pipeline) {
           super(pipeline, null, HEAD_NAME, false, true);
           unsafe = pipeline.channel().unsafe();
           setAddComplete();

       }
       
       TailContext(DefaultChannelPipeline pipeline) {
           super(pipeline, null, TAIL_NAME, true, false);
           setAddComplete();

       }        

    final void setAddComplete() {
       for (;;) {
           int oldState = handlerState;
           // Ensure we never update when the handlerState is REMOVE_COMPLETE already.
           // oldState is usually ADD_PENDING but can also be REMOVE_COMPLETE when an EventExecutor is used that is not
           // exposing ordering guarantees.
           if (oldState == REMOVE_COMPLETE || HANDLER_STATE_UPDATER.compareAndSet(this, oldState, ADD_COMPLETE)) {
               return;
           }
       }
   }

这里我们可以看到两个构造方法后面都会调用setAddComplete()方法,这个方法的代码我直接贴在后面了。这里表达的意思是进入一个无限循环,我们不会改变更新,直到oldState的状态改变为REMOVE_COMPLETE。然后会执行 HANDLER_STATE_UPDATER.compareAndSet(this, oldState, ADD_COMPLETE)这个原子操作,将handlerState设置为ADD_COMPLETE。这样上面函数的成立条件就解释清楚了。

然后我们来看之后执行的操作:

​

private void invokeWrite0(Object msg, ChannelPromise promise) {
       try {
           ((ChannelOutboundHandler) handler()).write(this, msg, promise);
       } catch (Throwable t) {
           notifyOutboundHandlerException(t, promise);
       }
   }

​

我们先看handler()这个方法,这个方法是返回当前context。也就是说,会执行TailContext的等效方法。比如read,就会执行TailContext的channelRead方法:


        public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
           onUnhandledInboundMessage(msg);
       }

    protected void onUnhandledInboundMessage(Object msg) {
       try {
           logger.debug(
                   "Discarded inbound message {} that reached at the tail of the pipeline. " +
                           "Please check your pipeline configuration.", msg);
       } finally {
           ReferenceCountUtil.release(msg);
       }
   }

可以看到消息传递到tail之后,就不会再传递下去了,并且会释放它。出站事件也是类似,当事件传递到head的时候,会调用HeadContext的等效方法,然后就不会再传递下去了。

标签:Netty,COMPLETE,ctx,next,write,过滤,promise,msg,源码
来源: https://blog.csdn.net/May121812345/article/details/121431245

本站声明: 1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。

专注分享技术,共同学习,共同进步。侵权联系[81616952@qq.com]

Copyright (C)ICode9.com, All Rights Reserved.

ICode9版权所有