Commit 7b21a19

initial implementation

johnnychen94 committed Aug 5, 2021
1 parent 9571943 commit 7b21a19
Showing 13 changed files with 293 additions and 2 deletions.
11 changes: 10 additions & 1 deletion Project.toml
@@ -1,9 +1,18 @@
name = "Workflows"
uuid = "115008b9-7a42-4cba-af26-8bebb992e909"
authors = ["Johnny Chen <[email protected]>"]
version = "0.1.0"
version = "0.1.0-dev"

[deps]
CSV = "336ed68f-0bac-5ca0-87d4-7b16caf5d00b"
DataFrames = "a93c6f00-e57d-5684-b7b6-d8193f3e46c0"
JSON3 = "0f8b85d8-7281-11e9-16c2-39a750bddbf1"
TOML = "fa267f1f-6049-4f14-aa54-33bafae1ed76"

[compat]
CSV = "0.6, 0.7, 0.8"
DataFrames = "0.21, 0.22, 1"
JSON3 = "1"
julia = "1"

[extras]
2 changes: 2 additions & 0 deletions example/benchmark/.gitignore
@@ -0,0 +1,2 @@
results.csv
Manifest.toml
34 changes: 34 additions & 0 deletions example/benchmark/Benchmark.toml
@@ -0,0 +1,34 @@
version = "0"

[[stages]]
name = "benchmark"
deps = ["scripts"]

[stages.run]
metrics = ["time"]
out = ["results.csv"]
driver = "csv"

[[benchmark]]
name = "dilate"
tags = ["juliaimages", "morphology"]

[[benchmark.run]]
source = "scripts/juliaimages/morphology/dilate.jl"
driver = "julia"

[[benchmark]]
name = "erode"
tags = ["juliaimages", "morphology"]

[[benchmark.run]]
source = "scripts/juliaimages/morphology/erode.jl"
driver = "julia"

[[benchmark]]
name = "dilate"
tags = ["skimage", "morphology"]

[[benchmark.run]]
driver = "shell"
command = "python scripts/skimage/morphology/dilate.py"
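
# Note: two benchmark tasks share the name "dilate" but carry different tags
# ("juliaimages" vs "skimage"), so their generated task IDs remain distinct.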
9 changes: 9 additions & 0 deletions example/benchmark/Project.toml
@@ -0,0 +1,9 @@
[deps]
BenchmarkTools = "6e4b80f9-dd63-53aa-95a3-0cdb28fa8baf"
ImageMorphology = "787d08f9-d448-5407-9aad-5290dd7ab264"
JSON3 = "0f8b85d8-7281-11e9-16c2-39a750bddbf1"
TestImages = "5e47fb64-e119-507b-a336-dd2b206d9990"
Workflows = "115008b9-7a42-4cba-af26-8bebb992e909"

[compat]
Workflows = "0.1"
9 changes: 9 additions & 0 deletions example/benchmark/main.jl
@@ -0,0 +1,9 @@
# Usage:
# ```julia
# pkg> activate example/benchmark
# julia> include("example/benchmark/main.jl")
# ```

using Workflows

run_workflow(joinpath(@__DIR__, "Benchmark.toml"))
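
# As configured in Benchmark.toml, the "benchmark" stage reports through the "csv" driver,
# so the collected metrics are written to results.csv next to this file.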
12 changes: 12 additions & 0 deletions example/benchmark/scripts/juliaimages/morphology/dilate.jl
@@ -0,0 +1,12 @@
using ImageMorphology
using TestImages
using BenchmarkTools, JSON3

img = TestImages.shepp_logan(400)

rst = @benchmark dilate($img, 1) seconds=1

Dict(
"memory" => rst.memory, # byte
"time" => median(rst.times)/1e6, # ms
) |> JSON3.write
12 changes: 12 additions & 0 deletions example/benchmark/scripts/juliaimages/morphology/erode.jl
@@ -0,0 +1,12 @@
using ImageMorphology
using TestImages
using BenchmarkTools, JSON3

img = TestImages.shepp_logan(400)

rst = @benchmark erode($img, 1) seconds=1

Dict(
"memory" => rst.memory, # byte
"time" => median(rst.times)/1e6, # ms
) |> JSON3.write
15 changes: 15 additions & 0 deletions example/benchmark/scripts/skimage/morphology/dilate.py
@@ -0,0 +1,15 @@
import timeit
import json

# setup and test
from skimage.data import shepp_logan_phantom
from skimage.morphology import dilation, square

img = shepp_logan_phantom() # (400, 400)
dilation(img, square(3))

# benchmark
count, time = timeit.Timer('dilation(img, square(3))', globals=globals()).autorange()

# export
print(json.dumps({"time": 1e3*time/count})) # ms
43 changes: 42 additions & 1 deletion src/Workflows.jl
@@ -1,5 +1,46 @@
module Workflows

# Write your package code here.
using TOML
using JSON3
using CSV
using DataFrames

export run_workflow

include("parsing.jl")
include("drivers.jl")
include("report.jl")

function run_workflow(config_file)
    root = abspath(dirname(config_file))

    config = TOML.parsefile(config_file)
    verify_configuration(config)

    stage_names = get_stage_names(config)
    for cur_stage_name in stage_names
        stats = run_stage(config[cur_stage_name]; root)

        stage_config = get_stage_config(config, cur_stage_name)
        report_stage(stats, stage_config; root)
    end

    return true
end

function run_stage(taskpool; root)
    map(taskpool) do cur_task
        task_id = default_case_id(cur_task)
        # use `merge` rather than `merge!`: the parsed JSON3 objects are read-only
        stats = mapreduce(merge, cur_task["run"]) do info
            try
                start_runner(info; root)
            catch err
                @warn "failed to run task" task=task_id err
                JSON3.read("{}")
            end
        end
        task_id => stats
    end
end

end #module
31 changes: 31 additions & 0 deletions src/drivers.jl
@@ -0,0 +1,31 @@
function start_runner(task_info; root)
    task_runner = task_info["driver"]
    json_strings = if task_runner == "julia"
        julia_runner(task_info; root)
    elseif task_runner == "shell"
        shell_runner(task_info; root)
    else
        throw(ArgumentError("Unsupported task runner $(task_runner)"))
    end

    JSON3.read(json_strings)
end

function julia_runner(task_info; root)
    script = strip(task_info["source"])
    cd(root) do
        # run the script in a sandbox module; its last expression is expected to
        # evaluate to a JSON string of the collected metrics
        m = Module(gensym())
        Core.eval(m, :(Base.include($m, $script)))
    end
end

function shell_runner(task_info; root)
    cmd = strip(task_info["command"])
    @assert !isempty(cmd)
    cd(root) do
        # the command is expected to print a JSON string of the collected metrics to stdout
        out_io = IOBuffer()
        run(pipeline(`sh -c $cmd`; stdout=out_io, stderr=devnull))
        String(take!(out_io))
    end
end
36 changes: 36 additions & 0 deletions src/parsing.jl
@@ -0,0 +1,36 @@
function default_case_id(case_info)
    # case sensitive; the order of tags does not matter
    tags = sort(get(case_info, "tags", String[])) # "tags" is optional
    join([case_info["name"], tags...], "_")
end
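# For example, a task named "dilate" with tags ["juliaimages", "morphology"] gets the
# ID "dilate_juliaimages_morphology".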

# Each stage can take a very long time to run, so it's good practice to verify the
# configuration eagerly and avoid wasting time on a broken workflow.
function verify_configuration(config)
    stage_names = get_stage_names(config)

    # verify that no stage name is duplicated
    duplicate_stages = unique(filter(name -> count(==(name), stage_names) > 1, stage_names))
    isempty(duplicate_stages) || throw(ArgumentError("Stages $duplicate_stages are duplicated."))

    # verify that every stage has its associated tasks
    missing_stages = setdiff(stage_names, keys(config))
    isempty(missing_stages) || throw(ArgumentError("Stages $missing_stages are not configured."))

    # TODO: verify that each task consists of multiple sub-tasks

    return true
end

function get_stage_names(config)
    map(x->x["name"], config["stages"])
end

function get_stage_config(config, stage_name)
    stages_config = config["stages"]
    idx = findfirst(stages_config) do stage
        stage["name"] == stage_name
    end
    stages_config[idx]
end
28 changes: 28 additions & 0 deletions src/report.jl
@@ -0,0 +1,28 @@
function report_stage(stats, stage_config; root)
    stage_run_info = stage_config["run"]

    df = make_dataframe(stats, stage_run_info["metrics"])

    reporter = stage_run_info["driver"]
    if reporter == "csv"
        for outpath in stage_run_info["out"]
            isabspath(outpath) || (outpath = joinpath(root, outpath))
            CSV.write(outpath, df)
        end
    else
        throw(ArgumentError("Unsupported metrics reporter $reporter"))
    end
end

function make_dataframe(stats, metrics_info)
    # one row per task, one column per requested metric, plus an "id" column
    filtered_stats = map(metrics_info) do k
        k => map(stats) do task_item
            string(get(task_item.second, k, ""))
        end
    end
    data = [
        "id" => map(first, stats),
        filtered_stats...
    ]
    DataFrame(data)
end
53 changes: 53 additions & 0 deletions workflow_specification.toml
@@ -0,0 +1,53 @@
# Workflow specification version: 0
#
# Each workflow consists of multiple ordered stages.
# - A later stage may or may not depend on the results of previous stages.
#
# Each stage consists of multiple independent tasks.
# - A task must not depend on other tasks to run.
# - A task can consist of one or more sub-tasks, each run to collect different results.
# - Sub-tasks cannot be split into sub-sub-tasks.
# - A task has the fields "name" (required) and "tags" (optional), which together generate a
#   unique ID. Two tasks may not have identical "name" and "tags"; sharing only the "name" or
#   only the "tags" is allowed.
#
# - Each sub-task has a driver, which runs it, collects the result, and passes it to the parent
#   task in JSON format.
# - The task collects the JSON results from its sub-tasks and passes them to the stage handler.
# - The stage handler collects all results from its tasks and then generates the output.
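#
# For illustration (mirroring example/benchmark/Benchmark.toml in this commit), a minimal
# workflow following this specification looks like:
#
#   version = "0"
#
#   [[stages]]
#   name = "benchmark"
#
#   [stages.run]
#   driver = "csv"
#   metrics = ["time"]
#   out = ["results.csv"]
#
#   [[benchmark]]
#   name = "dilate"
#   tags = ["juliaimages", "morphology"]
#
#   [[benchmark.run]]
#   driver = "julia"
#   source = "scripts/juliaimages/morphology/dilate.jl"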


# (required) Workflow specification version
version = "0"

# (required) every workflow consists of multiple stages
[[stages]]
name = "<stage name>" # (required) used to track tasks in this stage

# (required) stage handler
[stages.run]
# (required) every stage handler should specify its driver
driver = "<handler driver>"
# (optional) additional driver-specific fields

# (optional) defines a task for stage <stage name>
[[<stage name>]]
name = "<task1 name>" # (required)
tags = ["<tag1>", "<tag2>", ...] # (optional)

# (required) task runner
[[<stage name>.run]]
# (required) every task runner should specify its driver
driver = "<runner driver>"
# (optional) additional driver-specific fields

# (optional) defines a task for stage <stage name>
[[<stage name>]]
name = "<task2 name>" # (required)
tags = ["<tag1>", "<tag2>", ...] # (optional)

# (required) task runner
[[<stage name>.run]]
# (required) every task runner should specify its driver
driver = "<runner driver>"
# (optional) additional driver-specific fields
