From f58fb0a2a35d2f602302a0cbc94c3dec049b079d Mon Sep 17 00:00:00 2001 From: Johnny Chen Date: Thu, 10 Feb 2022 13:01:14 +0800 Subject: [PATCH 1/7] ignore vscode settings and python caches --- .gitignore | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/.gitignore b/.gitignore index 1c02e5e..43324e6 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,10 @@ +.vscode/ + *.jl.*.cov *.jl.cov *.jl.mem Manifest.toml /docs/build/ + +__pycache__/ +.mypy_cache/ From f4fecb864dc463fa04f925389d1a4d7e13d0e398 Mon Sep 17 00:00:00 2001 From: Johnny Chen Date: Thu, 10 Feb 2022 07:38:53 +0800 Subject: [PATCH 2/7] initial manifest dialect implementation This commit mainly focuses on defining the basic structure of the "manifest" dialect, which is the simplest, and the most verbose configuration file. The main purpose of this "manifest" dialect specification is to provide an interchangeable format on top of which we can build more human-friendly dialects, e.g., "benchmark", "pkgtest". This commit includes reading and loading the TOML configuration file. The execution of the workflow is not included here. 
--- Project.toml | 10 +- dialects.toml | 2 + src/Workflows.jl | 5 +- src/dialects/Dialects.jl | 24 +++++ src/dialects/config_io.jl | 54 +++++++++++ src/dialects/manifest.jl | 71 ++++++++++++++ src/dialects/orders.jl | 37 ++++++++ src/dialects/traits.jl | 55 +++++++++++ src/dialects/utils.jl | 19 ++++ test/dialects/config_io.jl | 46 +++++++++ test/dialects/examples/foreign/foreign.toml | 2 + .../manifest/bad/duplicate_order.toml | 18 ++++ .../manifest/bad/duplicate_taskid.toml | 18 ++++ .../examples/manifest/bad/missing_order.toml | 19 ++++ .../examples/manifest/bad/missing_task.toml | 19 ++++ .../examples/manifest/bad/new_version.toml | 11 +++ .../examples/manifest/good/empty.toml | 4 + .../examples/manifest/good/flat_order.toml | 18 ++++ .../examples/manifest/good/new_version.toml | 8 ++ .../examples/manifest/good/optional.toml | 11 +++ .../manifest/good/optional_custom.toml | 15 +++ .../examples/manifest/good/standard.toml | 36 +++++++ .../examples/manifest/good/unique_groups.toml | 12 +++ test/dialects/foreign.jl | 33 +++++++ test/dialects/manifest.jl | 93 +++++++++++++++++++ test/runtests.jl | 7 +- 26 files changed, 644 insertions(+), 3 deletions(-) create mode 100644 dialects.toml create mode 100644 src/dialects/Dialects.jl create mode 100644 src/dialects/config_io.jl create mode 100644 src/dialects/manifest.jl create mode 100644 src/dialects/orders.jl create mode 100644 src/dialects/traits.jl create mode 100644 src/dialects/utils.jl create mode 100644 test/dialects/config_io.jl create mode 100644 test/dialects/examples/foreign/foreign.toml create mode 100644 test/dialects/examples/manifest/bad/duplicate_order.toml create mode 100644 test/dialects/examples/manifest/bad/duplicate_taskid.toml create mode 100644 test/dialects/examples/manifest/bad/missing_order.toml create mode 100644 test/dialects/examples/manifest/bad/missing_task.toml create mode 100644 test/dialects/examples/manifest/bad/new_version.toml create mode 100644 
test/dialects/examples/manifest/good/empty.toml create mode 100644 test/dialects/examples/manifest/good/flat_order.toml create mode 100644 test/dialects/examples/manifest/good/new_version.toml create mode 100644 test/dialects/examples/manifest/good/optional.toml create mode 100644 test/dialects/examples/manifest/good/optional_custom.toml create mode 100644 test/dialects/examples/manifest/good/standard.toml create mode 100644 test/dialects/examples/manifest/good/unique_groups.toml create mode 100644 test/dialects/foreign.jl create mode 100644 test/dialects/manifest.jl diff --git a/Project.toml b/Project.toml index f48f84e..d7452ca 100644 --- a/Project.toml +++ b/Project.toml @@ -3,11 +3,19 @@ uuid = "115008b9-7a42-4cba-af26-8bebb992e909" authors = ["Johnny Chen "] version = "0.1.0" +[deps] +Configurations = "5218b696-f38b-4ac9-8b61-a12ec717816d" +Printf = "de0858da-6303-5e67-8744-51eddeeeb8d7" +TOML = "fa267f1f-6049-4f14-aa54-33bafae1ed76" + [compat] +Configurations = "0.17" +TOML = "1" julia = "1" [extras] +Suppressor = "fd094767-a336-5f1f-9728-57cf17d0bbfb" Test = "8dfed614-e22c-5e08-85e1-65c5234f0b40" [targets] -test = ["Test"] +test = ["Suppressor", "Test"] diff --git a/dialects.toml b/dialects.toml new file mode 100644 index 0000000..7dc2576 --- /dev/null +++ b/dialects.toml @@ -0,0 +1,2 @@ +[manifest] +version = "0.1.0" diff --git a/src/Workflows.jl b/src/Workflows.jl index 0f00140..2bb6da7 100644 --- a/src/Workflows.jl +++ b/src/Workflows.jl @@ -1,5 +1,8 @@ module Workflows -# Write your package code here. 
+using Configurations + +include("dialects/Dialects.jl") +using .Dialects: load_config, save_config end diff --git a/src/dialects/Dialects.jl b/src/dialects/Dialects.jl new file mode 100644 index 0000000..3205d60 --- /dev/null +++ b/src/dialects/Dialects.jl @@ -0,0 +1,24 @@ +module Dialects + +using Configurations +using TOML +using Printf + +import Configurations: from_dict, to_dict + +const spec_versions = begin + config = TOML.parsefile(joinpath(@__DIR__, "..", "..", "dialects.toml")) + Dict(k=>VersionNumber(v["version"]) for (k, v) in config) +end + +abstract type AbstractTask end +abstract type AbstractWorkflow end +abstract type AbstractExecutionOrder end + +include("traits.jl") +include("orders.jl") +include("manifest.jl") +include("utils.jl") +include("config_io.jl") + +end #module diff --git a/src/dialects/config_io.jl b/src/dialects/config_io.jl new file mode 100644 index 0000000..0db9dcb --- /dev/null +++ b/src/dialects/config_io.jl @@ -0,0 +1,54 @@ +""" + load_config(filename::AbstractString) + +Load workflow configuration from `filename`. +""" +function load_config(filename::AbstractString) + name, ext = splitext(basename(filename)) + config = if ext == ".toml" + # https://toml.io/en/v1.0.0#filename-extension + # TOML files should use the extension `.toml` + TOML.parsefile(filename) + else + throw(ArgumentError("unsupported file extension: \"$ext\".")) + end + dialect = config["dialect"] # required + ver = config["version"] # required + check_version(dialect, VersionNumber(ver)) + + # runtime dispatch to custom dialect implementation + return load_config(Val(Symbol(dialect)), config) +end +load_config(::Val{d}, config) where d = error("unsupported workflow dialect \"$d\".") + +""" + save_config(filename::AbstractString, workflow) + +Save workflow configuration into `filename`. 
+""" +function save_config(filename::AbstractString, workflow::AbstractWorkflow) + name, ext = splitext(basename(filename)) + if ext == ".toml" + config = to_dict(workflow, TOMLStyle) + config["version"] = string(spec_versions[workflow.dialect]) + open(filename, "w") do io + TOML.print(convert_to_builtin, io, config) + end + else + throw(ArgumentError("unsupported file extension: \"$ext\".")) + end + return +end + +# some custom types need to be converted to built in types before serialization +convert_to_builtin(p::PipelineOrder) = p.stages +convert_to_builtin(v::VersionNumber) = string(v) +convert_to_builtin(v) = v + +# TOML support +function Configurations.to_toml(io::IO, x::AbstractWorkflow; kwargs...) + to_toml(convert_to_builtin, io, x; kwargs...) +end +function Configurations.to_toml(filename::String, x::AbstractWorkflow; kwargs...) + to_toml(convert_to_builtin, filename, x; kwargs...) +end diff --git a/src/dialects/manifest.jl b/src/dialects/manifest.jl new file mode 100644 index 0000000..7097d72 --- /dev/null +++ b/src/dialects/manifest.jl @@ -0,0 +1,71 @@ +@option struct SimpleTask <: AbstractTask + name::String + id::String + groups::Vector{String} = String[] + deps::Vector{String} = String[] + outs::Vector{String} = String[] + runner::String # TODO(johnnychen94): verify runner type + run::Dict{String, Any} = Dict{String, Any}() +end +task_id(t::SimpleTask) = t.id +task_name(t::SimpleTask) = t.name +task_groups(t::SimpleTask) = t.groups +task_deps(t::SimpleTask) = t.deps +task_outs(t::SimpleTask) = t.outs +runner_type(t::SimpleTask) = t.runner +runner_info(t::SimpleTask) = t.run + +@option struct ManifestWorkflow <: AbstractWorkflow + version::VersionNumber + dialect::String = "manifest" + order::AbstractExecutionOrder = PipelineOrder(String[]) + tasks::Dict{String,AbstractTask} = Dict{String,AbstractTask}() + function ManifestWorkflow(version::VersionNumber, dialect::String, order, tasks) + @assert dialect == "manifest" + + order_taskids = 
Set(union(String[], order.stages...)) + taskids_set = Set(keys(tasks)) + if order_taskids != taskids_set + msg = if length(order_taskids) > length(taskids_set) + d = collect(setdiff(order_taskids, taskids_set)) + @sprintf "\"order\" contains some undefined tasks: %s." d + else + d = collect(setdiff(taskids_set, order_taskids)) + @sprintf "some tasks are not defined in \"order\": %s." d + end + throw(ArgumentError(msg)) + end + + new(version, dialect, order, tasks) + end +end +load_config(::Val{:manifest}, config::AbstractDict) = from_dict(ManifestWorkflow, config) + +function from_dict(::Type{ManifestWorkflow}, ::OptionField{:order}, ::Type{AbstractExecutionOrder}, order) + if order isa AbstractVector + return PipelineOrder(order) + else + throw(ArgumentError("Unsupported order type: $(typeof(order)).")) + end +end +from_dict(::Type{ManifestWorkflow}, ::Type{VersionNumber}, ver::AbstractString) = VersionNumber(ver) + +# To more efficiently get a task of specific task id, we convert the list to dictionary +# before construction. Because this is different from how we represent it in the +# configuration file, which is a list, we hereby patch it with `from_dict`/`to_dict` +# methods. 
+function from_dict(::Type{ManifestWorkflow}, ::OptionField{:tasks}, ::Type{Dict{String,AbstractTask}}, tasks) + taskids = map(t->t["id"], tasks) + taskids_set = Set(taskids) + if length(taskids) != length(taskids_set) + ids = filter(taskids_set) do id + count(isequal(id), taskids) > 1 + end + throw(ArgumentError("duplicate tasks detected: $(collect(ids)).")) + end + return Dict(t["id"]=>from_dict(SimpleTask, t) for t in tasks) +end + +function to_dict(::Type{ManifestWorkflow}, tasks::Dict{String,AbstractTask}, option::Configurations.ToDictOption) + map(to_dict, values(tasks)) +end diff --git a/src/dialects/orders.jl b/src/dialects/orders.jl new file mode 100644 index 0000000..85564c5 --- /dev/null +++ b/src/dialects/orders.jl @@ -0,0 +1,37 @@ +""" + PipelineOrder(stages::Vector{Vector{String}}) + +Creates a pipeline execution order object. + +Stages `[["1", "2"], ["3", "4", "5"], ["6"]]` would create the following pipeline: + +```text +| Stage 1 | Stage 2 | Stage 3 | +| ------- | -------- | ------ | +| Task 1 | Task 3 | | +| Task 2 | Task 4 | Task 6 | +| | Task 5 | | +``` + +The task scheduler would first execute all tasks `["1", "2"]` in stage 1, and then all tasks +`["3", "4", "5"]` in stage 2, and finally all tasks `["6"]` in stage 3. The execution order +for tasks in the same stage is undefined. +""" +struct PipelineOrder <: AbstractExecutionOrder + stages::Vector{Vector{String}} + function PipelineOrder(stages::Vector{<:Vector}) + flatten_tasks = union(String[], stages...) + tasks = vcat(stages...) 
+ if length(tasks) != length(flatten_tasks) + ids = filter(flatten_tasks) do id + count(isequal(id), tasks) > 1 + end + throw(ArgumentError("duplicate tasks detected in \"order\": $ids.")) + end + return new(stages) + end +end +PipelineOrder(stages::Vector{String}) = PipelineOrder([[x] for x in stages]) +Base.:(==)(p1::PipelineOrder, p2::PipelineOrder) = p1.stages == p2.stages + +# TODO(johnnychen94): add DAG support diff --git a/src/dialects/traits.jl b/src/dialects/traits.jl new file mode 100644 index 0000000..25a431d --- /dev/null +++ b/src/dialects/traits.jl @@ -0,0 +1,55 @@ +""" + task_id(t::AbstractTask)::String + +Return the unique ID of task `t`. + +We assume that `task_id(x) == task_id(y)` if and only if the execution of task `x` is +"equivalent" to that of task `y`. +""" +task_id(::T) where T<:AbstractTask = error("Not implemented for task type $(T).") + +""" + task_name(t::AbstractTask)::String + +Return the task name of task `t`. + +Unlike [`taskid`](@ref), task names does not need to be unique. +""" +task_name(::T) where T<:AbstractTask = error("Not implemented for task type $(T).") + +""" + task_groups(t::AbstractTask)::Set + +Return the groups that task `t` belongs to. +""" +task_groups(::T) where T<:AbstractTask = error("Not implemented for task type $(T).") + +""" + task_deps(t::AbstractTask)::Set + +Return the assumed dependencies that task `t` requires so as to be executed successfully. +""" +task_deps(::T) where T<:AbstractTask = error("Not implemented for task type $(T).") + +""" + task_outs(t::AbstractTask)::Set + +Return the assumed outputs that task execution of `t` will create, if executed +successfully. +""" +task_outs(::T) where T<:AbstractTask = error("Not implemented for task type $(T).") + + +""" + runner_type(t::AbstractTask)::String + +Return the runner type of task `t`. 
+""" +runner_type(::T) where T<:AbstractTask = error("Not implemented for task type $(T).") + +""" + runner_info(t::AbstractTask)::Dict{String, Any} + +Return the extra information for task `t` runner. +""" +runner_info(::T) where T<:AbstractTask = error("Not implemented for task type $(T).") diff --git a/src/dialects/utils.jl b/src/dialects/utils.jl new file mode 100644 index 0000000..58c27fb --- /dev/null +++ b/src/dialects/utils.jl @@ -0,0 +1,19 @@ +function check_version(dialect::String, ver::VersionNumber) + is_compat = check_version(Bool, dialect, ver) + if !is_compat + spec_ver = spec_versions[dialect] + @warn "workflow file version $(ver) might not be compatible with the toolchain version $(spec_ver) for dialect $(dialect)." + end + return +end +function check_version(::Type{Bool}, dialect::String, ver::VersionNumber) + if !haskey(spec_versions, dialect) + # if it is unregistered dialect, disable version check for it. + @debug "version info for dialect \"$(dialect)\" not found in dialect registry \"dialects.toml\"." + return true + end + spec_ver = spec_versions[dialect] + ver.major <= spec_ver.major || return false + ver.major == 0 && ver.minor == spec_ver.minor || return false + return true +end diff --git a/test/dialects/config_io.jl b/test/dialects/config_io.jl new file mode 100644 index 0000000..3c62b17 --- /dev/null +++ b/test/dialects/config_io.jl @@ -0,0 +1,46 @@ +module ConfigIOTest + +using Workflows +using Workflows.Dialects +using Workflows: load_config, save_config +using Workflows.Dialects: ManifestWorkflow +using Suppressor +using Test + +const tmp_testdir = mktempdir() +const example_dir = joinpath(@__DIR__, "examples") + +@testset "config io" begin + # More comprehensive parsing test goes to dialects, here we only ensure that the + # `load_config` and `save_config` works as expected. 
+ + tmpfile = joinpath(tmp_testdir, "tmp.toml") + config_filename = joinpath(example_dir, "manifest", "good", "standard.toml") + w = load_config(config_filename) + @test w isa ManifestWorkflow + save_config(tmpfile, w) + w2 = load_config(tmpfile) + @test w == w2 + + # extension sensitive + tmptxt = joinpath(tmp_testdir, "tmp.txt") + err = ArgumentError("unsupported file extension: \".txt\".") + @test_throws err save_config(tmptxt, w) + cp(tmpfile, tmptxt) + @test_throws err load_config(tmptxt) + + @testset "version compat" begin + # warn when we load a file with too new version + config_filename = joinpath(example_dir, "manifest", "good", "new_version.toml") + w = @test_nowarn @suppress_err load_config(config_filename) + msg = @capture_err load_config(config_filename) + @test occursin("workflow file version 999.999.999 might not be compatible", msg) + + # save config with current dialect version + tmpfile = joinpath(tmp_testdir, "tmp.toml") + save_config(tmpfile, w) + w = @test_nowarn load_config(tmpfile) + @test w.version == Dialects.spec_versions["manifest"] + end +end +end diff --git a/test/dialects/examples/foreign/foreign.toml b/test/dialects/examples/foreign/foreign.toml new file mode 100644 index 0000000..f3422a8 --- /dev/null +++ b/test/dialects/examples/foreign/foreign.toml @@ -0,0 +1,2 @@ +version = "0.1" +dialect = "foreign" diff --git a/test/dialects/examples/manifest/bad/duplicate_order.toml b/test/dialects/examples/manifest/bad/duplicate_order.toml new file mode 100644 index 0000000..b885332 --- /dev/null +++ b/test/dialects/examples/manifest/bad/duplicate_order.toml @@ -0,0 +1,18 @@ +version = "0.1" +dialect = "manifest" + +order = [["1"], ["1", "2"]] + +[[tasks]] +name = "task1" +id = "1" +runner = "juliamodule" +[tasks.run] +script = "main.jl" + +[[tasks]] +name = "task1" +id = "2" +runner = "juliamodule" +[tasks.run] +script = "main.jl" diff --git a/test/dialects/examples/manifest/bad/duplicate_taskid.toml 
b/test/dialects/examples/manifest/bad/duplicate_taskid.toml new file mode 100644 index 0000000..e15d382 --- /dev/null +++ b/test/dialects/examples/manifest/bad/duplicate_taskid.toml @@ -0,0 +1,18 @@ +version = "0.1" +dialect = "manifest" + +order = [["1"]] + +[[tasks]] +name = "task1" +id = "1" +runner = "juliamodule" +[tasks.run] +script = "main.jl" + +[[tasks]] +name = "task1" +id = "1" +runner = "juliamodule" +[tasks.run] +script = "main.jl" diff --git a/test/dialects/examples/manifest/bad/missing_order.toml b/test/dialects/examples/manifest/bad/missing_order.toml new file mode 100644 index 0000000..c13c069 --- /dev/null +++ b/test/dialects/examples/manifest/bad/missing_order.toml @@ -0,0 +1,19 @@ +version = "0.1" +dialect = "manifest" + +order = ["1"] + +[[tasks]] +name = "task1" +id = "1" +runner = "juliamodule" +[tasks.run] +script = "main.jl" + + +[[tasks]] +name = "task1" +id = "2" +runner = "juliamodule" +[tasks.run] +script = "main.jl" diff --git a/test/dialects/examples/manifest/bad/missing_task.toml b/test/dialects/examples/manifest/bad/missing_task.toml new file mode 100644 index 0000000..5fe3646 --- /dev/null +++ b/test/dialects/examples/manifest/bad/missing_task.toml @@ -0,0 +1,19 @@ +version = "0.1" +dialect = "manifest" + +order = ["1", "2", "3"] + +[[tasks]] +name = "task1" +id = "1" +runner = "juliamodule" +[tasks.run] +script = "main.jl" + + +[[tasks]] +name = "task1" +id = "2" +runner = "juliamodule" +[tasks.run] +script = "main.jl" diff --git a/test/dialects/examples/manifest/bad/new_version.toml b/test/dialects/examples/manifest/bad/new_version.toml new file mode 100644 index 0000000..babb2b0 --- /dev/null +++ b/test/dialects/examples/manifest/bad/new_version.toml @@ -0,0 +1,11 @@ +version = "999.999.999" +dialect = "manifest" + +order = ["1"] + +[[tasks]] +name = "task1" +id = "1" +runner = "juliamodule" +[tasks.run] +script = "main.jl" diff --git a/test/dialects/examples/manifest/good/empty.toml 
b/test/dialects/examples/manifest/good/empty.toml new file mode 100644 index 0000000..525e59a --- /dev/null +++ b/test/dialects/examples/manifest/good/empty.toml @@ -0,0 +1,4 @@ +version = "0.1" +dialect = "manifest" + +order = [] diff --git a/test/dialects/examples/manifest/good/flat_order.toml b/test/dialects/examples/manifest/good/flat_order.toml new file mode 100644 index 0000000..47800a2 --- /dev/null +++ b/test/dialects/examples/manifest/good/flat_order.toml @@ -0,0 +1,18 @@ +version = "0.1" +dialect = "manifest" + +order = ["1", "2"] + +[[tasks]] +name = "task1" +id = "1" +runner = "juliamodule" +[tasks.run] +script = "main.jl" + +[[tasks]] +name = "task2" +id = "2" +runner = "juliamodule" +[tasks.run] +script = "main.jl" diff --git a/test/dialects/examples/manifest/good/new_version.toml b/test/dialects/examples/manifest/good/new_version.toml new file mode 100644 index 0000000..6a419f9 --- /dev/null +++ b/test/dialects/examples/manifest/good/new_version.toml @@ -0,0 +1,8 @@ +version = "999.999.999" +dialect = "manifest" +order = ["1"] + +[[tasks]] +name = "task1" +id = "1" +runner = "juliamodule" diff --git a/test/dialects/examples/manifest/good/optional.toml b/test/dialects/examples/manifest/good/optional.toml new file mode 100644 index 0000000..1c94d32 --- /dev/null +++ b/test/dialects/examples/manifest/good/optional.toml @@ -0,0 +1,11 @@ +version = "0.1" +dialect = "manifest" + +order = [["1"]] + +[[tasks]] +name = "task1" +id = "1" +runner = "juliamodule" +[tasks.run] +script = "main.jl" diff --git a/test/dialects/examples/manifest/good/optional_custom.toml b/test/dialects/examples/manifest/good/optional_custom.toml new file mode 100644 index 0000000..7a66033 --- /dev/null +++ b/test/dialects/examples/manifest/good/optional_custom.toml @@ -0,0 +1,15 @@ +version = "0.1" +dialect = "manifest" + +order = [["1"]] + +[[tasks]] +name = "task1" +id = "1" +groups = ["groupA", "groupB"] +deps = ["dirA", "fileB"] +outs = ["@__STDOUT__"] +runner = "juliamodule" 
+[tasks.run] +workdir = "somewhere" +script = "main.jl" diff --git a/test/dialects/examples/manifest/good/standard.toml b/test/dialects/examples/manifest/good/standard.toml new file mode 100644 index 0000000..157c7bf --- /dev/null +++ b/test/dialects/examples/manifest/good/standard.toml @@ -0,0 +1,36 @@ +version = "0.1" +dialect = "manifest" +order = [["1"], ["2", "3"]] + +[[tasks]] +name = "task1" +id = "1" +groups = ["groupA", "groupC"] +deps = ["dirA", "fileB"] +outs = ["@__STDOUT__"] +runner = "juliamodule" +[tasks.run] +script = "main.jl" +workdir = "@__DIR__" + +[[tasks]] +name = "task3" +id = "3" +groups = ["groupA", "groupB"] +deps = ["dirA", "fileB"] +outs = ["@__STDOUT__"] +runner = "juliamodule" +[tasks.run] +script = "main.jl" +workdir = "@__DIR__" + +[[tasks]] +name = "task2" +id = "2" +groups = ["groupA", "groupB"] +deps = ["dirA", "fileB"] +outs = ["@__STDOUT__"] +runner = "juliamodule" +[tasks.run] +script = "main.jl" +workdir = "@__DIR__" diff --git a/test/dialects/examples/manifest/good/unique_groups.toml b/test/dialects/examples/manifest/good/unique_groups.toml new file mode 100644 index 0000000..b168673 --- /dev/null +++ b/test/dialects/examples/manifest/good/unique_groups.toml @@ -0,0 +1,12 @@ +version = "0.1" +dialect = "manifest" + +order = ["1"] + +[[tasks]] +name = "task1" +id = "1" +groups = ["A", "A", "B", "A"] +runner = "juliamodule" +[tasks.run] +script = "main.jl" diff --git a/test/dialects/foreign.jl b/test/dialects/foreign.jl new file mode 100644 index 0000000..cb9b070 --- /dev/null +++ b/test/dialects/foreign.jl @@ -0,0 +1,33 @@ +module ForeignTypeTest + +using Test +using Workflows.Dialects: task_id, task_name, task_deps, task_outs, task_groups +using Workflows.Dialects: runner_type, runner_info +using Workflows.Dialects: load_config, save_config +using Workflows.Dialects: AbstractTask + +struct ForeignTask <: AbstractTask end + +@testset "foreign" begin + +@testset "foreign task" begin + t = ForeignTask() + @test_throws 
ErrorException task_id(t) + @test_throws ErrorException task_name(t) + @test_throws ErrorException task_deps(t) + @test_throws ErrorException task_outs(t) + @test_throws ErrorException task_groups(t) + @test_throws ErrorException runner_type(t) + @test_throws ErrorException runner_info(t) +end + + +@testset "foreign dialect" begin + config_filename = joinpath(@__DIR__, "examples", "foreign", "foreign.toml") + err = ErrorException("unsupported workflow dialect \"foreign\".") + @test_throws err load_config(config_filename) +end + +end #testset + +end #module diff --git a/test/dialects/manifest.jl b/test/dialects/manifest.jl new file mode 100644 index 0000000..ad092e4 --- /dev/null +++ b/test/dialects/manifest.jl @@ -0,0 +1,93 @@ +module ManifestDialectTest + +using Configurations +using Workflows +using Workflows.Dialects +using Workflows.Dialects: ManifestWorkflow +using Workflows.Dialects: PipelineOrder +using Workflows.Dialects: task_id, task_name, task_groups, task_deps, task_outs +using Workflows.Dialects: runner_type, runner_info +using Test +using TOML + +function load_back(w) + io = IOBuffer() + to_toml(io, w) + config = TOML.parse(String(take!(io))) + from_dict(ManifestWorkflow, config) +end + +@testset "manifest" begin + @testset "properties" begin + casedir = joinpath(@__DIR__, "examples", "manifest", "good") + + filename = joinpath(casedir, "standard.toml") + w = from_toml(ManifestWorkflow, filename) + w2 = load_back(w) + @test w == w2 + end + + @testset "positive cases" begin + casedir = joinpath(@__DIR__, "examples", "manifest", "good") + + # test default options + filename = joinpath(casedir, "optional.toml") + w = from_toml(ManifestWorkflow, filename) + t = w.tasks["1"] + @test task_name(t) == "task1" + @test task_id(t) == "1" + @test task_groups(t) == String[] # optional + @test task_deps(t) == String[] # optional + @test task_outs(t) == String[] # optional + @test runner_type(t) == "juliamodule" + @test runner_info(t)["script"] == "main.jl" + + # 
ensure default options will be correctly overridden by user configurations + filename = joinpath(casedir, "optional_custom.toml") + w = from_toml(ManifestWorkflow, filename) + t = w.tasks["1"] + @test runner_info(t)["workdir"] == "somewhere" + @test task_groups(t) == String["groupA", "groupB"] + @test task_deps(t) == String["dirA", "fileB"] + @test task_outs(t) == String["@__STDOUT__"] + + # order can be Vector{String} + filename = joinpath(casedir, "flat_order.toml") + w = from_toml(ManifestWorkflow, filename) + @test w.order isa PipelineOrder + @test w.order.stages == [["1"], ["2"]] + + # empty workflow is allowed + filename = joinpath(casedir, "empty.toml") + w = from_toml(ManifestWorkflow, filename) + @test length(w.tasks) == 0 + @test w.order isa PipelineOrder + @test length(w.order.stages) == 0 + end + + @testset "negative cases" begin + casedir = joinpath(@__DIR__, "examples", "manifest", "bad") + + # task ID should be unique across the workflow + filename = joinpath(casedir, "duplicate_taskid.toml") + err = ArgumentError("duplicate tasks detected: [\"1\"].") + @test_throws err from_toml(ManifestWorkflow, filename) + + # DAG required: one task can't be executed multiple times + filename = joinpath(casedir, "duplicate_order.toml") + err = ArgumentError("duplicate tasks detected in \"order\": [\"1\"].") + @test_throws err from_toml(ManifestWorkflow, filename) + + # each task defined in the workflow should have a well-defined execution order + filename = joinpath(casedir, "missing_order.toml") + err = ArgumentError("some tasks are not defined in \"order\": [\"2\"].") + @test_throws err from_toml(ManifestWorkflow, filename) + + # each item listed in the order should have a corresponding task + filename = joinpath(casedir, "missing_task.toml") + err = ArgumentError("\"order\" contains some undefined tasks: [\"3\"].") + @test_throws err from_toml(ManifestWorkflow, filename) + end +end + +end diff --git a/test/runtests.jl b/test/runtests.jl index 1cb57b2..febf168 
100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -2,5 +2,10 @@ using Workflows using Test @testset "Workflows.jl" begin - # Write your tests here. + @info "run dialects test" + @testset "dialects" begin + include("dialects/foreign.jl") + include("dialects/manifest.jl") + include("dialects/config_io.jl") + end end From a2b8313d1fc3fe3fdfb88e6d5efd6a1935be1907 Mon Sep 17 00:00:00 2001 From: Johnny Chen Date: Sat, 12 Feb 2022 13:49:43 +0800 Subject: [PATCH 3/7] initial implementation of juliamodule runner --- src/Workflows.jl | 2 + src/runners/Runners.jl | 28 ++++++ src/runners/juliamodule.jl | 34 +++++++ test/runners/juliamodule.jl | 146 ++++++++++++++++++++++++++++ test/runners/scripts/error_sum.jl | 2 + test/runners/scripts/sum.jl | 1 + test/runners/scripts/verbose_sum.jl | 3 + test/runtests.jl | 13 ++- test/testutils.jl | 38 ++++++++ 9 files changed, 266 insertions(+), 1 deletion(-) create mode 100644 src/runners/Runners.jl create mode 100644 src/runners/juliamodule.jl create mode 100644 test/runners/juliamodule.jl create mode 100644 test/runners/scripts/error_sum.jl create mode 100644 test/runners/scripts/sum.jl create mode 100644 test/runners/scripts/verbose_sum.jl create mode 100644 test/testutils.jl diff --git a/src/Workflows.jl b/src/Workflows.jl index 2bb6da7..98cc0e7 100644 --- a/src/Workflows.jl +++ b/src/Workflows.jl @@ -4,5 +4,7 @@ using Configurations include("dialects/Dialects.jl") using .Dialects: load_config, save_config +include("runners/Runners.jl") +using .Runners end diff --git a/src/runners/Runners.jl b/src/runners/Runners.jl new file mode 100644 index 0000000..51b790b --- /dev/null +++ b/src/runners/Runners.jl @@ -0,0 +1,28 @@ +module Runners + +using Configurations +using ..Dialects: AbstractTask +using ..Dialects: task_id, task_name, task_groups, task_deps, task_outs +using ..Dialects: runner_type, runner_info + +abstract type AbstractTaskRunner end + +""" + execute_task([exec::AbstractTaskRunner], t::AbstractTask; workdir=".") + 
+Execute task `t` using runner `exec` in folder `workdir`. +""" +execute_task(task::AbstractTask; kwargs...) = execute_task(build_runner(task), task; kwargs...) +execute_task(r::RT, t::AbstractTask; kwargs...) where RT<:AbstractTaskRunner = error("Not implemented for runner type: $RT.") + +""" + build_runner(t::AbstractTask) + +Build the runner object for task `t`. +""" +build_runner(t::AbstractTask) = build_runner(Val(Symbol(runner_type(t))), runner_info(t)) +build_runner(::Val{runner_type}, run_info::AbstractDict) where runner_type = error("Not implemented for runner type: $runner_type.") + +include("juliamodule.jl") # runner = "juliamodule" + +end #module diff --git a/src/runners/juliamodule.jl b/src/runners/juliamodule.jl new file mode 100644 index 0000000..954a761 --- /dev/null +++ b/src/runners/juliamodule.jl @@ -0,0 +1,34 @@ +@option struct JuliaModuleRunner <: AbstractTaskRunner + script::String + workdir::String = "@__DIR__" + """set false to disable stdout, i.e., redirect to devnull.""" + enable_stdout::Bool = true + """set false to disable stderr, i.e., redirect to devnull.""" + enable_stderr::Bool = true + """set false to suppress the errors with warning if the command fails to execute.""" + strict::Bool = true +end +build_runner(::Val{:juliamodule}, run_info::AbstractDict) = from_dict(JuliaModuleRunner, run_info) + +function execute_task(exec::JuliaModuleRunner, t::AbstractTask; workdir::String=".", kwargs...) + script = strip(exec.script) + workdir = replace(exec.workdir, "@__DIR__" => workdir) + stdout = exec.enable_stdout ? Base.stdout : devnull + stderr = exec.enable_stderr ? 
Base.stderr : devnull + cd(workdir) do + script = abspath(script) + m = Module(gensym()) + @debug "Executing task $(task_id(t)) in julia module" workdir=pwd() script + Core.eval(m, :(include(x) = Base.include($m, x))) + try + redirect_stdio(stdout=stdout, stderr=stderr, stdin=devnull) do + Core.eval(m, :(Base.include($m, $script))) + end + catch err + # Unwrap the LoadError due to `include("script.jl")` call so that it looks like + # a "real" `julia script.jl` process. + err = err isa LoadError ? err.error : err + exec.strict ? rethrow(err) : @warn "failed to execute task $(task_id(t)) in juliamodule runner" err + end + end +end diff --git a/test/runners/juliamodule.jl b/test/runners/juliamodule.jl new file mode 100644 index 0000000..871927e --- /dev/null +++ b/test/runners/juliamodule.jl @@ -0,0 +1,146 @@ +@testset "Runner: juliamodule" begin + @testset "workdir" begin + # The workdir setup is a little bit tricky here as we usually triggers the test + # in project root dir with `pkg> test`. To successfully run the tests interactively, + # we need to set pwd to the "test/runners" dir. 
+ scripts_dir = joinpath(@__DIR__, "scripts") + workdir_prefix = basename(@__DIR__) + + taskinfo = Dict{String,Any}( + "name" => "task1", + "id" => "1", + "runner" => "juliamodule", + ) + + # default workdir with relpath: `"@__DIR__"` will be replaced by `workdir="."` + taskinfo["run"] = Dict( + "script" => "scripts/sum.jl" + ) + t = from_dict(SimpleTask, taskinfo) + exec = build_runner(t) + @test exec == JuliaModuleRunner(script="scripts/sum.jl", workdir="@__DIR__") + dirs, rst = with_sandbox(includes=[scripts_dir]) do + readdir(workdir_prefix), execute_task(t; workdir=workdir_prefix) + end + @test dirs == ["scripts"] + @test rst == 55 + + # default workdir with relpath + taskinfo["run"] = Dict( + "script" => "sum.jl" + ) + t = from_dict(SimpleTask, taskinfo) + exec = build_runner(t) + @test exec == JuliaModuleRunner(script="sum.jl", workdir="@__DIR__") + dirs, rst = with_sandbox(includes=[scripts_dir]) do + # `"@__DIR__"` will be replaced by `runners/scripts` + readdir(workdir_prefix), execute_task(t; workdir="$workdir_prefix/scripts") + end + @test dirs == ["scripts"] + @test rst == 55 + + # custom workdir with relpath + taskinfo["run"] = Dict( + "script" => "sum.jl", + "workdir" => "$workdir_prefix/scripts" + ) + t = from_dict(SimpleTask, taskinfo) + exec = build_runner(t) + @test exec == JuliaModuleRunner(script="sum.jl", workdir="$workdir_prefix/scripts") + dirs, rst = with_sandbox(includes=[scripts_dir]) do + # `workdir` doesn't affect here at all + workdir = joinpath(tempdir(), "somewhere") + readdir(workdir_prefix), execute_task(t; workdir=workdir) + end + @test dirs == ["scripts"] + @test rst == 55 + + # custom workdir with relpath and @__DIR__ + taskinfo["run"] = Dict( + "script" => "sum.jl", + "workdir" => "@__DIR__/scripts" + ) + t = from_dict(SimpleTask, taskinfo) + exec = build_runner(t) + @test exec == JuliaModuleRunner(script="sum.jl", workdir="@__DIR__/scripts") + dirs, rst = with_sandbox(includes=[scripts_dir]) do + # "@__DIR__/scripts" will be 
replaced by relative path "runners/scripts" + readdir(workdir_prefix), execute_task(t; workdir=workdir_prefix) + end + @test dirs == ["scripts"] + @test rst == 55 + + # check if we can successfully suppress stdout/stderr + taskinfo["run"] = Dict( + "script" => "scripts/verbose_sum.jl", + "enable_stdout" => false, + "enable_stderr" => false + ) + t = from_dict(SimpleTask, taskinfo) + exec = build_runner(t) + rst = with_sandbox(includes=[scripts_dir]) do + execute_task(t; workdir=workdir_prefix) + end + @test rst == 55 + + # by default, it's strict mode + taskinfo["run"] = Dict( + "script" => "scripts/error_sum.jl", + ) + t = from_dict(SimpleTask, taskinfo) + exec = build_runner(t) + with_sandbox(includes=[scripts_dir]) do + try + execute_task(t; workdir=workdir_prefix) + catch err + @test err isa ErrorException + @test err.msg == "Hi 你好" + end + end + + # but we can suppress exception with warnings + taskinfo["run"] = Dict( + "script" => "scripts/error_sum.jl", + "strict" => false + ) + t = from_dict(SimpleTask, taskinfo) + exec = build_runner(t) + with_sandbox(includes=[scripts_dir]) do + msg = @capture_err execute_task(t; workdir=workdir_prefix) + @test occursin("Warning: failed to execute task 1 in juliamodule runner", msg) + @test occursin("err = Hi 你好", msg) + end + + # custom workdir with abspath and @__DIR__ + taskinfo["run"] = Dict( + "script" => "sum.jl", + "workdir" => joinpath(pwd(), "@__DIR__/scripts") + ) + t = from_dict(SimpleTask, taskinfo) + exec = build_runner(t) + @test exec == JuliaModuleRunner(script="sum.jl", workdir=joinpath(pwd(), "@__DIR__/scripts")) + dirs, rst = with_sandbox() do + # This might be a little bit unintuitive + # "$(pwd())/@__DIR__/scripts" will be replaced by absolute path "$(pwd())/runners/scripts" + readdir(), execute_task(t; workdir=workdir_prefix) + end + @test dirs == [] + @test rst == 55 + + # custom workdir with abspath + taskinfo["run"] = Dict( + "script" => "sum.jl", + "workdir" => scripts_dir + ) + t = 
from_dict(SimpleTask, taskinfo) + exec = build_runner(t) + @test exec == JuliaModuleRunner(script="sum.jl", workdir=scripts_dir) + dirs, rst = with_sandbox() do + # `workdir` doesn't affect here at all + workdir = joinpath(tempdir(), "somewhere") + readdir(), execute_task(t; workdir=workdir) + end + @test dirs == [] + @test rst == 55 + end +end diff --git a/test/runners/scripts/error_sum.jl b/test/runners/scripts/error_sum.jl new file mode 100644 index 0000000..dea6914 --- /dev/null +++ b/test/runners/scripts/error_sum.jl @@ -0,0 +1,2 @@ +error("Hi 你好") +sum(1:10) # 55 diff --git a/test/runners/scripts/sum.jl b/test/runners/scripts/sum.jl new file mode 100644 index 0000000..6456c11 --- /dev/null +++ b/test/runners/scripts/sum.jl @@ -0,0 +1 @@ +sum(1:10) # 55 diff --git a/test/runners/scripts/verbose_sum.jl b/test/runners/scripts/verbose_sum.jl new file mode 100644 index 0000000..bdc3798 --- /dev/null +++ b/test/runners/scripts/verbose_sum.jl @@ -0,0 +1,3 @@ +println("You should not see this message!") +@info "You should not see this message!" 
+sum(1:10) # 55 diff --git a/test/runtests.jl b/test/runtests.jl index febf168..b9b011d 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -1,5 +1,12 @@ -using Workflows +using Configurations +using Workflows.Dialects: SimpleTask +using Workflows.Runners: execute_task, build_runner, capture_run +using Workflows.Runners: JuliaModuleRunner +using Suppressor using Test +using TOML + +include("testutils.jl") @testset "Workflows.jl" begin @info "run dialects test" @@ -8,4 +15,8 @@ using Test include("dialects/manifest.jl") include("dialects/config_io.jl") end + + @testset "runners" begin + include("runners/juliamodule.jl") + end end diff --git a/test/testutils.jl b/test/testutils.jl new file mode 100644 index 0000000..65261c6 --- /dev/null +++ b/test/testutils.jl @@ -0,0 +1,38 @@ +const _sandbox_root = mktempdir() + +""" + with_sandbox(f; includes=[""]) + +run function `f()` in a simple sandbox directory, with `includes` copied there. + +# Examples + +```julia +julia> with_sandbox() do + pwd() +end +"/private/var/folders/34/km3mmt5930gc4pzq1d08jvjw0000gn/T/jl_rNgOVo/jl_BSkXtM" + +julia> with_sandbox(includes=["scripts", "not_exists"]) do + readdir() +end +┌ Warning: source path not existed +│ path = "not_exists" +└ @ Main REPL[35]:9 +1-element Vector{String}: +"scripts" +``` +""" +function with_sandbox(f; includes::Vector{String}=String[], follow_symlinks=false) + sandbox_root = mktempdir(_sandbox_root) + for src_path in includes + dst_path = joinpath(sandbox_root, relpath(abspath(src_path))) + if ispath(src_path) + mkpath(dirname(dst_path)) + cp(src_path, dst_path; follow_symlinks=follow_symlinks) + else + @warn "source path not existed" path=src_path + end + end + cd(f, sandbox_root) +end From 0e2f4d453b749d92e9092a491f38166392df2b34 Mon Sep 17 00:00:00 2001 From: Johnny Chen Date: Sun, 13 Feb 2022 19:16:19 +0800 Subject: [PATCH 4/7] compat: redirect_stdio for julia 1.6 Also set julia lower bound to 1.6 --- .github/workflows/UnitTest.yml | 2 +- Project.toml | 2 
+- src/runners/Runners.jl | 2 ++ src/runners/compat.jl | 55 ++++++++++++++++++++++++++++++++++ 4 files changed, 59 insertions(+), 2 deletions(-) create mode 100644 src/runners/compat.jl diff --git a/.github/workflows/UnitTest.yml b/.github/workflows/UnitTest.yml index 67cd575..1a3e49e 100644 --- a/.github/workflows/UnitTest.yml +++ b/.github/workflows/UnitTest.yml @@ -14,7 +14,7 @@ jobs: strategy: fail-fast: false matrix: - julia-version: ['1.0', '1', 'nightly'] + julia-version: ['1.6', '1', 'nightly'] os: [ubuntu-latest] arch: [x64] include: diff --git a/Project.toml b/Project.toml index d7452ca..27bfb04 100644 --- a/Project.toml +++ b/Project.toml @@ -11,7 +11,7 @@ TOML = "fa267f1f-6049-4f14-aa54-33bafae1ed76" [compat] Configurations = "0.17" TOML = "1" -julia = "1" +julia = "1.6" [extras] Suppressor = "fd094767-a336-5f1f-9728-57cf17d0bbfb" diff --git a/src/runners/Runners.jl b/src/runners/Runners.jl index 51b790b..0f02a5f 100644 --- a/src/runners/Runners.jl +++ b/src/runners/Runners.jl @@ -25,4 +25,6 @@ build_runner(::Val{runner_type}, run_info::AbstractDict) where runner_type = err include("juliamodule.jl") # runner = "juliamodule" +include("compat.jl") + end #module diff --git a/src/runners/compat.jl b/src/runners/compat.jl new file mode 100644 index 0000000..09b917c --- /dev/null +++ b/src/runners/compat.jl @@ -0,0 +1,55 @@ +@static if VERSION < v"1.7" +# copied from https://github.com/JuliaLang/julia/blob/a400a24a5d9e6609740814e4092538feb499ce60/base/stream.jl#L1290-L1415 + +function redirect_stdio(;stdin=nothing, stderr=nothing, stdout=nothing) + stdin === nothing || redirect_stdin(stdin) + stderr === nothing || redirect_stderr(stderr) + stdout === nothing || redirect_stdout(stdout) +end + +function redirect_stdio(f; stdin=nothing, stderr=nothing, stdout=nothing) + + function resolve(new::Nothing, oldstream, mode) + (new=nothing, close=false, old=nothing) + end + function resolve(path::AbstractString, oldstream,mode) + (new=open(path, mode), close=true, 
old=oldstream) + end + function resolve(new, oldstream, mode) + (new=new, close=false, old=oldstream) + end + + same_path(x, y) = false + function same_path(x::AbstractString, y::AbstractString) + # if x = y = "does_not_yet_exist.txt" then samefile will return false + (abspath(x) == abspath(y)) || samefile(x,y) + end + if same_path(stderr, stdin) + throw(ArgumentError("stdin and stderr cannot be the same path")) + end + if same_path(stdout, stdin) + throw(ArgumentError("stdin and stdout cannot be the same path")) + end + + new_in , close_in , old_in = resolve(stdin , Base.stdin , "r") + new_out, close_out, old_out = resolve(stdout, Base.stdout, "w") + if same_path(stderr, stdout) + # make sure that in case stderr = stdout = "same/path" + # only a single io is used instead of opening the same file twice + new_err, close_err, old_err = new_out, false, Base.stderr + else + new_err, close_err, old_err = resolve(stderr, Base.stderr, "w") + end + + redirect_stdio(; stderr=new_err, stdin=new_in, stdout=new_out) + + try + return f() + finally + redirect_stdio(;stderr=old_err, stdin=old_in, stdout=old_out) + close_err && close(new_err) + close_in && close(new_in ) + close_out && close(new_out) + end +end +end From bee65463dcbe4157f22c2dc21f903c3bcf742e20 Mon Sep 17 00:00:00 2001 From: Johnny Chen Date: Sat, 12 Feb 2022 13:50:22 +0800 Subject: [PATCH 5/7] initial implementation of shell runner --- src/runners/Runners.jl | 1 + src/runners/shell.jl | 210 ++++++++++++++++++++++++ test/runners/scripts/envcheck.jl | 1 + test/runners/scripts/log_println_sum.jl | 4 + test/runners/scripts/println_sum.jl | 1 + test/runners/shell.jl | 206 +++++++++++++++++++++++ test/runtests.jl | 5 +- 7 files changed, 427 insertions(+), 1 deletion(-) create mode 100644 src/runners/shell.jl create mode 100644 test/runners/scripts/envcheck.jl create mode 100644 test/runners/scripts/log_println_sum.jl create mode 100644 test/runners/scripts/println_sum.jl create mode 100644 test/runners/shell.jl diff 
--git a/src/runners/Runners.jl b/src/runners/Runners.jl index 0f02a5f..1f46e5b 100644 --- a/src/runners/Runners.jl +++ b/src/runners/Runners.jl @@ -24,6 +24,7 @@ build_runner(t::AbstractTask) = build_runner(Val(Symbol(runner_type(t))), runner build_runner(::Val{runner_type}, run_info::AbstractDict) where runner_type = error("Not implemented for runner type: $runner_type.") include("juliamodule.jl") # runner = "juliamodule" +include("shell.jl") # runner = "shell" include("compat.jl") diff --git a/src/runners/shell.jl b/src/runners/shell.jl new file mode 100644 index 0000000..22a03fb --- /dev/null +++ b/src/runners/shell.jl @@ -0,0 +1,210 @@ +@option struct ShellRunner <: AbstractTaskRunner + command::String + workdir::String = "@__DIR__" + """set true to capture the last non-empty line of stdout as execution result. The captured result will not be printed to stdout.""" + capture::Bool = false + """set false to disable stdout, i.e., redirect to devnull.""" + enable_stdout::Bool = true + """set false to disable stderr, i.e., redirect to devnull.""" + enable_stderr::Bool = true + """set false to suppress the errors with warning if the command fails to execute.""" + strict::Bool = true +end +build_runner(::Val{:shell}, run_info::AbstractDict) = from_dict(ShellRunner, run_info) + +function execute_task(exec::ShellRunner, t::AbstractTask; workdir::String=".", env=nothing) + cmd = strip(exec.command) + workdir = replace(exec.workdir, "@__DIR__" => workdir) + cd(workdir) do + @debug "executing task $(task_id(t)) in shell runner" workdir=pwd() + # TODO: redirect stdout as result + stdout = exec.enable_stdout ? Base.stdout : devnull + stderr = exec.enable_stderr ? Base.stderr : devnull + try + if exec.capture + capture_run(cmd; stdout=stdout, stderr=stderr, env=env) + else + non_capture_run(cmd; stdout=stdout, stderr=stderr, env=env) + end + catch err + exec.strict ? 
rethrow() : @warn "failed to execute task $(task_id(t)) in shell runner" err + # TODO(johnnychen94): maybe cleanup intermediate results? + end + end +end + + +struct ShellExecutionError <: Exception + msg::String +end + +""" + capture_run(cmd; [stdout], [stderr], [env]) + +Run command `cmd` and capture the last non-empty line of `stdout` as the function result. + +# Examples + +```julia +# suppressing io with `devnull` doesn't affect the capturing +julia> capture_run(`printf "hello world \\n2"`; stdout=devnull) +"2" + +julia> capture_run(`julia -e 'println("hello world"); println(rand(1:4, 4, 4))'`; stdout=devnull) +"[2 1 4 4; 1 2 3 4; 3 2 4 4; 1 1 1 2]" + +# escape sequences are kept faithfully; it's the caller's responsibility to appropriately handle them. +julia> capture_run(`julia -e 'println("\e[0;31mred")'`) +"\e[0;31mred" +``` +""" +function capture_run(cmd::Union{Cmd, Base.OrCmds}; stdout=stdout, stderr=stderr) + interval = 0.1 # 100ms + isnewline_char(x) = x == UInt8('\n') # TODO(johnnychen94): maybe also check CRLF? + + function continue_line!(line, buffer) + data = take!(buffer) + idx = findlast(isnewline_char, data) + if isnothing(idx) + # if the current line is not yet finished + append!(line, data) + else + # otherwise, we get a new line and the previous results can be safely printed + + if length(line) > 0 && (isnewline_char(line[end])) + write(stdout, line) + resize!(line, 0) + end + + # now let's check if the new content has only one non-empty line + if last(idx) == length(data) + # ignore trailing newlines by searching backwards from the first position + # that is not '\n'.
+ prev_idx = findprev(x->!isnewline_char(x), data, first(idx)) + if isnothing(prev_idx) + # with only `\n`s, simply appending to line should be sufficient if this + # happens to be the last line, we will eventually discard the trailing + # newlines + append!(line, data) + else + prev_idx = findprev(isnewline_char, data, prev_idx) + if isnothing(prev_idx) + # if contains only one line, great + append!(line, data) + else + # otherwise, only append the last non-empty line + write(stdout, @view data[1:last(prev_idx)]) + resize!(line, 0) + append!(line, @view data[last(prev_idx)+1:end]) + end + end + else + # otherwise, print previous lines to stdout, and reset the current line with + # new line content + write(stdout, @view data[1:last(idx)]) + resize!(line, 0) + append!(line, @view data[last(idx)+1:end]) + end + end + + return line + end + + line = UInt8[] + buffer = IOBuffer() + proc = try + run(pipeline(cmd, stdout=buffer, stderr=stderr), wait=false) + catch err + err isa Base.IOError && rethrow(ShellExecutionError(err.msg)) + rethrow() + end + + while process_running(proc) + continue_line!(line, buffer) + sleep(interval) + end + + @assert process_exited(proc) + if proc.exitcode != 0 + # If the process exited unexpectedly, the last non-empty line in the buffer is + # almost useless. Thus here we directly rethrow the errors, and let the caller + # decide how to handle the exception.
+ write(stdout, line) + throw(ShellExecutionError("non-zero exit code: $(proc.exitcode)")) + end + continue_line!(line, buffer) + + # discard trailing '\n's + idx = findprev(x->!isnewline_char(x), line, length(line)) + if isnothing(idx) + return "" + else + resize!(line, idx) + end + # if it contains multiple lines, only capture the last line + idx = findfirst(isnewline_char, line) + if !(isnothing(idx) || (idx == length(line))) + write(stdout, @view line[1:last(idx)]) + return String(line[last(idx)+1:end]) + else + return String(line) + end +end + +function capture_run(cmd::AbstractString; env=nothing, kwargs...) + try + capture_run(build_cmd_pipeline(cmd; env=env); kwargs...) + catch err + if err isa ShellExecutionError + rethrow(ShellExecutionError("failed to execute command `$cmd`: $(err.msg)")) + end + rethrow() + end +end + +""" + non_capture_run(cmd; [stdout], [stderr], [env]) + +Run command `cmd` similar to `run(pipeline(cmd; stdout, stderr); wait=true)` except that it +throws `ShellExecutionError` if the command fails. +""" +function non_capture_run(cmd::Union{Cmd, Base.OrCmds}; stdout=stdout, stderr=stderr) + interval = 0.1 #100ms + proc = try + # set `wait=false` here and do manual check for better exception handling + run(pipeline(cmd, stdout=stdout, stderr=stderr); wait=false) + catch err + err isa Base.IOError && rethrow(ShellExecutionError(err.msg)) + rethrow() + end + while process_running(proc) + sleep(interval) + end + @assert process_exited(proc) + if proc.exitcode != 0 + throw(ShellExecutionError("non-zero exit code: $(proc.exitcode)")) + end +end + +function non_capture_run(cmd::AbstractString; env=nothing, kwargs...) + try + non_capture_run(build_cmd_pipeline(cmd; env=env); kwargs...) 
+ catch err + if err isa ShellExecutionError + rethrow(ShellExecutionError("failed to execute command `$cmd`: $(err.msg)")) + end + rethrow() + end + return "" # to keep consistent with `capture_run` version +end + +# TODO(johnnychen94): support shell pipes such as "grep results.csv julia | cut -d, -f2" (#7) +function build_cmd_pipeline(cmd::AbstractString; env=nothing) + # We can't simply interpolate the entire string by `$(cmd)`: + # ```julia + # cmd = "echo 'hello world'" + # run(`$cmd`) # errors + # run(`echo 'hello world'`) # works + # ``` + return Cmd(Cmd(Base.shell_split(cmd)); env=env) +end diff --git a/test/runners/scripts/envcheck.jl b/test/runners/scripts/envcheck.jl new file mode 100644 index 0000000..7af91f0 --- /dev/null +++ b/test/runners/scripts/envcheck.jl @@ -0,0 +1 @@ +println(ENV["HOWLONG"]) diff --git a/test/runners/scripts/log_println_sum.jl b/test/runners/scripts/log_println_sum.jl new file mode 100644 index 0000000..442c1ce --- /dev/null +++ b/test/runners/scripts/log_println_sum.jl @@ -0,0 +1,4 @@ +msg = "You are not expected to see this message during test." 
+println(msg) +@warn msg +println(sum(1:10)) # 55 diff --git a/test/runners/scripts/println_sum.jl b/test/runners/scripts/println_sum.jl new file mode 100644 index 0000000..bca11e7 --- /dev/null +++ b/test/runners/scripts/println_sum.jl @@ -0,0 +1 @@ +println(sum(1:10)) # 55 diff --git a/test/runners/shell.jl b/test/runners/shell.jl new file mode 100644 index 0000000..e0d2ad6 --- /dev/null +++ b/test/runners/shell.jl @@ -0,0 +1,206 @@ +@testset "capture_run" begin + # single line + rst = capture_run(`julia --startup=no -e 'println("hello world")'`) + @test rst == "hello world" + + # last evaluation result is not the return value + rst = capture_run(`julia --startup=no -e 'println("hello world"); 1+1'`) + @test rst == "hello world" + + # stderr is also not the return value + io = IOBuffer() + rst = capture_run(`julia --startup=no -e 'println("hello world"); @info "hi"'`; stderr=io) + @test String(take!(io)) == "[ Info: hi\n" + @test rst == "hello world" + + # multiple lines + io = IOBuffer() + rst = capture_run(`julia --startup=no -e 'println("hello world"); print(1+1)'`, stdout=io) + @test String(take!(io)) == "hello world\n" + @test rst == "2" + + # last \n is discarded + io = IOBuffer() + rst = capture_run(`julia --startup=no -e 'println("hello world"); println(1+1)'`, stdout=io) + @test String(take!(io)) == "hello world\n" + @test rst == "2" + + # if errors, throw exception + io = IOBuffer() + try + capture_run(`julia --startup=no -e 'error("some error")'`; stderr=io) + catch err + @test err == ShellExecutionError("non-zero exit code: 1") + finally + @test occursin("ERROR: some error", String(take!(io))) + end + try + capture_run(`not_a_command`) + catch err + @test err isa ShellExecutionError && occursin("could not spawn `not_a_command`", err.msg) + end + + # multiple trailing '\n's are ignored + io = IOBuffer() + rst = capture_run(`julia --startup=no -e 'println("hello \n\n\nworld\n\n\n")'`; stdout=io) + @test String(take!(io)) == "hello \n\n\n" + @test rst == 
"world" + io = IOBuffer() + rst = capture_run(`julia --startup=no -e 'println("hello \n\n\nworld")'`; stdout=io) + @test String(take!(io)) == "hello \n\n\n" + @test rst == "world" + if !Sys.iswindows() + io = IOBuffer() + @test "world" == capture_run(`printf "hello \n\n\nworld"`; stdout=io) + @test String(take!(io)) == "hello \n\n\n" + io = IOBuffer() + @test "world" == capture_run(`printf "hello \n\n\nworld\n"`; stdout=io) + @test String(take!(io)) == "hello \n\n\n" + io = IOBuffer() + @test "world" == capture_run(`printf "hello \n\n\nworld\n\n\n"`; stdout=io) + @test String(take!(io)) == "hello \n\n\n" + end + + # Test if a looooooooooong line is kept as expected + function test_large_data(n = 1000) + script = """ + using TOML + data = 1:$n + TOML.print(Dict("data"=>data)) + """ + rst = with_sandbox() do + write("script.jl", script) + capture_run(`julia --startup=no script.jl`; stdout=devnull) + end + @test TOML.parse(rst)["data"] == collect(1:n) + end + for (n_repeat, sz) in [(3, 1), (3, 10), (3, 100), (2, 1000), (1, 10000), (1, 100000)] + for _ in 1:n_repeat + test_large_data(sz) + end + end +end + +@testset "Runner: shell" begin + # The workdir setup is a little bit tricky here as we usually triggers the test + # in project root dir with `pkg> test`. To successfully run the tests interactively, + # we need to set pwd to the "test/runners" dir. 
+ scripts_dir = joinpath(@__DIR__, "scripts") + workdir_prefix = basename(@__DIR__) + + taskinfo = Dict{String,Any}( + "name" => "task1", + "id" => "1", + "runner" => "shell", + ) + + taskinfo["run"] = Dict( + "command" => "julia --startup=no scripts/sum.jl", + "capture" => true, + ) + t = from_dict(SimpleTask, taskinfo) + exec = build_runner(t) + dirs, rst = with_sandbox(includes=[scripts_dir]) do + readdir(workdir_prefix), execute_task(t; workdir=workdir_prefix) + end + @test dirs == ["scripts"] + @test rst == "" + + # check if we successfully parsed the stdout as return value + taskinfo["run"] = Dict( + "command" => "julia --startup=no scripts/println_sum.jl", + "capture" => true, + ) + t = from_dict(SimpleTask, taskinfo) + exec = build_runner(t) + dirs, rst = with_sandbox(includes=[scripts_dir]) do + readdir(workdir_prefix), execute_task(t; workdir=workdir_prefix) + end + @test dirs == ["scripts"] + @test rst == "55" + + # non-capture mode shell runner + taskinfo["run"] = Dict( + "command" => "julia --startup=no scripts/println_sum.jl" + ) + t = from_dict(SimpleTask, taskinfo) + exec = build_runner(t) + @test exec.capture == false # default is non-capturing + with_sandbox(includes=[scripts_dir]) do + dirs = readdir(workdir_prefix) + rst = @suppress_out execute_task(t; workdir=workdir_prefix) + out = @capture_out execute_task(t; workdir=workdir_prefix) + @test dirs == ["scripts"] + @test rst == "" + @test out == "55\n" + end + + # check if we can successfully suppress stdout/stderr + taskinfo["run"] = Dict( + "command" => "julia --startup=no scripts/log_println_sum.jl", + "enable_stdout" => false, + "enable_stderr" => false, + "capture" => true, + ) + t = from_dict(SimpleTask, taskinfo) + exec = build_runner(t) + dirs, rst = with_sandbox(includes=[scripts_dir]) do + readdir(workdir_prefix), execute_task(t; workdir=workdir_prefix) + end + @test dirs == ["scripts"] + @test rst == "55" + + # strict mode: error + taskinfo["run"] = Dict( + "command" => "julia 
--startup=no nothing.jl", + "strict" => true, + "enable_stderr" => false + ) + t = from_dict(SimpleTask, taskinfo) + exec = build_runner(t) + @test exec.strict == true + try + execute_task(t) + catch err + @test err == ShellExecutionError("failed to execute command `julia --startup=no nothing.jl`: non-zero exit code: 1") + end + + # non-strict mode: warning instead of error + taskinfo["run"] = Dict( + "command" => "julia --startup=no nothing.jl", + "strict" => false + ) + t = from_dict(SimpleTask, taskinfo) + exec = build_runner(t) + @test exec.strict == false + msg = @capture_err execute_task(t) + @test occursin("SystemError: opening file", msg) || occursin("could not open file", msg) + @test occursin("failed to execute command `julia --startup=no nothing.jl`", msg) + + # env passing + taskinfo["run"] = Dict( + "command" => "julia --startup=no scripts/envcheck.jl", + "capture" => true, + ) + t = from_dict(SimpleTask, taskinfo) + exec = build_runner(t) + dirs, rst = with_sandbox(includes=[scripts_dir]) do + # this would otherwise fail if `env=nothing` + env=copy(ENV); env["HOWLONG"] = "forever!" + readdir(workdir_prefix), execute_task(t; workdir=workdir_prefix, env=env) + end + @test dirs == ["scripts"] + @test rst == "forever!" 
+ + # check command split + # cmd = "echo 'hello world'" + # `echo 'hello world'` is different from `$(cmd)` + taskinfo["run"] = Dict( + "command" => "julia --startup=no -e 'println(\"hello world\")'", + "capture" => true + ) + t = from_dict(SimpleTask, taskinfo) + exec = build_runner(t) + rst = execute_task(t) + @test rst == "hello world" +end diff --git a/test/runtests.jl b/test/runtests.jl index b9b011d..b1b261f 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -1,7 +1,8 @@ using Configurations using Workflows.Dialects: SimpleTask using Workflows.Runners: execute_task, build_runner, capture_run -using Workflows.Runners: JuliaModuleRunner +using Workflows.Runners: ShellExecutionError +using Workflows.Runners: JuliaModuleRunner, ShellRunner using Suppressor using Test using TOML @@ -16,7 +17,9 @@ include("testutils.jl") include("dialects/config_io.jl") end + @info "run runners test" @testset "runners" begin include("runners/juliamodule.jl") + include("runners/shell.jl") end end From e2ae6c6272f0b6617c9c9924907b72320ad6f78e Mon Sep 17 00:00:00 2001 From: Johnny Chen Date: Sat, 12 Feb 2022 19:23:49 +0800 Subject: [PATCH 6/7] initial scheduler implementation This implementation provides a single-threaded workflow scheduler targets specifically to ManifestWorkflow with PipelineOrder. 
--- .gitignore | 2 + Project.toml | 3 + src/Workflows.jl | 7 +- src/dialects/orders.jl | 26 ++++++ src/scheduler.jl | 95 +++++++++++++++++++++ test/examples/manifest/.gitignore | 1 + test/examples/manifest/Project.toml | 3 + test/examples/manifest/print.jl | 1 + test/examples/manifest/scripts/exp.jl | 13 +++ test/examples/manifest/scripts/large_sum.jl | 10 +++ test/examples/manifest/scripts/small_sum.jl | 10 +++ test/examples/manifest/simple.toml | 62 ++++++++++++++ test/examples/manifest/summary.jl | 21 +++++ test/runtests.jl | 5 ++ test/scheduler.jl | 26 ++++++ 15 files changed, 284 insertions(+), 1 deletion(-) create mode 100644 src/scheduler.jl create mode 100644 test/examples/manifest/.gitignore create mode 100644 test/examples/manifest/Project.toml create mode 100644 test/examples/manifest/print.jl create mode 100644 test/examples/manifest/scripts/exp.jl create mode 100644 test/examples/manifest/scripts/large_sum.jl create mode 100644 test/examples/manifest/scripts/small_sum.jl create mode 100644 test/examples/manifest/simple.toml create mode 100644 test/examples/manifest/summary.jl create mode 100644 test/scheduler.jl diff --git a/.gitignore b/.gitignore index 43324e6..b1f89f7 100644 --- a/.gitignore +++ b/.gitignore @@ -8,3 +8,5 @@ Manifest.toml __pycache__/ .mypy_cache/ + +.workflows diff --git a/Project.toml b/Project.toml index 27bfb04..a921099 100644 --- a/Project.toml +++ b/Project.toml @@ -5,11 +5,14 @@ version = "0.1.0" [deps] Configurations = "5218b696-f38b-4ac9-8b61-a12ec717816d" +JSON3 = "0f8b85d8-7281-11e9-16c2-39a750bddbf1" Printf = "de0858da-6303-5e67-8744-51eddeeeb8d7" +SHA = "ea8e919c-243c-51af-8825-aaa63cd721ce" TOML = "fa267f1f-6049-4f14-aa54-33bafae1ed76" [compat] Configurations = "0.17" +JSON3 = "1" TOML = "1" julia = "1.6" diff --git a/src/Workflows.jl b/src/Workflows.jl index 98cc0e7..27df160 100644 --- a/src/Workflows.jl +++ b/src/Workflows.jl @@ -1,10 +1,15 @@ module Workflows using Configurations +using SHA +using JSON3 
include("dialects/Dialects.jl") using .Dialects: load_config, save_config +using .Dialects: AbstractWorkflow, ManifestWorkflow +using .Dialects: task_id, task_deps include("runners/Runners.jl") -using .Runners +using .Runners: execute_task +include("scheduler.jl") end diff --git a/src/dialects/orders.jl b/src/dialects/orders.jl index 85564c5..23ddf36 100644 --- a/src/dialects/orders.jl +++ b/src/dialects/orders.jl @@ -34,4 +34,30 @@ end PipelineOrder(stages::Vector{String}) = PipelineOrder([[x] for x in stages]) Base.:(==)(p1::PipelineOrder, p2::PipelineOrder) = p1.stages == p2.stages +function Base.iterate(o::PipelineOrder) + (length(o.stages) >= 1 && length(o.stages[1]) >= 1) || return nothing + status = falses(sum(length, o.stages)) + status[1] = true + return o.stages[1][1], status +end +function Base.iterate(o::PipelineOrder, state) + length(o.stages) == 0 && return nothing + n = 0 + for i in 1:length(o.stages) + cur_stage = o.stages[i] + stage_status = @view state[n+1:n+length(cur_stage)] + n += length(cur_stage) + all(stage_status) && continue + idx = findfirst(x->!x, stage_status) + if !isnothing(idx) + # Technically, this can be thread unsafe in the sense that one task can be + # executed multiple times in parallel. It's the caller's duty to maintain the + # runtime status pool. + stage_status[idx] = true + return cur_stage[idx], state + end + end + return nothing +end + # TODO(johnnychen94): add DAG support diff --git a/src/scheduler.jl b/src/scheduler.jl new file mode 100644 index 0000000..d8952dc --- /dev/null +++ b/src/scheduler.jl @@ -0,0 +1,95 @@ +""" + execute(filename::AbstractString; workdir=dirname(filename), kwargs...) + execute(w::AbstractWorkflow; workdir=".", kwargs...) + +Execute workflow `w` or workflow defined in file `filename`. + +# Parameters + +- `workdir`: the working folder to execute the workflow. +- `cleanup=false`: true to clean the internal tmp files in `.workflows/tmp` folder when + julia exits.
+""" +function execute(filename::AbstractString; workdir="", kwargs...) + w = load_config(filename) + workdir = isempty(workdir) ? dirname(filename) : workdir + execute(w; workdir=workdir, kwargs...) +end + +function execute(w::ManifestWorkflow; workdir=".", cleanup=false) + tasklocks = Dict{String,ReentrantLock}() + results = Dict{String,Any}() # store the captured stdout result of shell runner + + tmpdir = workflow_tmpdir(w; workdir=workdir) + @info "execute workflow" workdir cleanup tmpdir + if cleanup + atexit() do + tmpdir = abspath(tmpdir) + rm(tmpdir; recursive=true, force=true) + tmproot = dirname(tmpdir) + if isempty(readdir(tmproot)) + rm(tmproot) + end + end + end + + # TODO(johnnychen94): support threads (#6) + for taskid in w.order + haskey(tasklocks, taskid) || (tasklocks[taskid] = ReentrantLock()) + lock(tasklocks[taskid]) do + t = w.tasks[taskid] + tid = task_id(t) + + # If this task requests STDOUT results from previous tasks, then dump the data + # as json file and store the file path as environment variable. The task + # script/command is responsible for appropriately handling this.
+ stdout_deps = _get_stdout_deps(results, t) + tmpfile = joinpath(tmpdir, "deps_$tid.json") + env = _dump_stdout_to_temp(tmpfile, stdout_deps; workdir=workdir) + if !isnothing(env) + @info "prepare task $(tid)" tmpfile + end + + @info "Executing task $(tid)" + results[taskid] = execute_task(t; workdir=workdir, env=env) + end + end + return +end + + +function _dump_stdout_to_temp(filename::String, data::Dict; workdir) + isempty(data) && return nothing + + tmpdir = dirname(filename) + isdir(tmpdir) || mkpath(tmpdir) + open(filename, "w") do io + JSON3.write(io, data) + end + + env = copy(ENV) + env["WORKFLOW_TMP_INFILE"] = relpath(filename, workdir) + return env +end + +function _get_stdout_deps(results, t) + # `results` is readonly here + + # Prepare all `@__STDOUT__*` results required by task `t` and dump into filesystem + function get_result(name) + m = match(r"@__STDOUT__(?<taskid>.*)", name) + isnothing(m) && return nothing + id = m[:taskid] + if !haskey(results, id) + @warn "task \"$(task_id(t))\" requests stdout of task \"$id\" but failed to get it" + return nothing + end + return id => results[id] + end + return Dict(filter!(x->!isnothing(x), map(get_result, task_deps(t)))) +end + +function workflow_tmpdir(w; workdir=".") + val = bytes2hex(sha256(to_toml(w)))[1:8] + return joinpath(workdir, ".workflows", "tmp", val) +end diff --git a/test/examples/manifest/.gitignore b/test/examples/manifest/.gitignore new file mode 100644 index 0000000..4323e36 --- /dev/null +++ b/test/examples/manifest/.gitignore @@ -0,0 +1 @@ +/test_outs/ diff --git a/test/examples/manifest/Project.toml b/test/examples/manifest/Project.toml new file mode 100644 index 0000000..cd53ab1 --- /dev/null +++ b/test/examples/manifest/Project.toml @@ -0,0 +1,3 @@ +[deps] +JSON3 = "0f8b85d8-7281-11e9-16c2-39a750bddbf1" +Pkg = "44cfe95a-1eb2-52ea-b672-e2afdf69b78f" diff --git a/test/examples/manifest/print.jl b/test/examples/manifest/print.jl new file mode 100644 index 0000000..d0c0ef7 
--- /dev/null +++
b/test/examples/manifest/print.jl @@ -0,0 +1 @@ +print(read(joinpath("test_outs", "results.json"), String)) diff --git a/test/examples/manifest/scripts/exp.jl b/test/examples/manifest/scripts/exp.jl new file mode 100644 index 0000000..fb9c4ed --- /dev/null +++ b/test/examples/manifest/scripts/exp.jl @@ -0,0 +1,13 @@ +using JSON3 + +rst = Dict() +rst["exp"] = @elapsed sum(exp, 1:10) + +mkpath("test_outs") +open(joinpath("test_outs", "exp.json"), "w") do io + JSON3.write(io, rst) +end + +# this message will be discarded for shell runner with `enable_stdout=false` +println("You should not see this message!") +@warn "You should not see this message!" diff --git a/test/examples/manifest/scripts/large_sum.jl b/test/examples/manifest/scripts/large_sum.jl new file mode 100644 index 0000000..7bf650b --- /dev/null +++ b/test/examples/manifest/scripts/large_sum.jl @@ -0,0 +1,10 @@ +using JSON3 + +rst = Dict{Int,Float64}() +for sz in [1000, 2000, 3000] + x = 1:sz + sum(x) + rst[sz] = @elapsed sum(x) +end + +print(JSON3.write(rst)) diff --git a/test/examples/manifest/scripts/small_sum.jl b/test/examples/manifest/scripts/small_sum.jl new file mode 100644 index 0000000..a575329 --- /dev/null +++ b/test/examples/manifest/scripts/small_sum.jl @@ -0,0 +1,10 @@ +using JSON3 + +rst = Dict{Int,Float64}() +for sz in [10, 100, 1000] + x = 1:sz + sum(x) + rst[sz] = @elapsed sum(x) +end + +JSON3.write(rst) diff --git a/test/examples/manifest/simple.toml b/test/examples/manifest/simple.toml new file mode 100644 index 0000000..5e1dffc --- /dev/null +++ b/test/examples/manifest/simple.toml @@ -0,0 +1,62 @@ +version = "0.1" +dialect = "manifest" + +order = [["0"], ["1", "2", "3"], ["4"], ["5"]] + +[[tasks]] +name = "init" +id = "0" +runner = "shell" +[tasks.run] +command = "julia --startup=no --project=. 
-e 'using Pkg; Pkg.instantiate()'" +enable_stderr = false +enable_stdout = false + +[[tasks]] +name = "small sum" +id = "1" +groups = ["Type:Base", "Type:Benchmark"] +deps = ["scripts"] +runner = "juliamodule" +[tasks.run] +script = "scripts/small_sum.jl" + +[[tasks]] +name = "norm" +id = "2" +groups = ["Package:LinearAlgebra", "Type:Benchmark"] +deps = ["scripts"] +runner = "shell" +[tasks.run] +command = "julia --startup=no scripts/large_sum.jl" +capture = true + +[[tasks]] +name = "exp" +id = "3" +groups = ["Package:Base", "Type:Benchmark"] +deps = ["scripts"] +outs = ["test_outs/exp.json"] +runner = "shell" +[tasks.run] +command = "julia --startup=no scripts/exp.jl" +enable_stderr = false +enable_stdout = false + +[[tasks]] +name = "summary" +id = "4" +groups = ["Type:LinearAlgebra", "Type:Benchmark"] +deps = ["summary.jl", "@__STDOUT__1", "@__STDOUT__2", "test_outs/exp.json"] +outs = ["test_outs/results.json"] +runner = "shell" +[tasks.run] +command = "julia --startup=no summary.jl" + +[[tasks]] +name = "print result" +id = "5" +deps = ["test_outs/results.json"] +runner = "shell" +[tasks.run] +command = "julia --startup=no --project=. 
print.jl" diff --git a/test/examples/manifest/summary.jl b/test/examples/manifest/summary.jl new file mode 100644 index 0000000..5b87395 --- /dev/null +++ b/test/examples/manifest/summary.jl @@ -0,0 +1,21 @@ +using JSON3 + +# This environment variable is set before the task execution process is spawned +src_file = ENV["WORKFLOW_TMP_INFILE"] + +@assert isfile(src_file) + +data = open(src_file, "r") do io + JSON3.read(io) +end |> Dict + +extra = open(joinpath("test_outs", "exp.json")) do io + JSON3.read(io) +end |> Dict + +merge!(data, extra) + +mkpath("test_outs") +open(joinpath("test_outs", "results.json"), "w") do io + JSON3.write(io, data) +end diff --git a/test/runtests.jl b/test/runtests.jl index b1b261f..79d7ec3 100644 --- a/test/runtests.jl +++ b/test/runtests.jl @@ -1,4 +1,6 @@ using Configurations +using JSON3 +using Workflows using Workflows.Dialects: SimpleTask using Workflows.Runners: execute_task, build_runner, capture_run using Workflows.Runners: ShellExecutionError @@ -22,4 +24,7 @@ include("testutils.jl") include("runners/juliamodule.jl") include("runners/shell.jl") end + + @info "run schduler test" + include("scheduler.jl") end diff --git a/test/scheduler.jl b/test/scheduler.jl new file mode 100644 index 0000000..95a2987 --- /dev/null +++ b/test/scheduler.jl @@ -0,0 +1,26 @@ +@testset "scheduler" begin + examples_dir = joinpath(@__DIR__, "examples") + + @testset "manifest" begin + # cleanup before running test so that we don't get affected by previous runs + rm(joinpath(examples_dir, "manifest", ".workflows"), recursive=true, force=true) + rm(joinpath(examples_dir, "manifest", "test_outs"), recursive=true, force=true) + + with_sandbox(includes=["examples"]) do + workdir = joinpath("examples", "manifest") + + filename = joinpath(workdir, "simple.toml") + msg = @capture_out @suppress_err Workflows.execute(filename) + # msg = @capture_out Workflows.execute(filename) # debug + results = @test_nowarn JSON3.read(msg) + + @test 
sort!(String.(keys(results))) == ["1", "2", "exp"] + @test sort!(String.(keys(JSON3.read(results["1"])))) == ["10", "100", "1000"] + @test sort!(String.(keys(JSON3.read(results["2"])))) == ["1000", "2000", "3000"] + @test results[:exp] isa Float64 + + @test issubset([".workflows", "test_outs", "Manifest.toml"], readdir(workdir)) + @test strip(read(joinpath(workdir, "test_outs", "results.json"), String)) == strip(msg) + end + end +end From 6b7afcbeb56203ebf157c8267e230b7d83077fd4 Mon Sep 17 00:00:00 2001 From: Johnny Chen Date: Wed, 9 Feb 2022 16:37:58 +0800 Subject: [PATCH 7/7] initial benchmark example using manifest spec --- examples/manifest/benchmark/.gitignore | 1 + examples/manifest/benchmark/Project.toml | 8 +++ examples/manifest/benchmark/README.md | 6 ++ examples/manifest/benchmark/benchmark.toml | 58 +++++++++++++++++++ .../manifest/benchmark/scripts/julia/rand.jl | 12 ++++ .../manifest/benchmark/scripts/julia/sum.jl | 13 +++++ .../manifest/benchmark/scripts/julia/utils.jl | 10 ++++ .../manifest/benchmark/scripts/numpy/randn.py | 9 +++ .../manifest/benchmark/scripts/numpy/sum.py | 10 ++++ .../manifest/benchmark/scripts/numpy/utils.py | 10 ++++ examples/manifest/benchmark/summary.jl | 48 +++++++++++++++ 11 files changed, 185 insertions(+) create mode 100644 examples/manifest/benchmark/.gitignore create mode 100644 examples/manifest/benchmark/Project.toml create mode 100644 examples/manifest/benchmark/README.md create mode 100644 examples/manifest/benchmark/benchmark.toml create mode 100644 examples/manifest/benchmark/scripts/julia/rand.jl create mode 100644 examples/manifest/benchmark/scripts/julia/sum.jl create mode 100644 examples/manifest/benchmark/scripts/julia/utils.jl create mode 100644 examples/manifest/benchmark/scripts/numpy/randn.py create mode 100644 examples/manifest/benchmark/scripts/numpy/sum.py create mode 100644 examples/manifest/benchmark/scripts/numpy/utils.py create mode 100644 examples/manifest/benchmark/summary.jl diff --git 
a/examples/manifest/benchmark/.gitignore b/examples/manifest/benchmark/.gitignore new file mode 100644 index 0000000..a9a1bd3 --- /dev/null +++ b/examples/manifest/benchmark/.gitignore @@ -0,0 +1 @@ +reports/ diff --git a/examples/manifest/benchmark/Project.toml b/examples/manifest/benchmark/Project.toml new file mode 100644 index 0000000..ee1d685 --- /dev/null +++ b/examples/manifest/benchmark/Project.toml @@ -0,0 +1,8 @@ +[deps] +BenchmarkTools = "6e4b80f9-dd63-53aa-95a3-0cdb28fa8baf" +CSV = "336ed68f-0bac-5ca0-87d4-7b16caf5d00b" +DataFrames = "a93c6f00-e57d-5684-b7b6-d8193f3e46c0" +JSON3 = "0f8b85d8-7281-11e9-16c2-39a750bddbf1" +LinearAlgebra = "37e2e46d-f89d-539d-b4ee-838fcccc9c8e" +PrettyTables = "08abe8d2-0d0c-5749-adfa-8a2ac140af0d" +Workflows = "115008b9-7a42-4cba-af26-8bebb992e909" diff --git a/examples/manifest/benchmark/README.md b/examples/manifest/benchmark/README.md new file mode 100644 index 0000000..b356cba --- /dev/null +++ b/examples/manifest/benchmark/README.md @@ -0,0 +1,6 @@ +# Benchmark example with the manifest dialect + +This demo benchmarks Julia with Numpy on a few functions `sum`, `rand` and `randn` using scripts +in `scripts`. + +Package `numpy` is required for the python executable. 
diff --git a/examples/manifest/benchmark/benchmark.toml b/examples/manifest/benchmark/benchmark.toml new file mode 100644 index 0000000..e0b0658 --- /dev/null +++ b/examples/manifest/benchmark/benchmark.toml @@ -0,0 +1,58 @@ +version = "0.1" +dialect = "manifest" + +order = [["1", "2", "3", "4"], ["5"]] + +[[tasks]] +name = "sum" +id = "1" +groups = ["Type:LinearAlgebra", "Type:Benchmark", "Framework:Julia"] +deps = ["scripts/julia"] +runner = "juliamodule" +[tasks.run] +script = "scripts/julia/sum.jl" + +[[tasks]] +name = "rand" +id = "2" +groups = ["Type:LinearAlgebra", "Type:Benchmark", "Framework:Julia"] +deps = ["scripts/julia"] +runner = "juliamodule" +[tasks.run] +script = "scripts/julia/rand.jl" + +[[tasks]] +name = "sum" +id = "3" +groups = ["Type:LinearAlgebra", "Type:Benchmark", "Framework:Numpy"] +deps = ["scripts/numpy"] +runner = "shell" +[tasks.run] +command = "python scripts/numpy/sum.py" +capture = true + +[[tasks]] +name = "randn" +id = "4" +groups = ["Type:LinearAlgebra", "Type:Benchmark", "Framework:Numpy"] +deps = ["scripts/numpy"] +runner = "shell" +[tasks.run] +command = "python scripts/numpy/randn.py" +capture = true + +[[tasks]] +name = "summary" +id = "5" +groups = ["Type:LinearAlgebra", "Type:Benchmark"] +deps = [ + "summary.jl", + "@__STDOUT__1", + "@__STDOUT__2", + "@__STDOUT__3", + "@__STDOUT__4", +] +outs = ["reports"] +runner = "shell" +[tasks.run] +command = "julia --startup=no --project=. 
summary.jl" diff --git a/examples/manifest/benchmark/scripts/julia/rand.jl b/examples/manifest/benchmark/scripts/julia/rand.jl new file mode 100644 index 0000000..9d16618 --- /dev/null +++ b/examples/manifest/benchmark/scripts/julia/rand.jl @@ -0,0 +1,12 @@ +using LinearAlgebra +using BenchmarkTools +using JSON3 +include(joinpath(@__DIR__, "utils.jl")) + +rst = Dict() +for n in [64, 128, 256, 512, 1024, 2048] + b = @benchmark rand($n) samples=100 evals=1 + rst[n] = trial_to_dict(b) +end + +JSON3.write(rst) diff --git a/examples/manifest/benchmark/scripts/julia/sum.jl b/examples/manifest/benchmark/scripts/julia/sum.jl new file mode 100644 index 0000000..3d3df29 --- /dev/null +++ b/examples/manifest/benchmark/scripts/julia/sum.jl @@ -0,0 +1,13 @@ +using LinearAlgebra +using BenchmarkTools +using JSON3 +include(joinpath(@__DIR__, "utils.jl")) + +rst = Dict() +for n in [64, 128, 256, 512, 1024, 2048, 4096, 8192] + x = rand(n) + b = @benchmark sum($x) samples=100 evals=1 + rst[n] = trial_to_dict(b) +end + +JSON3.write(rst) diff --git a/examples/manifest/benchmark/scripts/julia/utils.jl b/examples/manifest/benchmark/scripts/julia/utils.jl new file mode 100644 index 0000000..af7e434 --- /dev/null +++ b/examples/manifest/benchmark/scripts/julia/utils.jl @@ -0,0 +1,10 @@ +using BenchmarkTools + +function trial_to_dict(trial::BenchmarkTools.Trial) + d = Dict{String, Float64}() + d["time"] = mean(trial.times) + d["gctimes"] = mean(trial.gctimes) + d["allocs"] = trial.allocs + d["memory"] = trial.memory + return d +end diff --git a/examples/manifest/benchmark/scripts/numpy/randn.py b/examples/manifest/benchmark/scripts/numpy/randn.py new file mode 100644 index 0000000..bacf574 --- /dev/null +++ b/examples/manifest/benchmark/scripts/numpy/randn.py @@ -0,0 +1,9 @@ +import numpy as np +import json +from utils import belapsed + +rst: dict = {} +for n in [64, 128, 256, 512, 1024, 2048]: + rst[n] = {"time": belapsed(lambda: np.random.randn(n), number=100)} + +print(json.dumps(rst)) 
diff --git a/examples/manifest/benchmark/scripts/numpy/sum.py b/examples/manifest/benchmark/scripts/numpy/sum.py new file mode 100644 index 0000000..f4d4d71 --- /dev/null +++ b/examples/manifest/benchmark/scripts/numpy/sum.py @@ -0,0 +1,10 @@ +import numpy as np +import json +from utils import belapsed + +rst: dict = {} +for n in [64, 128, 256, 512, 1024, 2048]: + x = np.random.rand(n) + rst[n] = {"time": belapsed(lambda: x.sum(), number=100)} + +print(json.dumps(rst)) diff --git a/examples/manifest/benchmark/scripts/numpy/utils.py b/examples/manifest/benchmark/scripts/numpy/utils.py new file mode 100644 index 0000000..298a9b6 --- /dev/null +++ b/examples/manifest/benchmark/scripts/numpy/utils.py @@ -0,0 +1,10 @@ +import timeit + + +def belapsed(func, *, number=None): + if number: + return timeit.timeit(func, number=number) / number + else: + timer = timeit.Timer(func) + n, t = timer.autorange() + return t / n diff --git a/examples/manifest/benchmark/summary.jl b/examples/manifest/benchmark/summary.jl new file mode 100644 index 0000000..8709954 --- /dev/null +++ b/examples/manifest/benchmark/summary.jl @@ -0,0 +1,48 @@ +src_file = get(ENV, "WORKFLOW_TMP_INFILE", "") +@assert isfile(src_file) "file $src_file not existed" + +using JSON3 +using DataFrames +using CSV +using PrettyTables + +data = open(src_file) do io + JSON3.read(io) +end + +# flatten and merge results into one single big dataframe +dfs = map(String.(keys(data))) do tid + X = JSON3.read(data[tid]) + X_df = map(collect(keys(X))) do sz + d = Dict(X[sz]) + d[:size] = parse(Int, String(sz)) + d[:tid] = tid + d + end |> DataFrame +end +df = reduce(dfs) do X, Y + outerjoin(X, Y; on=intersect(names(X), names(Y)), matchmissing=:equal) +end + +# format markdown reports +buffer = IOBuffer() +for df_sz in groupby(df, :size) + println(buffer, "# size: ", df_sz[!, :size][1], "\n") + + # drop the types line provided by DataFrames + tmp_buffer = IOBuffer() + PrettyTables.pretty_table( + tmp_buffer, + df_sz; + 
tf=PrettyTables.tf_markdown) + lines = split(String(take!(tmp_buffer)), "\n") + println(buffer, lines[1]) + foreach(l->println(buffer, l), lines[3:end]) + + println(buffer) +end + +# save final results +isdir("reports") || mkdir("reports") +CSV.write("reports/results.csv", df) +write("reports/report.md", take!(buffer))