Skip to content

Commit

Permalink
chore: wait for webworkers to load before sending messages
Browse files Browse the repository at this point in the history
This enables webworkers to perform asynchronous initialization before setting an
`onmessage` handler, and avoids potential bundler-dependent issues wih lost
messages.
  • Loading branch information
jbms committed Mar 15, 2024
1 parent 10718c4 commit 2cdc1f3
Show file tree
Hide file tree
Showing 6 changed files with 59 additions and 28 deletions.
22 changes: 6 additions & 16 deletions src/async_computation/handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,34 +21,24 @@ const handlers = new Map<
(...args: any[]) => Promise<{ value: any; transfer?: Transferable[] }>
>();

function setupChannel(port: MessagePort | any) {
port.onmessage = (msg: any) => {
{
// On most Firefox and Chrome, async computation workers can be created directly from the
// chunk queue worker. On Safari, though, since workers cannot themselves create additional
// workers, instead async computation workers are created by the main thread, and we
// communicate with the chunk queue worker via a separate `MessagePort`, which is provided in
// an additional message with a `port` member.
const newPort = msg.data.port;
if (newPort !== undefined) {
setupChannel(newPort);
return;
}
}
function setupChannel(port: DedicatedWorkerGlobalScope) {
self.onmessage = (msg: any) => {
const { t, id, args } = msg.data as { t: string; id: number; args: any[] };
const handler = handlers.get(t)!;
handler(...args).then(
({ value, transfer }) => port.postMessage({ id, value }, transfer),
({ value, transfer }) => port.postMessage({ id, value }, { transfer }),
(error) =>
port.postMessage({
id,
error: error instanceof Error ? error.message : error.toString(),
}),
);
};
// Notify that the worker is ready to receive messages.
self.postMessage(null);
}

setupChannel(self);
setupChannel(self as DedicatedWorkerGlobalScope);

export function registerAsyncComputation<
Signature extends (...args: any) => any,
Expand Down
23 changes: 16 additions & 7 deletions src/async_computation/request.ts
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import type { AsyncComputationSpec } from "#src/async_computation/index.js";
import type { CancellationToken } from "#src/util/cancellation.js";
import { CANCELED } from "#src/util/cancellation.js";

let numWorkers = 0;
const freeWorkers: Worker[] = [];
const pendingTasks = new Map<
number,
Expand Down Expand Up @@ -47,21 +48,29 @@ function returnWorker(worker: Worker) {
freeWorkers.push(worker);
}

function getNewWorker(): Worker {
function launchWorker() {
++numWorkers;
// Note: For compatibility with multiple bundlers, a browser-compatible URL
// must be used with `new URL`, which means a Node.js subpath import like
// "#src/async_computation.bundle.js" cannot be used.
const port = new Worker(
const worker = new Worker(
new URL("../async_computation.bundle.js", import.meta.url),
{ type: "module" },
);
port.onmessage = (msg) => {
let ready = false;
worker.onmessage = (msg) => {
// First message indicates worker is ready.
if (!ready) {
ready = true;
returnWorker(worker);
return;
}
const { id, value, error } = msg.data as {
id: number;
value?: any;
error?: string;
};
returnWorker(port);
returnWorker(worker);
const callbacks = tasks.get(id)!;
tasks.delete(id);
if (callbacks === undefined) return;
Expand All @@ -72,7 +81,6 @@ function getNewWorker(): Worker {
callbacks.resolve(value);
}
};
return port;
}

export function requestAsyncComputation<
Expand All @@ -95,10 +103,11 @@ export function requestAsyncComputation<
});
if (freeWorkers.length !== 0) {
freeWorkers.pop()!.postMessage(msg, transfer as Transferable[]);
} else if (tasks.size < maxWorkers) {
getNewWorker().postMessage(msg, transfer as Transferable[]);
} else {
pendingTasks.set(id, { msg, transfer });
if (tasks.size > numWorkers && numWorkers < maxWorkers) {
launchWorker();
}
}
return promise;
}
2 changes: 1 addition & 1 deletion src/viewer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ export class DataManagementContext extends RefCounted {
);
this.chunkQueueManager = this.registerDisposer(
new ChunkQueueManager(
new RPC(this.worker),
new RPC(this.worker, /*waitUntilReady=*/ true),
this.gl,
this.frameNumberCounter,
{
Expand Down
33 changes: 32 additions & 1 deletion src/worker_rpc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ const DEBUG_MESSAGES = false;

const PROMISE_RESPONSE_ID = "rpc.promise.response";
const PROMISE_CANCEL_ID = "rpc.promise.cancel";
const READY_ID = "rpc.ready";

const handlers = new Map<string, RPCHandler>();

Expand Down Expand Up @@ -108,6 +109,11 @@ registerRPC(PROMISE_RESPONSE_ID, function (this: RPC, x: any) {
}
});

registerRPC(READY_ID, function (this: RPC, x: any) {
x;
this.onPeerReady();
});

interface RPCTarget {
postMessage(message?: any, ports?: any): void;
onmessage: ((ev: MessageEvent) => any) | null;
Expand All @@ -118,7 +124,14 @@ const INITIAL_RPC_ID = IS_WORKER ? -1 : 0;
export class RPC {
private objects = new Map<RpcId, any>();
private nextId: RpcId = INITIAL_RPC_ID;
constructor(public target: RPCTarget) {
private queue: { data: any; transfers?: any[] }[] | undefined;
constructor(
public target: RPCTarget,
waitUntilReady: boolean,
) {
if (waitUntilReady) {
this.queue = [];
}
target.onmessage = (e) => {
const data = e.data;
if (DEBUG_MESSAGES) {
Expand All @@ -128,6 +141,19 @@ export class RPC {
};
}

sendReady() {
this.invoke(READY_ID, {});
}

onPeerReady() {
const { queue } = this;
if (queue === undefined) return;
this.queue = undefined;
for (const { data, transfers } of queue) {
this.target.postMessage(data, transfers);
}
}

get numObjects() {
return this.objects.size;
}
Expand Down Expand Up @@ -167,6 +193,11 @@ export class RPC {
if (DEBUG_MESSAGES) {
console.trace("Sending message", x);
}
const { queue } = this;
if (queue !== undefined) {
queue.push({ data: x, transfers });
return;
}
this.target.postMessage(x, transfers);
}

Expand Down
5 changes: 3 additions & 2 deletions src/worker_rpc_context.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,5 +16,6 @@

import { RPC } from "#src/worker_rpc.js";

export const rpc = new RPC(self);
(<any>self).rpc = rpc;
export const rpc = new RPC(self, /*waitUntilReady=*/ false);
rpc.sendReady();
(globalThis as any).rpc = rpc;
2 changes: 1 addition & 1 deletion tsconfig.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
"target": "ES2017",
"newLine": "LF",
"baseUrl": ".",
"lib": ["dom", "es2020", "dom.iterable"],
"lib": ["dom", "webworker", "es2020", "dom.iterable"],
"typeRoots": [],
"module": "ESNext",
},
Expand Down

0 comments on commit 2cdc1f3

Please sign in to comment.