Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

src: improve StreamBase write throughput #23843

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion benchmark/net/net-c2s.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ const net = require('net');
const PORT = common.PORT;

const bench = common.createBenchmark(main, {
len: [102400, 1024 * 1024 * 16],
len: [64, 102400, 1024 * 1024 * 16],
type: ['utf', 'asc', 'buf'],
dur: [5],
});
Expand Down
2 changes: 1 addition & 1 deletion benchmark/net/net-pipe.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ const net = require('net');
const PORT = common.PORT;

const bench = common.createBenchmark(main, {
len: [102400, 1024 * 1024 * 16],
len: [64, 102400, 1024 * 1024 * 16],
type: ['utf', 'asc', 'buf'],
dur: [5],
});
Expand Down
2 changes: 1 addition & 1 deletion benchmark/net/net-s2c.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ const common = require('../common.js');
const PORT = common.PORT;

const bench = common.createBenchmark(main, {
len: [102400, 1024 * 1024 * 16],
len: [64, 102400, 1024 * 1024 * 16],
type: ['utf', 'asc', 'buf'],
dur: [5]
});
Expand Down
2 changes: 1 addition & 1 deletion benchmark/net/net-wrap-js-stream-passthrough.js
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ const common = require('../common.js');
const { PassThrough } = require('stream');

const bench = common.createBenchmark(main, {
len: [102400, 1024 * 1024 * 16],
len: [64, 102400, 1024 * 1024 * 16],
type: ['utf', 'asc', 'buf'],
dur: [5],
}, {
Expand Down
5 changes: 3 additions & 2 deletions lib/internal/child_process.js
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ const {
WriteWrap,
kReadBytesOrError,
kArrayBufferOffset,
kLastWriteWasAsync,
streamBaseState
} = internalBinding('stream_wrap');
const { Pipe, constants: PipeConstants } = internalBinding('pipe_wrap');
Expand Down Expand Up @@ -717,10 +718,10 @@ function setupChannel(target, channel) {
}

var req = new WriteWrap();
req.async = false;

var string = JSON.stringify(message) + '\n';
var err = channel.writeUtf8String(req, string, handle);
var wasAsyncWrite = streamBaseState[kLastWriteWasAsync];

if (err === 0) {
if (handle) {
Expand All @@ -730,7 +731,7 @@ function setupChannel(target, channel) {
obj.postSend(message, handle, options, callback, target);
}

if (req.async) {
if (wasAsyncWrite) {
req.oncomplete = function() {
control.unref();
if (typeof callback === 'function')
Expand Down
22 changes: 20 additions & 2 deletions lib/internal/stream_base_commons.js
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@ const {
WriteWrap,
kReadBytesOrError,
kArrayBufferOffset,
kBytesWritten,
kLastWriteWasAsync,
streamBaseState
} = internalBinding('stream_wrap');
const { UV_EOF } = internalBinding('uv');
Expand All @@ -20,7 +22,12 @@ function handleWriteReq(req, data, encoding) {

switch (encoding) {
case 'buffer':
return handle.writeBuffer(req, data);
{
const ret = handle.writeBuffer(req, data);
if (streamBaseState[kLastWriteWasAsync])
req.buffer = data;
return ret;
}
case 'latin1':
case 'binary':
return handle.writeLatin1String(req, data);
Expand All @@ -35,7 +42,13 @@ function handleWriteReq(req, data, encoding) {
case 'utf-16le':
return handle.writeUcs2String(req, data);
default:
return handle.writeBuffer(req, Buffer.from(data, encoding));
{
const buffer = Buffer.from(data, encoding);
const ret = handle.writeBuffer(req, buffer);
if (streamBaseState[kLastWriteWasAsync])
req.buffer = buffer;
return ret;
}
}
}

Expand All @@ -45,6 +58,8 @@ function createWriteWrap(handle, oncomplete) {
req.handle = handle;
req.oncomplete = oncomplete;
req.async = false;
req.bytes = 0;
req.buffer = null;

return req;
}
Expand Down Expand Up @@ -80,6 +95,9 @@ function writeGeneric(self, req, data, encoding, cb) {
}

function afterWriteDispatched(self, req, err, cb) {
req.bytes = streamBaseState[kBytesWritten];
req.async = !!streamBaseState[kLastWriteWasAsync];

if (err !== 0)
return self.destroy(errnoException(err, 'write', req.error), cb);

Expand Down
2 changes: 0 additions & 2 deletions src/env.h
Original file line number Diff line number Diff line change
Expand Up @@ -125,10 +125,8 @@ struct PackageConfig {
V(address_string, "address") \
V(aliases_string, "aliases") \
V(args_string, "args") \
V(async, "async") \
V(async_ids_stack_string, "async_ids_stack") \
V(buffer_string, "buffer") \
V(bytes_string, "bytes") \
V(bytes_parsed_string, "bytesParsed") \
V(bytes_read_string, "bytesRead") \
V(bytes_written_string, "bytesWritten") \
Expand Down
33 changes: 7 additions & 26 deletions src/stream_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,11 @@ namespace node {

using v8::Array;
using v8::ArrayBuffer;
using v8::Boolean;
using v8::Context;
using v8::FunctionCallbackInfo;
using v8::HandleScope;
using v8::Integer;
using v8::Local;
using v8::Number;
using v8::Object;
using v8::String;
using v8::Value;
Expand Down Expand Up @@ -56,18 +54,9 @@ int StreamBase::Shutdown(const FunctionCallbackInfo<Value>& args) {
return Shutdown(req_wrap_obj);
}

inline void SetWriteResultPropertiesOnWrapObject(
Environment* env,
Local<Object> req_wrap_obj,
const StreamWriteResult& res) {
req_wrap_obj->Set(
env->context(),
env->bytes_string(),
Number::New(env->isolate(), res.bytes)).FromJust();
req_wrap_obj->Set(
env->context(),
env->async(),
Boolean::New(env->isolate(), res.async)).FromJust();
void StreamBase::SetWriteResult(const StreamWriteResult& res) {
env_->stream_base_state()[kBytesWritten] = res.bytes;
env_->stream_base_state()[kLastWriteWasAsync] = res.async;
}

int StreamBase::Writev(const FunctionCallbackInfo<Value>& args) {
Expand Down Expand Up @@ -160,7 +149,7 @@ int StreamBase::Writev(const FunctionCallbackInfo<Value>& args) {
}

StreamWriteResult res = Write(*bufs, count, nullptr, req_wrap_obj);
SetWriteResultPropertiesOnWrapObject(env, req_wrap_obj, res);
SetWriteResult(res);
if (res.wrap != nullptr && storage_size > 0) {
res.wrap->SetAllocatedStorage(storage.release(), storage_size);
}
Expand All @@ -185,10 +174,7 @@ int StreamBase::WriteBuffer(const FunctionCallbackInfo<Value>& args) {
buf.len = Buffer::Length(args[1]);

StreamWriteResult res = Write(&buf, 1, nullptr, req_wrap_obj);

if (res.async)
req_wrap_obj->Set(env->context(), env->buffer_string(), args[1]).FromJust();
SetWriteResultPropertiesOnWrapObject(env, req_wrap_obj, res);
SetWriteResult(res);

return res.err;
}
Expand Down Expand Up @@ -247,12 +233,7 @@ int StreamBase::WriteString(const FunctionCallbackInfo<Value>& args) {

// Immediate failure or success
if (err != 0 || count == 0) {
req_wrap_obj->Set(env->context(), env->async(), False(env->isolate()))
.FromJust();
req_wrap_obj->Set(env->context(),
env->bytes_string(),
Integer::NewFromUnsigned(env->isolate(), data_size))
.FromJust();
SetWriteResult(StreamWriteResult { false, err, nullptr, data_size });
return err;
}

Expand Down Expand Up @@ -295,7 +276,7 @@ int StreamBase::WriteString(const FunctionCallbackInfo<Value>& args) {
StreamWriteResult res = Write(&buf, 1, send_handle, req_wrap_obj);
res.bytes += synchronously_written;

SetWriteResultPropertiesOnWrapObject(env, req_wrap_obj, res);
SetWriteResult(res);
if (res.wrap != nullptr) {
res.wrap->SetAllocatedStorage(data.release(), data_size);
}
Expand Down
4 changes: 4 additions & 0 deletions src/stream_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -332,13 +332,17 @@ class StreamBase : public StreamResource {
enum StreamBaseStateFields {
kReadBytesOrError,
kArrayBufferOffset,
kBytesWritten,
kLastWriteWasAsync,
kNumStreamBaseStateFields
};

private:
Environment* env_;
EmitToJSStreamListener default_listener_;

void SetWriteResult(const StreamWriteResult& res);

friend class WriteWrap;
friend class ShutdownWrap;
friend class Environment; // For kNumStreamBaseStateFields.
Expand Down
2 changes: 2 additions & 0 deletions src/stream_wrap.cc
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,8 @@ void LibuvStreamWrap::Initialize(Local<Object> target,

NODE_DEFINE_CONSTANT(target, kReadBytesOrError);
NODE_DEFINE_CONSTANT(target, kArrayBufferOffset);
NODE_DEFINE_CONSTANT(target, kBytesWritten);
NODE_DEFINE_CONSTANT(target, kLastWriteWasAsync);
target->Set(context, FIXED_ONE_BYTE_STRING(env->isolate(), "streamBaseState"),
env->stream_base_state().GetJSArray()).FromJust();
}
Expand Down
2 changes: 1 addition & 1 deletion test/sequential/test-async-wrap-getasyncid.js
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ if (common.hasCrypto) { // eslint-disable-line node-core/crypto-check
const err = handle.writeLatin1String(wreq, 'hi'.repeat(100000));
if (err)
throw new Error(`write failed: ${getSystemErrorName(err)}`);
if (!wreq.async) {
if (!stream_wrap.streamBaseState[stream_wrap.kLastWriteWasAsync]) {
testUninitialized(wreq, 'WriteWrap');
// Synchronous finish. Write more data until we hit an
// asynchronous write.
Expand Down