Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ability to override endpoint qos settings from a qos profile matching the topic name #7

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 29 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -140,11 +140,12 @@ variables.

- [RMW_CONNEXT_CYCLONE_COMPATIBILITY_MODE](#RMW_CONNEXT_CYCLONE_COMPATIBILITY_MODE)
- [RMW_CONNEXT_DISABLE_FAST_ENDPOINT_DISCOVERY](#RMW_CONNEXT_DISABLE_FAST_ENDPOINT_DISCOVERY)
- [RMW_CONNEXT_USE_DEFAULT_PUBLISH_MODE](#RMW_CONNEXT_USE_DEFAULT_PUBLISH_MODE)
- [RMW_CONNEXT_ENDPOINT_QOS_OVERRIDE_POLICY](#RMW_CONNEXT_ENDPOINT_QOS_OVERRIDE_POLICY)
- [RMW_CONNEXT_INITIAL_PEERS](#RMW_CONNEXT_INITIAL_PEERS)
- [RMW_CONNEXT_LEGACY_RMW_COMPATIBILITY_MODE](#RMW_CONNEXT_LEGACY_RMW_COMPATIBILITY_MODE)
- [RMW_CONNEXT_REQUEST_REPLY_MAPPING](#RMW_CONNEXT_REQUEST_REPLY_MAPPING)
- [RMW_CONNEXT_UDP_INTERFACE](#RMW_CONNEXT_UDP_INTERFACE)
- [RMW_CONNEXT_USE_DEFAULT_PUBLISH_MODE](#RMW_CONNEXT_USE_DEFAULT_PUBLISH_MODE)

### RMW_CONNEXT_CYCLONE_COMPATIBILITY_MODE

Expand Down Expand Up @@ -176,17 +177,24 @@ Variable `RMW_CONNEXT_DISABLE_FAST_ENDPOINT_DISCOVERY` may be used to disable
these automatic optimizations, and to leave the DomainParticipant's QoS to
its defaults.

### RMW_CONNEXT_USE_DEFAULT_PUBLISH_MODE
### RMW_CONNEXT_ENDPOINT_QOS_OVERRIDE_POLICY

`rmw_connextdds` will always set `DDS_DataWriterQos::publish_mode::kind` of
any DataWriter it creates to `DDS_ASYNCHRONOUS_PUBLISH_MODE_QOS`, in order to
enable out of the box support for "large data".
When this variable is not set or set to `always`, the qos settings specified in the default profile will be used and the ros qos profile will be applied on top of it.
You can use topic filters in XML profile files to have different defaults for different topics, but you have to use the mangled topic names (see [ROS topic mangling conventions](#ros-topic-mangling-conventions)).

This behavior might not be always desirable, and it can be disabled by setting
`RMW_CONNEXT_USE_DEFAULT_PUBLISH_MODE` to a non-empty value.
In case this variable is set to `never`, the qos settings will be loaded from the default profile as before but the ros qos profile will be ignored.
Be aware of configuring the qos of rcl topics (rt/rosout, rt/parameter_events, etc) and the rmw internal topic `ros_discovery_info` correctly.

This variable is not used by `rmw_connextddsmicro`, since it doesn't
automatically override `DDS_DataWriterQos::publish_mode::kind`.
This variable can also be set to `dds_topics: <regex>`, e.g.: `dds_topics: rt/my_topic|rt/my_ns/another_topic`.
In that case, qos settings for topics matching the provided regex will be loaded in the same way as the `never` policy, and the ones that don't match will be loaded in the same way as the `always` policy.

#### ROS topic mangling conventions

ROS mangles topic names in the following way:

- Topics are prefixed with `rt`. e.g.: `/my/fully/qualified/ros/topic` is converted to `rt/my/fully/qualified/ros/topic`.
- The service request topics are prefixed with `rq` and suffixed with `Request`. e.g.: `/my/fully/qualified/ros/service` request topic is `rq/my/fully/qualified/ros/serviceRequest`.
- The service response topics are prefixed with `rr` and suffixed with `Response`. e.g.: `/my/fully/qualified/ros/service` response topic is `rr/my/fully/qualified/ros/serviceResponse`.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this section looks good as is, at least for an initial merge.

I'm considering starting a proper manual in Sphinx for rmw_connextdds[micro], putting it on public hosting (e.g. readthedocs), and move most documentation out of the readme, to avoid bloating it inevitably.

With that available, I would probably move this information about topic mangling to a different section, one more general than this one about env variables (e.g. "Configuring QoS in rmw_connextdds").


### RMW_CONNEXT_INITIAL_PEERS

Expand Down Expand Up @@ -309,3 +317,15 @@ RMW_CONNEXT_UDP_INTERFACE=eth0 \
```

This variable is not used by `rmw_connextdds`.

### RMW_CONNEXT_USE_DEFAULT_PUBLISH_MODE

`rmw_connextdds` will always set `DDS_DataWriterQos::publish_mode::kind` of
any DataWriter it creates to `DDS_ASYNCHRONOUS_PUBLISH_MODE_QOS`, in order to
enable out of the box support for "large data".

This behavior might not be always desirable, and it can be disabled by setting
`RMW_CONNEXT_USE_DEFAULT_PUBLISH_MODE` to a non-empty value.

This variable is not used by `rmw_connextddsmicro`, since it doesn't
automatically override `DDS_DataWriterQos::publish_mode::kind`.
20 changes: 19 additions & 1 deletion rmw_connextdds_common/include/rmw_connextdds/context.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
#include <limits>
#include <map>
#include <mutex>
#include <regex>
#include <string>

#include "rmw_connextdds/dds_api.hpp"
Expand Down Expand Up @@ -92,7 +93,6 @@ struct rmw_context_impl_t
bool localhost_only;

/* Global configuration for QoS profiles */
std::string qos_library;
std::string qos_ctx_name;
std::string qos_ctx_namespace;
bool use_default_publish_mode;
Expand All @@ -104,6 +104,24 @@ struct rmw_context_impl_t
#if RMW_CONNEXT_FAST_ENDPOINT_DISCOVERY
bool fast_endp_discovery{true};
#endif /* RMW_CONNEXT_FAST_ENDPOINT_DISCOVERY */

enum class endpoint_qos_override_policy_t
{
// Use default QoS policy got from the DDS qos profile file applying topic filters
// and apply the ROS settings on top of it.
Always,
// Use default QoS policy got from the DDS qos profile file applying topic filters
// without any changes for the topics that matches the provided regex expressions.
// Apply the ROS settings on top of it for the ones that doesn't match.
DDSTopics,
// Use default QoS policy got from the DDS qos profile file applying topic filters
// without any changes.
Never,
};

endpoint_qos_override_policy_t endpoint_qos_override_policy;
std::regex endpoint_qos_override_policy_topics_regex;

/* Participant reference count*/
size_t node_count{0};
std::mutex initialization_mutex;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,10 @@ typedef DDS_UntypedSampleSeq RMW_Connext_UntypedSampleSeq;
(DDS_PropertySeq_assert_property(&(p_)->value, (n_), (v_), (pr_)) ? \
DDS_RETCODE_OK : DDS_RETCODE_ERROR)

#define DDS_DomainParticipantFactory_get_qos_profiles(dpf, profiles, library_name) DDS_RETCODE_OK

#define DDS_DomainParticipantFactory_get_qos_profile_libraries(dpf, libraries) DDS_RETCODE_OK

// Forward declaration for DDS_LifespanQosPolicy, since it is not supported by Micro.
struct DDS_LifespanQosPolicy;

Expand Down
31 changes: 0 additions & 31 deletions rmw_connextdds_common/include/rmw_connextdds/rmw_impl.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -1486,37 +1486,6 @@ rmw_ret_t
#endif /* RMW_CONNEXT_HAVE_LIFESPAN_QOS */
rmw_qos_profile_t * const qos_policies);

rmw_ret_t
rmw_connextdds_list_context_qos_profiles(
rmw_context_impl_t * const ctx,
std::vector<std::string> & profiles);

rmw_ret_t
rmw_connextdds_list_publisher_qos_profiles(
rmw_context_impl_t * const ctx,
const char * const topic_name,
std::vector<std::string> & profiles);

rmw_ret_t
rmw_connextdds_list_subscription_qos_profiles(
rmw_context_impl_t * const ctx,
const char * const topic_name,
std::vector<std::string> & profiles);

rmw_ret_t
rmw_connextdds_list_client_qos_profiles(
rmw_context_impl_t * const ctx,
const char * const service_name,
std::vector<std::string> & req_profiles,
std::vector<std::string> & rep_profiles);

rmw_ret_t
rmw_connextdds_list_service_qos_profiles(
rmw_context_impl_t * const ctx,
const char * const service_name,
std::vector<std::string> & req_profiles,
std::vector<std::string> & rep_profiles);

/******************************************************************************
* Security Helpers
******************************************************************************/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,11 @@
#define RMW_CONNEXT_ENV_SECURITY_LOG_VERBOSITY "ROS_SECURITY_LOG_VERBOSITY"
#endif /* RMW_CONNEXT_ENV_SECURITY_LOG_VERBOSITY */

#ifndef RMW_CONNEXT_ENV_ENDPOINT_QOS_OVERRIDE_POLICY
#define RMW_CONNEXT_ENV_ENDPOINT_QOS_OVERRIDE_POLICY "RMW_CONNEXT_ENDPOINT_QOS_OVERRIDE_POLICY"
#endif /* RMW_CONNEXT_ENV_ALLOW_TOPIC_QOS_PROFILES */


/******************************************************************************
* DDS Implementation
* Select the DDS implementation used to build the RMW library.
Expand Down
68 changes: 48 additions & 20 deletions rmw_connextdds_common/src/common/rmw_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -181,13 +181,58 @@ rmw_context_impl_t::initialize_participant(const bool localhost_only)
{
RMW_CONNEXT_LOG_DEBUG("initializing DDS DomainParticipant")

this->localhost_only = localhost_only;

/* Lookup RMW_CONNEXT_ENV_ALLOW_TOPIC_QOS_PROFILES env variable.*/
const char * endpoint_qos_policy = nullptr;
const char * lookup_rc = rcutils_get_env(
RMW_CONNEXT_ENV_ENDPOINT_QOS_OVERRIDE_POLICY, &endpoint_qos_policy);

if (nullptr != lookup_rc || nullptr == endpoint_qos_policy) {
RMW_CONNEXT_LOG_ERROR_A_SET(
"failed to lookup from environment: "
"var=%s, "
"rc=%s ",
RMW_CONNEXT_ENV_ENDPOINT_QOS_OVERRIDE_POLICY,
lookup_rc)
return RMW_RET_ERROR;
}

this->endpoint_qos_override_policy = rmw_context_impl_t::endpoint_qos_override_policy_t::Always;
const char dds_topic_policy_prefix[] = "dds_topics: ";
const char never_policy[] = "never";
const char always_policy[] = "always";
if (
0 == strncmp(
endpoint_qos_policy, dds_topic_policy_prefix, sizeof(dds_topic_policy_prefix) - 1u))
{
this->endpoint_qos_override_policy =
rmw_context_impl_t::endpoint_qos_override_policy_t::DDSTopics;
try {
this->endpoint_qos_override_policy_topics_regex = &endpoint_qos_policy[sizeof(dds_topic_policy_prefix) - 1u];
} catch (std::regex_error & err) {
RMW_CONNEXT_LOG_ERROR_A_SET(
"regex expression provided in {%s} environment variable is invalid: %s\n",
RMW_CONNEXT_ENV_ENDPOINT_QOS_OVERRIDE_POLICY,
err.what());
return RMW_RET_ERROR;
}
} else if (0 == strcmp(endpoint_qos_policy, never_policy)) {
this->endpoint_qos_override_policy = rmw_context_impl_t::endpoint_qos_override_policy_t::Never;
} else if (endpoint_qos_policy[0] != '\0' && strcmp(endpoint_qos_policy, always_policy) != 0) {
RMW_CONNEXT_LOG_ERROR_A_SET(
"Environment variable {%s} has an unexpected value {%s}. "
"Allowed values are {always}, {never} or {dds_topics: <regex_expression>}.\n",
RMW_CONNEXT_ENV_ENDPOINT_QOS_OVERRIDE_POLICY,
endpoint_qos_policy);
return RMW_RET_ERROR;
}

if (nullptr == RMW_Connext_gv_DomainParticipantFactory) {
RMW_CONNEXT_LOG_ERROR("DDS DomainParticipantFactory not initialized")
return RMW_RET_ERROR;
}

this->localhost_only = localhost_only;

DDS_DomainId_t domain_id =
static_cast<DDS_DomainId_t>(this->domain_id);

Expand Down Expand Up @@ -779,27 +824,10 @@ rmw_api_connextdds_init(
// context->actual_domain_id in rmw_context_impl_t::initialize_node()
ctx->domain_id = actual_domain_id;

/* Lookup name of custom QoS library - NOT USED FOR ANYTHING YET */
const char * qos_library = nullptr;
const char * lookup_rc =
rcutils_get_env(RMW_CONNEXT_ENV_QOS_LIBRARY, &qos_library);

if (nullptr != lookup_rc || nullptr == qos_library) {
RMW_CONNEXT_LOG_ERROR_A_SET(
"failed to lookup from environment: "
"var=%s, "
"rc=%s ",
RMW_CONNEXT_ENV_QOS_LIBRARY,
lookup_rc)
return RMW_RET_ERROR;
}

ctx->qos_library = qos_library;

// All publishers will use asynchronous publish mode unless
// RMW_CONNEXT_ENV_USE_DEFAULT_PUBLISH_MODE is set.
const char * use_default_publish_mode_env = nullptr;
lookup_rc = rcutils_get_env(
const char * lookup_rc = rcutils_get_env(
RMW_CONNEXT_ENV_USE_DEFAULT_PUBLISH_MODE, &use_default_publish_mode_env);

if (nullptr != lookup_rc || nullptr == use_default_publish_mode_env) {
Expand Down
Loading