Skip to content

Commit

Permalink
Add functions for waiting for publishers and subscribers (#907)
Browse files Browse the repository at this point in the history
These blocking functions are especially useful for tests where we want
to wait for some number of publishers/subscribers to be available
before proceeding with some other checks.

Update tests to use new graph API
We can simplify some tests by reusing the new graph functions.

Signed-off-by: Jacob Perron <[email protected]>
  • Loading branch information
jacobperron authored Apr 1, 2021
1 parent b7784eb commit 414fdbd
Show file tree
Hide file tree
Showing 6 changed files with 401 additions and 166 deletions.
93 changes: 93 additions & 0 deletions rcl/include/rcl/graph.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ extern "C"
#include <rmw/get_topic_names_and_types.h>
#include <rmw/topic_endpoint_info_array.h>

#include "rcutils/time.h"
#include "rcutils/types.h"

#include "rosidl_runtime_c/service_type_support_struct.h"
Expand Down Expand Up @@ -581,6 +582,98 @@ rcl_count_subscribers(
const char * topic_name,
size_t * count);

/// Wait for there to be a specified number of publishers on a given topic.
/**
* The `node` parameter must point to a valid node.
* The nodes graph guard condition is used by this function, and therefore the caller should
* take care not to use the guard condition concurrently in any other wait sets.
*
* The `allocator` parameter must point to a valid allocator.
*
* The `topic_name` parameter must not be `NULL`, and must not be an empty string.
* It should also follow the topic name rules.
*
* This function blocks and will return when the number of publishers for `topic_name`
* is greater than or equal to the `count` parameter, or the specified `timeout` is reached.
*
* The `timeout` parameter is in nanoseconds.
* The timeout is based on system time elapsed.
* A negative value disables the timeout (i.e. this function blocks until the number of
* publishers is greater than or equals to `count`).
*
* The `success` parameter must point to a valid bool.
* The `success` parameter is the output for this function and will be set.
*
* <hr>
* Attribute | Adherence
* ------------------ | -------------
* Allocates Memory | Yes
* Thread-Safe | No
* Uses Atomics | No
* Lock-Free | Maybe [1]
* <i>[1] implementation may need to protect the data structure with a lock</i>
*
* \param[in] node the handle to the node being used to query the ROS graph
* \param[in] allocator to allocate space for the rcl_wait_set_t used to wait for graph events
* \param[in] topic_name the name of the topic in question
* \param[in] count number of publishers to wait for
* \param[in] timeout maximum duration to wait for publishers
* \param[out] success `true` if the number of publishers is equal to or greater than count, or
* `false` if a timeout occurred waiting for publishers.
* \return #RCL_RET_OK if there was no errors, or
* \return #RCL_RET_NODE_INVALID if the node is invalid, or
* \return #RCL_RET_INVALID_ARGUMENT if any arguments are invalid, or
* \return #RCL_RET_TIMEOUT if a timeout occurs before the number of publishers is detected, or
* \return #RCL_RET_ERROR if an unspecified error occurred.
*/
RCL_PUBLIC
RCL_WARN_UNUSED
rcl_ret_t
rcl_wait_for_publishers(
const rcl_node_t * node,
rcl_allocator_t * allocator,
const char * topic_name,
const size_t count,
rcutils_duration_value_t timeout,
bool * success);

/// Wait for there to be a specified number of subscribers on a given topic.
/**
* \see rcl_wait_for_publishers
*
* <hr>
* Attribute | Adherence
* ------------------ | -------------
* Allocates Memory | Yes
* Thread-Safe | No
* Uses Atomics | No
* Lock-Free | Maybe [1]
* <i>[1] implementation may need to protect the data structure with a lock</i>
*
* \param[in] node the handle to the node being used to query the ROS graph
* \param[in] allocator to allocate space for the rcl_wait_set_t used to wait for graph events
* \param[in] topic_name the name of the topic in question
* \param[in] count number of subscribers to wait for
* \param[in] timeout maximum duration to wait for subscribers
* \param[out] success `true` if the number of subscribers is equal to or greater than count, or
* `false` if a timeout occurred waiting for subscribers.
* \return #RCL_RET_OK if there was no errors, or
* \return #RCL_RET_NODE_INVALID if the node is invalid, or
* \return #RCL_RET_INVALID_ARGUMENT if any arguments are invalid, or
* \return #RCL_RET_TIMEOUT if a timeout occurs before the number of subscribers is detected, or
* \return #RCL_RET_ERROR if an unspecified error occurred.
*/
RCL_PUBLIC
RCL_WARN_UNUSED
rcl_ret_t
rcl_wait_for_subscribers(
const rcl_node_t * node,
rcl_allocator_t * allocator,
const char * topic_name,
const size_t count,
rcutils_duration_value_t timeout,
bool * success);

/// Return a list of all publishers to a topic.
/**
* The `node` parameter must point to a valid node.
Expand Down
177 changes: 177 additions & 0 deletions rcl/src/rcl/graph.c
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,12 @@ extern "C"
#include "rcl/graph.h"

#include "rcl/error_handling.h"
#include "rcl/guard_condition.h"
#include "rcl/wait.h"
#include "rcutils/allocator.h"
#include "rcutils/error_handling.h"
#include "rcutils/macros.h"
#include "rcutils/time.h"
#include "rcutils/types.h"
#include "rmw/error_handling.h"
#include "rmw/get_node_info_and_types.h"
Expand Down Expand Up @@ -452,6 +456,179 @@ rcl_count_subscribers(
return rcl_convert_rmw_ret_to_rcl_ret(rmw_ret);
}

typedef rcl_ret_t (* count_entities_func_t)(
const rcl_node_t * node,
const char * topic_name,
size_t * count);

rcl_ret_t
_rcl_wait_for_entities(
const rcl_node_t * node,
rcl_allocator_t * allocator,
const char * topic_name,
const size_t expected_count,
rcutils_duration_value_t timeout,
bool * success,
count_entities_func_t count_entities_func)
{
if (!rcl_node_is_valid(node)) {
return RCL_RET_NODE_INVALID;
}
RCL_CHECK_ALLOCATOR_WITH_MSG(allocator, "invalid allocator", return RCL_RET_INVALID_ARGUMENT);
RCL_CHECK_ARGUMENT_FOR_NULL(topic_name, RCL_RET_INVALID_ARGUMENT);
RCL_CHECK_ARGUMENT_FOR_NULL(success, RCL_RET_INVALID_ARGUMENT);

rcl_ret_t ret = RCL_RET_OK;
*success = false;

// We can avoid waiting if there are already the expected number of publishers
size_t count = 0u;
ret = count_entities_func(node, topic_name, &count);
if (ret != RCL_RET_OK) {
// Error message already set
return ret;
}
if (expected_count <= count) {
*success = true;
return RCL_RET_OK;
}

// Create a wait set and add the node graph guard condition to it
rcl_wait_set_t wait_set = rcl_get_zero_initialized_wait_set();
ret = rcl_wait_set_init(
&wait_set, 0, 1, 0, 0, 0, 0, node->context, *allocator);
if (ret != RCL_RET_OK) {
// Error message already set
return ret;
}

const rcl_guard_condition_t * guard_condition = rcl_node_get_graph_guard_condition(node);
if (!guard_condition) {
// Error message already set
ret = RCL_RET_ERROR;
goto cleanup;
}

// Add it to the wait set
ret = rcl_wait_set_add_guard_condition(&wait_set, guard_condition, NULL);
if (ret != RCL_RET_OK) {
// Error message already set
goto cleanup;
}

// Get current time
// We use system time to be consistent with the clock used by rcl_wait()
rcutils_time_point_value_t start;
rcutils_ret_t time_ret = rcutils_system_time_now(&start);
if (time_ret != RCUTILS_RET_OK) {
rcutils_error_string_t error = rcutils_get_error_string();
rcutils_reset_error();
RCL_SET_ERROR_MSG(error.str);
ret = RCL_RET_ERROR;
goto cleanup;
}

// Wait for expected count or timeout
rcl_ret_t wait_ret;
while (true) {
// Use separate 'wait_ret' code to avoid returning spurious TIMEOUT value
wait_ret = rcl_wait(&wait_set, timeout);
if (wait_ret != RCL_RET_OK && wait_ret != RCL_RET_TIMEOUT) {
// Error message already set
ret = wait_ret;
break;
}

// Check count
ret = count_entities_func(node, topic_name, &count);
if (ret != RCL_RET_OK) {
// Error already set
break;
}
if (expected_count <= count) {
*success = true;
break;
}

// If we're not waiting indefinitely, compute time remaining
if (timeout >= 0) {
rcutils_time_point_value_t now;
time_ret = rcutils_system_time_now(&now);
if (time_ret != RCUTILS_RET_OK) {
rcutils_error_string_t error = rcutils_get_error_string();
rcutils_reset_error();
RCL_SET_ERROR_MSG(error.str);
ret = RCL_RET_ERROR;
break;
}
timeout = timeout - (now - start);
if (timeout <= 0) {
ret = RCL_RET_TIMEOUT;
break;
}
}

// Clear wait set for next iteration
ret = rcl_wait_set_clear(&wait_set);
if (ret != RCL_RET_OK) {
// Error message already set
break;
}
}

rcl_ret_t cleanup_ret;
cleanup:
// Cleanup
cleanup_ret = rcl_wait_set_fini(&wait_set);
if (cleanup_ret != RCL_RET_OK) {
// If we got two unexpected errors, return the earlier error
if (ret != RCL_RET_OK && ret != RCL_RET_TIMEOUT) {
// Error message already set
ret = cleanup_ret;
}
}

return ret;
}

rcl_ret_t
rcl_wait_for_publishers(
const rcl_node_t * node,
rcl_allocator_t * allocator,
const char * topic_name,
const size_t expected_count,
rcutils_duration_value_t timeout,
bool * success)
{
return _rcl_wait_for_entities(
node,
allocator,
topic_name,
expected_count,
timeout,
success,
rcl_count_publishers);
}

rcl_ret_t
rcl_wait_for_subscribers(
const rcl_node_t * node,
rcl_allocator_t * allocator,
const char * topic_name,
const size_t expected_count,
rcutils_duration_value_t timeout,
bool * success)
{
return _rcl_wait_for_entities(
node,
allocator,
topic_name,
expected_count,
timeout,
success,
rcl_count_subscribers);
}

typedef rmw_ret_t (* get_topic_endpoint_info_func_t)(
const rmw_node_t * node,
rcutils_allocator_t * allocator,
Expand Down
Loading

0 comments on commit 414fdbd

Please sign in to comment.