Netty中任务分普通任务Runable
和定时任务ScheduledFutureTask
。分别对应两种队列Queue
和PriorityQueue
任务添加
1.普通任务添加
再SingleThreadEventExecutor#execute中进行 addTask
1
2
3
4
5
6
7
protected void addTask(Runnable task) {
...
//taskQeue添加task
if (!offerTask(task)) {
reject(task);
}
}
2.定时任务添加
1
2
3
4
//客户端连接代码
..
bootstrap.config().group().schedule(() -> connect(bootstrap, host, port, retry - 1), delay, TimeUnit.SECONDS);
...
1
2
3
4
5
6
@Override
public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) {
...
return schedule(new ScheduledFutureTask<Void>(
this, command, null, ScheduledFutureTask.deadlineNanos(unit.toNanos(delay))));
}
-
【4-5】把一个定时任务封装为ScheduledFutureTask,然后调用schedule添加到队列中。重写了
compareTo
方法.重写了compareTo方法,截止时间小的向前排,截止时间相同的先入队列的排在前面
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
@Override public int compareTo(Delayed o) { if (this == o) { return 0; } ScheduledFutureTask<?> that = (ScheduledFutureTask<?>) o; long d = deadlineNanos() - that.deadlineNanos(); if (d < 0) { return -1; } else if (d > 0) { return 1; } else if (id < that.id) { return -1; } else if (id == that.id) { throw new Error(); } else { return 1; } }
调用
schedule
添加到定时任务队列1 2 3 4 5 6 7 8 9 10 11 12 13 14
<V> ScheduledFuture<V> schedule(final ScheduledFutureTask<V> task) { if (inEventLoop()) { scheduledTaskQueue().add(task); } else { execute(new Runnable() { @Override public void run() { scheduledTaskQueue().add(task); } }); } return task; }
- 【2】判断是否是在eventLoop线程中,
-
【5-10】如果不是eventLoop继续把添加定时任务作为一个task扔到线程执行器中执行,保证线程安全。
1 2 3 4 5 6 7 8 9 10
//初始化一个长度11的优先级队列 PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue; .. PriorityQueue<ScheduledFutureTask<?>> scheduledTaskQueue() { if (scheduledTaskQueue == null) { scheduledTaskQueue = new DefaultPriorityQueue<ScheduledFutureTask<?>>( SCHEDULED_FUTURE_TASK_COMPARATOR, 11); } return scheduledTaskQueue; }
处理任务
具体是在NioEventLoop#run方法中调用
runAllTasks(long timeoutNanos)
开始处理这些任务,具体实现是在SingleThreadEventExecutor
实现。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
protected boolean runAllTasks(long timeoutNanos) {
fetchFromScheduledTaskQueue();
Runnable task = pollTask();
if (task == null) {
afterRunningAllTasks();
return false;
}
final long deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
long runTasks = 0;
long lastExecutionTime;
for (; ; ) {
//执行这些任务
safeExecute(task);
runTasks++;
if ((runTasks & 0x3F) == 0) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
if (lastExecutionTime >= deadline) {
break;
}
}
task = pollTask();
if (task == null) {
lastExecutionTime = ScheduledFutureTask.nanoTime();
break;
}
}
afterRunningAllTasks();
this.lastExecutionTime = lastExecutionTime;
return true;
}
-
【2】将定时任务从scheduleTaskQueue中抽出来 存入普通任务队列
taskQueue
队列中1 2 3 4 5 6 7 8 9 10 11 12 13
private boolean fetchFromScheduledTaskQueue() { long nanoTime = AbstractScheduledEventExecutor.nanoTime(); Runnable scheduledTask = pollScheduledTask(nanoTime); while (scheduledTask != null) { if (!taskQueue.offer(scheduledTask)) { scheduledTaskQueue().add((ScheduledFutureTask<?>) scheduledTask); return false; } //轮训 scheduledTask = pollScheduledTask(nanoTime); } return true; }
- 【2-3】获取一个当前纳秒时间,去获取定时任务
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
protected final Runnable pollScheduledTask(long nanoTime) { assert inEventLoop(); Queue<ScheduledFutureTask<?>> scheduledTaskQueue = this.scheduledTaskQueue; ScheduledFutureTask<?> scheduledTask = scheduledTaskQueue == null ? null : scheduledTaskQueue.peek(); if (scheduledTask == null) { return null; } if (scheduledTask.deadlineNanos() <= nanoTime) { scheduledTaskQueue.remove(); return scheduledTask; } return null; }
- 【9-10】如果定时任务截止时间小于当前时间 说明时间到了 需要拿出来处理,拿出来处理的同时从队列中删除,如果没有的话 就不用处理
- 【5-8】添加定时任务到
taskQueue
队列(ScheduledFutureTask实现了Runable接口),如果添加定时任务到队列失败了 需要吧这个任务放回到定时任务队列中继续等下次处理。 - 【10】重复步骤,直到符合条件的定时任务全部取出来并且放入到
taskQueue
中
- 【2-3】获取一个当前纳秒时间,去获取定时任务
- 【3】从taskQueue里面取任务执行
- 【5】其实是调用runAllTasksFrom 执行收尾任务操作 (
tailTasks
从哪来的??????) - 【9】计算下任务截止时间 不能超过传入的
timeoutNanos
时间 - 【14】执行任务,内部就是直接调用
run
方法 - 【16-23】记录下执行的任务次数,每64次判断下当前时间是否超过截止时间,如果当前任务时间超过截止时间,官方说明是生成nanoTime比较耗时。要以固定频率判断下。