Skip to content

Commit

Permalink
Merge pull request #84 from powersync-ja/feat/support-build-web-compi…
Browse files Browse the repository at this point in the history
…lers

Support being compiled with `build_web_compilers`
  • Loading branch information
simolus3 authored Feb 3, 2025
2 parents 1c039a3 + 7ff4bfe commit bc3416c
Show file tree
Hide file tree
Showing 7 changed files with 520 additions and 353 deletions.
1 change: 1 addition & 0 deletions .github/workflows/test.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -68,3 +68,4 @@ jobs:
run: |
export LD_LIBRARY_PATH=$(pwd)/sqlite-autoconf-${{ matrix.sqlite_version }}/.libs
melos test
melos test_build
8 changes: 8 additions & 0 deletions melos.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -51,3 +51,11 @@ scripts:
# as they could change the behaviour of how tests filter packages.
env:
MELOS_TEST: true

test_build:
description: Runs tests with build_test
run: dart run build_runner test -- -p chrome
exec:
concurrency: 1
packageFilters:
dependsOn: build_test
4 changes: 4 additions & 0 deletions packages/sqlite_async/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,7 @@
## 0.11.3

- Support being compiled with `package:build_web_compilers`.

## 0.11.2

- Support latest version of `package:sqlite3_web`.
Expand Down
354 changes: 2 additions & 352 deletions packages/sqlite_async/lib/src/common/port_channel.dart
Original file line number Diff line number Diff line change
@@ -1,352 +1,2 @@
import 'dart:async';
import 'dart:collection';
import 'dart:isolate';

abstract class PortClient {
Future<T> post<T>(Object message);
void fire(Object message);

factory PortClient.parent() {
return ParentPortClient();
}

factory PortClient.child(SendPort upstream) {
return ChildPortClient(upstream);
}
}

class ParentPortClient implements PortClient {
late Future<SendPort> sendPortFuture;
SendPort? sendPort;
final ReceivePort _receivePort = ReceivePort();
final ReceivePort _errorPort = ReceivePort();
bool closed = false;
Object? _closeError;
String? _isolateDebugName;
int _nextId = 1;

Map<int, Completer<Object?>> handlers = HashMap();

ParentPortClient() {
final initCompleter = Completer<SendPort>.sync();
sendPortFuture = initCompleter.future;
sendPortFuture.then((value) {
sendPort = value;
});
_receivePort.listen((message) {
if (message is _InitMessage) {
assert(!initCompleter.isCompleted);
initCompleter.complete(message.port);
} else if (message is _PortChannelResult) {
final handler = handlers.remove(message.requestId);
assert(handler != null);
if (message.success) {
handler!.complete(message.result);
} else {
handler!.completeError(message.error, message.stackTrace);
}
} else if (message == _closeMessage) {
close();
}
}, onError: (e) {
if (!initCompleter.isCompleted) {
initCompleter.completeError(e);
}

close();
}, onDone: () {
if (!initCompleter.isCompleted) {
initCompleter.completeError(ClosedException());
}
close();
});
_errorPort.listen((message) {
final [error, stackTraceString] = message;
final stackTrace = stackTraceString == null
? null
: StackTrace.fromString(stackTraceString);
if (!initCompleter.isCompleted) {
initCompleter.completeError(error, stackTrace);
}
_close(IsolateError(cause: error, isolateDebugName: _isolateDebugName),
stackTrace);
});
}

Future<void> get ready async {
await sendPortFuture;
}

void _cancelAll(Object error, [StackTrace? stackTrace]) {
var handlers = this.handlers;
this.handlers = {};
for (var message in handlers.values) {
message.completeError(error, stackTrace);
}
}

@override
Future<T> post<T>(Object message) async {
if (closed) {
throw _closeError ?? const ClosedException();
}
var completer = Completer<T>.sync();
var id = _nextId++;
handlers[id] = completer;
final port = sendPort ?? await sendPortFuture;
port.send(_RequestMessage(id, message, null));
return await completer.future;
}

@override
void fire(Object message) async {
if (closed) {
throw _closeError ?? ClosedException();
}
final port = sendPort ?? await sendPortFuture;
port.send(_FireMessage(message));
}

RequestPortServer server() {
return RequestPortServer(_receivePort.sendPort);
}

void _close([Object? error, StackTrace? stackTrace]) {
if (!closed) {
closed = true;

_receivePort.close();
_errorPort.close();
if (error == null) {
_cancelAll(const ClosedException());
} else {
_closeError = error;
_cancelAll(error, stackTrace);
}
}
}

void close() {
_close();
}

tieToIsolate(Isolate isolate) {
_isolateDebugName = isolate.debugName;
isolate.addErrorListener(_errorPort.sendPort);
isolate.addOnExitListener(_receivePort.sendPort, response: _closeMessage);
}
}

class SerializedPortClient {
final SendPort sendPort;

SerializedPortClient(this.sendPort);

ChildPortClient open() {
return ChildPortClient(sendPort);
}
}

class ChildPortClient implements PortClient {
final SendPort sendPort;
final ReceivePort receivePort = ReceivePort();
int _nextId = 1;
bool closed = false;

final Map<int, Completer<Object?>> handlers = HashMap();

ChildPortClient(this.sendPort) {
receivePort.listen((message) {
if (message is _PortChannelResult) {
final handler = handlers.remove(message.requestId);
assert(handler != null);
if (message.success) {
handler!.complete(message.result);
} else {
handler!.completeError(message.error, message.stackTrace);
}
}
});
}

@override
Future<T> post<T>(Object message) async {
if (closed) {
throw const ClosedException();
}
var completer = Completer<T>.sync();
var id = _nextId++;
handlers[id] = completer;
sendPort.send(_RequestMessage(id, message, receivePort.sendPort));
return await completer.future;
}

@override
void fire(Object message) {
if (closed) {
throw ClosedException();
}
sendPort.send(_FireMessage(message));
}

void _cancelAll(Object error) {
var handlers = HashMap<int, Completer<Object?>>.from(this.handlers);
this.handlers.clear();
for (var message in handlers.values) {
message.completeError(error);
}
}

void close() {
closed = true;
_cancelAll(const ClosedException());
receivePort.close();
}
}

class RequestPortServer {
final SendPort port;

RequestPortServer(this.port);

open(Future<Object?> Function(Object? message) handle) {
return PortServer.forSendPort(port, handle);
}
}

class PortServer {
final ReceivePort _receivePort = ReceivePort();
final Future<Object?> Function(Object? message) handle;
final SendPort? replyPort;

PortServer(this.handle) : replyPort = null {
_init();
}

PortServer.forSendPort(SendPort port, this.handle) : replyPort = port {
port.send(_InitMessage(_receivePort.sendPort));
_init();
}

SendPort get sendPort {
return _receivePort.sendPort;
}

SerializedPortClient client() {
return SerializedPortClient(sendPort);
}

void close() {
_receivePort.close();
}

void _init() {
_receivePort.listen((request) async {
if (request is _FireMessage) {
handle(request.message);
} else if (request is _RequestMessage) {
if (request.id == 0) {
// Fire and forget
handle(request.message);
} else {
final replyPort = request.reply ?? this.replyPort;
try {
var result = await handle(request.message);
replyPort!.send(_PortChannelResult.success(request.id, result));
} catch (e, stacktrace) {
replyPort!
.send(_PortChannelResult.error(request.id, e, stacktrace));
}
}
}
});
}
}

const _closeMessage = '_Close';

class _InitMessage {
final SendPort port;

_InitMessage(this.port);
}

class _FireMessage {
final Object message;

const _FireMessage(this.message);
}

class _RequestMessage {
final int id;
final Object message;
final SendPort? reply;

_RequestMessage(this.id, this.message, this.reply);
}

class ClosedException implements Exception {
const ClosedException();

@override
String toString() {
return 'ClosedException';
}
}

class IsolateError extends Error {
final Object cause;
final String? isolateDebugName;

IsolateError({required this.cause, this.isolateDebugName});

@override
String toString() {
if (isolateDebugName != null) {
return 'IsolateError in $isolateDebugName: $cause';
} else {
return 'IsolateError: $cause';
}
}
}

class _PortChannelResult<T> {
final int requestId;
final bool success;
final T? _result;
final Object? _error;
final StackTrace? stackTrace;

const _PortChannelResult.success(this.requestId, T result)
: success = true,
_error = null,
stackTrace = null,
_result = result;
const _PortChannelResult.error(this.requestId, Object error,
[this.stackTrace])
: success = false,
_result = null,
_error = error;

T get value {
if (success) {
return _result as T;
} else {
if (_error != null && stackTrace != null) {
Error.throwWithStackTrace(_error, stackTrace!);
} else {
throw _error!;
}
}
}

T get result {
assert(success);
return _result as T;
}

Object get error {
assert(!success);
return _error!;
}
}
export 'port_channel_native.dart'
if (dart.library.js_interop) 'port_channel_stub.dart';
Loading

0 comments on commit bc3416c

Please sign in to comment.