Skip to content

Commit

Permalink
Fix race condition in SyncMessagePort (#327)
Browse files Browse the repository at this point in the history
Co-authored-by: Natalie Weizenbaum <[email protected]>
  • Loading branch information
ntkme and nex3 authored Sep 5, 2024
1 parent b5476bc commit 7b29467
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 7 deletions.
19 changes: 19 additions & 0 deletions lib/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@ describe('SyncMessagePort', () => {
);

expect(port.receiveMessage()).toEqual('done!');
expect(port.receiveMessage).toThrow();
});

it('multiple times before the other endpoint starts reading', () => {
Expand All @@ -52,6 +53,24 @@ describe('SyncMessagePort', () => {
expect(port2.receiveMessage()).toEqual('message3');
expect(port2.receiveMessage()).toEqual('message4');
});

it('multiple times and close', () => {
const channel = SyncMessagePort.createChannel();
const port = new SyncMessagePort(channel.port1);

spawnWorker(
`
port.postMessage('message1');
port.postMessage('done!');
port.close();
`,
channel.port2
);

expect(port.receiveMessage()).toEqual('message1');
expect(port.receiveMessage()).toEqual('done!');
expect(port.receiveMessage).toThrow();
});
});

describe('with an asynchronous listener', () => {
Expand Down
18 changes: 11 additions & 7 deletions lib/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,19 @@ enum BufferState {
* the buffer to this state so that it can use `Atomics.wait()` to be notified
* when it switches to `MessageSent`.
*/
AwaitingMessage,
AwaitingMessage = 0b00,
/**
* The state indicating that a message has been sent. Whenever an endpoint
* sends a message, it'll set the buffer to this state so that the other
* endpoint's `Atomics.wait()` call terminates.
*/
MessageSent,
MessageSent = 0b01,
/**
* The state indicating that the channel has been closed. This never
* transitions to any other states.
* The bitmask indicating that the channel has been closed. This is masked on
* top of AwaitingMessage and MessageSent state. It never transitions to any
* other states once closed.
*/
Closed,
Closed = 0b10,
}

/**
Expand Down Expand Up @@ -158,13 +159,16 @@ export class SyncMessagePort extends EventEmitter {
message = receiveMessageOnPort(this.port);
if (message) return message.message;

assert.equal(Atomics.load(this.buffer, 0), BufferState.Closed);
// Update the state to 0b10 after the last message is consumed.
const oldState = Atomics.and(this.buffer, 0, BufferState.Closed);
// Assert the old state was either 0b10 or 0b11.
assert.equal(oldState & BufferState.Closed, BufferState.Closed);
throw new Error("The SyncMessagePort's channel is closed.");
}

/** See `MessagePort.close()`. */
close(): void {
Atomics.store(this.buffer, 0, BufferState.Closed);
Atomics.or(this.buffer, 0, BufferState.Closed);
this.port.close();
}
}

0 comments on commit 7b29467

Please sign in to comment.