-
Notifications
You must be signed in to change notification settings - Fork 3
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Implement new writable stream #9
Conversation
lib/writable.js
Outdated
} | ||
} | ||
|
||
inherits(Writable, EventEmitter); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
class Writable extends EventEmitter
at the declaration site?
lib/writable.js
Outdated
this.encoding = encoding; | ||
}; | ||
class WritableBuffer { | ||
constructor( |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Unneeded line break around argument.
lib/writable.js
Outdated
constructor( | ||
highWaterMark | ||
) { | ||
this.pointer = 0; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
offset?
lib/writable.js
Outdated
} | ||
|
||
add(chunk) { | ||
const writeSize = this.buffer.length - this.pointer; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remainingSpace, spaceLeft, remainingSize maybe? As writeSize is chunk.length IMO.
lib/writable.js
Outdated
highWaterMark = DEFAULT_HIGH_WATERMARK, | ||
defaultEncoding = 'utf-8', | ||
write = null, | ||
writev = null |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There arguments should be reversed, the defaults go last.
lib/writable.js
Outdated
this.writableLength = 0; | ||
this.doWrite(this.buffer.getData()); | ||
}; | ||
if (this._writev) this._writev(data, cb); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
_writev
is used to write an array of data at once, but you pass just one big buffer here IIUC. This won't give any benefits.
lib/writable.js
Outdated
this.writing = false; | ||
return; | ||
} | ||
this.writableLength = 0; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this length should be tracked in your WritableBuffer instance.
lib/writable.js
Outdated
this.emit('close'); | ||
|
||
const err = new Error('[ERR_STREAM_DESTROYED]'); | ||
this.write = (...args) => { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why don't you just set destroyed
and check it in the write
function, I don't think it's worth overriding a function for such a use case.
lib/writable.js
Outdated
} | ||
|
||
const err = new Error('[ERR_STREAM_WRITE_AFTER_END]'); | ||
this.write = (...args) => { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Same as the comment in destroy
.
lib/writable.js
Outdated
process.nextTick(cb, err); | ||
}; | ||
|
||
this.emit('finish'); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think 'finish' should only be emitted after all of the data has been sent to the underlying system. (i.e. similar to https://nodejs.org/api/stream.html#stream_event_finish)
@lundibundi please, reviewe after refactoring. |
997934e
to
1dc3ecc
Compare
@lundibundi please reviewe. |
lib/writable.js
Outdated
add(chunk) { | ||
const remainingSize = this.buffer.length - this.offset; | ||
|
||
if (chunk.length < this.remainingSize) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
<=
lib/writable.js
Outdated
const currentBufferChunk = chunk.slice(0, this.remainingSize); | ||
this.writeChunk(currentBufferChunk); | ||
this.queue.push(this.buffer); | ||
this.buffer = Buffer.alloc(this.buffer.length); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this.buffer = Buffer.alloc(this.highWaterMark);
lib/writable.js
Outdated
} | ||
|
||
if (typeof cb !== 'function') { | ||
cb = () => {}; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Use common.safeCallback
or common.once
. But safeCallback
takes array args
so we may need safeFunction
:
const safeFunction = fn => typeof fn === 'function' ? fn : emptiness;
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you want to add a safeFunction to a stream library or to common?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why does this assignment is even needed? Can't you just add
if (cb) ...
on line 117 and later on in this function?
Also, IMO having empty callback here is harmful as
- it will grow the
callbacks
array needlessly - you will have it scheduled on nextTick in case of error
process.nextTick(cb, error);
- it makes passing errors to the callback harder as you may accidentally pass an error to the empty function thus ignoring it.
lib/writable.js
Outdated
} else { | ||
this._write(data, cb); | ||
} | ||
callbacks.forEach(cb => cb()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should be run in the cb
as callback call means that the data has been written, not that it was passed to the underlying backend (flushed). (you can check out node docs or _stream_writable/doWrite & _stream_writable/onwrite).
Also, @bugagashenkj could you add some tests? |
9207dfd
to
2afbe12
Compare
@lundibundi please reviewe. |
lib/writable.js
Outdated
|
||
writeChunk(chunk, encoding) { | ||
if (typeof chunk === 'string') { | ||
this.buffer.write(chunk, this.offset, encoding); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this can write more than chunk.length
due to the encoding. You have to use the return value of the write
call.
lib/writable.js
Outdated
this.buffer = Buffer.alloc(highWaterMark); | ||
} | ||
|
||
writeChunk(chunk, encoding) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should probably be hidden under Symbol.
lib/writable.js
Outdated
this.defaultEncoding = encoding; | ||
} | ||
|
||
error(err, cb) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also, private?
lib/writable.js
Outdated
|
||
if (this.destroyed) { | ||
const err = new Error('[ERR_STREAM_DESTROYED]'); | ||
this.emit(err, cb); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this.emit(err, cb); | |
this.error(err, cb); |
?
test/writable.js
Outdated
metatests.testSync('Writable / destroy', test => { | ||
const [stream, ] = createStream(5); | ||
|
||
stream.on('error', err => { test.isError(err); }); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You should use test.plan in this test.
d301605
to
ca1dbd4
Compare
lib/writable.js
Outdated
this.buffer = Buffer.alloc(highWaterMark); | ||
} | ||
|
||
[writeChunkFuncName](chunk) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: I think writeChunkSymbol
will be clearer.
lib/writable.js
Outdated
|
||
uncork() { | ||
if (this.corked) this.corked--; | ||
if (!this.corked && this.writableLength) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if (!this.corked && this.writableLength) { | |
if (this.corked === 0 && this.writableLength) { |
Also, you should also add check to avoud this.corked become <0.
lib/writable.js
Outdated
this[errorFuncName](err); | ||
} | ||
|
||
validChunk(chunk, cb) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Also private?
lib/writable.js
Outdated
}; | ||
|
||
if (this._writev) { | ||
if (Array.isArray(data) && data.length > 1) this._writev(data, cb); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't think there is any merit in checking data.length > 1.
824c0cc
to
3505b5e
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM with a few nits.
Should we add/make benchmarks here or land and then work on those?
/cc @tshemsedinov
lib/writable.js
Outdated
this.buffer = Buffer.alloc(highWaterMark); | ||
} | ||
|
||
[writeChunkSymbol](chunk) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Nit: actually, you don't export this class, so there is no need ti hide it's methods behind the symbols, though that's up to you.
lib/writable.js
Outdated
this.defaultEncoding = encoding; | ||
} | ||
|
||
[errorFuncName](err, cb) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
errorSymbol or errorFuncSymbol?
lib/writable.js
Outdated
} | ||
|
||
if (this.ended) { | ||
const err = new Error('[ERR_STREAM_WRITE_AFTER_END]'); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Forgot about errors.
Why do you have errors as strings? Those should be potentially be separate error classes and each have unique code.
@lundibundi please review. |
f795dde
to
245d50e
Compare
lib/fs-writable.js
Outdated
ERR_OUT_OF_RANGE | ||
} = require('./errors'); | ||
|
||
const { FSReqCallback, writeBuffers } = process.binding('fs'); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
process.binding
is deprecated in node 11 and will most likely be removed in node 12
nodejs/node#22478 and
https://github.com/nodejs/node/blob/master/doc/changelogs/CHANGELOG_V11.md#2018-10-23-version-1100-current-jasnell
General
Use of process.binding() has been deprecated. Userland code using process.binding() should re-evaluate that use and begin migrating. If there are no supported API alternatives, please open an issue in the Node.js GitHub repository so that a suitable alternative may be discussed.
Could you post the results of benchmarks with and without using those methods?
https://gist.github.com/bugagashenkj/a0f7cd4929d45e0c03ee11e19b3a87cf |
@lundibundi please review. |
8669845
to
4862daa
Compare
lib/fs-writable.js
Outdated
} | ||
|
||
fs.write(this.fd, data, 0, data.length, this.pos, (error, bytes) => { | ||
if (error) this[errorSymbol](error, cb); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Missed return
?
lib/fs-writable.js
Outdated
er = er || err; | ||
cb(er); | ||
this.closed = true; | ||
if (!er) this.emit('close'); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What's the reason behind not emitting 'close' here but still emitting it in case of error in [errorSymbol]
?
lib/fs-writable.js
Outdated
this.emit('error', error); | ||
this.emit('close'); | ||
} | ||
if (cb) cb(error); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Now that I think about it, the cb
should be callback for an operation that 'erroed' so IMO it should be called at the same time or before 'error'
event.
SO this should probably be
if (cb) cb(error);
if (this.autoClose) {
this.destroy(error);
} else {
this.emit('error', error);
this.emit('close');
}
lib/fs-writable.js
Outdated
}); | ||
} | ||
|
||
[closeSymbol](cb) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this method unused?
lib/writable.js
Outdated
|
||
uncork() { | ||
if (this.corked > 0) this.corked--; | ||
if (this.corked === 0 && this.writableLength) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd prefer to be explicit
if (this.corked === 0 && this.writableLength) { | |
if (this.corked === 0 && this.writableLength > 0) { |
112f36d
to
fa8c99d
Compare
fa8c99d
to
24597c9
Compare
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd prefer callback
instead of cb
in most cases except those where we have two callback
s in enclosed contexts
lib/errors.js
Outdated
@@ -0,0 +1,68 @@ | |||
'use strict'; | |||
|
|||
class ERR_INVALID_ARG_TYPE extends TypeError { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I'd prefer UpperCamelCase InvalidArgTypeError
lib/writable.js
Outdated
} | ||
encoding = encoding.toLowerCase(); | ||
if (!Buffer.isEncoding(encoding)) | ||
throw new ERR_UNKNOWN_ENCODING(encoding); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
if (!Buffer.isEncoding(encoding))
throw new ERR_UNKNOWN_ENCODING(encoding);
}
lib/writable.js
Outdated
if (this.ending) { | ||
this.ended = true; | ||
if (this._final) { | ||
this._final(() => this.emit('finish')); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this._final(() => this.emit('finish')); | |
this._final(() => { | |
this.emit('finish'); | |
}); |
lib/writable.js
Outdated
this.callbacks = []; | ||
|
||
const cb = () => { | ||
callbacks.forEach(cb => cb()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
callbacks.forEach(cb => cb()); | |
callbacks.forEach(callback => { | |
callback(); | |
}); |
lib/writable.js
Outdated
return this.buffer.writableLength; | ||
} | ||
|
||
_destroy(err, cb) { cb(err); } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
_destroy(err, cb) { cb(err); } | |
_destroy(err, cb) { | |
cb(err); | |
} |
}); | ||
|
||
module.exports = submodules; | ||
module.exports = Object.assign({}, ...submodules); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
module.exports = Object.assign({}, ...submodules); | |
module.exports = { ...submodules }; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think these two implementations are doing different things.
package.json
Outdated
@@ -25,9 +25,6 @@ | |||
"engines": { | |||
"node": ">=6.0.0" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
"node": ">=8.6.0"
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In separate commit before this
24597c9
to
3888e5c
Compare
fs.close(this.fd, (er) => { | ||
er = er || err; | ||
callback(er); | ||
this.closed = true; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we should set this.closed = true;
before calling a callback so that the callback will see a correct state.
PR-URL: #9
3888e5c
to
e598f03
Compare
/cc @lundibundi |
No description provided.