5-3.Netty源码:InBound事件传播

InBound事件传播

Posted by ZhaoLe on February 9, 2019

首先自定义三个inbound处理器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
//InBoundA
public class InBoundA extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("In Bound A" + msg);
        ctx.fireChannelRead(msg);
    }

    @Override
    public void channelActive(ChannelHandlerContext ctx) throws Exception {
        ctx.channel().pipeline().fireChannelRead("Test");
    }
}
//InBoundB
public class InBoundB extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("In Bound B" + msg);
        ctx.fireChannelRead(msg);
    }

}
//InBoundC
public class InBoundC extends ChannelInboundHandlerAdapter {
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
        System.out.println("In Bound C" + msg);
        ctx.fireChannelRead(msg);
    }
}

InBound事件是按照ChannelHandler的顺序进行执行的。 IMAGE 从head节点的channelRead开始,使用debug方式启动服务器端,然后在使用客户端连接(下面代码有删减,只保留传播流程中部分代码)。

当有新连接来到的时候,InBoundA中的channelActive方法触发,事件从调用fireChannelRead开始传播。

1
2
3
4
5
//DefaultChannelPipeline.java
public final ChannelPipeline fireChannelRead(Object msg) {
  AbstractChannelHandlerContext.invokeChannelRead(head, msg);
  return this;
}
  • 【3】注意这个head说明事件是从head开始传播的。head则是在创建pipeline时候已经创建好了。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
//AbstractChannelHandlerContext.java
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
  next.invokeChannelRead(m);
}

private void invokeChannelRead(Object msg) {
  ((ChannelInboundHandler) handler()).channelRead(this, msg);
}

//DefaultChannelPipeline.java
 @Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    ctx.fireChannelRead(msg);
}
1
2
3
4
5
6
//AbstractChannelHandlerContext.java
@Override
public ChannelHandlerContext fireChannelRead(final Object msg) {
    invokeChannelRead(findContextInbound(), msg);
    return this;
}
  • findContextInbound()
    1
    2
    3
    4
    5
    6
    7
    
        private AbstractChannelHandlerContext findContextInbound() {
         AbstractChannelHandlerContext ctx = this;
         do {
             ctx = ctx.next;
         } while (!ctx.inbound);
         return ctx;
     }
    

    while循环,去找下一个Inbound节点

1
2
3
4
5
6
7
8
//AbstractChannelHandlerContext.java
static void invokeChannelRead(final AbstractChannelHandlerContext next, Object msg) {
  next.invokeChannelRead(m)    
}

private void invokeChannelRead(Object msg) {
  ((ChannelInboundHandler) handler()).channelRead(this, msg);
}

调用到服务器端中InBoundA#channelRead()然后在方法内部继续调用ctx.fireChannelRead(msg);向下传播,继续调用上述的几个方法直到进去tail节点。 需要说明的是ctx.channel().pipeline().fireChannelRead();是从head节点向下进行事件传播 ctx.fireChannelRead();是从当前节点向下进行事件传播。

一直在最后到Tail的节点。会调用TailContext中的ChannelRead方法

1
2
3
4
5
6
7
8
9
10
//DefaultChannelPipeline.java
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
    onUnhandledInboundMessage(msg);
}

//DefaultChannelPipeline.java
protected void onUnhandledInboundMessage(Object msg) {
  ReferenceCountUtil.release(msg);
}

【9】如果msg是实现ReferenceCounted接口的消息类型类型(也就是ByteBuf),那么传到tail节点后会被释放掉(引用计数减一)。