逻辑跟InBound类似。
首先自定义三个outBound处理器
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
//OutBoundA
public class OutBoundA extends ChannelOutboundHandlerAdapter {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
System.out.println("OutBoundA " + msg);
ctx.write(msg, promise);
}
@Override
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
ctx.executor().schedule(() -> {
//模拟服务端给客户端写的数据
ctx.channel().pipeline().write("write ing");
}, 5, TimeUnit.SECONDS);
}
}
//OutBoundB
public class OutBoundB extends ChannelOutboundHandlerAdapter {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
System.out.println("OutBoundB " + msg);
ctx.write(msg, promise);
}
}
//OutBoundC
public class OutBoundC extends ChannelOutboundHandlerAdapter {
@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
System.out.println("OutBoundC " + msg);
ctx.write(msg, promise);
}
}
OutBound事件是按照ChannelHandler的反向进行执行的。
从tail节点的write
开始,使用debug方式启动服务器端,然后在使用客户端连接(下面代码有删减,只保留传播流程中部分代码)。
当有新连接来到的时候,OutBoundAhandlerAdded
方法触发,事件从调用write
开始传播。
1
2
3
4
//DefaultChannelPipeline.java
public final ChannelFuture write(Object msg) {
return tail.write(msg);
}
- 【3】注意这个tail说明事件是从tail开始传播的。tail则是在创建pipeline时候已经创建好了
1
2
3
4
5
6
7
8
9
//AbstractChannelHandlerContext.java
public ChannelFuture write(Object msg) {
return write(msg, newPromise());
}
public ChannelFuture write(final Object msg, final ChannelPromise promise) {
write(msg, false, promise);
return promise;
}
1
2
3
4
5
private void write(Object msg, boolean flush, ChannelPromise promise) {
AbstractChannelHandlerContext next = findContextOutbound();
final Object m = pipeline.touch(msg, next);
next.invokeWrite(m, promise);
}
- 【2】while循环找之前一个是OutBound的节点并且返回
1 2 3 4 5 6 7
private AbstractChannelHandlerContext findContextOutbound() { AbstractChannelHandlerContext ctx = this; do { ctx = ctx.prev; } while (!ctx.outbound); return ctx; }
1
2
3
4
5
6
7
8
//AbstractChannelHandlerContext.java
private void invokeWrite(Object msg, ChannelPromise promise) {
invokeWrite0(msg, promise);
}
private void invokeWrite0(Object msg, ChannelPromise promise) {
((ChannelOutboundHandler) handler()).write(this, msg, promise);
}
调用到服务器端中OutBoundC#write()
然后在方法内部继续调用ctx.write(msg, promise)
向上传播,继续调用上述的几个方法直到进去head节点。
需要说明的是ctx.channel().pipeline().write()
是从tail节点向上进行事件传播
ctx.write()
是从当前节点向上进行事件传播。
一直在最后到head的节点。会调用HeadContext
中的write方法
1
2
3
4
//DefaultChannelPipeline.java
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
unsafe.write(msg, promise);
}
如果一直没有找到有用的handler,就会在head节点调用这个方法unsafe.write(msg, promise);
,最后msg是实现ReferenceCounted
接口的消息类型类型(也就是ByteBuf),那么传到tail节点后会被释放掉(引用计数减一)。