From 47ca4bd8d2f15c1ffcc242e5a532317298a44b4b Mon Sep 17 00:00:00 2001 From: Alexei Katranov Date: Wed, 16 Jun 2021 11:04:34 +0300 Subject: [PATCH] Fix fg tests under tsan Signed-off-by: Alexei Katranov --- test/common/graph_utils.h | 1 + test/common/spin_barrier.h | 3 +-- test/tbb/test_flow_graph_priorities.cpp | 4 ++-- test/tbb/test_flow_graph_whitebox.cpp | 11 +++++++++-- 4 files changed, 13 insertions(+), 6 deletions(-) diff --git a/test/common/graph_utils.h b/test/common/graph_utils.h index 2ee191caef..a19bcddfda 100644 --- a/test/common/graph_utils.h +++ b/test/common/graph_utils.h @@ -506,6 +506,7 @@ void test_resets() { // reset doesn't delete edge tbb::flow::make_edge(b0,q0); + g.wait_for_all(); // TODO: invesigate why make_edge to buffer_node always creates a forwarding task g.reset(); for(T i = 0; i < NN; ++i) { b0.try_put(i); diff --git a/test/common/spin_barrier.h b/test/common/spin_barrier.h index 77936646a6..59eff76245 100644 --- a/test/common/spin_barrier.h +++ b/test/common/spin_barrier.h @@ -52,13 +52,12 @@ void SpinWaitWhile(Predicate pred) { } } } - std::atomic_thread_fence(std::memory_order_acquire); } //! Spin WHILE the condition is true. template void SpinWaitWhileCondition(const std::atomic& location, C comp) { - SpinWaitWhile([&] { return comp(location.load(std::memory_order_relaxed)); }); + SpinWaitWhile([&] { return comp(location.load(std::memory_order_acquire)); }); } //! Spin WHILE the value of the variable is equal to a given value diff --git a/test/tbb/test_flow_graph_priorities.cpp b/test/tbb/test_flow_graph_priorities.cpp index 3e4baf378e..499ba40685 100644 --- a/test/tbb/test_flow_graph_priorities.cpp +++ b/test/tbb/test_flow_graph_priorities.cpp @@ -256,7 +256,7 @@ struct AsyncActivity { typedef async_node_type::gateway_type gateway_type; struct work_type { data_type input; gateway_type* gateway; }; - bool done; + std::atomic done; concurrent_queue my_queue; std::thread my_service_thread; @@ -410,7 +410,7 @@ struct execution_tracker_t { prioritized_work_finished = false; prioritized_work_interrupted = false; } - std::thread::id prioritized_work_submitter; + std::atomic prioritized_work_submitter; bool prioritized_work_started; bool prioritized_work_finished; bool prioritized_work_interrupted; diff --git a/test/tbb/test_flow_graph_whitebox.cpp b/test/tbb/test_flow_graph_whitebox.cpp index 2f9123678c..567ba14ac2 100644 --- a/test/tbb/test_flow_graph_whitebox.cpp +++ b/test/tbb/test_flow_graph_whitebox.cpp @@ -202,6 +202,7 @@ void TestContinueNode() { if (icnt == 0) { // wait for node to count the message (or for the node body to execute, which would be wrong) utils::SpinWaitWhile([&] { + tbb::spin_mutex::scoped_lock l(cnode.my_mutex); return serial_continue_state0 == 0 && cnode.my_current_count == 0; }); CHECK_MESSAGE( (serial_continue_state0 == 0), "Improperly released continue_node"); @@ -261,6 +262,7 @@ void TestFunctionNode() { // rejecting serial_fn_state0 = 0; + std::atomic rejected{ false }; std::thread t([&] { g.reset(); // attach to the current arena tbb::flow::make_edge(fnode0, qnode1); @@ -270,10 +272,12 @@ void TestFunctionNode() { CHECK_MESSAGE( (!fnode0.my_successors.empty()), "successor edge not added"); qnode0.try_put(1); qnode0.try_put(2); // rejecting node should reject, reverse. + rejected = true; g.wait_for_all(); }); utils::SpinWaitWhileEq(serial_fn_state0, 0); // waiting rejecting node to start - utils::SpinWaitWhile([&] { return fnode0.my_predecessors.empty(); }); + utils::SpinWaitWhileEq(rejected, false); + CHECK(fnode0.my_predecessors.empty() == false); serial_fn_state0 = 2; // release function_node body. t.join(); INFO(" reset"); @@ -302,6 +306,7 @@ void TestFunctionNode() { INFO("\n"); serial_fn_state0 = 0; // make the function_node wait + rejected = false; std::thread t2([&] { g.reset(); // attach to the current arena @@ -312,11 +317,13 @@ void TestFunctionNode() { // now if we put an item to the queues the edges to the function_node will reverse. INFO(" put_node(2)"); qnode0.try_put(2); // start queue node. + rejected = true; g.wait_for_all(); }); utils::SpinWaitWhileEq(serial_fn_state0, 0); // waiting rejecting node to start // wait for the edges to reverse - utils::SpinWaitWhile([&] { return fnode0.my_predecessors.empty(); }); + utils::SpinWaitWhileEq(rejected, false); + CHECK(fnode0.my_predecessors.empty() == false); g.my_context->cancel_group_execution(); // release the function_node serial_fn_state0 = 2;