Dart event loop

本文基于Dart 2.17


Dart App中所有的代码都在一个isolate中运行(各个isolate之间的代码运行时是隔离的),一个isolate有自己的heap,维持有一个消息队列event_loop,处理两种消息:

  1. event queue 执行用户点击、屏幕刷新、绘制,一般的Future、IO、Stream流等,每次执行完毕都会先检查执行micro task queue中的任务,直到其为空再执行下一个event queue
  2. microTask queue 优先执行,一般执行跑完即弃的小任务,如Dart内部的微任务

上述两种event会在普通的Dart同步方法执行完毕后执行,无论是microTask还是普通的event,他们都是concurrency并行执行(也就是说实际上还是上一个执行完毕,再执行另外一个),所以如果这些event中存在耗时长的方法,依旧会阻塞其他方法的执行,可能导致UI卡顿等情况。


在代码执行的过程中,各种事件(如用户点击、屏幕刷新、future、microtask等)都会被当做一个个event放入到event queue中,然后不停的从event loop取出事件并执行:

dart_event_loop

他们的执行顺序如下:

dart_event_loop_sequeue

可以从下述例子详细看一下代码执行的时候各个方法执行过程:

dart_test_queue_code

dart_test_queue_output


本文根据Dart SDK源码分析一下event loop的实现。

代码参考:https://gist.github.com/jixiaoyong/ac811902db42a51cf97e3290788ade4a

1. 同步方法

同步方法包括普通的方法,以及一下几种会按照同步方法立即执行的方式:

1
2
3
4
5
6
7
8
9
10
11
12
13
Future.sync(() => print("Hello, I am future created by Future.sync"));
Future.forEach(
[1, 2, 3],
(element) =>
print("Hello, I am future($element) created by Future.forEach"));
Future.doWhile(() async {
if (repeatCounter++ < 3) {
print("repeat ($repeatCounter/3) inner Future.doWhile");
await Future.delayed(const Duration(seconds: 1));
return true;
}
return false;
});

2. micro task

microtask会在同步方法执行完毕之后立即被执行,一般用来执行“即抛型”的方法,不应当执行耗时方法。microtask列表会一直执行,直到event loop中没有micro task了,才会去执行Future等普通的event。

1
2
3
4
5
6
7
8
9
10
11
12
13
scheduleMicrotask(() {
print('Hello, world! I am a microtask.');
});

Future.microtask(
() => print("Hello, I am microtask created by Future.microtask"));

Future.value(1).then((value) {
print("Hello, I am future created by Future.value");
});

Future.error(Exception("Hello, I am future created by Future.error"))
.onError((error, stackTrace) => print(error));

Future.value([FutureOr<T>? value]) 比较特殊,如果value 是future,那么他会在value执行完毕后返回他的值,如果value不是future,他就会立即执行属于microtask

代码分析

让我们看一下上述方法的具体实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
// -> sdk\lib\async\future.dart

factory Future.microtask(FutureOr<T> computation()) {
_Future<T> result = new _Future<T>();
scheduleMicrotask(() {
try {
result._complete(computation());
} catch (e, s) {
_completeWithErrorCallback(result, e, s);
}
});
return result;
}

可以看出,Future.microtask本质还是调用scheduleMicrotask实现的,其实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// -> sdk\lib\async\schedule_microtask.dart

@pragma('vm:entry-point', 'call')
void scheduleMicrotask(void Function() callback) {
_Zone currentZone = Zone._current;
if (identical(_rootZone, currentZone)) {
// No need to bind the callback. We know that the root's scheduleMicrotask
// will be invoked in the root zone.
_rootScheduleMicrotask(null, null, _rootZone, callback);
return;
}
_ZoneFunction implementation = currentZone._scheduleMicrotask;
if (identical(_rootZone, implementation.zone) &&
_rootZone.inSameErrorZone(currentZone)) {
_rootScheduleMicrotask(
null, null, currentZone, currentZone.registerCallback(callback));
return;
}
Zone.current.scheduleMicrotask(Zone.current.bindCallbackGuarded(callback));
}

Zone.scheduleMicrotask()最后调用的是_RootZone的同名方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// -> sdk\lib\async\zone.dart

class _RootZone extends _Zone {
void scheduleMicrotask(void f()) {
_rootScheduleMicrotask(null, null, this, f);
}
}

void _rootScheduleMicrotask(
Zone? self, ZoneDelegate? parent, Zone zone, void f()) {
if (!identical(_rootZone, zone)) {
bool hasErrorHandler = !_rootZone.inSameErrorZone(zone);
if (hasErrorHandler) {
f = zone.bindCallbackGuarded(f);
} else {
f = zone.bindCallback(f);
}
}
_scheduleAsyncCallback(f);
}

_RootZone._scheduleAsyncCallback

这里调用了_RootZone._scheduleAsyncCallback方法,将传入的callback当做microtask执行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// -> sdk\lib\async\schedule_microtask.dart

/// Schedules a callback to be called as a microtask.
///
/// The microtask is called after all other currently scheduled
/// microtasks, but as part of the current system event.
void _scheduleAsyncCallback(_AsyncCallback callback) {
_AsyncCallbackEntry newEntry = new _AsyncCallbackEntry(callback);
_AsyncCallbackEntry? lastCallback = _lastCallback;
if (lastCallback == null) {
_nextCallback = _lastCallback = newEntry;
if (!_isInCallbackLoop) {
_AsyncRun._scheduleImmediate(_startMicrotaskLoop);
}
} else {
lastCallback.next = newEntry;
_lastCallback = newEntry;
}
}

_RootZone._startMicrotaskLoop

这里面的_startMicrotaskLoop方法是实际上处理microtask的地方:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// -> sdk\lib\async\schedule_microtask.dart

/// Whether we are currently inside the callback loop.
///
/// If we are inside the loop, we never need to schedule the loop,
/// even if adding a first element.
bool _isInCallbackLoop = false;

void _microtaskLoop() {
for (var entry = _nextCallback; entry != null; entry = _nextCallback) {
_lastPriorityCallback = null;
var next = entry.next;
_nextCallback = next;
if (next == null) _lastCallback = null;
(entry.callback)();
}
}

void _startMicrotaskLoop() {
_isInCallbackLoop = true;
try {
// Moved to separate function because try-finally prevents
// good optimization.
_microtaskLoop();
} finally {
_lastPriorityCallback = null;
_isInCallbackLoop = false;
if (_nextCallback != null) {
_AsyncRun._scheduleImmediate(_startMicrotaskLoop);
}
}
}

_AsyncRun._scheduleImmediate方法则是触发处理microtask的方法:

1
2
3
4
5
6
// -> sdk\lib\async\schedule_microtask.dart

class _AsyncRun {
/// Schedule the given callback before any other event in the event-loop.
external static void _scheduleImmediate(void Function() callback);
}

_AsyncRun._scheduleImmediate

_AsyncRun._scheduleImmediate方法的实现在schedule_microtask_patch.dart中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// -> sdk/lib/_internal/vm/lib/schedule_microtask_patch.dart

@patch
class _AsyncRun {
@patch
static void _scheduleImmediate(void callback()) {
final closure = _ScheduleImmediate._closure;
if (closure == null) {
throw new UnsupportedError("Microtasks are not supported");
}
closure(callback);
}
}

typedef void _ScheduleImmediateClosure(void callback());

class _ScheduleImmediate {
static _ScheduleImmediateClosure? _closure;
}

@pragma("vm:entry-point", "call")
void _setScheduleImmediateClosure(_ScheduleImmediateClosure closure) {
_ScheduleImmediate._closure = closure;
}

@pragma("vm:entry-point", "call")
void _ensureScheduleImmediate() {
_AsyncRun._scheduleImmediate(_startMicrotaskLoop);
}

_ScheduleImmediateClosure

可以看到,microtask实际上是使用_ScheduleImmediateClosure调用的,关于他主要有两个方法:

  1. _setScheduleImmediateClosure
  2. _ensureScheduleImmediate

让我们先看一下第一个方法_setScheduleImmediateClosure

1
2
3
4
5
6
7
8
9
10
11
12
13
// -> runtime\bin\dartutils.cc

// PrepareAsyncLibrary方法会在Dart虚拟机启动的时候被调用
Dart_Handle DartUtils::PrepareAsyncLibrary(Dart_Handle async_lib,
Dart_Handle isolate_lib) {
Dart_Handle schedule_immediate_closure = Dart_Invoke(
isolate_lib, NewString("_getIsolateScheduleImmediateClosure"), 0, NULL);
RETURN_IF_ERROR(schedule_immediate_closure);
Dart_Handle args[1];
args[0] = schedule_immediate_closure;
return Dart_Invoke(async_lib, NewString("_setScheduleImmediateClosure"), 1,
args);
}

这里主要调用了Dart中的_getIsolateScheduleImmediateClosure方法创建了schedule_immediate_closure,然后通过_setScheduleImmediateClosure返回:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
// -> sdk\lib\_internal\vm\lib\isolate_patch.dart

/// The embedder can execute this function to get hold of
/// [_isolateScheduleImmediate] above.
@pragma("vm:entry-point", "call")
Function _getIsolateScheduleImmediateClosure() {
return _isolateScheduleImmediate;
}

/// The closure that should be used as scheduleImmediateClosure, when the VM
/// is responsible for the event loop.
void _isolateScheduleImmediate(void callback()) {
assert((_pendingImmediateCallback == null) ||
(_pendingImmediateCallback == callback));
_pendingImmediateCallback = callback;
}

/// The callback that has been registered through `scheduleImmediate`.
_ImmediateCallback? _pendingImmediateCallback;

可以看到,这个方法会将传递进来的callback赋值给_pendingImmediateCallback

而结合上面的代码,_ensureScheduleImmediate 方法主要也是用来触发_ScheduleImmediateClosure执行回调事件。


到目前为止,我们能确定的是:

  • 在Dart VM启动的时候,会创建一个_ScheduleImmediateClosure并保存在_pendingImmediateCallback;
  • 当有新的microtask加入的时候,会触发_startMicrotaskLoop方法在_microtaskLoop()中实际处理一个microtask(这里的_startMicrotaskLoop触发的实际是通过_AsyncRun._scheduleImmediate(_startMicrotaskLoop)将其使用_pendingImmediateCallback 包裹之后执行的)。

_pendingImmediateCallback

现在的问题是,这个_pendingImmediateCallback 什么时候会被安排执行呢?

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// -> sdk\lib\_internal\vm\lib\isolate_patch.dart

@pragma("vm:entry-point", "call")
void _runPendingImmediateCallback() {
final callback = _pendingImmediateCallback;
if (callback != null) {
_pendingImmediateCallback = null;
callback();
}
}

@pragma("vm:entry-point")
class _RawReceivePortImpl implements RawReceivePort {

// Called from the VM to retrieve the handler and handle a message.
@pragma("vm:entry-point", "call")
static _handleMessage(int id, var message) {
final handler = _portMap[id]?['handler'];
if (handler == null) {
return null;
}
// TODO(floitsch): this relies on the fact that any exception aborts the
// VM. Once we have non-fatal global exceptions we need to catch errors
// so that we can run the immediate callbacks.
handler(message);
_runPendingImmediateCallback();
return handler;
}

}

注意这里的关键代码,在_handleMessage方法中,会先执行RawReceivePort原本的handler内容,然后,执行_runPendingImmediateCallback()

_runPendingImmediateCallback则会执行_pendingImmediateCallback的内容,也就是前面的_startMicrotaskLoop,处理event loop中的micro task。


这也就证明了我们之前说的“microtask会在同步方法之后立即执行,并在每次普通的event loop执行完毕之后,都会检查并执行event loop中的microtask,之后才继续执行普通event”。

在下面的分析中,我们也还可以看到,在event每次处理Timer事件之后,都会检查执行micro task。

3. event

除了下面列出来的使用Future或者Timer等创建的方法外,屏幕点击、刷新等事件也在此类event中。

当前event loop中没有micro task之后,就会执行一次此类普通event,然后再检查一次event loop,如果有micro task就执行micro task直到清空micro task,否则继续执行下一个普通event,直到event loop列表为空,退出app。

先看几种会触发此类事件的方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
Future.delayed(const Duration(seconds: 1), () {
print("Hello, I am future");
});

Future.any([
Future(() {
return "I am Future run immediately Future.any";
}),
Future.delayed(const Duration(seconds: 1), () {
return "I am Future run delay, will be discard Future.any";
})
]).then((value) => print(value));

Future.wait([
Future(() {
return "I am Future run immediately Future.wait 1/2";
}),
Future.delayed(const Duration(seconds: 1), () {
return "I am Future run delay Future.wait 2/2";
})
]).then((value) => print(value));

以及Timer

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
Timer.periodic(const Duration(seconds: 1), (timer) {
print("Hello, I am running inner(${timer.tick}/2) Timer.periodic");
if (timer.tick == 2) {
timer.cancel();
}
});

Timer.run(() {
print(
"Hello, I will run asynchronously as soon as possible with Timer.run");
});

Timer(Duration(seconds: 1), () {
print("Hello, I will run asynchronously after 1 second with Timer");
});

代码分析

我们依次看一下上述几个方法的具体实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
// -> sdk\lib\async\future.dart

factory Future.delayed(Duration duration, [FutureOr<T> computation()?]) {
if (computation == null && !typeAcceptsNull<T>()) {
throw ArgumentError.value(
null, "computation", "The type parameter is not nullable");
}
_Future<T> result = new _Future<T>();
new Timer(duration, () {
if (computation == null) {
result._complete(null as T);
} else {
try {
result._complete(computation());
} catch (e, s) {
_completeWithErrorCallback(result, e, s);
}
}
});
return result;
}

static Future<T> any<T>(Iterable<Future<T>> futures) {
var completer = new Completer<T>.sync();
void onValue(T value) {
if (!completer.isCompleted) completer.complete(value);
}

void onError(Object error, StackTrace stack) {
if (!completer.isCompleted) completer.completeError(error, stack);
}

for (var future in futures) {
// 一旦有一个future执行完毕,就立即返回结果,并丢弃掉后续future的返回
future.then(onValue, onError: onError);
}
return completer.future;
}

@pragma("vm:recognized", "other")
static Future<List<T>> wait<T>(Iterable<Future<T>> futures,
{bool eagerError = false, void cleanUp(T successValue)?}) {
// This is a VM recognised method, and the _future variable is deliberately
// allocated in a specific slot in the closure context for stack unwinding.
final _Future<List<T>> _future = _Future<List<T>>();
List<T?>? values; // Collects the values. Set to null on error.
int remaining = 0; // How many futures are we waiting for.
late Object error; // The first error from a future.
late StackTrace stackTrace; // The stackTrace that came with the error.

// Handle an error from any of the futures.
void handleError(Object theError, StackTrace theStackTrace) {
remaining--;
List<T?>? valueList = values;
if (valueList != null) {
if (cleanUp != null) {
for (var value in valueList) {
if (value != null) {
// Ensure errors from cleanUp are uncaught.
T cleanUpValue = value;
new Future.sync(() {
cleanUp(cleanUpValue);
});
}
}
}
values = null;
if (remaining == 0 || eagerError) {
_future._completeError(theError, theStackTrace);
} else {
error = theError;
stackTrace = theStackTrace;
}
} else if (remaining == 0 && !eagerError) {
_future._completeError(error, stackTrace);
}
}

try {
// As each future completes, put its value into the corresponding
// position in the list of values.
for (var future in futures) {
int pos = remaining;
// 在这里依次执行future
future.then((T value) {
remaining--;
List<T?>? valueList = values;
if (valueList != null) {
valueList[pos] = value;
if (remaining == 0) {
_future._completeWithValue(List<T>.from(valueList));
}
} else {
if (cleanUp != null && value != null) {
// Ensure errors from cleanUp are uncaught.
new Future.sync(() {
cleanUp(value);
});
}
if (remaining == 0 && !eagerError) {
// If eagerError is false, and valueList is null, then
// error and stackTrace have been set in handleError above.
_future._completeError(error, stackTrace);
}
}
}, onError: handleError);
// Increment the 'remaining' after the call to 'then'.
// If that call throws, we don't expect any future callback from
// the future, and we also don't increment remaining.
remaining++;
}
if (remaining == 0) {
return _future.._completeWithValue(<T>[]);
}
values = new List<T?>.filled(remaining, null);
} catch (e, st) {
// The error must have been thrown while iterating over the futures
// list, or while installing a callback handler on the future.
// This is a breach of the `Future` protocol, but we try to handle it
// gracefully.
if (remaining == 0 || eagerError) {
// Throw a new Future.error.
// Don't just call `_future._completeError` since that would propagate
// the error too eagerly, not giving the callers time to install
// error handlers.
// Also, don't use `_asyncCompleteError` since that one doesn't give
// zones the chance to intercept the error.
return new Future.error(e, st);
} else {
// Don't allocate a list for values, thus indicating that there was an
// error.
// Set error to the caught exception.
error = e;
stackTrace = st;
}
}
return _future;
}

可以看到,除了Future.waitFuture.any 这两个处理Future集合的方法外,Future.delayed 这个方法内部是实际上是通过Timer实现**的。

Future.then

在看Timer实现之前,先看一下Futrue.then的实现,他对应的实现是_Future.then

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
// -> sdk\lib\async\future_impl.dart

class _Future<T> implements Future<T> {
// Register callbacks to be called when this future completes.
//
// When this future completes with a value, the [onValue] callback will be called with that value.
// If this future is already completed, the callback will not be called immediately,
// but will be scheduled in a later microtask
Future<R> then<R>(FutureOr<R> f(T value), {Function? onError}) {
Zone currentZone = Zone.current;
if (identical(currentZone, _rootZone)) {
if (onError != null &&
onError is! Function(Object, StackTrace) &&
onError is! Function(Object)) {
throw ArgumentError.value(
onError,
"onError",
"Error handler must accept one Object or one Object and a StackTrace"
" as arguments, and return a value of the returned future's type");
}
} else {
f = currentZone.registerUnaryCallback<FutureOr<R>, T>(f);
if (onError != null) {
// This call also checks that onError is assignable to one of:
// dynamic Function(Object)
// dynamic Function(Object, StackTrace)
onError = _registerErrorHandler(onError, currentZone);
}
}
_Future<R> result = new _Future<R>();
_addListener(new _FutureListener<T, R>.then(result, f, onError));
// 返回创建好的Future
return result;
}

}

_Futrue.then只是对传入的回调的进行了包装,实际上是通过_Future._addListener()实现具体的逻辑:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// -> sdk\lib\async\future_impl.dart

bool get _mayComplete => (_state & _completionStateMask) == _stateIncomplete;
bool get _isPendingComplete => (_state & _statePendingComplete) != 0;
bool get _mayAddListener =>
_state <= (_statePendingComplete | _stateIgnoreError);

void _addListener(_FutureListener listener) {
assert(listener._nextListener == null);
// 如果是待完成的或者忽略错误的,将当前listener添加到链表头部;
// 在后文处理结果的时候,会从链表尾部开始读取
if (_mayAddListener) {
listener._nextListener = _resultOrListeners;
_resultOrListeners = listener;
} else {
if (_isChained) {
// Delegate listeners to chained source future.
// If the source is complete, instead copy its values and
// drop the chaining.
_Future source = _chainSource;
if (!source._isComplete) {
// 如果依赖于source,那么就添加为source的listener
source._addListener(listener);
return;
}
_cloneResult(source);
}
assert(_isComplete);
// Handle late listeners asynchronously.
_zone.scheduleMicrotask(() {
// Propagates the value/error of [source] to its [listeners]
_propagateToListeners(this, listener);
});
}
}

_Future._addListener(_FutureListener listener)中基本上做了如下判断:

  • 如果Future是延迟完成的,就添加监听。
  • 如果Future已经完成了,就加入到micro task中,安排执行listener回调(_propagateToListeners(this, listener))。

具体可以参考Flutter之Future原理解析


Timer

我们再看一下Timer的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
// -> sdk\lib\async\timer.dart

// Timer.run

static void run(void Function() callback) {
new Timer(Duration.zero, callback);
}

factory Timer(Duration duration, void Function() callback) {
if (Zone.current == Zone.root) {
// No need to bind the callback. We know that the root's timer will
// be invoked in the root zone.
return Zone.current.createTimer(duration, callback);
}
return Zone.current
.createTimer(duration, Zone.current.bindCallbackGuarded(callback));
}

factory Timer.periodic(Duration duration, void callback(Timer timer)) {
if (Zone.current == Zone.root) {
// No need to bind the callback. We know that the root's timer will
// be invoked in the root zone.
return Zone.current.createPeriodicTimer(duration, callback);
}
var boundCallback = Zone.current.bindUnaryCallbackGuarded<Timer>(callback);
return Zone.current.createPeriodicTimer(duration, boundCallback);
}

创建Timer

可以看到,Timer的创建实际上是Zone通过两种方式创建的:

1
2
3
4
5
6
7
8
9
10
// -> sdk\lib\async\zone.dart
abstract class Zone {

/// Creates a [Timer] where the callback is executed in this zone.
Timer createTimer(Duration duration, void Function() callback);

/// Creates a periodic [Timer] where the callback is executed in this zone.
Timer createPeriodicTimer(Duration period, void callback(Timer timer));

}

Zone是抽象类,他的实现是_RootZone

1
2
3
4
5
6
7
8
9
10
11
12
13
// -> sdk\lib\async\zone.dart

class _RootZone extends _Zone {

Timer createTimer(Duration duration, void f()) {
return Timer._createTimer(duration, f);
}

Timer createPeriodicTimer(Duration duration, void f(Timer timer)) {
return Timer._createPeriodicTimer(duration, f);
}

}

可以看到这里实际上是调用了Timer中对应的私有方法:

1
2
3
4
5
6
// -> sdk\lib\async\timer.dart

external static Timer _createTimer(
Duration duration, void Function() callback);
external static Timer _createPeriodicTimer(
Duration duration, void callback(Timer timer));

他们的具体实现在timer_patch.dart中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
// -> sdk\lib\_internal\vm\lib\timer_patch.dart

@patch
class Timer {
@patch
static Timer _createTimer(Duration duration, void callback()) {
final factory = VMLibraryHooks.timerFactory;
if (factory == null) {
throw new UnsupportedError("Timer interface not supported.");
}
int milliseconds = duration.inMilliseconds;
if (milliseconds < 0) milliseconds = 0;
return factory(milliseconds, (_) {
callback();
}, false);
}

@patch
static Timer _createPeriodicTimer(
Duration duration, void callback(Timer timer)) {
final factory = VMLibraryHooks.timerFactory;
if (factory == null) {
throw new UnsupportedError("Timer interface not supported.");
}
int milliseconds = duration.inMilliseconds;
if (milliseconds < 0) milliseconds = 0;
return factory(milliseconds, callback, true);
}
}

可以看到,无论是单次的还是循环的Timer都是使用VMLibraryHooks.timerFactory创建的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// -> sdk\lib\_internal\vm\lib\timer_impl.dart

@pragma("vm:entry-point", "call")
_setupHooks() {
VMLibraryHooks.timerFactory = _Timer._factory;
}

// The Timer factory registered with the dart:async library by the embedder.
static Timer _factory(
int milliSeconds, void callback(Timer timer), bool repeating) {
if (repeating) {
return new _Timer.periodic(milliSeconds, callback);
}
return new _Timer(milliSeconds, callback);
}

factory _Timer(int milliSeconds, void callback(Timer timer)) {
return _createTimer(callback, milliSeconds, false);
}

factory _Timer.periodic(int milliSeconds, void callback(Timer timer)) {
return _createTimer(callback, milliSeconds, true);
}

最终都是调用的_Timer._createTimer方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// -> sdk\lib\_internal\vm\lib\timer_impl.dart

static _Timer _createTimer(
void callback(Timer timer), int milliSeconds, bool repeating) {
// Negative timeouts are treated as if 0 timeout.
if (milliSeconds < 0) {
milliSeconds = 0;
}
// Add one because DateTime.now() is assumed to round down
// to nearest millisecond, not up, so that time + duration is before
// duration milliseconds from now. Using microsecond timers like
// Stopwatch allows detecting that the timer fires early.
int now = VMLibraryHooks.timerMillisecondClock();
int wakeupTime = (milliSeconds == 0) ? now : (now + 1 + milliSeconds);

_Timer timer =
new _Timer._internal(callback, wakeupTime, milliSeconds, repeating);
// Enqueue this newly created timer in the appropriate structure and
// notify if necessary.
timer._enqueue();
return timer;
}

在创建timer的时候,先获取了当前的时间戳,然后计算出timer的唤醒时间wakeupTime ,最后调用_Timer._internal创建timer。

_Timer._internal只是简单创建了Timer:

1
2
3
_Timer._internal(
this._callback, this._wakeupTime, this._milliSeconds, this._repeating)
: _id = _nextId();

在创建根据需要创建好Timer之后,使用_Timer._enqueue方法把Timer放入到相应的队列中。

timer._enqueue

主要看一下timer._enqueue()方法的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// -> sdk\lib\_internal\vm\lib\timer_impl.dart

// Timers are ordered by wakeup time. Timers with a timeout value of > 0 do
// end up on the TimerHeap. Timers with a timeout of 0 are queued in a list.
static final _heap = new _TimerHeap();
static _Timer? _firstZeroTimer;
static _Timer _lastZeroTimer = _sentinelTimer;

// Adds a timer to the heap or timer list. Timers with the same wakeup time
// are enqueued in order and notified in FIFO order.
void _enqueue() {
if (_milliSeconds == 0) {
if (_firstZeroTimer == null) {
_lastZeroTimer = this;
_firstZeroTimer = this;
} else {
_lastZeroTimer._indexOrNext = this;
_lastZeroTimer = this;
}
// Every zero timer gets its own event.
_notifyZeroHandler();
} else {
_heap.add(this);
if (_heap.isFirst(this)) {
_notifyEventHandler();
}
}
}

可以看到无论是单次还是循环的Timer最后都是使用_Timer._internal创建的,然后再使用_Timer._enqueue()方法将timer添加到heap或者timer list中:

  • 如果Timer的_milliSeconds为0,则会被添加到_lastZeroTimer中(并将上一个timer的_indexOrNext指向自己),并在_notifyZeroHandler()方法发送_ZERO_EVENT事件(最终会触发_Timer._handleMessage );
  • 否则则将其加入到_heap中,如果他是第一个timer,就通过_notifyEventHandler()启动处理Timer的event handler(这个线程会在合适的时间唤起Timer执行_Timer._handleMessage方法)。

在具体分析整个过程之前,我们先看一下几个属性的创建过程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
// -> sdk\lib\_internal\vm\lib\timer_impl.dart

class _Timer implements Timer {

static _RawReceivePortImpl? _receivePort;
static SendPort? _sendPort;

// Tell the event handler to wake this isolate at a specific time.
static void _scheduleWakeup(int wakeupTime) {
if (!_receivePortActive) {
_createTimerHandler();
}
VMLibraryHooks.eventHandlerSendData(null, _sendPort!, wakeupTime);
_scheduledWakeupTime = wakeupTime;
}

// Enqueue one message for each zero timer. To be able to distinguish from
// EventHandler messages we send a _ZERO_EVENT instead of a _TIMEOUT_EVENT.
static void _notifyZeroHandler() {
if (!_receivePortActive) {
_createTimerHandler();
}
_sendPort!.send(_ZERO_EVENT);
}

// Create a receive port and register a message handler for the timer
// events.
static void _createTimerHandler() {
var receivePort = _receivePort;
if (receivePort == null) {
assert(_sendPort == null);
final port = _RawReceivePortImpl('Timer');
port.handler = _handleMessage;
_sendPort = port.sendPort;
_receivePort = port;
_scheduledWakeupTime = 0;
} else {
receivePort._setActive(true);
}
_receivePortActive = true;
}

}

从上面代码我们可以看到:

  • _sendPort_receivePort对应的sendPort,后者的handler是_handleMessage()方法
  • 无论是_notifyEventHandler()还是 _notifyZeroHandler()都会保证_createTimerHandler()调用过

_milliSeconds == 0

先看一下_milliSeconds为0的情况:

1
2
3
4
5
6
7
8
9
10
// -> sdk\lib\_internal\vm\lib\timer_impl.dart

// Enqueue one message for each zero timer. To be able to distinguish from
// EventHandler messages we send a _ZERO_EVENT instead of a _TIMEOUT_EVENT.
static void _notifyZeroHandler() {
if (!_receivePortActive) {
_createTimerHandler();
}
_sendPort!.send(_ZERO_EVENT);
}

按照上面的分析,_sendPort!.send(_ZERO_EVENT)发送的消息,通过MessageHandler::PostMessage处理,最后调用_receivePort的handler也就是在_handleMessage(msg)方法中执行。

_milliSeconds ≠ 0

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
// -> sdk\lib\_internal\vm\lib\timer_impl.dart

static void _notifyEventHandler() {
if (_handlingCallbacks) {
// While we are already handling callbacks we will not notify the event
// handler. _handleTimeout will call _notifyEventHandler once all pending
// timers are processed.
return;
}

// If there are no pending timers. Close down the receive port.
if ((_firstZeroTimer == null) && _heap.isEmpty) {
// No pending timers: Close the receive port and let the event handler
// know.
if (_sendPort != null) {
_cancelWakeup();
_shutdownTimerHandler();
}
return;
} else if (_heap.isEmpty) {
// Only zero timers are left. Cancel any scheduled wakeups.
_cancelWakeup();
return;
}
// Only send a message if the requested wakeup time differs from the
// already scheduled wakeup time.
var wakeupTime = _heap.first._wakeupTime;
if ((_scheduledWakeupTime == 0) || (wakeupTime != _scheduledWakeupTime)) {
_scheduleWakeup(wakeupTime);
}
}

// Tell the event handler to wake this isolate at a specific time.
static void _scheduleWakeup(int wakeupTime) {
if (!_receivePortActive) {
_createTimerHandler();
}
VMLibraryHooks.eventHandlerSendData(null, _sendPort!, wakeupTime);
_scheduledWakeupTime = wakeupTime;
}

可见,当_milliSeconds ≠ 0的时候,会将其加入到_heap中,如果当前的timer是_heap中第一个,则调用_notifyEventHandler()告诉event handler在指定的时间唤起isolate

这里主要的实现是VMLibraryHooks.eventHandlerSendData,他的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
// -> sdk\lib\_internal\vm\bin\common_patch.dart

@pragma("vm:entry-point", "call")
_setupHooks() {
VMLibraryHooks.eventHandlerSendData = _EventHandler._sendData;
VMLibraryHooks.timerMillisecondClock = _EventHandler._timerMillisecondClock;
}

// -> sdk\lib\_internal\vm\bin\eventhandler_patch.dart

@patch
class _EventHandler {
@patch
@pragma("vm:external-name", "EventHandler_SendData")
external static void _sendData(Object? sender, SendPort sendPort, int data);

@pragma("vm:external-name", "EventHandler_TimerMillisecondClock")
external static int _timerMillisecondClock();
}

// -> runtime\bin\eventhandler.cc

/*
* Send data to the EventHandler thread to register for a given instance
* args[0] a ReceivePort args[1] with a notification event args[2].
*/
void FUNCTION_NAME(EventHandler_SendData)(Dart_NativeArguments args) {
// Get the id out of the send port. If the handle is not a send port
// we will get an error and propagate that out.
Dart_Handle handle = Dart_GetNativeArgument(args, 1);
Dart_Port dart_port;
handle = Dart_SendPortGetId(handle, &dart_port);
if (Dart_IsError(handle)) {
Dart_PropagateError(handle);
UNREACHABLE();
}
Dart_Handle sender = Dart_GetNativeArgument(args, 0);
intptr_t id;
if (Dart_IsNull(sender)) {
id = kTimerId;
} else {
Socket* socket = Socket::GetSocketIdNativeField(sender);
ASSERT(dart_port != ILLEGAL_PORT);
socket->set_port(dart_port);
socket->Retain(); // inc refcount before sending to the eventhandler.
id = reinterpret_cast<intptr_t>(socket);
}
int64_t data = DartUtils::GetIntegerValue(Dart_GetNativeArgument(args, 2));
event_handler->SendData(id, dart_port, data);
}

他的实现在native层的EventHander中名为event handler的子线程中通过异步IO执行任务

1
2
3
4
5
6
7
8
9
10
// -> 

class EventHandler {
public:
EventHandler() {}
void SendData(intptr_t id, Dart_Port dart_port, int64_t data) {
delegate_.SendData(id, dart_port, data);
}

}

不同的系统实现不同,对于Android来说:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
// -> runtime\bin\eventhandler_android.cc

void EventHandlerImplementation::SendData(intptr_t id,
Dart_Port dart_port,
int64_t data) {
WakeupHandler(id, dart_port, data);
}

void EventHandlerImplementation::WakeupHandler(intptr_t id,
Dart_Port dart_port,
int64_t data) {
InterruptMessage msg;
msg.id = id;
msg.dart_port = dart_port;
msg.data = data;
// WriteToBlocking will write up to 512 bytes atomically, and since our msg
// is smaller than 512, we don't need a thread lock.
// See: http://linux.die.net/man/7/pipe, section 'Pipe_buf'.
ASSERT(kInterruptMessageSize < PIPE_BUF);
intptr_t result =
FDUtils::WriteToBlocking(interrupt_fds_[1], &msg, kInterruptMessageSize);
if (result != kInterruptMessageSize) {
if (result == -1) {
perror("Interrupt message failure:");
}
FATAL1("Interrupt message failure. Wrote %" Pd " bytes.", result);
}
}

然后系统会在时间到了之后,会调用EventHandlerImplementation::HandleEvents通过_send_port发送消息,并触发_receivePort的hander也就是_handleMessage方法处理消息。

_handleMessage

无论是_milliSeconds == 0 的时候_sendPort!.send(_ZERO_EVENT);,还是_milliSeconds != 0 通过EventHandler发送_TIMEOUT_EVENT消息,最终都会使用_handleMessage处理消息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// -> sdk\lib\_internal\vm\lib\timer_impl.dart

static void _handleMessage(msg) {
List<_Timer> pendingTimers;
if (msg == _ZERO_EVENT) {
pendingTimers = _queueFromZeroEvent();
assert(pendingTimers.length > 0);
} else {
assert(msg == _TIMEOUT_EVENT);
_scheduledWakeupTime = 0; // Consumed the last scheduled wakeup now.
pendingTimers = _queueFromTimeoutEvent();
}
_runTimers(pendingTimers);
// Notify the event handler or shutdown the port if no more pending
// timers are present.
_notifyEventHandler();
}

_handleMessage中按照msg的类型取出对应的pendingTimers然后再_runTimers中执行,在执行完毕或者遇到错误时,调用_notifyEventHandler()通知event handler或者关闭TimerHandler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
// -> sdk\lib\_internal\vm\lib\timer_impl.dart

static void _runTimers(List<_Timer> pendingTimers) {
// If there are no pending timers currently reset the id space before we
// have a chance to enqueue new timers.
if (_heap.isEmpty && (_firstZeroTimer == null)) {
_idCount = 0;
}

// Fast exit if no pending timers.
if (pendingTimers.length == 0) {
return;
}

// Trigger all of the pending timers. New timers added as part of the
// callbacks will be enqueued now and notified in the next spin at the
// earliest.
_handlingCallbacks = true;
var i = 0;
try {
// 在这里遍历处理所有的pendingTimers
for (; i < pendingTimers.length; i++) {
// Next pending timer.
var timer = pendingTimers[i];
timer._indexOrNext = null;

// One of the timers in the pending_timers list can cancel
// one of the later timers which will set the callback to
// null. Or the pending zero timer has been canceled earlier.
var callback = timer._callback;
if (callback != null) {
if (!timer._repeating) {
// Mark timer as inactive.
timer._callback = null;
} else if (timer._milliSeconds > 0) {
var ms = timer._milliSeconds;
int overdue =
VMLibraryHooks.timerMillisecondClock() - timer._wakeupTime;
if (overdue > ms) {
int missedTicks = overdue ~/ ms;
timer._wakeupTime += missedTicks * ms;
timer._tick += missedTicks;
}
}
timer._tick += 1;

callback(timer);
// Re-insert repeating timer if not canceled.
if (timer._repeating && (timer._callback != null)) {
timer._advanceWakeupTime();
timer._enqueue();
}
// 每次执行完event之后,都要执行没有被执行的micro task
// Execute pending micro tasks.
_runPendingImmediateCallback();
}
}
} finally {
_handlingCallbacks = false;
// Re-queue timers we didn't get to.
for (i++; i < pendingTimers.length; i++) {
var timer = pendingTimers[i];
timer._enqueue();
}
_notifyEventHandler();
}
}

这里可以看到,这里依次遍历传入的pendingTimers,并在每次执行完event后,去检查执行一下micro task。


根据创建Timer的时候_milliSeconds是否等于0:会分别使用MessageHandler执行或者在名为event handler的IO线程通过isolate中的MessageHandler来执行任务;最后都会触发Timer的_handleMessage方法在_runTimers方法中执行callback。

结论

综上,dart中的方法总共有3种,按照优先级从前到后依次是:

  1. 普通的同步方法
  2. micro task
  3. 其他event:部分Future、Timer、点击事件、屏幕刷新等

在方法执行的时候:

  1. 先执行完毕所有的同步方法;
  2. 然后判断是否有micro task,有的话就立即执行;
  3. 否则,就执行普通的event,每次执行完一个event就执行一次步骤2;
  4. 直到当前app中既没有micro task也没有普通的event,退出app。

参考资料

dart sdk

The Event Loop and Dart

Flutter之Future原理解析 - 掘金

Flutter之Timer原理解析 - 掘金

Dart 官网