Skip to content

Commit

Permalink
feat(swingset): Add Node.js Worker (thread) -based VatManager
Browse files Browse the repository at this point in the history
This adds a per-vat option to run the vat code in a separate thread, sharing
the process with the main (kernel) thread, sending VatDelivery and VatSyscall
objects over the postMessage channel. This isn't particularly useful by
itself, but it establishes the protocol for running vats in a
separate *process*, possibly written in a different language or using a
different JS engine (like XS, in #1299).

This 'nodeWorker' managertype has several limitations. The shallow ones are:

* vatPowers is missing transformTildot, which shouldn't be hard to add
* vatPowers.testLog is missing, only used for unit tests so we can probably
live without it
* vatPowers is missing makeGetMeter/transformMetering (and will probably
never get them, since they're only used for within-vat metering and we're
trying to get rid of that)
* metering is not implemented at all
* delivery transcripts (and replay) are not yet implemented

Metering shouldn't be too hard to add, although we'll probably make it an
option, to avoid paying the instrumented-globals penalty when we aren't using
it. We also need to add proper control over vat termination (via meter
exhaustion or manually).

The deeper limitation is that nodeWorkers cannot block to wait for a
syscall (like `callNow`), so they cannot invoke devices.

refs #1127
closes #1384
  • Loading branch information
warner committed Aug 7, 2020
1 parent 4b5c636 commit e6bebac
Show file tree
Hide file tree
Showing 9 changed files with 417 additions and 2 deletions.
2 changes: 1 addition & 1 deletion packages/SwingSet/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
},
"devDependencies": {
"@agoric/install-metering-and-ses": "^0.1.1",
"@agoric/install-ses": "^0.2.0",
"esm": "^3.2.5",
"tap": "^14.10.5",
"tape": "^4.13.2",
Expand All @@ -35,6 +34,7 @@
"@agoric/bundle-source": "^1.1.6",
"@agoric/eventual-send": "^0.9.3",
"@agoric/import-bundle": "^0.0.8",
"@agoric/install-ses": "^0.2.0",
"@agoric/marshal": "^0.2.3",
"@agoric/nat": "^2.0.1",
"@agoric/promise-kit": "^0.1.3",
Expand Down
9 changes: 9 additions & 0 deletions packages/SwingSet/src/controller.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
import fs from 'fs';
import path from 'path';
import re2 from 're2';
import { Worker } from 'worker_threads';
import * as babelCore from '@babel/core';
import * as babelParser from '@agoric/babel-parser';
import babelGenerate from '@babel/generator';
Expand Down Expand Up @@ -226,13 +227,21 @@ export async function buildVatController(
}`,
);

function makeNodeWorker() {
const supercode = require.resolve(
'./kernel/vatManager/nodeWorkerSupervisor.js',
);
return new Worker(supercode);
}

const kernelEndowments = {
waitUntilQuiescent,
hostStorage,
makeVatEndowments,
replaceGlobalMeter,
transformMetering,
transformTildot,
makeNodeWorker,
};

const kernel = buildKernel(kernelEndowments);
Expand Down
2 changes: 2 additions & 0 deletions packages/SwingSet/src/kernel/kernel.js
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ export default function buildKernel(kernelEndowments) {
replaceGlobalMeter,
transformMetering,
transformTildot,
makeNodeWorker,
} = kernelEndowments;
insistStorageAPI(hostStorage);
const { enhancedCrankBuffer, commitCrank } = wrapStorage(hostStorage);
Expand Down Expand Up @@ -565,6 +566,7 @@ export default function buildKernel(kernelEndowments) {
testLog,
transformMetering,
waitUntilQuiescent,
makeNodeWorker,
});

/*
Expand Down
16 changes: 15 additions & 1 deletion packages/SwingSet/src/kernel/vatManager/factory.js
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import { assert } from '@agoric/assert';
import { assertKnownOptions } from '../../assertOptions';
import { makeLocalVatManagerFactory } from './localVatManager';
import { makeNodeWorkerVatManagerFactory } from './nodeWorker';

export function makeVatManagerFactory({
allVatPowers,
Expand All @@ -10,6 +11,7 @@ export function makeVatManagerFactory({
meterManager,
transformMetering,
waitUntilQuiescent,
makeNodeWorker,
}) {
const localFactory = makeLocalVatManagerFactory({
allVatPowers,
Expand All @@ -20,6 +22,11 @@ export function makeVatManagerFactory({
waitUntilQuiescent,
});

const nodeWorkerFactory = makeNodeWorkerVatManagerFactory({
makeNodeWorker,
kernelKeeper,
});

function validateManagerOptions(managerOptions) {
assertKnownOptions(managerOptions, [
'enablePipelining',
Expand Down Expand Up @@ -63,8 +70,15 @@ export function makeVatManagerFactory({
return localFactory.createFromBundle(vatID, bundle, managerOptions);
}

if (managerType === 'nodeWorker') {
// 'setup' based vats must be local. TODO: stop using 'setup' in vats,
// but tests and comms-vat still need it
assert(!setup, `setup()-based vats must use a local Manager`);
return nodeWorkerFactory.createFromBundle(vatID, bundle, managerOptions);
}

throw Error(
`unknown manager type ${managerType}, not 'local'`,
`unknown manager type ${managerType}, not 'local' or 'nodeWorker'`,
);
}

Expand Down
147 changes: 147 additions & 0 deletions packages/SwingSet/src/kernel/vatManager/nodeWorker.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
/* global harden */

// import { Worker } from 'worker_threads'; // not from a Compartment
import { assert } from '@agoric/assert';
import { makePromiseKit } from '@agoric/promise-kit';
import { makeTranscriptManager } from './transcript';

import { createSyscall } from './syscall';

// start a "Worker" (Node's tool for starting new threads) and load a bundle
// into it

/*
import { waitUntilQuiescent } from '../../waitUntilQuiescent';
function wait10ms() {
const { promise: queueEmptyP, resolve } = makePromiseKit();
setTimeout(() => resolve(), 10);
return queueEmptyP;
}
*/

// eslint-disable-next-line no-unused-vars
function parentLog(first, ...args) {
// console.error(`--parent: ${first}`, ...args);
}

export function makeNodeWorkerVatManagerFactory(tools) {
const { makeNodeWorker, kernelKeeper } = tools;

function createFromBundle(vatID, bundle, managerOptions) {
const { vatParameters } = managerOptions;
assert(!managerOptions.metered, 'not supported yet');
assert(!managerOptions.notifyTermination, 'not supported yet');
assert(!managerOptions.enableSetup, 'not supported at all');
if (managerOptions.enableInternalMetering) {
// TODO: warn+ignore, rather than throw, because the kernel enables it
// for all vats, because the Spawner still needs it. When the kernel
// stops doing that, turn this into a regular assert
console.log(`node-worker does not support enableInternalMetering`);
}
const vatKeeper = kernelKeeper.allocateVatKeeperIfNeeded(vatID);
const transcriptManager = makeTranscriptManager(
kernelKeeper,
vatKeeper,
vatID,
);

// prepare to accept syscalls from the worker

// TODO: make the worker responsible for checking themselves: we send
// both the delivery and the expected syscalls, and the supervisor
// compares what the bundle does with what it was told to expect.
// Modulo flow control, we just stream transcript entries at the
// worker and eventually get back an "ok" or an error. When we do
// that, doSyscall won't even see replayed syscalls from the worker.

const { doSyscall, setVatSyscallHandler } = createSyscall(
transcriptManager,
);
function handleSyscall(vatSyscallObject) {
const type = vatSyscallObject[0];
if (type === 'callNow') {
throw Error(`nodeWorker cannot block, cannot use syscall.callNow`);
}
doSyscall(vatSyscallObject);
}

// start the worker and establish a connection

const { promise: workerP, resolve: gotWorker } = makePromiseKit();

function sendToWorker(msg) {
assert(msg instanceof Array);
workerP.then(worker => worker.postMessage(msg));
}

const {
promise: dispatchReadyP,
resolve: dispatchIsReady,
} = makePromiseKit();
let waiting;

function handleUpstream([type, ...args]) {
parentLog(`received`, type);
if (type === 'setUplinkAck') {
parentLog(`upload ready`);
} else if (type === 'gotBundle') {
parentLog(`bundle loaded`);
} else if (type === 'dispatchReady') {
parentLog(`dispatch() ready`);
// wait10ms().then(dispatchIsReady); // stall to let logs get printed
dispatchIsReady();
} else if (type === 'syscall') {
parentLog(`syscall`, args);
const vatSyscallObject = args;
handleSyscall(vatSyscallObject);
} else if (type === 'deliverDone') {
parentLog(`deliverDone`);
if (waiting) {
const resolve = waiting;
waiting = null;
resolve();
}
} else {
parentLog(`unrecognized uplink message ${type}`);
}
}

const worker = makeNodeWorker();
worker.on('message', handleUpstream);
gotWorker(worker);

parentLog(`instructing worker to load bundle..`);
sendToWorker(['setBundle', bundle, vatParameters]);

function deliver(delivery) {
parentLog(`sending delivery`, delivery);
assert(!waiting, `already waiting for delivery`);
const pr = makePromiseKit();
waiting = pr.resolve;
sendToWorker(['deliver', ...delivery]);
return pr.promise;
}

function replayTranscript() {
throw Error(`replayTranscript not yet implemented`);
}

function shutdown() {
// this returns a Promise that fulfills with 1 if we used
// worker.terminate(), otherwise with the `exitCode` passed to
// `process.exit(exitCode)` within the worker.
return worker.terminate();
}

const manager = harden({
replayTranscript,
setVatSyscallHandler,
deliver,
shutdown,
});

return dispatchReadyP.then(() => manager);
}

return harden({ createFromBundle });
}
142 changes: 142 additions & 0 deletions packages/SwingSet/src/kernel/vatManager/nodeWorkerSupervisor.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
/* global harden */
// this file is loaded at the start of a new Worker, which makes it a new JS
// environment (with it's own Realm), so we must install-ses too.
import '@agoric/install-ses';
import { parentPort } from 'worker_threads';
import anylogger from 'anylogger';

import { assert } from '@agoric/assert';
import { importBundle } from '@agoric/import-bundle';
import { Remotable, getInterfaceOf } from '@agoric/marshal';
import { HandledPromise } from '@agoric/eventual-send';
import { waitUntilQuiescent } from '../../waitUntilQuiescent';
import { makeLiveSlots } from '../liveSlots';

// eslint-disable-next-line no-unused-vars
function workerLog(first, ...args) {
// console.error(`---worker: ${first}`, ...args);
}

workerLog(`supervisor started`);

function makeConsole(tag) {
const log = anylogger(tag);
const cons = {};
for (const level of ['debug', 'log', 'info', 'warn', 'error']) {
cons[level] = log[level];
}
return harden(cons);
}

function runAndWait(f, errmsg) {
Promise.resolve()
.then(f)
.then(undefined, err => workerLog(`doProcess: ${errmsg}:`, err));
return waitUntilQuiescent();
}

function sendUplink(msg) {
assert(msg instanceof Array, `msg must be an Array`);
parentPort.postMessage(msg);
}

let dispatch;

async function doProcess(dispatchRecord, errmsg) {
const dispatchOp = dispatchRecord[0];
const dispatchArgs = dispatchRecord.slice(1);
workerLog(`runAndWait`);
await runAndWait(() => dispatch[dispatchOp](...dispatchArgs), errmsg);
workerLog(`doProcess done`);
}

function doNotify(vpid, vp) {
const errmsg = `vat.promise[${vpid}] ${vp.state} failed`;
switch (vp.state) {
case 'fulfilledToPresence':
return doProcess(['notifyFulfillToPresence', vpid, vp.slot], errmsg);
case 'redirected':
throw new Error('not implemented yet');
case 'fulfilledToData':
return doProcess(['notifyFulfillToData', vpid, vp.data], errmsg);
case 'rejected':
return doProcess(['notifyReject', vpid, vp.data], errmsg);
default:
throw Error(`unknown promise state '${vp.state}'`);
}
}

let syscallLog;
parentPort.on('message', ([type, ...margs]) => {
workerLog(`received`, type);
if (type === 'start') {
// TODO: parent should send ['start', vatID]
workerLog(`got start`);
sendUplink(['gotStart']);
} else if (type === 'setBundle') {
const [bundle, vatParameters] = margs;
const endowments = {
console: makeConsole(`SwingSet:vatWorker`),
HandledPromise,
};
importBundle(bundle, { endowments }).then(vatNS => {
workerLog(`got vatNS:`, Object.keys(vatNS).join(','));
sendUplink(['gotBundle']);

function doSyscall(vatSyscallObject) {
sendUplink(['syscall', ...vatSyscallObject]);
}
const syscall = harden({
send: (...args) => doSyscall(['send', ...args]),
callNow: (..._args) => {
throw Error(`nodeWorker cannot syscall.callNow`);
},
subscribe: (...args) => doSyscall(['subscribe', ...args]),
fulfillToData: (...args) => doSyscall(['fulfillToData', ...args]),
fulfillToPresence: (...args) =>
doSyscall(['fulfillToPresence', ...args]),
reject: (...args) => doSyscall(['reject', ...args]),
});

const state = null;
const vatID = 'demo-vatID';
// todo: maybe add transformTildot, makeGetMeter/transformMetering to
// vatPowers, but only if options tell us they're wanted. Maybe
// transformTildot should be async and outsourced to the kernel
// process/thread.
const vatPowers = { Remotable, getInterfaceOf };
dispatch = makeLiveSlots(
syscall,
state,
vatNS.buildRootObject,
vatID,
vatPowers,
vatParameters,
);
workerLog(`got dispatch:`, Object.keys(dispatch).join(','));
sendUplink(['dispatchReady']);
});
} else if (type === 'deliver') {
if (!dispatch) {
workerLog(`error: deliver before dispatchReady`);
return;
}
const [dtype, ...dargs] = margs;
if (dtype === 'message') {
const [targetSlot, msg] = dargs;
const errmsg = `vat[${targetSlot}].${msg.method} dispatch failed`;
doProcess(
['deliver', targetSlot, msg.method, msg.args, msg.result],
errmsg,
).then(() => {
sendUplink(['deliverDone']);
});
} else if (dtype === 'notify') {
doNotify(...dargs).then(() => sendUplink(['deliverDone', syscallLog]));
} else {
throw Error(`bad delivery type ${dtype}`);
}
} else {
workerLog(`unrecognized downlink message ${type}`);
}
});
Loading

0 comments on commit e6bebac

Please sign in to comment.