Skip to content

Commit

Permalink
Clarify wait concurrency (ros2#59)
Browse files Browse the repository at this point in the history
* fix bug where "wait set empty" check was wrong

* clarify rcl_wait is thread-safe with unique sets

* style fixup

* explicitly capture const local variables

this is required on Windows, but not gcc/clang
  • Loading branch information
wjwwood committed Jun 2, 2016
1 parent 818b154 commit 17845ae
Show file tree
Hide file tree
Showing 3 changed files with 172 additions and 14 deletions.
10 changes: 5 additions & 5 deletions rcl/include/rcl/wait.h
Original file line number Diff line number Diff line change
Expand Up @@ -513,11 +513,11 @@ rcl_wait_set_resize_services(rcl_wait_set_t * wait_set, size_t size);
* comes first.
* Passing a timeout struct with uninitialized memory is undefined behavior.
*
* \TODO(wjwwood) this function should probably be thread-safe with itself but
* it's not clear to me what happens if the wait sets being
* waited on can be overlapping or not or if we can even check.
* This function is not thread-safe and cannot be called concurrently, even if
* the given wait sets are not the same and non-overlapping in contents.
* This function is thread-safe for unique wait sets with unique contents.
* This function cannot operate on the same wait set in multiple threads, and
* the wait sets may not share content.
* For example, calling rcl_wait in two threads on two different wait sets that
* both contain a single, shared guard condition is undefined behavior.
*
* \param[inout] wait_set the set of things to be waited on and to be pruned if not ready
* \param[in] timeout the duration to wait for the wait set to be ready, in nanoseconds
Expand Down
8 changes: 6 additions & 2 deletions rcl/src/rcl/wait.c
Original file line number Diff line number Diff line change
Expand Up @@ -505,8 +505,12 @@ rcl_wait(rcl_wait_set_t * wait_set, int64_t timeout)
RCL_SET_ERROR_MSG("wait set is invalid");
return RCL_RET_WAIT_SET_INVALID;
}
if (wait_set->size_of_subscriptions == 0 && wait_set->size_of_guard_conditions == 0 &&
wait_set->size_of_clients == 0 && wait_set->size_of_services == 0)
if (
wait_set->size_of_subscriptions == 0 &&
wait_set->size_of_guard_conditions == 0 &&
wait_set->size_of_timers == 0 &&
wait_set->size_of_clients == 0 &&
wait_set->size_of_services == 0)
{
RCL_SET_ERROR_MSG("wait set is empty");
return RCL_RET_WAIT_SET_EMPTY;
Expand Down
168 changes: 161 additions & 7 deletions rcl/test/rcl/test_wait.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include <atomic>
#include <chrono>
#include <future>
#include <sstream>
#include <thread>
#include <vector>

#include "gtest/gtest.h"

Expand Down Expand Up @@ -42,7 +45,7 @@ TEST(CLASSNAME(WaitSetTestFixture, RMW_IMPLEMENTATION), test_resize_to_zero) {
ret = rcl_wait_set_resize_subscriptions(&wait_set, 0);
EXPECT_EQ(RCL_RET_OK, ret) << rcl_get_error_string_safe();

EXPECT_EQ(wait_set.size_of_subscriptions, 0);
EXPECT_EQ(wait_set.size_of_subscriptions, 0ull);

ret = rcl_wait_set_fini(&wait_set);
ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string_safe();
Expand All @@ -58,6 +61,7 @@ TEST(CLASSNAME(WaitSetTestFixture, RMW_IMPLEMENTATION), finite_timeout) {
std::chrono::steady_clock::time_point before_sc = std::chrono::steady_clock::now();
ret = rcl_wait(&wait_set, timeout);
std::chrono::steady_clock::time_point after_sc = std::chrono::steady_clock::now();
ASSERT_EQ(RCL_RET_TIMEOUT, ret) << rcl_get_error_string_safe();
// Check time
int64_t diff = std::chrono::duration_cast<std::chrono::nanoseconds>(after_sc - before_sc).count();
EXPECT_LE(diff, timeout + TOLERANCE);
Expand Down Expand Up @@ -123,8 +127,7 @@ TEST(CLASSNAME(WaitSetTestFixture, RMW_IMPLEMENTATION), zero_timeout) {
EXPECT_EQ(RCL_RET_OK, ret) << rcl_get_error_string_safe();
ret = rcl_wait_set_fini(&wait_set);
EXPECT_EQ(RCL_RET_OK, ret) << rcl_get_error_string_safe();
}
);
});


// Time spent during wait should be negligible.
Expand All @@ -138,6 +141,159 @@ TEST(CLASSNAME(WaitSetTestFixture, RMW_IMPLEMENTATION), zero_timeout) {
EXPECT_LE(diff, TOLERANCE);
}

// Check rcl_wait can be called in many threads, each with unique wait sets and resources.
TEST(CLASSNAME(WaitSetTestFixture, RMW_IMPLEMENTATION), multi_wait_set_threaded) {
rcl_ret_t ret;
const size_t number_of_threads = 20; // concurrent waits
const size_t count_target = 10; // number of times each wait should wake up before being "done"
const size_t retry_limit = 100; // number of times to retry when a timeout occurs waiting
const uint64_t wait_period = RCL_MS_TO_NS(1); // timeout passed to rcl_wait each try
const std::chrono::milliseconds trigger_period(2); // period between each round of triggers
struct TestSet
{
std::atomic<size_t> wake_count;
rcl_wait_set_t wait_set;
rcl_guard_condition_t guard_condition;
std::thread thread;
size_t thread_id;
};
std::vector<TestSet> test_sets(number_of_threads);
// Setup common function for waiting on the triggered guard conditions.
// *INDENT-OFF* (prevent uncrustify from making unnecessary indents here)
auto wait_func_factory = [count_target, retry_limit, wait_period](TestSet & test_set)
{
return [&test_set, count_target, retry_limit, wait_period]() {
while (test_set.wake_count < count_target) {
bool change_detected = false;
size_t wake_try_count = 0;
while (wake_try_count < retry_limit) {
wake_try_count++;
rcl_ret_t ret;
ret = rcl_wait_set_clear_guard_conditions(&test_set.wait_set);
EXPECT_EQ(RCL_RET_OK, ret) << rcl_get_error_string_safe();
ret = rcl_wait_set_add_guard_condition(&test_set.wait_set, &test_set.guard_condition);
EXPECT_EQ(RCL_RET_OK, ret) << rcl_get_error_string_safe();
{
std::stringstream ss;
ss << "[thread " << test_set.thread_id << "] Waiting (try #" << wake_try_count << ")";
// printf("%s\n", ss.str().c_str());
}
ret = rcl_wait(&test_set.wait_set, wait_period);
if (ret != RCL_RET_TIMEOUT) {
{
std::stringstream ss;
ss << "[thread " << test_set.thread_id << "] Wakeup (try #" << wake_try_count << ")";
// printf("%s\n", ss.str().c_str());
}
change_detected = true;
// if not timeout, then the single guard condition should be set
if (!test_set.wait_set.guard_conditions[0]) {
test_set.wake_count.store(count_target + 1); // indicates an error
ASSERT_NE(nullptr, test_set.wait_set.guard_conditions[0])
<< "[thread " << test_set.thread_id
<< "] expected guard condition to be marked ready after non-timeout wake up";
}
// no need to wait again
break;
} else {
std::stringstream ss;
ss << "[thread " << test_set.thread_id << "] Timeout (try #" << wake_try_count << ")";
// printf("%s\n", ss.str().c_str());
}
}
if (!change_detected) {
test_set.wake_count.store(count_target + 1); // indicates an error
ASSERT_TRUE(change_detected);
}
test_set.wake_count++;
}
};
};
// *INDENT-ON*
// Setup each test set.
for (auto & test_set : test_sets) {
rcl_ret_t ret;
// init the wake count
test_set.wake_count.store(0);
// setup the guard condition
test_set.guard_condition = rcl_get_zero_initialized_guard_condition();
ret = rcl_guard_condition_init(
&test_set.guard_condition, rcl_guard_condition_get_default_options());
EXPECT_EQ(RCL_RET_OK, ret) << rcl_get_error_string_safe();
// setup the wait set
test_set.wait_set = rcl_get_zero_initialized_wait_set();
ret = rcl_wait_set_init(&test_set.wait_set, 0, 1, 0, 0, 0, rcl_get_default_allocator());
EXPECT_EQ(RCL_RET_OK, ret) << rcl_get_error_string_safe();
ret = rcl_wait_set_add_guard_condition(&test_set.wait_set, &test_set.guard_condition);
EXPECT_EQ(RCL_RET_OK, ret) << rcl_get_error_string_safe();
test_set.thread_id = 0;
}
// Setup safe tear-down.
auto scope_exit = make_scope_exit([&test_sets]() {
for (auto & test_set : test_sets) {
rcl_ret_t ret = rcl_guard_condition_fini(&test_set.guard_condition);
EXPECT_EQ(RCL_RET_OK, ret) << rcl_get_error_string_safe();
ret = rcl_wait_set_fini(&test_set.wait_set);
EXPECT_EQ(RCL_RET_OK, ret) << rcl_get_error_string_safe();
}
});
// Now kick off all the threads.
size_t thread_enumeration = 0;
for (auto & test_set : test_sets) {
thread_enumeration++;
test_set.thread_id = thread_enumeration;
test_set.thread = std::thread(wait_func_factory(test_set));
}
// Loop, triggering every trigger_period until the threads are done.
// *INDENT-OFF* (prevent uncrustify from making unnecessary indents here)
auto loop_test = [&test_sets, count_target]() -> bool {
for (const auto & test_set : test_sets) {
if (test_set.wake_count.load() < count_target) {
return true;
}
}
return false;
};
// *INDENT-ON*
// auto print_state = [&test_sets](std::string prefix) {
// std::stringstream ss;
// ss << prefix << "[";
// size_t enumerate = 0;
// for (const auto & test_set : test_sets) {
// enumerate++;
// if (enumerate != 1) {
// ss << ", ";
// }
// ss << std::setfill('0') << std::setw(2) << test_set.wake_count.load();
// }
// ss << "]";
// printf("%s\n", ss.str().c_str());
// };
size_t loop_count = 0;
while (loop_test()) {
loop_count++;
// print_state("triggering, current states: ");
for (const auto & test_set : test_sets) {
ret = rcl_trigger_guard_condition(&test_set.guard_condition);
EXPECT_EQ(RCL_RET_OK, ret) << rcl_get_error_string_safe();
}
std::this_thread::sleep_for(trigger_period);
}
// print_state("final states: ");
// Join the threads.
for (auto & test_set : test_sets) {
test_set.thread.join();
}
// Ensure the individual wake counts match the target (otherwise there was a problem).
for (auto & test_set : test_sets) {
ASSERT_EQ(count_target, test_set.wake_count.load());
}
// printf(
// "number of loops to get '%zu' wake ups on all threads: %zu\n",
// count_target,
// loop_count);
}

// Check the interaction of a guard condition and a negative timeout by
// triggering a guard condition in a separate thread
TEST(CLASSNAME(WaitSetTestFixture, RMW_IMPLEMENTATION), guard_condition) {
Expand All @@ -155,8 +311,7 @@ TEST(CLASSNAME(WaitSetTestFixture, RMW_IMPLEMENTATION), guard_condition) {
EXPECT_EQ(RCL_RET_OK, ret) << rcl_get_error_string_safe();
ret = rcl_guard_condition_fini(&guard_cond);
EXPECT_EQ(RCL_RET_OK, ret) << rcl_get_error_string_safe();
}
);
});

std::promise<rcl_ret_t> p;

Expand All @@ -171,8 +326,7 @@ TEST(CLASSNAME(WaitSetTestFixture, RMW_IMPLEMENTATION), guard_condition) {
trigger_diff = std::chrono::duration_cast<std::chrono::nanoseconds>(
after_trigger - before_trigger);
p.set_value(ret);
}
);
});
auto f = p.get_future();

std::chrono::steady_clock::time_point before_sc = std::chrono::steady_clock::now();
Expand Down

0 comments on commit 17845ae

Please sign in to comment.