Future.sync(() => print("Hello, I am future created by Future.sync")); Future.forEach( [1, 2, 3], (element) => print("Hello, I am future($element) created by Future.forEach")); Future.doWhile(() async { if (repeatCounter++ < 3) { print("repeat ($repeatCounter/3) inner Future.doWhile"); await Future.delayed(constDuration(seconds: 1)); returntrue; } returnfalse; });
@pragma('vm:entry-point', 'call') void scheduleMicrotask(voidFunction() callback) { _Zone currentZone = Zone._current; if (identical(_rootZone, currentZone)) { // No need to bind the callback. We know that the root's scheduleMicrotask // will be invoked in the root zone. _rootScheduleMicrotask(null, null, _rootZone, callback); return; } _ZoneFunction implementation = currentZone._scheduleMicrotask; if (identical(_rootZone, implementation.zone) && _rootZone.inSameErrorZone(currentZone)) { _rootScheduleMicrotask( null, null, currentZone, currentZone.registerCallback(callback)); return; } Zone.current.scheduleMicrotask(Zone.current.bindCallbackGuarded(callback)); }
/// Schedules a callback to be called as a microtask. /// /// The microtask is called after all other currently scheduled /// microtasks, but as part of the current system event. void _scheduleAsyncCallback(_AsyncCallback callback) { _AsyncCallbackEntry newEntry = new _AsyncCallbackEntry(callback); _AsyncCallbackEntry? lastCallback = _lastCallback; if (lastCallback == null) { _nextCallback = _lastCallback = newEntry; if (!_isInCallbackLoop) { _AsyncRun._scheduleImmediate(_startMicrotaskLoop); } } else { lastCallback.next = newEntry; _lastCallback = newEntry; } }
/// Whether we are currently inside the callback loop. /// /// If we are inside the loop, we never need to schedule the loop, /// even if adding a first element. bool _isInCallbackLoop = false;
void _microtaskLoop() { for (var entry = _nextCallback; entry != null; entry = _nextCallback) { _lastPriorityCallback = null; var next = entry.next; _nextCallback = next; if (next == null) _lastCallback = null; (entry.callback)(); } }
void _startMicrotaskLoop() { _isInCallbackLoop = true; try { // Moved to separate function because try-finally prevents // good optimization. _microtaskLoop(); } finally { _lastPriorityCallback = null; _isInCallbackLoop = false; if (_nextCallback != null) { _AsyncRun._scheduleImmediate(_startMicrotaskLoop); } } }
class_AsyncRun{ /// Schedule the given callback before any other event in the event-loop. externalstaticvoid _scheduleImmediate(voidFunction() callback); }
/// The embedder can execute this function to get hold of /// [_isolateScheduleImmediate] above. @pragma("vm:entry-point", "call") Function _getIsolateScheduleImmediateClosure() { return _isolateScheduleImmediate; }
/// The closure that should be used as scheduleImmediateClosure, when the VM /// is responsible for the event loop. void _isolateScheduleImmediate(void callback()) { assert((_pendingImmediateCallback == null) || (_pendingImmediateCallback == callback)); _pendingImmediateCallback = callback; }
/// The callback that has been registered through `scheduleImmediate`. _ImmediateCallback? _pendingImmediateCallback;
// Called from the VM to retrieve the handler and handle a message. @pragma("vm:entry-point", "call") static _handleMessage(int id, var message) { final handler = _portMap[id]?['handler']; if (handler == null) { returnnull; } // TODO(floitsch): this relies on the fact that any exception aborts the // VM. Once we have non-fatal global exceptions we need to catch errors // so that we can run the immediate callbacks. handler(message); _runPendingImmediateCallback(); return handler; }
Future.delayed(constDuration(seconds: 1), () { print("Hello, I am future"); });
Future.any([ Future(() { return"I am Future run immediately Future.any"; }), Future.delayed(constDuration(seconds: 1), () { return"I am Future run delay, will be discard Future.any"; }) ]).then((value) => print(value));
Future.wait([ Future(() { return"I am Future run immediately Future.wait 1/2"; }), Future.delayed(constDuration(seconds: 1), () { return"I am Future run delay Future.wait 2/2"; }) ]).then((value) => print(value));
以及Timer
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
Timer.periodic(constDuration(seconds: 1), (timer) { print("Hello, I am running inner(${timer.tick}/2) Timer.periodic"); if (timer.tick == 2) { timer.cancel(); } });
Timer.run(() { print( "Hello, I will run asynchronously as soon as possible with Timer.run"); });
Timer(Duration(seconds: 1), () { print("Hello, I will run asynchronously after 1 second with Timer"); });
for (var future in futures) { // 一旦有一个future执行完毕,就立即返回结果,并丢弃掉后续future的返回 future.then(onValue, onError: onError); } return completer.future; }
@pragma("vm:recognized", "other") static Future<List<T>> wait<T>(Iterable<Future<T>> futures, {bool eagerError = false, void cleanUp(T successValue)?}) { // This is a VM recognised method, and the _future variable is deliberately // allocated in a specific slot in the closure context for stack unwinding. final _Future<List<T>> _future = _Future<List<T>>(); List<T?>? values; // Collects the values. Set to null on error. int remaining = 0; // How many futures are we waiting for. lateObject error; // The first error from a future. late StackTrace stackTrace; // The stackTrace that came with the error.
// Handle an error from any of the futures. void handleError(Object theError, StackTrace theStackTrace) { remaining--; List<T?>? valueList = values; if (valueList != null) { if (cleanUp != null) { for (var value in valueList) { if (value != null) { // Ensure errors from cleanUp are uncaught. T cleanUpValue = value; new Future.sync(() { cleanUp(cleanUpValue); }); } } } values = null; if (remaining == 0 || eagerError) { _future._completeError(theError, theStackTrace); } else { error = theError; stackTrace = theStackTrace; } } elseif (remaining == 0 && !eagerError) { _future._completeError(error, stackTrace); } }
try { // As each future completes, put its value into the corresponding // position in the list of values. for (var future in futures) { int pos = remaining; // 在这里依次执行future future.then((T value) { remaining--; List<T?>? valueList = values; if (valueList != null) { valueList[pos] = value; if (remaining == 0) { _future._completeWithValue(List<T>.from(valueList)); } } else { if (cleanUp != null && value != null) { // Ensure errors from cleanUp are uncaught. new Future.sync(() { cleanUp(value); }); } if (remaining == 0 && !eagerError) { // If eagerError is false, and valueList is null, then // error and stackTrace have been set in handleError above. _future._completeError(error, stackTrace); } } }, onError: handleError); // Increment the 'remaining' after the call to 'then'. // If that call throws, we don't expect any future callback from // the future, and we also don't increment remaining. remaining++; } if (remaining == 0) { return _future.._completeWithValue(<T>[]); } values = newList<T?>.filled(remaining, null); } catch (e, st) { // The error must have been thrown while iterating over the futures // list, or while installing a callback handler on the future. // This is a breach of the `Future` protocol, but we try to handle it // gracefully. if (remaining == 0 || eagerError) { // Throw a new Future.error. // Don't just call `_future._completeError` since that would propagate // the error too eagerly, not giving the callers time to install // error handlers. // Also, don't use `_asyncCompleteError` since that one doesn't give // zones the chance to intercept the error. returnnew Future.error(e, st); } else { // Don't allocate a list for values, thus indicating that there was an // error. // Set error to the caught exception. error = e; stackTrace = st; } } return _future; }
class_Future<T> implementsFuture<T> { // Register callbacks to be called when this future completes. // // When this future completes with a value, the [onValue] callback will be called with that value. // If this future is already completed, the callback will not be called immediately, // but will be scheduled in a later microtask Future<R> then<R>(FutureOr<R> f(T value), {Function? onError}) { Zone currentZone = Zone.current; if (identical(currentZone, _rootZone)) { if (onError != null && onError is! Function(Object, StackTrace) && onError is! Function(Object)) { throw ArgumentError.value( onError, "onError", "Error handler must accept one Object or one Object and a StackTrace" " as arguments, and return a value of the returned future's type"); } } else { f = currentZone.registerUnaryCallback<FutureOr<R>, T>(f); if (onError != null) { // This call also checks that onError is assignable to one of: // dynamic Function(Object) // dynamic Function(Object, StackTrace) onError = _registerErrorHandler(onError, currentZone); } } _Future<R> result = new _Future<R>(); _addListener(new _FutureListener<T, R>.then(result, f, onError)); // 返回创建好的Future return result; }
staticvoid run(voidFunction() callback) { new Timer(Duration.zero, callback); }
factory Timer(Duration duration, voidFunction() callback) { if (Zone.current == Zone.root) { // No need to bind the callback. We know that the root's timer will // be invoked in the root zone. return Zone.current.createTimer(duration, callback); } return Zone.current .createTimer(duration, Zone.current.bindCallbackGuarded(callback)); }
factory Timer.periodic(Duration duration, void callback(Timer timer)) { if (Zone.current == Zone.root) { // No need to bind the callback. We know that the root's timer will // be invoked in the root zone. return Zone.current.createPeriodicTimer(duration, callback); } var boundCallback = Zone.current.bindUnaryCallbackGuarded<Timer>(callback); return Zone.current.createPeriodicTimer(duration, boundCallback); }
创建Timer
可以看到,Timer的创建实际上是Zone通过两种方式创建的:
1 2 3 4 5 6 7 8 9 10
// -> sdk\lib\async\zone.dart abstractclassZone{
/// Creates a [Timer] where the callback is executed in this zone. Timer createTimer(Duration duration, voidFunction() callback);
/// Creates a periodic [Timer] where the callback is executed in this zone. Timer createPeriodicTimer(Duration period, void callback(Timer timer));
// The Timer factory registered with the dart:async library by the embedder. static Timer _factory( int milliSeconds, void callback(Timer timer), bool repeating) { if (repeating) { returnnew _Timer.periodic(milliSeconds, callback); } returnnew _Timer(milliSeconds, callback); }
static _Timer _createTimer( void callback(Timer timer), int milliSeconds, bool repeating) { // Negative timeouts are treated as if 0 timeout. if (milliSeconds < 0) { milliSeconds = 0; } // Add one because DateTime.now() is assumed to round down // to nearest millisecond, not up, so that time + duration is before // duration milliseconds from now. Using microsecond timers like // Stopwatch allows detecting that the timer fires early. int now = VMLibraryHooks.timerMillisecondClock(); int wakeupTime = (milliSeconds == 0) ? now : (now + 1 + milliSeconds);
_Timer timer = new _Timer._internal(callback, wakeupTime, milliSeconds, repeating); // Enqueue this newly created timer in the appropriate structure and // notify if necessary. timer._enqueue(); return timer; }
// Timers are ordered by wakeup time. Timers with a timeout value of > 0 do // end up on the TimerHeap. Timers with a timeout of 0 are queued in a list. staticfinal _heap = new _TimerHeap(); static _Timer? _firstZeroTimer; static _Timer _lastZeroTimer = _sentinelTimer;
// Adds a timer to the heap or timer list. Timers with the same wakeup time // are enqueued in order and notified in FIFO order. void _enqueue() { if (_milliSeconds == 0) { if (_firstZeroTimer == null) { _lastZeroTimer = this; _firstZeroTimer = this; } else { _lastZeroTimer._indexOrNext = this; _lastZeroTimer = this; } // Every zero timer gets its own event. _notifyZeroHandler(); } else { _heap.add(this); if (_heap.isFirst(this)) { _notifyEventHandler(); } } }
// Tell the event handler to wake this isolate at a specific time. staticvoid _scheduleWakeup(int wakeupTime) { if (!_receivePortActive) { _createTimerHandler(); } VMLibraryHooks.eventHandlerSendData(null, _sendPort!, wakeupTime); _scheduledWakeupTime = wakeupTime; }
// Enqueue one message for each zero timer. To be able to distinguish from // EventHandler messages we send a _ZERO_EVENT instead of a _TIMEOUT_EVENT. staticvoid _notifyZeroHandler() { if (!_receivePortActive) { _createTimerHandler(); } _sendPort!.send(_ZERO_EVENT); }
// Create a receive port and register a message handler for the timer // events. staticvoid _createTimerHandler() { var receivePort = _receivePort; if (receivePort == null) { assert(_sendPort == null); final port = _RawReceivePortImpl('Timer'); port.handler = _handleMessage; _sendPort = port.sendPort; _receivePort = port; _scheduledWakeupTime = 0; } else { receivePort._setActive(true); } _receivePortActive = true; }
// Enqueue one message for each zero timer. To be able to distinguish from // EventHandler messages we send a _ZERO_EVENT instead of a _TIMEOUT_EVENT. staticvoid _notifyZeroHandler() { if (!_receivePortActive) { _createTimerHandler(); } _sendPort!.send(_ZERO_EVENT); }
staticvoid _notifyEventHandler() { if (_handlingCallbacks) { // While we are already handling callbacks we will not notify the event // handler. _handleTimeout will call _notifyEventHandler once all pending // timers are processed. return; }
// If there are no pending timers. Close down the receive port. if ((_firstZeroTimer == null) && _heap.isEmpty) { // No pending timers: Close the receive port and let the event handler // know. if (_sendPort != null) { _cancelWakeup(); _shutdownTimerHandler(); } return; } elseif (_heap.isEmpty) { // Only zero timers are left. Cancel any scheduled wakeups. _cancelWakeup(); return; } // Only send a message if the requested wakeup time differs from the // already scheduled wakeup time. var wakeupTime = _heap.first._wakeupTime; if ((_scheduledWakeupTime == 0) || (wakeupTime != _scheduledWakeupTime)) { _scheduleWakeup(wakeupTime); } }
// Tell the event handler to wake this isolate at a specific time. staticvoid _scheduleWakeup(int wakeupTime) { if (!_receivePortActive) { _createTimerHandler(); } VMLibraryHooks.eventHandlerSendData(null, _sendPort!, wakeupTime); _scheduledWakeupTime = wakeupTime; }
/* * Send data to the EventHandler thread to register for a given instance * args[0] a ReceivePort args[1] with a notification event args[2]. */ void FUNCTION_NAME(EventHandler_SendData)(Dart_NativeArguments args) { // Get the id out of the send port. If the handle is not a send port // we will get an error and propagate that out. Dart_Handle handle = Dart_GetNativeArgument(args, 1); Dart_Port dart_port; handle = Dart_SendPortGetId(handle, &dart_port); if (Dart_IsError(handle)) { Dart_PropagateError(handle); UNREACHABLE(); } Dart_Handle sender = Dart_GetNativeArgument(args, 0); intptr_t id; if (Dart_IsNull(sender)) { id = kTimerId; } else { Socket* socket = Socket::GetSocketIdNativeField(sender); ASSERT(dart_port != ILLEGAL_PORT); socket->set_port(dart_port); socket->Retain(); // inc refcount before sending to the eventhandler. id = reinterpret_cast<intptr_t>(socket); } int64_t data = DartUtils::GetIntegerValue(Dart_GetNativeArgument(args, 2)); event_handler->SendData(id, dart_port, data); }
staticvoid _runTimers(List<_Timer> pendingTimers) { // If there are no pending timers currently reset the id space before we // have a chance to enqueue new timers. if (_heap.isEmpty && (_firstZeroTimer == null)) { _idCount = 0; }
// Fast exit if no pending timers. if (pendingTimers.length == 0) { return; }
// Trigger all of the pending timers. New timers added as part of the // callbacks will be enqueued now and notified in the next spin at the // earliest. _handlingCallbacks = true; var i = 0; try { // 在这里遍历处理所有的pendingTimers for (; i < pendingTimers.length; i++) { // Next pending timer. var timer = pendingTimers[i]; timer._indexOrNext = null;
// One of the timers in the pending_timers list can cancel // one of the later timers which will set the callback to // null. Or the pending zero timer has been canceled earlier. var callback = timer._callback; if (callback != null) { if (!timer._repeating) { // Mark timer as inactive. timer._callback = null; } elseif (timer._milliSeconds > 0) { var ms = timer._milliSeconds; int overdue = VMLibraryHooks.timerMillisecondClock() - timer._wakeupTime; if (overdue > ms) { int missedTicks = overdue ~/ ms; timer._wakeupTime += missedTicks * ms; timer._tick += missedTicks; } } timer._tick += 1;
callback(timer); // Re-insert repeating timer if not canceled. if (timer._repeating && (timer._callback != null)) { timer._advanceWakeupTime(); timer._enqueue(); } // 每次执行完event之后,都要执行没有被执行的micro task // Execute pending micro tasks. _runPendingImmediateCallback(); } } } finally { _handlingCallbacks = false; // Re-queue timers we didn't get to. for (i++; i < pendingTimers.length; i++) { var timer = pendingTimers[i]; timer._enqueue(); } _notifyEventHandler(); } }