From 57af3bea772e3dda732f0f32abbc5309ec7877ec Mon Sep 17 00:00:00 2001 From: Sean T Allen Date: Sat, 21 Oct 2017 11:33:10 -0400 Subject: [PATCH] Add backpressure to the ProcessMonitor package. This is a breaking API change --- packages/process/_test.pony | 21 +++++++++++---------- packages/process/process_monitor.pony | 20 +++++++++++++++++--- 2 files changed, 28 insertions(+), 13 deletions(-) diff --git a/packages/process/_test.pony b/packages/process/_test.pony index 431942385c5..5f588c1771e 100644 --- a/packages/process/_test.pony +++ b/packages/process/_test.pony @@ -1,4 +1,5 @@ use "ponytest" +use "backpressure" use "capsicum" use "collections" use "files" @@ -38,7 +39,7 @@ class iso _TestStdinStdout is UnitTest let auth = h.env.root as AmbientAuth let pm: ProcessMonitor = - ProcessMonitor(auth, consume notifier, path, + ProcessMonitor(auth, auth, consume notifier, path, consume args, consume vars) pm.write("one, two, three") pm.done_writing() // closing stdin allows "cat" to terminate @@ -74,7 +75,7 @@ class iso _TestStderr is UnitTest vars.push("PATH=/bin") let auth = h.env.root as AmbientAuth - _pm = ProcessMonitor(auth, consume notifier, path, + _pm = ProcessMonitor(auth, auth, consume notifier, path, consume args, consume vars) if _pm isnt None then // write to STDIN of the child process let pm = _pm as ProcessMonitor @@ -117,7 +118,7 @@ class iso _TestFileExecCapabilityIsRequired is UnitTest let auth = h.env.root as AmbientAuth let pm: ProcessMonitor = - ProcessMonitor(auth, consume notifier, path, + ProcessMonitor(auth, auth, consume notifier, path, consume args, consume vars) h.dispose_when_done(pm) h.long_test(30_000_000_000) @@ -143,8 +144,8 @@ class iso _TestNonExecutablePathResultsInExecveError is UnitTest let auth = h.env.root as AmbientAuth let notifier = _setup_notifier(h, path) - let pm: ProcessMonitor = ProcessMonitor(auth, consume notifier, path, - consume args, consume vars) + let pm: ProcessMonitor = ProcessMonitor(auth, auth, consume notifier, + path, consume args, consume vars) h.dispose_when_done(pm) h.long_test(30_000_000_000) else @@ -230,8 +231,8 @@ class iso _TestExpect is UnitTest vars.push("PATH=/bin") let auth = h.env.root as AmbientAuth - let pm: ProcessMonitor = ProcessMonitor(auth, consume notifier, path, - consume args, consume vars) + let pm: ProcessMonitor = ProcessMonitor(auth, auth, consume notifier, + path, consume args, consume vars) pm.done_writing() // closing stdin allows "echo" to terminate h.dispose_when_done(pm) h.long_test(30_000_000_000) @@ -262,7 +263,7 @@ class iso _TestWritevOrdering is UnitTest let auth = h.env.root as AmbientAuth let pm: ProcessMonitor = - ProcessMonitor(auth, consume notifier, path, + ProcessMonitor(auth, auth, consume notifier, path, consume args, consume vars) let params: Array[String] iso = recover Array[String](3) end params.push("one") @@ -300,7 +301,7 @@ class iso _TestPrintvOrdering is UnitTest let auth = h.env.root as AmbientAuth let pm: ProcessMonitor = - ProcessMonitor(auth, consume notifier, path, + ProcessMonitor(auth, auth, consume notifier, path, consume args, consume vars) let params: Array[String] iso = recover Array[String](3) end params.push("one") @@ -342,7 +343,7 @@ class iso _TestStdinWriteBuf is UnitTest // fork the child process and attach a ProcessMonitor let auth = h.env.root as AmbientAuth - _pm = ProcessMonitor(auth, consume notifier, path, consume args, + _pm = ProcessMonitor(auth, auth, consume notifier, path, consume args, consume vars) // create a message larger than pipe_cap bytes diff --git a/packages/process/process_monitor.pony b/packages/process/process_monitor.pony index 0517d1defe9..86a3690be1a 100644 --- a/packages/process/process_monitor.pony +++ b/packages/process/process_monitor.pony @@ -87,8 +87,10 @@ a runtime error. Document waitpid behaviour (stops world) """ +use "backpressure" use "collections" use "files" + use @pony_os_errno[I32]() use @pony_asio_event_create[AsioEventID](owner: AsioEventNotify, fd: U32, flags: U32, nsec: U64, noisy: Bool) @@ -166,6 +168,8 @@ actor ProcessMonitor let _notifier: ProcessNotify var _child_pid: I32 = -1 + let _backpressure_auth: BackpressureAuth + var _stdin_event: AsioEventID = AsioEvent.none() var _stdout_event: AsioEventID = AsioEvent.none() var _stderr_event: AsioEventID = AsioEvent.none() @@ -193,6 +197,7 @@ actor ProcessMonitor new create( auth: ProcessMonitorAuth, + backpressure_auth: BackpressureAuth, notifier: ProcessNotify iso, filepath: FilePath, args: Array[String] val, @@ -203,6 +208,7 @@ actor ProcessMonitor and register the asio events. Fork child process and notify our user about incoming data via the notifier. """ + _backpressure_auth = backpressure_auth _notifier = consume notifier // We need permission to execute and the @@ -427,6 +433,7 @@ actor ProcessMonitor close the _stdin_write file descriptor. """ _done_writing = true + Backpressure.release(_backpressure_auth) if _pending.size() == 0 then _close_fd(_stdin_write) end @@ -435,6 +442,7 @@ actor ProcessMonitor """ Terminate child and close down everything. """ + Backpressure.release(_backpressure_auth) try _kill_child()? else @@ -647,6 +655,7 @@ actor ProcessMonitor if errno == _EAGAIN() then // Resource temporarily unavailable, send data later. _pending.push((data, 0)) + Backpressure.apply(_backpressure_auth) return false else // notify caller, close fd and bail out @@ -657,12 +666,14 @@ actor ProcessMonitor elseif len.usize() < data.size() then // Send any remaining data later. _pending.push((data, len.usize())) + Backpressure.apply(_backpressure_auth) return false end return true else // Send later, when the fd is available for writing. _pending.push((data, 0)) + Backpressure.apply(_backpressure_auth) return false end @@ -699,9 +710,12 @@ actor ProcessMonitor else // This chunk has been fully sent. _pending.shift()? - // check if the client has signaled it is done - if (_pending.size() == 0) and _done_writing then - _close_fd(_stdin_write) + if (_pending.size() == 0) then + Backpressure.release(_backpressure_auth) + // check if the client has signaled it is done + if _done_writing then + _close_fd(_stdin_write) + end end end else