Skip to content

Commit

Permalink
fixup
Browse files Browse the repository at this point in the history
  • Loading branch information
ronag committed Sep 29, 2023
1 parent d6f1c7d commit 30ad6c0
Showing 1 changed file with 17 additions and 18 deletions.
35 changes: 17 additions & 18 deletions lib/internal/streams/readable.js
Original file line number Diff line number Diff line change
Expand Up @@ -378,11 +378,11 @@ function readableAddChunk(stream, chunk, encoding, addToFront) {
if ((state.state & kObjectMode) === 0) {
if (typeof chunk === 'string') {
encoding = encoding || ((state.state & kDefaultUTF8Encoding) !== 0 ? 'utf8' : state[kDefaultEncodingValue]);
if ((state.state | kEncoding) === 0 || state.encoding !== encoding) {
if (addToFront && state.encoding) {
if ((state.state & kEncoding) === 0 || state.encoding !== encoding) {
if (addToFront && (state.state & kEncoding) !== 0) {
// When unshifting, if state.encoding is set, we have to save
// the string in the BufferList with the state encoding.
chunk = Buffer.from(chunk, encoding).toString(state.encoding);
chunk = Buffer.from(chunk, encoding).toString(state[kEncodingValue]);
} else {
chunk = Buffer.from(chunk, encoding);
encoding = '';
Expand Down Expand Up @@ -412,7 +412,7 @@ function readableAddChunk(stream, chunk, encoding, addToFront) {
return false;
else
addChunk(stream, state, chunk, true);
} else if (state.ended) {
} else if ((state.state & kEnded) !== 0) {
errorOrDestroy(stream, new ERR_STREAM_PUSH_AFTER_EOF());
} else if ((state.state & (kDestroyed | kErrored)) !== 0) {
return false;
Expand All @@ -436,7 +436,7 @@ function readableAddChunk(stream, chunk, encoding, addToFront) {
// We can push more data if we are below the highWaterMark.
// Also, if we have no data yet, we can stand some more bytes.
// This is to work around cases where hwm=0, such as the repl.
return !state.ended &&
return (state.state & kEnded) === 0 &&
(state.length < state.highWaterMark || state.length === 0);
}

Expand Down Expand Up @@ -529,7 +529,7 @@ function howMuchToRead(n, state) {
}
if (n <= state.length)
return n;
return state.ended ? state.length : 0;
return (state.state & kEnded) !== 0 ? state.length : 0;
}

// You can override either this method, or the async _read(n) below.
Expand Down Expand Up @@ -561,8 +561,7 @@ Readable.prototype.read = function(n) {
state.length >= state.highWaterMark :
state.length > 0) ||
(state.state & kEnded) !== 0)) {
debug('read: emitReadable', state.length, state.ended);
if (state.length === 0 && state.ended)
if (state.length === 0 && (state.state & kEnded) !== 0)
endReadable(this);
else
emitReadable(this);
Expand Down Expand Up @@ -656,7 +655,7 @@ Readable.prototype.read = function(n) {
n = 0;
} else {
state.length -= n;
if (state.multiAwaitDrain) {
if ((state.state & kMultiAwaitDrain) !== 0) {
state.awaitDrainWriters.clear();
} else {
state.awaitDrainWriters = null;
Expand Down Expand Up @@ -686,7 +685,7 @@ Readable.prototype.read = function(n) {

function onEofChunk(stream, state) {
debug('onEofChunk');
if (state.ended) return;
if ((state.state & kEnded) !== 0) return;
if ((state.state & kDecoder) !== 0) {
const chunk = state[kDecoderValue].end();
if (chunk && chunk.length) {
Expand Down Expand Up @@ -810,8 +809,8 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
const state = this._readableState;

if (state.pipes.length === 1) {
if (!state.multiAwaitDrain) {
state.multiAwaitDrain = true;
if ((state.state & kMultiAwaitDrain) === 0) {
state.state |= kMultiAwaitDrain;
state.awaitDrainWriters = new SafeSet(
state.awaitDrainWriters ? [state.awaitDrainWriters] : [],
);
Expand All @@ -826,7 +825,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
dest !== process.stderr;

const endFn = doEnd ? onend : unpipe;
if (state.endEmitted)
if ((state.state & kEndEmitted) !== 0)
process.nextTick(endFn);
else
src.once('end', endFn);
Expand Down Expand Up @@ -885,7 +884,7 @@ Readable.prototype.pipe = function(dest, pipeOpts) {
if (state.pipes.length === 1 && state.pipes[0] === dest) {
debug('false write response, pause', 0);
state.awaitDrainWriters = dest;
state.multiAwaitDrain = false;
state.state &= ~kMultiAwaitDrain;
} else if (state.pipes.length > 1 && state.pipes.includes(dest)) {
debug('false write response, pause', state.awaitDrainWriters.size);
state.awaitDrainWriters.add(dest);
Expand Down Expand Up @@ -975,7 +974,7 @@ function pipeOnDrain(src, dest) {
if (state.awaitDrainWriters === dest) {
debug('pipeOnDrain', 1);
state.awaitDrainWriters = null;
} else if (state.multiAwaitDrain) {
} else if ((state.state & kMultiAwaitDrain) !== 0) {
debug('pipeOnDrain', state.awaitDrainWriters.size);
state.awaitDrainWriters.delete(dest);
}
Expand Down Expand Up @@ -1043,10 +1042,10 @@ Readable.prototype.on = function(ev, fn) {
state.state |= (kHasFlowing | kNeedReadable | kReadableListening);
state.state &= ~(kFlowing | kEmittedReadable);

debug('on readable', state.length, state.reading);
debug('on readable', state.length, (state.state & kReading) !== 0);
if (state.length) {
emitReadable(this);
} else if (!state.reading) {
} else if ((state.state & kReading) === 0) {
process.nextTick(nReadingNextTick, this);
}
}
Expand Down Expand Up @@ -1123,7 +1122,7 @@ Readable.prototype.resume = function() {
// for readable, but we still have to call
// resume().
state.state |= kHasFlowing;
if (!state.readableListening) {
if ((state.state & kReadableListening) === 0) {
state.state |= kFlowing;
} else {
state.state &= ~kFlowing;
Expand Down

0 comments on commit 30ad6c0

Please sign in to comment.