Dart 读取文件过程分析

Dart读取文件时,先在Dart代码创建File引用,通过与IOServiceIsolate通信(先通过IO Service而发送请求到native端,等到native执行完操作之后再回调结果)从而实现对文件的读写。

实现一个简单的读取文件的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
import 'dart:io';

main() {
var filePath =
r"G:/21996.1.210529-1541.co_release_CLIENT_CONSUMER_x64FRE_en-us.iso";

var file = File(filePath);

var startTime = printCurrentTimeMs("start run file.readAsBytes");
file.readAsBytes().then((value) {
printCurrentTimeMs("file.readAsBytes() finish",
lastTimeMs: startTime,
suffix: "\nfile.readAsBytes() result:${value.length}");
});
printCurrentTimeMs("finish run file.readAsBytes");
}

int printCurrentTimeMs(String prefix, {String? suffix, int? lastTimeMs}) {
var currentTimeMs = DateTime.now().millisecondsSinceEpoch;
var timeElapseString =
lastTimeMs == null ? "" : ", time elapse:${currentTimeMs - lastTimeMs}ms ";
print(
"$prefix current time($currentTimeMs)$timeElapseString${suffix ?? ""}");
return currentTimeMs;
}

整个过程如下:

过程分析

Dart端发起文件读写请求

其中file.readAsBytes() 是具体执行读取文件的地方,他的定义如下:

1
2
// -> sdk\lib\io\file_impl.dart
Future<Uint8List> readAsBytes();

在我们创建File时,实际上创建的是_Fileclass _File extends FileSystemEntity implements File)对象:

1
2
3
4
5
6
7
8
9
10
11
// -> sdk\lib\io\file_impl.dart

// abstract class File implements FileSystemEntity
@pragma("vm:entry-point")
factory File(String path) {
final IOOverrides? overrides = IOOverrides.current;
if (overrides == null) {
return new _File(path);
}
return overrides.createFile(path);
}

_FileFile 的实现类,所以file.readAsBytes()实际调用的是_File 实现的方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
// -> sdk\lib\io\file_impl.dart

// Read the file in blocks of size 64k.
const int _blockSize = 64 * 1024;

class _File extends FileSystemEntity implements File {

Future<Uint8List> readAsBytes() {
Future<Uint8List> readDataChunked(RandomAccessFile file) {
// 分段读取文件,每次只读取_blockSize大小的内容
var builder = new BytesBuilder(copy: false);
var completer = new Completer<Uint8List>();
void read() {
// 每次只异步读取一部分文本
file.read(_blockSize).then((data) {
if (data.length > 0) {
builder.add(data);
read();
} else {
completer.complete(builder.takeBytes());
}
}, onError: completer.completeError);
}

read();
return completer.future;
}

return open().then((file) {
return file.length().then((length) {
if (length == 0) {
// May be character device, try to read it in chunks.
return readDataChunked(file);
}
return file.read(length);
}).whenComplete(file.close);
});
}

}

可以看到,无论是普通的文件格式,还是character device,最后都是调用了_RandomAccessFileopen()read(int bytes)方法异步读取文件。

设备文件分为Block Device Driver和Character Device Drive两类。
Character Device Driver又被称为字符设备或裸设备raw devices; Block Device Driver通常成为块设备。
而Block Device Driver是以固定大小长度来传送转移资料 ; Character Device Driver是以不定长度的字元传送资料。 https://www.cnblogs.com/qlee/archive/2011/07/27/2118406.html#:~:text=Character

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
// -> flutter\bin\cache\pkg\sky_engine\lib\io\file_impl.dart

class _RandomAccessFile implements RandomAccessFile {
final String path;

bool _asyncDispatched = false;
// 读取文件的信息
late _FileResourceInfo _resourceInfo;
// 对文件的操作引用
_RandomAccessFileOps _ops;

@pragma("vm:entry-point")
_RandomAccessFile(int pointer, this.path)
: _ops = new _RandomAccessFileOps(pointer) {
_resourceInfo = new _FileResourceInfo(this);
_maybeConnectHandler();
}

// 异步读取文件
Future<Uint8List> read(int bytes) {
// TODO(40614): Remove once non-nullability is sound.
ArgumentError.checkNotNull(bytes, "bytes");
// 异步读取文件,实际上是将发送指令到IO Service,然后等待返回结果
return _dispatch(_IOService.fileRead, [null, bytes]).then((response) {
if (_isErrorResponse(response)) {
throw _exceptionFromResponse(response, "read failed", path);
}
_resourceInfo.addRead(response[1].length);
// 读取的文件内容
Uint8List result = response[1];
return result;
});
}

// 同步读取文件
Uint8List readSync(int bytes) {
// TODO(40614): Remove once non-nullability is sound.
ArgumentError.checkNotNull(bytes, "bytes");
_checkAvailable();
// 同步读取文件是对文件直接操作
var result = _ops.read(bytes);
if (result is OSError) {
throw new FileSystemException("readSync failed", path, result);
}
_resourceInfo.addRead(result.length);
return result;
}

Future<RandomAccessFile> open({FileMode mode = FileMode.read}) {
// FileMode https://github.com/dart-lang/sdk/blob/main/sdk/lib/io/io_service.dart
if (mode != FileMode.read &&
mode != FileMode.write &&
mode != FileMode.append &&
mode != FileMode.writeOnly &&
mode != FileMode.writeOnlyAppend) {
return new Future.error(
new ArgumentError('Invalid file mode for this operation'));
}
return _dispatchWithNamespace(
// 请求操作为“打开文件”,参数为:null,文件路径,操作文件的mode
_IOService.fileOpen, [null, _rawPath, mode._mode]).then((response) {
if (_isErrorResponse(response)) {
throw _exceptionFromResponse(response, "Cannot open file", path);
}
// 从IO Service那里异步获得文件句柄response和path
return new _RandomAccessFile(response, path);
});
}

}

_RandomAccessFile中,除了同步读写文件是对返回的文件引用直接操作外,很多操作都能看到通过_dispatch()方法与IO Service通信,让我们看一下这个方法的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
// -> sdk\lib\io\file_impl.dart

// _RandomAccessFile
Future _dispatch(int request, List data, {bool markClosed = false}) {
if (closed) {
return new Future.error(new FileSystemException("File closed", path));
}
if (_asyncDispatched) {
var msg = "An async operation is currently pending";
return new Future.error(new FileSystemException(msg, path));
}
if (markClosed) {
// Set closed to true to ensure that no more async requests can be issued
// for this file.
closed = true;
}
_asyncDispatched = true;
data[0] = _pointer();
// 主要代码在这里,通过_IOService的_dispatch发送指令
return **_IOService._dispatch**(request, data).whenComplete(() {
_asyncDispatched = false;
});
}

// open create之类的操作会调用这个方法,不过最后也是调用_IOService._dispatch(request, data)通信
static Future _dispatchWithNamespace(int request, List data) {
data[0] = _namespacePointer();
// 与IO Service进行异步通信,request标记请求操作的类型,data则是数据
return **_IOService._dispatch**(request, data);
}

查阅_IOService的源码后发现这是个external 方法.

1
external static Future _dispatch(int request, List data);

An external function is connected to its body by an implementation-specific mechanism. Attempting to invoke an external function that has not been connected to its body will throw a NoSuchMethodError or some subclass thereof.
****https://github.com/dart-lang/sdk/issues/4300

根据external的定义,_dispatch方法在不同的机器上面实现不同。我们只看和app相关的实现(在sdk\lib\_internal\vm目录下,vm同级目录还有js等实现),具体的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
// -> sdk\lib\_internal\vm\bin\io_service_patch.dart

// _IOService
class _IOService {
// 用于向IO Service发送消息
static _IOServicePorts _servicePorts = new _IOServicePorts();
// We use a static variable here to hold onto the last result of
// calling the IO Service frome the native.
static RawReceivePort? _receivePort;
// the other side(other isolate) will send message back with the _replyToPort
static late SendPort _replyToPort;
// a map holding the registered callbacks for each received message.
static HashMap<int, Completer> _messageMap = new HashMap<int, Completer>();
static int _id = 0;

/// [request] IO操作的类型,具体值在[sdk/lib/io/io_service.dart]中的_IOService类中定义
/// 主要有对文件、目录、网络进行操作的请求
/// [data] 对应的数据,如果是文件,则是文件路径,如果是目录,则是目录路径等等
@patch
static Future _dispatch(int request, List data) {
int id;
do {
// create a special id to identify the request.
id = _getNextId();
} while (_messageMap.containsKey(id));
// 通过_servicePorts获取一个新的SendPort以便向IOService发送消息,
// 这个SendPort是IO Service返回给dart用来向他发消息的
final SendPort servicePort = _servicePorts._getPort(id);
_ensureInitialize();
final Completer completer = new Completer();
_messageMap[id] = completer;
try {
// 向IOService发送消息,当request执行完毕之后,
// 会调用_replyToPort触发在root zone的回调_receivePort!.handler
**servicePort**.send(<dynamic>[id, **_replyToPort**, request, data]);
} catch (error) {
_messageMap.remove(id)!.complete(error);
if (_messageMap.length == 0) {
_finalize();
}
}
return completer.future;
}

static void _ensureInitialize() {
if (_receivePort == null) {
_receivePort = new RawReceivePort(null, 'IO Service');
// 其他地方可以使用_replyToPort来发消息触发_receivePort 执行handler方法
_replyToPort = _receivePort!.sendPort;
_receivePort!.handler = (data) {
// 在这里处理IOService执行完方法返回的数据
assert(data is List && data.length == 2);
// data[0]就是我们在_dispatch方法中获取的id,
// 将处理结果data[1]通过Completer.complete返回
_messageMap.remove(data[0])!.complete(data[1]);
// 释放这个触发这个回调的SendPort
_servicePorts._returnPort(data[0]);
if (_messageMap.length == 0) {
_finalize();
}
};
}
}
...
}

可以看到,最后是通过RawReceivePort/SendPort进行跨Isolate通信

_IOService使用_servicePorts对native层发送消息触发IO操作,然后使用_receivePort监听,当IO操作完成时会通过_replyToPort 回调结果,会在 _receivePort!.handler方法中根据当时请求的id找到Completer将结果传递回去。

这样当时我们在 file.readAsBytes()时获取到的Future便会收到回调,从而完成文件操作的流程。

1
2
3
4
5
file.readAsBytes().then((value) {
printCurrentTimeMs("file.readAsBytes() finish",
lastTimeMs: startTime,
suffix: "\nfile.readAsBytes() result:${value.length}");
});

下面是到目前为止涉及到的类关系示意图:

IO Service中转

那么,这个IO Service是做什么的,他又是如何实现与dart中的调用方双向通信,以及执行调用方需要的功能呢?

位于sdk\lib\_internal\vm\bin\io_service_patch.dart的_IOService是一个中转站,向上承接来自Dart代码的IO请求指令(先行返回Future),向下将这些指令转发至Native层的IO Service,并监听回调,当native层处理完这些IO指令之后,将结果通过Future返回给Dart调用方。

让我们再看一下他的具体实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
// -> sdk\lib\_internal\vm\bin\io_service_patch.dart

// _IOService
class _IOService {
// 用于向IO Service发送消息
static _IOServicePorts _servicePorts = new _IOServicePorts();
// We use a static variable here to hold onto the last result of
// calling the IO Service frome the native.
static RawReceivePort? _receivePort;
// the other side(other isolate) will send message back with the _replyToPort
static late SendPort _replyToPort;
// a map holding the registered callbacks for each received message.
static HashMap<int, Completer> _messageMap = new HashMap<int, Completer>();
static int _id = 0;

/// [request] IO操作的类型,具体值在[sdk/lib/io/io_service.dart]中的_IOService类中定义
/// 主要有对文件、目录、网络进行操作的请求
/// [data] 对应的数据,如果是文件,则是文件路径,如果是目录,则是目录路径等等
@patch
static Future _dispatch(int request, List data) {
int id;
do {
// create a special id to identify the request.
id = _getNextId();
} while (_messageMap.containsKey(id));
// 通过_servicePorts获取一个新的SendPort以便向IOService发送消息,
// 这个SendPort是IO Service返回给dart用来向他发消息的
final SendPort **servicePort** = _servicePorts.**_getPort(id);**
_ensureInitialize();
final Completer completer = new Completer();
_messageMap[id] = completer;
try {
// 向IOService发送消息,当request执行完毕之后,
// 会调用_replyToPort触发在root zone的回调_receivePort!.handler
**servicePort**.send(<dynamic>[id, **_replyToPort**, request, data]);
} catch (error) {
_messageMap.remove(id)!.complete(error);
if (_messageMap.length == 0) {
_finalize();
}
}
return completer.future;
}

static void _ensureInitialize() {
if (_receivePort == null) {
_receivePort = new RawReceivePort(null, 'IO Service');
// 其他地方可以使用_replyToPort来发消息触发_receivePort 执行handler方法
_replyToPort = _receivePort!.sendPort;
_receivePort!.handler = (data) {
// 在这里处理IOService执行完方法返回的数据
assert(data is List && data.length == 2);
// data[0]就是我们在_dispatch方法中获取的id,
// 将处理结果data[1]通过Completer.complete返回
_messageMap.remove(data[0])!.complete(data[1]);
// 释放这个触发这个回调的SendPort
_servicePorts._returnPort(data[0]);
if (_messageMap.length == 0) {
_finalize();
}
};
}
}
...
}

可以看到:

  • _IOService持有_IOServicePorts _servicePorts以便获取SendPort servicePort和native层通信,
  • 在之前的代码分析中,我们已经知道_IOService还在_ensureInitialize()中监听着RawReceivePort? _receivePort的回调,
  • 这样当_IOService_dispatch()方法中将_replyToPort_receivePort的SendPort)传递给servicePort后,一旦native通过_replyToPort发送处理结果,_IOService立马可以收到并通过Completer.complete返回给Dart中的调用方

上述这些步骤能够实施的关键,在于Dart层的_IOService如何与native层的_IOService关联起来呢?

让我们来分析一下SendPort servicePort的获取过程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
// -> sdk\lib\_internal\vm\bin\io_service_patch.dart

class _IOService {
// 用于向IO Service发送消息
static _IOServicePorts _servicePorts = new _IOServicePorts();
...
}

class _IOServicePorts {
// We limit the number of IO Service ports per isolate so that we don't
// spawn too many threads all at once, which can crash the VM on Windows.
static const int maxPorts = 32;
List<SendPort> _ports = <SendPort>[];
List<SendPort> _freePorts = <SendPort>[];
Map<int, SendPort> _usedPorts = new HashMap<int, SendPort>();

_IOServicePorts();

SendPort _getPort(int forRequestId) {
if (_freePorts.isEmpty && _usedPorts.length < maxPorts) {
// 如果没有可用的SendPort,就新建SendPort用于远程服务通信
final SendPort port = **_newServicePort()**;
_ports.add(port);
_freePorts.add(port);
}
if (!_freePorts.isEmpty) {
// 有空闲SendPort,使用
final SendPort port = _freePorts.removeLast();
assert(!_usedPorts.containsKey(forRequestId));
_usedPorts[forRequestId] = port;
return port;
}
// We have already allocated the max number of ports. Re-use an
// existing one.
final SendPort port = _ports[forRequestId % maxPorts];
_usedPorts[forRequestId] = port;
return port;
}

// 释放掉占用的port
void _returnPort(int forRequestId) {
final SendPort port = _usedPorts.remove(forRequestId)!;
if (!_usedPorts.values.contains(port)) {
_freePorts.add(port);
}
}

@pragma("vm:external-name", "IOService_NewServicePort")
external static SendPort _newServicePort();
}

可以看到这里最后的关键方法是SendPort _newServicePort(),这是一个external方法,在native实现。

Native处理Dart的指令

IOService_NewServicePort

SendPort是由_newServicePort()方法创建的,这是一个external方法,他的native层实现名称是IOService_NewServicePort

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
// -> runtime\bin\io_service.cc

void FUNCTION_NAME(IOService_NewServicePort)(Dart_NativeArguments args) {
Dart_SetReturnValue(args, Dart_Null());
// 创建一个新的native port
Dart_Port service_port = **IOService::GetServicePort();**
if (service_port != ILLEGAL_PORT) {
// 【注意】这里根据service_port创建了Dart里面的SendPort对象
// Return a send port for the service port.
Dart_Handle send_port = Dart_NewSendPort(service_port);
// 将当前IOService对应的send_port返回给调用方
Dart_SetReturnValue(args, send_port);
}
}

Dart_Port IOService::GetServicePort() {
// 注意这里的参数
// 分别是 native port的名称,收到native port以后得回调方法,是否同时处理
return **Dart_NewNativePort("IOService", IOServiceCallback, true);**
}

// -> runtime\include\dart_api.h

/**
* Returns a new SendPort with the provided port id.
*
* \param port_id The destination port.
*
* \return A new SendPort if no errors occurs. Otherwise returns
* an error handle.
*/
DART_EXPORT Dart_Handle Dart_NewSendPort(Dart_Port port_id);

注意,在Dart层的_IOServiceSendPort _newServicePort() 方法最后再这里调用了IOService_NewServicePort

这里主要有3个步骤:

  1. 使用Dart_NewNativePort("IOService", IOServiceCallback, true);创建Dart_Port
  2. 使用Dart_NewSendPortDart_Port转化为Dart_Handle(也就是Dart中的SendPort
  3. 返回上面创建好的Dart_Handle,Dart代码拿到返回的Dart_Handle也就是SendPort servicePort之后,就可以和native层的IO Service同通信。

接下来我们看一下前2步分别是怎么实现的:

Dart_NewNativePort

再看一下Dart_NewNativePort的调用参数:

1
2
3
4
5
6
7
8
9
10
Dart_NewNativePort("IOService", IOServiceCallback, true);

// -> runtime\include\dart_native_api.h

// Creates a new native port. When messages are received on this
// native port, then they will be dispatched to the provided native
// message handler.
DART_EXPORT Dart_Port Dart_NewNativePort(const char* name,
Dart_NativeMessageHandler handler,
bool handle_concurrently);

IOServiceCallback

Dart_NewNativePort总共有3个参数,Dart_NativeMessageHandler handler是当这个Dart_Port收到消息的时候,会被回调的方法,也就是我们通过Dart端的_IOService.dispatch方法的**servicePort**.send(<dynamic>[id, **_replyToPort**, request, data]);语句执行向native发送IO指令时,在native这里真正负责执行的方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// -> runtime\bin\io_service.cc

void IOServiceCallback(Dart_Port dest_port_id, Dart_CObject* message) {
Dart_Port reply_port_id = ILLEGAL_PORT;
CObject* response = CObject::IllegalArgumentError();
CObjectArray request(message);
// 这里的参数顺序,与Dart层的_IOService(sdk\lib\_internal\vm\bin\io_service_patch.dart)的_dispatch()中的
// **servicePort**.send(<dynamic>[id, **_replyToPort**, request, data]);
// 代码中的参数顺序一致
if ((message->type == Dart_CObject_kArray) && (request.Length() == 4) &&
request[0]->IsInt32() && request[1]->IsSendPort() &&
request[2]->IsInt32() && request[3]->IsArray()) {
CObjectInt32 message_id(request[0]);
CObjectSendPort **reply_port**(request[1]);
CObjectInt32 request_id(request[2]);
CObjectArray data(request[3]);
**reply_port_id** = **reply_port**.Value();
// 这里解析完收到的参数后,回去执行对应的文件操作
switch (request_id.Value()) {
**IO_SERVICE_REQUEST_LIST(CASE_REQUEST);**
default:
UNREACHABLE();
}
}

CObjectArray result(CObject::NewArray(2));
result.SetAt(0, request[0]);
// response在上面的IO_SERVICE_REQUEST_LIST执行完毕后就会被赋值
result.SetAt(1, **response**);
ASSERT(reply_port_id != ILLEGAL_PORT);
**Dart_PostCObject(reply_port_id, result.AsApiCObject());**
}

#define **CASE_REQUEST**(type, method, id) \
case IOService::k##type##method##Request: \
response = type::method##Request(data); \
break;

IOService具体的执行是在IO_SERVICE_REQUEST_LIST根据解析到的参数执行对应的方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
// -> runtime\bin\io_service.h

// This list must be kept in sync with the list in sdk/lib/io/io_service.dart
#define IO_SERVICE_REQUEST_LIST(V) \
V(File, Exists, 0) \
V(File, Create, 1) \
V(File, Delete, 2) \
V(File, Rename, 3) \
V(File, Copy, 4) \
V(File, Open, 5) \
V(File, ResolveSymbolicLinks, 6) \
V(File, Close, 7) \
V(File, Position, 8) \
V(File, SetPosition, 9) \
V(File, Truncate, 10) \
V(File, Length, 11) \
V(File, LengthFromPath, 12) \
V(File, LastAccessed, 13) \
V(File, SetLastAccessed, 14) \
V(File, LastModified, 15) \
V(File, SetLastModified, 16) \
V(File, Flush, 17) \
V(File, ReadByte, 18) \
V(File, WriteByte, 19) \
V(File, Read, 20) \
V(File, ReadInto, 21) \
V(File, WriteFrom, 22) \
V(File, CreateLink, 23) \
V(File, DeleteLink, 24) \
V(File, RenameLink, 25) \
V(File, LinkTarget, 26) \
V(File, Type, 27) \
V(File, Identical, 28) \
V(File, Stat, 29) \
V(File, Lock, 30) \
V(Socket, Lookup, 31) \
V(Socket, ListInterfaces, 32) \
V(Socket, ReverseLookup, 33) \
V(Directory, Create, 34) \
V(Directory, Delete, 35) \
V(Directory, Exists, 36) \
V(Directory, CreateTemp, 37) \
V(Directory, ListStart, 38) \
V(Directory, ListNext, 39) \
V(Directory, ListStop, 40) \
V(Directory, Rename, 41) \
V(SSLFilter, ProcessFilter, 42)

通过上述代码,可以得知,IOService主要处理的方法有四类:

  • File
  • Directory
  • Socket
  • SSLFilter

IOServiceCallback方法中,我们注意到,程序最后执行的结果是通过Dart_PostCObject返回的,来看一下他是怎么实现的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
// -> runtime\vm\native_api_impl.cc

static bool PostCObjectHelper(Dart_Port port_id, Dart_CObject* message) {
AllocOnlyStackZone zone;
std::unique_ptr<Message> msg = WriteApiMessage(
zone.GetZone(), message, port_id, Message::kNormalPriority);

if (msg == nullptr) {
return false;
}

// Post the message at the given port.
return **PortMap::PostMessage(std::move(msg));**
}

DART_EXPORT bool Dart_PostCObject(Dart_Port port_id, Dart_CObject* message) {
return PostCObjectHelper(port_id, message);
}

// -> runtime\vm\port.cc

bool PortMap::PostMessage(std::unique_ptr<Message> message,
bool before_events) {
MutexLocker ml(mutex_);
if (ports_ == nullptr) {
return false;
}
auto it = ports_->TryLookup(message->dest_port());
if (it == ports_->end()) {
// Ownership of external data remains with the poster.
message->DropFinalizers();
return false;
}
MessageHandler* handler = (*it).handler;
ASSERT(handler != nullptr);
**handler->PostMessage(std::move(message), before_events);**
return true;
}

// -> runtime\vm\message_handler.cc

void MessageHandler::PostMessage(std::unique_ptr<Message> message,
bool before_events) {
Message::Priority saved_priority;

{
MonitorLocker ml(&monitor_);
if (FLAG_trace_isolates) {
Isolate* source_isolate = Isolate::Current();
if (source_isolate != nullptr) {
OS::PrintErr(
"[>] Posting message:\n"
"\tlen: %" Pd "\n\tsource: (%" Pd64
") %s\n\tdest: %s\n"
"\tdest_port: %" Pd64 "\n",
message->Size(), static_cast<int64_t>(source_isolate->main_port()),
source_isolate->name(), name(), message->dest_port());
} else {
OS::PrintErr(
"[>] Posting message:\n"
"\tlen: %" Pd
"\n\tsource: <native code>\n"
"\tdest: %s\n"
"\tdest_port: %" Pd64 "\n",
message->Size(), name(), message->dest_port());
}
}

saved_priority = message->priority();
// **将Message加入到MessageQueue中**
if (message->IsOOB()) {
oob_queue_->Enqueue(std::move(message), before_events);
} else {
queue_->Enqueue(std::move(message), before_events);
}
if (paused_for_messages_) {
ml.Notify();
}

if (pool_ != nullptr && !task_running_) {
ASSERT(!delete_me_);
task_running_ = true;
const bool launched_successfully = pool_->Run<MessageHandlerTask>(this);
ASSERT(launched_successfully);
}
}

// Invoke any custom message notification.
MessageNotify(saved_priority);
}

上述代码最后将结果包装成了Message打包进MessageHandler消息队列中,这样便可以在Dart端通过消息分发接收到结果。

Dart_NewNativePort

再来看一下Dart_NewNativePort的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
// -> runtime\vm\native_api_impl.cc

DART_EXPORT Dart_Port Dart_NewNativePort(const char* name,
Dart_NativeMessageHandler handler,
bool handle_concurrently) {
if (name == NULL) {
name = "<UnnamedNativePort>";
}
if (handler == NULL) {
OS::PrintErr("%s expects argument 'handler' to be non-null.\n",
CURRENT_FUNC);
return ILLEGAL_PORT;
}
// 此方法位于sdk/runtime/vm/dart.cc
// Used to Indicate that a Dart API call is active.
if (!Dart::SetActiveApiCall()) {
return ILLEGAL_PORT;
}
// 【注意,这里切换了isolate,退出当前isolate,直到Dart_NewNativePort执行完毕再切换回当前isolate】
// Start the native port without a current isolate.
// 这里的实现可以参考https://github.com/dart-lang/sdk/blob/d437877c500c77d6e08372ba2dbda9c598f5bd8e/runtime/vm/dart_api_impl.cc
**IsolateLeaveScope saver(Isolate::Current());**
// 执行完IsolateLeaveScope 后,会切换出当前isolate直到下面的 return port_id;执行完毕,但是在此期间,下面的代码依旧是在当前isolate所在的IOThread也即系统线程下进行的

NativeMessageHandler* nmh = **new NativeMessageHandler(name, handler);**
// 创建一个Dart_Port并且添加到PortMap中
Dart_Port port_id = **PortMap::CreatePort(nmh);**
if (port_id != ILLEGAL_PORT) {
// 激活这个端口
PortMap::SetPortState(port_id, PortMap::kLivePort);
// 在Dart线程池中执行,在这里Run()中的代码会在一个新的线程中执行
if (!**nmh->Run**(**Dart::thread_pool()**, NULL, NULL, 0)) {
// 执行完毕之后,在之前调用本方法的环境,回调handler,关闭Dart_Port
PortMap::ClosePort(port_id);
port_id = ILLEGAL_PORT;
}
}
Dart::ResetActiveApiCall();
return port_id;
// 上面IsolateLeaveScope saver对象在构造方法中退出了调用方法的分支,执行到这里后saver对象被回收,执行析构函数,又将Isolate切换回来
}

主要的流程有:

  1. 切换退出当前isolate
  2. 创建NativeMessageHandler nmh包裹要处理的回调
  3. 根据上面创建的nmh创建Dart_Port port_id
  4. 执行**nmh->Run()**方法将nmh放到线程池中运行
  5. nmh执行完毕回调后,关闭Dart_Port port_id

也就是说,在Dart中向Native发送指令时,通过Dart的_IOService._dispatch()方法中执行_servicePorts._getPort(id);向Native层的IOService获取用于通信的SendPort servicePort时,会先通过Dart_NewNativePort创建一个NativeMessageHandler(会压入消息栈中),然后创建一个对应的Dart_Port port_id并返回给Dart用来触发消息。

让我们挨个分析一下:

1.退出当前isolate

IsolateLeaveScope

2.创建NativeMessageHandler nmh包裹要处理的回调

3.根据上面创建的nmh创建Dart_Port port_id

看一下PortMap::CreatePort的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
// -> runtime\vm\port.cc

Dart_Port PortMap::CreatePort(MessageHandler* handler) {
ASSERT(handler != NULL);
MutexLocker ml(mutex_);
if (ports_ == nullptr) {
return ILLEGAL_PORT;
}

#if defined(DEBUG)
handler->CheckAccess();
#endif

// 不停的遍历,直到找到一个可用的port(类型为int64_t)
const Dart_Port port = AllocatePort();

// 获取到的port 只能通过isolate_entry访问
// The MessageHandler::ports_ is only accessed by [PortMap], it is guarded
// by the [PortMap::mutex_] we already hold.
MessageHandler::PortSetEntry isolate_entry;
isolate_entry.port = port;
handler->ports_.Insert(isolate_entry);

Entry entry;
entry.port = port;
entry.handler = handler;
entry.state = kNewPort;
ports_->Insert(entry);

if (FLAG_trace_isolates) {
OS::PrintErr(
"[+] Opening port: \n"
"\thandler: %s\n"
"\tport: %" Pd64 "\n",
handler->name(), entry.port);
}

return entry.port;
}

Dart_Port PortMap::AllocatePort() {
Dart_Port result;

ASSERT(mutex_->IsOwnedByCurrentThread());

// Keep getting new values while we have an illegal port number or the port
// number is already in use.
do {
// Ensure port ids are representable in JavaScript for the benefit of
// vm-service clients such as Observatory.
const Dart_Port kMask1 = 0xFFFFFFFFFFFFF;
// Ensure port ids are never valid object pointers so that reinterpreting
// an object pointer as a port id never produces a used port id.
const Dart_Port kMask2 = 0x3;
result = (prng_->NextUInt64() & kMask1) | kMask2;

// The two special marker ports are used for the hashset implementation and
// cannot be used as actual ports.
if (result == PortSet<Entry>::kFreePort ||
result == PortSet<Entry>::kDeletedPort) {
continue;
}

ASSERT(!static_cast<ObjectPtr>(static_cast<uword>(result))->IsWellFormed());
} while (ports_->Contains(result));

ASSERT(result != 0);
ASSERT(!ports_->Contains(result));
return result;
}

4.执行**nmh->Run()**方法将nmh放到线程池中运行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207

// -> runtime\vm\message_handler.cc
ThreadPool* pool_;
bool MessageHandler::Run(ThreadPool* pool,
StartCallback start_callback,
EndCallback end_callback,
CallbackData data) {
MonitorLocker ml(&monitor_);
if (FLAG_trace_isolates) {
OS::PrintErr(
"[+] Starting message handler:\n"
"\thandler: %s\n",
name());
}
ASSERT(pool_ == NULL);
ASSERT(!delete_me_);
pool_ = pool;
start_callback_ = start_callback;
end_callback_ = end_callback;
callback_data_ = data;
task_running_ = true;
// 在Dart VM Thread的线程池中执行MessageHandler,会是一个新的线程
bool result = pool_->Run<MessageHandlerTask>(this);
if (!result) {
pool_ = nullptr;
start_callback_ = nullptr;
end_callback_ = nullptr;
callback_data_ = 0;
task_running_ = false;
}
return result;
}

// 会在“线程池”运行的时候执行对应的MessageHandler回调
class MessageHandlerTask : public ThreadPool::Task {
public:
explicit MessageHandlerTask(MessageHandler* handler) : handler_(handler) {
ASSERT(handler != NULL);
}

virtual void Run() {
ASSERT(handler_ != NULL);
// 执行具体的逻辑
handler_->TaskCallback();
}

private:
MessageHandler* handler_;

DISALLOW_COPY_AND_ASSIGN(MessageHandlerTask);
};

// -> runtime\include\dart_api.h

// A port is used to send or receive inter-isolate messages
typedef int64_t Dart_Port;

// -> runtime\vm\thread_pool.h

// Runs a task on the thread pool.
template <typename T, typename... Args>
bool Run(Args&&... args) {
return RunImpl(std::unique_ptr<Task>(new T(std::forward<Args>(args)...)));
}

// -> runtime\vm\thread_pool.cc
bool ThreadPool::RunImpl(std::unique_ptr<Task> task) {
Worker* new_worker = nullptr;
{
MonitorLocker ml(&pool_monitor_);
if (shutting_down_) {
return false;
}
// 创建新的Worker
new_worker = ScheduleTaskLocked(&ml, std::move(task));
}
if (new_worker != nullptr) {
// 在线程中执行task
new_worker->StartThread();
}
return true;
}

// -> runtime\vm\thread_pool.cc

// 创建一个Worker
ThreadPool::Worker* ThreadPool::ScheduleTaskLocked(MonitorLocker* ml,
std::unique_ptr<Task> task) {
// Enqueue the new task.
tasks_.Append(task.release());
pending_tasks_++;
ASSERT(pending_tasks_ >= 1);

// Notify existing idle worker (if available).
if (count_idle_ >= pending_tasks_) {
ASSERT(!idle_workers_.IsEmpty());
ml->Notify();
return nullptr;
}

// If we have maxed out the number of threads running, we will not start a
// new one.
if (max_pool_size_ > 0 && (count_idle_ + count_running_) >= max_pool_size_) {
if (!idle_workers_.IsEmpty()) {
ml->Notify();
}
return nullptr;
}

// Otherwise start a new worker.
auto new_worker = new Worker(this);
idle_workers_.Append(new_worker);
count_idle_++;
return new_worker;
}

// 新建的Woker和ThreadPool绑定
ThreadPool::Worker::Worker(ThreadPool* pool)
: pool_(pool), join_id_(OSThread::kInvalidThreadJoinId) {}

// new_worker->StartThread();会调用下面的方法
void ThreadPool::Worker::StartThread() {
int result = OSThread::Start("DartWorker", &Worker::Main,
reinterpret_cast<uword>(this));
if (result != 0) {
FATAL1("Could not start worker thread: result = %d.", result);
}
}

// OSThread::Start每个端不一样,我们选择Android端的实现
// -> runtime\vm\os_thread_android.cc

int OSThread::Start(const char* name,
ThreadStartFunction function,
uword parameter) {
pthread_attr_t attr;
int result = pthread_attr_init(&attr);
RETURN_ON_PTHREAD_FAILURE(result);

result = pthread_attr_setstacksize(&attr, OSThread::GetMaxStackSize());
RETURN_ON_PTHREAD_FAILURE(result);

ThreadStartData* data = new ThreadStartData(name, function, parameter);
// 声明系统线程类型
pthread_t tid;
// 调用系统创建线程的函数 https://blog.csdn.net/liangxanhai/article/details/7767430
// pthread_create参数含义:1. &tid 指向线程的指针,2. &attr 新建线程的属性 3. ThreadStart线程要执行的方法指针 4. data传给参数ThreadStart的参数
// 成功执行线程则返回0
result = pthread_create(&tid, &attr, ThreadStart, data);
RETURN_ON_PTHREAD_FAILURE(result);

result = pthread_attr_destroy(&attr);
RETURN_ON_PTHREAD_FAILURE(result);

return 0;
}

// Dispatch to the thread start function provided by the caller. This trampoline
// is used to ensure that the thread is properly destroyed if the thread just
// exits.
static void* ThreadStart(void* data_ptr) {
if (FLAG_worker_thread_priority != kMinInt) {
if (setpriority(PRIO_PROCESS, gettid(), FLAG_worker_thread_priority) ==
-1) {
FATAL2("Setting thread priority to %d failed: errno = %d\n",
FLAG_worker_thread_priority, errno);
}
}

ThreadStartData* data = reinterpret_cast<ThreadStartData*>(data_ptr);

const char* name = data->name();
OSThread::ThreadStartFunction function = data->function();
uword parameter = data->parameter();
delete data;

// Set the thread name. There is 16 bytes limit on the name (including \0).
// pthread_setname_np ignores names that are too long rather than truncating.
char truncated_name[16];
snprintf(truncated_name, ARRAY_SIZE(truncated_name), "%s", name);
pthread_setname_np(pthread_self(), truncated_name);

// 创建一个系统线程的包装类OSThread和新建的系统线程绑定
// Create new OSThread object and set as TLS for new thread.
OSThread* thread = OSThread::CreateOSThread();
if (thread != NULL) {
// 将线程切换到新创建的系统线程
OSThread::SetCurrent(thread);
thread->set_name(name);
UnblockSIGPROF();
// Call the supplied thread start function handing it its parameters.
// 执行创建ThreadStartData时传入的方法,也就是ThreadPool::Worker::Main(uword args)
function(parameter);
}

return NULL;

OSThread* OSThread::CreateOSThread() {
ASSERT(thread_list_lock_ != NULL);
MutexLocker ml(thread_list_lock_);
if (!creation_enabled_) {
return NULL;
}
OSThread* os_thread = new OSThread();
AddThreadToListLocked(os_thread);
return os_thread;
}

在创建了新的系统线程后,会执行下面的方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
// -> runtime\vm\thread_pool.cc

void ThreadPool::Worker::Main(uword args) {
// Call the thread start hook here to notify the embedder that the
// thread pool thread has started.
Dart_ThreadStartCallback start_cb = Dart::thread_start_callback();
if (start_cb != nullptr) {
start_cb();
}

OSThread* os_thread = OSThread::Current();
ASSERT(os_thread != nullptr);

Worker* worker = reinterpret_cast<Worker*>(args);
ThreadPool* pool = worker->pool_;
// 将Worker和系统线程绑定
os_thread->owning_thread_pool_worker_ = worker;
worker->os_thread_ = os_thread;

// Once the worker quits it needs to be joined.
worker->join_id_ = OSThread::GetCurrentThreadJoinId(os_thread);

#if defined(DEBUG)
{
MonitorLocker ml(&pool->pool_monitor_);
ASSERT(pool->idle_workers_.ContainsForDebugging(worker));
}
#endif

pool->WorkerLoop(worker);

worker->os_thread_ = nullptr;
os_thread->owning_thread_pool_worker_ = nullptr;

// Call the thread exit hook here to notify the embedder that the
// thread pool thread is exiting.
Dart_ThreadExitCallback exit_cb = Dart::thread_exit_callback();
if (exit_cb != nullptr) {
exit_cb();
}
}

// -> runtime\vm\os_thread.h
// OSThread
// The ThreadPool::Worker which owns this OSThread. If this OSThread was not
// started by a ThreadPool it will be nullptr. This TLS value is not
// protected and should only be read/written by the OSThread itself.
void* owning_thread_pool_worker_ = nullptr;

// thread_list_lock_ cannot have a static lifetime because the order in which
// destructors run is undefined. At the moment this lock cannot be deleted
// either since otherwise, if a thread only begins to run after we have
// started to run TLS destructors for a call to exit(), there will be a race
// on its deletion in CreateOSThread().
static Mutex* thread_list_lock_;

Dart_NewSendPort

看一下Dart_NewSendPort如何将创建好的Dart_Port service_port转变为Dart的SendPort的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
// -> runtime\vm\dart_api_impl.cc

DART_EXPORT Dart_Handle Dart_NewSendPort(Dart_Port port_id) {
DARTSCOPE(Thread::Current());
CHECK_CALLBACK_STATE(T);
if (port_id == ILLEGAL_PORT) {
return Api::NewError("%s: illegal port_id %" Pd64 ".", CURRENT_FUNC,
port_id);
}
return Api::NewHandle(T, SendPort::New(port_id));
}

// -> runtime\vm\object.cc

SendPortPtr SendPort::New(Dart_Port id, Heap::Space space) {
return New(id, Isolate::Current()->origin_id(), space);
}

SendPortPtr SendPort::New(Dart_Port id,
Dart_Port origin_id,
Heap::Space space) {
ASSERT(id != ILLEGAL_PORT);
// 创建新的SendPort并将Dart_Port id和当前的isolate id与之绑定
SendPort& result = SendPort::Handle();
{
ObjectPtr raw =
Object::Allocate(SendPort::kClassId, SendPort::InstanceSize(), space,
SendPort::ContainsCompressedPointers());
NoSafepointScope no_safepoint;
result ^= raw;
result.StoreNonPointer(&result.untag()->id_, id);
result.StoreNonPointer(&result.untag()->origin_id_, origin_id);
}
return result.ptr();
}

到这里我们发现,Dart_NewNativePort将要处理的事件handler封装起来,最后在非当前isolate的线程中执行。

结论

从上面的分析中,我们可以知道,在Dart中通过File进行文件操作,其实是通过Dart中的_IOService进行消息中转,将用户的IO指令发送到Native层的IOService中;

IOService通过一些列操作,得到一个SendPort servicePort,与此同时对应的IO操作已经压入消息栈中等待触发在单独的线程中执行;

之后在_IOService中servicePort将用户需要的IO操作和与自己通信的_replyToPort = _receivePort!.sendPort; 通过send方法触发IOServiceCallback执行对应的IO操作,并且在最后调用Dart_PostCObject方法将结果压入消息栈中,这会触发Dart层_IOService的_receivePort!.handler回调事件,然后根据事件失败或者成功,使用Completer通过Event loop一步步将事件上报,最终回调用户需要的命令。

参考资料

09、Flutter FFI Dart Native API_又吹风_Bassy的博客-CSDN博客

快手-开眼快创 Flutter 实践 | w4lle’s Notes