From 7fac72b1ac8c5a726456f0552ee3c952b2705f7e Mon Sep 17 00:00:00 2001 From: Carlo Baldassi Date: Sun, 9 Sep 2012 19:34:03 +0200 Subject: [PATCH 1/2] Added process redirections to/from files --- base/export.jl | 2 + base/process.jl | 164 ++++++++++++++++++++++++++++++++++++++++--- src/julia-parser.scm | 2 +- 3 files changed, 158 insertions(+), 10 deletions(-) diff --git a/base/export.jl b/base/export.jl index 923a23e48b5af..da791b1ac027b 100644 --- a/base/export.jl +++ b/base/export.jl @@ -299,6 +299,8 @@ export >, >=, >>, + .>>, + .<<, >>>, \, ^, diff --git a/base/process.jl b/base/process.jl index 4de878754c5e0..6a0737af5517c 100644 --- a/base/process.jl +++ b/base/process.jl @@ -161,9 +161,40 @@ function exec(thunk::Function) exit(0) end +type FileSink + s::IOStream + own::Bool + function FileSink(s::IOStream, own::Bool) + if fd(s) == -1 + error("Cannot use the given IOStream as FileSink") + end + this = new(s, own) + if own + finalizer(this, close_sink) + end + return this + end +end + +FileSink(s::IOStream) = FileSink(s, false) + +function FileSink(filename::String, args...) + s = open(filename, args...) + return FileSink(s, true) +end + +function close_sink(sink::FileSink) + if sink.own + close(sink.s) + end +end + +fd(sink::FileSink) = fd(sink.s) + type Cmd exec::Executable pipes::Dict{FileDes,PipeEnd} + sinks::Dict{FileDes,FileSink} pipeline::Set{Cmd} pid::Int32 status::ProcessStatus @@ -175,6 +206,7 @@ type Cmd end this = new(exec, Dict{FileDes,PipeEnd}(), + Dict{FileDes,FileSink}(), Set{Cmd}(), 0, ProcessNotRun(), @@ -211,6 +243,12 @@ end exec(cmd::Cmd) = exec(cmd.exec) +function close_sinks(cmd::Cmd) + for (f,s) in cmd.sinks + close_sink(s) + end +end + ## Port: a file descriptor on a particular command ## type Port @@ -218,12 +256,17 @@ type Port fd::FileDes end -fd(cmd::Cmd, f::FileDes) = Port(cmd,f) +function fd(cmd::Cmd, f::FileDes) + if !has(cmd.pipes, f) && !has(cmd.sinks, f) + return Port(cmd,f) + end + error("no ", f, " available in ", cmd) +end function fd(cmds::Set{Cmd}, f::FileDes) set = Set{Port}() for cmd in cmds - if !has(cmd.pipes, f) + if !has(cmd.pipes, f) && !has(cmd.sinks, f) add(set, fd(cmd,f)) end end @@ -277,11 +320,14 @@ end output(cmds::Cmds) = stdout(cmds) & stderr(cmds) function connect(port::Port, pend::PipeEnd) - if !has(port.cmd.pipes, port.fd) + if !has(port.cmd.pipes, port.fd) && !has(port.cmd.sinks, port.fd) port.cmd.pipes[port.fd] = pend - elseif port.cmd.pipes[port.fd] != pend + elseif has(port.cmd.pipes, port.fd) && port.cmd.pipes[port.fd] != pend error(port.cmd, " is already connected to ", fd(port.cmd.pipes[port.fd])) + elseif has(port.cmd.sinks, port.fd) + error(port.cmd, " is already connected to ", + fd(port.cmd.sinks[port.fd])) end return pend end @@ -339,6 +385,78 @@ end (|)(src::Ports, dst::Cmds) = (src | stdin(dst); dst) (|)(src::Cmds, dst::Cmds) = (stdout(src) | stdin(dst); src & dst) +redir(port::Port, sink::FileSink) = port.cmd.sinks[port.fd] = sink +function redir(ports::Ports, sink::FileSink) + for port in ports + redir(port, sink) + end +end + +function (>)(src::String, dst::Cmds) + redir(stdin(dst), FileSink(src, "r")) + return dst +end + +(<)(dst::Cmds, src::String) = (>)(src, dst) + +function (>)(src::IOStream, dst::Cmds) + redir(stdin(dst), FileSink(src)) + return dst +end + +(<)(dst::Cmds, src::IOStream) = (>)(src, dst) + +function (>)(src::Cmds, dst::String) + redir(stdout(src), FileSink(dst, "w")) + return src +end + +function (>>)(src::Cmds, dst::String) + redir(stdout(src), FileSink(dst, "a")) + return src +end + +(<)(dst::String, src::Cmds) = (>)(src, dst) +(<<)(dst::String, src::Cmds) = (>>)(src, dst) + +function (>)(src::Cmds, dst::IOStream) + redir(stdout(src), FileSink(dst)) + return src +end + +(<)(dst::IOStream, src::Cmds) = (>)(src, dst) + +function (.>)(src::Cmds, dst::String) + redir(stderr(src), FileSink(dst, "w")) + return src +end + +function (.>>)(src::Cmds, dst::String) + redir(stderr(src), FileSink(dst, "a")) + return src +end + +(.<)(dst::String, src::Cmds) = (.>)(src, dst) +(.<<)(dst::String, src::Cmds) = (.>>)(src, dst) + +function (.>)(src::Cmds, dst::IOStream) + redir(stderr(src), FileSink(dst)) + return src +end + +(.<)(dst::IOStream, src::Cmds) = (.>)(src, dst) + +#TODO: here-strings +#function (>>>)(src::String, dst::Cmds) + #redir(stdin(dst), FileSink(src, "r")) + #return dst +#end +# +#(<<<)(dst::Cmds, src::String) = (>>>)(src, dst) + + + + # spawn(cmd) starts all processes connected to cmd function spawn(cmd::Cmd) @@ -364,6 +482,7 @@ function spawn(cmd::Cmd) c.status = ProcessRunning() ptrs = isa(c.exec,Vector{ByteString}) ? _jl_pre_exec(c.exec) : nothing dup2_fds = Array(Int32, 2*numel(c.pipes)) + dup2_sinks = Array(Int32, 2*numel(c.sinks)) close_fds_ = copy(fds) i = 0 for (f,p) in c.pipes @@ -371,11 +490,26 @@ function spawn(cmd::Cmd) dup2_fds[i+=1] = f.fd del(close_fds_, fd(p)) end + i = 0 + for (f,s) in c.sinks + dup2_sinks[i+=1] = fd(s) + dup2_sinks[i+=1] = f.fd + end close_fds = Array(Int32, numel(close_fds_)) i = 0 for f in close_fds_ close_fds[i+=1] = f.fd end + + # save the stderr descriptor because it may be redirected, but we may need to + # print errors from Julia + bk_stderr_fd = ccall(:dup, Int32, (Int32,), STDERR.fd) + if bk_stderr_fd == -1 + println(stderr_stream, "dup: ", strerror()) + exit(0x7f) + end + bk_stderr_stream = fdio(bk_stderr_fd, true) + # now actually do the fork and exec without writes pid = fork() if pid == 0 @@ -385,7 +519,18 @@ function spawn(cmd::Cmd) # dup2 manually inlined to avoid potential heap stomping r = ccall(:dup2, Int32, (Int32, Int32), dup2_fds[i], dup2_fds[i+1]) if r == -1 - println(stderr_stream, "dup2: ", strerror()) + println(bk_stderr_stream, "dup2: ", strerror()) + exit(0x7f) + end + i += 2 + end + i = 1 + n = length(dup2_sinks) + while i <= n + # dup2 manually inlined to avoid potential heap stomping + r = ccall(:dup2, Int32, (Int32, Int32), dup2_sinks[i], dup2_sinks[i+1]) + if r == -1 + println(bk_stderr_stream, "dup2: ", strerror()) exit(0x7f) end i += 2 @@ -396,14 +541,14 @@ function spawn(cmd::Cmd) # close manually inlined to avoid potential heap stomping r = ccall(:close, Int32, (Int32,), close_fds[i]) if r != 0 - println(stderr_stream, "close: ", strerror()) + println(bk_stderr_stream, "close: ", strerror()) exit(0x7f) end i += 1 end if !isequal(ptrs, nothing) ccall(:execvp, Int32, (Ptr{Uint8}, Ptr{Ptr{Uint8}}), ptrs[1], ptrs) - println(stderr_stream, "exec: ", strerror()) + println(bk_stderr_stream, "exec: ", strerror()) exit(0x7f) end # other ways of execing (e.g. a julia function) @@ -412,12 +557,13 @@ function spawn(cmd::Cmd) try exec(c) catch err - show(stderr, err) + show(bk_stderr_stream, err) exit(0x7f) end error("exec should not return but has") end c.pid = pid + close(bk_stderr_stream) # do it manually since gc is disabled end for f in fds_ close(f) @@ -442,7 +588,7 @@ successful(cmd::Cmd) = isa(cmd.status,ProcessRunning) || cmd.successful(cmd.status) wait(cmd::Cmd, nohang::Bool) = - (cmd.status = process_status(wait(cmd.pid,nohang)); successful(cmd)) + (cmd.status = process_status(wait(cmd.pid,nohang)); close_sinks(cmd); successful(cmd)) # wait for a set of command processes to finish diff --git a/src/julia-parser.scm b/src/julia-parser.scm index 926b2a20c3531..a0e73a4494228 100644 --- a/src/julia-parser.scm +++ b/src/julia-parser.scm @@ -10,7 +10,7 @@ (> < >= <= == === != |.>| |.<| |.>=| |.<=| |.==| |.!=| |.=| |.!| |<:| |>:|) (: |..|) (+ - |.+| |.-| |\|| $) - (<< >> >>>) + (<< >> >>> |.<<| |.>>|) (* / |./| % & |.*| |\\| |.\\|) (// .//) (^ |.^|) From 0bc3c1922ba5e7be8550b206c6d825d6af6c26ec Mon Sep 17 00:00:00 2001 From: Carlo Baldassi Date: Sun, 8 Jul 2012 17:28:09 +0200 Subject: [PATCH 2/2] process: added here-strings, more redir operators --- base/export.jl | 5 ++++ base/process.jl | 59 ++++++++++++++++++++++++++++++++++---------- src/julia-parser.scm | 4 +-- 3 files changed, 53 insertions(+), 15 deletions(-) diff --git a/base/export.jl b/base/export.jl index da791b1ac027b..0ce0c1c7ce426 100644 --- a/base/export.jl +++ b/base/export.jl @@ -302,6 +302,11 @@ export .>>, .<<, >>>, + <<<, + &>, + &>>, + &<, + &<<, \, ^, |, diff --git a/base/process.jl b/base/process.jl index 6a0737af5517c..d9d683c6e8272 100644 --- a/base/process.jl +++ b/base/process.jl @@ -193,8 +193,10 @@ fd(sink::FileSink) = fd(sink.s) type Cmd exec::Executable + name::String pipes::Dict{FileDes,PipeEnd} sinks::Dict{FileDes,FileSink} + closed_fds::Vector{FileDes} pipeline::Set{Cmd} pid::Int32 status::ProcessStatus @@ -205,8 +207,10 @@ type Cmd error("Cmd: too few words to exec") end this = new(exec, + "", Dict{FileDes,PipeEnd}(), Dict{FileDes,FileSink}(), + FileDes[], Set{Cmd}(), 0, ProcessNotRun(), @@ -226,7 +230,9 @@ setsuccess(cmd::Cmd, f::Function) = (cmd.successful=f; cmd) ignorestatus(cmd::Cmd) = setsuccess(cmd, ignore_success) function show(io, cmd::Cmd) - if isa(cmd.exec,Vector{ByteString}) + if cmd.name != "" + show(io, cmd.name) + elseif isa(cmd.exec,Vector{ByteString}) esc = shell_escape(cmd.exec...) print(io, '`') for c in esc @@ -237,7 +243,7 @@ function show(io, cmd::Cmd) end print(io, '`') else - invoke(show, (Any,), cmd.exec) + invoke(show, (Any, Any,), io, cmd.exec) end end @@ -257,7 +263,7 @@ type Port end function fd(cmd::Cmd, f::FileDes) - if !has(cmd.pipes, f) && !has(cmd.sinks, f) + if !has(cmd.pipes, f) && !has(cmd.sinks, f) && !contains(cmd.closed_fds, f) return Port(cmd,f) end error("no ", f, " available in ", cmd) @@ -266,12 +272,12 @@ end function fd(cmds::Set{Cmd}, f::FileDes) set = Set{Port}() for cmd in cmds - if !has(cmd.pipes, f) && !has(cmd.sinks, f) + if !has(cmd.pipes, f) && !has(cmd.sinks, f) && !contains(cmd.closed_fds, f) add(set, fd(cmd,f)) end end if isempty(set) - error("no ", f, " available: ", cmds) + error("no ", f, " available in ", cmds) end set end @@ -320,6 +326,9 @@ end output(cmds::Cmds) = stdout(cmds) & stderr(cmds) function connect(port::Port, pend::PipeEnd) + if contains(port.cmd.closed_fds, port.fd) + error(port.cmd, " port ", port.fd, " is closed") + end if !has(port.cmd.pipes, port.fd) && !has(port.cmd.sinks, port.fd) port.cmd.pipes[port.fd] = pend elseif has(port.cmd.pipes, port.fd) && port.cmd.pipes[port.fd] != pend @@ -392,6 +401,7 @@ function redir(ports::Ports, sink::FileSink) end end +# redirect stdout function (>)(src::String, dst::Cmds) redir(stdin(dst), FileSink(src, "r")) return dst @@ -426,6 +436,7 @@ end (<)(dst::IOStream, src::Cmds) = (>)(src, dst) +# redirect stderr function (.>)(src::Cmds, dst::String) redir(stderr(src), FileSink(dst, "w")) return src @@ -446,15 +457,37 @@ end (.<)(dst::IOStream, src::Cmds) = (.>)(src, dst) -#TODO: here-strings -#function (>>>)(src::String, dst::Cmds) - #redir(stdin(dst), FileSink(src, "r")) - #return dst -#end -# -#(<<<)(dst::Cmds, src::String) = (>>>)(src, dst) +# redirect both stdout and stderr +function (&>)(src::Cmds, dst::String) + redir(output(src), FileSink(dst, "w")) + return src +end + +function (&>>)(src::Cmds, dst::String) + redir(output(src), FileSink(dst, "a")) + return src +end +(&<)(dst::String, src::Cmds) = (&>)(src, dst) +(&<<)(dst::String, src::Cmds) = (&>>)(src, dst) + +function (&>)(src::Cmds, dst::IOStream) + redir(output(src), FileSink(dst)) + return src +end + +(&<)(dst::IOStream, src::Cmds) = (&>)(src, dst) + +# here-strings: +function (>>>)(src::String, dst::Cmds) + hscmd = Cmd(()->print(src)) + push(hscmd.closed_fds, STDIN) + push(hscmd.closed_fds, STDERR) + hscmd.name = "here-string<" * src * ">" + return hscmd | dst +end +(<<<)(dst::Cmds, src::String) = (>>>)(src, dst) # spawn(cmd) starts all processes connected to cmd @@ -508,7 +541,7 @@ function spawn(cmd::Cmd) println(stderr_stream, "dup: ", strerror()) exit(0x7f) end - bk_stderr_stream = fdio(bk_stderr_fd, true) + bk_stderr_stream = fdio(bk_stderr_fd) # now actually do the fork and exec without writes pid = fork() diff --git a/src/julia-parser.scm b/src/julia-parser.scm index a0e73a4494228..0b57b487108cb 100644 --- a/src/julia-parser.scm +++ b/src/julia-parser.scm @@ -7,10 +7,10 @@ ; the way the lexer works, every prefix of an operator must also ; be an operator. (<- -- -->) - (> < >= <= == === != |.>| |.<| |.>=| |.<=| |.==| |.!=| |.=| |.!| |<:| |>:|) + (> < >= <= == === != |.>| |.<| |.>=| |.<=| |.==| |.!=| |.=| |.!| |<:| |>:| |&>| |&<|) (: |..|) (+ - |.+| |.-| |\|| $) - (<< >> >>> |.<<| |.>>|) + (<< >> <<< >>> |.<<| |.>>| |&>>| |&<<|) (* / |./| % & |.*| |\\| |.\\|) (// .//) (^ |.^|)