diff --git a/src/stream_pipe.cc b/src/stream_pipe.cc index 832a20d324f0ea..504b840972f22d 100644 --- a/src/stream_pipe.cc +++ b/src/stream_pipe.cc @@ -25,7 +25,7 @@ StreamPipe::StreamPipe(StreamBase* source, source->PushStreamListener(&readable_listener_); sink->PushStreamListener(&writable_listener_); - CHECK(sink->HasWantsWrite()); + uses_wants_write_ = sink->HasWantsWrite(); // Set up links between this object and the source/sink objects. // In particular, this makes sure that they are garbage collected as a group, @@ -66,7 +66,8 @@ void StreamPipe::Unpipe() { is_closed_ = true; is_reading_ = false; source()->RemoveStreamListener(&readable_listener_); - sink()->RemoveStreamListener(&writable_listener_); + if (pending_writes_ == 0) + sink()->RemoveStreamListener(&writable_listener_); // Delay the JS-facing part with SetImmediate, because this might be from // inside the garbage collector, so we can’t run JS here. @@ -123,13 +124,16 @@ void StreamPipe::ReadableListener::OnStreamRead(ssize_t nread, // EOF or error; stop reading and pass the error to the previous listener // (which might end up in JS). pipe->is_eof_ = true; + // Cache `sink()` here because the previous listener might do things + // that eventually lead to an `Unpipe()` call. + StreamBase* sink = pipe->sink(); stream()->ReadStop(); CHECK_NOT_NULL(previous_listener_); previous_listener_->OnStreamRead(nread, uv_buf_init(nullptr, 0)); // If we’re not writing, close now. Otherwise, we’ll do that in // `OnStreamAfterWrite()`. - if (!pipe->is_writing_) { - pipe->ShutdownWritable(); + if (pipe->pending_writes_ == 0) { + sink->Shutdown(); pipe->Unpipe(); } return; @@ -139,12 +143,13 @@ void StreamPipe::ReadableListener::OnStreamRead(ssize_t nread, } void StreamPipe::ProcessData(size_t nread, AllocatedBuffer&& buf) { + CHECK(uses_wants_write_ || pending_writes_ == 0); uv_buf_t buffer = uv_buf_init(buf.data(), nread); StreamWriteResult res = sink()->Write(&buffer, 1); + pending_writes_++; if (!res.async) { writable_listener_.OnStreamAfterWrite(nullptr, res.err); } else { - is_writing_ = true; is_reading_ = false; res.wrap->SetAllocatedStorage(std::move(buf)); if (source() != nullptr) @@ -152,19 +157,26 @@ void StreamPipe::ProcessData(size_t nread, AllocatedBuffer&& buf) { } } -void StreamPipe::ShutdownWritable() { - sink()->Shutdown(); -} - void StreamPipe::WritableListener::OnStreamAfterWrite(WriteWrap* w, int status) { StreamPipe* pipe = ContainerOf(&StreamPipe::writable_listener_, this); - pipe->is_writing_ = false; + pipe->pending_writes_--; + if (pipe->is_closed_) { + if (pipe->pending_writes_ == 0) { + Environment* env = pipe->env(); + HandleScope handle_scope(env->isolate()); + Context::Scope context_scope(env->context()); + pipe->MakeCallback(env->oncomplete_string(), 0, nullptr).ToLocalChecked(); + stream()->RemoveStreamListener(this); + } + return; + } + if (pipe->is_eof_) { HandleScope handle_scope(pipe->env()->isolate()); InternalCallbackScope callback_scope(pipe, InternalCallbackScope::kSkipTaskQueues); - pipe->ShutdownWritable(); + pipe->sink()->Shutdown(); pipe->Unpipe(); return; } @@ -176,6 +188,10 @@ void StreamPipe::WritableListener::OnStreamAfterWrite(WriteWrap* w, prev->OnStreamAfterWrite(w, status); return; } + + if (!pipe->uses_wants_write_) { + OnStreamWantsWrite(65536); + } } void StreamPipe::WritableListener::OnStreamAfterShutdown(ShutdownWrap* w, @@ -199,6 +215,7 @@ void StreamPipe::WritableListener::OnStreamDestroy() { StreamPipe* pipe = ContainerOf(&StreamPipe::writable_listener_, this); pipe->sink_destroyed_ = true; pipe->is_eof_ = true; + pipe->pending_writes_ = 0; pipe->Unpipe(); } @@ -239,8 +256,7 @@ void StreamPipe::Start(const FunctionCallbackInfo& args) { StreamPipe* pipe; ASSIGN_OR_RETURN_UNWRAP(&pipe, args.Holder()); pipe->is_closed_ = false; - if (pipe->wanted_data_ > 0) - pipe->writable_listener_.OnStreamWantsWrite(pipe->wanted_data_); + pipe->writable_listener_.OnStreamWantsWrite(65536); } void StreamPipe::Unpipe(const FunctionCallbackInfo& args) { @@ -249,6 +265,18 @@ void StreamPipe::Unpipe(const FunctionCallbackInfo& args) { pipe->Unpipe(); } +void StreamPipe::IsClosed(const FunctionCallbackInfo& args) { + StreamPipe* pipe; + ASSIGN_OR_RETURN_UNWRAP(&pipe, args.Holder()); + args.GetReturnValue().Set(pipe->is_closed_); +} + +void StreamPipe::PendingWrites(const FunctionCallbackInfo& args) { + StreamPipe* pipe; + ASSIGN_OR_RETURN_UNWRAP(&pipe, args.Holder()); + args.GetReturnValue().Set(pipe->pending_writes_); +} + namespace { void InitializeStreamPipe(Local target, @@ -263,6 +291,8 @@ void InitializeStreamPipe(Local target, FIXED_ONE_BYTE_STRING(env->isolate(), "StreamPipe"); env->SetProtoMethod(pipe, "unpipe", StreamPipe::Unpipe); env->SetProtoMethod(pipe, "start", StreamPipe::Start); + env->SetProtoMethod(pipe, "isClosed", StreamPipe::IsClosed); + env->SetProtoMethod(pipe, "pendingWrites", StreamPipe::PendingWrites); pipe->Inherit(AsyncWrap::GetConstructorTemplate(env)); pipe->SetClassName(stream_pipe_string); pipe->InstanceTemplate()->SetInternalFieldCount(1); diff --git a/src/stream_pipe.h b/src/stream_pipe.h index 061ad9842e8f6d..4cc5668c4c5137 100644 --- a/src/stream_pipe.h +++ b/src/stream_pipe.h @@ -17,6 +17,8 @@ class StreamPipe : public AsyncWrap { static void New(const v8::FunctionCallbackInfo& args); static void Start(const v8::FunctionCallbackInfo& args); static void Unpipe(const v8::FunctionCallbackInfo& args); + static void IsClosed(const v8::FunctionCallbackInfo& args); + static void PendingWrites(const v8::FunctionCallbackInfo& args); SET_NO_MEMORY_INFO() SET_MEMORY_INFO_NAME(StreamPipe) @@ -26,14 +28,13 @@ class StreamPipe : public AsyncWrap { inline StreamBase* source(); inline StreamBase* sink(); - inline void ShutdownWritable(); - + int pending_writes_ = 0; bool is_reading_ = false; - bool is_writing_ = false; bool is_eof_ = false; bool is_closed_ = true; bool sink_destroyed_ = false; bool source_destroyed_ = false; + bool uses_wants_write_ = false; // Set a default value so that when we’re coming from Start(), we know // that we don’t want to read just yet.