4-2.Netty源码:NioSocketChannel的线程分配和selector注册

NioSocketChannel的线程分配和selector注册

Posted by ZhaoLe on February 3, 2019

具体实现是在AbstractNioMessageChannel#read()

服务端负责获取新连接并且处理。所以实现方法是在AbstractNioMessageChannel中

1
2
3
4
5
6
7
8
9
10
11
12
13
  @Override
  public void read() {
    try {
     // ...新连接检测+封装NioSocketChannel

      int size = readBuf.size();
      for (int i = 0; i < size; i ++) {
          readPending = false;
          pipeline.fireChannelRead(readBuf.get(i));
      }
    ...//TODO.
  }
}
  • 【4】见【4-1-1.Netty源码:新连接NioSocketChannel 初始化】
  • 【7-10]具体实现是在ServerBootStrap中的channelRead(ChannelHandlerContext ctx, Object msg)

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    
      //ServerBootStrap.java
       public void channelRead(ChannelHandlerContext ctx, Object msg) {
        final Channel child = (Channel) msg;
    
        child.pipeline().addLast(childHandler);
    
        setChannelOptions(child, childOptions, logger);
    
        for (Entry<AttributeKey<?>, Object> e: childAttrs) {
          child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
        }
    
        try {
          childGroup.register(child).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
              if (!future.isSuccess()) {
                  forceClose(child, future.cause());
              }
            }
          });
        }
        ...
      }
    

    方法中的childHandler,childAttrs,childOptions,childGroup的这些值是来自于(NioServerSocketChannel初始化的时候,服务端channel的pipline上增加了ServerBootstrapAcceptor适配器,这些变量被包装在其中。见【2-1-2.Netty源码:注册服务端channel】,然后逐层传播到现在的channelRead方法中

    • 【4】给新连接添加childHandler,这里添加的childHandler是服务端demo启动的时候childHandler中的ChannelInitializer,这个里面是用户自己定义的handler集合
    • 【6-10】给新连接设置childOptions和childAttrs
    • 【13】childGroup中选择一个NioEventLoop并且注册

      1
      2
      3
      4
      5
      6
      7
      8
      9
      
      //MultithreadEventLoopGroup.class
      public EventLoop next() {
        return (EventLoop) super.next();
      }
      
      public ChannelFuture register(Channel channel) {
          //选用事件执行器中的一个线程
          return next().register(channel);
      }
      
      1
      2
      3
      4
      
      //MultithreadEventExecutorGroup.class
       public EventExecutor next() {
        return chooser.next();
      }
      

      这个chooser是NioEventLoopGroup初始化时候构建的,见【1-1.Netty源码:NioEventLoopGroup构造方法】,通过chooser拿到NioEventLoop并且进行注册,注册方法见【2-1-2.Netty源码:注册服务端channel】,跟服务器端注册类似。

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      
       private void register0(ChannelPromise promise) {
        try {
          ....
          //真正注册地方 channel注册到selector
          doRegister();
          neverRegistered = false;
          registered = true;
          ...
        } 
      }