diff --git a/.github/workflows/UnitTest.yml b/.github/workflows/UnitTest.yml index 67cd575..1a3e49e 100644 --- a/.github/workflows/UnitTest.yml +++ b/.github/workflows/UnitTest.yml @@ -14,7 +14,7 @@ jobs: strategy: fail-fast: false matrix: - julia-version: ['1.0', '1', 'nightly'] + julia-version: ['1.6', '1', 'nightly'] os: [ubuntu-latest] arch: [x64] include: diff --git a/.gitignore b/.gitignore index 1c02e5e..b1f89f7 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,12 @@ +.vscode/ + *.jl.*.cov *.jl.cov *.jl.mem Manifest.toml /docs/build/ + +__pycache__/ +.mypy_cache/ + +.workflows diff --git a/Project.toml b/Project.toml index f48f84e..a921099 100644 --- a/Project.toml +++ b/Project.toml @@ -3,11 +3,22 @@ uuid = "115008b9-7a42-4cba-af26-8bebb992e909" authors = ["Johnny Chen "] version = "0.1.0" +[deps] +Configurations = "5218b696-f38b-4ac9-8b61-a12ec717816d" +JSON3 = "0f8b85d8-7281-11e9-16c2-39a750bddbf1" +Printf = "de0858da-6303-5e67-8744-51eddeeeb8d7" +SHA = "ea8e919c-243c-51af-8825-aaa63cd721ce" +TOML = "fa267f1f-6049-4f14-aa54-33bafae1ed76" + [compat] -julia = "1" +Configurations = "0.17" +JSON3 = "1" +TOML = "1" +julia = "1.6" [extras] +Suppressor = "fd094767-a336-5f1f-9728-57cf17d0bbfb" Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40" [targets] -test = ["Test"] +test = ["Suppressor", "Test"] diff --git a/dialects.toml b/dialects.toml new file mode 100644 index 0000000..7dc2576 --- /dev/null +++ b/dialects.toml @@ -0,0 +1,2 @@ +[manifest] +version = "0.1.0" diff --git a/examples/manifest/benchmark/.gitignore b/examples/manifest/benchmark/.gitignore new file mode 100644 index 0000000..a9a1bd3 --- /dev/null +++ b/examples/manifest/benchmark/.gitignore @@ -0,0 +1 @@ +reports/ diff --git a/examples/manifest/benchmark/Project.toml b/examples/manifest/benchmark/Project.toml new file mode 100644 index 0000000..ee1d685 --- /dev/null +++ b/examples/manifest/benchmark/Project.toml @@ -0,0 +1,8 @@ +[deps] +BenchmarkTools = "6e4b80f9-dd63-53aa-95a3-0cdb28fa8baf" +CSV = "336ed68f-0bac-5ca0-87d4-7b16caf5d00b" +DataFrames = "a93c6f00-e57d-5684-b7b6-d8193f3e46c0" +JSON3 = "0f8b85d8-7281-11e9-16c2-39a750bddbf1" +LinearAlgebra = "37e2e46d-f89d-539d-b4ee-838fcccc9c8e" +PrettyTables = "08abe8d2-0d0c-5749-adfa-8a2ac140af0d" +Workflows = "115008b9-7a42-4cba-af26-8bebb992e909" diff --git a/examples/manifest/benchmark/README.md b/examples/manifest/benchmark/README.md new file mode 100644 index 0000000..b356cba --- /dev/null +++ b/examples/manifest/benchmark/README.md @@ -0,0 +1,6 @@ +# Benchmark example with the manfiest dialect + +This demo benchmarks Julia with Numpy on a few functions `sum`, `rand` and `randn` using scripts +in `scripts`. + +Package `numpy` is required for the python executable. diff --git a/examples/manifest/benchmark/benchmark.toml b/examples/manifest/benchmark/benchmark.toml new file mode 100644 index 0000000..e0b0658 --- /dev/null +++ b/examples/manifest/benchmark/benchmark.toml @@ -0,0 +1,58 @@ +version = "0.1" +dialect = "manifest" + +order = [["1", "2", "3", "4"], ["5"]] + +[[tasks]] +name = "sum" +id = "1" +groups = ["Type:LinearAlgebra", "Type:Benchmark", "Framework:Julia"] +deps = ["scripts/julia"] +runner = "juliamodule" +[tasks.run] +script = "scripts/julia/sum.jl" + +[[tasks]] +name = "rand" +id = "2" +groups = ["Type:LinearAlgebra", "Type:Benchmark", "Framework:Julia"] +deps = ["scripts/julia"] +runner = "juliamodule" +[tasks.run] +script = "scripts/julia/rand.jl" + +[[tasks]] +name = "sum" +id = "3" +groups = ["Type:LinearAlgebra", "Type:Benchmark", "Framework:Numpy"] +deps = ["scripts/python"] +runner = "shell" +[tasks.run] +command = "python scripts/numpy/sum.py" +capture = true + +[[tasks]] +name = "randn" +id = "4" +groups = ["Type:LinearAlgebra", "Type:Benchmark", "Framework:Numpy"] +deps = ["scripts/python"] +runner = "shell" +[tasks.run] +command = "python scripts/numpy/randn.py" +capture = true + +[[tasks]] +name = "summary" +id = "5" +groups = ["Type:LinearAlgebra", "Type:Benchmark"] +deps = [ + "summary.jl", + "@__STDOUT__1", + "@__STDOUT__2", + "@__STDOUT__3", + "@__STDOUT__4", +] +outs = ["reports"] +runner = "shell" +[tasks.run] +command = "julia --startup=no --project=. summary.jl" diff --git a/examples/manifest/benchmark/scripts/julia/rand.jl b/examples/manifest/benchmark/scripts/julia/rand.jl new file mode 100644 index 0000000..9d16618 --- /dev/null +++ b/examples/manifest/benchmark/scripts/julia/rand.jl @@ -0,0 +1,12 @@ +using LinearAlgebra +using BenchmarkTools +using JSON3 +include(joinpath(@__DIR__, "utils.jl")) + +rst = Dict() +for n in [64, 128, 256, 512, 1024, 2048] + b = @benchmark rand($n) samples=100 evals=1 + rst[n] = trial_to_dict(b) +end + +JSON3.write(rst) diff --git a/examples/manifest/benchmark/scripts/julia/sum.jl b/examples/manifest/benchmark/scripts/julia/sum.jl new file mode 100644 index 0000000..3d3df29 --- /dev/null +++ b/examples/manifest/benchmark/scripts/julia/sum.jl @@ -0,0 +1,13 @@ +using LinearAlgebra +using BenchmarkTools +using JSON3 +include(joinpath(@__DIR__, "utils.jl")) + +rst = Dict() +for n in [64, 128, 256, 512, 1024, 2048, 4096, 8192] + x = rand(n) + b = @benchmark sum($x) samples=100 evals=1 + rst[n] = trial_to_dict(b) +end + +JSON3.write(rst) diff --git a/examples/manifest/benchmark/scripts/julia/utils.jl b/examples/manifest/benchmark/scripts/julia/utils.jl new file mode 100644 index 0000000..af7e434 --- /dev/null +++ b/examples/manifest/benchmark/scripts/julia/utils.jl @@ -0,0 +1,10 @@ +using BenchmarkTools + +function trial_to_dict(trial::BenchmarkTools.Trial) + d = Dict{String, Float64}() + d["time"] = mean(trial.times) + d["gctimes"] = mean(trial.gctimes) + d["allocs"] = trial.allocs + d["memory"] = trial.memory + return d +end diff --git a/examples/manifest/benchmark/scripts/numpy/randn.py b/examples/manifest/benchmark/scripts/numpy/randn.py new file mode 100644 index 0000000..bacf574 --- /dev/null +++ b/examples/manifest/benchmark/scripts/numpy/randn.py @@ -0,0 +1,9 @@ +import numpy as np +import json +from utils import belapsed + +rst: dict = {} +for n in [64, 128, 256, 512, 1024, 2048]: + rst[n] = {"time": belapsed(lambda: np.random.randn(n), number=100)} + +print(json.dumps(rst)) diff --git a/examples/manifest/benchmark/scripts/numpy/sum.py b/examples/manifest/benchmark/scripts/numpy/sum.py new file mode 100644 index 0000000..f4d4d71 --- /dev/null +++ b/examples/manifest/benchmark/scripts/numpy/sum.py @@ -0,0 +1,10 @@ +import numpy as np +import json +from utils import belapsed + +rst: dict = {} +for n in [64, 128, 256, 512, 1024, 2048]: + x = np.random.rand(n) + rst[n] = {"time": belapsed(lambda: x.sum(), number=100)} + +print(json.dumps(rst)) diff --git a/examples/manifest/benchmark/scripts/numpy/utils.py b/examples/manifest/benchmark/scripts/numpy/utils.py new file mode 100644 index 0000000..298a9b6 --- /dev/null +++ b/examples/manifest/benchmark/scripts/numpy/utils.py @@ -0,0 +1,10 @@ +import timeit + + +def belapsed(func, *, number=None): + if number: + return timeit.timeit(func, number=number) / number + else: + timer = timeit.Timer(func) + n, t = timer.autorange() + return t / n diff --git a/examples/manifest/benchmark/summary.jl b/examples/manifest/benchmark/summary.jl new file mode 100644 index 0000000..8709954 --- /dev/null +++ b/examples/manifest/benchmark/summary.jl @@ -0,0 +1,48 @@ +src_file = get(ENV, "WORKFLOW_TMP_INFILE", "") +@assert isfile(src_file) "file $src_file not existed" + +using JSON3 +using DataFrames +using CSV +using PrettyTables + +data = open(src_file) do io + JSON3.read(io) +end + +# flatten and merge results into one single big dataframe +dfs = map(String.(keys(data))) do tid + X = JSON3.read(data[tid]) + X_df = map(collect(keys(X))) do sz + d = Dict(X[sz]) + d[:size] = parse(Int, String(sz)) + d[:tid] = tid + d + end |> DataFrame +end +df = reduce(dfs) do X, Y + outerjoin(X, Y; on=intersect(names(X), names(Y)), matchmissing=:equal) +end + +# format markdown reports +buffer = IOBuffer() +for df_sz in groupby(df, :size) + println(buffer, "# size: ", df_sz[!, :size][1], "\n") + + # drop the types line provided by DataFrames + tmp_buffer = IOBuffer() + PrettyTables.pretty_table( + tmp_buffer, + df_sz; + tf=PrettyTables.tf_markdown) + lines = split(String(take!(tmp_buffer)), "\n") + println(buffer, lines[1]) + foreach(l->println(buffer, l), lines[3:end]) + + println(buffer) +end + +# save final results +isdir("reports") || mkdir("reports") +CSV.write("reports/results.csv", df) +write("reports/report.md", take!(buffer)) diff --git a/src/Workflows.jl b/src/Workflows.jl index 0f00140..27df160 100644 --- a/src/Workflows.jl +++ b/src/Workflows.jl @@ -1,5 +1,15 @@ module Workflows -# Write your package code here. +using Configurations +using SHA +using JSON3 + +include("dialects/Dialects.jl") +using .Dialects: load_config, save_config +using .Dialects: AbstractWorkflow, ManifestWorkflow +using .Dialects: task_id, task_deps +include("runners/Runners.jl") +using .Runners: execute_task +include("scheduler.jl") end diff --git a/src/dialects/Dialects.jl b/src/dialects/Dialects.jl new file mode 100644 index 0000000..3205d60 --- /dev/null +++ b/src/dialects/Dialects.jl @@ -0,0 +1,24 @@ +module Dialects + +using Configurations +using TOML +using Printf + +import Configurations: from_dict, to_dict + +const spec_versions = begin + config = TOML.parsefile(joinpath(@__DIR__, "..", "..", "dialects.toml")) + Dict(k=>VersionNumber(v["version"]) for (k, v) in config) +end + +abstract type AbstractTask end +abstract type AbstractWorkflow end +abstract type AbstractExecutionOrder end + +include("traits.jl") +include("orders.jl") +include("manifest.jl") +include("utils.jl") +include("config_io.jl") + +end #module diff --git a/src/dialects/config_io.jl b/src/dialects/config_io.jl new file mode 100644 index 0000000..0db9dcb --- /dev/null +++ b/src/dialects/config_io.jl @@ -0,0 +1,54 @@ +""" + load_config(filename::AbstractString) + +Load workflow configuration from `filename`. +""" +function load_config(filename::AbstractString) + name, ext = splitext(basename(filename)) + config = if ext == ".toml" + # https://toml.io/en/v1.0.0#filename-extension + # TOML files should use the extension `.toml` + TOML.parsefile(filename) + else + throw(ArgumentError("unsupported file extension: \"$ext\".")) + end + dialect = config["dialect"] # required + ver = config["version"] # required + check_version(dialect, VersionNumber(ver)) + + # runtime dispatch to custom dialect implementation + return load_config(Val(Symbol(dialect)), config) +end +load_config(::Val{d}, config) where d = error("unsupported workflow dialect \"$d\".") + +""" + save_config(filename::AbstractString, workflow) + +Save workflow configuration into `filename`. +""" +function save_config(filename::AbstractString, workflow::AbstractWorkflow) + name, ext = splitext(basename(filename)) + if ext == ".toml" + config = to_dict(workflow, TOMLStyle) + config["version"] = string(spec_versions[workflow.dialect]) + open(filename, "w") do io + TOML.print(convert_to_builtin, io, config) + end + else + throw(ArgumentError("unsupported file extension: \"$ext\".")) + end + return +end + +# some custom types need to be converted to built in types before serialization +convert_to_builtin(p::PipelineOrder) = p.stages +convert_to_builtin(v::VersionNumber) = string(v) +convert_to_builtin(v) = v + +# TOML support +function Configurations.to_toml(io::IO, x::AbstractWorkflow; kwargs...) + to_toml(convert_to_builtin, io, x; kwargs...) +end +function Configurations.to_toml(filename::String, x::AbstractWorkflow; kwargs...) + to_toml(convert_to_builtin, filename, x; kwargs...) +end diff --git a/src/dialects/manifest.jl b/src/dialects/manifest.jl new file mode 100644 index 0000000..7097d72 --- /dev/null +++ b/src/dialects/manifest.jl @@ -0,0 +1,71 @@ +@option struct SimpleTask <: AbstractTask + name::String + id::String + groups::Vector{String} = String[] + deps::Vector{String} = String[] + outs::Vector{String} = String[] + runner::String # TODO(johnnychen94): verify runner type + run::Dict{String, Any} = Dict{String, Any}() +end +task_id(t::SimpleTask) = t.id +task_name(t::SimpleTask) = t.name +task_groups(t::SimpleTask) = t.groups +task_deps(t::SimpleTask) = t.deps +task_outs(t::SimpleTask) = t.outs +runner_type(t::SimpleTask) = t.runner +runner_info(t::SimpleTask) = t.run + +@option struct ManifestWorkflow <: AbstractWorkflow + version::VersionNumber + dialect::String = "manifest" + order::AbstractExecutionOrder = PipelineOrder(String[]) + tasks::Dict{String,AbstractTask} = Dict{String,AbstractTask}() + function ManifestWorkflow(version::VersionNumber, dialect::String, order, tasks) + @assert dialect == "manifest" + + order_taskids = Set(union(String[], order.stages...)) + taskids_set = Set(keys(tasks)) + if order_taskids != taskids_set + msg = if length(order_taskids) > length(taskids_set) + d = collect(setdiff(order_taskids, taskids_set)) + @sprintf "\"order\" contains some undefined tasks: %s." d + else + d = collect(setdiff(taskids_set, order_taskids)) + @sprintf "some tasks are not defined in \"order\": %s." d + end + throw(ArgumentError(msg)) + end + + new(version, dialect, order, tasks) + end +end +load_config(::Val{:manifest}, config::AbstractDict) = from_dict(ManifestWorkflow, config) + +function from_dict(::Type{ManifestWorkflow}, ::OptionField{:order}, ::Type{AbstractExecutionOrder}, order) + if order isa AbstractVector + return PipelineOrder(order) + else + throw(ArgumentError("Unsupported order type: $(typeof(order)).")) + end +end +from_dict(::Type{ManifestWorkflow}, ::Type{VersionNumber}, ver::AbstractString) = VersionNumber(ver) + +# To more efficiently get a task of specific task id, we convert the list to dictionary +# before construction. Because this is different from how we represent it in the +# configuration file, which is a list, we hereby patch it with `from_dict`/`to_dict` +# methods. +function from_dict(::Type{ManifestWorkflow}, ::OptionField{:tasks}, ::Type{Dict{String,AbstractTask}}, tasks) + taskids = map(t->t["id"], tasks) + taskids_set = Set(taskids) + if length(taskids) != length(taskids_set) + ids = filter(taskids_set) do id + count(isequal(id), taskids) > 1 + end + throw(ArgumentError("duplicate tasks detected: $(collect(ids)).")) + end + return Dict(t["id"]=>from_dict(SimpleTask, t) for t in tasks) +end + +function to_dict(::Type{ManifestWorkflow}, tasks::Dict{String,AbstractTask}, option::Configurations.ToDictOption) + map(to_dict, values(tasks)) +end diff --git a/src/dialects/orders.jl b/src/dialects/orders.jl new file mode 100644 index 0000000..23ddf36 --- /dev/null +++ b/src/dialects/orders.jl @@ -0,0 +1,63 @@ +""" + PipelineOrder(stages::Vector{Vector{String}}) + +Creates a pipeline execution order object. + +Stages `[["1", "2"], ["3", "4", "5"], ["6"]]` would create the following pipeline: + +```text +| Stage 1 | Stage 2 | Stage 3 | +| ------- | -------- | ------ | +| Task 1 | Task 3 | | +| Task 2 | Task 4 | Task 6 | +| | Task 5 | | +``` + +The task scheduler would first execute all tasks `["1", "2"]` in stage 1, and then all tasks +`["3", "4", "5"]` in stage 2, and finally all tasks `["6"]` in stage 3. The execution order +for tasks in the same stage is undefined. +""" +struct PipelineOrder <: AbstractExecutionOrder + stages::Vector{Vector{String}} + function PipelineOrder(stages::Vector{<:Vector}) + flatten_tasks = union(String[], stages...) + tasks = vcat(stages...) + if length(tasks) != length(flatten_tasks) + ids = filter(flatten_tasks) do id + count(isequal(id), tasks) > 1 + end + throw(ArgumentError("duplicate tasks detected in \"order\": $ids.")) + end + return new(stages) + end +end +PipelineOrder(stages::Vector{String}) = PipelineOrder([[x] for x in stages]) +Base.:(==)(p1::PipelineOrder, p2::PipelineOrder) = p1.stages == p2.stages + +function Base.iterate(o::PipelineOrder) + (length(o.stages) >= 1 && length(o.stages[1]) >= 1) || return nothing + status = falses(sum(length, o.stages)) + status[1] = true + return o.stages[1][1], status +end +function Base.iterate(o::PipelineOrder, state) + length(o.stages) == 0 && return nothing + n = 0 + for i in 1:length(o.stages) + cur_stage = o.stages[i] + stage_status = @view state[n+1:n+length(cur_stage)] + n += length(cur_stage) + all(stage_status) && continue + idx = findfirst(x->!x, stage_status) + if !isnothing(idx) + # Techniquelly, this can be thread unsafe in the sense that one task can be + # executed multiple times in parallel. It's the caller's duty to maintain the + # runtime status pool. + stage_status[idx] = true + return cur_stage[idx], state + end + end + return nothing +end + +# TODO(johnnychen94): add DAG support diff --git a/src/dialects/traits.jl b/src/dialects/traits.jl new file mode 100644 index 0000000..25a431d --- /dev/null +++ b/src/dialects/traits.jl @@ -0,0 +1,55 @@ +""" + task_id(t::AbstractTask)::String + +Return the unique ID of task `t`. + +We assume that `task_id(x) == task_id(y)` if and only if the execution of task `x` is +"equivalent" to that of task `y`. +""" +task_id(::T) where T<:AbstractTask = error("Not implemented for task type $(T).") + +""" + task_name(t::AbstractTask)::String + +Return the task name of task `t`. + +Unlike [`taskid`](@ref), task names does not need to be unique. +""" +task_name(::T) where T<:AbstractTask = error("Not implemented for task type $(T).") + +""" + task_groups(t::AbstractTask)::Set + +Return the groups that task `t` belongs to. +""" +task_groups(::T) where T<:AbstractTask = error("Not implemented for task type $(T).") + +""" + task_deps(t::AbstractTask)::Set + +Return the assumed dependencies that task `t` requires so as to be executed successfully. +""" +task_deps(::T) where T<:AbstractTask = error("Not implemented for task type $(T).") + +""" + task_outs(t::AbstractTask)::Set + +Return the assumed outputs that task execution of `t` will create, if executed +successfully. +""" +task_outs(::T) where T<:AbstractTask = error("Not implemented for task type $(T).") + + +""" + runner_type(t::AbstractTask)::String + +Return the runner type of task `t`. +""" +runner_type(::T) where T<:AbstractTask = error("Not implemented for task type $(T).") + +""" + runner_info(t::AbstractTask)::Dict{String, Any} + +Return the extra information for task `t` runner. +""" +runner_info(::T) where T<:AbstractTask = error("Not implemented for task type $(T).") diff --git a/src/dialects/utils.jl b/src/dialects/utils.jl new file mode 100644 index 0000000..58c27fb --- /dev/null +++ b/src/dialects/utils.jl @@ -0,0 +1,19 @@ +function check_version(dialect::String, ver::VersionNumber) + is_compat = check_version(Bool, dialect, ver) + if !is_compat + spec_ver = spec_versions[dialect] + @warn "workflow file version $(ver) might not be compatible with the toolchain version $(spec_ver) for dialect $(dialect)." + end + return +end +function check_version(::Type{Bool}, dialect::String, ver::VersionNumber) + if !haskey(spec_versions, dialect) + # if it is unregistered dialect, disable version check for it. + @debug "version info for dialect \"$(dialect)\" not found in dialect registry \"dialects.toml\"." + return true + end + spec_ver = spec_versions[dialect] + ver.major <= spec_ver.major || return false + ver.major == 0 && ver.minor == spec_ver.minor || return false + return true +end diff --git a/src/runners/Runners.jl b/src/runners/Runners.jl new file mode 100644 index 0000000..1f46e5b --- /dev/null +++ b/src/runners/Runners.jl @@ -0,0 +1,31 @@ +module Runners + +using Configurations +using ..Dialects: AbstractTask +using ..Dialects: task_id, task_name, task_groups, task_deps, task_outs +using ..Dialects: runner_type, runner_info + +abstract type AbstractTaskRunner end + +""" + execute_task([exec::AbstractTaskRunner], t::AbstractTask; workdir=".") + +Execute task `t` using runner `exec` in folder `workdir`. +""" +execute_task(task::AbstractTask; kwargs...) = execute_task(build_runner(task), task; kwargs...) +execute_task(r::RT, t::AbstractTask; kwargs...) where RT<:AbstractTaskRunner = error("Not implemented for runner type: $RT.") + +""" + build_runner(t::AbstractTask) + +Build the runner object for task `t`. +""" +build_runner(t::AbstractTask) = build_runner(Val(Symbol(runner_type(t))), runner_info(t)) +build_runner(::Val{runner_type}, run_info::AbstractDict) where runner_type = error("Not implemented for runner type: $runner_type.") + +include("juliamodule.jl") # runner = "juliamodule" +include("shell.jl") # runner = "shell" + +include("compat.jl") + +end #module diff --git a/src/runners/compat.jl b/src/runners/compat.jl new file mode 100644 index 0000000..09b917c --- /dev/null +++ b/src/runners/compat.jl @@ -0,0 +1,55 @@ +@static if VERSION < v"1.7" +# copied from https://github.com/JuliaLang/julia/blob/a400a24a5d9e6609740814e4092538feb499ce60/base/stream.jl#L1290-L1415 + +function redirect_stdio(;stdin=nothing, stderr=nothing, stdout=nothing) + stdin === nothing || redirect_stdin(stdin) + stderr === nothing || redirect_stderr(stderr) + stdout === nothing || redirect_stdout(stdout) +end + +function redirect_stdio(f; stdin=nothing, stderr=nothing, stdout=nothing) + + function resolve(new::Nothing, oldstream, mode) + (new=nothing, close=false, old=nothing) + end + function resolve(path::AbstractString, oldstream,mode) + (new=open(path, mode), close=true, old=oldstream) + end + function resolve(new, oldstream, mode) + (new=new, close=false, old=oldstream) + end + + same_path(x, y) = false + function same_path(x::AbstractString, y::AbstractString) + # if x = y = "does_not_yet_exist.txt" then samefile will return false + (abspath(x) == abspath(y)) || samefile(x,y) + end + if same_path(stderr, stdin) + throw(ArgumentError("stdin and stderr cannot be the same path")) + end + if same_path(stdout, stdin) + throw(ArgumentError("stdin and stdout cannot be the same path")) + end + + new_in , close_in , old_in = resolve(stdin , Base.stdin , "r") + new_out, close_out, old_out = resolve(stdout, Base.stdout, "w") + if same_path(stderr, stdout) + # make sure that in case stderr = stdout = "same/path" + # only a single io is used instead of opening the same file twice + new_err, close_err, old_err = new_out, false, Base.stderr + else + new_err, close_err, old_err = resolve(stderr, Base.stderr, "w") + end + + redirect_stdio(; stderr=new_err, stdin=new_in, stdout=new_out) + + try + return f() + finally + redirect_stdio(;stderr=old_err, stdin=old_in, stdout=old_out) + close_err && close(new_err) + close_in && close(new_in ) + close_out && close(new_out) + end +end +end diff --git a/src/runners/juliamodule.jl b/src/runners/juliamodule.jl new file mode 100644 index 0000000..954a761 --- /dev/null +++ b/src/runners/juliamodule.jl @@ -0,0 +1,34 @@ +@option struct JuliaModuleRunner <: AbstractTaskRunner + script::String + workdir::String = "@__DIR__" + """set false to disable stdout, i.e., redirect to devnull.""" + enable_stdout::Bool = true + """set false to disable stderr, i.e., redirect to devnull.""" + enable_stderr::Bool = true + """set false to suppress the errors with warning if the command fails to execute.""" + strict::Bool = true +end +build_runner(::Val{:juliamodule}, run_info::AbstractDict) = from_dict(JuliaModuleRunner, run_info) + +function execute_task(exec::JuliaModuleRunner, t::AbstractTask; workdir::String=".", kwargs...) + script = strip(exec.script) + workdir = replace(exec.workdir, "@__DIR__" => workdir) + stdout = exec.enable_stdout ? Base.stdout : devnull + stderr = exec.enable_stderr ? Base.stderr : devnull + cd(workdir) do + script = abspath(script) + m = Module(gensym()) + @debug "Executing task $(task_id(t)) in julia module" workdir=pwd() script + Core.eval(m, :(include(x) = Base.include($m, x))) + try + redirect_stdio(stdout=stdout, stderr=stderr, stdin=devnull) do + Core.eval(m, :(Base.include($m, $script))) + end + catch err + # Unwrap the LoadError due to `include("script.jl")` call so that it looks like + # a "real" `julia script.jl` process. + err = err isa LoadError ? err.error : err + exec.strict ? rethrow(err) : @warn "failed to execute task $(task_id(t)) in juliamodule runner" err + end + end +end diff --git a/src/runners/shell.jl b/src/runners/shell.jl new file mode 100644 index 0000000..22a03fb --- /dev/null +++ b/src/runners/shell.jl @@ -0,0 +1,210 @@ +@option struct ShellRunner <: AbstractTaskRunner + command::String + workdir::String = "@__DIR__" + """set true to capture the last non-empty line of stdout as execution result. The captured result will not be printed to stdout.""" + capture::Bool = false + """set false to disable stdout, i.e., redirect to devnull.""" + enable_stdout::Bool = true + """set false to disable stderr, i.e., redirect to devnull.""" + enable_stderr::Bool = true + """set false to suppress the errors with warning if the command fails to execute.""" + strict::Bool = true +end +build_runner(::Val{:shell}, run_info::AbstractDict) = from_dict(ShellRunner, run_info) + +function execute_task(exec::ShellRunner, t::AbstractTask; workdir::String=".", env=nothing) + cmd = strip(exec.command) + workdir = replace(exec.workdir, "@__DIR__" => workdir) + cd(workdir) do + @debug "executing task $(task_id(t)) in shell runner" workdir=pwd() + # TODO: redirect stdout as result + stdout = exec.enable_stdout ? Base.stdout : devnull + stderr = exec.enable_stderr ? Base.stderr : devnull + try + if exec.capture + capture_run(cmd; stdout=stdout, stderr=stderr, env=env) + else + non_capture_run(cmd; stdout=stdout, stderr=stderr, env=env) + end + catch err + exec.strict ? rethrow() : @warn "failed to execute task $(task_id(t)) in shell runner" err + # TODO(johnnychen94): maybe cleanup intermediate results? + end + end +end + + +struct ShellExecutionError <: Exception + msg::String +end + +""" + capture_run(cmd; [stdout], [stderr], [env]) + +Run command `cmd` and capture the last non-empty line of `stdout` as the function result. + +# Examples + +```julia +# supprssing io with `devnull` doesn't affect the capturing +julia>capture_run(`printf "hello world \\n2"`; stdout=devnull) +"2" + +julia>capture_run(`julia -e 'println("hello world"); println(rand(1:4, 4, 4))'`; stdout=devnull) +"[2 1 4 4; 1 2 3 4; 3 2 4 4; 1 1 1 2]" + +# escape sequences are kept faithfully; it's the caller's responsibility to appropriately handle them. +julia> capture_run(`julia -e 'println("\e[0;31mred")'`) +"\e[0;31mred" +``` +""" +function capture_run(cmd::Union{Cmd, Base.OrCmds}; stdout=stdout, stderr=stderr) + interval = 0.1 # 100ms + isnewline_char(x) = x == UInt8('\n') # TODO(johnnychen94): maybe also check CRLF? + + function continue_line!(line, buffer) + data = take!(buffer) + idx = findlast(isnewline_char, data) + if isnothing(idx) + # if the current line is not yet finished + append!(line, data) + else + # otherwise, we get a new line and the previous results can be safely printed + + if length(line) > 0 && (isnewline_char(line[end])) + write(stdout, line) + resize!(line, 0) + end + + # now let's check if the new contents has if only one non-empty line + if last(idx) == length(data) + # ignore trailing newlines by searching backwards from the first position + # that is not '\n'. + prev_idx = findprev(x->!isnewline_char(x), data, first(idx)) + if isnothing(prev_idx) + # with only `\n`s, simply appending to line should be sufficient if this + # happens to be the last line, we will eventually discard the trailing + # newlines + append!(line, data) + else + prev_idx = findprev(isnewline_char, data, prev_idx) + if isnothing(prev_idx) + # if contains only one line, great + append!(line, data) + else + # otherwise, only append the last non-empty line + write(stdout, @view data[1:last(prev_idx)]) + resize!(line, 0) + append!(line, @view data[last(prev_idx)+1:end]) + end + end + else + # otherwise, print previous lines to stdout, and reset the current line with + # new line content + write(stdout, @view data[1:last(idx)]) + resize!(line, 0) + append!(line, @view data[last(idx)+1:end]) + end + end + + return line + end + + line = UInt8[] + buffer = IOBuffer() + proc = try + run(pipeline(cmd, stdout=buffer, stderr=stderr), wait=false) + catch err + err isa Base.IOError && rethrow(ShellExecutionError(err.msg)) + rethrow() + end + + while process_running(proc) + continue_line!(line, buffer) + sleep(interval) + end + + @assert process_exited(proc) + if proc.exitcode != 0 + # If the process existed unexpectedly, the last non-empty line in the buffer is + # almost useless. Thus here we directly rethrow the errors, and let the caller + # decide how to handle the exception. + write(stdout, line) + throw(ShellExecutionError("non-zero exit code: $(proc.exitcode)")) + end + continue_line!(line, buffer) + + # discard trailing '\n's + idx = findprev(x->!isnewline_char(x), line, length(line)) + if isnothing(idx) + return "" + else + resize!(line, idx) + end + # if it contains multiple lines, only capture the last line + idx = findfirst(isnewline_char, line) + if !(isnothing(idx) || (idx == length(line))) + write(stdout, @view line[1:last(idx)]) + return String(line[last(idx)+1:end]) + else + return String(line) + end +end + +function capture_run(cmd::AbstractString; env=nothing, kwargs...) + try + capture_run(build_cmd_pipeline(cmd; env=env); kwargs...) + catch err + if err isa ShellExecutionError + rethrow(ShellExecutionError("failed to execute command `$cmd`: $(err.msg)")) + end + rethrow() + end +end + +""" + non_capture_run(cmd; [stdout], [stderr], [env]) + +Run command `cmd` similar to `run(pipeline(cmd; stdout, stderr); wait=true)` except that it +throws `ShellExecutionError` if the command fails. +""" +function non_capture_run(cmd::Union{Cmd, Base.OrCmds}; stdout=stdout, stderr=stderr) + interval = 0.1 #100ms + proc = try + # set `wait=false` here and do manual check for better exception handling + run(pipeline(cmd, stdout=stdout, stderr=stderr); wait=false) + catch err + err isa Base.IOError && rethrow(ShellExecutionError(err.msg)) + rethrow() + end + while process_running(proc) + sleep(interval) + end + @assert process_exited(proc) + if proc.exitcode != 0 + throw(ShellExecutionError("non-zero exit code: $(proc.exitcode)")) + end +end + +function non_capture_run(cmd::AbstractString; env=nothing, kwargs...) + try + non_capture_run(build_cmd_pipeline(cmd; env=env); kwargs...) + catch err + if err isa ShellExecutionError + rethrow(ShellExecutionError("failed to execute command `$cmd`: $(err.msg)")) + end + rethrow() + end + return "" # to keep consistent with `capture_run` version +end + +# TODO(johnnychen94): support shell pipes such as "grep results.csv julia | cut -d, -f2" (#7) +function build_cmd_pipeline(cmd::AbstractString; env=nothing) + # We can't simply interpolate the entire string by `$(cmd)`: + # ```julia + # cmd = "echo 'hello world'" + # run(`$cmd`) # errors + # run(`echo 'hello world'`) # works + # ``` + return Cmd(Cmd(Base.shell_split(cmd)); env=env) +end diff --git a/src/scheduler.jl b/src/scheduler.jl new file mode 100644 index 0000000..d8952dc --- /dev/null +++ b/src/scheduler.jl @@ -0,0 +1,95 @@ +""" + execute(filename::AbstractString; workdir=dirname(filename), kwargs...) + execute(w::AbstractWorkflow; workdir=".", kwargs...) + +Execute workflow `w` or workflow defined in file `filename`. + +# Parameters + +- `workdir`: the working folder to execute the workflow. +- `cleanup=false`: true to clean the internal tmp files in `.workflows/tmp` folder when + julia exits. +""" +function execute(filename::AbstractString; workdir="", kwargs...) + w = load_config(filename) + workdir = isempty(workdir) ? dirname(filename) : workdir + execute(w; workdir=workdir, kwargs...) +end + +function execute(w::ManifestWorkflow; workdir=".", cleanup=false) + tasklocks = Dict{String,ReentrantLock}() + results = Dict{String,Any}() # store the captured stdout result of shell runner + + tmpdir = workflow_tmpdir(w; workdir=workdir) + @info "execute workflow" workdir cleanup tmpdir + if cleanup + atexit() do + tmpdir = abspath(tmpdir) + rm(tmpdir; recursive=true, force=true) + tmproot = dirname(tmpdir) + if isempty(readdir(tmproot)) + rm(tmproot) + end + end + end + + # TODO(johnnychen94): support threads (#6) + for taskid in w.order + haskey(tasklocks, taskid) || (tasklocks[taskid] = ReentrantLock()) + lock(tasklocks[taskid]) do + t = w.tasks[taskid] + tid = task_id(t) + + # If this task requests STDOUT results from previous tasks, then dump the data + # as json file and store the file path as environment variable. The task + # script/command is resonsible for appropriately handling this. + stdout_deps = _get_stdout_deps(results, t) + tmpfile = joinpath(tmpdir, "deps_$tid.json") + env = _dump_stdout_to_temp(tmpfile, stdout_deps; workdir=workdir) + if !isnothing(env) + @info "prepare task $(tid)" tmpfile + end + + @info "Executing task $(tid)" + results[taskid] = execute_task(t; workdir=workdir, env=env) + end + end + return +end + + +function _dump_stdout_to_temp(filename::String, data::Dict; workdir) + isempty(data) && return nothing + + tmpdir = dirname(filename) + isdir(tmpdir) || mkpath(tmpdir) + open(filename, "w") do io + JSON3.write(io, data) + end + + env = copy(ENV) + env["WORKFLOW_TMP_INFILE"] = relpath(filename, workdir) + return env +end + +function _get_stdout_deps(results, t) + # `results` is readonly here + + # Prepare all `@__STDOUT__*` results required by task `t` and dump into filesystem + function get_result(name) + m = match(r"@__STDOUT__(?.*)", name) + isnothing(m) && return nothing + id = m[:taskid] + if !haskey(results, id) + @warn "task \"$(task_id(t))\" requests stdout of task \"$id\" but failed to get it" + return nothing + end + return id => results[id] + end + return Dict(filter!(x->!isnothing(x), map(get_result, task_deps(t)))) +end + +function workflow_tmpdir(w; workdir=".") + val = bytes2hex(sha256(to_toml(w)))[1:8] + return joinpath(workdir, ".workflows", "tmp", val) +end diff --git a/test/dialects/config_io.jl b/test/dialects/config_io.jl new file mode 100644 index 0000000..3c62b17 --- /dev/null +++ b/test/dialects/config_io.jl @@ -0,0 +1,46 @@ +module ConfigIOTest + +using Workflows +using Workflows.Dialects +using Workflows: load_config, save_config +using Workflows.Dialects: ManifestWorkflow +using Suppressor +using Test + +const tmp_testdir = mktempdir() +const example_dir = joinpath(@__DIR__, "examples") + +@testset "config io" begin + # More comprehensive parsing test goes to dialects, here we only ensure that the + # `load_config` and `save_config` works as expected. + + tmpfile = joinpath(tmp_testdir, "tmp.toml") + config_filename = joinpath(example_dir, "manifest", "good", "standard.toml") + w = load_config(config_filename) + @test w isa ManifestWorkflow + save_config(tmpfile, w) + w2 = load_config(tmpfile) + @test w == w2 + + # extension sensitive + tmptxt = joinpath(tmp_testdir, "tmp.txt") + err = ArgumentError("unsupported file extension: \".txt\".") + @test_throws err save_config(tmptxt, w) + cp(tmpfile, tmptxt) + @test_throws err load_config(tmptxt) + + @testset "version compat" begin + # warn when we load a file with too new version + config_filename = joinpath(example_dir, "manifest", "good", "new_version.toml") + w = @test_nowarn @suppress_err load_config(config_filename) + msg = @capture_err load_config(config_filename) + @test occursin("workflow file version 999.999.999 might not be compatible", msg) + + # save config with current dialect version + tmpfile = joinpath(tmp_testdir, "tmp.toml") + save_config(tmpfile, w) + w = @test_nowarn load_config(tmpfile) + @test w.version == Dialects.spec_versions["manifest"] + end +end +end diff --git a/test/dialects/examples/foreign/foreign.toml b/test/dialects/examples/foreign/foreign.toml new file mode 100644 index 0000000..f3422a8 --- /dev/null +++ b/test/dialects/examples/foreign/foreign.toml @@ -0,0 +1,2 @@ +version = "0.1" +dialect = "foreign" diff --git a/test/dialects/examples/manifest/bad/duplicate_order.toml b/test/dialects/examples/manifest/bad/duplicate_order.toml new file mode 100644 index 0000000..b885332 --- /dev/null +++ b/test/dialects/examples/manifest/bad/duplicate_order.toml @@ -0,0 +1,18 @@ +version = "0.1" +dialect = "manifest" + +order = [["1"], ["1", "2"]] + +[[tasks]] +name = "task1" +id = "1" +runner = "juliamodule" +[tasks.run] +script = "main.jl" + +[[tasks]] +name = "task1" +id = "2" +runner = "juliamodule" +[tasks.run] +script = "main.jl" diff --git a/test/dialects/examples/manifest/bad/duplicate_taskid.toml b/test/dialects/examples/manifest/bad/duplicate_taskid.toml new file mode 100644 index 0000000..e15d382 --- /dev/null +++ b/test/dialects/examples/manifest/bad/duplicate_taskid.toml @@ -0,0 +1,18 @@ +version = "0.1" +dialect = "manifest" + +order = [["1"]] + +[[tasks]] +name = "task1" +id = "1" +runner = "juliamodule" +[tasks.run] +script = "main.jl" + +[[tasks]] +name = "task1" +id = "1" +runner = "juliamodule" +[tasks.run] +script = "main.jl" diff --git a/test/dialects/examples/manifest/bad/missing_order.toml b/test/dialects/examples/manifest/bad/missing_order.toml new file mode 100644 index 0000000..c13c069 --- /dev/null +++ b/test/dialects/examples/manifest/bad/missing_order.toml @@ -0,0 +1,19 @@ +version = "0.1" +dialect = "manifest" + +order = ["1"] + +[[tasks]] +name = "task1" +id = "1" +runner = "juliamodule" +[tasks.run] +script = "main.jl" + + +[[tasks]] +name = "task1" +id = "2" +runner = "juliamodule" +[tasks.run] +script = "main.jl" diff --git a/test/dialects/examples/manifest/bad/missing_task.toml b/test/dialects/examples/manifest/bad/missing_task.toml new file mode 100644 index 0000000..5fe3646 --- /dev/null +++ b/test/dialects/examples/manifest/bad/missing_task.toml @@ -0,0 +1,19 @@ +version = "0.1" +dialect = "manifest" + +order = ["1", "2", "3"] + +[[tasks]] +name = "task1" +id = "1" +runner = "juliamodule" +[tasks.run] +script = "main.jl" + + +[[tasks]] +name = "task1" +id = "2" +runner = "juliamodule" +[tasks.run] +script = "main.jl" diff --git a/test/dialects/examples/manifest/bad/new_version.toml b/test/dialects/examples/manifest/bad/new_version.toml new file mode 100644 index 0000000..babb2b0 --- /dev/null +++ b/test/dialects/examples/manifest/bad/new_version.toml @@ -0,0 +1,11 @@ +version = "999.999.999" +dialect = "manifest" + +order = ["1"] + +[[tasks]] +name = "task1" +id = "1" +runner = "juliamodule" +[tasks.run] +script = "main.jl" diff --git a/test/dialects/examples/manifest/good/empty.toml b/test/dialects/examples/manifest/good/empty.toml new file mode 100644 index 0000000..525e59a --- /dev/null +++ b/test/dialects/examples/manifest/good/empty.toml @@ -0,0 +1,4 @@ +version = "0.1" +dialect = "manifest" + +order = [] diff --git a/test/dialects/examples/manifest/good/flat_order.toml b/test/dialects/examples/manifest/good/flat_order.toml new file mode 100644 index 0000000..47800a2 --- /dev/null +++ b/test/dialects/examples/manifest/good/flat_order.toml @@ -0,0 +1,18 @@ +version = "0.1" +dialect = "manifest" + +order = ["1", "2"] + +[[tasks]] +name = "task1" +id = "1" +runner = "juliamodule" +[tasks.run] +script = "main.jl" + +[[tasks]] +name = "task2" +id = "2" +runner = "juliamodule" +[tasks.run] +script = "main.jl" diff --git a/test/dialects/examples/manifest/good/new_version.toml b/test/dialects/examples/manifest/good/new_version.toml new file mode 100644 index 0000000..6a419f9 --- /dev/null +++ b/test/dialects/examples/manifest/good/new_version.toml @@ -0,0 +1,8 @@ +version = "999.999.999" +dialect = "manifest" +order = ["1"] + +[[tasks]] +name = "task1" +id = "1" +runner = "juliamodule" diff --git a/test/dialects/examples/manifest/good/optional.toml b/test/dialects/examples/manifest/good/optional.toml new file mode 100644 index 0000000..1c94d32 --- /dev/null +++ b/test/dialects/examples/manifest/good/optional.toml @@ -0,0 +1,11 @@ +version = "0.1" +dialect = "manifest" + +order = [["1"]] + +[[tasks]] +name = "task1" +id = "1" +runner = "juliamodule" +[tasks.run] +script = "main.jl" diff --git a/test/dialects/examples/manifest/good/optional_custom.toml b/test/dialects/examples/manifest/good/optional_custom.toml new file mode 100644 index 0000000..7a66033 --- /dev/null +++ b/test/dialects/examples/manifest/good/optional_custom.toml @@ -0,0 +1,15 @@ +version = "0.1" +dialect = "manifest" + +order = [["1"]] + +[[tasks]] +name = "task1" +id = "1" +groups = ["groupA", "groupB"] +deps = ["dirA", "fileB"] +outs = ["@__STDOUT__"] +runner = "juliamodule" +[tasks.run] +workdir = "somewhere" +script = "main.jl" diff --git a/test/dialects/examples/manifest/good/standard.toml b/test/dialects/examples/manifest/good/standard.toml new file mode 100644 index 0000000..157c7bf --- /dev/null +++ b/test/dialects/examples/manifest/good/standard.toml @@ -0,0 +1,36 @@ +version = "0.1" +dialect = "manifest" +order = [["1"], ["2", "3"]] + +[[tasks]] +name = "task1" +id = "1" +groups = ["groupA", "groupC"] +deps = ["dirA", "fileB"] +outs = ["@__STDOUT__"] +runner = "juliamodule" +[tasks.run] +script = "main.jl" +workdir = "@__DIR__" + +[[tasks]] +name = "task3" +id = "3" +groups = ["groupA", "groupB"] +deps = ["dirA", "fileB"] +outs = ["@__STDOUT__"] +runner = "juliamodule" +[tasks.run] +script = "main.jl" +workdir = "@__DIR__" + +[[tasks]] +name = "task2" +id = "2" +groups = ["groupA", "groupB"] +deps = ["dirA", "fileB"] +outs = ["@__STDOUT__"] +runner = "juliamodule" +[tasks.run] +script = "main.jl" +workdir = "@__DIR__" diff --git a/test/dialects/examples/manifest/good/unique_groups.toml b/test/dialects/examples/manifest/good/unique_groups.toml new file mode 100644 index 0000000..b168673 --- /dev/null +++ b/test/dialects/examples/manifest/good/unique_groups.toml @@ -0,0 +1,12 @@ +version = "0.1" +dialect = "manifest" + +order = ["1"] + +[[tasks]] +name = "task1" +id = "1" +groups = ["A", "A", "B", "A"] +runner = "juliamodule" +[tasks.run] +script = "main.jl" diff --git a/test/dialects/foreign.jl b/test/dialects/foreign.jl new file mode 100644 index 0000000..cb9b070 --- /dev/null +++ b/test/dialects/foreign.jl @@ -0,0 +1,33 @@ +module ForeignTypeTest + +using Test +using Workflows.Dialects: task_id, task_name, task_deps, task_outs, task_groups +using Workflows.Dialects: runner_type, runner_info +using Workflows.Dialects: load_config, save_config +using Workflows.Dialects: AbstractTask + +struct ForeignTask <: AbstractTask end + +@testset "foreign" begin + +@testset "foreign task" begin + t = ForeignTask() + @test_throws ErrorException task_id(t) + @test_throws ErrorException task_name(t) + @test_throws ErrorException task_deps(t) + @test_throws ErrorException task_outs(t) + @test_throws ErrorException task_groups(t) + @test_throws ErrorException runner_type(t) + @test_throws ErrorException runner_info(t) +end + + +@testset "foreign dialect" begin + config_filename = joinpath(@__DIR__, "examples", "foreign", "foreign.toml") + err = ErrorException("unsupported workflow dialect \"foreign\".") + @test_throws err load_config(config_filename) +end + +end #testset + +end #module diff --git a/test/dialects/manifest.jl b/test/dialects/manifest.jl new file mode 100644 index 0000000..ad092e4 --- /dev/null +++ b/test/dialects/manifest.jl @@ -0,0 +1,93 @@ +module ManifestDialectTest + +using Configurations +using Workflows +using Workflows.Dialects +using Workflows.Dialects: ManifestWorkflow +using Workflows.Dialects: PipelineOrder +using Workflows.Dialects: task_id, task_name, task_groups, task_deps, task_outs +using Workflows.Dialects: runner_type, runner_info +using Test +using TOML + +function load_back(w) + io = IOBuffer() + to_toml(io, w) + config = TOML.parse(String(take!(io))) + from_dict(ManifestWorkflow, config) +end + +@testset "manifest" begin + @testset "properties" begin + casedir = joinpath(@__DIR__, "examples", "manifest", "good") + + filename = joinpath(casedir, "standard.toml") + w = from_toml(ManifestWorkflow, filename) + w2 = load_back(w) + @test w == w2 + end + + @testset "positive cases" begin + casedir = joinpath(@__DIR__, "examples", "manifest", "good") + + # test default options + filename = joinpath(casedir, "optional.toml") + w = from_toml(ManifestWorkflow, filename) + t = w.tasks["1"] + @test task_name(t) == "task1" + @test task_id(t) == "1" + @test task_groups(t) == String[] # optional + @test task_deps(t) == String[] # optional + @test task_outs(t) == String[] # optional + @test runner_type(t) == "juliamodule" + @test runner_info(t)["script"] == "main.jl" + + # ensure default options will be correctly overridden by user configurations + filename = joinpath(casedir, "optional_custom.toml") + w = from_toml(ManifestWorkflow, filename) + t = w.tasks["1"] + @test runner_info(t)["workdir"] == "somewhere" + @test task_groups(t) == String["groupA", "groupB"] + @test task_deps(t) == String["dirA", "fileB"] + @test task_outs(t) == String["@__STDOUT__"] + + # order can be Vector{String} + filename = joinpath(casedir, "flat_order.toml") + w = from_toml(ManifestWorkflow, filename) + @test w.order isa PipelineOrder + @test w.order.stages == [["1"], ["2"]] + + # empty workflow is allowed + filename = joinpath(casedir, "empty.toml") + w = from_toml(ManifestWorkflow, filename) + @test length(w.tasks) == 0 + @test w.order isa PipelineOrder + @test length(w.order.stages) == 0 + end + + @testset "negative cases" begin + casedir = joinpath(@__DIR__, "examples", "manifest", "bad") + + # task ID should be unique across the workflow + filename = joinpath(casedir, "duplicate_taskid.toml") + err = ArgumentError("duplicate tasks detected: [\"1\"].") + @test_throws err from_toml(ManifestWorkflow, filename) + + # DAG required: one task can't be executed multiple times + filename = joinpath(casedir, "duplicate_order.toml") + err = ArgumentError("duplicate tasks detected in \"order\": [\"1\"].") + @test_throws err from_toml(ManifestWorkflow, filename) + + # each task defined in the workflow should have a well-defined execution order + filename = joinpath(casedir, "missing_order.toml") + err = ArgumentError("some tasks are not defined in \"order\": [\"2\"].") + @test_throws err from_toml(ManifestWorkflow, filename) + + # each item listed in the order should have a corresponding task + filename = joinpath(casedir, "missing_task.toml") + err = ArgumentError("\"order\" contains some undefined tasks: [\"3\"].") + @test_throws err from_toml(ManifestWorkflow, filename) + end +end + +end diff --git a/test/examples/manifest/.gitignore b/test/examples/manifest/.gitignore new file mode 100644 index 0000000..4323e36 --- /dev/null +++ b/test/examples/manifest/.gitignore @@ -0,0 +1 @@ +/test_outs/ diff --git a/test/examples/manifest/Project.toml b/test/examples/manifest/Project.toml new file mode 100644 index 0000000..cd53ab1 --- /dev/null +++ b/test/examples/manifest/Project.toml @@ -0,0 +1,3 @@ +[deps] +JSON3 = "0f8b85d8-7281-11e9-16c2-39a750bddbf1" +Pkg = "44cfe95a-1eb2-52ea-b672-e2afdf69b78f" diff --git a/test/examples/manifest/print.jl b/test/examples/manifest/print.jl new file mode 100644 index 0000000..d0c0ef7 --- /dev/null +++ b/test/examples/manifest/print.jl @@ -0,0 +1 @@ +print(read(joinpath("test_outs", "results.json"), String)) diff --git a/test/examples/manifest/scripts/exp.jl b/test/examples/manifest/scripts/exp.jl new file mode 100644 index 0000000..fb9c4ed --- /dev/null +++ b/test/examples/manifest/scripts/exp.jl @@ -0,0 +1,13 @@ +using JSON3 + +rst = Dict() +rst["exp"] = @elapsed sum(exp, 1:10) + +mkpath("test_outs") +open(joinpath("test_outs", "exp.json"), "w") do io + JSON3.write(io, rst) +end + +# this message will be discarded for shell runner with `enable_stdout=false` +println("You should not see this message!") +@warn "You should not see this message!" diff --git a/test/examples/manifest/scripts/large_sum.jl b/test/examples/manifest/scripts/large_sum.jl new file mode 100644 index 0000000..7bf650b --- /dev/null +++ b/test/examples/manifest/scripts/large_sum.jl @@ -0,0 +1,10 @@ +using JSON3 + +rst = Dict{Int,Float64}() +for sz in [1000, 2000, 3000] + x = 1:sz + sum(x) + rst[sz] = @elapsed sum(x) +end + +print(JSON3.write(rst)) diff --git a/test/examples/manifest/scripts/small_sum.jl b/test/examples/manifest/scripts/small_sum.jl new file mode 100644 index 0000000..a575329 --- /dev/null +++ b/test/examples/manifest/scripts/small_sum.jl @@ -0,0 +1,10 @@ +using JSON3 + +rst = Dict{Int,Float64}() +for sz in [10, 100, 1000] + x = 1:sz + sum(x) + rst[sz] = @elapsed sum(x) +end + +JSON3.write(rst) diff --git a/test/examples/manifest/simple.toml b/test/examples/manifest/simple.toml new file mode 100644 index 0000000..5e1dffc --- /dev/null +++ b/test/examples/manifest/simple.toml @@ -0,0 +1,62 @@ +version = "0.1" +dialect = "manifest" + +order = [["0"], ["1", "2", "3"], ["4"], ["5"]] + +[[tasks]] +name = "init" +id = "0" +runner = "shell" +[tasks.run] +command = "julia --startup=no --project=. -e 'using Pkg; Pkg.instantiate()'" +enable_stderr = false +enable_stdout = false + +[[tasks]] +name = "small sum" +id = "1" +groups = ["Type:Base", "Type:Benchmark"] +deps = ["scripts"] +runner = "juliamodule" +[tasks.run] +script = "scripts/small_sum.jl" + +[[tasks]] +name = "norm" +id = "2" +groups = ["Package:LinearAlgebra", "Type:Benchmark"] +deps = ["scripts"] +runner = "shell" +[tasks.run] +command = "julia --startup=no scripts/large_sum.jl" +capture = true + +[[tasks]] +name = "exp" +id = "3" +groups = ["Package:Base", "Type:Benchmark"] +deps = ["scripts"] +outs = ["test_outs/exp.json"] +runner = "shell" +[tasks.run] +command = "julia --startup=no scripts/exp.jl" +enable_stderr = false +enable_stdout = false + +[[tasks]] +name = "summary" +id = "4" +groups = ["Type:LinearAlgebra", "Type:Benchmark"] +deps = ["summary.jl", "@__STDOUT__1", "@__STDOUT__2", "test_outs/exp.json"] +outs = ["test_outs/results.json"] +runner = "shell" +[tasks.run] +command = "julia --startup=no summary.jl" + +[[tasks]] +name = "print result" +id = "5" +deps = ["test_outs/results.json"] +runner = "shell" +[tasks.run] +command = "julia --startup=no --project=. print.jl" diff --git a/test/examples/manifest/summary.jl b/test/examples/manifest/summary.jl new file mode 100644 index 0000000..5b87395 --- /dev/null +++ b/test/examples/manifest/summary.jl @@ -0,0 +1,21 @@ +using JSON3 + +# This environment variable is set before the task execution process is spawned +src_file = ENV["WORKFLOW_TMP_INFILE"] + +@assert isfile(src_file) + +data = open(src_file, "r") do io + JSON3.read(io) +end |> Dict + +extra = open(joinpath("test_outs", "exp.json")) do io + JSON3.read(io) +end |> Dict + +merge!(data, extra) + +mkpath("test_outs") +open(joinpath("test_outs", "results.json"), "w") do io + JSON3.write(io, data) +end diff --git a/test/runners/juliamodule.jl b/test/runners/juliamodule.jl new file mode 100644 index 0000000..871927e --- /dev/null +++ b/test/runners/juliamodule.jl @@ -0,0 +1,146 @@ +@testset "Runner: juliamodule" begin + @testset "workdir" begin + # The workdir setup is a little bit tricky here as we usually triggers the test + # in project root dir with `pkg> test`. To successfully run the tests interactively, + # we need to set pwd to the "test/runners" dir. + scripts_dir = joinpath(@__DIR__, "scripts") + workdir_prefix = basename(@__DIR__) + + taskinfo = Dict{String,Any}( + "name" => "task1", + "id" => "1", + "runner" => "juliamodule", + ) + + # default workdir with relpath: `"@__DIR__"` will be replaced by `workdir="."` + taskinfo["run"] = Dict( + "script" => "scripts/sum.jl" + ) + t = from_dict(SimpleTask, taskinfo) + exec = build_runner(t) + @test exec == JuliaModuleRunner(script="scripts/sum.jl", workdir="@__DIR__") + dirs, rst = with_sandbox(includes=[scripts_dir]) do + readdir(workdir_prefix), execute_task(t; workdir=workdir_prefix) + end + @test dirs == ["scripts"] + @test rst == 55 + + # default workdir with relpath + taskinfo["run"] = Dict( + "script" => "sum.jl" + ) + t = from_dict(SimpleTask, taskinfo) + exec = build_runner(t) + @test exec == JuliaModuleRunner(script="sum.jl", workdir="@__DIR__") + dirs, rst = with_sandbox(includes=[scripts_dir]) do + # `"@__DIR__"` will be replaced by `runners/scripts` + readdir(workdir_prefix), execute_task(t; workdir="$workdir_prefix/scripts") + end + @test dirs == ["scripts"] + @test rst == 55 + + # custom workdir with relpath + taskinfo["run"] = Dict( + "script" => "sum.jl", + "workdir" => "$workdir_prefix/scripts" + ) + t = from_dict(SimpleTask, taskinfo) + exec = build_runner(t) + @test exec == JuliaModuleRunner(script="sum.jl", workdir="$workdir_prefix/scripts") + dirs, rst = with_sandbox(includes=[scripts_dir]) do + # `workdir` doesn't affect here at all + workdir = joinpath(tempdir(), "somewhere") + readdir(workdir_prefix), execute_task(t; workdir=workdir) + end + @test dirs == ["scripts"] + @test rst == 55 + + # custom workdir with relpath and @__DIR__ + taskinfo["run"] = Dict( + "script" => "sum.jl", + "workdir" => "@__DIR__/scripts" + ) + t = from_dict(SimpleTask, taskinfo) + exec = build_runner(t) + @test exec == JuliaModuleRunner(script="sum.jl", workdir="@__DIR__/scripts") + dirs, rst = with_sandbox(includes=[scripts_dir]) do + # "@__DIR__/scripts" will be replaced by relative path "runners/scripts" + readdir(workdir_prefix), execute_task(t; workdir=workdir_prefix) + end + @test dirs == ["scripts"] + @test rst == 55 + + # check if we can successfully suppress stdout/stderr + taskinfo["run"] = Dict( + "script" => "scripts/verbose_sum.jl", + "enable_stdout" => false, + "enable_stderr" => false + ) + t = from_dict(SimpleTask, taskinfo) + exec = build_runner(t) + rst = with_sandbox(includes=[scripts_dir]) do + execute_task(t; workdir=workdir_prefix) + end + @test rst == 55 + + # by default, it's strict mode + taskinfo["run"] = Dict( + "script" => "scripts/error_sum.jl", + ) + t = from_dict(SimpleTask, taskinfo) + exec = build_runner(t) + with_sandbox(includes=[scripts_dir]) do + try + execute_task(t; workdir=workdir_prefix) + catch err + @test err isa ErrorException + @test err.msg == "Hi 你好" + end + end + + # but we can suppress exception with warnings + taskinfo["run"] = Dict( + "script" => "scripts/error_sum.jl", + "strict" => false + ) + t = from_dict(SimpleTask, taskinfo) + exec = build_runner(t) + with_sandbox(includes=[scripts_dir]) do + msg = @capture_err execute_task(t; workdir=workdir_prefix) + @test occursin("Warning: failed to execute task 1 in juliamodule runner", msg) + @test occursin("err = Hi 你好", msg) + end + + # custom workdir with abspath and @__DIR__ + taskinfo["run"] = Dict( + "script" => "sum.jl", + "workdir" => joinpath(pwd(), "@__DIR__/scripts") + ) + t = from_dict(SimpleTask, taskinfo) + exec = build_runner(t) + @test exec == JuliaModuleRunner(script="sum.jl", workdir=joinpath(pwd(), "@__DIR__/scripts")) + dirs, rst = with_sandbox() do + # This might be a little bit unintuitive + # "$(pwd())/@__DIR__/scripts" will be replaced by absolute path "$(pwd())/runners/scripts" + readdir(), execute_task(t; workdir=workdir_prefix) + end + @test dirs == [] + @test rst == 55 + + # custom workdir with abspath + taskinfo["run"] = Dict( + "script" => "sum.jl", + "workdir" => scripts_dir + ) + t = from_dict(SimpleTask, taskinfo) + exec = build_runner(t) + @test exec == JuliaModuleRunner(script="sum.jl", workdir=scripts_dir) + dirs, rst = with_sandbox() do + # `workdir` doesn't affect here at all + workdir = joinpath(tempdir(), "somewhere") + readdir(), execute_task(t; workdir=workdir) + end + @test dirs == [] + @test rst == 55 + end +end diff --git a/test/runners/scripts/envcheck.jl b/test/runners/scripts/envcheck.jl new file mode 100644 index 0000000..7af91f0 --- /dev/null +++ b/test/runners/scripts/envcheck.jl @@ -0,0 +1 @@ +println(ENV["HOWLONG"]) diff --git a/test/runners/scripts/error_sum.jl b/test/runners/scripts/error_sum.jl new file mode 100644 index 0000000..dea6914 --- /dev/null +++ b/test/runners/scripts/error_sum.jl @@ -0,0 +1,2 @@ +error("Hi 你好") +sum(1:10) # 55 diff --git a/test/runners/scripts/log_println_sum.jl b/test/runners/scripts/log_println_sum.jl new file mode 100644 index 0000000..442c1ce --- /dev/null +++ b/test/runners/scripts/log_println_sum.jl @@ -0,0 +1,4 @@ +msg = "You are not expected to see this message during test." +println(msg) +@warn msg +println(sum(1:10)) # 55 diff --git a/test/runners/scripts/println_sum.jl b/test/runners/scripts/println_sum.jl new file mode 100644 index 0000000..bca11e7 --- /dev/null +++ b/test/runners/scripts/println_sum.jl @@ -0,0 +1 @@ +println(sum(1:10)) # 55 diff --git a/test/runners/scripts/sum.jl b/test/runners/scripts/sum.jl new file mode 100644 index 0000000..6456c11 --- /dev/null +++ b/test/runners/scripts/sum.jl @@ -0,0 +1 @@ +sum(1:10) # 55 diff --git a/test/runners/scripts/verbose_sum.jl b/test/runners/scripts/verbose_sum.jl new file mode 100644 index 0000000..bdc3798 --- /dev/null +++ b/test/runners/scripts/verbose_sum.jl @@ -0,0 +1,3 @@ +println("You should not see this message!") +@info "You should not see this message!" +sum(1:10) # 55 diff --git a/test/runners/shell.jl b/test/runners/shell.jl new file mode 100644 index 0000000..e0d2ad6 --- /dev/null +++ b/test/runners/shell.jl @@ -0,0 +1,206 @@ +@testset "capture_run" begin + # single line + rst = capture_run(`julia --startup=no -e 'println("hello world")'`) + @test rst == "hello world" + + # last evaluation result is not the return value + rst = capture_run(`julia --startup=no -e 'println("hello world"); 1+1'`) + @test rst == "hello world" + + # stderr is also not the return value + io = IOBuffer() + rst = capture_run(`julia --startup=no -e 'println("hello world"); @info "hi"'`; stderr=io) + @test String(take!(io)) == "[ Info: hi\n" + @test rst == "hello world" + + # multiple lines + io = IOBuffer() + rst = capture_run(`julia --startup=no -e 'println("hello world"); print(1+1)'`, stdout=io) + @test String(take!(io)) == "hello world\n" + @test rst == "2" + + # last \n is discarded + io = IOBuffer() + rst = capture_run(`julia --startup=no -e 'println("hello world"); println(1+1)'`, stdout=io) + @test String(take!(io)) == "hello world\n" + @test rst == "2" + + # if errors, throw exception + io = IOBuffer() + try + capture_run(`julia --startup=no -e 'error("some error")'`; stderr=io) + catch err + @test err == ShellExecutionError("non-zero exit code: 1") + finally + @test occursin("ERROR: some error", String(take!(io))) + end + try + capture_run(`not_a_command`) + catch err + @test err isa ShellExecutionError && occursin("could not spawn `not_a_command`", err.msg) + end + + # multiple trailing '\n's are ignored + io = IOBuffer() + rst = capture_run(`julia --startup=no -e 'println("hello \n\n\nworld\n\n\n")'`; stdout=io) + @test String(take!(io)) == "hello \n\n\n" + @test rst == "world" + io = IOBuffer() + rst = capture_run(`julia --startup=no -e 'println("hello \n\n\nworld")'`; stdout=io) + @test String(take!(io)) == "hello \n\n\n" + @test rst == "world" + if !Sys.iswindows() + io = IOBuffer() + @test "world" == capture_run(`printf "hello \n\n\nworld"`; stdout=io) + @test String(take!(io)) == "hello \n\n\n" + io = IOBuffer() + @test "world" == capture_run(`printf "hello \n\n\nworld\n"`; stdout=io) + @test String(take!(io)) == "hello \n\n\n" + io = IOBuffer() + @test "world" == capture_run(`printf "hello \n\n\nworld\n\n\n"`; stdout=io) + @test String(take!(io)) == "hello \n\n\n" + end + + # Test if a looooooooooong line is kept as expected + function test_large_data(n = 1000) + script = """ + using TOML + data = 1:$n + TOML.print(Dict("data"=>data)) + """ + rst = with_sandbox() do + write("script.jl", script) + capture_run(`julia --startup=no script.jl`; stdout=devnull) + end + @test TOML.parse(rst)["data"] == collect(1:n) + end + for (n_repeat, sz) in [(3, 1), (3, 10), (3, 100), (2, 1000), (1, 10000), (1, 100000)] + for _ in 1:n_repeat + test_large_data(sz) + end + end +end + +@testset "Runner: shell" begin + # The workdir setup is a little bit tricky here as we usually triggers the test + # in project root dir with `pkg> test`. To successfully run the tests interactively, + # we need to set pwd to the "test/runners" dir. + scripts_dir = joinpath(@__DIR__, "scripts") + workdir_prefix = basename(@__DIR__) + + taskinfo = Dict{String,Any}( + "name" => "task1", + "id" => "1", + "runner" => "shell", + ) + + taskinfo["run"] = Dict( + "command" => "julia --startup=no scripts/sum.jl", + "capture" => true, + ) + t = from_dict(SimpleTask, taskinfo) + exec = build_runner(t) + dirs, rst = with_sandbox(includes=[scripts_dir]) do + readdir(workdir_prefix), execute_task(t; workdir=workdir_prefix) + end + @test dirs == ["scripts"] + @test rst == "" + + # check if we successfully parsed the stdout as return value + taskinfo["run"] = Dict( + "command" => "julia --startup=no scripts/println_sum.jl", + "capture" => true, + ) + t = from_dict(SimpleTask, taskinfo) + exec = build_runner(t) + dirs, rst = with_sandbox(includes=[scripts_dir]) do + readdir(workdir_prefix), execute_task(t; workdir=workdir_prefix) + end + @test dirs == ["scripts"] + @test rst == "55" + + # non-capture mode shell runner + taskinfo["run"] = Dict( + "command" => "julia --startup=no scripts/println_sum.jl" + ) + t = from_dict(SimpleTask, taskinfo) + exec = build_runner(t) + @test exec.capture == false # default is non-capturing + with_sandbox(includes=[scripts_dir]) do + dirs = readdir(workdir_prefix) + rst = @suppress_out execute_task(t; workdir=workdir_prefix) + out = @capture_out execute_task(t; workdir=workdir_prefix) + @test dirs == ["scripts"] + @test rst == "" + @test out == "55\n" + end + + # check if we can successfully suppress stdout/stderr + taskinfo["run"] = Dict( + "command" => "julia --startup=no scripts/log_println_sum.jl", + "enable_stdout" => false, + "enable_stderr" => false, + "capture" => true, + ) + t = from_dict(SimpleTask, taskinfo) + exec = build_runner(t) + dirs, rst = with_sandbox(includes=[scripts_dir]) do + readdir(workdir_prefix), execute_task(t; workdir=workdir_prefix) + end + @test dirs == ["scripts"] + @test rst == "55" + + # strict mode: error + taskinfo["run"] = Dict( + "command" => "julia --startup=no nothing.jl", + "strict" => true, + "enable_stderr" => false + ) + t = from_dict(SimpleTask, taskinfo) + exec = build_runner(t) + @test exec.strict == true + try + execute_task(t) + catch err + @test err == ShellExecutionError("failed to execute command `julia --startup=no nothing.jl`: non-zero exit code: 1") + end + + # non-strict mode: warning instead of error + taskinfo["run"] = Dict( + "command" => "julia --startup=no nothing.jl", + "strict" => false + ) + t = from_dict(SimpleTask, taskinfo) + exec = build_runner(t) + @test exec.strict == false + msg = @capture_err execute_task(t) + @test occursin("SystemError: opening file", msg) || occursin("could not open file", msg) + @test occursin("failed to execute command `julia --startup=no nothing.jl`", msg) + + # env passing + taskinfo["run"] = Dict( + "command" => "julia --startup=no scripts/envcheck.jl", + "capture" => true, + ) + t = from_dict(SimpleTask, taskinfo) + exec = build_runner(t) + dirs, rst = with_sandbox(includes=[scripts_dir]) do + # this would otherwise fail if `env=nothing` + env=copy(ENV); env["HOWLONG"] = "forever!" + readdir(workdir_prefix), execute_task(t; workdir=workdir_prefix, env=env) + end + @test dirs == ["scripts"] + @test rst == "forever!" + + # check command split + # cmd = "echo 'hello world'" + # `echo 'hello world'` is different from `$(cmd)` + taskinfo["run"] = Dict( + "command" => "julia --startup=no -e 'println(\"hello world\")'", + "capture" => true + ) + t = from_dict(SimpleTask, taskinfo) + exec = build_runner(t) + rst = execute_task(t) + @test rst == "hello world" +end diff --git a/test/runtests.jl b/test/runtests.jl index 1cb57b2..79d7ec3 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -1,6 +1,30 @@ +using Configurations +using JSON3 using Workflows +using Workflows.Dialects: SimpleTask +using Workflows.Runners: execute_task, build_runner, capture_run +using Workflows.Runners: ShellExecutionError +using Workflows.Runners: JuliaModuleRunner, ShellRunner +using Suppressor using Test +using TOML + +include("testutils.jl") @testset "Workflows.jl" begin - # Write your tests here. + @info "run dialects test" + @testset "dialects" begin + include("dialects/foreign.jl") + include("dialects/manifest.jl") + include("dialects/config_io.jl") + end + + @info "run runners test" + @testset "runners" begin + include("runners/juliamodule.jl") + include("runners/shell.jl") + end + + @info "run schduler test" + include("scheduler.jl") end diff --git a/test/scheduler.jl b/test/scheduler.jl new file mode 100644 index 0000000..95a2987 --- /dev/null +++ b/test/scheduler.jl @@ -0,0 +1,26 @@ +@testset "scheduler" begin + examples_dir = joinpath(@__DIR__, "examples") + + @testset "manifest" begin + # cleanup before running test so that we don't get affected by previous runs + rm(joinpath(examples_dir, "manifest", ".workflows"), recursive=true, force=true) + rm(joinpath(examples_dir, "manifest", "test_outs"), recursive=true, force=true) + + with_sandbox(includes=["examples"]) do + workdir = joinpath("examples", "manifest") + + filename = joinpath(workdir, "simple.toml") + msg = @capture_out @suppress_err Workflows.execute(filename) + # msg = @capture_out Workflows.execute(filename) # debug + results = @test_nowarn JSON3.read(msg) + + @test sort!(String.(keys(results))) == ["1", "2", "exp"] + @test sort!(String.(keys(JSON3.read(results["1"])))) == ["10", "100", "1000"] + @test sort!(String.(keys(JSON3.read(results["2"])))) == ["1000", "2000", "3000"] + @test results[:exp] isa Float64 + + @test issubset([".workflows", "test_outs", "Manifest.toml"], readdir(workdir)) + @test strip(read(joinpath(workdir, "test_outs", "results.json"), String)) == strip(msg) + end + end +end diff --git a/test/testutils.jl b/test/testutils.jl new file mode 100644 index 0000000..65261c6 --- /dev/null +++ b/test/testutils.jl @@ -0,0 +1,38 @@ +const _sandbox_root = mktempdir() + +""" + with_sandbox(f; includes=[""]) + +run function `f()` in a simple sandbox directory, with `includes` copied there. + +# Examples + +```julia +julia> with_sandbox() do + pwd() +end +"/private/var/folders/34/km3mmt5930gc4pzq1d08jvjw0000gn/T/jl_rNgOVo/jl_BSkXtM" + +julia> with_sandbox(includes=["scripts", "not_exists"]) do + readdir() +end +┌ Warning: source path not existed +│ path = "not_exists" +└ @ Main REPL[35]:9 +1-element Vector{String}: +"scripts" +``` +""" +function with_sandbox(f; includes::Vector{String}=String[], follow_symlinks=false) + sandbox_root = mktempdir(_sandbox_root) + for src_path in includes + dst_path = joinpath(sandbox_root, relpath(abspath(src_path))) + if ispath(src_path) + mkpath(dirname(dst_path)) + cp(src_path, dst_path; follow_symlinks=follow_symlinks) + else + @warn "source path not existed" path=src_path + end + end + cd(f, sandbox_root) +end