All Dart code runs in an isolate, and code can access classes and values only from the same isolate. Different isolates can communicate by sending values through ports (see ReceivePort, SendPort).
Isolate(SendPort controlPort, {this.pauseCapability, this.terminateCapability}); 这种方式创建一种能力受限的Isolate。The capabilities should be a subset of the capabilities that are available to the original isolate. 本质上并没有在native层孵化一个新的Isolate。
1 2 3 4 5 6 7 8 9
// Build a second Isolate handle around the *current* isolate's control port.
// No new isolate is spawned; the handle only controls the running isolate.
var newIsolate = Isolate(Isolate.current.controlPort);
newIsolate.addOnExitListener(Isolate.current.controlPort);
newIsolate.addErrorListener(Isolate.current.controlPort);
Future.delayed(Duration(seconds: 1), () {
  newIsolate.kill();
  print("try kill new isolate"); // after this, the Dart code finishes
});
print("finish");
Isolate.spawn(void entryPoint(T message), T message, ...) 创建一个和当前Isolate共享同一份代码的Isolate,并执行entryPoint方法,一般在message中传入SendPort以便从entryPoint中向来时的Isolate发送消息,新建的Isolate和当前Isolate在同一个IsolateGroup中。
1 2 3 4 5 6 7 8 9 10 11
/// Spawns a second isolate and prints the greeting it sends back.
///
/// A [ReceivePort] is opened on the current isolate and its [SendPort] is
/// passed as the spawn message, so the entry point running in the new isolate
/// can reply to the isolate that spawned it.
void spawnIsolate() {
  var receivePort = ReceivePort();
  receivePort.listen((message) {
    print("receivePort(${Isolate.current.debugName}) received msg: $message");
  });
  // Create an isolate that shares the same code as the current isolate.
  Isolate.spawn((message) {
    print("Isolate initial function(${Isolate.current.debugName}) received msg: $message");
    (message as SendPort).send("HELLO_FORM_ISOLATE(${Isolate.current.debugName})");
  }, receivePort.sendPort, debugName: "another_isolate");
}
// Make sure to route this request to the isolate library OOB message handler. msg.SetAt(0, Smi::Handle(Smi::New(Message::kIsolateLibOOBMsg)));
// Ensure message writer (and its resources, e.g. forwarding tables) are // cleaned up before handling interrupts. { PortMap::PostMessage(WriteMessage(/* can_send_any_object */false, /* same_group */false, msg, port.Id(), Message::kOOBPriority)); }
// Drain interrupts before running so any IMMEDIATE operations on the current // isolate happen synchronously. const Error& error = Error::Handle(thread->HandleInterrupts()); if (!error.IsNull()) { Exceptions::PropagateError(error); UNREACHABLE(); }
// Creates a new [Isolate] object with a restricted set of capabilities. Isolate(this.controlPort, {this.pauseCapability, this.terminateCapability});
/// Creates and spawns an isolate that shares the same code as the current /// isolate. external static Future<Isolate> spawn<T>( void entryPoint(T message), T message, {bool paused = false, bool errorsAreFatal = true, SendPort? onExit, SendPort? onError, @Since("2.3") String? debugName});
/// Creates and spawns an isolate that runs the code from the library with /// the specified URI. /// /// The isolate starts executing the top-level `main` function of the library /// with the given URI. external static Future<Isolate> spawnUri( Uri uri, List<String> args, var message, {bool paused = false, SendPort? onExit, SendPort? onError, bool errorsAreFatal = true, bool? checked, Map<String, String>? environment, @Deprecated('The packages/ dir is not supported in Dart 2') Uri? packageRoot, Uri? packageConfig, bool automaticPackageResolution = false, @Since("2.3") String? debugName});
@patch static Future<Isolate> spawn<T>(void entryPoint(T message), T message, {bool paused = false, bool errorsAreFatal = true, SendPort? onExit, SendPort? onError, String? debugName}) async { // `paused` isn't handled yet. // Check for the type of `entryPoint` on the spawning isolate to make // error-handling easier. if (entryPoint is! _UnaryFunction) { throw new ArgumentError(entryPoint); } // The VM will invoke [_startIsolate] with entryPoint as argument.
// We do not inherit the package config settings from the parent isolate, // instead we use the values that were set on the command line. ...
void RunLightweight(const char* name) { // The create isolate initialize callback is mandatory. auto initialize_callback = **Isolate::InitializeCallback();** if (initialize_callback == nullptr) { FailedSpawn( "Lightweight isolate spawn is not supported by this Dart embedder\n", /*has_current_isolate=*/false); return; }
char* error = nullptr;
auto group = state_->isolate_group(); **Isolate* isolate = CreateWithinExistingIsolateGroup(group, name, &error);** parent_isolate_->DecrementSpawnCount(); parent_isolate_ = nullptr;
if (isolate == nullptr) { FailedSpawn(error, /*has_current_isolate=*/false); free(error); return; }
Thread::ExitIsolate(); // Unregister the VM isolate from this thread. **Isolate::SetCreateGroupCallback(params->create_group);** **Isolate::SetInitializeCallback_(params->initialize_isolate);** Isolate::SetShutdownCallback(params->shutdown_isolate); Isolate::SetCleanupCallback(params->cleanup_isolate); Isolate::SetGroupCleanupCallback(params->cleanup_group); Isolate::SetRegisterKernelBlobCallback(params->register_kernel_blob); Isolate::SetUnregisterKernelBlobCallback(params->unregister_kernel_blob); ...
if (isolate_run_app_snapshot) { result = Loader::InitForSnapshot(script_uri, isolate_data); if (Dart_IsError(result)) goto failed; } else { result = DartUtils::ResolveScript(Dart_NewStringFromCString(script_uri)); if (Dart_IsError(result)) goto failed;
if (isolate_group_data->kernel_buffer() != nullptr) { // Various core-library parts will send requests to the Loader to resolve // relative URIs and perform other related tasks. We need Loader to be // initialized for this to work because loading from Kernel binary // bypasses normal source code loading paths that initialize it. const char* resolved_script_uri = NULL; result = Dart_StringToCString(result, &resolved_script_uri); if (Dart_IsError(result)) goto failed; result = Loader::InitForSnapshot(resolved_script_uri, isolate_data); if (Dart_IsError(result)) goto failed; } }
auto source = group->source(); **Isolate* I = Dart::CreateIsolate(name, source->flags, group);** if (I == NULL) { if (error != NULL) { *error = Utils::StrDup("Isolate creation failed"); } return reinterpret_cast<Dart_Isolate>(NULL); }
Thread* T = Thread::Current(); bool success = false; { StackZone zone(T); // We enter an API scope here as InitializeIsolate could compile some // bootstrap library files which call out to a tag handler that may create // Api Handles when an error is encountered. T->EnterApiScope(); const Error& error_obj = Error::Handle( Z, **Dart::InitializeIsolate( source->snapshot_data, source->snapshot_instructions, source->kernel_buffer, source->kernel_buffer_size, is_new_group ? nullptr : group, isolate_data)**); if (error_obj.IsNull()) { #if defined(DEBUG) && !defined(DART_PRECOMPILED_RUNTIME) if (FLAG_check_function_fingerprints && !FLAG_precompiled_mode) { Library::CheckFunctionFingerprints(); } #endif // defined(DEBUG) && !defined(DART_PRECOMPILED_RUNTIME). success = true; } else if (error != NULL) { *error = Utils::StrDup(error_obj.ToErrorCString()); } // We exit the API scope entered above. T->ExitApiScope(); }
if (success) { if (is_new_group) { group->heap()->InitGrowthControl(); } // A Thread structure has been associated to the thread, we do the // safepoint transition explicitly here instead of using the // TransitionXXX scope objects as the reverse transition happens // outside this scope in Dart_ShutdownIsolate/Dart_ExitIsolate. T->set_execution_state(Thread::kThreadInNative); T->EnterSafepoint(); if (error != NULL) { *error = NULL; } return Api::CastIsolate(I); }
Isolate* Isolate::InitIsolate(const char* name_prefix, IsolateGroup* isolate_group, const Dart_IsolateFlags& api_flags, bool is_vm_isolate) { // 创建新的Isolate **Isolate* result = new Isolate(isolate_group, api_flags);** result->BuildName(name_prefix); if (!is_vm_isolate) { // vm isolate object store is initialized later, after null instance // is created (in Dart::Init). // Non-vm isolates need to have isolate object store initialized is that // exit_listeners have to be null-initialized as they will be used if // we fail to create isolate below, have to do low level shutdown. ASSERT(result->group()->object_store() != nullptr); result->isolate_object_store()->Init(); }
// First we ensure we enter the isolate. This will ensure we're participating // in any safepointing requests from this point on. Other threads requesting a // safepoint operation will therefore wait until we've stopped. // // Though the [result] isolate is still in a state where no memory has been // allocated, which means it's safe to GC the isolate group until here. // 创建一个Thread并和当前isolate绑定 if (!**Thread::EnterIsolate(result)**) { delete result; return nullptr; }
// Setup the isolate message handler. MessageHandler* handler = new IsolateMessageHandler(result); ASSERT(handler != nullptr); // 在这里绑定了message handler **result->set_message_handler(handler);**
**result->set_main_port(PortMap::CreatePort(result->message_handler()));** #if defined(DEBUG) // Verify that we are never reusing a live origin id. VerifyOriginId id_verifier(result->main_port()); Isolate::VisitIsolates(&id_verifier); #endif result->set_origin_id(result->main_port()); result->set_pause_capability(result->random()->NextUInt64()); result->set_terminate_capability(result->random()->NextUInt64());
#if !defined(PRODUCT) result->debugger_ = new Debugger(result); #endif
// Now we register the isolate in the group. From this point on any GC would // traverse the isolate roots (before this point, the roots are only pointing // to vm-isolate objects, e.g. null) **isolate_group->RegisterIsolate(result);**
// Add to isolate list. Shutdown and delete the isolate on failure. if (!TryMarkIsolateReady(result)) { result->LowLevelShutdown(); Isolate::LowLevelCleanup(result); return nullptr; }
ErrorPtr Dart::InitializeIsolate(const uint8_t* snapshot_data, const uint8_t* snapshot_instructions, const uint8_t* kernel_buffer, intptr_t kernel_buffer_size, IsolateGroup* source_isolate_group, void* isolate_data) { // Initialize the new isolate. Thread* T = Thread::Current(); Isolate* I = T->isolate(); auto IG = T->isolate_group(); #if defined(SUPPORT_TIMELINE) TimelineBeginEndScope tbes(T, Timeline::GetIsolateStream(), "InitializeIsolate"); tbes.SetNumArguments(1); tbes.CopyArgument(0, "isolateName", I->name()); #endif ASSERT(I != NULL); StackZone zone(T); HandleScope handle_scope(T); bool was_child_cloned_into_existing_isolate = false; if (source_isolate_group != nullptr) { // If a static field gets registered in [IsolateGroup::RegisterStaticField]: // // * before this block it will ignore this isolate. The [Clone] of the // initial field table will pick up the new value. // * after this block it will add the new static field to this isolate. { SafepointReadRwLocker reader(T, source_isolate_group->program_lock()); **I->set_field_table**(T, source_isolate_group->initial_field_table()->Clone(I)); **I->field_table()->MarkReadyToUse();** }
// If we were passed a value then override the default flags state for // checked mode. if (!checked.IsNull()) { Dart_IsolateFlags* flags = state->isolate_flags(); flags->enable_asserts = checked.value(); }
// Since this is a call to Isolate.spawnUri, don't copy the parent's code. state->isolate_flags()->copy_parent_code = false;
auto group = state_->isolate_group(); if (group == nullptr) { **RunHeavyweight(name);** } else { RunLightweight(name); } }
void RunHeavyweight(const char* name) { // The create isolate group callback is mandatory. If not provided we // cannot spawn isolates. // 在Dart::DartInit中已经被设置,在Isolate创建时会被回调 **auto create_group_callback = Isolate::CreateGroupCallback();** if (create_group_callback == nullptr) { FailedSpawn("Isolate spawn is not supported by this Dart embedder\n"); return; }
char* error = nullptr;
// Make a copy of the state's isolate flags and hand it to the callback. Dart_IsolateFlags api_flags = *(state_->isolate_flags()); api_flags.is_system_isolate = false; // 创建isolate **Dart_Isolate isolate = (create_group_callback)(state_->script_url(), name, nullptr, state_->package_config(), &api_flags, parent_isolate_->init_callback_data(), &error);** parent_isolate_->DecrementSpawnCount(); parent_isolate_ = nullptr;
static Dart_Isolate CreateIsolateGroupAndSetup(const char* script_uri, const char* main, const char* package_root, const char* package_config, Dart_IsolateFlags* flags, void* callback_data, char** error) { // The VM should never call the isolate helper with a NULL flags. ASSERT(flags != NULL); ASSERT(flags->version == DART_FLAGS_CURRENT_VERSION); ASSERT(package_root == nullptr);
bool dontneed_safe = true; #if defined(DART_HOST_OS_LINUX) // This would also be true in Linux, except that Google3 overrides the default // ELF interpreter to one that apparently doesn't create proper mappings. dontneed_safe = false; #elif defined(DEBUG) // If the snapshot isn't file-backed, madvise(DONT_NEED) is destructive. if (Options::force_load_elf_from_memory()) { dontneed_safe = false; } #endif flags->snapshot_is_dontneed_safe = dontneed_safe;
// Returns newly created Isolate on success, NULL on failure. static Dart_Isolate CreateIsolateGroupAndSetupHelper( bool is_main_isolate, const char* script_uri, const char* name, const char* packages_config, Dart_IsolateFlags* flags, void* callback_data, char** error, int* exit_code) {
... // 根据是AOT还是JIT获取kernel_buffer,app_snapshot,isolate_run_app_snapshot等数据 #if defined(DART_PRECOMPILED_RUNTIME){ // AOT: All isolates need to be run from AOT compiled snapshots. } #else{ // JIT: Main isolate starts from the app snapshot, if any. Other isolates // use the core libraries snapshot. }
// 创建isolate_group_data auto isolate_group_data = new IsolateGroupData( script_uri, packages_config, app_snapshot, isolate_run_app_snapshot); // copy_parent_code为true的话,这里的kernel_buffer为NULL if (kernel_buffer != NULL) { if (kernel_buffer_ptr) { isolate_group_data->SetKernelBufferAlreadyOwned( std::move(kernel_buffer_ptr), kernel_buffer_size); } else { isolate_group_data->SetKernelBufferNewlyOwned(kernel_buffer, kernel_buffer_size); } }
Dart_Isolate isolate = NULL;
IsolateData* isolate_data = nullptr;
#if !defined(DART_PRECOMPILED_RUNTIME) if (!isolate_run_app_snapshot && (isolate_snapshot_data == NULL)) { const uint8_t* platform_kernel_buffer = NULL; intptr_t platform_kernel_buffer_size = 0; dfe.LoadPlatform(&platform_kernel_buffer, &platform_kernel_buffer_size); if (platform_kernel_buffer == NULL) { platform_kernel_buffer = kernel_buffer; platform_kernel_buffer_size = kernel_buffer_size; } if (platform_kernel_buffer == NULL) { #if defined(EXCLUDE_CFE_AND_KERNEL_PLATFORM) FATAL( "Binary built with --exclude-kernel-service. Cannot run" " from source."); #else FATAL("platform_program cannot be NULL."); #endif  // defined(EXCLUDE_CFE_AND_KERNEL_PLATFORM) } // TODO(sivachandra): When the platform program is unavailable, check if // application kernel binary is self contained or an incremental binary. // Isolate should be created only if it is a self contained kernel binary. isolate_data = new IsolateData(isolate_group_data); isolate = Dart_CreateIsolateGroupFromKernel( script_uri, name, platform_kernel_buffer, platform_kernel_buffer_size, flags, isolate_group_data, isolate_data, error); } else { isolate_data = new IsolateData(isolate_group_data); // Creates a new isolate. The new isolate becomes the current isolate. isolate = Dart_CreateIsolateGroup(script_uri, name, isolate_snapshot_data, isolate_snapshot_instructions, flags, isolate_group_data, isolate_data, error); } #else **isolate_data = new IsolateData**(isolate_group_data); // Creates a new isolate. The new isolate becomes the current isolate. **isolate = Dart_CreateIsolateGroup**(script_uri, name, isolate_snapshot_data, isolate_snapshot_instructions, flags, **isolate_group_data**, **isolate_data**, error); #endif  // !defined(DART_PRECOMPILED_RUNTIME)
void Run(Isolate* child) { if (!EnsureIsRunnable(child)) { Dart_ShutdownIsolate(); return; }
state_->set_isolate(child); if (state_->origin_id() != ILLEGAL_PORT) { // origin_id is set to parent isolate main port id when spawning via // spawnFunction. child->set_origin_id(state_->origin_id()); }
bool success = true; { auto thread = Thread::Current(); // TransitionNativeToVM is used to transition the safepoint state of a // thread from "running native code" to "running vm code" and ensures // that the state is reverted back to "running native code" when // exiting the scope/frame. TransitionNativeToVM transition(thread); // Create an empty zone and set it as the current zone for the Thread. StackZone zone(thread); // The class HandleScope is used to start a new handles scope in the // code. HandleScope hs(thread);
if (!success) { state_ = nullptr; Dart_ShutdownIsolate(); return; }
// All preconditions are met for this to always succeed. char* error = nullptr; // Lets the VM run message processing for the isolate. if (!**Dart_RunLoopAsync**(state_->errors_are_fatal(), state_->on_error_port(), state_->on_exit_port(), &error)) { FATAL("Dart_RunLoopAsync() failed: %s. Please file a Dart VM bug report.", error); } }
bool EnqueueEntrypointInvocationAndNotifySpawner(Thread* thread) { auto isolate = thread->isolate(); auto zone = thread->zone(); const bool is_spawn_uri = state_->is_spawn_uri();
// Step 1) Resolve the entrypoint function. // 查找isolate开始运行的第一个方法,比如Isolate.spawn的spawn或者Isolate.spawnUri的main方法 auto& entrypoint_closure = Closure::Handle(zone); if (state_->closure_tuple_handle() != nullptr) { const auto& result = Object::Handle( zone, ReadObjectGraphCopyMessage(thread, state_->closure_tuple_handle())); if (result.IsError()) { ReportError( "Failed to deserialize the passed entrypoint to the new isolate."); return false; } entrypoint_closure = Closure::RawCast(result.ptr()); } else { const auto& result = Object::Handle(zone, state_->ResolveFunction()); if (result.IsError()) { ASSERT(is_spawn_uri); ReportError("Failed to resolve entrypoint function."); return false; } ASSERT(result.IsFunction()); auto& func = Function::Handle(zone, Function::Cast(result).ptr()); func = func.ImplicitClosureFunction(); entrypoint_closure = func.ImplicitStaticClosure(); }
// Step 2) Enqueue delayed invocation of entrypoint callback. const auto& args_obj = Object::Handle(zone, state_->BuildArgs(thread)); if (args_obj.IsError()) { ReportError( "Failed to deserialize the passed arguments to the new isolate."); return false; } ASSERT(args_obj.IsNull() || args_obj.IsInstance()); const auto& message_obj = Object::Handle(zone, state_->BuildMessage(thread)); if (message_obj.IsError()) { ReportError( "Failed to deserialize the passed arguments to the new isolate."); return false; } ASSERT(message_obj.IsNull() || message_obj.IsInstance()); // 解析参数,分别是isolate初始运行方法,参数args、message、是否spawn_uri const Array& args = Array::Handle(zone, Array::New(4)); args.SetAt(0, entrypoint_closure); args.SetAt(1, args_obj); args.SetAt(2, message_obj); args.SetAt(3, is_spawn_uri ? Bool::True() : Bool::False());
// Runs this message handler on the thread pool. // // Before processing messages, the optional StartFunction is run. // // A message handler will run until it terminates either normally or // abnormally. Normal termination occurs when the message handler // no longer has any live ports. Abnormal termination occurs when // HandleMessage() indicates that an error has occurred during // message processing.
// If we have maxed out the number of threads running, we will not start a // new one. if (max_pool_size_ > 0 && (count_idle_ + count_running_) >= max_pool_size_) { if (!idle_workers_.IsEmpty()) { ml->Notify(); } return nullptr; }
// Otherwise start a new worker. auto new_worker = new Worker(this); idle_workers_.Append(new_worker); count_idle_++; return new_worker; }
while (true) { MonitorLocker ml(&pool_monitor_); // worker会从task_取出一个task并运行 if (!tasks_.IsEmpty()) { IdleToRunningLocked(worker); while (!tasks_.IsEmpty()) { std::unique_ptr<Task> task(tasks_.RemoveFirst()); pending_tasks_--; MonitorLeaveScope mls(&ml); **task->Run();** ASSERT(Isolate::Current() == nullptr); task.reset(); } RunningToIdleLocked(worker); }
if (running_workers_.IsEmpty()) { ASSERT(tasks_.IsEmpty()); OnEnterIdleLocked(&ml); if (!tasks_.IsEmpty()) { continue; } }
if (shutting_down_) { ObtainDeadWorkersLocked(&dead_workers_to_join); IdleToDeadLocked(worker); break; }
// Sleep until we get a new task, we time out or we're shutdown. const int64_t idle_start = OS::GetCurrentMonotonicMicros(); bool done = false; while (!done) { const auto result = ml.WaitMicros(ComputeTimeout(idle_start));
// We have to drain all pending tasks. if (!tasks_.IsEmpty()) break;
if (shutting_down_ || result == Monitor::kTimedOut) { done = true; break; } } if (done) { ObtainDeadWorkersLocked(&dead_workers_to_join); IdleToDeadLocked(worker); break; } }
// Before we transitioned to dead we obtained the list of previously died dead // workers, which we join here. Since every death of a worker will join // previously died workers, we keep the pending non-joined [dead_workers_] to // effectively 1. JoinDeadWorkersLocked(&dead_workers_to_join); }
void MessageHandler::TaskCallback() { ASSERT(Isolate::Current() == NULL); MessageStatus status = kOK; bool run_end_callback = false; bool delete_me = false; EndCallback end_callback = NULL; CallbackData callback_data = 0; { // We will occasionally release and reacquire this monitor in this // function. Whenever we reacquire the monitor we *must* process // all pending OOB messages, or we may miss a request for vm // shutdown. MonitorLocker ml(&monitor_);
// This method is running on the message handler task. Which means no // other message handler tasks will be started until this one sets // [task_running_] to false. ASSERT(task_running_);
#if !defined(PRODUCT) if (ShouldPauseOnStart(kOK)) { if (!is_paused_on_start()) { PausedOnStartLocked(&ml, true); } // More messages may have come in before we (re)acquired the monitor. status = HandleMessages(&ml, false, false); if (ShouldPauseOnStart(status)) { // Still paused. ASSERT(oob_queue_->IsEmpty()); task_running_ = false; // No task in queue. return; } else { PausedOnStartLocked(&ml, false); } } if (is_paused_on_exit()) { status = HandleMessages(&ml, false, false); if (ShouldPauseOnExit(status)) { // Still paused. ASSERT(oob_queue_->IsEmpty()); task_running_ = false; // No task in queue. return; } else { PausedOnExitLocked(&ml, false); } } #endif// !defined(PRODUCT)
if (status == kOK) { if (start_callback_ != nullptr) { // Initialize the message handler by running its start function, // if we have one. For an isolate, this will run the isolate's // main() function. // // Release the monitor_ temporarily while we call the start callback. ml.Exit(); status = start_callback_(callback_data_); ASSERT(Isolate::Current() == NULL); start_callback_ = NULL; ml.Enter(); }
// Handle any pending messages for this message handler. if (status != kShutdown) { status = HandleMessages(&ml, (status == kOK), true); } }
// The isolate exits when it encounters an error or when it no // longer has live ports. if (status != kOK || !HasLivePorts()) { #if !defined(PRODUCT) if (ShouldPauseOnExit(status)) { if (FLAG_trace_service_pause_events) { OS::PrintErr( "Isolate %s paused before exiting. " "Use the Observatory to release it.\n", name()); } PausedOnExitLocked(&ml, true); // More messages may have come in while we released the monitor. **status = HandleMessages(&ml, false, false);** if (ShouldPauseOnExit(status)) { // Still paused. ASSERT(oob_queue_->IsEmpty()); task_running_ = false; // No task in queue. return; } else { PausedOnExitLocked(&ml, false); } } #endif// !defined(PRODUCT) if (FLAG_trace_isolates) { if (status != kOK && thread() != NULL) { const Error& error = Error::Handle(thread()->sticky_error()); OS::PrintErr( "[-] Stopping message handler (%s):\n" "\thandler: %s\n" "\terror: %s\n", MessageStatusString(status), name(), error.ToCString()); } else { OS::PrintErr( "[-] Stopping message handler (%s):\n" "\thandler: %s\n", MessageStatusString(status), name()); } } pool_ = NULL; // Decide if we have a callback before releasing the monitor. end_callback = end_callback_; callback_data = callback_data_; run_end_callback = end_callback_ != NULL; delete_me = delete_me_; }
// Clear task_running_ last. This allows other tasks to potentially start // for this message handler. ASSERT(oob_queue_->IsEmpty()); task_running_ = false; }
// The handler may have been deleted by another thread here if it is a native // message handler.
// Message handlers either use delete_me or end_callback but not both. ASSERT(!delete_me || !run_end_callback);
if (run_end_callback) { ASSERT(end_callback != NULL); end_callback(callback_data); // The handler may have been deleted after this point. } if (delete_me) { delete this; } }
// Scheduling of the mutator thread during the isolate start can cause this // thread to safepoint. // We want to avoid holding the message handler monitor during the safepoint // operation to avoid possible deadlocks, which can occur if other threads are // sending messages to this message handler. // // If isolate() returns nullptr [StartIsolateScope] does nothing. ml->Exit(); StartIsolateScope start_isolate(isolate()); ml->Enter();
auto idle_time_handler = isolate() != nullptr ? isolate()->group()->idle_time_handler() : nullptr;
// Release the monitor_ temporarily while we handle the message. // The monitor was acquired in MessageHandler::TaskCallback(). ml->Exit(); Message::Priority saved_priority = message->priority(); Dart_Port saved_dest_port = message->dest_port(); MessageStatus status = kOK; { DisableIdleTimerScope disable_idle_timer(idle_time_handler); status = HandleMessage(std::move(message)); } if (status > max_status) { max_status = status; } ml->Enter(); if (FLAG_trace_isolates) { OS::PrintErr( "[.] Message handled (%s):\n" "\tlen: %" Pd "\n" "\thandler: %s\n" "\tport: %" Pd64 "\n", MessageStatusString(status), message_len, name(), saved_dest_port); } // If we are shutting down, do not process any more messages. if (status == kShutdown) { ClearOOBQueue(); break; }
// Remember time since the last message. Don't consider OOB messages so // using Observatory doesn't trigger additional idle tasks. if ((FLAG_idle_timeout_micros != 0) && (saved_priority == Message::kNormalPriority)) { if (idle_time_handler != nullptr) { idle_time_handler->UpdateStartIdleTime(); } }
// Some callers want to process only one normal message and then quit. At // the same time it is OK to process multiple OOB messages. if ((saved_priority == Message::kNormalPriority) && !allow_multiple_normal_messages) { // We processed one normal message. Allow no more. allow_normal_messages = false; }
// Reevaluate the minimum allowable priority. The paused state // may have changed as part of handling the message. We may also // have encountered an error during message processing. // // Even if we encounter an error, we still process pending OOB // messages so that we don't lose the message notification. min_priority = (((max_status == kOK) && allow_normal_messages && !paused()) ? Message::kNormalPriority : Message::kOOBPriority); message = DequeueMessage(min_priority); } return max_status; }
class Message { // A message processed at any interrupt point (stack overflow check) instead // of at the top of the message loop. Control messages from dart:isolate or // vm-service requests. bool IsOOB() const { return priority_ == Message::kOOBPriority; }