Skip to content

Commit

Permalink
Add backpressure to the ProcessMonitor package.
Browse files Browse the repository at this point in the history
This is a breaking API change
  • Loading branch information
SeanTAllen committed Oct 21, 2017
1 parent 7cb55b3 commit 57af3be
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 13 deletions.
21 changes: 11 additions & 10 deletions packages/process/_test.pony
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use "ponytest"
use "backpressure"
use "capsicum"
use "collections"
use "files"
Expand Down Expand Up @@ -38,7 +39,7 @@ class iso _TestStdinStdout is UnitTest

let auth = h.env.root as AmbientAuth
let pm: ProcessMonitor =
ProcessMonitor(auth, consume notifier, path,
ProcessMonitor(auth, auth, consume notifier, path,
consume args, consume vars)
pm.write("one, two, three")
pm.done_writing() // closing stdin allows "cat" to terminate
Expand Down Expand Up @@ -74,7 +75,7 @@ class iso _TestStderr is UnitTest
vars.push("PATH=/bin")

let auth = h.env.root as AmbientAuth
_pm = ProcessMonitor(auth, consume notifier, path,
_pm = ProcessMonitor(auth, auth, consume notifier, path,
consume args, consume vars)
if _pm isnt None then // write to STDIN of the child process
let pm = _pm as ProcessMonitor
Expand Down Expand Up @@ -117,7 +118,7 @@ class iso _TestFileExecCapabilityIsRequired is UnitTest

let auth = h.env.root as AmbientAuth
let pm: ProcessMonitor =
ProcessMonitor(auth, consume notifier, path,
ProcessMonitor(auth, auth, consume notifier, path,
consume args, consume vars)
h.dispose_when_done(pm)
h.long_test(30_000_000_000)
Expand All @@ -143,8 +144,8 @@ class iso _TestNonExecutablePathResultsInExecveError is UnitTest

let auth = h.env.root as AmbientAuth
let notifier = _setup_notifier(h, path)
let pm: ProcessMonitor = ProcessMonitor(auth, consume notifier, path,
consume args, consume vars)
let pm: ProcessMonitor = ProcessMonitor(auth, auth, consume notifier,
path, consume args, consume vars)
h.dispose_when_done(pm)
h.long_test(30_000_000_000)
else
Expand Down Expand Up @@ -230,8 +231,8 @@ class iso _TestExpect is UnitTest
vars.push("PATH=/bin")

let auth = h.env.root as AmbientAuth
let pm: ProcessMonitor = ProcessMonitor(auth, consume notifier, path,
consume args, consume vars)
let pm: ProcessMonitor = ProcessMonitor(auth, auth, consume notifier,
path, consume args, consume vars)
pm.done_writing() // closing stdin allows "echo" to terminate
h.dispose_when_done(pm)
h.long_test(30_000_000_000)
Expand Down Expand Up @@ -262,7 +263,7 @@ class iso _TestWritevOrdering is UnitTest

let auth = h.env.root as AmbientAuth
let pm: ProcessMonitor =
ProcessMonitor(auth, consume notifier, path,
ProcessMonitor(auth, auth, consume notifier, path,
consume args, consume vars)
let params: Array[String] iso = recover Array[String](3) end
params.push("one")
Expand Down Expand Up @@ -300,7 +301,7 @@ class iso _TestPrintvOrdering is UnitTest

let auth = h.env.root as AmbientAuth
let pm: ProcessMonitor =
ProcessMonitor(auth, consume notifier, path,
ProcessMonitor(auth, auth, consume notifier, path,
consume args, consume vars)
let params: Array[String] iso = recover Array[String](3) end
params.push("one")
Expand Down Expand Up @@ -342,7 +343,7 @@ class iso _TestStdinWriteBuf is UnitTest

// fork the child process and attach a ProcessMonitor
let auth = h.env.root as AmbientAuth
_pm = ProcessMonitor(auth, consume notifier, path, consume args,
_pm = ProcessMonitor(auth, auth, consume notifier, path, consume args,
consume vars)

// create a message larger than pipe_cap bytes
Expand Down
20 changes: 17 additions & 3 deletions packages/process/process_monitor.pony
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,10 @@ a runtime error.
Document waitpid behaviour (stops world)
"""
use "backpressure"
use "collections"
use "files"

use @pony_os_errno[I32]()
use @pony_asio_event_create[AsioEventID](owner: AsioEventNotify, fd: U32,
flags: U32, nsec: U64, noisy: Bool)
Expand Down Expand Up @@ -166,6 +168,8 @@ actor ProcessMonitor
let _notifier: ProcessNotify
var _child_pid: I32 = -1

let _backpressure_auth: BackpressureAuth

var _stdin_event: AsioEventID = AsioEvent.none()
var _stdout_event: AsioEventID = AsioEvent.none()
var _stderr_event: AsioEventID = AsioEvent.none()
Expand Down Expand Up @@ -193,6 +197,7 @@ actor ProcessMonitor

new create(
auth: ProcessMonitorAuth,
backpressure_auth: BackpressureAuth,
notifier: ProcessNotify iso,
filepath: FilePath,
args: Array[String] val,
Expand All @@ -203,6 +208,7 @@ actor ProcessMonitor
and register the asio events. Fork child process and notify our
user about incoming data via the notifier.
"""
_backpressure_auth = backpressure_auth
_notifier = consume notifier

// We need permission to execute and the
Expand Down Expand Up @@ -427,6 +433,7 @@ actor ProcessMonitor
close the _stdin_write file descriptor.
"""
_done_writing = true
Backpressure.release(_backpressure_auth)
if _pending.size() == 0 then
_close_fd(_stdin_write)
end
Expand All @@ -435,6 +442,7 @@ actor ProcessMonitor
"""
Terminate child and close down everything.
"""
Backpressure.release(_backpressure_auth)
try
_kill_child()?
else
Expand Down Expand Up @@ -647,6 +655,7 @@ actor ProcessMonitor
if errno == _EAGAIN() then
// Resource temporarily unavailable, send data later.
_pending.push((data, 0))
Backpressure.apply(_backpressure_auth)
return false
else
// notify caller, close fd and bail out
Expand All @@ -657,12 +666,14 @@ actor ProcessMonitor
elseif len.usize() < data.size() then
// Send any remaining data later.
_pending.push((data, len.usize()))
Backpressure.apply(_backpressure_auth)
return false
end
return true
else
// Send later, when the fd is available for writing.
_pending.push((data, 0))
Backpressure.apply(_backpressure_auth)
return false
end

Expand Down Expand Up @@ -699,9 +710,12 @@ actor ProcessMonitor
else
// This chunk has been fully sent.
_pending.shift()?
// check if the client has signaled it is done
if (_pending.size() == 0) and _done_writing then
_close_fd(_stdin_write)
if (_pending.size() == 0) then
Backpressure.release(_backpressure_auth)
// check if the client has signaled it is done
if _done_writing then
_close_fd(_stdin_write)
end
end
end
else
Expand Down

0 comments on commit 57af3be

Please sign in to comment.