Skip to content

Commit

Permalink
fix: drastically simplify the new-block listener
Browse files Browse the repository at this point in the history
This will allow Stargate to run with our code, even though we
don't yet have a light client for it.  Here's hoping for
CosmJS+IBC.
  • Loading branch information
michaelfig committed Sep 2, 2020
1 parent 059f34d commit 2b3160f
Show file tree
Hide file tree
Showing 2 changed files with 118 additions and 82 deletions.
199 changes: 118 additions & 81 deletions packages/cosmic-swingset/lib/ag-solo/chain-cosmos-sdk.js
Original file line number Diff line number Diff line change
@@ -1,15 +1,13 @@
import path from 'path';
import fs from 'fs';
import { execFile } from 'child_process';
import djson from 'deterministic-json';
import { createHash } from 'crypto';
import { open as tempOpen } from 'temp';
// FIXME: Use @agoric/tendermint until
// https://github.com/nomic-io/js-tendermint/issues/25
// is resolved.
import Tendermint from '@agoric/tendermint';

import WebSocket from 'ws';

import anylogger from 'anylogger';
import { makeNotifierKit } from '@agoric/notifier';
import { makePromiseKit } from '@agoric/promise-kit';

const log = anylogger('chain-cosmos-sdk');

Expand Down Expand Up @@ -80,7 +78,7 @@ export async function connectToChain(
}
shuffle(rpcAddresses);

const helperDir = path.join(basedir, 'ag-cosmos-helper-statedir');
const helperDir = path.join(basedir, 'ag-chain-cosmos-statedir');

const queued = {};

Expand Down Expand Up @@ -118,11 +116,10 @@ export async function connectToChain(
const fullArgs = [
...args,
`--chain-id=${chainID}`,
'--output=json',
`--node=tcp://${rpcAddr}`,
`--home=${helperDir}`,
];
log(HELPER, ...fullArgs);
log.debug(HELPER, ...fullArgs);
let ret;
try {
ret = await new Promise((resolve, reject) => {
Expand Down Expand Up @@ -241,7 +238,7 @@ export async function connectToChain(
}
}

const getMailbox = _height =>
const getMailbox = () =>
queuedHelper(
'getMailbox',
1, // Only one helper running at a time.
Expand All @@ -253,64 +250,20 @@ export async function connectToChain(
if (errMsg) {
log.error(errMsg);
}
log(`helper said: ${stdout}`);
try {
// Try to parse the stdout.
return JSON.parse(JSON.parse(stdout).value);
} catch (e) {
log(`failed to parse output:`, e);
if (stdout) {
log(`helper said: ${stdout}`);
try {
// Try to parse the stdout.
return JSON.parse(JSON.parse(stdout).value);
} catch (e) {
log(`failed to parse output:`, e);
}
}
},
undefined,
{}, // defaultIfCancelled
);

// The 'tendermint' package can perform light-client checks (instantiate
// Tendermint() with a
// known-valid starting state, containing {header,validators,commit}, which
// can be fetched by RPC, that we compare against the GCI and chainID).
//
// TODO: decide when these parameters should be updated to something other
// than genesis. This is necessary to ensure we have a recent block
// reference that probably isn't compromised.
const LAST_KNOWN_BLOCKHEIGHT = 1;
const LAST_KNOWN_COMMIT = null;
const getClient = () =>
retryRpcAddr(async rpcAddr => {
const nodeAddr = `ws://${rpcAddr}`;
const rpc = Tendermint.RpcClient(nodeAddr);
const { genesis } = await rpc.genesis();
const { validators } = await rpc.validators({
height: LAST_KNOWN_BLOCKHEIGHT,
});
rpc.close();
if (genesis.chain_id !== chainID) {
throw Error(
`downloaded chainID ${genesis.chain_id} does not match expected chainID ${chainID}`,
);
}
const gci = createHash('sha256')
.update(djson.stringify(genesis))
.digest('hex');
if (gci !== GCI) {
throw Error(`computed GCI ${gci} does not match expected GCI ${GCI}`);
}
const clientState = {
validators,
commit: LAST_KNOWN_COMMIT,
header: { height: LAST_KNOWN_BLOCKHEIGHT, chain_id: chainID },
};
const client = Tendermint(nodeAddr, clientState);
client.on('error', e => log.error(e));
return new Promise((resolve, reject) => {
client.once('error', reject);
client.once('synced', () => {
client.removeListener('error', reject);
resolve({ lightClient: client });
});
});
});

// Validate that our chain egress exists.
await retryRpcAddr(async rpcAddr => {
const args = ['query', 'swingset', 'egress', myAddr];
Expand All @@ -328,7 +281,10 @@ export async function connectToChain(
});
});

if (r.stderr.includes('not found: unknown request')) {
if (r.stderr) {
console.error(r.stderr);
}
if (!r.stdout) {
console.error(`\
=============
${chainID} chain does not yet know of address ${myAddr}${adviseProvision(
Expand All @@ -352,25 +308,104 @@ ${chainID} chain does not yet know of address ${myAddr}${adviseProvision(
// outbox is correctly traced to the block header, and that the block
// header is a legitimate descendant of our previously-validated state.

const c = await getClient();

// TODO: another way to make this cheaper would be to extract the apphash
// from the received block, and only check the mailbox if it changes.
// That's more coarse than checking for only our own slot, but better than
// hitting the rest-server on every single block.
// FIXME: We currently don't use the persistent light client functionality.
// We hope to move to CosmJS with our own IBC implementation for all our light
// client + transaction needs. For now, it's simple enough to notice when
// blocks happen an just to issue mailbox queries.

/**
* Get a notifier that announces every time a block lands.
* @returns {Notifier<any>}
*/
const getBlockNotifier = () => {
const { notifier, updater } = makeNotifierKit();
retryRpcAddr(async rpcAddr => {
// Every time we enter this function, we are establishing a
// new websocket to a potentially different RPC server.
//
// We use the same notifier, though... it's a stable identity.

// This promise is for when we're ready to retry.
const retryPK = makePromiseKit();

// Open the WebSocket.
const ws = new WebSocket(`ws://${rpcAddr}/websocket`);

// This magic identifier just distinguishes our subscription
// from other noise on the Websocket, if there is any.
const MAGIC_ID = 13254;
ws.addEventListener('open', _ => {
// We send a message to subscribe to every
// new block header.
const obj = {
// JSON-RPC version 2.0.
jsonrpc: '2.0',
id: MAGIC_ID,
// We want to subscribe.
method: 'subscribe',
params: {
// Here is the Tendermint event for new blocks.
query: "tm.event = 'NewBlockHeader'",
},
};
// Send that message, and wait for the subscription.
ws.send(JSON.stringify(obj));
});
ws.addEventListener('message', ev => {
// We received a message.
const obj = JSON.parse(ev.data);
if (obj.id === MAGIC_ID) {
// It matches our subscription, so notify.
updater.updateState(obj);
}
});
ws.addEventListener('close', _ => {
// The value `undefined` as the resolution of this retry
// tells the caller to retry again with a different RPC server.
retryPK.resolve(undefined);
});
// Return an unresolved promise that resolves to `undefined`
// when the WebSocket is closed.
return retryPK.promise;
});

c.lightClient.on('update', ({ height }) => {
log(`new block on ${GCI}, fetching mailbox`);
return getMailbox(height)
.then(({ outbox, ack }) => {
// console.debug('have outbox', outbox, ack);
if (outbox) {
inbound(GCI, outbox, ack);
return notifier;
};

// Begin the block notifier cycle.
const blockNotifier = getBlockNotifier();

/**
* This function is entered at most the same number of times
* as the blockNotifier announces a new block.
*
* It then gets the mailbox. There are no optimisations.
*
* @param {number=} lastBlockUpdate
*/
const recurseEachNewBlock = (lastBlockUpdate = undefined) => {
blockNotifier
.getUpdateSince(lastBlockUpdate)
.then(({ updateCount, value }) => {
if (!value) {
throw Error(`${GCI} unexpectedly finished!`);
}
})
.catch(e => log.error(`Failed to fetch ${GCI} mailbox:`, e));
});
c.lightClient.on('close', e => log.error('closed', e));
log.debug(`new block on ${GCI}, fetching mailbox`);
getMailbox()
.then(({ outbox, ack }) => {
// console.debug('have outbox', outbox, ack);
if (outbox) {
inbound(GCI, outbox, ack);
}
})
.catch(e => log.error(`Failed to fetch ${GCI} mailbox:`, e));
recurseEachNewBlock(updateCount);
});
};

// Begin the block consumer.
recurseEachNewBlock();

async function deliver(newMessages, acknum) {
let tmpInfo;
try {
Expand Down Expand Up @@ -421,8 +456,8 @@ ${chainID} chain does not yet know of address ${myAddr}${adviseProvision(
'--keyring-backend=test',
`@${tmpInfo.path}`, // Deliver message over file, as it could be big.
'--gas=auto',
'--gas-adjustment=1.2',
'--from=ag-solo',
'-ojson',
'--broadcast-mode=block', // Don't return until committed.
'--yes',
];
Expand Down Expand Up @@ -458,5 +493,7 @@ ${chainID} chain does not yet know of address ${myAddr}${adviseProvision(
}
}

// Now that we've started consuming blocks, tell our caller how to deliver
// messages.
return deliver;
}
1 change: 0 additions & 1 deletion packages/cosmic-swingset/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,6 @@
"@agoric/swing-store-lmdb": "^0.3.4",
"@agoric/swing-store-simple": "^0.2.4",
"@agoric/swingset-vat": "^0.7.1",
"@agoric/tendermint": "^4.1.3",
"@agoric/transform-eventual-send": "^1.3.2",
"@agoric/weak-store": "^0.0.9",
"@agoric/zoe": "^0.8.1",
Expand Down

0 comments on commit 2b3160f

Please sign in to comment.