Skip to content

Commit

Permalink
Unique network flows (#1496)
Browse files Browse the repository at this point in the history
* Add option to enable unique network flow

- Option enabled for publishers and subscriptions
- TODO: Discuss error handling if not supported

Signed-off-by: Ananya Muddukrishna <[email protected]>

* Get network flows of publishers and subscriptions

Signed-off-by: Ananya Muddukrishna <[email protected]>

* Use new unique network flow option

Signed-off-by: Ananya Muddukrishna <[email protected]>

* Rename files for clarity

Signed-off-by: Ananya Muddukrishna <[email protected]>

* Rename API for clarity and add DSCP option

Signed-off-by: Ananya Muddukrishna <[email protected]>

* Convert integer to string prior to streaming

Signed-off-by: Ananya Muddukrishna <[email protected]>

* Uncrustify

Signed-off-by: Ananya Muddukrishna <[email protected]>

* Fix linkage error thrown by MSVC compiler

Signed-off-by: Ananya Muddukrishna <[email protected]>

* Use updated rmw interface

Signed-off-by: Ananya Muddukrishna <[email protected]>

* Improve readability

Signed-off-by: Ananya Muddukrishna <[email protected]>

* Forward declare friend functions

Signed-off-by: Ananya Muddukrishna <[email protected]>

Co-authored-by: Ananya Muddukrishna <[email protected]>
  • Loading branch information
anamud and Ananya Muddukrishna authored Apr 5, 2021
1 parent 06a4ee0 commit 6af478d
Show file tree
Hide file tree
Showing 11 changed files with 372 additions and 0 deletions.
1 change: 1 addition & 0 deletions rclcpp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ set(${PROJECT_NAME}_SRCS
src/rclcpp/memory_strategies.cpp
src/rclcpp/memory_strategy.cpp
src/rclcpp/message_info.cpp
src/rclcpp/network_flow_endpoint.cpp
src/rclcpp/node.cpp
src/rclcpp/node_interfaces/node_base.cpp
src/rclcpp/node_interfaces/node_clock.cpp
Expand Down
115 changes: 115 additions & 0 deletions rclcpp/include/rclcpp/network_flow_endpoint.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,115 @@
// Copyright 2020 Ericsson AB
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef RCLCPP__NETWORK_FLOW_ENDPOINT_HPP_
#define RCLCPP__NETWORK_FLOW_ENDPOINT_HPP_

#include <cstdint>
#include <string>
#include <iostream>

#include "rcl/network_flow_endpoints.h"

#include "rclcpp/visibility_control.hpp"

namespace rclcpp
{

/// Forward declaration
class NetworkFlowEndpoint;

/// Check if two NetworkFlowEndpoint instances are equal
RCLCPP_PUBLIC
bool operator==(const NetworkFlowEndpoint & left, const NetworkFlowEndpoint & right);

/// Check if two NetworkFlowEndpoint instances are not equal
RCLCPP_PUBLIC
bool operator!=(const NetworkFlowEndpoint & left, const NetworkFlowEndpoint & right);

/// Streaming helper for NetworkFlowEndpoint
RCLCPP_PUBLIC
std::ostream & operator<<(std::ostream & os, const NetworkFlowEndpoint & network_flow_endpoint);

/**
* Class describes a network flow endpoint based on the counterpart definition
* in the RMW layer.
*/
class NetworkFlowEndpoint
{
public:
/// Construct from rcl_network_flow_endpoint_t
RCLCPP_PUBLIC
explicit NetworkFlowEndpoint(const rcl_network_flow_endpoint_t & network_flow_endpoint)
: transport_protocol_(
rcl_network_flow_endpoint_get_transport_protocol_string(network_flow_endpoint.
transport_protocol)),
internet_protocol_(
rcl_network_flow_endpoint_get_internet_protocol_string(
network_flow_endpoint.internet_protocol)),
transport_port_(network_flow_endpoint.transport_port),
flow_label_(network_flow_endpoint.flow_label),
dscp_(network_flow_endpoint.dscp),
internet_address_(network_flow_endpoint.internet_address)
{
}

/// Get transport protocol
RCLCPP_PUBLIC
const std::string & transport_protocol() const;

/// Get internet protocol
RCLCPP_PUBLIC
const std::string & internet_protocol() const;

/// Get transport port
RCLCPP_PUBLIC
uint16_t transport_port() const;

/// Get flow label
RCLCPP_PUBLIC
uint32_t flow_label() const;

/// Get DSCP
RCLCPP_PUBLIC
uint8_t dscp() const;

/// Get internet address
RCLCPP_PUBLIC
const std::string & internet_address() const;

/// Compare two NetworkFlowEndpoint instances
friend bool rclcpp::operator==(
const NetworkFlowEndpoint & left,
const NetworkFlowEndpoint & right);
friend bool rclcpp::operator!=(
const NetworkFlowEndpoint & left,
const NetworkFlowEndpoint & right);

/// Streaming helper
friend std::ostream & rclcpp::operator<<(
std::ostream & os,
const NetworkFlowEndpoint & network_flow_endpoint);

private:
std::string transport_protocol_;
std::string internet_protocol_;
uint16_t transport_port_;
uint32_t flow_label_;
uint8_t dscp_;
std::string internet_address_;
};

} // namespace rclcpp

#endif // RCLCPP__NETWORK_FLOW_ENDPOINT_HPP_
10 changes: 10 additions & 0 deletions rclcpp/include/rclcpp/publisher_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
#include "rcl/publisher.h"

#include "rclcpp/macros.hpp"
#include "rclcpp/network_flow_endpoint.hpp"
#include "rclcpp/qos.hpp"
#include "rclcpp/qos_event.hpp"
#include "rclcpp/type_support_decl.hpp"
Expand Down Expand Up @@ -193,6 +194,15 @@ class PublisherBase : public std::enable_shared_from_this<PublisherBase>
uint64_t intra_process_publisher_id,
IntraProcessManagerSharedPtr ipm);

/// Get network flow endpoints
/**
* Describes network flow endpoints that this publisher is sending messages out on
* \return vector of NetworkFlowEndpoint
*/
RCLCPP_PUBLIC
std::vector<rclcpp::NetworkFlowEndpoint>
get_network_flow_endpoints() const;

protected:
template<typename EventCallbackT>
void
Expand Down
7 changes: 7 additions & 0 deletions rclcpp/include/rclcpp/publisher_options.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ struct PublisherOptionsBase
/// Whether or not to use default callbacks when user doesn't supply any in event_callbacks
bool use_default_callbacks = true;

/// Require middleware to generate unique network flow endpoints
/// Disabled by default
rmw_unique_network_flow_endpoints_requirement_t require_unique_network_flow_endpoints =
RMW_UNIQUE_NETWORK_FLOW_ENDPOINTS_NOT_REQUIRED;

/// Callback group in which the waitable items from the publisher should be placed.
std::shared_ptr<rclcpp::CallbackGroup> callback_group;

Expand Down Expand Up @@ -80,6 +85,8 @@ struct PublisherOptionsWithAllocator : public PublisherOptionsBase
auto message_alloc = std::make_shared<MessageAllocatorT>(*this->get_allocator().get());
result.allocator = rclcpp::allocator::get_rcl_allocator<MessageT>(*message_alloc);
result.qos = qos.get_rmw_qos_profile();
result.rmw_publisher_options.require_unique_network_flow_endpoints =
this->require_unique_network_flow_endpoints;

// Apply payload to rcl_publisher_options if necessary.
if (rmw_implementation_payload && rmw_implementation_payload->has_been_customized()) {
Expand Down
10 changes: 10 additions & 0 deletions rclcpp/include/rclcpp/subscription_base.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include "rclcpp/experimental/subscription_intra_process_base.hpp"
#include "rclcpp/macros.hpp"
#include "rclcpp/message_info.hpp"
#include "rclcpp/network_flow_endpoint.hpp"
#include "rclcpp/qos.hpp"
#include "rclcpp/qos_event.hpp"
#include "rclcpp/serialized_message.hpp"
Expand Down Expand Up @@ -266,6 +267,15 @@ class SubscriptionBase : public std::enable_shared_from_this<SubscriptionBase>
bool
exchange_in_use_by_wait_set_state(void * pointer_to_subscription_part, bool in_use_state);

/// Get network flow endpoints
/**
* Describes network flow endpoints that this subscription is receiving messages on
* \return vector of NetworkFlowEndpoint
*/
RCLCPP_PUBLIC
std::vector<rclcpp::NetworkFlowEndpoint>
get_network_flow_endpoints() const;

protected:
template<typename EventCallbackT>
void
Expand Down
7 changes: 7 additions & 0 deletions rclcpp/include/rclcpp/subscription_options.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ struct SubscriptionOptionsBase
/// True to ignore local publications.
bool ignore_local_publications = false;

/// Require middleware to generate unique network flow endpoints
/// Disabled by default
rmw_unique_network_flow_endpoints_requirement_t require_unique_network_flow_endpoints =
RMW_UNIQUE_NETWORK_FLOW_ENDPOINTS_NOT_REQUIRED;

/// The callback group for this subscription. NULL to use the default callback group.
rclcpp::CallbackGroup::SharedPtr callback_group = nullptr;

Expand Down Expand Up @@ -108,6 +113,8 @@ struct SubscriptionOptionsWithAllocator : public SubscriptionOptionsBase
result.allocator = allocator::get_rcl_allocator<MessageT>(*message_alloc);
result.qos = qos.get_rmw_qos_profile();
result.rmw_subscription_options.ignore_local_publications = this->ignore_local_publications;
result.rmw_subscription_options.require_unique_network_flow_endpoints =
this->require_unique_network_flow_endpoints;

// Apply payload to rcl_subscription_options if necessary.
if (rmw_implementation_payload && rmw_implementation_payload->has_been_customized()) {
Expand Down
84 changes: 84 additions & 0 deletions rclcpp/src/rclcpp/network_flow_endpoint.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
// Copyright 2020 Ericsson AB
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <string>

#include "rclcpp/network_flow_endpoint.hpp"

namespace rclcpp
{

const std::string &
NetworkFlowEndpoint::transport_protocol() const
{
return transport_protocol_;
}

const std::string &
NetworkFlowEndpoint::internet_protocol() const
{
return internet_protocol_;
}

uint16_t NetworkFlowEndpoint::transport_port() const
{
return transport_port_;
}

uint32_t NetworkFlowEndpoint::flow_label() const
{
return flow_label_;
}

uint8_t NetworkFlowEndpoint::dscp() const
{
return dscp_;
}

const std::string &
NetworkFlowEndpoint::internet_address() const
{
return internet_address_;
}

bool operator==(const NetworkFlowEndpoint & left, const NetworkFlowEndpoint & right)
{
return left.transport_protocol_ == right.transport_protocol_ &&
left.internet_protocol_ == right.internet_protocol_ &&
left.transport_port_ == right.transport_port_ &&
left.flow_label_ == right.flow_label_ &&
left.dscp_ == right.dscp_ &&
left.internet_address_ == right.internet_address_;
}

bool operator!=(const NetworkFlowEndpoint & left, const NetworkFlowEndpoint & right)
{
return !(left == right);
}

std::ostream & operator<<(std::ostream & os, const NetworkFlowEndpoint & network_flow_endpoint)
{
// Stream out in JSON-like format
os << "{" <<
"\"transportProtocol\": \"" << network_flow_endpoint.transport_protocol_ << "\", " <<
"\"internetProtocol\": \"" << network_flow_endpoint.internet_protocol_ << "\", " <<
"\"transportPort\": \"" << network_flow_endpoint.transport_port_ << "\", " <<
"\"flowLabel\": \"" << std::to_string(network_flow_endpoint.flow_label_) << "\", " <<
"\"dscp\": \"" << std::to_string(network_flow_endpoint.dscp_) << "\", " <<
"\"internetAddress\": \"" << network_flow_endpoint.internet_address_ << "\"" <<
"}";
return os;
}

} // namespace rclcpp
37 changes: 37 additions & 0 deletions rclcpp/src/rclcpp/publisher_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@
#include "rclcpp/experimental/intra_process_manager.hpp"
#include "rclcpp/logging.hpp"
#include "rclcpp/macros.hpp"
#include "rclcpp/network_flow_endpoint.hpp"
#include "rclcpp/node.hpp"
#include "rclcpp/qos_event.hpp"

Expand Down Expand Up @@ -268,3 +269,39 @@ PublisherBase::default_incompatible_qos_callback(
get_topic_name(),
policy_name.c_str());
}

std::vector<rclcpp::NetworkFlowEndpoint> PublisherBase::get_network_flow_endpoints() const
{
rcutils_allocator_t allocator = rcutils_get_default_allocator();
rcl_network_flow_endpoint_array_t network_flow_endpoint_array =
rcl_get_zero_initialized_network_flow_endpoint_array();
rcl_ret_t ret = rcl_publisher_get_network_flow_endpoints(
publisher_handle_.get(), &allocator, &network_flow_endpoint_array);
if (RCL_RET_OK != ret) {
auto error_msg = std::string("error obtaining network flows of publisher: ") +
rcl_get_error_string().str;
rcl_reset_error();
if (RCL_RET_OK !=
rcl_network_flow_endpoint_array_fini(&network_flow_endpoint_array))
{
error_msg += std::string(", also error cleaning up network flow array: ") +
rcl_get_error_string().str;
rcl_reset_error();
}
rclcpp::exceptions::throw_from_rcl_error(ret, error_msg);
}

std::vector<rclcpp::NetworkFlowEndpoint> network_flow_endpoint_vector;
for (size_t i = 0; i < network_flow_endpoint_array.size; ++i) {
network_flow_endpoint_vector.push_back(
rclcpp::NetworkFlowEndpoint(
network_flow_endpoint_array.network_flow_endpoint[i]));
}

ret = rcl_network_flow_endpoint_array_fini(&network_flow_endpoint_array);
if (RCL_RET_OK != ret) {
rclcpp::exceptions::throw_from_rcl_error(ret, "error cleaning up network flow array");
}

return network_flow_endpoint_vector;
}
37 changes: 37 additions & 0 deletions rclcpp/src/rclcpp/subscription_base.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -288,3 +288,40 @@ SubscriptionBase::exchange_in_use_by_wait_set_state(
}
throw std::runtime_error("given pointer_to_subscription_part does not match any part");
}

std::vector<rclcpp::NetworkFlowEndpoint> SubscriptionBase::get_network_flow_endpoints() const
{
rcutils_allocator_t allocator = rcutils_get_default_allocator();
rcl_network_flow_endpoint_array_t network_flow_endpoint_array =
rcl_get_zero_initialized_network_flow_endpoint_array();
rcl_ret_t ret = rcl_subscription_get_network_flow_endpoints(
subscription_handle_.get(), &allocator, &network_flow_endpoint_array);
if (RCL_RET_OK != ret) {
auto error_msg = std::string("Error obtaining network flows of subscription: ") +
rcl_get_error_string().str;
rcl_reset_error();
if (RCL_RET_OK !=
rcl_network_flow_endpoint_array_fini(&network_flow_endpoint_array))
{
error_msg += std::string(". Also error cleaning up network flow array: ") +
rcl_get_error_string().str;
rcl_reset_error();
}
rclcpp::exceptions::throw_from_rcl_error(ret, error_msg);
}

std::vector<rclcpp::NetworkFlowEndpoint> network_flow_endpoint_vector;
for (size_t i = 0; i < network_flow_endpoint_array.size; ++i) {
network_flow_endpoint_vector.push_back(
rclcpp::NetworkFlowEndpoint(
network_flow_endpoint_array.
network_flow_endpoint[i]));
}

ret = rcl_network_flow_endpoint_array_fini(&network_flow_endpoint_array);
if (RCL_RET_OK != ret) {
rclcpp::exceptions::throw_from_rcl_error(ret, "error cleaning up network flow array");
}

return network_flow_endpoint_vector;
}
Loading

0 comments on commit 6af478d

Please sign in to comment.