Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Unique network flows #1496

Merged
merged 15 commits into from
Apr 5, 2021
1 change: 1 addition & 0 deletions rclcpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ set(${PROJECT_NAME}_SRCS
src/rclcpp/memory_strategies.cpp
src/rclcpp/memory_strategy.cpp
src/rclcpp/message_info.cpp
src/rclcpp/network_flow_endpoint.cpp
src/rclcpp/node.cpp
src/rclcpp/node_options.cpp
src/rclcpp/node_interfaces/node_base.cpp
Expand Down
108 changes: 108 additions & 0 deletions rclcpp/include/rclcpp/network_flow_endpoint.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
// Copyright 2020 Ericsson AB
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef RCLCPP__NETWORK_FLOW_ENDPOINT_HPP_
#define RCLCPP__NETWORK_FLOW_ENDPOINT_HPP_

#include <cstdint>
#include <string>
#include <iostream>

#include "rcl/network_flow_endpoints.h"

#include "rclcpp/visibility_control.hpp"

namespace rclcpp
{

/**
* Class describes a network flow endpoint based on the counterpart definition
* in the RMW layer.
*/
class NetworkFlowEndpoint
{
public:
/// Construct from rcl_network_flow_endpoint_t
RCLCPP_PUBLIC
explicit NetworkFlowEndpoint(const rcl_network_flow_endpoint_t & network_flow_endpoint)
: transport_protocol_(
rcl_network_flow_endpoint_get_transport_protocol_string(network_flow_endpoint.
transport_protocol)),
internet_protocol_(
rcl_network_flow_endpoint_get_internet_protocol_string(
network_flow_endpoint.internet_protocol)),
transport_port_(network_flow_endpoint.transport_port),
flow_label_(network_flow_endpoint.flow_label),
dscp_(network_flow_endpoint.dscp),
internet_address_(network_flow_endpoint.internet_address)
{
}

/// Get transport protocol
RCLCPP_PUBLIC
const std::string & transport_protocol() const;

/// Get internet protocol
RCLCPP_PUBLIC
const std::string & internet_protocol() const;
wjwwood marked this conversation as resolved.
Show resolved Hide resolved

/// Get transport port
RCLCPP_PUBLIC
uint16_t transport_port() const;

/// Get flow label
RCLCPP_PUBLIC
uint32_t flow_label() const;

/// Get DSCP
RCLCPP_PUBLIC
uint8_t dscp() const;

/// Get internet address
RCLCPP_PUBLIC
const std::string & internet_address() const;
wjwwood marked this conversation as resolved.
Show resolved Hide resolved

/// Compare two NetworkFlowEndpoint instances
friend bool operator==(const NetworkFlowEndpoint & left, const NetworkFlowEndpoint & right);
friend bool operator!=(const NetworkFlowEndpoint & left, const NetworkFlowEndpoint & right);

/// Streaming helper
friend std::ostream & operator<<(
std::ostream & os,
const NetworkFlowEndpoint & network_flow_endpoint);

private:
std::string transport_protocol_;
std::string internet_protocol_;
uint16_t transport_port_;
uint32_t flow_label_;
uint8_t dscp_;
std::string internet_address_;
};

/// Check if two NetworkFlowEndpoint instances are equal
RCLCPP_PUBLIC
bool operator==(const NetworkFlowEndpoint & left, const NetworkFlowEndpoint & right);

/// Check if two NetworkFlowEndpoint instances are not equal
RCLCPP_PUBLIC
bool operator!=(const NetworkFlowEndpoint & left, const NetworkFlowEndpoint & right);

/// Streaming helper
RCLCPP_PUBLIC
std::ostream & operator<<(std::ostream & os, const NetworkFlowEndpoint & network_flow);

} // namespace rclcpp

#endif // RCLCPP__NETWORK_FLOW_ENDPOINT_HPP_
10 changes: 10 additions & 0 deletions rclcpp/include/rclcpp/publisher_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "rcl/publisher.h"

#include "rclcpp/macros.hpp"
#include "rclcpp/network_flow_endpoint.hpp"
#include "rclcpp/qos.hpp"
#include "rclcpp/qos_event.hpp"
#include "rclcpp/type_support_decl.hpp"
Expand Down Expand Up @@ -193,6 +194,15 @@ class PublisherBase : public std::enable_shared_from_this<PublisherBase>
uint64_t intra_process_publisher_id,
IntraProcessManagerSharedPtr ipm);

/// Get network flow endpoints
/**
* Describes network flow endpoints that this publisher is sending messages out on
* \return vector of NetworkFlowEndpoint
*/
RCLCPP_PUBLIC
std::vector<rclcpp::NetworkFlowEndpoint>
get_network_flow_endpoints() const;

protected:
template<typename EventCallbackT>
void
Expand Down
7 changes: 7 additions & 0 deletions rclcpp/include/rclcpp/publisher_options.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ struct PublisherOptionsBase
/// Whether or not to use default callbacks when user doesn't supply any in event_callbacks
bool use_default_callbacks = true;

/// Require middleware to generate unique network flow endpoints
/// Disabled by default
rmw_unique_network_flow_endpoints_requirement_t require_unique_network_flow_endpoints =
RMW_UNIQUE_NETWORK_FLOW_ENDPOINTS_NOT_REQUIRED;

/// Callback group in which the waitable items from the publisher should be placed.
std::shared_ptr<rclcpp::CallbackGroup> callback_group;

Expand Down Expand Up @@ -80,6 +85,8 @@ struct PublisherOptionsWithAllocator : public PublisherOptionsBase
auto message_alloc = std::make_shared<MessageAllocatorT>(*this->get_allocator().get());
result.allocator = rclcpp::allocator::get_rcl_allocator<MessageT>(*message_alloc);
result.qos = qos.get_rmw_qos_profile();
result.rmw_publisher_options.require_unique_network_flow_endpoints =
this->require_unique_network_flow_endpoints;

// Apply payload to rcl_publisher_options if necessary.
if (rmw_implementation_payload && rmw_implementation_payload->has_been_customized()) {
Expand Down
10 changes: 10 additions & 0 deletions rclcpp/include/rclcpp/subscription_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include "rclcpp/experimental/subscription_intra_process_base.hpp"
#include "rclcpp/macros.hpp"
#include "rclcpp/message_info.hpp"
#include "rclcpp/network_flow_endpoint.hpp"
#include "rclcpp/qos.hpp"
#include "rclcpp/qos_event.hpp"
#include "rclcpp/serialized_message.hpp"
Expand Down Expand Up @@ -263,6 +264,15 @@ class SubscriptionBase : public std::enable_shared_from_this<SubscriptionBase>
bool
exchange_in_use_by_wait_set_state(void * pointer_to_subscription_part, bool in_use_state);

/// Get network flow endpoints
/**
* Describes network flow endpoints that this subscription is receiving messages on
* \return vector of NetworkFlowEndpoint
*/
RCLCPP_PUBLIC
std::vector<rclcpp::NetworkFlowEndpoint>
get_network_flow_endpoints() const;

protected:
template<typename EventCallbackT>
void
Expand Down
7 changes: 7 additions & 0 deletions rclcpp/include/rclcpp/subscription_options.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ struct SubscriptionOptionsBase
/// True to ignore local publications.
bool ignore_local_publications = false;

/// Require middleware to generate unique network flow endpoints
/// Disabled by default
rmw_unique_network_flow_endpoints_requirement_t require_unique_network_flow_endpoints =
RMW_UNIQUE_NETWORK_FLOW_ENDPOINTS_NOT_REQUIRED;

/// The callback group for this subscription. NULL to use the default callback group.
rclcpp::CallbackGroup::SharedPtr callback_group = nullptr;

Expand Down Expand Up @@ -108,6 +113,8 @@ struct SubscriptionOptionsWithAllocator : public SubscriptionOptionsBase
result.allocator = allocator::get_rcl_allocator<MessageT>(*message_alloc);
result.qos = qos.get_rmw_qos_profile();
result.rmw_subscription_options.ignore_local_publications = this->ignore_local_publications;
result.rmw_subscription_options.require_unique_network_flow_endpoints =
this->require_unique_network_flow_endpoints;

// Apply payload to rcl_subscription_options if necessary.
if (rmw_implementation_payload && rmw_implementation_payload->has_been_customized()) {
Expand Down
84 changes: 84 additions & 0 deletions rclcpp/src/rclcpp/network_flow_endpoint.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
// Copyright 2020 Ericsson AB
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <string>

#include "rclcpp/network_flow_endpoint.hpp"

namespace rclcpp
{

const std::string &
NetworkFlowEndpoint::transport_protocol() const
{
return transport_protocol_;
}

const std::string &
NetworkFlowEndpoint::internet_protocol() const
{
return internet_protocol_;
}

uint16_t NetworkFlowEndpoint::transport_port() const
{
return transport_port_;
}

uint32_t NetworkFlowEndpoint::flow_label() const
{
return flow_label_;
}

uint8_t NetworkFlowEndpoint::dscp() const
{
return dscp_;
}

const std::string &
NetworkFlowEndpoint::internet_address() const
{
return internet_address_;
}

bool operator==(const NetworkFlowEndpoint & left, const NetworkFlowEndpoint & right)
{
return left.transport_protocol_ == right.transport_protocol_ &&
left.internet_protocol_ == right.internet_protocol_ &&
left.transport_port_ == right.transport_port_ &&
left.flow_label_ == right.flow_label_ &&
left.dscp_ == right.dscp_ &&
left.internet_address_ == right.internet_address_;
}

bool operator!=(const NetworkFlowEndpoint & left, const NetworkFlowEndpoint & right)
{
return !(left == right);
}

std::ostream & operator<<(std::ostream & os, const NetworkFlowEndpoint & network_flow_endpoint)
{
// Stream out in JSON-like format
os << "{" <<
"\"transportProtocol\": \"" << network_flow_endpoint.transport_protocol_ << "\", " <<
"\"internetProtocol\": \"" << network_flow_endpoint.internet_protocol_ << "\", " <<
"\"transportPort\": \"" << network_flow_endpoint.transport_port_ << "\", " <<
"\"flowLabel\": \"" << std::to_string(network_flow_endpoint.flow_label_) << "\", " <<
"\"dscp\": \"" << std::to_string(network_flow_endpoint.dscp_) << "\", " <<
"\"internetAddress\": \"" << network_flow_endpoint.internet_address_ << "\"" <<
"}";
return os;
}

} // namespace rclcpp
38 changes: 38 additions & 0 deletions rclcpp/src/rclcpp/publisher_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#include "rclcpp/experimental/intra_process_manager.hpp"
#include "rclcpp/logging.hpp"
#include "rclcpp/macros.hpp"
#include "rclcpp/network_flow_endpoint.hpp"
#include "rclcpp/node.hpp"
#include "rclcpp/qos_event.hpp"

Expand Down Expand Up @@ -268,3 +269,40 @@ PublisherBase::default_incompatible_qos_callback(
get_topic_name(),
policy_name.c_str());
}

std::vector<rclcpp::NetworkFlowEndpoint> PublisherBase::get_network_flow_endpoints() const
{
rcutils_allocator_t allocator = rcutils_get_default_allocator();
rcl_network_flow_endpoint_array_t network_flow_endpoint_array =
rcl_get_zero_initialized_network_flow_endpoint_array();
rcl_ret_t ret = rcl_publisher_get_network_flow_endpoints(
publisher_handle_.get(), &allocator, &network_flow_endpoint_array);
if (RCL_RET_OK != ret) {
auto error_msg = std::string("error obtaining network flows of publisher: ") +
rcl_get_error_string().str;
rcl_reset_error();
if (RCL_RET_OK !=
rcl_network_flow_endpoint_array_fini(&network_flow_endpoint_array, &allocator))
{
error_msg += std::string(", also error cleaning up network flow array: ") +
rcl_get_error_string().str;
rcl_reset_error();
}
rclcpp::exceptions::throw_from_rcl_error(ret, error_msg);
}

std::vector<rclcpp::NetworkFlowEndpoint> network_flow_endpoint_vector;
for (size_t i = 0; i < network_flow_endpoint_array.size; ++i) {
network_flow_endpoint_vector.push_back(
rclcpp::NetworkFlowEndpoint(
network_flow_endpoint_array.
network_flow_endpoint[i]));
wjwwood marked this conversation as resolved.
Show resolved Hide resolved
}

ret = rcl_network_flow_endpoint_array_fini(&network_flow_endpoint_array, &allocator);
if (RCL_RET_OK != ret) {
rclcpp::exceptions::throw_from_rcl_error(ret, "error cleaning up network flow array");
}

return network_flow_endpoint_vector;
}
37 changes: 37 additions & 0 deletions rclcpp/src/rclcpp/subscription_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -288,3 +288,40 @@ SubscriptionBase::exchange_in_use_by_wait_set_state(
}
throw std::runtime_error("given pointer_to_subscription_part does not match any part");
}

std::vector<rclcpp::NetworkFlowEndpoint> SubscriptionBase::get_network_flow_endpoints() const
{
rcutils_allocator_t allocator = rcutils_get_default_allocator();
rcl_network_flow_endpoint_array_t network_flow_endpoint_array =
rcl_get_zero_initialized_network_flow_endpoint_array();
rcl_ret_t ret = rcl_subscription_get_network_flow_endpoints(
subscription_handle_.get(), &allocator, &network_flow_endpoint_array);
if (RCL_RET_OK != ret) {
auto error_msg = std::string("Error obtaining network flows of subscription: ") +
rcl_get_error_string().str;
rcl_reset_error();
if (RCL_RET_OK !=
rcl_network_flow_endpoint_array_fini(&network_flow_endpoint_array, &allocator))
{
error_msg += std::string(". Also error cleaning up network flow array: ") +
rcl_get_error_string().str;
rcl_reset_error();
}
rclcpp::exceptions::throw_from_rcl_error(ret, error_msg);
}

std::vector<rclcpp::NetworkFlowEndpoint> network_flow_endpoint_vector;
for (size_t i = 0; i < network_flow_endpoint_array.size; ++i) {
network_flow_endpoint_vector.push_back(
rclcpp::NetworkFlowEndpoint(
network_flow_endpoint_array.
network_flow_endpoint[i]));
}

ret = rcl_network_flow_endpoint_array_fini(&network_flow_endpoint_array, &allocator);
if (RCL_RET_OK != ret) {
rclcpp::exceptions::throw_from_rcl_error(ret, "error cleaning up network flow array");
}

return network_flow_endpoint_vector;
}
Loading