Skip to content

Commit

Permalink
Process monitor async write buffering
Browse files Browse the repository at this point in the history
Based on work originally done by @lispmeister. I brought it up to date
and fixed problematic test failures.

Add write buffering and asio events for writes to STDIN of the child
We now register an asio event on the STDIN file descriptor.
We write as much as possible. If we can't write everything we store the
ByteSeq in a list together with an offset and set the _stdin_writeable
flag to false. Once the OS signals STDIN is writeable again we try to
write the chunks stored in the list starting with the head chunk.

Closes #945
  • Loading branch information
SeanTAllen committed Aug 22, 2017
1 parent 258c46a commit 315a08e
Show file tree
Hide file tree
Showing 2 changed files with 299 additions and 81 deletions.
180 changes: 152 additions & 28 deletions packages/process/_test.pony
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
use "ponytest"
use "files"
use "capsicum"
use "collections"
use "files"
use "time"

actor Main is TestList
new create(env: Env) => PonyTest(env, this)
Expand All @@ -15,14 +16,18 @@ actor Main is TestList
test(_TestExpect)
test(_TestWritevOrdering)
test(_TestPrintvOrdering)
test(_TestStdinWriteBuf)

class iso _TestStdinStdout is UnitTest
fun name(): String =>
"process/STDIN-STDOUT"

fun exclusion_group(): String =>
"process-monitor"

fun apply(h: TestHelper) =>
let notifier: ProcessNotify iso =
_ProcessClient("one, two, three", "", 0, h)
let size: USize = 15 // length of "one, two, three" string
let notifier: ProcessNotify iso = _ProcessClient(size, "", 0, h)
try
let path = FilePath(h.env.root as AmbientAuth, "/bin/cat")?
let args: Array[String] iso = recover Array[String](1) end
Expand All @@ -35,9 +40,10 @@ class iso _TestStdinStdout is UnitTest
let pm: ProcessMonitor =
ProcessMonitor(auth, consume notifier, path,
consume args, consume vars)
pm.write("one, two, three")
pm.done_writing() // closing stdin allows "cat" to terminate
h.long_test(5_000_000_000)
pm.write("one, two, three")
pm.done_writing() // closing stdin allows "cat" to terminate
h.dispose_when_done(pm)
h.long_test(5_000_000_000)
else
h.fail("Could not create FilePath!")
end
Expand All @@ -46,13 +52,17 @@ class iso _TestStdinStdout is UnitTest
h.complete(false)

class iso _TestStderr is UnitTest
var _pm: (ProcessMonitor | None) = None

fun name(): String =>
"process/STDERR"

fun apply(h: TestHelper) =>
let notifier: ProcessNotify iso =
_ProcessClient("",
"cat: file_does_not_exist: No such file or directory\n", 1, h)
fun exclusion_group(): String =>
"process-monitor"

fun ref apply(h: TestHelper) =>
let notifier: ProcessNotify iso = _ProcessClient(0,
"cat: file_does_not_exist: No such file or directory\n", 1, h)
try
let path = FilePath(h.env.root as AmbientAuth, "/bin/cat")?
let args: Array[String] iso = recover Array[String](2) end
Expand All @@ -64,23 +74,37 @@ class iso _TestStderr is UnitTest
vars.push("PATH=/bin")

let auth = h.env.root as AmbientAuth
let pm: ProcessMonitor =
ProcessMonitor(auth, consume notifier, path,
_pm = ProcessMonitor(auth, consume notifier, path,
consume args, consume vars)
h.long_test(5_000_000_000)
if _pm isnt None then // write to STDIN of the child process
let pm = _pm as ProcessMonitor
pm.done_writing() // closing stdin
h.dispose_when_done(pm)
end
h.long_test(5_000_000_000)
else
h.fail("Could not create FilePath!")
end

fun timed_out(h: TestHelper) =>
try
if _pm isnt None then // kill the child process and cleanup fd
(_pm as ProcessMonitor).dispose()
end
else
h.fail("Error disposing of forked process in STDIN-WriteBuf test")
end
h.complete(false)

class iso _TestFileExecCapabilityIsRequired is UnitTest
fun name(): String =>
"process/TestFileExecCapabilityIsRequired"

fun exclusion_group(): String =>
"process-monitor"

fun apply(h: TestHelper) =>
let notifier: ProcessNotify iso = _ProcessClient("", "", 1, h)
let notifier: ProcessNotify iso = _ProcessClient(0, "", 1, h)
try
let path =
FilePath(h.env.root as AmbientAuth, "/bin/date",
Expand All @@ -95,6 +119,7 @@ class iso _TestFileExecCapabilityIsRequired is UnitTest
let pm: ProcessMonitor =
ProcessMonitor(auth, consume notifier, path,
consume args, consume vars)
h.dispose_when_done(pm)
h.long_test(5_000_000_000)
else
h.fail("Could not create FilePath!")
Expand All @@ -107,6 +132,9 @@ class iso _TestNonExecutablePathResultsInExecveError is UnitTest
fun name(): String =>
"process/_TestNonExecutablePathResultsInExecveError"

fun exclusion_group(): String =>
"process-monitor"

fun apply(h: TestHelper) =>
try
let path = _setup_file(h)?
Expand All @@ -117,6 +145,7 @@ class iso _TestNonExecutablePathResultsInExecveError is UnitTest
let notifier = _setup_notifier(h, path)
let pm: ProcessMonitor = ProcessMonitor(auth, consume notifier, path,
consume args, consume vars)
h.dispose_when_done(pm)
h.long_test(5_000_000_000)
else
h.fail("Could not create FilePath!")
Expand Down Expand Up @@ -162,6 +191,9 @@ class iso _TestExpect is UnitTest
fun name(): String =>
"process/Expect"

fun exclusion_group(): String =>
"process-monitor"

fun apply(h: TestHelper) =>
let notifier =
recover object is ProcessNotify
Expand All @@ -185,7 +217,9 @@ class iso _TestExpect is UnitTest
_h.assert_eq[I32](child_exit_code, 0)
_h.assert_array_eq[String](_out, ["he"; "llo "; "th"; "ere!"])
_h.complete(true)
end end
end
end

try
let path = FilePath(h.env.root as AmbientAuth, "/bin/echo")?
let args: Array[String] iso = recover Array[String](1) end
Expand All @@ -198,7 +232,9 @@ class iso _TestExpect is UnitTest
let auth = h.env.root as AmbientAuth
let pm: ProcessMonitor = ProcessMonitor(auth, consume notifier, path,
consume args, consume vars)
h.long_test(5_000_000_000)
pm.done_writing() // closing stdin allows "echo" to terminate
h.dispose_when_done(pm)
h.long_test(5_000_000_000)
else
h.fail("Could not create FilePath!")
end
Expand All @@ -207,12 +243,15 @@ class iso _TestExpect is UnitTest
h.complete(false)

class iso _TestWritevOrdering is UnitTest
fun name(): String => "process/WritevOrdering"
fun label(): String => "unreliable-osx"
fun name(): String =>
"process/WritevOrdering"

fun exclusion_group(): String =>
"process-monitor"

fun apply(h: TestHelper) =>
let notifier: ProcessNotify iso =
_ProcessClient("onetwothree", "", 0, h)
_ProcessClient(11, "", 0, h)
try
let path = FilePath(h.env.root as AmbientAuth, "/bin/cat")?
let args: Array[String] iso = recover Array[String](1) end
Expand All @@ -232,6 +271,7 @@ class iso _TestWritevOrdering is UnitTest

pm.writev(consume params)
pm.done_writing() // closing stdin allows "cat" to terminate
h.dispose_when_done(pm)
h.long_test(5_000_000_000)
else
h.fail("Could not create FilePath!")
Expand All @@ -241,12 +281,15 @@ class iso _TestWritevOrdering is UnitTest
h.complete(false)

class iso _TestPrintvOrdering is UnitTest
fun name(): String => "process/PrintvOrdering"
fun label(): String => "unreliable-osx"
fun name(): String =>
"process/PrintvOrdering"

fun exclusion_group(): String =>
"process-monitor"

fun apply(h: TestHelper) =>
let notifier: ProcessNotify iso =
_ProcessClient("one\ntwo\nthree\n", "", 0, h)
_ProcessClient(14, "", 0, h)
try
let path = FilePath(h.env.root as AmbientAuth, "/bin/cat")?
let args: Array[String] iso = recover Array[String](1) end
Expand All @@ -266,6 +309,7 @@ class iso _TestPrintvOrdering is UnitTest

pm.printv(consume params)
pm.done_writing() // closing stdin allows "cat" to terminate
h.dispose_when_done(pm)
h.long_test(5_000_000_000)
else
h.fail("Could not create FilePath!")
Expand All @@ -274,19 +318,76 @@ class iso _TestPrintvOrdering is UnitTest
fun timed_out(h: TestHelper) =>
h.complete(false)

class iso _TestStdinWriteBuf is UnitTest
var _pm: (ProcessMonitor | None) = None
let _test_start: U64 = Time.nanos()

fun name(): String =>
"process/STDIN-WriteBuf"

fun exclusion_group(): String =>
"process-monitor"

fun ref apply(h: TestHelper) =>
let pipe_cap: USize = 65536
let notifier: ProcessNotify iso = _ProcessClient((pipe_cap + 1) * 2,
"", 0, h)
try
let path = FilePath(h.env.root as AmbientAuth, "/bin/cat")?
let args: Array[String] iso = recover Array[String](1) end
args.push("cat")
let vars: Array[String] iso = recover Array[String](2) end
vars.push("HOME=/")
vars.push("PATH=/bin")

// fork the child process and attach a ProcessMonitor
let auth = h.env.root as AmbientAuth
_pm = ProcessMonitor(auth, consume notifier, path, consume args,
consume vars)

// create a message larger than pipe_cap bytes
let message: Array[U8] val = recover Array[U8].>undefined(pipe_cap + 1) end

if _pm isnt None then // write to STDIN of the child process
let pm = _pm as ProcessMonitor
pm.write(message)
pm.write(message)
pm.done_writing() // closing stdin allows "cat" to terminate
h.dispose_when_done(pm)
end
h.long_test(5_000_000_000)
else
h.fail("Error running STDIN-WriteBuf test")
end

fun timed_out(h: TestHelper) =>
h.log("_TestStdinWriteBuf.timed_out: ran for " +
(Time.nanos() - _test_start).string() + " ns")
try
if _pm isnt None then // kill the child process and cleanup fd
h.log("_TestStdinWriteBuf.timed_out: calling pm.dispose()")
(_pm as ProcessMonitor).dispose()
end
else
h.fail("Error disposing of forked process in STDIN-WriteBuf test")
end
h.complete(false)

class _ProcessClient is ProcessNotify
"""
Notifications for Process connections.
"""
let _out: String
let _out: USize
let _err: String
let _exit_code: I32
let _h: TestHelper
let _d_stdout: String ref = String
var _d_stdout_chars: USize = 0
let _d_stderr: String ref = String
let _created: U64
var _first_data: U64 = 0

new iso create(
out: String,
out: USize,
err: String,
exit_code: I32,
h: TestHelper)
Expand All @@ -295,12 +396,23 @@ class _ProcessClient is ProcessNotify
_err = err
_exit_code = exit_code
_h = h
_created = Time.nanos()

fun ref stdout(process: ProcessMonitor ref, data: Array[U8] iso) =>
"""
Called when new data is received on STDOUT of the forked process
"""
_d_stdout.append(consume data)
_h.log("\tReceived from stdout: " + data.size().string() + " bytes")
if (_first_data == 0) then
_first_data = Time.nanos()
end
_d_stdout_chars = _d_stdout_chars + (consume data).size()
_h.log("\tReceived so far: " + _d_stdout_chars.string() + " bytes")
_h.log("\tExpecting: " + _out.string() + " bytes")
if _out == _d_stdout_chars then
// we've received our total data
_h.complete(true)
end

fun ref stderr(process: ProcessMonitor ref, data: Array[U8] iso) =>
"""
Expand Down Expand Up @@ -330,10 +442,22 @@ class _ProcessClient is ProcessNotify
Called when ProcessMonitor terminates to cleanup ProcessNotify
We receive the exit code of the child process from ProcessMonitor.
"""
let last_data: U64 = Time.nanos()
_h.log("dispose: child exit code: " + child_exit_code.string())
_h.log("dispose: stdout: " + _d_stdout)
_h.log("dispose: stdout: " + _d_stdout_chars.string() + " bytes")
_h.log("dispose: stderr: " + _d_stderr)
_h.assert_eq[String box](_out, _d_stdout)
if (_first_data > 0) then
_h.log("dispose: received first data after: \t" + (_first_data - _created).string()
+ " ns")
end
_h.log("dispose: total data process_time: \t" + (last_data - _first_data).string()
+ " ns")
_h.log("dispose: ProcessNotify lifetime: \t" + (last_data - _created).string()
+ " ns")

_h.assert_eq[USize](_out, _d_stdout_chars)
_h.assert_eq[String box](_err, _d_stderr)
_h.assert_eq[I32](_exit_code, child_exit_code)
_h.complete(true)


Loading

0 comments on commit 315a08e

Please sign in to comment.