From 99f0f79cbdb9585aa6be46bf9d0cff46911f2f08 Mon Sep 17 00:00:00 2001 From: "Luiz M. Faria" Date: Fri, 17 Nov 2023 15:46:46 +0100 Subject: [PATCH] handle `Task`s in the `data` of a `DataFlowTask` handle `Task`s in the `data` format --- .gitignore | 1 + src/dag.jl | 23 +++++++++++------------ src/dataflowtask.jl | 4 ++-- src/taskgraph.jl | 2 +- test/dataflowtask_test.jl | 15 +++++++++++++++ 5 files changed, 30 insertions(+), 15 deletions(-) diff --git a/.gitignore b/.gitignore index cf6bf17f..82c60a45 100644 --- a/.gitignore +++ b/.gitignore @@ -22,3 +22,4 @@ JuliaSysimage.dll Manifest.toml TiledFactorization docs/.DS_Store +docs/slides/** diff --git a/src/dag.jl b/src/dag.jl index 12d48b71..88b144d7 100644 --- a/src/dag.jl +++ b/src/dag.jl @@ -179,6 +179,7 @@ and outgoing edges are updated. function update_edges!(dag::DAG, nodej) transitively_connected = dag._buffer empty!(transitively_connected) + hastask = any(x -> isa(x, Task), data(nodej)) # update dependencies from newer to older and reinfornce transitivity by # skipping predecessors of nodes which are already connected for (nodei, _) in Iterators.reverse(dag) @@ -189,21 +190,19 @@ function update_edges!(dag::DAG, nodej) end ti = tag(nodei) (ti ∈ transitively_connected) && continue - # if a DataFlowTask is in data, add the edge directly to the DAG - @assert nodei ≤ nodej "i = $(nodei.tag), j = $(nodej.tag)" - dep = data_dependency(nodei, nodej) - dep || continue + # tasks are handled differently when they appear in the data in that + # they are checked directly agains the nodej.task field + dep = false + if hastask + for d in data(nodej) + d === nodei.task && (dep = true; break) + end + end + dep || data_dependency(nodei, nodej) || continue addedge!(dag, nodei, nodej) update_transitively_connected!(transitively_connected, nodei, dag) - # addedge_transitive!(dag,nodei,nodej) - end - # if a DataFlowTask is in data and it is still active, add the edge directly to the DAG - for d in data(nodej) - (d isa DataFlowTask) && - (tag(d) ∉ transitively_connected) && - haskey(dag.inoutlist, d) && - addedge!(dag, d, nodej) end + return dag end diff --git a/src/dataflowtask.jl b/src/dataflowtask.jl index d3dc4caa..2a467c95 100644 --- a/src/dataflowtask.jl +++ b/src/dataflowtask.jl @@ -115,9 +115,9 @@ end @noinline function _data_dependency(datai, modei, dataj, modej) for (di, mi) in zip(datai, modei) - (di isa DataFlowTask) && continue + (di isa Task) && continue # Tasks are handled differently for (dj, mj) in zip(dataj, modej) - (dj isa DataFlowTask) && continue + (dj isa Task) && continue # Tasks are handled differently mi == READ && mj == READ && continue if memory_overlap(di, dj) return true diff --git a/src/taskgraph.jl b/src/taskgraph.jl index 095cdea2..8a5f59b9 100644 --- a/src/taskgraph.jl +++ b/src/taskgraph.jl @@ -143,7 +143,7 @@ function stop_dag_cleaner(tg::TaskGraph = get_active_taskgraph()) return tg.dag_cleaner else # expected result, task is running put!(tg.finished, Stop()) - # wait for t to stop before continuining + # wait for t to stop before continuing wait(t) end return tg.dag_cleaner diff --git a/test/dataflowtask_test.jl b/test/dataflowtask_test.jl index 5188cb09..8d444b3e 100644 --- a/test/dataflowtask_test.jl +++ b/test/dataflowtask_test.jl @@ -135,3 +135,18 @@ end @test typeof(s) == Task @inferred test_seq_mode(x) end + +@testset "Fetching task" begin + DataFlowTasks.set_active_taskgraph!(DataFlowTasks.TaskGraph()) + d1 = @dspawn (sleep(0.01); rand(10)) label = "sleep" + d2 = @dspawn fill!(fetch(@R(d1)), 0) label = "fill" + @test fetch(d2) |> sum == 0 + # make sure that d2 depends on d1 by checking the length of the critical + # path + log_info = DataFlowTasks.@log begin + d1 = @dspawn (sleep(0.01); rand(10)) label = "sleep" + d2 = @dspawn fill!(fetch(@R(d1)), 0) label = "fill" + fetch(d2) + end + @test length(DataFlowTasks.longest_path(log_info)) == 2 +end