3-4.Netty源码:任务和定时任务队列

任务和定时任务队列

Posted by ZhaoLe on January 30, 2019

Netty中任务分普通任务Runable和定时任务ScheduledFutureTask。分别对应两种队列QueuePriorityQueue

任务添加

1.普通任务添加

再SingleThreadEventExecutor#execute中进行 addTask

1
2
3
4
5
6
7
 protected void addTask(Runnable task) {
    ...
    //taskQeue添加task
    if (!offerTask(task)) {
      reject(task);
    }
  }

2.定时任务添加

1
2
3
4
//客户端连接代码
..
bootstrap.config().group().schedule(() -> connect(bootstrap, host, port, retry - 1), delay, TimeUnit.SECONDS);
...
1
2
3
4
5
6
@Override
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
    ...
    return schedule(new ScheduledFutureTask<Void>(
            this, command, null, ScheduledFutureTask.deadlineNanos(unit.toNanos(delay))));
}
  • 【4-5】把一个定时任务封装为ScheduledFutureTask,然后调用schedule添加到队列中。重写了compareTo方法.

    重写了compareTo方法,截止时间小的向前排,截止时间相同的先入队列的排在前面

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    
      @Override
      public int compareTo(Delayed o) {
        if (this == o) {
            return 0;
        }
        
        ScheduledFutureTask<?> that = (ScheduledFutureTask<?>) o;
        long d = deadlineNanos() - that.deadlineNanos();
        if (d < 0) {
            return -1;
        } else if (d > 0) {
            return 1;
        } else if (id < that.id) {
            return -1;
        } else if (id == that.id) {
            throw new Error();
        } else {
            return 1;
        }
      }
    

    调用schedule添加到定时任务队列

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    
    <V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) {
      if (inEventLoop()) {
        scheduledTaskQueue().add(task);
      } else {
        execute(new Runnable() {
          @Override
          public void run() {
              scheduledTaskQueue().add(task);
          }
        });
      }
        
      return task;
    }
    
    • 【2】判断是否是在eventLoop线程中,
    • 【5-10】如果不是eventLoop继续把添加定时任务作为一个task扔到线程执行器中执行,保证线程安全。

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      
      //初始化一个长度11的优先级队列
      PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue;
      ..
      PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue() {
      if (scheduledTaskQueue == null) {
        scheduledTaskQueue = new DefaultPriorityQueue<ScheduledFutureTask<?>>(
          SCHEDULED_FUTURE_TASK_COMPARATOR, 11);
      }
        return scheduledTaskQueue;
      }
      

处理任务

具体是在NioEventLoop#run方法中调用 runAllTasks(long timeoutNanos)开始处理这些任务,具体实现是在SingleThreadEventExecutor实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
  protected boolean runAllTasks(long timeoutNanos) {
    fetchFromScheduledTaskQueue();
    Runnable task = pollTask();
    if (task == null) {
        afterRunningAllTasks();
        return false;
    }

    final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
    long runTasks = 0;
    long lastExecutionTime;
    for (; ; ) {
        //执行这些任务
        safeExecute(task);

        runTasks++;

        if ((runTasks & 0x3F) == 0) {
            lastExecutionTime = ScheduledFutureTask.nanoTime();
            if (lastExecutionTime >= deadline) {
                break;
            }
        }

        task = pollTask();
        if (task == null) {
            lastExecutionTime = ScheduledFutureTask.nanoTime();
            break;
        }
    }

    afterRunningAllTasks();
    this.lastExecutionTime = lastExecutionTime;
    return true;
  }
  • 【2】将定时任务从scheduleTaskQueue中抽出来 存入普通任务队列taskQueue队列中

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    
      private boolean fetchFromScheduledTaskQueue() {
        long nanoTime = AbstractScheduledEventExecutor.nanoTime();
        Runnable scheduledTask = pollScheduledTask(nanoTime);
        while (scheduledTask != null) {
            if (!taskQueue.offer(scheduledTask)) {
                scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask);
                return false;
            }
            //轮训
            scheduledTask = pollScheduledTask(nanoTime);
        }
        return true;
      }
    
    • 【2-3】获取一个当前纳秒时间,去获取定时任务
      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      
       protected final Runnable pollScheduledTask(long nanoTime) {
          assert inEventLoop();
          Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue;
        
          ScheduledFutureTask<?> scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek();
          if (scheduledTask == null) {
              return null;
          }
          if (scheduledTask.deadlineNanos() <= nanoTime) {
              scheduledTaskQueue.remove();
              return scheduledTask;
          }
                
          return null;
      }
      
      • 【9-10】如果定时任务截止时间小于当前时间 说明时间到了 需要拿出来处理,拿出来处理的同时从队列中删除,如果没有的话 就不用处理
    • 【5-8】添加定时任务到taskQueue队列(ScheduledFutureTask实现了Runable接口),如果添加定时任务到队列失败了 需要吧这个任务放回到定时任务队列中继续等下次处理。
    • 【10】重复步骤,直到符合条件的定时任务全部取出来并且放入到taskQueue
  • 【3】从taskQueue里面取任务执行
  • 【5】其实是调用runAllTasksFrom 执行收尾任务操作 (tailTasks从哪来的??????)
  • 【9】计算下任务截止时间 不能超过传入的timeoutNanos时间
  • 【14】执行任务,内部就是直接调用run方法
  • 【16-23】记录下执行的任务次数,每64次判断下当前时间是否超过截止时间,如果当前任务时间超过截止时间,官方说明是生成nanoTime比较耗时。要以固定频率判断下。