Skip to content
This repository has been archived by the owner on Jun 26, 2023. It is now read-only.

Commit

Permalink
fix: make stream return types synchronous (#217)
Browse files Browse the repository at this point in the history
Makes the `close`/`closeRead`/`closeWrite` methods synchronous the same as `abort`/`reset`
  • Loading branch information
achingbrain authored May 23, 2022
1 parent 82c34d4 commit 2fe61b7
Show file tree
Hide file tree
Showing 6 changed files with 23 additions and 27 deletions.
6 changes: 1 addition & 5 deletions packages/libp2p-connection/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -155,11 +155,7 @@ export class ConnectionImpl implements Connection {

// close all streams - this can throw if we're not multiplexed
try {
await Promise.all(
this.streams.map(async s => await s.close().catch(err => {
log.error(err)
}))
)
this.streams.forEach(s => s.close())
} catch (err) {
log.error(err)
}
Expand Down
12 changes: 6 additions & 6 deletions packages/libp2p-connection/test/compliance.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,14 +34,14 @@ describe('compliance tests', () => {
newStream: async (protocols) => {
const id = `${streamId++}`
const stream: Stream = {
...pair(),
close: async () => {
await stream.sink(async function * () {}())
...pair<Uint8Array>(),
close: () => {
void stream.sink(async function * () {}())
connection.removeStream(stream.id)
},
closeRead: async () => {},
closeWrite: async () => {
await stream.sink(async function * () {}())
closeRead: () => {},
closeWrite: () => {
void stream.sink(async function * () {}())
},
id,
abort: () => {},
Expand Down
10 changes: 5 additions & 5 deletions packages/libp2p-connection/test/index.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -54,12 +54,12 @@ describe('connection tests', () => {
const id = `${streamId++}`
const stream: Stream = {
...pair<Uint8Array>(),
close: async () => {
await stream.sink(async function * () {}()).catch()
close: () => {
void stream.sink(async function * () {}())
},
closeRead: async () => {},
closeWrite: async () => {
await stream.sink(async function * () {}())
closeRead: () => {},
closeWrite: () => {
void stream.sink(async function * () {}())
},
id,
abort: () => {},
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -169,9 +169,9 @@ export function mockConnection (maConn: MultiaddrConnection, opts: MockConnectio
export function mockStream (stream: Duplex<Uint8Array>): Stream {
return {
...stream,
close: async () => {},
closeRead: async () => {},
closeWrite: async () => {},
close: () => {},
closeRead: () => {},
closeWrite: () => {},
abort: () => {},
reset: () => {},
timeline: {
Expand Down
10 changes: 5 additions & 5 deletions packages/libp2p-interface-compliance-tests/src/mocks/muxer.ts
Original file line number Diff line number Diff line change
Expand Up @@ -191,22 +191,22 @@ class MuxedStream {
source: this.input,

// Close for reading
close: async () => {
close: () => {
this.input.end()
},

closeRead: async () => {
closeRead: () => {
this.input.end()
},

closeWrite: async () => {
closeWrite: () => {
this.input.end()
},

// Close for reading and writing (local error)
abort: (err?: Error) => {
// End the source with the passed error
this.input.end()
this.input.end(err)
this.abortController.abort()
onSinkEnd(err)
},
Expand Down Expand Up @@ -251,7 +251,7 @@ class MockMuxer implements StreamMuxer {
this.log('closing muxed streams')
for (const stream of this.streams) {
if (err == null) {
void stream.close().catch()
stream.close()
} else {
stream.abort(err)
}
Expand Down
6 changes: 3 additions & 3 deletions packages/libp2p-interfaces/src/connection/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -34,17 +34,17 @@ export interface Stream extends Duplex<Uint8Array> {
/**
* Close a stream for reading and writing
*/
close: () => Promise<void>
close: () => void

/**
* Close a stream for reading only
*/
closeRead: () => Promise<void>
closeRead: () => void

/**
* Close a stream for writing only
*/
closeWrite: () => Promise<void>
closeWrite: () => void

/**
* Call when a local error occurs, should close the stream for reading and writing
Expand Down

0 comments on commit 2fe61b7

Please sign in to comment.