Skip to content
This repository has been archived by the owner on Aug 29, 2023. It is now read-only.

Commit

Permalink
fix: update to latest interfaces (#168)
Browse files Browse the repository at this point in the history
  • Loading branch information
achingbrain authored Feb 10, 2022
1 parent 9885823 commit c19462f
Show file tree
Hide file tree
Showing 4 changed files with 147 additions and 131 deletions.
12 changes: 5 additions & 7 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -129,25 +129,23 @@
"dep-check": "aegir dep-check dist/src/**/*.js dist/test/**/*.js",
"build": "tsc",
"pretest": "npm run build",
"test": "aegir test -f ./dist/test",
"test": "aegir test -f ./dist/test/*.js -f ./dist/test/**/*.js",
"test:node": "npm run test -- -t node --cov",
"test:electron-main": "npm run test -- -t electron-main",
"release": "semantic-release"
},
"dependencies": {
"@libp2p/utils": "^1.0.1",
"@libp2p/logger": "^1.0.2",
"@libp2p/utils": "^1.0.6",
"@multiformats/mafmt": "^11.0.0",
"@multiformats/multiaddr": "^10.1.1",
"abortable-iterator": "^4.0.2",
"debug": "^4.3.1",
"err-code": "^3.0.1",
"stream-to-it": "^0.2.2"
},
"devDependencies": {
"@libp2p/interface-compliance-tests": "^1.0.1",
"@libp2p/interfaces": "^1.0.0",
"@types/debug": "^4.1.5",
"@types/mocha": "^9.0.0",
"@libp2p/interface-compliance-tests": "^1.1.2",
"@libp2p/interfaces": "^1.3.2",
"aegir": "^36.1.3",
"it-pipe": "^2.0.3",
"sinon": "^13.0.0",
Expand Down
8 changes: 4 additions & 4 deletions src/index.ts
Original file line number Diff line number Diff line change
@@ -1,17 +1,17 @@
import net from 'net'
import * as mafmt from '@multiformats/mafmt'
import errCode from 'err-code'
import debug from 'debug'
import { logger } from '@libp2p/logger'
import { toMultiaddrConnection } from './socket-to-conn.js'
import { createListener } from './listener.js'
import { multiaddrToNetConfig } from './utils.js'
import { AbortError } from 'abortable-iterator'
import { CODE_CIRCUIT, CODE_P2P } from './constants.js'
import type { Transport, Upgrader, ListenerOptions } from '@libp2p/interfaces/transport'
import type { Transport, Upgrader, ListenerOptions, Listener } from '@libp2p/interfaces/transport'
import type { Multiaddr } from '@multiformats/multiaddr'
import type { Socket } from 'net'

const log = debug('libp2p:tcp')
const log = logger('libp2p:tcp')

/**
* @typedef {import('multiaddr').Multiaddr} Multiaddr
Expand Down Expand Up @@ -125,7 +125,7 @@ export class TCP implements Transport<DialOptions, ListenerOptions> {
* anytime a new incoming Connection has been successfully upgraded via
* `upgrader.upgradeInbound`.
*/
createListener (options: ListenerOptions = {}) {
createListener (options: ListenerOptions = {}): Listener {
return createListener({ upgrader: this._upgrader, ...options })
}

Expand Down
254 changes: 136 additions & 118 deletions src/listener.ts
Original file line number Diff line number Diff line change
@@ -1,24 +1,18 @@
import net from 'net'
import { EventEmitter } from 'events'
import debug from 'debug'
import { logger } from '@libp2p/logger'
import { toMultiaddrConnection } from './socket-to-conn.js'
import { CODE_P2P } from './constants.js'
import {
getMultiaddrs,
multiaddrToNetConfig
} from './utils.js'
import { EventEmitter, CustomEvent } from '@libp2p/interfaces'
import type { Connection } from '@libp2p/interfaces/connection'
import type { MultiaddrConnection, Upgrader, Listener } from '@libp2p/interfaces/transport'
import type { MultiaddrConnection, Upgrader, Listener, ListenerEvents, ConnectionHandler } from '@libp2p/interfaces/transport'
import type { Server } from 'net'
import type { Multiaddr } from '@multiformats/multiaddr'

const log = Object.assign(
debug('libp2p:tcp:listener'),
{ error: debug('libp2p:tcp:listener:error') })

interface ServerWithMultiaddrConnections extends Server {
__connections: MultiaddrConnection[]
}
const log = logger('libp2p:tcp:listener')

/**
* Attempts to close the given maConn. If a failure occurs, it will be logged
Expand All @@ -36,135 +30,159 @@ interface Context {
upgrader: Upgrader
}

/**
* Create listener
*/
export function createListener (context: Context) {
const {
handler, upgrader
} = context
class TCPListener extends EventEmitter<ListenerEvents> implements Listener {
private peerId?: string
private listeningAddr?: Multiaddr
private readonly server: Server
private connections: MultiaddrConnection[]

constructor (upgrader: Upgrader, handler?: ConnectionHandler) {
super()

this.connections = []

this.server = net.createServer(socket => {
// Avoid uncaught errors caused by unstable connections
socket.on('error', err => {
log('socket error', err)
})

let peerId: string | null
let listeningAddr: Multiaddr
let maConn: MultiaddrConnection
try {
maConn = toMultiaddrConnection(socket, { listeningAddr: this.listeningAddr })
} catch (err) {
log.error('inbound connection failed', err)
return
}

const server: ServerWithMultiaddrConnections = Object.assign(net.createServer(socket => {
// Avoid uncaught errors caused by unstable connections
socket.on('error', err => {
log('socket error', err)
log('new inbound connection %s', maConn.remoteAddr)
try {
upgrader.upgradeInbound(maConn)
.then((conn) => {
log('inbound connection %s upgraded', maConn.remoteAddr)

this.trackConn(maConn, socket)

if (handler != null) {
handler(conn)
}

this.dispatchEvent(new CustomEvent('connection', {
detail: conn
}))
})
.catch(async err => {
log.error('inbound connection failed', err)

await attemptClose(maConn)
})
.catch(err => {
log.error('closing inbound connection failed', err)
})
} catch (err) {
log.error('inbound connection failed', err)

attemptClose(maConn)
.catch(err => {
log.error('closing inbound connection failed', err)
})
}
})
this.server.on('error', err => {
this.dispatchEvent(new CustomEvent('error', {
detail: err
}))
})
this.server.on('close', () => {
this.dispatchEvent(new CustomEvent('close'))
})
this.server.on('listening', () => {
this.dispatchEvent(new CustomEvent('listening'))
})
}

let maConn: MultiaddrConnection
try {
maConn = toMultiaddrConnection(socket, { listeningAddr })
} catch (err) {
log.error('inbound connection failed', err)
return
getAddrs () {
let addrs: Multiaddr[] = []
const address = this.server.address()

if (address == null) {
throw new Error('Listener is not ready yet')
}

log('new inbound connection %s', maConn.remoteAddr)
try {
upgrader.upgradeInbound(maConn)
.then((conn) => {
log('inbound connection %s upgraded', maConn.remoteAddr)
trackConn(server, maConn, socket)

if (handler != null) {
handler(conn)
}

listener.emit('connection', conn)
})
.catch(async err => {
log.error('inbound connection failed', err)

await attemptClose(maConn)
})
.catch(err => {
log.error('closing inbound connection failed', err)
})
} catch (err) {
log.error('inbound connection failed', err)

attemptClose(maConn)
.catch(err => {
log.error('closing inbound connection failed', err)
})
if (typeof address === 'string') {
throw new Error('Incorrect server address type')
}
}),
// Keep track of open connections to destroy in case of timeout
{ __connections: [] })

const listener: Listener = Object.assign(new EventEmitter(), {
getAddrs: () => {
let addrs: Multiaddr[] = []
const address = server.address()
if (this.listeningAddr == null) {
throw new Error('Listener is not ready yet')
}

if (address == null) {
throw new Error('Listener is not ready yet')
}
// Because TCP will only return the IPv6 version
// we need to capture from the passed multiaddr
if (this.listeningAddr.toString().startsWith('/ip4')) {
addrs = addrs.concat(getMultiaddrs('ip4', address.address, address.port))
} else if (address.family === 'IPv6') {
addrs = addrs.concat(getMultiaddrs('ip6', address.address, address.port))
}

if (typeof address === 'string') {
throw new Error('Incorrect server address type')
}
return addrs.map(ma => this.peerId != null ? ma.encapsulate(`/p2p/${this.peerId}`) : ma)
}

// Because TCP will only return the IPv6 version
// we need to capture from the passed multiaddr
if (listeningAddr.toString().startsWith('/ip4')) {
addrs = addrs.concat(getMultiaddrs('ip4', address.address, address.port))
} else if (address.family === 'IPv6') {
addrs = addrs.concat(getMultiaddrs('ip6', address.address, address.port))
}
async listen (ma: Multiaddr) {
const peerId = ma.getPeerId()

return addrs.map(ma => peerId != null ? ma.encapsulate(`/p2p/${peerId}`) : ma)
},
listen: async (ma: Multiaddr) => {
listeningAddr = ma
peerId = ma.getPeerId()
if (peerId == null) {
ma = ma.decapsulateCode(CODE_P2P)
} else {
this.peerId = peerId
}

if (peerId == null) {
listeningAddr = ma.decapsulateCode(CODE_P2P)
}
this.listeningAddr = ma

return await new Promise<void>((resolve, reject) => {
const options = multiaddrToNetConfig(listeningAddr)
server.listen(options, (err?: any) => {
if (err != null) {
return reject(err)
}
log('Listening on %s', server.address())
resolve()
})
return await new Promise<void>((resolve, reject) => {
const options = multiaddrToNetConfig(ma)
this.server.listen(options, (err?: any) => {
if (err != null) {
return reject(err)
}
log('Listening on %s', this.server.address())
resolve()
})
},
close: async () => {
if (!server.listening) {
return
}

await Promise.all([
server.__connections.map(async maConn => await attemptClose(maConn))
])
})
}

await new Promise<void>((resolve, reject) => {
server.close(err => (err != null) ? reject(err) : resolve())
})
async close () {
if (!this.server.listening) {
return
}
})

server
.on('listening', () => listener.emit('listening'))
.on('error', err => listener.emit('error', err))
.on('close', () => listener.emit('close'))
await Promise.all([
this.connections.map(async maConn => await attemptClose(maConn))
])

return listener
}
await new Promise<void>((resolve, reject) => {
this.server.close(err => (err != null) ? reject(err) : resolve())
})
}

trackConn (maConn: MultiaddrConnection, socket: net.Socket) {
this.connections.push(maConn)

function trackConn (server: ServerWithMultiaddrConnections, maConn: MultiaddrConnection, socket: net.Socket) {
server.__connections.push(maConn)
const untrackConn = () => {
this.connections = this.connections.filter(c => c !== maConn)
}

const untrackConn = () => {
server.__connections = server.__connections.filter(c => c !== maConn)
socket.once('close', untrackConn)
}
}

/**
* Create listener
*/
export function createListener (context: Context) {
const {
handler, upgrader
} = context

socket.once('close', untrackConn)
return new TCPListener(upgrader, handler)
}
4 changes: 2 additions & 2 deletions src/socket-to-conn.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { abortableSource } from 'abortable-iterator'
import debug from 'debug'
import { logger } from '@libp2p/logger'
// @ts-expect-error no types
import toIterable from 'stream-to-it'
import { ipPortToMultiaddr as toMultiaddr } from '@libp2p/utils/ip-port-to-multiaddr'
Expand All @@ -8,7 +8,7 @@ import type { Socket } from 'net'
import type { Multiaddr } from '@multiformats/multiaddr'
import type { MultiaddrConnection } from '@libp2p/interfaces/transport'

const log = debug('libp2p:tcp:socket')
const log = logger('libp2p:tcp:socket')

interface ToConnectionOptions {
listeningAddr?: Multiaddr
Expand Down

0 comments on commit c19462f

Please sign in to comment.