Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat!: close streams gracefully #344

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 20 additions & 21 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -68,41 +68,40 @@
"prepublish": "npm run build"
},
"dependencies": {
"@libp2p/crypto": "^1.0.11",
"@libp2p/interface-connection-encrypter": "^4.0.0",
"@libp2p/interface-keys": "^1.0.6",
"@libp2p/interface-metrics": "^4.0.4",
"@libp2p/interface-peer-id": "^2.0.0",
"@libp2p/logger": "^2.0.5",
"@libp2p/peer-id": "^2.0.0",
"@libp2p/crypto": "^2.0.0",
"@libp2p/interface": "^0.1.0",
"@libp2p/logger": "^3.0.0",
"@libp2p/peer-id": "^3.0.0",
"@noble/ciphers": "^0.1.4",
"@noble/curves": "^1.1.0",
"@noble/hashes": "^1.3.1",
"@noble/ciphers": "^0.1.4",
"it-byte-stream": "^1.0.0",
"it-length-prefixed": "^9.0.1",
"it-pair": "^2.0.2",
"it-pb-stream": "^4.0.1",
"it-length-prefixed-stream": "^1.0.0",
"it-pair": "^2.0.6",
"it-pipe": "^3.0.1",
"it-stream-types": "^2.0.1",
"protons-runtime": "^5.0.0",
"uint8arraylist": "^2.3.2",
"uint8arrays": "^4.0.2"
"uint8arraylist": "^2.4.3",
"uint8arrays": "^4.0.4"
},
"devDependencies": {
"@chainsafe/libp2p-yamux": "^4.0.1",
"@libp2p/daemon-client": "^6.0.3",
"@libp2p/daemon-server": "^5.0.2",
"@libp2p/interface-connection-encrypter-compliance-tests": "^5.0.0",
"@libp2p/interop": "^8.0.1",
"@libp2p/peer-id-factory": "^2.0.0",
"@libp2p/tcp": "^7.0.0",
"@chainsafe/libp2p-yamux": "^5.0.0",
"@libp2p/daemon-client": "^7.0.0",
"@libp2p/daemon-server": "^6.0.0",
"@libp2p/interface-compliance-tests": "^4.0.0",
"@libp2p/interface-peer-id": "^2.0.2",
"@libp2p/interop": "^9.0.0",
"@libp2p/peer-id-factory": "^3.0.0",
"@libp2p/tcp": "^8.0.0",
"@multiformats/multiaddr": "^12.1.0",
"@types/sinon": "^10.0.14",
"aegir": "^39.0.5",
"aegir": "^40.0.8",
"benchmark": "^2.1.4",
"execa": "^7.0.0",
"go-libp2p": "^1.0.3",
"iso-random-stream": "^2.0.2",
"libp2p": "0.45.0",
"libp2p": "^0.46.0",
"mkdirp": "^3.0.0",
"p-defer": "^4.0.0",
"protons": "^7.0.0",
Expand Down
2 changes: 1 addition & 1 deletion src/@types/handshake-interface.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import type { bytes } from './basic.js'
import type { NoiseSession } from './handshake.js'
import type { NoiseExtensions } from '../proto/payload.js'
import type { PeerId } from '@libp2p/interface-peer-id'
import type { PeerId } from '@libp2p/interface/peer-id'

export interface IHandshake {
session: NoiseSession
Expand Down
2 changes: 1 addition & 1 deletion src/@types/libp2p.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import type { bytes32 } from './basic.js'
import type { NoiseExtensions } from '../proto/payload.js'
import type { ConnectionEncrypter } from '@libp2p/interface-connection-encrypter'
import type { ConnectionEncrypter } from '@libp2p/interface/connection-encrypter'

export interface KeyPair {
publicKey: bytes32
Expand Down
22 changes: 11 additions & 11 deletions src/handshake-xx.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { InvalidCryptoExchangeError, UnexpectedPeerError } from '@libp2p/interface-connection-encrypter/errors'
import { InvalidCryptoExchangeError, UnexpectedPeerError } from '@libp2p/interface/errors'
import { decode0, decode1, decode2, encode0, encode1, encode2 } from './encoder.js'
import { XX } from './handshakes/xx.js'
import {
Expand All @@ -20,8 +20,8 @@ import type { CipherState, NoiseSession } from './@types/handshake.js'
import type { KeyPair } from './@types/libp2p.js'
import type { ICryptoInterface } from './crypto.js'
import type { NoiseExtensions } from './proto/payload.js'
import type { PeerId } from '@libp2p/interface-peer-id'
import type { ProtobufStream } from 'it-pb-stream'
import type { PeerId } from '@libp2p/interface/peer-id'
import type { LengthPrefixedStream } from 'it-length-prefixed-stream'

export class XXHandshake implements IHandshake {
public isInitiator: boolean
Expand All @@ -30,7 +30,7 @@ export class XXHandshake implements IHandshake {
public remoteExtensions: NoiseExtensions = { webtransportCerthashes: [] }

protected payload: bytes
protected connection: ProtobufStream
protected connection: LengthPrefixedStream
protected xx: XX
protected staticKeypair: KeyPair

Expand All @@ -42,7 +42,7 @@ export class XXHandshake implements IHandshake {
prologue: bytes32,
crypto: ICryptoInterface,
staticKeypair: KeyPair,
connection: ProtobufStream,
connection: LengthPrefixedStream,
remotePeer?: PeerId,
handshake?: XX
) {
Expand All @@ -64,12 +64,12 @@ export class XXHandshake implements IHandshake {
if (this.isInitiator) {
logger.trace('Stage 0 - Initiator starting to send first message.')
const messageBuffer = this.xx.sendMessage(this.session, new Uint8Array(0))
this.connection.writeLP(encode0(messageBuffer))
await this.connection.write(encode0(messageBuffer))
logger.trace('Stage 0 - Initiator finished sending first message.')
logLocalEphemeralKeys(this.session.hs.e)
} else {
logger.trace('Stage 0 - Responder waiting to receive first message...')
const receivedMessageBuffer = decode0((await this.connection.readLP()).subarray())
const receivedMessageBuffer = decode0((await this.connection.read()).subarray())
const { valid } = this.xx.recvMessage(this.session, receivedMessageBuffer)
if (!valid) {
throw new InvalidCryptoExchangeError('xx handshake stage 0 validation fail')
Expand All @@ -83,7 +83,7 @@ export class XXHandshake implements IHandshake {
public async exchange (): Promise<void> {
if (this.isInitiator) {
logger.trace('Stage 1 - Initiator waiting to receive first message from responder...')
const receivedMessageBuffer = decode1((await this.connection.readLP()).subarray())
const receivedMessageBuffer = decode1((await this.connection.read()).subarray())
const { plaintext, valid } = this.xx.recvMessage(this.session, receivedMessageBuffer)
if (!valid) {
throw new InvalidCryptoExchangeError('xx handshake stage 1 validation fail')
Expand All @@ -106,7 +106,7 @@ export class XXHandshake implements IHandshake {
} else {
logger.trace('Stage 1 - Responder sending out first message with signed payload and static key.')
const messageBuffer = this.xx.sendMessage(this.session, this.payload)
this.connection.writeLP(encode1(messageBuffer))
await this.connection.write(encode1(messageBuffer))
logger.trace('Stage 1 - Responder sent the second handshake message with signed payload.')
logLocalEphemeralKeys(this.session.hs.e)
}
Expand All @@ -117,11 +117,11 @@ export class XXHandshake implements IHandshake {
if (this.isInitiator) {
logger.trace('Stage 2 - Initiator sending third handshake message.')
const messageBuffer = this.xx.sendMessage(this.session, this.payload)
this.connection.writeLP(encode2(messageBuffer))
await this.connection.write(encode2(messageBuffer))
logger.trace('Stage 2 - Initiator sent message with signed payload.')
} else {
logger.trace('Stage 2 - Responder waiting for third handshake message...')
const receivedMessageBuffer = decode2((await this.connection.readLP()).subarray())
const receivedMessageBuffer = decode2((await this.connection.read()).subarray())
const { plaintext, valid } = this.xx.recvMessage(this.session, receivedMessageBuffer)
if (!valid) {
throw new InvalidCryptoExchangeError('xx handshake stage 2 validation fail')
Expand Down
2 changes: 1 addition & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import { Noise } from './noise.js'
import type { NoiseInit } from './noise.js'
import type { NoiseExtensions } from './proto/payload.js'
import type { ConnectionEncrypter } from '@libp2p/interface-connection-encrypter'
import type { ConnectionEncrypter } from '@libp2p/interface/connection-encrypter'
export type { ICryptoInterface } from './crypto.js'
export { pureJsCrypto } from './crypto/js.js'

Expand Down
2 changes: 1 addition & 1 deletion src/metrics.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import type { Counter, Metrics } from '@libp2p/interface-metrics'
import type { Counter, Metrics } from '@libp2p/interface/metrics'

export type MetricsRegistry = Record<string, Counter>

Expand Down
16 changes: 8 additions & 8 deletions src/noise.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { decode } from 'it-length-prefixed'
import { lpStream, type LengthPrefixedStream } from 'it-length-prefixed-stream'
import { duplexPair } from 'it-pair/duplex'
import { pbStream, type ProtobufStream } from 'it-pb-stream'
import { pipe } from 'it-pipe'
import { NOISE_MSG_MAX_LENGTH_BYTES } from './constants.js'
import { pureJsCrypto } from './crypto/js.js'
Expand All @@ -14,13 +14,13 @@ import type { IHandshake } from './@types/handshake-interface.js'
import type { INoiseConnection, KeyPair } from './@types/libp2p.js'
import type { ICryptoInterface } from './crypto.js'
import type { NoiseExtensions } from './proto/payload.js'
import type { SecuredConnection } from '@libp2p/interface-connection-encrypter'
import type { Metrics } from '@libp2p/interface-metrics'
import type { PeerId } from '@libp2p/interface-peer-id'
import type { SecuredConnection } from '@libp2p/interface/connection-encrypter'
import type { Metrics } from '@libp2p/interface/metrics'
import type { PeerId } from '@libp2p/interface/peer-id'
import type { Duplex, Source } from 'it-stream-types'

interface HandshakeParams {
connection: ProtobufStream
connection: LengthPrefixedStream
isInitiator: boolean
localPeer: PeerId
remotePeer?: PeerId
Expand Down Expand Up @@ -71,7 +71,7 @@ export class Noise implements INoiseConnection {
* @returns {Promise<SecuredConnection>}
*/
public async secureOutbound (localPeer: PeerId, connection: Duplex<AsyncGenerator<Uint8Array>, AsyncIterable<Uint8Array>, Promise<void>>, remotePeer?: PeerId): Promise<SecuredConnection<NoiseExtensions>> {
const wrappedConnection = pbStream(
const wrappedConnection = lpStream(
connection,
{
lengthEncoder: uint16BEEncode,
Expand Down Expand Up @@ -103,7 +103,7 @@ export class Noise implements INoiseConnection {
* @returns {Promise<SecuredConnection>}
*/
public async secureInbound (localPeer: PeerId, connection: Duplex<AsyncGenerator<Uint8Array>, AsyncIterable<Uint8Array>, Promise<void>>, remotePeer?: PeerId): Promise<SecuredConnection<NoiseExtensions>> {
const wrappedConnection = pbStream(
const wrappedConnection = lpStream(
connection,
{
lengthEncoder: uint16BEEncode,
Expand Down Expand Up @@ -171,7 +171,7 @@ export class Noise implements INoiseConnection {
}

private async createSecureConnection (
connection: ProtobufStream<Duplex<AsyncGenerator<Uint8Array>, AsyncIterable<Uint8Array>, Promise<void>>>,
connection: LengthPrefixedStream<Duplex<AsyncGenerator<Uint8Array>, AsyncIterable<Uint8Array>, Promise<void>>>,
handshake: IHandshake
): Promise<Duplex<AsyncGenerator<Uint8Array>, Source<Uint8Array>, Promise<void>>> {
// Create encryption box/unbox wrapper
Expand Down
2 changes: 1 addition & 1 deletion src/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import { concat as uint8ArrayConcat } from 'uint8arrays/concat'
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
import { type NoiseExtensions, NoiseHandshakePayload } from './proto/payload.js'
import type { bytes } from './@types/basic.js'
import type { PeerId } from '@libp2p/interface-peer-id'
import type { PeerId } from '@libp2p/interface/peer-id'

export async function getPayload (
localPeer: PeerId,
Expand Down
2 changes: 1 addition & 1 deletion test/compliance.spec.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import tests from '@libp2p/interface-connection-encrypter-compliance-tests'
import tests from '@libp2p/interface-compliance-tests/connection-encryption'
import { Noise } from '../src/noise.js'

describe('spec compliance tests', function () {
Expand Down
2 changes: 1 addition & 1 deletion test/fixtures/peer.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { createEd25519PeerId, createFromJSON } from '@libp2p/peer-id-factory'
import type { PeerId } from '@libp2p/interface-peer-id'
import type { PeerId } from '@libp2p/interface/peer-id'

// ed25519 keys
const peers = [{
Expand Down
12 changes: 6 additions & 6 deletions test/index.spec.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
import { expect } from 'aegir/chai'
import { lpStream } from 'it-length-prefixed-stream'
import { duplexPair } from 'it-pair/duplex'
import { pbStream } from 'it-pb-stream'
import sinon from 'sinon'
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
import { noise } from '../src/index.js'
import { Noise } from '../src/noise.js'
import { createPeerIdsFromFixtures } from './fixtures/peer.js'
import type { Metrics } from '@libp2p/interface-metrics'
import type { Metrics } from '@libp2p/interface/metrics'

function createCounterSpy (): ReturnType<typeof sinon.spy> {
return sinon.spy({
Expand Down Expand Up @@ -41,11 +41,11 @@ describe('Index', () => {
noiseInit.secureOutbound(localPeer, outboundConnection, remotePeer),
noiseResp.secureInbound(remotePeer, inboundConnection, localPeer)
])
const wrappedInbound = pbStream(inbound.conn)
const wrappedOutbound = pbStream(outbound.conn)
const wrappedInbound = lpStream(inbound.conn)
const wrappedOutbound = lpStream(outbound.conn)

wrappedOutbound.writeLP(uint8ArrayFromString('test'))
await wrappedInbound.readLP()
await wrappedOutbound.write(uint8ArrayFromString('test'))
await wrappedInbound.read()
expect(metricsRegistry.get('libp2p_noise_xxhandshake_successes_total')?.increment.callCount).to.equal(1)
expect(metricsRegistry.get('libp2p_noise_xxhandshake_error_total')?.increment.callCount).to.equal(0)
expect(metricsRegistry.get('libp2p_noise_encrypted_packets_total')?.increment.callCount).to.equal(1)
Expand Down
Loading