Skip to content

Commit

Permalink
Fix fg tests under tsan
Browse files Browse the repository at this point in the history
Signed-off-by: Alexei Katranov <[email protected]>
  • Loading branch information
alexey-katranov committed Jun 16, 2021
1 parent 2526dcf commit 47ca4bd
Show file tree
Hide file tree
Showing 4 changed files with 13 additions and 6 deletions.
1 change: 1 addition & 0 deletions test/common/graph_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -506,6 +506,7 @@ void test_resets() {
// reset doesn't delete edge

tbb::flow::make_edge(b0,q0);
g.wait_for_all(); // TODO: invesigate why make_edge to buffer_node always creates a forwarding task
g.reset();
for(T i = 0; i < NN; ++i) {
b0.try_put(i);
Expand Down
3 changes: 1 addition & 2 deletions test/common/spin_barrier.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,13 +52,12 @@ void SpinWaitWhile(Predicate pred) {
}
}
}
std::atomic_thread_fence(std::memory_order_acquire);
}

//! Spin WHILE the condition is true.
template <typename T, typename C>
void SpinWaitWhileCondition(const std::atomic<T>& location, C comp) {
SpinWaitWhile([&] { return comp(location.load(std::memory_order_relaxed)); });
SpinWaitWhile([&] { return comp(location.load(std::memory_order_acquire)); });
}

//! Spin WHILE the value of the variable is equal to a given value
Expand Down
4 changes: 2 additions & 2 deletions test/tbb/test_flow_graph_priorities.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -256,7 +256,7 @@ struct AsyncActivity {
typedef async_node_type::gateway_type gateway_type;

struct work_type { data_type input; gateway_type* gateway; };
bool done;
std::atomic<bool> done;
concurrent_queue<work_type> my_queue;
std::thread my_service_thread;

Expand Down Expand Up @@ -410,7 +410,7 @@ struct execution_tracker_t {
prioritized_work_finished = false;
prioritized_work_interrupted = false;
}
std::thread::id prioritized_work_submitter;
std::atomic<std::thread::id> prioritized_work_submitter;
bool prioritized_work_started;
bool prioritized_work_finished;
bool prioritized_work_interrupted;
Expand Down
11 changes: 9 additions & 2 deletions test/tbb/test_flow_graph_whitebox.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,7 @@ void TestContinueNode() {
if (icnt == 0) {
// wait for node to count the message (or for the node body to execute, which would be wrong)
utils::SpinWaitWhile([&] {
tbb::spin_mutex::scoped_lock l(cnode.my_mutex);
return serial_continue_state0 == 0 && cnode.my_current_count == 0;
});
CHECK_MESSAGE( (serial_continue_state0 == 0), "Improperly released continue_node");
Expand Down Expand Up @@ -261,6 +262,7 @@ void TestFunctionNode() {

// rejecting
serial_fn_state0 = 0;
std::atomic<bool> rejected{ false };
std::thread t([&] {
g.reset(); // attach to the current arena
tbb::flow::make_edge(fnode0, qnode1);
Expand All @@ -270,10 +272,12 @@ void TestFunctionNode() {
CHECK_MESSAGE( (!fnode0.my_successors.empty()), "successor edge not added");
qnode0.try_put(1);
qnode0.try_put(2); // rejecting node should reject, reverse.
rejected = true;
g.wait_for_all();
});
utils::SpinWaitWhileEq(serial_fn_state0, 0); // waiting rejecting node to start
utils::SpinWaitWhile([&] { return fnode0.my_predecessors.empty(); });
utils::SpinWaitWhileEq(rejected, false);
CHECK(fnode0.my_predecessors.empty() == false);
serial_fn_state0 = 2; // release function_node body.
t.join();
INFO(" reset");
Expand Down Expand Up @@ -302,6 +306,7 @@ void TestFunctionNode() {
INFO("\n");

serial_fn_state0 = 0; // make the function_node wait
rejected = false;
std::thread t2([&] {
g.reset(); // attach to the current arena

Expand All @@ -312,11 +317,13 @@ void TestFunctionNode() {
// now if we put an item to the queues the edges to the function_node will reverse.
INFO(" put_node(2)");
qnode0.try_put(2); // start queue node.
rejected = true;
g.wait_for_all();
});
utils::SpinWaitWhileEq(serial_fn_state0, 0); // waiting rejecting node to start
// wait for the edges to reverse
utils::SpinWaitWhile([&] { return fnode0.my_predecessors.empty(); });
utils::SpinWaitWhileEq(rejected, false);
CHECK(fnode0.my_predecessors.empty() == false);
g.my_context->cancel_group_execution();
// release the function_node
serial_fn_state0 = 2;
Expand Down

0 comments on commit 47ca4bd

Please sign in to comment.