Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: fix 'write EPIPE' error & failing discovery test #328

Merged
merged 13 commits into from
Oct 20, 2023
25 changes: 14 additions & 11 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 14 additions & 0 deletions src/discovery/mdns.js
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,19 @@ export class MdnsDiscovery extends TypedEmitter {
* @param {net.Socket} socket
*/
#handleTcpConnection(isInitiator, socket) {
socket.off('error', this.#handleSocketError)
socket.on('error', onSocketError)

/** @param {any} e */
function onSocketError(e) {
if (e.code === 'EPIPE') {
socket.destroy()
if (secretStream) {
secretStream.destroy()
}
}
}

const { remoteAddress } = socket
if (!remoteAddress || !isPrivate(remoteAddress)) {
socket.destroy(new Error('Invalid remoteAddress ' + remoteAddress))
Expand All @@ -127,6 +140,7 @@ export class MdnsDiscovery extends TypedEmitter {

secretStream.on('connect', () => {
// Further errors will be handled in #handleNoiseStreamConnection()
socket.off('error', onSocketError)
secretStream.off('error', this.#handleSocketError)
this.#handleNoiseStreamConnection(secretStream)
})
Expand Down
45 changes: 29 additions & 16 deletions tests/discovery/mdns.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,15 +3,13 @@ import { randomBytes } from 'node:crypto'
import net from 'node:net'
import { KeyManager } from '@mapeo/crypto'
import { setTimeout as delay } from 'node:timers/promises'
import pDefer from 'p-defer'
import { keyToPublicId } from '@mapeo/crypto'
import { ERR_DUPLICATE, MdnsDiscovery } from '../../src/discovery/mdns.js'
import NoiseSecretStream from '@hyperswarm/secret-stream'

// Time in ms to wait for mdns messages to propogate
const MDNS_WAIT_TIME = 5000

test('mdns - discovery and sharing of data', (t) => {
t.plan(2)
const deferred = pDefer()
const identityKeypair1 = new KeyManager(randomBytes(16)).getIdentityKeypair()
const identityKeypair2 = new KeyManager(randomBytes(16)).getIdentityKeypair()

Expand All @@ -24,23 +22,28 @@ test('mdns - discovery and sharing of data', (t) => {
const str = 'hi'

mdnsDiscovery1.on('connection', (stream) => {
stream.on('error', handleConnectionError.bind(null, t))
stream.write(str)
})

mdnsDiscovery2.on('connection', (stream) => {
stream.on('error', handleConnectionError.bind(null, t))
stream.on('data', (d) => {
t.is(d.toString(), str, 'expected data written')
Promise.all([
mdnsDiscovery1.stop({ force: true }),
mdnsDiscovery2.stop({ force: true }),
]).then(() => {
t.pass('teardown complete')
deferred.resolve()
})
})
})

mdnsDiscovery1.start()
mdnsDiscovery2.start()

return deferred.promise
})

test('deduplicate incoming connections', async (t) => {
Expand All @@ -53,13 +56,15 @@ test('deduplicate incoming connections', async (t) => {
await discovery.start()

discovery.on('connection', (conn) => {
conn.on('error', handleConnectionError.bind(null, t))
localConnections.add(conn)
conn.on('close', () => localConnections.delete(conn))
})

const addrInfo = discovery.address()
for (let i = 0; i < 20; i++) {
noiseConnect(addrInfo, remoteKp).then((conn) => {
conn.on('error', handleConnectionError.bind(null, t))
conn.on('connect', () => remoteConnections.add(conn))
conn.on('close', () => remoteConnections.delete(conn))
})
Expand Down Expand Up @@ -103,36 +108,35 @@ async function noiseConnect({ port, address }, keyPair) {
async function testMultiple(t, { period, nPeers = 20 }) {
const peersById = new Map()
const connsById = new Map()
const promises = []
// t.plan(3 * nPeers + 1)

async function spawnPeer() {
async function spawnPeer(onConnected) {
const identityKeypair = new KeyManager(randomBytes(16)).getIdentityKeypair()
const discovery = new MdnsDiscovery({ identityKeypair })
const peerId = keyToPublicId(discovery.publicKey)
peersById.set(peerId, discovery)
const conns = []
connsById.set(peerId, conns)
discovery.on('connection', (conn) => {
conn.on('error', (e) => {
// We expected connections to be closed when duplicates happen. On the
// closing side the error will be ERR_DUPLICATE, but on the other side
// the error will be an ECONNRESET - the error is not sent over the
// connection
const expectedError =
e.message === ERR_DUPLICATE || e.code === 'ECONNRESET'
t.ok(expectedError, 'connection closed with expected error')
})
conn.on('error', handleConnectionError.bind(null, t))
conns.push(conn)
if (conns.length >= nPeers - 1) onConnected()
})
await discovery.start()
return discovery
}

for (let p = 0; p < nPeers; p++) {
setTimeout(spawnPeer, Math.floor(Math.random() * period))
const deferred = pDefer()
promises.push(deferred.promise)
setTimeout(spawnPeer, Math.floor(Math.random() * period), deferred.resolve)
}

await delay(period + MDNS_WAIT_TIME)
// Wait for all peers to connect to at least nPeers - 1 peers (every other peer)
await Promise.all(promises)
// Wait another 1000ms for any deduplication
await delay(1000)

const peerIds = [...peersById.keys()]

Expand All @@ -159,3 +163,12 @@ async function testMultiple(t, { period, nPeers = 20 }) {
await Promise.all(stopPromises)
t.pass('teardown complete')
}

function handleConnectionError(t, e) {
// We expected connections to be closed when duplicates happen. On the
// closing side the error will be ERR_DUPLICATE, but on the other side
// the error will be an ECONNRESET - the error is not sent over the
// connection
const expectedError = e.message === ERR_DUPLICATE || e.code === 'ECONNRESET'
t.ok(expectedError, 'connection closed with expected error')
}