Skip to content

Commit

Permalink
Add support for taking a sequence of messages (#614)
Browse files Browse the repository at this point in the history
* Add support for taking a sequence of messages

Signed-off-by: Michael Carroll <[email protected]>

* Fix uncrustify

Signed-off-by: Michael Carroll <[email protected]>

* Apply suggestions from code review

Co-Authored-By: Jacob Perron <[email protected]>
Signed-off-by: Michael Carroll <[email protected]>

* Address reviewer feedback.

Signed-off-by: Michael Carroll <[email protected]>

* Remove vertical whitespace

Signed-off-by: Michael Carroll <[email protected]>

* Address reviewer feedback

Signed-off-by: Michael Carroll <[email protected]>

* Remove blank line

Signed-off-by: Michael Carroll <[email protected]>

Co-authored-by: Jacob Perron <[email protected]>
Signed-off-by: Jorge Perez <[email protected]>
  • Loading branch information
2 people authored and Blast545 committed Apr 24, 2020
1 parent 45bd449 commit 959d02e
Showing 1 changed file with 155 additions and 1 deletion.
156 changes: 155 additions & 1 deletion rcl/test/rcl/test_subscription.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -507,7 +507,7 @@ TEST_F(CLASSNAME(TestSubscriptionFixture, RMW_IMPLEMENTATION), test_subscription
const rosidl_message_type_support_t * ts =
ROSIDL_GET_MSG_TYPE_SUPPORT(test_msgs, msg, Strings);
const char * topic = "rcl_loan";
rcl_publisher_options_t publisher_options = rcl_publisher_get_default_options();
rcl_publisher_options_t publisher_options = rcl_publisher_get_default_options();
ret = rcl_publisher_init(&publisher, this->node_ptr, ts, topic, &publisher_options);
ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str;
OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT(
Expand Down Expand Up @@ -566,6 +566,160 @@ TEST_F(CLASSNAME(TestSubscriptionFixture, RMW_IMPLEMENTATION), test_subscription
}
}



/* Basic nominal test of a subscription taking a sequence.
*/
TEST_F(
CLASSNAME(
TestSubscriptionFixture,
RMW_IMPLEMENTATION), test_subscription_nominal_string_sequence) {
rcl_ret_t ret;
rcl_publisher_t publisher = rcl_get_zero_initialized_publisher();
const rosidl_message_type_support_t * ts =
ROSIDL_GET_MSG_TYPE_SUPPORT(test_msgs, msg, Strings);
const char * topic = "rcl_test_subscription_nominal_string_sequence_chatter";
rcl_publisher_options_t publisher_options = rcl_publisher_get_default_options();
ret = rcl_publisher_init(&publisher, this->node_ptr, ts, topic, &publisher_options);
ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str;
OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT(
{
rcl_ret_t ret = rcl_publisher_fini(&publisher, this->node_ptr);
EXPECT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str;
});
rcl_subscription_t subscription = rcl_get_zero_initialized_subscription();
rcl_subscription_options_t subscription_options = rcl_subscription_get_default_options();
ret = rcl_subscription_init(&subscription, this->node_ptr, ts, topic, &subscription_options);
ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str;
OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT(
{
rcl_ret_t ret = rcl_subscription_fini(&subscription, this->node_ptr);
EXPECT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str;
});
// TODO(wjwwood): add logic to wait for the connection to be established
// probably using the count_subscriptions busy wait mechanism
// until then we will sleep for a short period of time
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
const char * test_string = "testing";
{
test_msgs__msg__Strings msg;
test_msgs__msg__Strings__init(&msg);
ASSERT_TRUE(rosidl_runtime_c__String__assign(&msg.string_value, test_string));
ret = rcl_publish(&publisher, &msg, nullptr);
ret = rcl_publish(&publisher, &msg, nullptr);
ret = rcl_publish(&publisher, &msg, nullptr);
test_msgs__msg__Strings__fini(&msg);
ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str;
}
bool success;
wait_for_subscription_to_be_ready(&subscription, context_ptr, 10, 100, success);
ASSERT_TRUE(success);
auto allocator = rcutils_get_default_allocator();
{
size_t size = 1;
rmw_message_info_sequence_t message_infos;
rmw_message_info_sequence_init(&message_infos, size, &allocator);

rmw_message_sequence_t messages;
rmw_message_sequence_init(&messages, size, &allocator);

auto seq = test_msgs__msg__Strings__Sequence__create(size);

for (size_t ii = 0; ii < size; ++ii) {
messages.data[ii] = &seq->data[ii];
}

OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT(
{
rmw_message_info_sequence_fini(&message_infos);
rmw_message_sequence_fini(&messages);
test_msgs__msg__Strings__Sequence__destroy(seq);
});

// Attempt to take more than capacity allows.
ret = rcl_take_sequence(&subscription, 5, &messages, &message_infos, nullptr);
ASSERT_EQ(RCL_RET_INVALID_ARGUMENT, ret) << rcl_get_error_string().str;

ASSERT_EQ(0u, messages.size);
ASSERT_EQ(0u, message_infos.size);
}

{
size_t size = 5;
rmw_message_info_sequence_t message_infos;
rmw_message_info_sequence_init(&message_infos, size, &allocator);

rmw_message_sequence_t messages;
rmw_message_sequence_init(&messages, size, &allocator);

auto seq = test_msgs__msg__Strings__Sequence__create(size);

for (size_t ii = 0; ii < size; ++ii) {
messages.data[ii] = &seq->data[ii];
}

OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT(
{
rmw_message_info_sequence_fini(&message_infos);
rmw_message_sequence_fini(&messages);
test_msgs__msg__Strings__Sequence__destroy(seq);
});

ret = rcl_take_sequence(&subscription, 5, &messages, &message_infos, nullptr);
ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str;
ASSERT_EQ(3u, messages.size);
ASSERT_EQ(3u, message_infos.size);
}

{
test_msgs__msg__Strings msg;
test_msgs__msg__Strings__init(&msg);
ASSERT_TRUE(rosidl_runtime_c__String__assign(&msg.string_value, test_string));
ret = rcl_publish(&publisher, &msg, nullptr);
ret = rcl_publish(&publisher, &msg, nullptr);
ret = rcl_publish(&publisher, &msg, nullptr);
ret = rcl_publish(&publisher, &msg, nullptr);
ret = rcl_publish(&publisher, &msg, nullptr);
test_msgs__msg__Strings__fini(&msg);
ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str;
}

// Give a brief moment for publications to go through.
std::this_thread::sleep_for(std::chrono::milliseconds(500));
// Take fewer messages than are available in the subscription
{
size_t size = 3;
rmw_message_info_sequence_t message_infos;
rmw_message_info_sequence_init(&message_infos, size, &allocator);

rmw_message_sequence_t messages;
rmw_message_sequence_init(&messages, size, &allocator);

auto seq = test_msgs__msg__Strings__Sequence__create(size);

for (size_t ii = 0; ii < size; ++ii) {
messages.data[ii] = &seq->data[ii];
}

OSRF_TESTING_TOOLS_CPP_SCOPE_EXIT(
{
rmw_message_info_sequence_fini(&message_infos);
rmw_message_sequence_fini(&messages);
test_msgs__msg__Strings__Sequence__destroy(seq);
});

ret = rcl_take_sequence(&subscription, 3, &messages, &message_infos, nullptr);

ASSERT_EQ(RCL_RET_OK, ret) << rcl_get_error_string().str;
ASSERT_EQ(3u, messages.size);
ASSERT_EQ(3u, message_infos.size);

ASSERT_EQ(
std::string(test_string),
std::string(seq->data[0].string_value.data, seq->data[0].string_value.size));
}
}

TEST_F(CLASSNAME(TestSubscriptionFixture, RMW_IMPLEMENTATION), test_get_options) {
rcl_ret_t ret;
const rosidl_message_type_support_t * ts =
Expand Down

0 comments on commit 959d02e

Please sign in to comment.