Skip to content

Commit

Permalink
Use rclcpp::guard_condition
Browse files Browse the repository at this point in the history
Signed-off-by: Mauro Passerino <[email protected]>
  • Loading branch information
Mauro Passerino committed Apr 1, 2021
1 parent e7e3504 commit b9bec69
Show file tree
Hide file tree
Showing 35 changed files with 191 additions and 328 deletions.
7 changes: 3 additions & 4 deletions rclcpp/include/rclcpp/executor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -524,9 +524,8 @@ class Executor
std::atomic_bool spinning;

/// Guard condition for signaling the rmw layer to wake up for special events.
rcl_guard_condition_t interrupt_guard_condition_ = rcl_get_zero_initialized_guard_condition();

std::shared_ptr<rclcpp::GuardCondition> shutdown_guard_condition_;
GuardCondition::SharedPtr interrupt_guard_condition_;
GuardCondition::SharedPtr shutdown_guard_condition_;

/// Wait set for managing entities that the rmw layer waits on.
rcl_wait_set_t wait_set_ = rcl_get_zero_initialized_wait_set();
Expand All @@ -548,7 +547,7 @@ class Executor
spin_once_impl(std::chrono::nanoseconds timeout);

typedef std::map<rclcpp::node_interfaces::NodeBaseInterface::WeakPtr,
const rcl_guard_condition_t *,
GuardCondition::SharedPtr,
std::owner_less<rclcpp::node_interfaces::NodeBaseInterface::WeakPtr>>
WeakNodesToGuardConditionsMap;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ class StaticExecutorEntitiesCollector final
init(
rcl_wait_set_t * p_wait_set,
rclcpp::memory_strategy::MemoryStrategy::SharedPtr memory_strategy,
rcl_guard_condition_t * executor_guard_condition);
GuardCondition::SharedPtr executor_guard_condition);

/// Finalize StaticExecutorEntitiesCollector to clear resources
RCLCPP_PUBLIC
Expand Down Expand Up @@ -326,7 +326,7 @@ class StaticExecutorEntitiesCollector final
WeakCallbackGroupsToNodesMap weak_groups_to_nodes_associated_with_executor_;

typedef std::map<rclcpp::node_interfaces::NodeBaseInterface::WeakPtr,
const rcl_guard_condition_t *,
GuardCondition::SharedPtr,
std::owner_less<rclcpp::node_interfaces::NodeBaseInterface::WeakPtr>>
WeakNodesToGuardConditionsMap;
WeakNodesToGuardConditionsMap weak_nodes_to_guard_conditions_;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -192,7 +192,7 @@ class StaticSingleThreadedExecutor : public rclcpp::Executor
}
std::chrono::nanoseconds timeout_left = timeout_ns;

entities_collector_->init(&wait_set_, memory_strategy_, &interrupt_guard_condition_);
entities_collector_->init(&wait_set_, memory_strategy_, interrupt_guard_condition_);
RCLCPP_SCOPE_EXIT(entities_collector_->fini());

while (rclcpp::ok(this->context_)) {
Expand Down
24 changes: 3 additions & 21 deletions rclcpp/include/rclcpp/experimental/subscription_intra_process.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -81,16 +81,7 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase
allocator);

// Create the guard condition.
rcl_guard_condition_options_t guard_condition_options =
rcl_guard_condition_get_default_options();

gc_ = rcl_get_zero_initialized_guard_condition();
rcl_ret_t ret = rcl_guard_condition_init(
&gc_, context->get_rcl_context().get(), guard_condition_options);

if (RCL_RET_OK != ret) {
throw std::runtime_error("SubscriptionIntraProcess init error initializing guard condition");
}
gc_ = std::make_shared<GuardCondition>(context);

TRACEPOINT(
rclcpp_subscription_callback_added,
Expand All @@ -104,15 +95,7 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase
#endif
}

~SubscriptionIntraProcess()
{
if (rcl_guard_condition_fini(&gc_) != RCL_RET_OK) {
RCUTILS_LOG_ERROR_NAMED(
"rclcpp",
"Failed to destroy guard condition: %s",
rcutils_get_error_string().str);
}
}
~SubscriptionIntraProcess() = default;

bool
is_ready(rcl_wait_set_t * wait_set)
Expand Down Expand Up @@ -168,8 +151,7 @@ class SubscriptionIntraProcess : public SubscriptionIntraProcessBase
void
trigger_guard_condition()
{
rcl_ret_t ret = rcl_trigger_guard_condition(&gc_);
(void)ret;
gc_->trigger();
}

template<typename T>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

#include "rcl/error_handling.h"

#include "rclcpp/guard_condition.hpp"
#include "rclcpp/type_support_decl.hpp"
#include "rclcpp/waitable.hpp"

Expand Down Expand Up @@ -76,7 +77,7 @@ class SubscriptionIntraProcessBase : public rclcpp::Waitable

protected:
std::recursive_mutex reentrant_mutex_;
rcl_guard_condition_t gc_;
GuardCondition::SharedPtr gc_;

private:
virtual void
Expand Down
3 changes: 2 additions & 1 deletion rclcpp/include/rclcpp/graph_listener.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "rcl/guard_condition.h"
#include "rcl/wait.h"
#include "rclcpp/context.hpp"
#include "rclcpp/guard_condition.hpp"
#include "rclcpp/macros.hpp"
#include "rclcpp/node_interfaces/node_graph_interface.hpp"
#include "rclcpp/visibility_control.hpp"
Expand Down Expand Up @@ -187,7 +188,7 @@ class GraphListener : public std::enable_shared_from_this<GraphListener>
mutable std::mutex node_graph_interfaces_mutex_;
std::vector<rclcpp::node_interfaces::NodeGraphInterface *> node_graph_interfaces_;

rcl_guard_condition_t interrupt_guard_condition_ = rcl_get_zero_initialized_guard_condition();
GuardCondition::SharedPtr interrupt_guard_condition_;
rcl_wait_set_t wait_set_ = rcl_get_zero_initialized_wait_set();
};

Expand Down
8 changes: 7 additions & 1 deletion rclcpp/include/rclcpp/guard_condition.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,9 @@ class GuardCondition
RCLCPP_PUBLIC
explicit GuardCondition(
rclcpp::Context::SharedPtr context =
rclcpp::contexts::get_global_default_context());
rclcpp::contexts::get_global_default_context(),
rcl_guard_condition_options_t guard_condition_options =
rcl_guard_condition_get_default_options());

RCLCPP_PUBLIC
virtual
Expand Down Expand Up @@ -89,6 +91,10 @@ class GuardCondition
bool
exchange_in_use_by_wait_set_state(bool in_use_state);

RCLCPP_PUBLIC
bool
add_to_wait_set(rcl_wait_set_t * wait_set);

protected:
rclcpp::Context::SharedPtr context_;
rcl_guard_condition_t rcl_guard_condition_;
Expand Down
6 changes: 4 additions & 2 deletions rclcpp/include/rclcpp/memory_strategy.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,11 @@ class RCLCPP_PUBLIC MemoryStrategy
virtual void clear_handles() = 0;
virtual void remove_null_handles(rcl_wait_set_t * wait_set) = 0;

virtual void add_guard_condition(const rcl_guard_condition_t * guard_condition) = 0;
virtual void
add_guard_condition(const GuardCondition::SharedPtr guard_condition) = 0;

virtual void remove_guard_condition(const rcl_guard_condition_t * guard_condition) = 0;
virtual void
remove_guard_condition(const GuardCondition::SharedPtr guard_condition) = 0;

virtual void
get_next_subscription(
Expand Down
10 changes: 7 additions & 3 deletions rclcpp/include/rclcpp/node_interfaces/node_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -104,8 +104,8 @@ class NodeBase : public NodeBaseInterface
get_associated_with_executor_atomic() override;

RCLCPP_PUBLIC
rcl_guard_condition_t *
get_notify_guard_condition() override;
rclcpp::GuardCondition::SharedPtr
get_notify_guard_condition() const override;

RCLCPP_PUBLIC
std::unique_lock<std::recursive_mutex>
Expand All @@ -122,6 +122,10 @@ class NodeBase : public NodeBaseInterface
resolve_topic_or_service_name(
const std::string & name, bool is_service, bool only_expand = false) const override;

RCLCPP_PUBLIC
void
trigger_notify_guard_condition() const override;

private:
RCLCPP_DISABLE_COPY(NodeBase)

Expand All @@ -138,7 +142,7 @@ class NodeBase : public NodeBaseInterface

/// Guard condition for notifying the Executor of changes to this node.
mutable std::recursive_mutex notify_guard_condition_mutex_;
rcl_guard_condition_t notify_guard_condition_ = rcl_get_zero_initialized_guard_condition();
GuardCondition::SharedPtr notify_guard_condition_;
bool notify_guard_condition_is_valid_;
};

Expand Down
12 changes: 9 additions & 3 deletions rclcpp/include/rclcpp/node_interfaces/node_base_interface.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

#include "rclcpp/callback_group.hpp"
#include "rclcpp/context.hpp"
#include "rclcpp/guard_condition.hpp"
#include "rclcpp/macros.hpp"
#include "rclcpp/visibility_control.hpp"

Expand Down Expand Up @@ -138,12 +139,12 @@ class NodeBaseInterface
/**
* For example, this should be notified when a publisher is added or removed.
*
* \return the rcl_guard_condition_t if it is valid, else nullptr
* \return the GuardCondition::SharedPtr if it is valid, else nullptr
*/
RCLCPP_PUBLIC
virtual
rcl_guard_condition_t *
get_notify_guard_condition() = 0;
rclcpp::GuardCondition::SharedPtr
get_notify_guard_condition() const = 0;

/// Acquire and return a scoped lock that protects the notify guard condition.
/** This should be used when triggering the notify guard condition. */
Expand All @@ -170,6 +171,11 @@ class NodeBaseInterface
std::string
resolve_topic_or_service_name(
const std::string & name, bool is_service, bool only_expand = false) const = 0;

RCLCPP_PUBLIC
virtual
void
trigger_notify_guard_condition() const = 0;
};

} // namespace node_interfaces
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy
allocator_ = std::make_shared<VoidAlloc>();
}

void add_guard_condition(const rcl_guard_condition_t * guard_condition) override
void add_guard_condition(const GuardCondition::SharedPtr guard_condition) override
{
for (const auto & existing_guard_condition : guard_conditions_) {
if (existing_guard_condition == guard_condition) {
Expand All @@ -71,7 +71,7 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy
guard_conditions_.push_back(guard_condition);
}

void remove_guard_condition(const rcl_guard_condition_t * guard_condition) override
void remove_guard_condition(const GuardCondition::SharedPtr guard_condition) override
{
for (auto it = guard_conditions_.begin(); it != guard_conditions_.end(); ++it) {
if (*it == guard_condition) {
Expand Down Expand Up @@ -240,7 +240,7 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy
}

for (auto guard_condition : guard_conditions_) {
if (rcl_wait_set_add_guard_condition(wait_set, guard_condition, NULL) != RCL_RET_OK) {
if (guard_condition->add_to_wait_set(wait_set) == false) {
RCUTILS_LOG_ERROR_NAMED(
"rclcpp",
"Couldn't add guard_condition to wait set: %s",
Expand Down Expand Up @@ -504,7 +504,7 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy
using VectorRebind =
std::vector<T, typename std::allocator_traits<Alloc>::template rebind_alloc<T>>;

VectorRebind<const rcl_guard_condition_t *> guard_conditions_;
VectorRebind<GuardCondition::SharedPtr> guard_conditions_;

VectorRebind<std::shared_ptr<const rcl_subscription_t>> subscription_handles_;
VectorRebind<std::shared_ptr<const rcl_service_t>> service_handles_;
Expand Down
48 changes: 9 additions & 39 deletions rclcpp/src/rclcpp/executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,7 @@ Executor::Executor(const rclcpp::ExecutorOptions & options)
// Store the context for later use.
context_ = options.context;

rcl_guard_condition_options_t guard_condition_options = rcl_guard_condition_get_default_options();
rcl_ret_t ret = rcl_guard_condition_init(
&interrupt_guard_condition_, context_->get_rcl_context().get(), guard_condition_options);
if (RCL_RET_OK != ret) {
throw_from_rcl_error(ret, "Failed to create interrupt guard condition in Executor constructor");
}
interrupt_guard_condition_ = std::make_shared<GuardCondition>(context_);

context_->on_shutdown(
[weak_gc = std::weak_ptr<rclcpp::GuardCondition>{shutdown_guard_condition_}]() {
Expand All @@ -66,13 +61,13 @@ Executor::Executor(const rclcpp::ExecutorOptions & options)

// The number of guard conditions is always at least 2: 1 for the ctrl-c guard cond,
// and one for the executor's guard cond (interrupt_guard_condition_)
memory_strategy_->add_guard_condition(&shutdown_guard_condition_->get_rcl_guard_condition());
memory_strategy_->add_guard_condition(shutdown_guard_condition_);

// Put the executor's guard condition in
memory_strategy_->add_guard_condition(&interrupt_guard_condition_);
memory_strategy_->add_guard_condition(interrupt_guard_condition_);
rcl_allocator_t allocator = memory_strategy_->get_allocator();

ret = rcl_wait_set_init(
rcl_ret_t ret = rcl_wait_set_init(
&wait_set_,
0, 2, 0, 0, 0, 0,
context_->get_rcl_context().get(),
Expand All @@ -82,12 +77,6 @@ Executor::Executor(const rclcpp::ExecutorOptions & options)
"rclcpp",
"failed to create wait set: %s", rcl_get_error_string().str);
rcl_reset_error();
if (rcl_guard_condition_fini(&interrupt_guard_condition_) != RCL_RET_OK) {
RCUTILS_LOG_ERROR_NAMED(
"rclcpp",
"failed to destroy guard condition: %s", rcl_get_error_string().str);
rcl_reset_error();
}
throw_from_rcl_error(ret, "Failed to create wait set in Executor constructor");
}
}
Expand Down Expand Up @@ -129,15 +118,8 @@ Executor::~Executor()
"failed to destroy wait set: %s", rcl_get_error_string().str);
rcl_reset_error();
}
// Finalize the interrupt guard condition.
if (rcl_guard_condition_fini(&interrupt_guard_condition_) != RCL_RET_OK) {
RCUTILS_LOG_ERROR_NAMED(
"rclcpp",
"failed to destroy guard condition: %s", rcl_get_error_string().str);
rcl_reset_error();
}
// Remove and release the sigint guard condition
memory_strategy_->remove_guard_condition(&shutdown_guard_condition_->get_rcl_guard_condition());
memory_strategy_->remove_guard_condition(shutdown_guard_condition_);
}

std::vector<rclcpp::CallbackGroup::WeakPtr>
Expand Down Expand Up @@ -230,10 +212,7 @@ Executor::add_callback_group_to_map(
weak_nodes_to_guard_conditions_[node_weak_ptr] = node_ptr->get_notify_guard_condition();
if (notify) {
// Interrupt waiting to handle new node
rcl_ret_t ret = rcl_trigger_guard_condition(&interrupt_guard_condition_);
if (ret != RCL_RET_OK) {
throw_from_rcl_error(ret, "Failed to trigger guard condition on callback group add");
}
interrupt_guard_condition_->trigger();
}
// Add the node's notify condition to the guard condition handles
memory_strategy_->add_guard_condition(node_ptr->get_notify_guard_condition());
Expand Down Expand Up @@ -306,10 +285,7 @@ Executor::remove_callback_group_from_map(
rclcpp::node_interfaces::NodeBaseInterface::WeakPtr node_weak_ptr(node_ptr);
weak_nodes_to_guard_conditions_.erase(node_weak_ptr);
if (notify) {
rcl_ret_t ret = rcl_trigger_guard_condition(&interrupt_guard_condition_);
if (ret != RCL_RET_OK) {
throw_from_rcl_error(ret, "Failed to trigger guard condition on callback group remove");
}
interrupt_guard_condition_->trigger();
}
memory_strategy_->remove_guard_condition(node_ptr->get_notify_guard_condition());
}
Expand Down Expand Up @@ -482,10 +458,7 @@ void
Executor::cancel()
{
spinning.store(false);
rcl_ret_t ret = rcl_trigger_guard_condition(&interrupt_guard_condition_);
if (ret != RCL_RET_OK) {
throw_from_rcl_error(ret, "Failed to trigger guard condition in cancel");
}
interrupt_guard_condition_->trigger();
}

void
Expand Down Expand Up @@ -523,10 +496,7 @@ Executor::execute_any_executable(AnyExecutable & any_exec)
any_exec.callback_group->can_be_taken_from().store(true);
// Wake the wait, because it may need to be recalculated or work that
// was previously blocked is now available.
rcl_ret_t ret = rcl_trigger_guard_condition(&interrupt_guard_condition_);
if (ret != RCL_RET_OK) {
throw_from_rcl_error(ret, "Failed to trigger guard condition from execute_any_executable");
}
interrupt_guard_condition_->trigger();
}

static
Expand Down
Loading

0 comments on commit b9bec69

Please sign in to comment.