Dart Isolate源码分析

Isolate

💡 本文基于Dart 2.17.1

Isolate, an isolated Dart execution context.

All Dart code runs in an isolate, and code can access classes and values only from the same isolate. Different isolates can communicate by sending values through ports (see ReceivePort, SendPort).

In Dart an isolate has its own event loop, its own global fields, can run in parallel with other isolates and have their own live-cycle.
https://github.com/dart-lang/sdk/issues/36097#issuecomment-746510375

The new isolate has its own memory and its own thread working in parallel with the main isolate.

[https://www.youtube.com/watch?v=NoVYI94MJio&ab_channel=Flutterly](https://www.youtube.com/watch?v=NoVYI94MJio&ab_channel=Flutterly)

https://www.youtube.com/watch?v=NoVYI94MJio&ab_channel=Flutterly

Isolate创建会占用内存,可以使用IsolateGroup来解决,并且目前为止Dart和Flutter都默认支持在使用Isolate.spawn创建新Isolate的时候使用IsolateGroup(Isolate.spwanUri创建的时候会创建单独的IsolateGroup和Isolate)。

💡 在创建isolate的时候可以添加addOnExitListener 或者addErrorListener之类的监听,但是可能在执行添加代码的时候isolate就已经终止了而导致这些方法收不到回调。
为了避免这种情况,可以在创建isolate的时候指定他的状态为**paused**。

与isolate有关的类有:

  • Isolate 位置在sdk\lib\isolate\isolate.dart。主要是Isolate 通用方法、属性的抽象描述,没有具体实现。
  • Isolate 位置在sdk\lib\_internal\vm\lib\isolate_patch.dart,是app等平台对应的具体实现,部分方法调用了native层的Isolate实现。
  • Isolate 位置在runtime\vm\isolate.h以及runtime\vm\isolate.cc中,是Isolate的native层实现。

他们的关系大致如图:

Untitled

简单使用

创建新Isolate的方式:

  • Isolate(``SendPort controlPort``, {this.pauseCapability, this.terminateCapability}); 这种方式创建一种能力受限的Isolate。The capabilities should be the subset of the capabilities that are available to the original isolate.本质上并没有在native层孵化一个新的Isolate。
1
2
3
4
5
6
7
8
9
var newIsolate = Isolate(Isolate.current.controlPort);
newIsolate.addOnExitListener(Isolate.current.controlPort);
newIsolate.addErrorListener(Isolate.current.controlPort);
Future.delayed(Duration(seconds: 1),(){
newIsolate.kill();
print("try kill new isolate"); // after this,the dart code finish
});

print("finish");
  • Isolate.spawn(``void entryPoint(T message), T message,...) 创建一个和当前Isolate共享同一份代码的Isolate,并执行entryPoint方法,一般在message中传入SendPort以便从entryPoint中向来时的Isolate发送消息,新建的Isolate和当前Isolate在同一个IsolateGroup中。
1
2
3
4
5
6
7
8
9
10
11
void spawnIsolate() {
var receivePort = ReceivePort();
receivePort.listen((message) {
print("receivePort(${Isolate.current.debugName}) received msg: $message");
});
//创建一个和当前的isolate共享同一份代码的Isolate
var isolate = Isolate.spawn((message) {
print("Isolate initial function(${Isolate.current.debugName}) received msg: $message");
(message as SendPort).send("HELLO_FORM_ISOLATE(${Isolate.current.debugName})");
}, receivePort.sendPort,debugName: "another_isolate");
}
  • Isolate.spawnUri(Uri uri,List<String> args,var message,...)* 指定的uri中创建并孵化一个isolate,执行uri对应的library中的main方法(0~2个入参),并传入无参、args或message作为参数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
var receivePort = ReceivePort();
receivePort.listen((message) {
print("receivePort(${Isolate.current.debugName}) received msg: $message");
});
// 创建一个和当前的isolate共享同一份代码的Isolate
var isolate = await Isolate.spawnUri(
Uri.file(
r"E:\workspace\others\flutter_dart_source_code_analysis\lib\dart\another_dart_file_to_spawn_uri.dart"),
[],
receivePort.sendPort,
debugName: "another_isolate");
Future.delayed(const Duration(seconds: 2), () {
receivePort.close();
isolate.kill(priority: Isolate.immediate);
print("try kill new isolate");
});

使用方法

pause

Capability pause([Capability? resumeCapability]),暂停Isolate,停止从*event loop queue* 中取(并处理)消息,但是依然可以往里面加入消息

resumeCapability 是用来区分pause的,必须使用同一个*resumeCapability*来resume isolate。

  • 使用同一个*resumeCapability*多次pause,只需一次resume就可以恢复isolate
  • 使用不同*resumeCapability多次pause,必须使用对应的resumeCapability依次resume才可以恢复isolate注意:这里也只需要使用当时pause isolate的resumeCapability* 依次调用resume即可,而不用保持次数一致,比如,有2个*resumeCapability ,*调用pause次数分别为a 1,b 2,那么要想resume isolate,也只需要分别使用a,b调用一次resume即可)

ping

使用isolate往receivePort.sendPort发送response消息,即使isolate当前被pause也可以正常发送

1
2
isolate.pause();
isolate.ping(receivePort.sendPort, response: "is isolate resume?");//receivePort依然可以收到消息

ping可以正常发送的原因是:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
// -> lib\isolate\isolate.dart

// ping方法是一个external方法
external void ping(SendPort responsePort,
{Object? response, int priority = immediate});

// -> sdk/lib/_internal/vm/lib/isolate_patch.dart

@patch
void ping(SendPort responsePort,
{Object? response, int priority: immediate}) {
var msg = new List<Object?>.filled(5, null)
..[0] = 0 // Make room for OOM message type.
..[1] = _PING
..[2] = responsePort
..[3] = priority
..[4] = response;
_sendOOB(controlPort, msg);
}

@pragma("vm:external-name", "Isolate_sendOOB")
external static void _sendOOB(port, msg);

// -> runtime/lib/isolate.cc

// 创建了一个oob消息并压入oob_queue_
DEFINE_NATIVE_ENTRY(Isolate_sendOOB, 0, 2) {
GET_NON_NULL_NATIVE_ARGUMENT(SendPort, port, arguments->NativeArgAt(0));
GET_NON_NULL_NATIVE_ARGUMENT(Array, msg, arguments->NativeArgAt(1));

// Make sure to route this request to the isolate library OOB mesage handler.
msg.SetAt(0, Smi::Handle(Smi::New(Message::kIsolateLibOOBMsg)));

// Ensure message writer (and it's resources, e.g. forwarding tables) are
// cleaned up before handling interrupts.
{
PortMap::PostMessage(WriteMessage(/* can_send_any_object */ false,
/* same_group */ false, msg, port.Id(),
Message::kOOBPriority));
}

// Drain interrupts before running so any IMMEDIATE operations on the current
// isolate happen synchronously.
const Error& error = Error::Handle(thread->HandleInterrupts());
if (!error.IsNull()) {
Exceptions::PropagateError(error);
UNREACHABLE();
}

return Object::null();
}

在MessageHandler中有两种MessageQueue:oob_queue_queue_ ,前者优先级高,即使isolate被pause也会执行

1
2
3
4
5
6
7
// -> runtime\vm\message_handler.h

// 普通消息,暂停时不能处理
MessageQueue* queue_;
// 优先消息,即使处理消息时,优先处理obb_queue消息,如果为空再去考虑处理普通消息
// 即使isolate被pause也可以被处理
MessageQueue* oob_queue_;

像是ping/kill/pause/addOnExitListener/removeOnExitListener这些指令消息都是压入到obb_queue_中优先处理的。

源码分析

先看一下常用的几个方法是怎么实现的。

获取当前Isolate

sdk/lib/isolate/isolate.dartIsolate.current

(sdk/lib/_internal/vm/lib/isolate_patch.dart) Isolate get currentIsolate._getCurrentIsolate()_getPortAndCapabilitiesOfCurrentIsolate()

runtime/lib/isolate.ccDEFINE_NATIVE_ENTRY(Isolate_getPortAndCapabilitiesOfCurrentIsolate, 0, 0)

先看一下sdk/lib/_internal/vm/lib/isolate_patch.dart中的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// -> sdk/lib/_internal/vm/lib/isolate_patch.dart

static final _currentIsolate = _getCurrentIsolate();
@patch
static Isolate get current => _currentIsolate;

static Isolate _getCurrentIsolate() {
List portAndCapabilities = _getPortAndCapabilitiesOfCurrentIsolate();
// 这里的参数分别是SendPort,Capability,Capability
return new Isolate(portAndCapabilities[0],
pauseCapability: portAndCapabilities[1],
terminateCapability: portAndCapabilities[2]);
}

@pragma("vm:external-name", "Isolate_getPortAndCapabilitiesOfCurrentIsolate")
external static List _getPortAndCapabilitiesOfCurrentIsolate();

可以看到,最后是根据native端返回的信息,新建了一个Isolate引用,但是因为_currentIsolatestatic final的,所以只会被调用一次,确保了在Dart SDK中调用Isolate.current 时获取的是当前唯一的Isolate。

让我们看一下在native中是如何找到当前的Isolate的:

1
2
3
4
5
6
7
8
9
10
11
// -> \runtime\lib\isolate.cc

DEFINE_NATIVE_ENTRY(Isolate_getPortAndCapabilitiesOfCurrentIsolate, 0, 0) {
const Array& result = Array::Handle(Array::New(3));
result.SetAt(0, SendPort::Handle(SendPort::New(isolate->main_port())));
result.SetAt(
1, Capability::Handle(Capability::New(isolate->pause_capability())));
result.SetAt(
2, Capability::Handle(Capability::New(isolate->terminate_capability())));
return result.ptr();
}

可见是直接取的当前线程对应的isolate对应的值,经过包装再返回到调用方。

创建Isolate

在Dart中创建Isolate有3种方式:

  • Isolate(this.controlPort, {this.pauseCapability, this.terminateCapability}); create an isolate,本质上只是将controlPort 等设置为传入的对象,并没有在native层新建Isolate
  • Isolate.spawn create and spawns an isolate
  • Isolate.spawnUri create and spawns an isolate

这里分析一下后面两种方式,对比一下差异:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
// -> sdk\lib\isolate\isolate.dart

// Creates a new [Isolate] object with a restricted set of capabilities.
Isolate(this.controlPort, {this.pauseCapability, this.terminateCapability});

/// Creates and spawns an isolate that shares the same code as the current
/// isolate.
external static Future<Isolate> spawn<T>(
void entryPoint(T message), T message,
{bool paused = false,
bool errorsAreFatal = true,
SendPort? onExit,
SendPort? onError,
@Since("2.3") String? debugName});

/// Creates and spawns an isolate that runs the code from the library with
/// the specified URI.
///
/// The isolate starts executing the top-level `main` function of the library
/// with the given URI.
external static Future<Isolate> spawnUri(
Uri uri,
List<String> args,
var message,
{bool paused = false,
SendPort? onExit,
SendPort? onError,
bool errorsAreFatal = true,
bool? checked,
Map<String, String>? environment,
@Deprecated('The packages/ dir is not supported in Dart 2')
Uri? packageRoot,
Uri? packageConfig,
bool automaticPackageResolution = false,
@Since("2.3")
String? debugName});

对于APP等来说,上述Isolate.spawnIsolate.spawnUri的实现都在vm下面的isolate_patch.dart中(js会返回_unsupported()):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
// -> sdk\lib\_internal\vm\lib\isolate_patch.dart

@patch
static Future<Isolate> spawn<T>(void entryPoint(T message), T message,
{bool paused = false,
bool errorsAreFatal = true,
SendPort? onExit,
SendPort? onError,
String? debugName}) async {
// `paused` isn't handled yet.
// Check for the type of `entryPoint` on the spawning isolate to make
// error-handling easier.
if (entryPoint is! _UnaryFunction) {
throw new ArgumentError(entryPoint);
}
// The VM will invoke [_startIsolate] with entryPoint as argument.

// We do not inherit the package config settings from the parent isolate,
// instead we use the values that were set on the command line.
...

final RawReceivePort readyPort =
new RawReceivePort(null, 'Isolate.spawn ready');
try {
**_spawnFunction**(readyPort.sendPort, script.toString(), entryPoint, message,
paused, errorsAreFatal, onExit, onError, packageConfig, debugName);
return await **_spawnCommon**(readyPort);
} catch (e, st) {
readyPort.close();
return await new Future<Isolate>.error(e, st);
}
}

@patch
static Future<Isolate> spawnUri(Uri uri, List<String> args, var message,
{bool paused = false,
SendPort? onExit,
SendPort? onError,
bool errorsAreFatal = true,
bool? checked,
Map<String, String>? environment,
Uri? packageRoot,
Uri? packageConfig,
bool automaticPackageResolution = false,
String? debugName}) async {

// Verify that no mutually exclusive arguments have been passed.
...
// Resolve the uri against the current isolate's root Uri first.
...

// The VM will invoke [_startIsolate] and not `main`.
final packageConfigString = packageConfig?.toString();

final RawReceivePort readyPort =
new RawReceivePort(null, 'Isolate.spawnUri ready');
try {
**_spawnUri**(
readyPort.sendPort,
spawnedUri.toString(),
args,
message,
paused,
onExit,
onError,
errorsAreFatal,
checked,
null,
/* environment */
packageConfigString,
debugName);
return await **_spawnCommon**(readyPort);
} catch (e) {
readyPort.close();
rethrow;
}
}

// Isolate.spawn call
@pragma("vm:external-name", "Isolate_spawnFunction")
external static void _spawnFunction(
SendPort readyPort,
String uri,
Function topLevelFunction,
var message,
bool paused,
bool errorsAreFatal,
SendPort? onExit,
SendPort? onError,
String? packageConfig,
String? debugName);

// Isolate.spawnUri call
@pragma("vm:external-name", "Isolate_spawnUri")
external static void _spawnUri(
SendPort readyPort,
String uri,
List<String> args,
var message,
bool paused,
SendPort? onExit,
SendPort? onError,
bool errorsAreFatal,
bool? checked,
List? environment,
String? packageConfig,
String? debugName);

// 监听Isolate spawn状态,等成功之后将其处理后返回给Dart层的调用者
static Future<Isolate> _spawnCommon(RawReceivePort readyPort) {
final completer = new Completer<Isolate>.sync();
readyPort.handler = (readyMessage) {
readyPort.close();
if (readyMessage is List && readyMessage.length == 2) {
SendPort controlPort = readyMessage[0];
List capabilities = readyMessage[1];
**completer.complete(new Isolate(controlPort,
pauseCapability: capabilities[0],
terminateCapability: capabilities[1]));**
} else if (readyMessage is String) {
// We encountered an error while starting the new isolate.
completer.completeError(new IsolateSpawnException(
'Unable to spawn isolate: ${readyMessage}'));
} else {
// This shouldn't happen.
completer.completeError(new IsolateSpawnException(
"Internal error: unexpected format for ready message: "
"'${readyMessage}'"));
}
};
return completer.future;
}

其实,根据上述的代码,不管是Isolate.spawnUri() 还是Isolate.spawn,都是先调用RawReceivePort获取RawReceivePort readyPort,最后都是调用了_spawnCommon(readyPort) 方法,最终通过new Isolate(controlPort, pauseCapability: capabilities[0], terminateCapability: capabilities[1])方法创建了新的Isolate

这个方法的定义在sdk/lib/isolate/isolate.dart中:

1
2
3
4
// -> sdk/lib/isolate/isolate.dart
final SendPort controlPort;

Isolate(this.controlPort, {this.pauseCapability, this.terminateCapability});

可以看到,在Dart中,我们拿到的Isolate主要是持有一个和native中对应SendPort

通过上面的分析:

  • Isolate.spawn最后调用了_spawnFunction方法(native层实现为Isolate_spawnFunction);
  • Isolate.spawnUri最后调用了_spawnUri方法(native层实现为Isolate_spawnUri)。

💡 new RawReceivePort()方法主要是创建一个不存在于_RawReceivePortImplstatic final _portMap = <int, Map<String, dynamic>>{}; 中的SendPort(具体实现在PortMap::CreatePort中)。

Isolate_spawnFunction

Isolate.spawn最后调用了_spawnFunction方法,来看一下对应的Isolate_spawnFunction 的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
// -> runtime\lib\isolate.cc

DEFINE_NATIVE_ENTRY(Isolate_spawnFunction, 0, 10) {
// 解析参数
...

// closure_tuple_handle对应我们在Dart中Isolate.spawn()中传入的entryPoint
// 也就是isolate创建好以后执行的方法

std::unique_ptr<IsolateSpawnState> state(new IsolateSpawnState(
port.Id(), isolate->origin_id(), String2UTF8(script_uri),
closure_tuple_handle, &message_buffer, utf8_package_config,
paused.value(), fatal_errors, on_exit_port, on_error_port,
utf8_debug_name, **isolate->group()**));

// Since this is a call to Isolate.spawn, copy the parent isolate's code.
state->isolate_flags()->copy_parent_code = true;

**isolate->group()**->thread_pool()->Run<SpawnIsolateTask>(isolate,
std::move(state));
return Object::null();
}

可见,Isolate_spawnFunction方法中主要还是解析收到的各种参数,最后在当前isolate对应的IsolateGroup的线程池中执行SpawnIsolateTask

SpawnIsolateTask

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
// -> runtime\lib\isolate.cc

class SpawnIsolateTask : public ThreadPool::Task {
SpawnIsolateTask(Isolate* parent_isolate,
std::unique_ptr<IsolateSpawnState> state)
: parent_isolate_(parent_isolate), state_(std::move(state)) {
parent_isolate->IncrementSpawnCount();
}

void Run() override {
const char* name = (state_->debug_name() == nullptr)
? state_->function_name()
: state_->debug_name();
ASSERT(name != nullptr);

auto group = state_->isolate_group();
if (group == nullptr) {
RunHeavyweight(name);
} else {
RunLightweight(name);
}
}
}

RunLightWeight

因为这里我们的isolate→group不为空,所以走的是RunLightWeight:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
// -> runtime\lib\isolate.cc

void RunLightweight(const char* name) {
// The create isolate initialize callback is mandatory.
auto initialize_callback = **Isolate::InitializeCallback();**
if (initialize_callback == nullptr) {
FailedSpawn(
"Lightweight isolate spawn is not supported by this Dart embedder\n",
/*has_current_isolate=*/false);
return;
}

char* error = nullptr;

auto group = state_->isolate_group();
**Isolate* isolate = CreateWithinExistingIsolateGroup(group, name, &error);**
parent_isolate_->DecrementSpawnCount();
parent_isolate_ = nullptr;

if (isolate == nullptr) {
FailedSpawn(error, /*has_current_isolate=*/false);
free(error);
return;
}

void* child_isolate_data = nullptr;
**const bool success = initialize_callback(&child_isolate_data, &error);**
if (!success) {
FailedSpawn(error);
Dart_ShutdownIsolate();
free(error);
return;
}

**isolate->set_init_callback_data(child_isolate_data);

// 注意这里的Run方法,在**RunHeavyweight方法的最后也调用了
// 到时候会一起分析一下
**Run(isolate);**
}

// -> runtime\vm\dart_api_impl.cc

Isolate* CreateWithinExistingIsolateGroup(IsolateGroup* group,
const char* name,
char** error) {
API_TIMELINE_DURATION(Thread::Current());
CHECK_NO_ISOLATE(Isolate::Current());

auto spawning_group = group;

**Isolate* isolate =** reinterpret_cast<Isolate*>(
**CreateIsolate**(spawning_group, /*is_new_group=*/false, name,
/*isolate_data=*/nullptr, error));
if (isolate == nullptr) return nullptr;

// 因为执行到这里的都有IsolateGroup,共享同一份代码
auto source = spawning_group->source();
ASSERT(isolate->source() == source);

return isolate;
}

这里主要进行了2步:

  • 使用CreateWithinExistingIsolateGroup创建Isolate
  • 使用全局的initialize_callback (也就是Isolate::InitializeCallback())初始化Isolate
Isolate::InitializeCallback()

这其中的Isolate::InitializeCallback()是在Dart::Init的时候就已经设置了的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
// -> runtime/bin/main.cc

void main(int argc, char** argv) {
...
// Initialize the Dart VM.
Dart_InitializeParams init_params;
init_params.version = DART_INITIALIZE_PARAMS_CURRENT_VERSION;
init_params.vm_snapshot_data = vm_snapshot_data;
init_params.vm_snapshot_instructions = vm_snapshot_instructions;
**init_params.create_group = CreateIsolateGroupAndSetup;**
**init_params.initialize_isolate = OnIsolateInitialize;**
init_params.shutdown_isolate = OnIsolateShutdown;
init_params.cleanup_isolate = DeleteIsolateData;
init_params.cleanup_group = DeleteIsolateGroupData;
init_params.file_open = DartUtils::OpenFile;
init_params.file_read = DartUtils::ReadFile;
init_params.file_write = DartUtils::WriteFile;
init_params.file_close = DartUtils::CloseFile;
init_params.entropy_source = DartUtils::EntropySource;

error = Dart_Initialize(&init_params);
}

// -> runtime\vm\dart_api_impl.cc

DART_EXPORT char* Dart_Initialize(Dart_InitializeParams* params) {
if (params == NULL) {
return Utils::StrDup(
"Dart_Initialize: "
"Dart_InitializeParams is null.");
}

if (params->version != DART_INITIALIZE_PARAMS_CURRENT_VERSION) {
return Utils::StrDup(
"Dart_Initialize: "
"Invalid Dart_InitializeParams version.");
}

return Dart::Init(params);
}

// -> runtime\vm\dart.cc

char* Dart::Init(const Dart_InitializeParams* params) {
if (!init_state_.SetInitializing()) {
return Utils::StrDup(
"Bad VM initialization state, "
"already initialized or "
"multiple threads initializing the VM.");
}
char* retval = DartInit(params);
if (retval != NULL) {
init_state_.ResetInitializing();
return retval;
}
init_state_.SetInitialized();
return NULL;
}

char* Dart::DartInit(const Dart_InitializeParams* params) {
...
OSThread::Init();
Zone::Init();
IsolateGroup::Init();
Isolate::InitVM();
PortMap::Init();
Service::Init();
...

Thread::ExitIsolate(); // Unregister the VM isolate from this thread.
**Isolate::SetCreateGroupCallback(params->create_group);**
**Isolate::SetInitializeCallback_(params->initialize_isolate);**
Isolate::SetShutdownCallback(params->shutdown_isolate);
Isolate::SetCleanupCallback(params->cleanup_isolate);
Isolate::SetGroupCleanupCallback(params->cleanup_group);
Isolate::SetRegisterKernelBlobCallback(params->register_kernel_blob);
Isolate::SetUnregisterKernelBlobCallback(params->unregister_kernel_blob);
...

}

也就是说,上文的Isolate::InitializeCallback()实际上就是OnIsolateInitialize,它的主要作用就是在isolate创建好之后进行统一的初始化操作,绑定一些数据:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
// -> runtime\bin\main.cc

static bool OnIsolateInitialize(void** child_callback_data, char** error) {
Dart_Isolate isolate = Dart_CurrentIsolate();
ASSERT(isolate != nullptr);

auto isolate_group_data =
reinterpret_cast<IsolateGroupData*>(Dart_CurrentIsolateGroupData());

auto isolate_data = new IsolateData(isolate_group_data);
*child_callback_data = isolate_data;

Dart_EnterScope();
const auto **script_uri** = isolate_group_data->script_url;
const bool **isolate_run_app_snapshot** =
isolate_group_data->RunFromAppSnapshot();
Dart_Handle **result** = SetupCoreLibraries(isolate, isolate_data,
/*group_start=*/false,
/*resolved_packages_config=*/nullptr);
if (Dart_IsError(result)) goto failed;

if (isolate_run_app_snapshot) {
result = Loader::InitForSnapshot(script_uri, isolate_data);
if (Dart_IsError(result)) goto failed;
} else {
result = DartUtils::ResolveScript(Dart_NewStringFromCString(script_uri));
if (Dart_IsError(result)) goto failed;

if (isolate_group_data->kernel_buffer() != nullptr) {
// Various core-library parts will send requests to the Loader to resolve
// relative URIs and perform other related tasks. We need Loader to be
// initialized for this to work because loading from Kernel binary
// bypasses normal source code loading paths that initialize it.
const char* resolved_script_uri = NULL;
result = Dart_StringToCString(result, &resolved_script_uri);
if (Dart_IsError(result)) goto failed;
result = Loader::InitForSnapshot(resolved_script_uri, isolate_data);
if (Dart_IsError(result)) goto failed;
}
}

Dart_ExitScope();
return true;

failed:
*error = Utils::StrDup(Dart_GetError(result));
Dart_ExitScope();
return false;
}
CreateWithinExistingIsolateGroup

CreateWithinExistingIsolateGroupCreateIsolate

再看一下创建Isolate的具体方法,这个在不同的device上面不一样,我们只关注vm下面的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
// -> runtime\vm\dart_api_impl.cc

static Dart_Isolate CreateIsolate(IsolateGroup* group,
bool is_new_group,
const char* name,
void* isolate_data,
char** error) {
**CHECK_NO_ISOLATE**(Isolate::Current());

auto source = group->source();
**Isolate* I = Dart::CreateIsolate(name, source->flags, group);**
if (I == NULL) {
if (error != NULL) {
*error = Utils::StrDup("Isolate creation failed");
}
return reinterpret_cast<Dart_Isolate>(NULL);
}

Thread* T = Thread::Current();
bool success = false;
{
StackZone zone(T);
// We enter an API scope here as InitializeIsolate could compile some
// bootstrap library files which call out to a tag handler that may create
// Api Handles when an error is encountered.
T->EnterApiScope();
const Error& error_obj = Error::Handle(
Z, **Dart::InitializeIsolate(
source->snapshot_data, source->snapshot_instructions,
source->kernel_buffer, source->kernel_buffer_size,
is_new_group ? nullptr : group, isolate_data)**);
if (error_obj.IsNull()) {
#if defined(DEBUG) && !defined(DART_PRECOMPILED_RUNTIME)
if (FLAG_check_function_fingerprints && !FLAG_precompiled_mode) {
Library::CheckFunctionFingerprints();
}
#endif // defined(DEBUG) && !defined(DART_PRECOMPILED_RUNTIME).
success = true;
} else if (error != NULL) {
*error = Utils::StrDup(error_obj.ToErrorCString());
}
// We exit the API scope entered above.
T->ExitApiScope();
}

if (success) {
if (is_new_group) {
group->heap()->InitGrowthControl();
}
// A Thread structure has been associated to the thread, we do the
// safepoint transition explicitly here instead of using the
// TransitionXXX scope objects as the reverse transition happens
// outside this scope in Dart_ShutdownIsolate/Dart_ExitIsolate.
T->set_execution_state(Thread::kThreadInNative);
T->EnterSafepoint();
if (error != NULL) {
*error = NULL;
}
return Api::CastIsolate(I);
}

Dart::ShutdownIsolate();
return reinterpret_cast<Dart_Isolate>(NULL);
}

这里主要有两步:

  • Dart::CreateIsolate创建了Isolate* I
  • 然后调用Dart::InitializeIsolate初始化isolate。

Dart::CreateIsolate:

1
2
3
4
5
6
7
8
9
10
// -> runtime\vm\dart.cc

Isolate* Dart::CreateIsolate(const char* name_prefix,
const Dart_IsolateFlags& api_flags,
IsolateGroup* isolate_group) {
// Create a new isolate.
Isolate* isolate =
Isolate::InitIsolate(name_prefix, isolate_group, api_flags);
return isolate;
}

💡 在Dart虚拟机启动(Dart::DartInit)的时候,也会调用Dart::InitIsolate创建虚拟机对应的Isolate,执行UI操作:
vm_isolate_ = Isolate::InitIsolate(kVmIsolateName, group, api_flags, is_vm_isolate);

Isolate::InitIsolate方法中,先是用isolate_group创建了新的Isolate,然后将其与ThreadMessageHandlerSendPort等绑定:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
// -> runtime\vm\isolate.cc

Isolate* Isolate::InitIsolate(const char* name_prefix,
IsolateGroup* isolate_group,
const Dart_IsolateFlags& api_flags,
bool is_vm_isolate) {
// 创建新的Isolate
**Isolate* result = new Isolate(isolate_group, api_flags);**
result->BuildName(name_prefix);
if (!is_vm_isolate) {
// vm isolate object store is initialized later, after null instance
// is created (in Dart::Init).
// Non-vm isolates need to have isolate object store initialized is that
// exit_listeners have to be null-initialized as they will be used if
// we fail to create isolate below, have to do low level shutdown.
ASSERT(result->group()->object_store() != nullptr);
result->isolate_object_store()->Init();
}

ASSERT(result != nullptr);

#if !defined(PRODUCT)
// Initialize metrics.
#define ISOLATE_METRIC_INIT(type, variable, name, unit) \
result->metric_##variable##_.InitInstance(result, name, NULL, Metric::unit);
ISOLATE_METRIC_LIST(ISOLATE_METRIC_INIT);
#undef ISOLATE_METRIC_INIT
#endif // !defined(PRODUCT)

// First we ensure we enter the isolate. This will ensure we're participating
// in any safepointing requests from this point on. Other threads requesting a
// safepoint operation will therefore wait until we've stopped.
//
// Though the [result] isolate is still in a state where no memory has been
// allocated, which means it's safe to GC the isolate group until here.
// 创建一个Thread并和当前isolate绑定
if (!**Thread::EnterIsolate(result)**) {
delete result;
return nullptr;
}

// Setup the isolate message handler.
MessageHandler* handler = new IsolateMessageHandler(result);
ASSERT(handler != nullptr);
// 在这里绑定了message handler
**result->set_message_handler(handler);**

**result->set_main_port(PortMap::CreatePort(result->message_handler()));**
#if defined(DEBUG)
// Verify that we are never reusing a live origin id.
VerifyOriginId id_verifier(result->main_port());
Isolate::VisitIsolates(&id_verifier);
#endif
result->set_origin_id(result->main_port());
result->set_pause_capability(result->random()->NextUInt64());
result->set_terminate_capability(result->random()->NextUInt64());

#if !defined(PRODUCT)
result->debugger_ = new Debugger(result);
#endif

// Now we register the isolate in the group. From this point on any GC would
// traverse the isolate roots (before this point, the roots are only pointing
// to vm-isolate objects, e.g. null)
**isolate_group->RegisterIsolate(result);**

if (ServiceIsolate::NameEquals(name_prefix)) {
ASSERT(!ServiceIsolate::Exists());
ServiceIsolate::SetServiceIsolate(result);
#if !defined(DART_PRECOMPILED_RUNTIME)
} else if (KernelIsolate::NameEquals(name_prefix)) {
ASSERT(!KernelIsolate::Exists());
KernelIsolate::SetKernelIsolate(result);
#endif // !defined(DART_PRECOMPILED_RUNTIME)
}

if (FLAG_trace_isolates) {
if (name_prefix == nullptr || strcmp(name_prefix, "vm-isolate") != 0) {
OS::PrintErr(
"[+] Starting isolate:\n"
"\tisolate: %s\n",
result->name());
}
}

// Add to isolate list. Shutdown and delete the isolate on failure.
if (!TryMarkIsolateReady(result)) {
result->LowLevelShutdown();
Isolate::LowLevelCleanup(result);
return nullptr;
}

return result;
}

Dart::InitializeIsolate

这里主要是对isolate进行初始化,并在初始化完成后通知创建这个isolate的isolate。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
// -> runtime\vm\dart.cc

ErrorPtr Dart::InitializeIsolate(const uint8_t* snapshot_data,
const uint8_t* snapshot_instructions,
const uint8_t* kernel_buffer,
intptr_t kernel_buffer_size,
IsolateGroup* source_isolate_group,
void* isolate_data) {
// Initialize the new isolate.
Thread* T = Thread::Current();
Isolate* I = T->isolate();
auto IG = T->isolate_group();
#if defined(SUPPORT_TIMELINE)
TimelineBeginEndScope tbes(T, Timeline::GetIsolateStream(),
"InitializeIsolate");
tbes.SetNumArguments(1);
tbes.CopyArgument(0, "isolateName", I->name());
#endif
ASSERT(I != NULL);
StackZone zone(T);
HandleScope handle_scope(T);
bool was_child_cloned_into_existing_isolate = false;
if (source_isolate_group != nullptr) {
// If a static field gets registered in [IsolateGroup::RegisterStaticField]:
//
// * before this block it will ignore this isolate. The [Clone] of the
// initial field table will pick up the new value.
// * after this block it will add the new static field to this isolate.
{
SafepointReadRwLocker reader(T, source_isolate_group->program_lock());
**I->set_field_table**(T,
source_isolate_group->initial_field_table()->Clone(I));
**I->field_table()->MarkReadyToUse();**
}

was_child_cloned_into_existing_isolate = true;
} else {
const Error& error = Error::Handle(
// 从IsolateGroup中引用一些通用的变量(常量等等)
**InitIsolateFromSnapshot**(T, I, snapshot_data, snapshot_instructions,
kernel_buffer, kernel_buffer_size));
if (!error.IsNull()) {
return error.ptr();
}
}

Object::VerifyBuiltinVtables();
if (T->isolate()->origin_id() == 0) {
DEBUG_ONLY(IG->heap()->Verify(kForbidMarked));
}

#if defined(DART_PRECOMPILED_RUNTIME)
const bool kIsAotRuntime = true;
#else
const bool kIsAotRuntime = false;
#endif

if (kIsAotRuntime || was_child_cloned_into_existing_isolate) {
#if !defined(TARGET_ARCH_IA32)
ASSERT(IG->object_store()->build_generic_method_extractor_code() !=
Code::null());
ASSERT(IG->object_store()->build_nongeneric_method_extractor_code() !=
Code::null());
#endif
} else {
#if !defined(TARGET_ARCH_IA32)
if (I != Dart::vm_isolate()) {
if (IG->object_store()->build_generic_method_extractor_code() !=
nullptr) {
SafepointWriteRwLocker ml(T, IG->program_lock());
if (IG->object_store()->build_generic_method_extractor_code() !=
nullptr) {
IG->object_store()->set_build_generic_method_extractor_code(
Code::Handle(
StubCode::GetBuildGenericMethodExtractorStub(nullptr)));
}
}
if (IG->object_store()->build_nongeneric_method_extractor_code() !=
nullptr) {
SafepointWriteRwLocker ml(T, IG->program_lock());
if (IG->object_store()->build_nongeneric_method_extractor_code() !=
nullptr) {
IG->object_store()->set_build_nongeneric_method_extractor_code(
Code::Handle(
StubCode::GetBuildNonGenericMethodExtractorStub(nullptr)));
}
}
}
#endif // !defined(TARGET_ARCH_IA32)
}

I->set_ic_miss_code(StubCode::SwitchableCallMiss());

Error& error = Error::Handle();
if (snapshot_data == nullptr || kernel_buffer != nullptr) {
error ^= IG->object_store()->PreallocateObjects();
if (!error.IsNull()) {
return error.ptr();
}
}
const auto& out_of_memory =
Object::Handle(IG->object_store()->out_of_memory());
error ^= I->isolate_object_store()->PreallocateObjects(out_of_memory);
if (!error.IsNull()) {
return error.ptr();
}

if (!was_child_cloned_into_existing_isolate) {
IG->heap()->InitGrowthControl();
}
I->set_init_callback_data(isolate_data);
if (FLAG_print_class_table) {
IG->class_table()->Print();
}
#if !defined(PRODUCT)
ServiceIsolate::MaybeMakeServiceIsolate(I);
if (!Isolate::IsSystemIsolate(I)) {
I->message_handler()->set_should_pause_on_start(
FLAG_pause_isolates_on_start);
I->message_handler()->set_should_pause_on_exit(FLAG_pause_isolates_on_exit);
}
#endif // !defined(PRODUCT)

ServiceIsolate::SendIsolateStartupMessage();
#if !defined(PRODUCT)
I->debugger()->NotifyIsolateCreated();
#endif

// Create tag table.
I->set_tag_table(GrowableObjectArray::Handle(GrowableObjectArray::New()));
// Set up default UserTag.
const UserTag& default_tag = UserTag::Handle(UserTag::DefaultTag());
I->set_current_tag(default_tag);

I->init_loaded_prefixes_set_storage();

return Error::null();
}

可以看到,如果是调用Isolate.spawn()的话,先从当前isolate获取对应的Isolate Group,然后使用这个Isolate Group创建配置一个新的isolate,这样在同一个isolate group中的Isolate可以共享常量,heap等。

Isolate_spawnUri

如果是使用Isolate.spawnUri()的话,就会通过Isolate_spawnUri来创建isolate。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
// -> runtime\lib\isolate.cc

DEFINE_NATIVE_ENTRY(Isolate_spawnUri, 0, 12) {
// 解析参数
...

// Canonicalize the uri with respect to the current isolate.
const Library& root_lib =
Library::Handle(isolate->group()->object_store()->root_library());
char* error = NULL;
// 获取canonical_uri
const char* **canonical_uri = CanonicalizeUri**(thread, root_lib, uri, &error);
if (canonical_uri == NULL) {
const String& msg = String::Handle(String::New(error));
ThrowIsolateSpawnException(msg);
}

const char* utf8_package_config =
packageConfig.IsNull() ? NULL : String2UTF8(packageConfig);
const char* utf8_debug_name =
debugName.IsNull() ? NULL : String2UTF8(debugName);

std::unique_ptr<IsolateSpawnState> state(new **IsolateSpawnState**(
port.Id(), canonical_uri, utf8_package_config, &arguments_buffer,
&message_buffer, paused.value(), fatal_errors, on_exit_port,
// 注意下面这里的group=nullptr
on_error_port, utf8_debug_name, **/*group=*/nullptr**));

// If we were passed a value then override the default flags state for
// checked mode.
if (!checked.IsNull()) {
Dart_IsolateFlags* flags = state->isolate_flags();
flags->enable_asserts = checked.value();
}

// Since this is a call to Isolate.spawnUri, don't copy the parent's code.
state->isolate_flags()->copy_parent_code = false;

isolate->group()->thread_pool()->**Run<SpawnIsolateTask>**(isolate,
std::move(state));
return Object::null();
}

可以看到Isolate_spawnUri还是执行了SpawnIsolateTask

SpawnIsolateTask

SpawnIsolateTask.Run方法中,因为spawnUriIsolateSpawnStateIsolateGroupnulltrp,所以这里执行的是RunHeavyweight(name)

RunHeavyweight

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
// -> 

class SpawnIsolateTask : public ThreadPool::Task {

void Run() override {
const char* name = (state_->debug_name() == nullptr)
? state_->function_name()
: state_->debug_name();
ASSERT(name != nullptr);

auto group = state_->isolate_group();
if (group == nullptr) {
**RunHeavyweight(name);**
} else {
RunLightweight(name);
}
}

void RunHeavyweight(const char* name) {
// The create isolate group callback is mandatory. If not provided we
// cannot spawn isolates.
// 在Dart::DartInit中已经被设置,在Isolate创建时会被回调
**auto create_group_callback = Isolate::CreateGroupCallback();**
if (create_group_callback == nullptr) {
FailedSpawn("Isolate spawn is not supported by this Dart embedder\n");
return;
}

char* error = nullptr;

// Make a copy of the state's isolate flags and hand it to the callback.
Dart_IsolateFlags api_flags = *(state_->isolate_flags());
api_flags.is_system_isolate = false;
// 创建isolate
**Dart_Isolate isolate =
(create_group_callback)(state_->script_url(), name, nullptr,
state_->package_config(), &api_flags,
parent_isolate_->init_callback_data(), &error);**
parent_isolate_->DecrementSpawnCount();
parent_isolate_ = nullptr;

if (isolate == nullptr) {
FailedSpawn(error, /*has_current_isolate=*/false);
free(error);
return;
}
// 切换到指定的isolate
Dart_EnterIsolate(isolate);

// 这里也调用了Run方法
**Run(reinterpret_cast<Isolate*>(isolate));**
}

}

主要创建Isolate的过程在Isolate::CreateGroupCallback();中,让我们看一下他是怎么来的:

Isolate::CreateGroupCallback()

他和上述Isolate::InitializeCallback_的来源一致,都是在Dart_Initialize中配置的,此外,还使用了parent_isolate_->init_callback_data()

先看一下的CreateIsolateGroupAndSetup实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
// -> runtime\bin\main.cc

static Dart_Isolate CreateIsolateGroupAndSetup(const char* script_uri,
const char* main,
const char* package_root,
const char* package_config,
Dart_IsolateFlags* flags,
void* callback_data,
char** error) {
// The VM should never call the isolate helper with a NULL flags.
ASSERT(flags != NULL);
ASSERT(flags->version == DART_FLAGS_CURRENT_VERSION);
ASSERT(package_root == nullptr);

bool dontneed_safe = true;
#if defined(DART_HOST_OS_LINUX)
// This would also be true in Linux, except that Google3 overrides the default
// ELF interpreter to one that apparently doesn't create proper mappings.
dontneed_safe = false;
#elif defined(DEBUG)
// If the snapshot isn't file-backed, madvise(DONT_NEED) is destructive.
if (Options::force_load_elf_from_memory()) {
dontneed_safe = false;
}
#endif
flags->snapshot_is_dontneed_safe = dontneed_safe;

int exit_code = 0;
#if !defined(EXCLUDE_CFE_AND_KERNEL_PLATFORM)
if (strcmp(script_uri, DART_KERNEL_ISOLATE_NAME) == 0) {
return **CreateAndSetupKernelIsolate**(script_uri, package_config, flags, error,
&exit_code);
}
#endif // !defined(EXCLUDE_CFE_AND_KERNEL_PLATFORM)

#if !defined(DART_PRECOMPILED_RUNTIME)
if (strcmp(script_uri, DART_DEV_ISOLATE_NAME) == 0) {
return **CreateAndSetupDartDevIsolate**(script_uri, package_config, flags,
error, &exit_code);
}
#endif // !defined(DART_PRECOMPILED_RUNTIME)

if (strcmp(script_uri, DART_VM_SERVICE_ISOLATE_NAME) == 0) {
return **CreateAndSetupServiceIsolate**(script_uri, package_config, flags,
error, &exit_code);
}

bool is_main_isolate = false;
return **CreateIsolateGroupAndSetupHelper**(is_main_isolate, script_uri, main,
package_config, flags, callback_data,
error, &exit_code);
}

这里创建Isolate的时候,区分了几种情况:

  • 如果是kernel-service(DART_KERNEL_ISOLATE_NAME)就执行CreateAndSetupKernelIsolate
  • 如果是dartdev(DART_DEV_ISOLATE_NAME)就执行CreateAndSetupDartDevIsolate
  • 如果是vm-service(DART_VM_SERVICE_ISOLATE_NAME)就执行CreateAndSetupServiceIsolate
  • 如果以上都不满足,就执行CreateIsolateGroupAndSetupHelper

显然,当我们在Dart代码中调用Isolate.spawnUri的时候,这里会执行的是CreateIsolateGroupAndSetupHelper

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
// -> runtime\bin\main.cc

// 调用方
bool is_main_isolate = false;
return CreateIsolateGroupAndSetupHelper(is_main_isolate, script_uri, main,
package_config, flags, callback_data,
error, &exit_code);

// Returns newly created Isolate on success, NULL on failure.
static Dart_Isolate CreateIsolateGroupAndSetupHelper(
bool is_main_isolate,
const char* script_uri,
const char* name,
const char* packages_config,
Dart_IsolateFlags* flags,
void* callback_data,
char** error,
int* exit_code) {

...
// 根据是AOT还是JIT获取kernel_buffer,app_snapshot,isolate_run_app_snapshot等数据
#if defined(DART_PRECOMPILED_RUNTIME){
// AOT: All isolates need to be run from AOT compiled snapshots.
}
#else{
// JIT: Main isolate starts from the app snapshot, if any. Other isolates
// use the core libraries snapshot.
}

// 创建isolate_group_data
auto isolate_group_data = new IsolateGroupData(
script_uri, packages_config, app_snapshot, isolate_run_app_snapshot);
// copy_parent_code为true的话,这里的kernel_buffer为NULL
if (kernel_buffer != NULL) {
if (kernel_buffer_ptr) {
isolate_group_data->SetKernelBufferAlreadyOwned(
std::move(kernel_buffer_ptr), kernel_buffer_size);
} else {
isolate_group_data->SetKernelBufferNewlyOwned(kernel_buffer,
kernel_buffer_size);
}
}

Dart_Isolate isolate = NULL;

IsolateData* isolate_data = nullptr;

#if !defined(DART_PRECOMPILED_RUNTIME)
if (!isolate_run_app_snapshot && (isolate_snapshot_data == NULL)) {
const uint8_t* platform_kernel_buffer = NULL;
intptr_t platform_kernel_buffer_size = 0;
dfe.LoadPlatform(&platform_kernel_buffer, &platform_kernel_buffer_size);
if (platform_kernel_buffer == NULL) {
platform_kernel_buffer = kernel_buffer;
platform_kernel_buffer_size = kernel_buffer_size;
}
if (platform_kernel_buffer == NULL) {
#if defined(EXCLUDE_CFE_AND_KERNEL_PLATFORM)
FATAL(
"Binary built with --exclude-kernel-service. Cannot run"
" from source.");
#else
FATAL("platform_program cannot be NULL.");
#endif // defined(EXCLUDE_CFE_AND_KERNEL_PLATFORM)
}
// TODO(sivachandra): When the platform program is unavailable, check if
// application kernel binary is self contained or an incremental binary.
// Isolate should be created only if it is a self contained kernel binary.
isolate_data = new IsolateData(isolate_group_data);
isolate = Dart_CreateIsolateGroupFromKernel(
script_uri, name, platform_kernel_buffer, platform_kernel_buffer_size,
flags, isolate_group_data, isolate_data, error);
} else {
isolate_data = new IsolateData(isolate_group_data);
// Creates a new isolate. The new isolate becomes the current isolate.
isolate = Dart_CreateIsolateGroup(script_uri, name, isolate_snapshot_data,
isolate_snapshot_instructions, flags,
isolate_group_data, isolate_data, error);
}
#else
**isolate_data = new IsolateData**(isolate_group_data);
// Creates a new isolate. The new isolate becomes the current isolate.
**isolate = Dart_CreateIsolateGroup**(script_uri, name, isolate_snapshot_data,
isolate_snapshot_instructions, flags,
**isolate_group_data**, **isolate_data**, error);
#endif // !defined(DART_PRECOMPILED_RUNTIME)

Dart_Isolate created_isolate = NULL;
if (isolate == NULL) {
delete isolate_data;
delete isolate_group_data;
} else {
**created_isolate = IsolateSetupHelper**(
isolate, is_main_isolate, script_uri, packages_config,
isolate_run_app_snapshot, flags, error, exit_code);
}
int64_t end = Dart_TimelineGetMicros();
Dart_TimelineEvent("CreateIsolateGroupAndSetupHelper", start, end,
Dart_Timeline_Event_Duration, 0, NULL, NULL);
return created_isolate;
}

这里可以看到,CreateIsolateGroupAndSetupHelper按照是JIT还是AOT的编译方式,有不同的获取数据的方式,但不管哪种方式,最后都执行了一下三步:

  • 创建IsolateData* isolate_data 使用isolate_group_data创建IsolateData
  • 创建Dart_Isolate isolate 创建Dart_Isolate,将script_uriisolate_data,和isolate_group_data等绑定
  • 创建并返回Dart_Isolate created_isolate包装isolate ,进行数据绑定,并将isolate标记为runnable
Dart_CreateIsolateGroup

这里分析一下**Dart_CreateIsolateGroup的过程:**

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
// --> runtime/vm/dart_api_impl.cc#L1371

DART_EXPORT Dart_Isolate
Dart_CreateIsolateGroup(const char* script_uri,
const char* name,
const uint8_t* snapshot_data,
const uint8_t* snapshot_instructions,
Dart_IsolateFlags* flags,
void* isolate_group_data,
void* isolate_data,
char** error) {
API_TIMELINE_DURATION(Thread::Current());

Dart_IsolateFlags api_flags;
if (flags == nullptr) {
Isolate::FlagsInitialize(&api_flags);
flags = &api_flags;
}

const char* non_null_name = name == nullptr ? "isolate" : name;
std::unique_ptr<IsolateGroupSource> source(
new IsolateGroupSource(script_uri, non_null_name, snapshot_data,
snapshot_instructions, nullptr, -1, *flags));
// 创建Isolate Group
auto group = new IsolateGroup(std::move(source), isolate_group_data, *flags);
// 创建Isolate Group持有的Heap,由所有在这个Isolate Group下的isolate共享
group->CreateHeap(
/*is_vm_isolate=*/false, IsServiceOrKernelIsolateName(non_null_name));
IsolateGroup::RegisterIsolateGroup(group);
// 根据刚刚创建的Isolate Group创建Isolate
Dart_Isolate isolate = CreateIsolate(group, /*is_new_group=*/true,
non_null_name, isolate_data, error);
if (isolate != nullptr) {
group->set_initial_spawn_successful();
}
return isolate;
}

Run(Isolate* child)

在上面的分析中,我们注意到,无论是RunHeavyweight(const char* name)还是RunLightweight(const char* name)方法,最后在创建了新的isolate之后,都执行了Run(Isolate* child)方法,在这里正式启动了isolate:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
// -> runtime\lib\isolate.cc

void Run(Isolate* child) {
if (!EnsureIsRunnable(child)) {
Dart_ShutdownIsolate();
return;
}

state_->set_isolate(child);
if (state_->origin_id() != ILLEGAL_PORT) {
// origin_id is set to parent isolate main port id when spawning via
// spawnFunction.
child->set_origin_id(state_->origin_id());
}

bool success = true;
{
auto thread = Thread::Current();
// TransitionNativeToVM is used to transition the safepoint state of a
// thread from "running native code" to "running vm code" and ensures
// that the state is reverted back to "running native code" when
// exiting the scope/frame.
TransitionNativeToVM transition(thread);
// Create an empty zone and set is at the current zone for the Thread.
StackZone zone(thread);
// The class HandleScope is used to start a new handles scope in the
// code.
HandleScope hs(thread);

success = **EnqueueEntrypointInvocationAndNotifySpawner**(thread);
}

if (!success) {
state_ = nullptr;
Dart_ShutdownIsolate();
return;
}

// All preconditions are met for this to always succeed.
char* error = nullptr;
// Lets the VM run message processing for the isolate.
if (!**Dart_RunLoopAsync**(state_->errors_are_fatal(), state_->on_error_port(),
state_->on_exit_port(), &error)) {
FATAL("Dart_RunLoopAsync() failed: %s. Please file a Dart VM bug report.",
error);
}
}

这里只是做了一些环境准备,然后在EnqueueEntrypointInvocationAndNotifySpawner方法中将isolate要运行的所有东西都准备好,然后再在Dart_RunLoopAsync方法中正式开始isolate处理event queue.

EnqueueEntrypointInvocationAndNotifySpawner

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
// -> runtime\lib\isolate.cc

bool EnqueueEntrypointInvocationAndNotifySpawner(Thread* thread) {
auto isolate = thread->isolate();
auto zone = thread->zone();
const bool is_spawn_uri = state_->is_spawn_uri();

// Step 1) Resolve the entrypoint function.
// 查找isolate开始运行的第一个方法,比如Isolate.spawn的spawn或者Isolate.spawnUri的main方法
auto& entrypoint_closure = Closure::Handle(zone);
if (state_->closure_tuple_handle() != nullptr) {
const auto& result = Object::Handle(
zone,
ReadObjectGraphCopyMessage(thread, state_->closure_tuple_handle()));
if (result.IsError()) {
ReportError(
"Failed to deserialize the passed entrypoint to the new isolate.");
return false;
}
entrypoint_closure = Closure::RawCast(result.ptr());
} else {
const auto& result = Object::Handle(zone, state_->ResolveFunction());
if (result.IsError()) {
ASSERT(is_spawn_uri);
ReportError("Failed to resolve entrypoint function.");
return false;
}
ASSERT(result.IsFunction());
auto& func = Function::Handle(zone, Function::Cast(result).ptr());
func = func.ImplicitClosureFunction();
entrypoint_closure = func.ImplicitStaticClosure();
}

// Step 2) Enqueue delayed invocation of entrypoint callback.
const auto& args_obj = Object::Handle(zone, state_->BuildArgs(thread));
if (args_obj.IsError()) {
ReportError(
"Failed to deserialize the passed arguments to the new isolate.");
return false;
}
ASSERT(args_obj.IsNull() || args_obj.IsInstance());
const auto& message_obj =
Object::Handle(zone, state_->BuildMessage(thread));
if (message_obj.IsError()) {
ReportError(
"Failed to deserialize the passed arguments to the new isolate.");
return false;
}
ASSERT(message_obj.IsNull() || message_obj.IsInstance());
// 解析参数,分别是isolate初始运行方法,参数args、messgae、是否spawn_uri
const Array& args = Array::Handle(zone, Array::New(4));
args.SetAt(0, entrypoint_closure);
args.SetAt(1, args_obj);
args.SetAt(2, message_obj);
args.SetAt(3, is_spawn_uri ? Bool::True() : Bool::False());

const auto& lib = Library::Handle(zone, Library::IsolateLibrary());
const auto& entry_name = String::Handle(zone, String::New("_startIsolate"));
const auto& entry_point =
Function::Handle(zone, lib.LookupLocalFunction(entry_name));
ASSERT(entry_point.IsFunction() && !entry_point.IsNull());
const auto& result =
Object::Handle(zone, DartEntry::InvokeFunction(entry_point, args));
if (result.IsError()) {
ReportError("Failed to enqueue delayed entrypoint invocation.");
return false;
}

// Step 3) Pause the isolate if required & Notify parent isolate about
// isolate creation.
const auto& capabilities = Array::Handle(zone, Array::New(2));
auto& capability = Capability::Handle(zone);
capability = Capability::New(isolate->pause_capability());
capabilities.SetAt(0, capability);
capability = Capability::New(isolate->terminate_capability());
capabilities.SetAt(1, capability);
const auto& send_port =
SendPort::Handle(zone, SendPort::New(isolate->main_port()));
const auto& message = Array::Handle(zone, Array::New(2));
message.SetAt(0, send_port);
message.SetAt(1, capabilities);
if (state_->paused()) {
capability ^= capabilities.At(0);
const bool added = isolate->AddResumeCapability(capability);
ASSERT(added);
isolate->message_handler()->increment_paused();
}
{
// If parent isolate died, we ignore the fact that we cannot notify it.
// 创建一个新的Message并将其压入Isolate的父Isolate对应的MessageHandler的event queue中
PortMap::PostMessage(WriteMessage(/* can_send_any_object */ false,
/* same_group */ false, message,
state_->parent_port(),
Message::kNormalPriority));
}

return true;
}

这里主要做了3件事:

  • 查找isolate开始运行的第一个方法entrypoint,比如Isolate.spawnentrypoint或者Isolate.spawnUrimain方法
  • 解析参数,分别是isolate初始运行方法,参数argsmessgae、是否spawn_uri等等,将其与上一步找到的entrypoint结合
  • (如果需要的话暂停创建好的isolate),并通知isolate的父isolate当前isolate创建成功(附带当前isolate的send_port

至此,Isolate的创建工作已经完成,在Dart_RunLoopAsync开始isolate处理消息:

Dart_RunLoopAsync

在这里主要是开始处理event loop。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
// ->  runtime\vm\dart_api_impl.cc

DART_EXPORT bool Dart_RunLoopAsync(bool errors_are_fatal,
Dart_Port on_error_port,
Dart_Port on_exit_port,
char** error) {
auto thread = Thread::Current();
auto isolate = thread->isolate();
CHECK_ISOLATE(isolate);
*error = nullptr;

if (thread->api_top_scope() != nullptr) {
*error = Utils::StrDup("There must not be an active api scope.");
return false;
}

if (!isolate->is_runnable()) {
const char* error_msg = isolate->MakeRunnable();
if (error_msg != nullptr) {
*error = Utils::StrDup(error_msg);
return false;
}
}

isolate->SetErrorsFatal(errors_are_fatal);

if (on_error_port != ILLEGAL_PORT || on_exit_port != ILLEGAL_PORT) {
auto thread = Thread::Current();
TransitionNativeToVM transition(thread);
StackZone zone(thread);

if (on_error_port != ILLEGAL_PORT) {
const auto& port =
SendPort::Handle(thread->zone(), SendPort::New(on_error_port));
isolate->AddErrorListener(port);
}
if (on_exit_port != ILLEGAL_PORT) {
const auto& port =
SendPort::Handle(thread->zone(), SendPort::New(on_exit_port));
isolate->AddExitListener(port, Instance::null_instance());
}
}

Dart_ExitIsolate();
**isolate->Run();**
return true;
}

// -> runtime\vm\isolate.cc
void Isolate::Run() {
message_handler()->Run(group()->thread_pool(), nullptr, ShutdownIsolate,
reinterpret_cast<uword>(this));
}

Isolate::Run()实际上是开启了处理消息队列:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
// -> 

// Runs this message handler on the thread pool.
//
// Before processing messages, the optional StartFunction is run.
//
// A message handler will run until it terminates either normally or
// abnormally. Normal termination occurs when the message handler
// no longer has any live ports. Abnormal termination occurs when
// HandleMessage() indicates that an error has occurred during
// message processing.

// Returns false if the handler terminated abnormally, otherwise it
// returns true.
bool MessageHandler::Run(ThreadPool* pool,
StartCallback start_callback,
EndCallback end_callback,
CallbackData data) {
MonitorLocker ml(&monitor_);
if (FLAG_trace_isolates) {
OS::PrintErr(
"[+] Starting message handler:\n"
"\thandler: %s\n",
name());
}
ASSERT(pool_ == NULL);
ASSERT(!delete_me_);
pool_ = pool;
start_callback_ = start_callback;
end_callback_ = end_callback;
callback_data_ = data;
task_running_ = true;
bool result = **pool_->Run<MessageHandlerTask>(this);**
if (!result) {
pool_ = nullptr;
start_callback_ = nullptr;
end_callback_ = nullptr;
callback_data_ = 0;
task_running_ = false;
}
return result;
}

// -> runtime\vm\thread_pool.h
class ThreadPool {
bool Run(Args&&... args) {
return RunImpl(std::unique_ptr<Task>(new T(std::forward<Args>(args)...)));
}
...
}

// -> runtime\vm\thread_pool.cc

bool ThreadPool::RunImpl(std::unique_ptr<Task> task) {
Worker* new_worker = nullptr;
{
MonitorLocker ml(&pool_monitor_);
if (shutting_down_) {
return false;
}
// 从线程池中获取task,如果有空闲的/达到最大数量就尝试复用(此时这里返回null)
// 否则创建新的并返回
new_worker = **ScheduleTaskLocked**(&ml, std::move(task));
}
if (new_worker != nullptr) {
// 创建一个新的Worker在新的系统线程运行task
new_worker->**StartThread()**;
}
return true;
}

void ThreadPool::Worker::StartThread() {
// 创建一个新的系统线程,运行指定的代码,
// android的实现在runtime\vm\os_thread_android.cc
int result = OSThread::Start("DartWorker", &**Worker::Main**,
reinterpret_cast<uword>(this));
if (result != 0) {
FATAL1("Could not start worker thread: result = %d.", result);
}
}

这里通过ThreadPool::ScheduleTaskLocked方法获取new_worker

  • 如果已有的worker有空闲的或者已经达到最大数目了,就等待已有的worker执行任务
  • 否则就创建新的worker,并在新的线程运行

在获取到worker之后,就执行MessageHandlerTask(见下文详细分析)。

我们主要关注3点:

  • ScheduleTaskLocked 分配Worker
  • OSThread::Start中使用&Worker::Main 在新系统线程开启Worker循环
  • MessageHandlerTask 执行具体的消息分发内容

ScheduleTaskLocked

先详细看一下获取new_workerThreadPool::ScheduleTaskLocked方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
// -> runtime\vm\thread_pool.cc

ThreadPool::Worker* ThreadPool::ScheduleTaskLocked(MonitorLocker* ml,
std::unique_ptr<Task> task) {
// Enqueue the new task.
tasks_.Append(task.release());
pending_tasks_++;
ASSERT(pending_tasks_ >= 1);

// Notify existing idle worker (if available).
if (count_idle_ >= pending_tasks_) {
ASSERT(!idle_workers_.IsEmpty());
ml->Notify();
return nullptr;
}

// If we have maxed out the number of threads running, we will not start a
// new one.
if (max_pool_size_ > 0 && (count_idle_ + count_running_) >= max_pool_size_) {
if (!idle_workers_.IsEmpty()) {
ml->Notify();
}
return nullptr;
}

// Otherwise start a new worker.
auto new_worker = new Worker(this);
idle_workers_.Append(new_worker);
count_idle_++;
return new_worker;
}

这里的逻辑是:

将当前任务加入到tasks_队列中。

  • 如果空闲count_idle_ 的Worker比等待中的任务数pending_tasks_多,那就发送通知,使用已有的Worker处理任务。
  • 如果当前Worker数量已经最大了,那就将等待中的任务数pending_tasks_ 加一,等待有空闲的Worker处理任务。
  • 否则,就新建一个Worker(会对应创建一个新的系统线程)来处理任务。

&Worker::Main

ThreadPool::RunImpl(std::unique_ptr<Task> task)这里,StartThread的第二个参数,**&Worker::Main**启动了一个循环,不断的在任务队列tasks_中取出消息并执行:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
// -> runtime\vm\thread_pool.cc

ThreadPool::Worker::Main(uword args){
Worker* worker = reinterpret_cast<Worker*>(args);
ThreadPool* pool = worker->pool_;

pool->WorkerLoop(worker);

}

void ThreadPool::WorkerLoop(Worker* worker) {
WorkerList dead_workers_to_join;

while (true) {
MonitorLocker ml(&pool_monitor_);
// worker会从task_取出一个task并运行
if (!tasks_.IsEmpty()) {
IdleToRunningLocked(worker);
while (!tasks_.IsEmpty()) {
std::unique_ptr<Task> task(tasks_.RemoveFirst());
pending_tasks_--;
MonitorLeaveScope mls(&ml);
**task->Run();**
ASSERT(Isolate::Current() == nullptr);
task.reset();
}
RunningToIdleLocked(worker);
}

if (running_workers_.IsEmpty()) {
ASSERT(tasks_.IsEmpty());
OnEnterIdleLocked(&ml);
if (!tasks_.IsEmpty()) {
continue;
}
}

if (shutting_down_) {
ObtainDeadWorkersLocked(&dead_workers_to_join);
IdleToDeadLocked(worker);
break;
}

// Sleep until we get a new task, we time out or we're shutdown.
const int64_t idle_start = OS::GetCurrentMonotonicMicros();
bool done = false;
while (!done) {
const auto result = ml.WaitMicros(ComputeTimeout(idle_start));

// We have to drain all pending tasks.
if (!tasks_.IsEmpty()) break;

if (shutting_down_ || result == Monitor::kTimedOut) {
done = true;
break;
}
}
if (done) {
ObtainDeadWorkersLocked(&dead_workers_to_join);
IdleToDeadLocked(worker);
break;
}
}

// Before we transitioned to dead we obtained the list of previously died dead
// workers, which we join here. Since every death of a worker will join
// previously died workers, we keep the pending non-joined [dead_workers_] to
// effectively 1.
JoinDeadWorkersLocked(&dead_workers_to_join);
}

MessageHandlerTask

无论是哪种Worker,最后都是执行的MessageHandlerTask

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
// -> runtime\vm\message_handler.cc

class MessageHandlerTask : public ThreadPool::Task {
public:
explicit MessageHandlerTask(MessageHandler* handler) : handler_(handler) {
ASSERT(handler != NULL);
}

virtual void Run() {
ASSERT(handler_ != NULL);
handler_->TaskCallback();
}

void MessageHandler::TaskCallback() {
ASSERT(Isolate::Current() == NULL);
MessageStatus status = kOK;
bool run_end_callback = false;
bool delete_me = false;
EndCallback end_callback = NULL;
CallbackData callback_data = 0;
{
// We will occasionally release and reacquire this monitor in this
// function. Whenever we reacquire the monitor we *must* process
// all pending OOB messages, or we may miss a request for vm
// shutdown.
MonitorLocker ml(&monitor_);

// This method is running on the message handler task. Which means no
// other message handler tasks will be started until this one sets
// [task_running_] to false.
ASSERT(task_running_);

#if !defined(PRODUCT)
if (ShouldPauseOnStart(kOK)) {
if (!is_paused_on_start()) {
PausedOnStartLocked(&ml, true);
}
// More messages may have come in before we (re)acquired the monitor.
status = HandleMessages(&ml, false, false);
if (ShouldPauseOnStart(status)) {
// Still paused.
ASSERT(oob_queue_->IsEmpty());
task_running_ = false; // No task in queue.
return;
} else {
PausedOnStartLocked(&ml, false);
}
}
if (is_paused_on_exit()) {
status = HandleMessages(&ml, false, false);
if (ShouldPauseOnExit(status)) {
// Still paused.
ASSERT(oob_queue_->IsEmpty());
task_running_ = false; // No task in queue.
return;
} else {
PausedOnExitLocked(&ml, false);
}
}
#endif // !defined(PRODUCT)

if (status == kOK) {
if (start_callback_ != nullptr) {
// Initialize the message handler by running its start function,
// if we have one. For an isolate, this will run the isolate's
// main() function.
//
// Release the monitor_ temporarily while we call the start callback.
ml.Exit();
status = start_callback_(callback_data_);
ASSERT(Isolate::Current() == NULL);
start_callback_ = NULL;
ml.Enter();
}

// Handle any pending messages for this message handler.
if (status != kShutdown) {
status = HandleMessages(&ml, (status == kOK), true);
}
}

// The isolate exits when it encounters an error or when it no
// longer has live ports.
if (status != kOK || !HasLivePorts()) {
#if !defined(PRODUCT)
if (ShouldPauseOnExit(status)) {
if (FLAG_trace_service_pause_events) {
OS::PrintErr(
"Isolate %s paused before exiting. "
"Use the Observatory to release it.\n",
name());
}
PausedOnExitLocked(&ml, true);
// More messages may have come in while we released the monitor.
**status = HandleMessages(&ml, false, false);**
if (ShouldPauseOnExit(status)) {
// Still paused.
ASSERT(oob_queue_->IsEmpty());
task_running_ = false; // No task in queue.
return;
} else {
PausedOnExitLocked(&ml, false);
}
}
#endif // !defined(PRODUCT)
if (FLAG_trace_isolates) {
if (status != kOK && thread() != NULL) {
const Error& error = Error::Handle(thread()->sticky_error());
OS::PrintErr(
"[-] Stopping message handler (%s):\n"
"\thandler: %s\n"
"\terror: %s\n",
MessageStatusString(status), name(), error.ToCString());
} else {
OS::PrintErr(
"[-] Stopping message handler (%s):\n"
"\thandler: %s\n",
MessageStatusString(status), name());
}
}
pool_ = NULL;
// Decide if we have a callback before releasing the monitor.
end_callback = end_callback_;
callback_data = callback_data_;
run_end_callback = end_callback_ != NULL;
delete_me = delete_me_;
}

// Clear task_running_ last. This allows other tasks to potentially start
// for this message handler.
ASSERT(oob_queue_->IsEmpty());
task_running_ = false;
}

// The handler may have been deleted by another thread here if it is a native
// message handler.

// Message handlers either use delete_me or end_callback but not both.
ASSERT(!delete_me || !run_end_callback);

if (run_end_callback) {
ASSERT(end_callback != NULL);
end_callback(callback_data);
// The handler may have been deleted after this point.
}
if (delete_me) {
delete this;
}
}

可以看到,这里执行了MessageHandler::HandleMessages方法,来处理消息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
// -> runtime\vm\message_handler.cc

MessageHandler::MessageStatus MessageHandler::HandleMessages(
MonitorLocker* ml,
bool allow_normal_messages,
bool allow_multiple_normal_messages) {
ASSERT(monitor_.IsOwnedByCurrentThread());

// Scheduling of the mutator thread during the isolate start can cause this
// thread to safepoint.
// We want to avoid holding the message handler monitor during the safepoint
// operation to avoid possible deadlocks, which can occur if other threads are
// sending messages to this message handler.
//
// If isolate() returns nullptr [StartIsolateScope] does nothing.
ml->Exit();
StartIsolateScope start_isolate(isolate());
ml->Enter();

auto idle_time_handler =
isolate() != nullptr ? isolate()->group()->idle_time_handler() : nullptr;

MessageStatus max_status = kOK;
Message::Priority min_priority =
((allow_normal_messages && !paused()) ? Message::kNormalPriority
: Message::kOOBPriority);
std::unique_ptr<Message> **message = DequeueMessage(min_priority);**
while (message != nullptr) {
intptr_t message_len = message->Size();
if (FLAG_trace_isolates) {
OS::PrintErr(
"[<] Handling message:\n"
"\tlen: %" Pd
"\n"
"\thandler: %s\n"
"\tport: %" Pd64 "\n",
message_len, name(), message->dest_port());
}

// Release the monitor_ temporarily while we handle the message.
// The monitor was acquired in MessageHandler::TaskCallback().
ml->Exit();
Message::Priority saved_priority = message->priority();
Dart_Port saved_dest_port = message->dest_port();
MessageStatus status = kOK;
{
DisableIdleTimerScope disable_idle_timer(idle_time_handler);
status = HandleMessage(std::move(message));
}
if (status > max_status) {
max_status = status;
}
ml->Enter();
if (FLAG_trace_isolates) {
OS::PrintErr(
"[.] Message handled (%s):\n"
"\tlen: %" Pd
"\n"
"\thandler: %s\n"
"\tport: %" Pd64 "\n",
MessageStatusString(status), message_len, name(), saved_dest_port);
}
// If we are shutting down, do not process any more messages.
if (status == kShutdown) {
ClearOOBQueue();
break;
}

// Remember time since the last message. Don't consider OOB messages so
// using Observatory doesn't trigger additional idle tasks.
if ((FLAG_idle_timeout_micros != 0) &&
(saved_priority == Message::kNormalPriority)) {
if (idle_time_handler != nullptr) {
idle_time_handler->UpdateStartIdleTime();
}
}

// Some callers want to process only one normal message and then quit. At
// the same time it is OK to process multiple OOB messages.
if ((saved_priority == Message::kNormalPriority) &&
!allow_multiple_normal_messages) {
// We processed one normal message. Allow no more.
allow_normal_messages = false;
}

// Reevaluate the minimum allowable priority. The paused state
// may have changed as part of handling the message. We may also
// have encountered an error during message processing.
//
// Even if we encounter an error, we still process pending OOB
// messages so that we don't lose the message notification.
min_priority = (((max_status == kOK) && allow_normal_messages && !paused())
? Message::kNormalPriority
: Message::kOOBPriority);
message = DequeueMessage(min_priority);
}
return max_status;
}

MessageHandler::DequeueMessage则是按照优先级,依次从oob_queue_queue_中获取消息:

1
2
3
4
5
6
7
8
9
10
11
// -> runtime\vm\message_handler.cc

std::unique_ptr<Message> MessageHandler::DequeueMessage(
Message::Priority min_priority) {
// TODO(turnidge): Add assert that monitor_ is held here.
std::unique_ptr<Message> message = oob_queue_->Dequeue();
if ((message == nullptr) && (min_priority < Message::kOOBPriority)) {
message = queue_->Dequeue();
}
return message;
}

关于oob_queue_queue_ 的区别如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
// -> 

class MessageHandler {
MessageQueue* queue_;
MessageQueue* oob_queue_;
...
}

// -> runtime\vm\message.h

class Message {
// A message processed at any interrupt point (stack overflow check) instead
// of at the top of the message loop. Control messages from dart:isolate or
// vm-service requests.
bool IsOOB() const { return priority_ == Message::kOOBPriority; }

}

这里消息处理的步骤也启动了。

总结一下,Dart_RunLoopAsync的主要功能是触发isolate的message_handler处理消息分发:

Dart_RunLoopAsyncIsolate::Run()message_handler()->Run()pool_->Run<MessageHandlerTask> ThreadPool::RunImpl

ThreadPool::RunImpl(std::unique_ptr<Task> task)这里主要触发了2步:

  • ScheduleTaskLocked获取到new_worker
  • new_worker调用ThreadPool::Worker::StartThread()方法开启循环

然后根据是否创建了new_worker有两种情况:

  • new_worker,使用在OSThread::Start方法中创建了一个新的系统线程,执行ThreadPool::Worker::Main(这个方法的主要作用使用new_worker从线程池中的取出任务执行)
  • 没有new_worker,那么等待已有的Worker空闲时执行任务

无论如何,这里的Worker要执行的任务都是在MessageHandler::Run方法中指定的MessageHandlerTask ,而这个任务的内容便是开启MessageHandler::HandleMessages 方法,按照优先级不断的依次从oob_queue_queue_中获取消息并处理。

总结

Isolate是Dart代码运行的地方,拥有独立的event loop,和全局变量,在自己单独的线程运行。

Isolate.spawn默认会创建在同一个IsolateGroup中的Isolate,他们之间共享Heap(这里会发生GC)和一个线程池。

Isolate.spawnUri会从制定的Uri中创建一个新的IsolateGroup和对应的Isolate,并执行Uri中的main方法。

Isolate内部维持一个Event Loop。