Skip to content

Commit

Permalink
Use rclcpp::GuardCondition
Browse files Browse the repository at this point in the history
Signed-off-by: Mauro Passerino <[email protected]>
  • Loading branch information
Mauro Passerino committed Aug 19, 2021
1 parent d7764b4 commit ab313f2
Show file tree
Hide file tree
Showing 48 changed files with 314 additions and 354 deletions.
1 change: 1 addition & 0 deletions rclcpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ set(${PROJECT_NAME}_SRCS
src/rclcpp/clock.cpp
src/rclcpp/context.cpp
src/rclcpp/contexts/default_context.cpp
src/rclcpp/detail/add_guard_condition_to_rcl_wait_set.cpp
src/rclcpp/detail/resolve_parameter_overrides.cpp
src/rclcpp/detail/rmw_implementation_specific_payload.cpp
src/rclcpp/detail/rmw_implementation_specific_publisher_payload.cpp
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
// Copyright 2021 Open Source Robotics Foundation, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef RCLCPP__DETAIL__ADD_GUARD_CONDITION_TO_RCL_WAIT_SET_HPP_
#define RCLCPP__DETAIL__ADD_GUARD_CONDITION_TO_RCL_WAIT_SET_HPP_

#include "rclcpp/guard_condition.hpp"

namespace rclcpp
{
namespace detail
{

/// Adds the guard condition to a waitset
/**
* This function is thread-safe.
* \param[in] wait_set reference to a wait set where to add the guard condition
* \param[in] guard_condition reference to the guard_condition to be added
*/
RCLCPP_PUBLIC
void
add_guard_condition_to_rcl_wait_set(
rcl_wait_set_t & wait_set,
const rclcpp::GuardCondition & guard_condition);

} // namespace detail
} // namespace rclcpp

#endif // RCLCPP__DETAIL__ADD_GUARD_CONDITION_TO_RCL_WAIT_SET_HPP_
4 changes: 2 additions & 2 deletions rclcpp/include/rclcpp/executor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -525,7 +525,7 @@ class Executor
std::atomic_bool spinning;

/// Guard condition for signaling the rmw layer to wake up for special events.
rcl_guard_condition_t interrupt_guard_condition_ = rcl_get_zero_initialized_guard_condition();
rclcpp::GuardCondition interrupt_guard_condition_;

std::shared_ptr<rclcpp::GuardCondition> shutdown_guard_condition_;

Expand All @@ -549,7 +549,7 @@ class Executor
spin_once_impl(std::chrono::nanoseconds timeout);

typedef std::map<rclcpp::node_interfaces::NodeBaseInterface::WeakPtr,
const rcl_guard_condition_t *,
const rclcpp::GuardCondition *,
std::owner_less<rclcpp::node_interfaces::NodeBaseInterface::WeakPtr>>
WeakNodesToGuardConditionsMap;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,7 +102,7 @@ class StaticExecutorEntitiesCollector final
* \throws std::runtime_error if it couldn't add guard condition to wait set
*/
RCLCPP_PUBLIC
bool
void
add_to_wait_set(rcl_wait_set_t * wait_set) override;

RCLCPP_PUBLIC
Expand Down Expand Up @@ -328,7 +328,7 @@ class StaticExecutorEntitiesCollector final
WeakCallbackGroupsToNodesMap weak_groups_to_nodes_associated_with_executor_;

typedef std::map<rclcpp::node_interfaces::NodeBaseInterface::WeakPtr,
const rcl_guard_condition_t *,
const rclcpp::GuardCondition *,
std::owner_less<rclcpp::node_interfaces::NodeBaseInterface::WeakPtr>>
WeakNodesToGuardConditionsMap;
WeakNodesToGuardConditionsMap weak_nodes_to_guard_conditions_;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@

#include "rcl/error_handling.h"

#include "rclcpp/guard_condition.hpp"
#include "rclcpp/qos.hpp"
#include "rclcpp/type_support_decl.hpp"
#include "rclcpp/waitable.hpp"
Expand All @@ -41,9 +42,10 @@ class SubscriptionIntraProcessBase : public rclcpp::Waitable

RCLCPP_PUBLIC
SubscriptionIntraProcessBase(
rclcpp::Context::SharedPtr context,
const std::string & topic_name,
const rclcpp::QoS & qos_profile)
: topic_name_(topic_name), qos_profile_(qos_profile)
: gc_(context), topic_name_(topic_name), qos_profile_(qos_profile)
{}

virtual ~SubscriptionIntraProcessBase() = default;
Expand All @@ -53,7 +55,7 @@ class SubscriptionIntraProcessBase : public rclcpp::Waitable
get_number_of_ready_guard_conditions() {return 1;}

RCLCPP_PUBLIC
bool
void
add_to_wait_set(rcl_wait_set_t * wait_set);

virtual bool
Expand All @@ -79,7 +81,7 @@ class SubscriptionIntraProcessBase : public rclcpp::Waitable

protected:
std::recursive_mutex reentrant_mutex_;
rcl_guard_condition_t gc_;
rclcpp::GuardCondition gc_;

private:
virtual void
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,36 +67,13 @@ class SubscriptionIntraProcessBuffer : public SubscriptionIntraProcessBase
const std::string & topic_name,
const rclcpp::QoS & qos_profile,
rclcpp::IntraProcessBufferType buffer_type)
: SubscriptionIntraProcessBase(topic_name, qos_profile)
: SubscriptionIntraProcessBase(context, topic_name, qos_profile)
{
// Create the intra-process buffer.
buffer_ = rclcpp::experimental::create_intra_process_buffer<MessageT, Alloc, Deleter>(
buffer_type,
qos_profile,
allocator);

// Create the guard condition.
rcl_guard_condition_options_t guard_condition_options =
rcl_guard_condition_get_default_options();

gc_ = rcl_get_zero_initialized_guard_condition();
rcl_ret_t ret = rcl_guard_condition_init(
&gc_, context->get_rcl_context().get(), guard_condition_options);

if (RCL_RET_OK != ret) {
throw std::runtime_error(
"SubscriptionIntraProcessBuffer init error initializing guard condition");
}
}

virtual ~SubscriptionIntraProcessBuffer()
{
if (rcl_guard_condition_fini(&gc_) != RCL_RET_OK) {
RCUTILS_LOG_ERROR_NAMED(
"rclcpp",
"Failed to destroy guard condition: %s",
rcutils_get_error_string().str);
}
}

bool
Expand Down Expand Up @@ -130,8 +107,7 @@ class SubscriptionIntraProcessBuffer : public SubscriptionIntraProcessBase
void
trigger_guard_condition()
{
rcl_ret_t ret = rcl_trigger_guard_condition(&gc_);
(void)ret;
gc_.trigger();
}

BufferUniquePtr buffer_;
Expand Down
3 changes: 2 additions & 1 deletion rclcpp/include/rclcpp/graph_listener.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "rcl/guard_condition.h"
#include "rcl/wait.h"
#include "rclcpp/context.hpp"
#include "rclcpp/guard_condition.hpp"
#include "rclcpp/macros.hpp"
#include "rclcpp/node_interfaces/node_graph_interface.hpp"
#include "rclcpp/visibility_control.hpp"
Expand Down Expand Up @@ -187,7 +188,7 @@ class GraphListener : public std::enable_shared_from_this<GraphListener>
mutable std::mutex node_graph_interfaces_mutex_;
std::vector<rclcpp::node_interfaces::NodeGraphInterface *> node_graph_interfaces_;

rcl_guard_condition_t interrupt_guard_condition_ = rcl_get_zero_initialized_guard_condition();
rclcpp::GuardCondition interrupt_guard_condition_;
rcl_wait_set_t wait_set_ = rcl_get_zero_initialized_wait_set();
};

Expand Down
9 changes: 8 additions & 1 deletion rclcpp/include/rclcpp/guard_condition.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,9 @@ class GuardCondition
RCLCPP_PUBLIC
explicit GuardCondition(
rclcpp::Context::SharedPtr context =
rclcpp::contexts::get_global_default_context());
rclcpp::contexts::get_global_default_context(),
rcl_guard_condition_options_t guard_condition_options =
rcl_guard_condition_get_default_options());

RCLCPP_PUBLIC
virtual
Expand All @@ -58,6 +60,11 @@ class GuardCondition
rclcpp::Context::SharedPtr
get_context() const;

/// Return the underlying rcl guard condition structure.
RCLCPP_PUBLIC
rcl_guard_condition_t &
get_rcl_guard_condition();

/// Return the underlying rcl guard condition structure.
RCLCPP_PUBLIC
const rcl_guard_condition_t &
Expand Down
6 changes: 4 additions & 2 deletions rclcpp/include/rclcpp/memory_strategy.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,11 @@ class RCLCPP_PUBLIC MemoryStrategy
virtual void clear_handles() = 0;
virtual void remove_null_handles(rcl_wait_set_t * wait_set) = 0;

virtual void add_guard_condition(const rcl_guard_condition_t * guard_condition) = 0;
virtual void
add_guard_condition(const rclcpp::GuardCondition & guard_condition) = 0;

virtual void remove_guard_condition(const rcl_guard_condition_t * guard_condition) = 0;
virtual void
remove_guard_condition(const rclcpp::GuardCondition & guard_condition) = 0;

virtual void
get_next_subscription(
Expand Down
8 changes: 2 additions & 6 deletions rclcpp/include/rclcpp/node_interfaces/node_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,13 +114,9 @@ class NodeBase : public NodeBaseInterface
get_associated_with_executor_atomic() override;

RCLCPP_PUBLIC
rcl_guard_condition_t *
rclcpp::GuardCondition &
get_notify_guard_condition() override;

RCLCPP_PUBLIC
std::unique_lock<std::recursive_mutex>
acquire_notify_guard_condition_lock() const override;

RCLCPP_PUBLIC
bool
get_use_intra_process_default() const override;
Expand Down Expand Up @@ -149,7 +145,7 @@ class NodeBase : public NodeBaseInterface

/// Guard condition for notifying the Executor of changes to this node.
mutable std::recursive_mutex notify_guard_condition_mutex_;
rcl_guard_condition_t notify_guard_condition_ = rcl_get_zero_initialized_guard_condition();
rclcpp::GuardCondition notify_guard_condition_;
bool notify_guard_condition_is_valid_;
};

Expand Down
14 changes: 4 additions & 10 deletions rclcpp/include/rclcpp/node_interfaces/node_base_interface.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@

#include "rclcpp/callback_group.hpp"
#include "rclcpp/context.hpp"
#include "rclcpp/guard_condition.hpp"
#include "rclcpp/macros.hpp"
#include "rclcpp/visibility_control.hpp"

Expand Down Expand Up @@ -142,24 +143,17 @@ class NodeBaseInterface
std::atomic_bool &
get_associated_with_executor_atomic() = 0;

/// Return guard condition that should be notified when the internal node state changes.
/// Return a guard condition that should be notified when the internal node state changes.
/**
* For example, this should be notified when a publisher is added or removed.
*
* \return the rcl_guard_condition_t if it is valid, else nullptr
* \return the GuardCondition if it is valid, else thow runtime error
*/
RCLCPP_PUBLIC
virtual
rcl_guard_condition_t *
rclcpp::GuardCondition &
get_notify_guard_condition() = 0;

/// Acquire and return a scoped lock that protects the notify guard condition.
/** This should be used when triggering the notify guard condition. */
RCLCPP_PUBLIC
virtual
std::unique_lock<std::recursive_mutex>
acquire_notify_guard_condition_lock() const = 0;

/// Return the default preference for using intra process communication.
RCLCPP_PUBLIC
virtual
Expand Down
2 changes: 1 addition & 1 deletion rclcpp/include/rclcpp/qos_event.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ class QOSEventHandlerBase : public Waitable

/// Add the Waitable to a wait set.
RCLCPP_PUBLIC
bool
void
add_to_wait_set(rcl_wait_set_t * wait_set) override;

/// Check if the Waitable is ready.
Expand Down
28 changes: 9 additions & 19 deletions rclcpp/include/rclcpp/strategies/allocator_memory_strategy.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "rcl/allocator.h"

#include "rclcpp/allocator/allocator_common.hpp"
#include "rclcpp/detail/add_guard_condition_to_rcl_wait_set.hpp"
#include "rclcpp/memory_strategy.hpp"
#include "rclcpp/node.hpp"
#include "rclcpp/visibility_control.hpp"
Expand Down Expand Up @@ -61,20 +62,20 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy
allocator_ = std::make_shared<VoidAlloc>();
}

void add_guard_condition(const rcl_guard_condition_t * guard_condition) override
void add_guard_condition(const rclcpp::GuardCondition & guard_condition) override
{
for (const auto & existing_guard_condition : guard_conditions_) {
if (existing_guard_condition == guard_condition) {
if (existing_guard_condition == &guard_condition) {
return;
}
}
guard_conditions_.push_back(guard_condition);
guard_conditions_.push_back(&guard_condition);
}

void remove_guard_condition(const rcl_guard_condition_t * guard_condition) override
void remove_guard_condition(const rclcpp::GuardCondition & guard_condition) override
{
for (auto it = guard_conditions_.begin(); it != guard_conditions_.end(); ++it) {
if (*it == guard_condition) {
if (*it == &guard_condition) {
guard_conditions_.erase(it);
break;
}
Expand Down Expand Up @@ -240,22 +241,11 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy
}

for (auto guard_condition : guard_conditions_) {
if (rcl_wait_set_add_guard_condition(wait_set, guard_condition, NULL) != RCL_RET_OK) {
RCUTILS_LOG_ERROR_NAMED(
"rclcpp",
"Couldn't add guard_condition to wait set: %s",
rcl_get_error_string().str);
return false;
}
detail::add_guard_condition_to_rcl_wait_set(*wait_set, *guard_condition);
}

for (auto waitable : waitable_handles_) {
if (!waitable->add_to_wait_set(wait_set)) {
RCUTILS_LOG_ERROR_NAMED(
"rclcpp",
"Couldn't add waitable to wait set: %s", rcl_get_error_string().str);
return false;
}
waitable->add_to_wait_set(wait_set);
}
return true;
}
Expand Down Expand Up @@ -509,7 +499,7 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy
using VectorRebind =
std::vector<T, typename std::allocator_traits<Alloc>::template rebind_alloc<T>>;

VectorRebind<const rcl_guard_condition_t *> guard_conditions_;
VectorRebind<const rclcpp::GuardCondition *> guard_conditions_;

VectorRebind<std::shared_ptr<const rcl_subscription_t>> subscription_handles_;
VectorRebind<std::shared_ptr<const rcl_service_t>> service_handles_;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -383,10 +383,7 @@ class StoragePolicyCommon
continue;
}
rclcpp::Waitable & waitable = *waitable_ptr_pair.second;
bool successful = waitable.add_to_wait_set(&rcl_wait_set_);
if (!successful) {
throw std::runtime_error("waitable unexpectedly failed to be added to wait set");
}
waitable.add_to_wait_set(&rcl_wait_set_);
}
}

Expand Down
Loading

0 comments on commit ab313f2

Please sign in to comment.