@@ -25,7 +25,7 @@ StreamPipe::StreamPipe(StreamBase* source,
2525 source->PushStreamListener (&readable_listener_);
2626 sink->PushStreamListener (&writable_listener_);
2727
28- CHECK ( sink->HasWantsWrite () );
28+ uses_wants_write_ = sink->HasWantsWrite ();
2929
3030 // Set up links between this object and the source/sink objects.
3131 // In particular, this makes sure that they are garbage collected as a group,
@@ -66,7 +66,8 @@ void StreamPipe::Unpipe(bool is_in_deletion) {
6666 is_closed_ = true ;
6767 is_reading_ = false ;
6868 source ()->RemoveStreamListener (&readable_listener_);
69- sink ()->RemoveStreamListener (&writable_listener_);
69+ if (pending_writes_ == 0 )
70+ sink ()->RemoveStreamListener (&writable_listener_);
7071
7172 if (is_in_deletion) return ;
7273
@@ -126,13 +127,16 @@ void StreamPipe::ReadableListener::OnStreamRead(ssize_t nread,
126127 // EOF or error; stop reading and pass the error to the previous listener
127128 // (which might end up in JS).
128129 pipe->is_eof_ = true ;
130+ // Cache `sink()` here because the previous listener might do things
131+ // that eventually lead to an `Unpipe()` call.
132+ StreamBase* sink = pipe->sink ();
129133 stream ()->ReadStop ();
130134 CHECK_NOT_NULL (previous_listener_);
131135 previous_listener_->OnStreamRead (nread, uv_buf_init (nullptr , 0 ));
132136 // If we’re not writing, close now. Otherwise, we’ll do that in
133137 // `OnStreamAfterWrite()`.
134- if (! pipe->is_writing_ ) {
135- pipe-> ShutdownWritable ();
138+ if (pipe->pending_writes_ == 0 ) {
139+ sink-> Shutdown ();
136140 pipe->Unpipe ();
137141 }
138142 return ;
@@ -142,32 +146,40 @@ void StreamPipe::ReadableListener::OnStreamRead(ssize_t nread,
142146}
143147
144148void StreamPipe::ProcessData (size_t nread, AllocatedBuffer&& buf) {
149+ CHECK (uses_wants_write_ || pending_writes_ == 0 );
145150 uv_buf_t buffer = uv_buf_init (buf.data (), nread);
146151 StreamWriteResult res = sink ()->Write (&buffer, 1 );
152+ pending_writes_++;
147153 if (!res.async ) {
148154 writable_listener_.OnStreamAfterWrite (nullptr , res.err );
149155 } else {
150- is_writing_ = true ;
151156 is_reading_ = false ;
152157 res.wrap ->SetAllocatedStorage (std::move (buf));
153158 if (source () != nullptr )
154159 source ()->ReadStop ();
155160 }
156161}
157162
158- void StreamPipe::ShutdownWritable () {
159- sink ()->Shutdown ();
160- }
161-
162163void StreamPipe::WritableListener::OnStreamAfterWrite (WriteWrap* w,
163164 int status) {
164165 StreamPipe* pipe = ContainerOf (&StreamPipe::writable_listener_, this );
165- pipe->is_writing_ = false ;
166+ pipe->pending_writes_ --;
167+ if (pipe->is_closed_ ) {
168+ if (pipe->pending_writes_ == 0 ) {
169+ Environment* env = pipe->env ();
170+ HandleScope handle_scope (env->isolate ());
171+ Context::Scope context_scope (env->context ());
172+ pipe->MakeCallback (env->oncomplete_string (), 0 , nullptr ).ToLocalChecked ();
173+ stream ()->RemoveStreamListener (this );
174+ }
175+ return ;
176+ }
177+
166178 if (pipe->is_eof_ ) {
167179 HandleScope handle_scope (pipe->env ()->isolate ());
168180 InternalCallbackScope callback_scope (pipe,
169181 InternalCallbackScope::kSkipTaskQueues );
170- pipe->ShutdownWritable ();
182+ pipe->sink ()-> Shutdown ();
171183 pipe->Unpipe ();
172184 return ;
173185 }
@@ -179,6 +191,10 @@ void StreamPipe::WritableListener::OnStreamAfterWrite(WriteWrap* w,
179191 prev->OnStreamAfterWrite (w, status);
180192 return ;
181193 }
194+
195+ if (!pipe->uses_wants_write_ ) {
196+ OnStreamWantsWrite (65536 );
197+ }
182198}
183199
184200void StreamPipe::WritableListener::OnStreamAfterShutdown (ShutdownWrap* w,
@@ -202,6 +218,7 @@ void StreamPipe::WritableListener::OnStreamDestroy() {
202218 StreamPipe* pipe = ContainerOf (&StreamPipe::writable_listener_, this );
203219 pipe->sink_destroyed_ = true ;
204220 pipe->is_eof_ = true ;
221+ pipe->pending_writes_ = 0 ;
205222 pipe->Unpipe ();
206223}
207224
@@ -242,8 +259,7 @@ void StreamPipe::Start(const FunctionCallbackInfo<Value>& args) {
242259 StreamPipe* pipe;
243260 ASSIGN_OR_RETURN_UNWRAP (&pipe, args.Holder ());
244261 pipe->is_closed_ = false ;
245- if (pipe->wanted_data_ > 0 )
246- pipe->writable_listener_ .OnStreamWantsWrite (pipe->wanted_data_ );
262+ pipe->writable_listener_ .OnStreamWantsWrite (65536 );
247263}
248264
249265void StreamPipe::Unpipe (const FunctionCallbackInfo<Value>& args) {
@@ -252,6 +268,18 @@ void StreamPipe::Unpipe(const FunctionCallbackInfo<Value>& args) {
252268 pipe->Unpipe ();
253269}
254270
271+ void StreamPipe::IsClosed (const FunctionCallbackInfo<Value>& args) {
272+ StreamPipe* pipe;
273+ ASSIGN_OR_RETURN_UNWRAP (&pipe, args.Holder ());
274+ args.GetReturnValue ().Set (pipe->is_closed_ );
275+ }
276+
277+ void StreamPipe::PendingWrites (const FunctionCallbackInfo<Value>& args) {
278+ StreamPipe* pipe;
279+ ASSIGN_OR_RETURN_UNWRAP (&pipe, args.Holder ());
280+ args.GetReturnValue ().Set (pipe->pending_writes_ );
281+ }
282+
255283namespace {
256284
257285void InitializeStreamPipe (Local<Object> target,
@@ -266,6 +294,8 @@ void InitializeStreamPipe(Local<Object> target,
266294 FIXED_ONE_BYTE_STRING (env->isolate (), " StreamPipe" );
267295 env->SetProtoMethod (pipe, " unpipe" , StreamPipe::Unpipe);
268296 env->SetProtoMethod (pipe, " start" , StreamPipe::Start);
297+ env->SetProtoMethod (pipe, " isClosed" , StreamPipe::IsClosed);
298+ env->SetProtoMethod (pipe, " pendingWrites" , StreamPipe::PendingWrites);
269299 pipe->Inherit (AsyncWrap::GetConstructorTemplate (env));
270300 pipe->SetClassName (stream_pipe_string);
271301 pipe->InstanceTemplate ()->SetInternalFieldCount (1 );
0 commit comments