Skip to content

Commit

Permalink
finish test coverage
Browse files Browse the repository at this point in the history
  • Loading branch information
gmaclennan committed Feb 9, 2023
1 parent 45842c7 commit 504bd2a
Show file tree
Hide file tree
Showing 4 changed files with 219 additions and 25 deletions.
60 changes: 39 additions & 21 deletions lib/rpc/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -62,17 +62,19 @@ class Peer {
action (type) {
switch (type) {
case 'connect':
/* c8 ignore next 3 */
if (this.#state !== 'connecting') {
return // TODO: report error
return // TODO: report error - this should not happen
}
this.#state = 'connected'
break
case 'disconnect':
/* c8 ignore next */
if (this.#state === 'disconnected') return
this.#state = 'disconnected'
for (const pending of this.pendingInvites.values()) {
for (const { reject } of pending) {
reject(new PeerDisconnectedError)
reject(new PeerDisconnectedError())
}
}
this.pendingInvites.clear()
Expand All @@ -95,7 +97,8 @@ class Peer {
}
#assertConnected () {
if (this.#state === 'connected' && !this.#channel.closed) return
throw new PeerDisconnectedError()
/* c8 ignore next */
throw new PeerDisconnectedError() // TODO: report error - this should not happen
}
}

Expand Down Expand Up @@ -124,10 +127,10 @@ export class MapeoRPC extends TypedEmitter {
* @param {object} options
* @param {Invite['projectKey']} options.projectKey project key
* @param {Invite['encryptionKey']} [options.encryptionKey] project encryption key
* @param {number} [options.timeout=60000] timeout waiting for invite response before rejecting (default 1 minute)
* @param {number} [options.timeout] timeout waiting for invite response before rejecting (default 1 minute)
* @returns {Promise<InviteResponse['decision']>}
*/
async invite (peerId, { timeout = 60000, ...invite }) {
async invite (peerId, { timeout, ...invite }) {
const peer = this.#peers.get(peerId)
if (!peer) throw new UnknownPeerError('Unknown peer ' + peerId)
/** @type {Promise<InviteResponse['decision']>} */
Expand All @@ -140,13 +143,15 @@ export class MapeoRPC extends TypedEmitter {
const deferred = { resolve, reject }
pending.push(deferred)

const timeoutId = setTimeout(() => {
const index = pending.indexOf(deferred)
if (index > -1) {
pending.splice(index, 1)
}
origReject(new TimeoutError(`No response after ${timeout}ms`))
}, timeout)
const timeoutId =
timeout &&
setTimeout(() => {
const index = pending.indexOf(deferred)
if (index > -1) {
pending.splice(index, 1)
}
origReject(new TimeoutError(`No response after ${timeout}ms`))
}, timeout)

peer.sendInvite(invite)

Expand Down Expand Up @@ -218,8 +223,9 @@ export class MapeoRPC extends TypedEmitter {

const peerId = keyToId(remotePublicKey)
const existingPeer = this.#peers.get(peerId)
/* c8 ignore next 3 */
if (existingPeer && existingPeer.info.status !== 'disconnected') {
existingPeer.action('disconnect')
existingPeer.action('disconnect') // Should not happen, but in case
}
const peer = new Peer({ publicKey: remotePublicKey, channel })
this.#peers.set(peerId, peer)
Expand All @@ -233,9 +239,11 @@ export class MapeoRPC extends TypedEmitter {
#openPeer (publicKey) {
const peerId = keyToId(publicKey)
const peer = this.#peers.get(peerId)
/* c8 ignore next */
if (!peer) return // TODO: report error - this should not happen
// No-op if no change in state
if (peer.info.status === 'connected') return
/* c8 ignore next */
if (peer.info.status === 'connected') return // TODO: report error - this should not happen
peer.action('connect')
this.#emitPeers()
}
Expand All @@ -244,21 +252,28 @@ export class MapeoRPC extends TypedEmitter {
#closePeer (publicKey) {
const peerId = publicKey.toString('hex')
const peer = this.#peers.get(peerId)
/* c8 ignore next */
if (!peer) return // TODO: report error - this should not happen
// No-op if no change in state
/* c8 ignore next */
if (peer.info.status === 'disconnected') return
// TODO: Track reasons for closing
peer.action('disconnect')
this.#emitPeers()
}

get peers () {
return /** @type {PeerInfo[]} */ (
[...this.#peers.values()]
.map(peer => peer.info)
// A peer is only 'connecting' for a single tick, so to avoid complex
// async code around sending messages we don't expose 'connecting' peers
.filter(peerInfo => peerInfo.status !== 'connecting')
)
}

#emitPeers () {
const peerInfos = [...this.#peers.values()]
.map(peer => peer.info)
// A peer is only 'connecting' for a single tick, so to avoid complex
// async code around sending messages we don't expose 'connecting' peers
.filter(peerInfo => peerInfo.status !== 'connecting')
this.emit('peers', /** @type {PeerInfo[]} */ (peerInfos))
this.emit('peers', this.peers)
}

/**
Expand All @@ -270,6 +285,7 @@ export class MapeoRPC extends TypedEmitter {
#handleMessage (peerPublicKey, type, value) {
const peerId = keyToId(peerPublicKey)
const peer = this.#peers.get(peerId)
/* c8 ignore next */
if (!peer) return // TODO: report error - this should not happen
switch (type) {
case 'Invite': {
Expand All @@ -281,6 +297,7 @@ export class MapeoRPC extends TypedEmitter {
const response = InviteResponse.decode(value)
const projectId = keyToId(response.projectKey)
const pending = peer.pendingInvites.get(projectId)
/* c8 ignore next 3 */
if (!pending) {
return // TODO: report error - this should not happen
}
Expand All @@ -290,8 +307,9 @@ export class MapeoRPC extends TypedEmitter {
peer.pendingInvites.set(projectId, [])
break
}
/* c8 ignore next 2 */
default:
// TODO: report unhandled message error
// TODO: report unhandled message error
}
}
}
Expand Down
35 changes: 35 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,10 @@
"homepage": "https://github.com/digidem/mapeo-core#readme",
"devDependencies": {
"@hyperswarm/testnet": "^3.1.0",
"@sinonjs/fake-timers": "^10.0.2",
"@types/json-schema": "^7.0.11",
"@types/node": "^18.11.17",
"@types/sinonjs__fake-timers": "^8.1.2",
"@types/streamx": "^2.9.1",
"brittle": "^3.1.1",
"depcheck": "^1.4.3",
Expand Down
147 changes: 143 additions & 4 deletions tests/rpc.js
Original file line number Diff line number Diff line change
@@ -1,8 +1,15 @@
// @ts-check
import test from 'brittle'
import NoiseSecretStream from '@hyperswarm/secret-stream'
import { MapeoRPC, PeerDisconnectedError } from '../lib/rpc/index.js'
import { keyToId } from '../lib/utils.js'
import {
MapeoRPC,
PeerDisconnectedError,
TimeoutError,
UnknownPeerError
} from '../lib/rpc/index.js'
import FakeTimers from '@sinonjs/fake-timers'
import { once } from 'events'
import { Duplex } from 'streamx'

test('Send invite and accept', async t => {
t.plan(3)
Expand Down Expand Up @@ -52,6 +59,24 @@ test('Send invite and reject', async t => {
replicate(r1, r2)
})

test('Invite to unknown peer', async t => {
const r1 = new MapeoRPC()
const r2 = new MapeoRPC()

const projectKey = Buffer.allocUnsafe(32).fill(0)
const unknownPeerId = Buffer.allocUnsafe(32).fill(1).toString('hex')
replicate(r1, r2)

await once(r1, 'peers')
await t.exception(r1.invite(unknownPeerId, { projectKey }), UnknownPeerError)
await t.exception(
() => r2.inviteResponse(unknownPeerId, {
projectKey,
decision: MapeoRPC.InviteResponse.ACCEPT
}), UnknownPeerError
)
})

test('Send invite and already on project', async t => {
t.plan(3)
const r1 = new MapeoRPC()
Expand Down Expand Up @@ -194,9 +219,123 @@ test('Invite to multiple peers', async t => {
replicate(r3, r1)
})

test('Multiple invites to a peer, only one response', async t => {
t.plan(2)
let count = 0
const r1 = new MapeoRPC()
const r2 = new MapeoRPC()

const projectKey = Buffer.allocUnsafe(32).fill(0)

r1.on('peers', async peers => {
const responses = await Promise.all([
r1.invite(peers[0].id, { projectKey }),
r1.invite(peers[0].id, { projectKey }),
r1.invite(peers[0].id, { projectKey })
])
const expected = Array(3).fill(MapeoRPC.InviteResponse.ACCEPT)
t.alike(responses, expected)
})

r2.on('invite', (peerId, invite) => {
if (++count < 3) return
// Only respond to third invite
t.is(count, 3)
r2.inviteResponse(peerId, {
projectKey: invite.projectKey,
decision: MapeoRPC.InviteResponse.ACCEPT
})
})

replicate(r1, r2)
})

test('Default: invites do not timeout', async t => {
const clock = FakeTimers.install({ shouldAdvanceTime: true })
t.teardown(() => clock.uninstall())
t.plan(1)

const r1 = new MapeoRPC()
const r2 = new MapeoRPC()

const projectKey = Buffer.allocUnsafe(32).fill(0)

r1.once('peers', async peers => {
r1.invite(peers[0].id, { projectKey }).then(
() => t.fail('invite promise should not resolve'),
() => t.fail('invite promise should not reject')
)
await clock.tickAsync('01:00') // Advance 1 hour
t.pass('Waited 1 hour without invite timing out')
})

replicate(r1, r2)
})

test('Invite timeout', async t => {
const clock = FakeTimers.install({ shouldAdvanceTime: true })
t.teardown(() => clock.uninstall())
t.plan(1)

const r1 = new MapeoRPC()
const r2 = new MapeoRPC()

const projectKey = Buffer.allocUnsafe(32).fill(0)

r1.once('peers', async peers => {
t.exception(
r1.invite(peers[0].id, { projectKey, timeout: 5000 }),
TimeoutError
)
clock.tick(5001)
})

replicate(r1, r2)
})

test('Reconnect peer and send invite', async t => {
const r1 = new MapeoRPC()
const r2 = new MapeoRPC()

const projectKey = Buffer.allocUnsafe(32).fill(0)

const destroy = replicate(r1, r2)
await once(r1, 'peers')
await destroy()

t.is(r1.peers.length, 1)
t.is(r1.peers[0].status, 'disconnected')

r2.on('invite', (peerId, invite) => {
t.ok(invite.projectKey.equals(projectKey), 'invite project key correct')
r2.inviteResponse(peerId, {
projectKey: invite.projectKey,
decision: MapeoRPC.InviteResponse.ACCEPT
})
})

replicate(r1, r2)
const [peers] = await once(r1, 'peers')
t.is(r1.peers.length, 1)
t.is(peers[0].status, 'connected')
const response = await r1.invite(peers[0].id, { projectKey })
t.is(response, MapeoRPC.InviteResponse.ACCEPT)
})

test('invalid stream', t => {
const r1 = new MapeoRPC()
const regularStream = new Duplex()
t.exception(() => r1.connect(regularStream), 'Invalid stream')
})

function replicate (rpc1, rpc2) {
const n1 = new NoiseSecretStream(true)
const n2 = new NoiseSecretStream(false)
const n1 = new NoiseSecretStream(true, undefined, {
// Keep keypairs deterministic for tests, since we use peer.publicKey as an identifier.
keyPair: NoiseSecretStream.keyPair(Buffer.allocUnsafe(32).fill(0))
})
const n2 = new NoiseSecretStream(false, undefined, {
keyPair: NoiseSecretStream.keyPair(Buffer.allocUnsafe(32).fill(1))
})
n1.rawStream.pipe(n2.rawStream).pipe(n1.rawStream)

rpc1.connect(n1)
Expand Down

0 comments on commit 504bd2a

Please sign in to comment.