From 7c6ae38c0d4867ee4a79054c4212d67304965f5d Mon Sep 17 00:00:00 2001 From: Michel Hidalgo Date: Tue, 15 Oct 2019 13:38:24 -0300 Subject: [PATCH] Remove test_ros2cli package. Signed-off-by: Michel Hidalgo --- test_ros2cli/CMakeLists.txt | 129 ----- test_ros2cli/COLCON_IGNORE | 0 test_ros2cli/package.xml | 35 -- test_ros2cli/test/cli_test_configuration.py | 102 ---- test_ros2cli/test/fixture_actions.py | 94 ---- .../test/ros2action_test_configuration.py | 198 ------- .../test/ros2interface_test_configuration.py | 159 ------ .../test/ros2msg_test_configuration.py | 87 --- .../test/ros2node_test_configuration.py | 107 ---- .../test/ros2service_test_configuration.py | 209 -------- .../test/ros2srv_test_configuration.py | 92 ---- .../test/ros2topic_test_configuration.py | 498 ------------------ test_ros2cli/test/test_cli.py.in | 139 ----- 13 files changed, 1849 deletions(-) delete mode 100644 test_ros2cli/CMakeLists.txt delete mode 100644 test_ros2cli/COLCON_IGNORE delete mode 100644 test_ros2cli/package.xml delete mode 100644 test_ros2cli/test/cli_test_configuration.py delete mode 100644 test_ros2cli/test/fixture_actions.py delete mode 100644 test_ros2cli/test/ros2action_test_configuration.py delete mode 100644 test_ros2cli/test/ros2interface_test_configuration.py delete mode 100644 test_ros2cli/test/ros2msg_test_configuration.py delete mode 100644 test_ros2cli/test/ros2node_test_configuration.py delete mode 100644 test_ros2cli/test/ros2service_test_configuration.py delete mode 100644 test_ros2cli/test/ros2srv_test_configuration.py delete mode 100644 test_ros2cli/test/ros2topic_test_configuration.py delete mode 100644 test_ros2cli/test/test_cli.py.in diff --git a/test_ros2cli/CMakeLists.txt b/test_ros2cli/CMakeLists.txt deleted file mode 100644 index e0e5b8cd6..000000000 --- a/test_ros2cli/CMakeLists.txt +++ /dev/null @@ -1,129 +0,0 @@ -cmake_minimum_required(VERSION 3.5) - -project(test_ros2cli) - -# Default to C++14 -if(NOT CMAKE_CXX_STANDARD) - set(CMAKE_CXX_STANDARD 14) -endif() -if(CMAKE_COMPILER_IS_GNUCXX OR CMAKE_CXX_COMPILER_ID MATCHES "Clang") - add_compile_options(-Wall -Wextra -Wpedantic) -endif() - -find_package(ament_cmake REQUIRED) - -if(BUILD_TESTING) - find_package(ament_lint_auto REQUIRED) - ament_lint_auto_find_test_dependencies() - - find_package(ros_testing REQUIRED) - find_package(rmw_implementation_cmake REQUIRED) - - macro(add_custom_cli_test test_name) - # Set variables for configuring the test files. - cmake_parse_arguments(TEST "" "CONFIG_MODULE;SETUP_DELAY;TIMEOUT" "" ${ARGN}) - # Name for test cases - set(TEST_NAME ${test_name}) - # RMW implementation for test cases - set(TEST_RMW_IMPLEMENTATION ${rmw_implementation}) - # Module carrying test configuration - if(NOT DEFINED TEST_CONFIG_MODULE) - set(TEST_CONFIG_MODULE ${test_name}_config) - endif() - # Setup delay for test fixture actions - if(NOT DEFINED TEST_SETUP_DELAY) - set(TEST_SETUP_DELAY 2.0) - if("${TEST_RMW_IMPLEMENTATION}" STREQUAL "rmw_connext_cpp") - # Connext startup is too slow. It needs a few extra seconds till - # discovery completes. - set(TEST_SETUP_DELAY 5.0) - endif() - endif() - # Timeout for test itself - if(NOT DEFINED TEST_TIMEOUT) - set(TEST_TIMEOUT 30) - endif() - - - - configure_file( - test/test_cli.py.in - ${test_name}${target_suffix}.py.genexp - @ONLY - ) - file(GENERATE - OUTPUT "${CMAKE_CURRENT_BINARY_DIR}/${test_name}${target_suffix}_$.py" - INPUT "${CMAKE_CURRENT_BINARY_DIR}/${test_name}${target_suffix}.py.genexp" - ) - install( - FILES test/${TEST_CONFIG_MODULE}.py - DESTINATION "${CMAKE_CURRENT_BINARY_DIR}" - ) - add_ros_test( - "${CMAKE_CURRENT_BINARY_DIR}/${test_name}${target_suffix}_$.py" - TARGET ${test_name}${target_suffix} - ENV RMW_IMPLEMENTATION=${rmw_implementation} - APPEND_LIBRARY_DIRS "${append_library_dirs}" - TIMEOUT ${TEST_TIMEOUT} - ) - list( - APPEND generated_python_files - "${CMAKE_CURRENT_BINARY_DIR}/${test_name}${target_suffix}_$.py" - ) - endmacro() - - set(generated_python_files) - macro(custom_cli_tests) - add_custom_cli_test( - test_ros2action - CONFIG_MODULE ros2action_test_configuration - TIMEOUT 300) - add_custom_cli_test( - test_ros2topic - CONFIG_MODULE ros2topic_test_configuration - TIMEOUT 300) - add_custom_cli_test( - test_ros2service - CONFIG_MODULE ros2service_test_configuration - TIMEOUT 300) - add_custom_cli_test( - test_ros2node - CONFIG_MODULE ros2node_test_configuration - TIMEOUT 240) - add_custom_cli_test( - test_ros2msg - CONFIG_MODULE ros2msg_test_configuration - TIMEOUT 240) - add_custom_cli_test( - test_ros2srv - CONFIG_MODULE ros2srv_test_configuration - TIMEOUT 240) - add_custom_cli_test( - test_ros2interface - CONFIG_MODULE ros2interface_test_configuration - TIMEOUT 240) - endmacro() - install( - FILES - test/cli_test_configuration.py - test/fixture_actions.py - DESTINATION "${CMAKE_CURRENT_BINARY_DIR}" - ) - - set(append_library_dirs "${CMAKE_CURRENT_BINARY_DIR}") - if(WIN32) - set(append_library_dirs "${append_library_dirs}/$") - endif() - - call_for_each_rmw_implementation(custom_cli_tests) - - find_package(ament_cmake_flake8 REQUIRED) - ament_flake8( - TESTNAME "flake8_generated_launch" - # the generated code might contain longer lines for templated types - MAX_LINE_LENGTH 999 - ${generated_python_files} - ) -endif() - -ament_package() diff --git a/test_ros2cli/COLCON_IGNORE b/test_ros2cli/COLCON_IGNORE deleted file mode 100644 index e69de29bb..000000000 diff --git a/test_ros2cli/package.xml b/test_ros2cli/package.xml deleted file mode 100644 index 16c24f550..000000000 --- a/test_ros2cli/package.xml +++ /dev/null @@ -1,35 +0,0 @@ - - - - test_ros2cli - 0.8.2 - - Tests for ROS2 command line tools. - - Ivan Paunovic - Apache License 2.0 - - action_tutorials_interfaces - action_tutorials_cpp - ament_lint_auto - ament_lint_common - demo_nodes_py - geometry_msgs - ros_testing - ros2action - ros2cli - ros2interface - ros2msg - ros2node - ros2param - ros2service - ros2srv - ros2topic - std_msgs - std_srvs - test_msgs - - - ament_cmake - - diff --git a/test_ros2cli/test/cli_test_configuration.py b/test_ros2cli/test/cli_test_configuration.py deleted file mode 100644 index 966ab2792..000000000 --- a/test_ros2cli/test/cli_test_configuration.py +++ /dev/null @@ -1,102 +0,0 @@ -# Copyright 2019 Open Source Robotics Foundation, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -from typing import Callable -from typing import Iterable -from typing import Mapping -from typing import Optional -from typing import Pattern -from typing import Union - -from launch import Action -from launch import SomeSubstitutionsType -from launch.utilities import normalize_to_list_of_substitutions - -import launch_testing.asserts - -ExpectedOutputType = Union[ - Iterable[Union[str, Pattern[str], Callable[[str], bool]]], - Callable[[Iterable[str]], bool] -] -SomeExpectedOutputType = Union[ExpectedOutputType, Mapping[str, ExpectedOutputType]] - - -class CLITestConfiguration(object): - """A class for configuring the `test_process_output_customizable.py.in` test.""" - - def __init__( - self, - *, - command: str, - arguments: Iterable[SomeSubstitutionsType], - description: Optional[str] = None, - fixture_actions: Optional[Iterable[Action]] = None, - setup_delay: Optional[int] = None, - expected_output: Optional[SomeExpectedOutputType] = None, - output_filter: Optional[Callable[[str], str]] = None, - exit_codes: Optional[Iterable[int]] = None, - self_terminates: bool = True, - timeout: int = 30. - ): - """ - Constructor. - - :param command: `ros2` command to be tested. - :param arguments: A list of `SomeSubstitutionsType`, which are passed - as arguments to the command. - :param description: description of the test being done. command to be tested. - It usually contains the verb and arguments being tested. - :param fixture_actions: A list of actions, which are launched before the `ros2` - command under test. - :param setup_delay: A period in seconds to wait for fixture actions to start up - before launching the `ros2` command under test. If none is given, an appropriate - default is automatically provided. - :param expected_output: Output expectations for the `ros2` command under test and/or - fixture actions. In its most explicit form it is a mapping from process names to - either a list of expected lines, which may be expressed as plain strings, compiled - regexes, or a callable for custom line assertion, or a callable that takes all lines - for custom output assertions. Empty lists express that no output is expected. The - `ros2` command under test can be referenced as 'cli'. Alternatively, this mapping - can be elided if expectations are only defined for the `ros2` command under test. - In that case, only the corresponding mapping value has to be provided. - :param output_filter: A callable to filter output before testing it against expectations. - :param exit_codes: A list of allowed exit codes for the `ros2` command under test. - :param self_terminates: A flag for command termination policy. Non-self terminating - commands are forced to terminate after the specified timeout has elapsed. - :param timeout: A finite test timeout in seconds. - """ - self.command = command - self.arguments = [normalize_to_list_of_substitutions(arg) for arg in arguments] - if description is None: - description = 'ros2 {} {}'.format( - command, ' '.join([ - ''.join([sub.describe() for sub in some_subs]) for some_subs in self.arguments - ]) - ) - self.description = description - if fixture_actions is None: - fixture_actions = [] - self.fixture_actions = fixture_actions - self.expected_output = expected_output - self.output_filter = output_filter - if exit_codes is None: - exit_codes = [launch_testing.asserts.EXIT_OK] - self.exit_codes = exit_codes - self.self_terminates = self_terminates - self.setup_delay = setup_delay - self.timeout = timeout - - def __repr__(self): - """Return the description.""" - return self.description diff --git a/test_ros2cli/test/fixture_actions.py b/test_ros2cli/test/fixture_actions.py deleted file mode 100644 index f381edbe3..000000000 --- a/test_ros2cli/test/fixture_actions.py +++ /dev/null @@ -1,94 +0,0 @@ -# Copyright 2019 Open Source Robotics Foundation, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import sys - -from launch.actions import ExecuteProcess -from launch.substitutions import LaunchConfiguration -from launch_ros.actions import Node -from launch_ros.substitutions import ExecutableInPackage - -import yaml - - -def get_talker_node_action(*, node_name='my_talker', - node_namespace='my_ns', - topic_name='chatter'): - return Node( - package='demo_nodes_py', node_executable='talker', node_name=node_name, name=node_name, - node_namespace=node_namespace, remappings=[('chatter', topic_name)], output='screen', - sigterm_timeout=LaunchConfiguration('sigterm_timeout', default=30) - ) - - -def get_publisher_node_action(*, topic_name='/my_ns/test', topic_type='test_msgs/msg/BasicTypes'): - return ExecuteProcess( - cmd=['ros2', 'topic', 'pub', topic_name, topic_type], - name='publisher', output='screen', - sigterm_timeout=LaunchConfiguration( - 'sigterm_timeout', default=60 - ) - ) - - -def get_dummy_base_controller_node_action(*, topic_name='/my_ns/cmd_vel', start_time=0.0): - return ExecuteProcess( - cmd=[ - 'ros2', 'topic', 'pub', - '-r', '1', topic_name, - 'geometry_msgs/msg/TwistStamped', - yaml.dump({ - 'header': { - 'stamp': { - 'sec': int(start_time), - 'nanosec': int((start_time - int(start_time)) * 1e9) - } - } - }, default_flow_style=True).replace('\n', '') - ], name='dummy_base_controller', output='screen', - sigterm_timeout=LaunchConfiguration( - 'sigterm_timeout', default=60 - ) - ) - - -def get_listener_node_action(*, node_name='my_listener', - node_namespace='my_ns', - topic_name='chatter'): - return Node( - package='demo_nodes_py', node_executable='listener', node_name=node_name, name=node_name, - node_namespace=node_namespace, remappings=[('chatter', topic_name)], output='screen', - sigterm_timeout=LaunchConfiguration('sigterm_timeout', default=30) - ) - - -def get_add_two_ints_server_action(*, node_name='my_add_two_ints_server', - node_namespace='my_ns', - service_name='add_two_ints'): - return Node( - package='demo_nodes_py', node_executable='add_two_ints_server', - node_name=node_name, node_namespace=node_namespace, - output='screen', remappings=[('add_two_ints', service_name)], - sigterm_timeout=LaunchConfiguration('sigterm_timeout', default=30) - ) - - -def get_fibonacci_action_server_node_action(): - return ExecuteProcess( - cmd=[ - sys.executable, - ExecutableInPackage('fibonacci_action_server.py', 'action_tutorials'), - ], - sigterm_timeout=LaunchConfiguration('sigterm_timeout', default=30) - ) diff --git a/test_ros2cli/test/ros2action_test_configuration.py b/test_ros2cli/test/ros2action_test_configuration.py deleted file mode 100644 index a640ce41a..000000000 --- a/test_ros2cli/test/ros2action_test_configuration.py +++ /dev/null @@ -1,198 +0,0 @@ -# Copyright 2019 Open Source Robotics Foundation, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import os -import re -import sys - -from launch_testing_ros.tools import basic_output_filter - -import yaml - -sys.path.append(os.path.dirname(__file__)) - -from cli_test_configuration import CLITestConfiguration # noqa -from fixture_actions import get_fibonacci_action_server_node_action # noqa - - -some_actions_from_action_tutorials = [ - 'action_tutorials/action/Fibonacci' -] - - -def get_fibonacci_send_goal_output(*, order=1, with_feedback=False): - assert order > 0 - output = [ - 'Waiting for an action server to become available...', - 'Sending goal:', - ' order: {}'.format(order), - '', - re.compile('Goal accepted with ID: [a-f0-9]+'), - '', - ] - sequence = [0, 1] - for _ in range(order - 1): - sequence.append(sequence[-1] + sequence[-2]) - if with_feedback: - output.append('Feedback:') - output.extend((' ' + yaml.dump({ - 'partial_sequence': sequence - })).splitlines()) - output.append('') - output.append('Result:'), - output.extend((' ' + yaml.dump({ - 'sequence': sequence - })).splitlines()) - output.append('') - output.append('Goal finished with status: SUCCEEDED') - return output - - -def get_fibonacci_info_output(*, server_name='/fibonacci_action_server', - count_only=False, include_types=False): - output = [ - 'Action: /fibonacci', - 'Action clients: 0', - 'Action servers: 1', - ] - if not count_only: - server_info = server_name - if include_types: - server_info += ' [action_tutorials/action/Fibonacci]' - output.append(server_info) - return output - - -def get_test_configurations(rmw_implementation): - return [ - CLITestConfiguration( - command='action', - arguments=['info', '/not_an_action'], - expected_output=[ - 'Action: /not_an_action', - 'Action clients: 0', - 'Action servers: 0', - ], - output_filter=basic_output_filter( - filtered_rmw_implementation=rmw_implementation - ) - ), - CLITestConfiguration( - command='action', - arguments=['info', '/fibonacci'], - fixture_actions=[get_fibonacci_action_server_node_action()], - expected_output=get_fibonacci_info_output(), - output_filter=basic_output_filter( - filtered_rmw_implementation=rmw_implementation - ) - ), - CLITestConfiguration( - command='action', - arguments=['info', '-t', '/fibonacci'], - fixture_actions=[get_fibonacci_action_server_node_action()], - expected_output=get_fibonacci_info_output(include_types=True), - output_filter=basic_output_filter( - filtered_rmw_implementation=rmw_implementation - ) - ), - CLITestConfiguration( - command='action', - arguments=['info', '-c', '/fibonacci'], - fixture_actions=[get_fibonacci_action_server_node_action()], - expected_output=get_fibonacci_info_output(count_only=True), - output_filter=basic_output_filter( - filtered_rmw_implementation=rmw_implementation - ) - ), - CLITestConfiguration( - command='action', - arguments=['list'], - fixture_actions=[get_fibonacci_action_server_node_action()], - expected_output=['/fibonacci'], - output_filter=basic_output_filter( - filtered_rmw_implementation=rmw_implementation - ) - ), - CLITestConfiguration( - command='action', - arguments=['list', '-t'], - fixture_actions=[get_fibonacci_action_server_node_action()], - expected_output=['/fibonacci [action_tutorials/action/Fibonacci]'], - output_filter=basic_output_filter( - filtered_rmw_implementation=rmw_implementation - ) - ), - CLITestConfiguration( - command='action', - arguments=['list', '-c'], - fixture_actions=[get_fibonacci_action_server_node_action()], - expected_output=[lambda line: int(line) == 1], - output_filter=basic_output_filter( - filtered_rmw_implementation=rmw_implementation - ) - ), - CLITestConfiguration( - command='action', - arguments=[ - 'send_goal', - '/fibonacci', - 'action_tutorials/action/Fibonacci', - '{order: 5}' - ], - fixture_actions=[get_fibonacci_action_server_node_action()], - expected_output=get_fibonacci_send_goal_output(order=5), - output_filter=basic_output_filter( - filtered_rmw_implementation=rmw_implementation - ) - ), - CLITestConfiguration( - command='action', - arguments=[ - 'send_goal', - '-f', - '/fibonacci', - 'action_tutorials/action/Fibonacci', - '{order: 5}' - ], - fixture_actions=[get_fibonacci_action_server_node_action()], - expected_output=get_fibonacci_send_goal_output(order=5, with_feedback=True), - output_filter=basic_output_filter( - filtered_rmw_implementation=rmw_implementation - ) - ), - CLITestConfiguration( - command='action', - arguments=['show', 'action_tutorials/action/Fibonacci'], - expected_output=[ - 'int32 order', - '---', - 'int32[] sequence', - '---', - 'int32[] partial_sequence' - ], - ), - CLITestConfiguration( - command='action', - arguments=['show', 'not_a_package/action/Fibonacci'], - expected_output=['Unknown package name'], - exit_codes=[1] - ), - CLITestConfiguration( - command='action', - arguments=['show', 'action_tutorials/action/NotAnActionType'], - expected_output=['Unknown action type'], - exit_codes=[1] - ), - ] - diff --git a/test_ros2cli/test/ros2interface_test_configuration.py b/test_ros2cli/test/ros2interface_test_configuration.py deleted file mode 100644 index bf8f5d673..000000000 --- a/test_ros2cli/test/ros2interface_test_configuration.py +++ /dev/null @@ -1,159 +0,0 @@ -# Copyright 2019 Open Source Robotics Foundation, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import itertools - -import os -import re -import sys - -from launch_testing_ros.tools import basic_output_filter - -sys.path.append(os.path.dirname(__file__)) - -from cli_test_configuration import CLITestConfiguration # noqa -from ros2msg_test_configuration import some_messages_from_std_msgs # noqa -from ros2srv_test_configuration import some_services_from_std_srvs # noqa -from ros2action_test_configuration import some_actions_from_action_tutorials # noqa - - -some_interfaces = (some_messages_from_std_msgs + - some_services_from_std_srvs + - some_actions_from_action_tutorials) - - -def get_test_configurations(*args, **kwargs): - return [ - CLITestConfiguration( - command='interface', - arguments=['list', '-m'], - expected_output=itertools.chain( - ['Messages:'], itertools.repeat(re.compile(r'.*/msg/.*')) - ) - ), - CLITestConfiguration( - command='interface', - arguments=['list', '-s'], - expected_output=itertools.chain( - ['Services:'], itertools.repeat(re.compile(r'.*/srv/.*')) - ) - ), - CLITestConfiguration( - command='interface', - arguments=['list', '-a'], - expected_output=itertools.chain( - ['Actions:'], itertools.repeat(re.compile(r'.*/action/.*')) - ) - ), - CLITestConfiguration( - command='interface', - arguments=['list'], - expected_output=( - lambda lines: all( - any(interface in line for line in lines) - for interface in some_interfaces - ) - ), - ), - CLITestConfiguration( - command='interface', - arguments=['package', 'not_a_package'], - expected_output=['Unknown package not_a_package'], - exit_codes=[1] - ), - CLITestConfiguration( - command='interface', - arguments=['package', 'std_msgs'], - expected_output=( - lambda lines: ( - all(msg in lines for msg in some_messages_from_std_msgs) and - all(re.match(r'.*/msg/.*', line) is not None for line in lines) - ) - ) - ), - CLITestConfiguration( - command='interface', - arguments=['package', 'action_tutorials'], - expected_output=( - lambda lines: ( - all(msg in lines for msg in some_actions_from_action_tutorials) and - all(re.match(r'.*/action/.*', line) is not None for line in lines) - ) - ) - ), - CLITestConfiguration( - command='interface', - arguments=['package', 'rcl_interfaces'], - expected_output=( - lambda lines: ( - any(re.match(r'.*/msg/.*', line) is not None for line in lines) and - any(re.match(r'.*/srv/.*', line) is not None for line in lines) - ) - ) - ), - CLITestConfiguration( - command='interface', - arguments=['packages', '-m'], - expected_output=( - lambda lines: ('std_msgs' in lines and - 'std_srvs' not in lines and - 'action_tutorials' not in lines) - ) - ), - CLITestConfiguration( - command='interface', - arguments=['packages', '-s'], - expected_output=( - lambda lines: ('std_msgs' not in lines and - 'std_srvs' in lines and - 'action_tutorials' not in lines) - ) - ), - CLITestConfiguration( - command='interface', - arguments=['packages', '-a'], - expected_output=( - lambda lines: ('std_msgs' not in lines and - 'std_srvs' not in lines and - 'action_tutorials' in lines) - ) - ), - CLITestConfiguration( - command='interface', - arguments=['packages'], - expected_output=( - lambda lines: ('std_msgs' in lines and - 'std_srvs' in lines and - 'action_tutorials' in lines) - ) - ), - CLITestConfiguration( - command='interface', - arguments=['show', 'std_msgs/msg/String'], - expected_output=[ - '', - '', - 'module std_msgs {', - ' module msg {', - ' struct String {', - ' string data;', - ' };', - ' };', - '};' - ], - output_filter=basic_output_filter( - filtered_prefixes=['//'] - ), - ), - ] diff --git a/test_ros2cli/test/ros2msg_test_configuration.py b/test_ros2cli/test/ros2msg_test_configuration.py deleted file mode 100644 index c1c6167d2..000000000 --- a/test_ros2cli/test/ros2msg_test_configuration.py +++ /dev/null @@ -1,87 +0,0 @@ -# Copyright 2019 Open Source Robotics Foundation, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import os -import re -import sys - -sys.path.append(os.path.dirname(__file__)) - -from cli_test_configuration import CLITestConfiguration # noqa - - -some_messages_from_std_msgs = [ - 'std_msgs/msg/Bool', - 'std_msgs/msg/Float32', - 'std_msgs/msg/Float64', -] - - -def get_test_configurations(*args, **kwargs): - return [ - CLITestConfiguration( - command='msg', - arguments=['list'], - expected_output=( - lambda lines: ( - all(msg in lines for msg in some_messages_from_std_msgs) and - all(re.match(r'.*/msg/.*', line) is not None for line in lines) - ) - ) - ), - CLITestConfiguration( - command='msg', - arguments=['package', 'std_msgs'], - expected_output=( - lambda lines: ( - all(msg in lines for msg in some_messages_from_std_msgs) and - all(re.match(r'std_msgs/msg/.*', line) is not None for line in lines) - ) - ) - ), - CLITestConfiguration( - command='msg', - arguments=['package', 'not_a_package'], - expected_output=['Unknown package name'], - exit_codes=[1] - ), - CLITestConfiguration( - command='msg', - arguments=['packages'], - expected_output=lambda lines: 'std_msgs' in lines - ), - CLITestConfiguration( - command='msg', - arguments=['show', 'std_msgs/msg/String'], - expected_output=['string data'], - ), - CLITestConfiguration( - command='msg', - arguments=['show', 'std_msgs/msg/NotAMessageType'], - expected_output=['Unknown message name'], - exit_codes=[1] - ), - CLITestConfiguration( - command='msg', - arguments=['show', 'not_a_package/msg/String'], - expected_output=['Unknown package name'], - exit_codes=[1] - ), - CLITestConfiguration( - command='msg', - arguments=['show', 'not_a_message_type'], - expected_output=['The passed message type is invalid'], - exit_codes=[1] - ), - ] diff --git a/test_ros2cli/test/ros2node_test_configuration.py b/test_ros2cli/test/ros2node_test_configuration.py deleted file mode 100644 index f7232c8a2..000000000 --- a/test_ros2cli/test/ros2node_test_configuration.py +++ /dev/null @@ -1,107 +0,0 @@ -# Copyright 2019 Open Source Robotics Foundation, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import itertools - -import os -import re -import sys - -from launch_testing_ros.tools import basic_output_filter - -sys.path.append(os.path.dirname(__file__)) - -from cli_test_configuration import CLITestConfiguration # noqa -from fixture_actions import get_talker_node_action # noqa -from fixture_actions import get_listener_node_action # noqa - - -def get_test_configurations(rmw_implementation): - return [ - CLITestConfiguration( - command='node', - arguments=['list'], - fixture_actions=[ - get_talker_node_action(), - get_listener_node_action(), - get_listener_node_action(node_name='_my_hidden_listener'), - ], - expected_output=[ - '/my_ns/my_listener', - '/my_ns/my_talker' - ], - output_filter=basic_output_filter( - filtered_patterns=['.*launch_ros.*'] - ) - ), - CLITestConfiguration( - command='node', - arguments=['list', '-a'], - fixture_actions=[ - get_talker_node_action(), - get_listener_node_action(), - get_listener_node_action(node_name='_my_hidden_listener'), - ], - expected_output=[ - '/my_ns/_my_hidden_listener', - '/my_ns/my_listener', - '/my_ns/my_talker', - ], - output_filter=basic_output_filter( - filtered_patterns=['.*launch_ros.*', '.*ros2cli.*'] - ) - ), - CLITestConfiguration( - command='node', - arguments=['list', '-c'], - fixture_actions=[ - get_talker_node_action(), - get_listener_node_action(), - get_listener_node_action(node_name='_my_hidden_listener'), - ], - # Fixture nodes plus launch_ros node. - expected_output=[lambda line: int(line) == 3], - ), - CLITestConfiguration( - command='node', - arguments=['list', '-c', '-a'], - fixture_actions=[ - get_talker_node_action(), - get_listener_node_action(), - get_listener_node_action(node_name='_my_hidden_listener'), - ], - # Fixture nodes plus launch_ros and CLI daemon nodes. - expected_output=[lambda line: int(line) > 3], - ), - CLITestConfiguration( - command='node', - arguments=['info', '/my_ns/my_talker'], - fixture_actions=[get_talker_node_action()], - expected_output=itertools.chain([ - '/my_ns/my_talker', - ' Subscribers:', - '', - ' Publishers:', - ' /my_ns/chatter: std_msgs/msg/String', - ' /my_ns/parameter_events: rcl_interfaces/msg/ParameterEvent', - ' /my_ns/rosout: rcl_interfaces/msg/Log', - ' Services:', - ], itertools.repeat(re.compile( - r'\s*/my_ns/my_talker/.*parameter.*: rcl_interfaces/srv/.*Parameter.*' - ), 6)), - output_filter=basic_output_filter( - filtered_rmw_implementation=rmw_implementation - ) - ) - ] diff --git a/test_ros2cli/test/ros2service_test_configuration.py b/test_ros2cli/test/ros2service_test_configuration.py deleted file mode 100644 index 09bac3ea1..000000000 --- a/test_ros2cli/test/ros2service_test_configuration.py +++ /dev/null @@ -1,209 +0,0 @@ -# Copyright 2019 Open Source Robotics Foundation, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import itertools - -import os -import re -import sys - -from launch_testing_ros.tools import basic_output_filter - -sys.path.append(os.path.dirname(__file__)) - -from cli_test_configuration import CLITestConfiguration # noqa -from fixture_actions import get_add_two_ints_server_action # noqa - - -def get_add_two_ints_call_output(*, a=0, b=0): - return [ - 'requester: making request: ' + - 'example_interfaces.srv.AddTwoInts_Request(a={}, b={})'.format(a, b), - '', - 'response:', - 'example_interfaces.srv.AddTwoInts_Response(sum={})'.format(a + b), - '' - ] - - -def get_test_configurations(rmw_implementation): - return [ - CLITestConfiguration( - command='service', - arguments=['list'], - fixture_actions=[get_add_two_ints_server_action(), - get_add_two_ints_server_action(service_name='_add_two_ints')], - expected_output=itertools.chain( - # Cope with launch internal ROS 2 node. - itertools.repeat(re.compile(r'/launch_ros/.*parameter.*'), 6), - ['/my_ns/add_two_ints'], - itertools.repeat(re.compile( - r'/my_ns/my_add_two_ints_server/.*parameter.*' - ), 6) - ), - output_filter=basic_output_filter( - filtered_rmw_implementation=rmw_implementation - ) - ), - CLITestConfiguration( - command='service', - arguments=['--include-hidden-services', 'list'], - fixture_actions=[get_add_two_ints_server_action(), - get_add_two_ints_server_action(service_name='_add_two_ints')], - expected_output=itertools.chain( - # Cope with launch internal ROS 2 node. - itertools.repeat(re.compile(r'/launch_ros/.*parameter.*'), 6), - ['/my_ns/_add_two_ints', '/my_ns/add_two_ints'], - itertools.repeat(re.compile( - r'/my_ns/my_add_two_ints_server/.*parameter.*' - ), 6) - ), - output_filter=basic_output_filter( - filtered_rmw_implementation=rmw_implementation - ) - ), - CLITestConfiguration( - command='service', - arguments=['list', '-t'], - fixture_actions=[get_add_two_ints_server_action()], - expected_output=itertools.chain( - # Cope with launch internal ROS 2 node. - itertools.repeat(re.compile( - r'/launch_ros/.*parameter.* \[rcl_interfaces/srv/.*Parameter.*\]' - ), 6), - ['/my_ns/add_two_ints [example_interfaces/srv/AddTwoInts]'], - itertools.repeat(re.compile( - r'/my_ns/my_add_two_ints_server/.*parameter.*' - r' \[rcl_interfaces/srv/.*Parameter.*\]' - ), 6) - ), - output_filter=basic_output_filter( - filtered_rmw_implementation=rmw_implementation - ) - ), - CLITestConfiguration( - command='service', - arguments=['list', '-c'], - fixture_actions=[get_add_two_ints_server_action()], - expected_output=[lambda line: int(line) > 0], - output_filter=basic_output_filter( - filtered_rmw_implementation=rmw_implementation - ) - ), - CLITestConfiguration( - command='service', - arguments=['find', 'example_interfaces/srv/AddTwoInts'], - fixture_actions=[get_add_two_ints_server_action()], - expected_output=['/my_ns/add_two_ints'], - output_filter=basic_output_filter( - filtered_rmw_implementation=rmw_implementation - ) - ), - CLITestConfiguration( - command='service', - arguments=['find', '-c', 'example_interfaces/srv/AddTwoInts'], - fixture_actions=[get_add_two_ints_server_action()], - expected_output=[lambda line: int(line) == 1], - output_filter=basic_output_filter( - filtered_rmw_implementation=rmw_implementation - ) - ), - CLITestConfiguration( - command='service', - arguments=['find', 'not_a_service_type'], - expected_output=None, - output_filter=basic_output_filter( - filtered_rmw_implementation=rmw_implementation - ) - ), - CLITestConfiguration( - command='service', - arguments=['find', '--include-hidden-services', - 'example_interfaces/srv/AddTwoInts'], - fixture_actions=[get_add_two_ints_server_action(), - get_add_two_ints_server_action(service_name='_add_two_ints')], - expected_output=['/my_ns/_add_two_ints', '/my_ns/add_two_ints'], - output_filter=basic_output_filter( - filtered_rmw_implementation=rmw_implementation - ) - ), - CLITestConfiguration( - command='service', - arguments=['type', '/not_a_service'], - expected_output=None, - output_filter=basic_output_filter( - filtered_rmw_implementation=rmw_implementation - ), - exit_codes=[1] - ), - CLITestConfiguration( - command='service', - arguments=['type', '/my_ns/add_two_ints'], - fixture_actions=[get_add_two_ints_server_action()], - expected_output=['example_interfaces/srv/AddTwoInts'], - output_filter=basic_output_filter( - filtered_rmw_implementation=rmw_implementation - ) - ), - CLITestConfiguration( - command='service', - arguments=['call', '/not_a_service', 'not_a_service_type'], - expected_output=['The passed service type is invalid'], - exit_codes=[1] - ), - CLITestConfiguration( - command='service', - arguments=['call', '/my_ns/add_two_ints', 'example_interfaces/srv/AddTwoInts'], - fixture_actions=[get_add_two_ints_server_action()], - expected_output=get_add_two_ints_call_output(), - output_filter=basic_output_filter( - filtered_prefixes=['waiting for service to become available...'], - filtered_rmw_implementation=rmw_implementation - ) - ), - CLITestConfiguration( - command='service', - arguments=[ - 'call', - '/my_ns/add_two_ints', - 'example_interfaces/srv/AddTwoInts', - '{a: 1, b: -1}' - ], - fixture_actions=[get_add_two_ints_server_action()], - expected_output=get_add_two_ints_call_output(a=1, b=-1), - output_filter=basic_output_filter( - filtered_prefixes=['waiting for service to become available...'], - filtered_rmw_implementation=rmw_implementation - ) - ), - CLITestConfiguration( - command='service', - arguments=[ - 'call', - '-r', '1', - '/my_ns/add_two_ints', - 'example_interfaces/srv/AddTwoInts', - '{a: 1, b: 1}' - ], - fixture_actions=[get_add_two_ints_server_action()], - expected_output=itertools.cycle(get_add_two_ints_call_output(a=1, b=1)), - output_filter=basic_output_filter( - filtered_prefixes=['waiting for service to become available...'], - filtered_rmw_implementation=rmw_implementation - ), - self_terminates=False, - exit_codes=[2], - timeout=1.0 - ) - ] diff --git a/test_ros2cli/test/ros2srv_test_configuration.py b/test_ros2cli/test/ros2srv_test_configuration.py deleted file mode 100644 index 46938ef48..000000000 --- a/test_ros2cli/test/ros2srv_test_configuration.py +++ /dev/null @@ -1,92 +0,0 @@ -# Copyright 2019 Open Source Robotics Foundation, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import os -import re -import sys - -sys.path.append(os.path.dirname(__file__)) - -from cli_test_configuration import CLITestConfiguration # noqa - - -some_services_from_std_srvs = [ - 'std_srvs/srv/Empty', - 'std_srvs/srv/SetBool', - 'std_srvs/srv/Trigger', -] - - -def get_test_configurations(*args, **kwargs): - return [ - CLITestConfiguration( - command='srv', - arguments=['list'], - expected_output=( - lambda lines: ( - all(srv in lines for srv in some_services_from_std_srvs) and - all(re.match(r'.*/srv/.*', line) is not None for line in lines) - ) - ) - ), - CLITestConfiguration( - command='srv', - arguments=['package', 'std_srvs'], - expected_output=( - lambda lines: ( - all(srv in lines for srv in some_services_from_std_srvs) and - all(re.match(r'std_srvs/srv/.*', line) is not None for line in lines) - ) - ) - ), - CLITestConfiguration( - command='srv', - arguments=['package', 'not_a_package'], - expected_output=['Unknown package name'], - exit_codes=[1] - ), - CLITestConfiguration( - command='srv', - arguments=['packages'], - expected_output=lambda lines: 'std_srvs' in lines - ), - CLITestConfiguration( - command='srv', - arguments=['show', 'std_srvs/srv/SetBool'], - expected_output=['bool data', '---', 'bool success', 'string message'] - ), - CLITestConfiguration( - command='srv', - arguments=['show', 'std_srvs/srv/Trigger'], - expected_output=['---', 'bool success', 'string message'] - ), - CLITestConfiguration( - command='srv', - arguments=['show', 'std_srvs/srv/NotAServiceType'], - expected_output=['Unknown service name'], - exit_codes=[1] - ), - CLITestConfiguration( - command='srv', - arguments=['show', 'not_a_package/srv/Empty'], - expected_output=['Unknown package name'], - exit_codes=[1] - ), - CLITestConfiguration( - command='srv', - arguments=['show', 'not_a_service_type'], - expected_output=['The passed service type is invalid'], - exit_codes=[1] - ), - ] diff --git a/test_ros2cli/test/ros2topic_test_configuration.py b/test_ros2cli/test/ros2topic_test_configuration.py deleted file mode 100644 index 43257aa37..000000000 --- a/test_ros2cli/test/ros2topic_test_configuration.py +++ /dev/null @@ -1,498 +0,0 @@ -# Copyright 2019 Open Source Robotics Foundation, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import itertools - -import os -import re -import sys -import time - -from launch_testing_ros.tools import basic_output_filter - -sys.path.append(os.path.dirname(__file__)) - -from cli_test_configuration import CLITestConfiguration # noqa -from fixture_actions import get_talker_node_action # noqa -from fixture_actions import get_listener_node_action # noqa -from fixture_actions import get_dummy_base_controller_node_action # noqa -from fixture_actions import get_publisher_node_action # noqa - - -def get_test_configurations(rmw_implementation): - now = time.time() - - return [ - CLITestConfiguration( - command='topic', - arguments=['list'], - fixture_actions=[ - get_talker_node_action(), - get_talker_node_action(topic_name='_chatter') - ], - expected_output=[ - '/my_ns/chatter', - '/my_ns/parameter_events', - '/my_ns/rosout', - '/parameter_events', - '/rosout' - ], - output_filter=basic_output_filter( - filtered_rmw_implementation=rmw_implementation - ) - ), - CLITestConfiguration( - command='topic', - arguments=['list', '--include-hidden-topics'], - fixture_actions=[ - get_talker_node_action(), - get_talker_node_action(topic_name='_chatter') - ], - expected_output=[ - '/my_ns/_chatter', - '/my_ns/chatter', - '/my_ns/parameter_events', - '/my_ns/rosout', - '/parameter_events', - '/rosout' - ], - output_filter=basic_output_filter( - filtered_rmw_implementation=rmw_implementation - ) - ), - CLITestConfiguration( - command='topic', - arguments=['list', '-t'], - fixture_actions=[get_talker_node_action()], - expected_output=[ - '/my_ns/chatter [std_msgs/msg/String]', - '/my_ns/parameter_events [rcl_interfaces/msg/ParameterEvent]', - '/my_ns/rosout [rcl_interfaces/msg/Log]', - '/parameter_events [rcl_interfaces/msg/ParameterEvent]', - '/rosout [rcl_interfaces/msg/Log]' - ], - output_filter=basic_output_filter( - filtered_rmw_implementation=rmw_implementation - ) - ), - CLITestConfiguration( - command='topic', - arguments=['list', '-c'], - fixture_actions=[get_talker_node_action()], - expected_output=[lambda line: int(line) == 5], - output_filter=basic_output_filter( - filtered_rmw_implementation=rmw_implementation - ) - ), - CLITestConfiguration( - command='topic', - arguments=['info', '/my_ns/chatter'], - fixture_actions=[get_talker_node_action()], - expected_output=[ - 'Topic: /my_ns/chatter', - 'Publisher count: 1', - 'Subscriber count: 0' - ], - output_filter=basic_output_filter( - filtered_rmw_implementation=rmw_implementation - ) - ), - CLITestConfiguration( - command='topic', - arguments=['info', '/some_chatter'], - fixture_actions=[get_talker_node_action()], - expected_output=[ - 'Topic: /some_chatter', - 'Publisher count: 0', - 'Subscriber count: 0' - ], - output_filter=basic_output_filter( - filtered_rmw_implementation=rmw_implementation - ) - ), - CLITestConfiguration( - command='topic', - arguments=['type', '/my_ns/chatter'], - fixture_actions=[get_talker_node_action()], - expected_output=['std_msgs/msg/String'], - output_filter=basic_output_filter( - filtered_rmw_implementation=rmw_implementation - ) - ), - CLITestConfiguration( - command='topic', - arguments=['type', '/my_ns/_chatter'], - fixture_actions=[get_talker_node_action(topic_name='_chatter')], - expected_output=None, - output_filter=basic_output_filter( - filtered_rmw_implementation=rmw_implementation - ), - exit_codes=[1] - ), - CLITestConfiguration( - command='topic', - arguments=['find', 'rcl_interfaces/msg/Log'], - fixture_actions=[get_talker_node_action()], - expected_output=[ - '/my_ns/rosout', - '/rosout' - ] - ), - CLITestConfiguration( - command='topic', - arguments=['find', 'rcl_interfaces/msg/NotAMessageType'], - fixture_actions=[get_talker_node_action()], - expected_output=None, - ), - CLITestConfiguration( - command='topic', - arguments=['echo', '/my_ns/chatter'], - fixture_actions=[get_talker_node_action()], - expected_output=itertools.cycle([ - re.compile(r"data: 'Hello World: \d+'"), - '---' - ]), - output_filter=basic_output_filter( - filtered_rmw_implementation=rmw_implementation - ), - self_terminates=False, - exit_codes=[2], - timeout=5.0 - ), - CLITestConfiguration( - command='topic', - arguments=['echo', '--no-str', '/my_ns/chatter'], - fixture_actions=[get_talker_node_action()], - expected_output=itertools.cycle([ - re.compile(r"data: '>'"), - '---' - ]), - output_filter=basic_output_filter( - filtered_rmw_implementation=rmw_implementation - ), - self_terminates=False, - exit_codes=[2], - timeout=5.0 - ), - CLITestConfiguration( - command='topic', - arguments=['echo', '--csv', '/my_ns/test_defaults'], - fixture_actions=[get_publisher_node_action( - topic_name='/my_ns/test_defaults', - topic_type='test_msgs/msg/Defaults' - )], - expected_output=itertools.repeat( - "True,b'2',100,1.125,1.125,-50,200,-1000,2000,-30000,60000,-40000000,50000000" - ), - output_filter=basic_output_filter( - filtered_rmw_implementation=rmw_implementation - ), - self_terminates=False, - exit_codes=[2], - timeout=5.0 - ), - CLITestConfiguration( - command='topic', - arguments=['echo', '--no-arr', '/my_ns/test_arrays'], - fixture_actions=[get_publisher_node_action( - topic_name='/my_ns/test_arrays', - topic_type='test_msgs/msg/Arrays' - )], - expected_output=itertools.cycle([ - "byte_values: ''", - "float64_values: ''", - "int8_values: ''", - "uint8_values: ''", - "int16_values: ''", - "uint16_values: ''", - "int32_values: ''", - "uint32_values: ''", - "int64_values: ''", - "uint64_values: ''", - "string_values: ''", - "basic_types_values: ''", - "constants_values: ''", - "defaults_values: ''", - "bool_values_default: ''", - "byte_values_default: ''", - "char_values_default: ''", - "float32_values_default: ''", - "float64_values_default: ''", - "int8_values_default: ''", - "uint8_values_default: ''", - "int16_values_default: ''", - "uint16_values_default: ''", - "int32_values_default: ''", - "uint32_values_default: ''", - "int64_values_default: ''", - "uint64_values_default: ''", - "string_values_default: ''", - 'alignment_check: 0', - '---' - ]), - output_filter=basic_output_filter( - filtered_rmw_implementation=rmw_implementation - ), - self_terminates=False, - exit_codes=[2], - timeout=5.0 - ), - CLITestConfiguration( - command='topic', - arguments=['echo', '--no-arr', '/my_ns/test_unbounded_seq'], - fixture_actions=[get_publisher_node_action( - topic_name='/my_ns/test_unbounded_seq', - topic_type='test_msgs/msg/UnboundedSequence' - )], - expected_output=itertools.cycle([ - "bool_values: ''", - "byte_values: ''", - "char_values: ''", - "float32_values: ''", - "float64_values: ''", - "int8_values: ''", - "uint8_values: ''", - "int16_values: ''", - "uint16_values: ''", - "int32_values: ''", - "uint32_values: ''", - "int64_values: ''", - "uint64_values: ''", - "string_values: ''", - "basic_types_values: ''", - "constants_values: ''", - "defaults_values: ''", - "bool_values_default: ''", - "byte_values_default: ''", - "char_values_default: ''", - "float32_values_default: ''", - "float64_values_default: ''", - "int8_values_default: ''", - "uint8_values_default: ''", - "int16_values_default: ''", - "uint16_values_default: ''", - "int32_values_default: ''", - "uint32_values_default: ''", - "int64_values_default: ''", - "uint64_values_default: ''", - "string_values_default: ''", - 'alignment_check: 0', - '---' - ]), - output_filter=basic_output_filter( - filtered_rmw_implementation=rmw_implementation - ), - self_terminates=False, - exit_codes=[2], - timeout=5.0 - ), - CLITestConfiguration( - command='topic', - arguments=['echo', '--no-arr', '/my_ns/test_bounded_seq'], - fixture_actions=[get_publisher_node_action( - topic_name='/my_ns/test_bounded_seq', - topic_type='test_msgs/msg/BoundedSequence' - )], - expected_output=itertools.cycle([ - "int8_values: ''", - "uint8_values: ''", - "int16_values: ''", - "uint16_values: ''", - "int32_values: ''", - "uint32_values: ''", - "int64_values: ''", - "uint64_values: ''", - "string_values: ''", - "basic_types_values: ''", - "constants_values: ''", - "defaults_values: ''", - "bool_values_default: ''", - "byte_values_default: ''", - "char_values_default: ''", - "float32_values_default: ''", - "float64_values_default: ''", - "int8_values_default: ''", - "uint8_values_default: ''", - "int16_values_default: ''", - "uint16_values_default: ''", - "int32_values_default: ''", - "uint32_values_default: ''", - "int64_values_default: ''", - "uint64_values_default: ''", - "string_values_default: ''", - 'alignment_check: 0', - '---' - ]), - output_filter=basic_output_filter( - filtered_rmw_implementation=rmw_implementation - ), - self_terminates=False, - exit_codes=[2], - timeout=5.0 - ), - CLITestConfiguration( - command='topic', - arguments=['echo', '--truncate-length', '5', '/my_ns/chatter'], - fixture_actions=[get_talker_node_action()], - expected_output=itertools.cycle([ - re.compile(r'data: Hello...'), - '---' - ]), - output_filter=basic_output_filter( - filtered_rmw_implementation=rmw_implementation - ), - self_terminates=False, - exit_codes=[2], - timeout=5.0, - ), - CLITestConfiguration( - command='topic', - arguments=['pub', '/my_ns/chatter', 'std_msgs/msg/String', '{data: test}'], - fixture_actions=[get_listener_node_action(node_name='my_listener')], - expected_output={ - 'cli': itertools.chain( - ['publisher: beginning loop'], - itertools.cycle([ - re.compile(r"publishing #\d+: std_msgs\.msg\.String\(data='test'\)"), - '' - ]) - ), - 'my_listener': itertools.repeat('[INFO] [my_ns.my_listener]: I heard: [test]') - }, - output_filter=basic_output_filter( - filtered_rmw_implementation=rmw_implementation - ), - self_terminates=False, - exit_codes=[2], - timeout=5.0 - ), - CLITestConfiguration( - command='topic', - arguments=['pub', '--once', '/my_ns/chatter', 'std_msgs/msg/String', '{data: test_once}'], - expected_output=[ - 'publisher: beginning loop', - re.compile(r"publishing #1: std_msgs\.msg\.String\(data='test'\)"), - '' - ], - output_filter=basic_output_filter( - filtered_rmw_implementation=rmw_implementation - ) - ), - CLITestConfiguration( - command='topic', - arguments=[ - 'pub', - '-p', '2', - '/my_ns/chatter', - 'std_msgs/msg/String', - '{data: test}' - ], - expected_output=itertools.chain( - ['publisher: beginning loop'], - itertools.cycle([ - lambda line: int(re.match( - "publishing #(\d+): std_msgs\.msg\.String\(data='test'\)", line - ).group(1)) % 2 == 0, - '' - ]) - ), - output_filter=basic_output_filter( - filtered_rmw_implementation=rmw_implementation - ), - self_terminates=False, - exit_codes=[2], - timeout=5.0 - ), - CLITestConfiguration( - command='topic', - arguments=['delay', '/my_ns/cmd_vel'], - fixture_actions=[get_dummy_base_controller_node_action(start_time=now)], - expected_output=itertools.cycle([ - lambda line: ( - re.match(r'average delay: \d+.\d{3}', line) is not None - and abs(float(line.split(':')[-1]) - (time.time() - now)) < 5. - ), - re.compile( - r'\s*min: \d+.\d{3}s max: \d+.\d{3}s std dev: \d+.\d{5}s window: \d+' - ) - ]), - output_filter=basic_output_filter( - filtered_rmw_implementation=rmw_implementation - ), - self_terminates=False, - exit_codes=[2], - timeout=5.0 - ), - CLITestConfiguration( - command='topic', - arguments=['hz', '/my_ns/chatter'], - fixture_actions=[get_talker_node_action()], - expected_output=itertools.cycle([ - lambda line: ( - re.match(r'average rate: \d\.\d{3}', line) is not None - and abs(float(line[-5:]) - 1.) < 0.01 - ), - re.compile(r'\s*min: \d\.\d{3}s max: \d\.\d{3}s std dev: \d\.\d{5}s window: \d+') - ]), - output_filter=basic_output_filter( - filtered_rmw_implementation=rmw_implementation - ), - self_terminates=False, - exit_codes=[2], - timeout=5.0 - ), - CLITestConfiguration( - command='topic', - arguments=[ - 'hz', - '--filter', - 'int(m.data.rpartition(\":\")[-1]) % 2 == 0', - '/my_ns/chatter' - ], - fixture_actions=[get_talker_node_action()], - expected_output=itertools.cycle([ - lambda line: ( - re.match(r'average rate: \d\.\d{3}', line) is not None - and abs(float(line[-5:]) - 0.5) < 0.01 - ), - re.compile(r'\s*min: \d\.\d{3}s max: \d\.\d{3}s std dev: \d\.\d{5}s window: \d+') - ]), - output_filter=basic_output_filter( - filtered_rmw_implementation=rmw_implementation - ), - self_terminates=False, - exit_codes=[2], - timeout=10.0 - ), - CLITestConfiguration( - command='topic', - arguments=['bw', '/my_ns/chatter'], - fixture_actions=[get_talker_node_action()], - expected_output=itertools.chain( - ['Subscribed to [/my_ns/chatter]'], - itertools.cycle([ - re.compile(r'average: 2\d\.\d{2}B/s'), - re.compile( - r'\s*mean: 2\d\.\d{2}B/s min: 2\d\.\d{2}B/s max: 2\d\.\d{2}B/s window: \d+' - ) - ]) - ), - output_filter=basic_output_filter( - filtered_rmw_implementation=rmw_implementation - ), - self_terminates=False, - exit_codes=[2], - timeout=5.0 - ), - ] diff --git a/test_ros2cli/test/test_cli.py.in b/test_ros2cli/test/test_cli.py.in deleted file mode 100644 index 4fce2a7bd..000000000 --- a/test_ros2cli/test/test_cli.py.in +++ /dev/null @@ -1,139 +0,0 @@ -# Copyright 2019 Open Source Robotics Foundation, Inc. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -import os -import sys - -import unittest - -from launch import LaunchDescription -from launch.actions import ExecuteProcess -from launch.actions import OpaqueFunction -from launch.actions import TimerAction -from launch.substitutions import LaunchConfiguration - -import launch_testing -import launch_testing.asserts -import launch_testing.util - -sys.path.append(os.path.dirname(__file__)) - -from @TEST_CONFIG_MODULE@ import get_test_configurations # noqa - - -@launch_testing.parametrize( - 'test_configuration', get_test_configurations( - rmw_implementation='@TEST_RMW_IMPLEMENTATION@' - ) -) -def generate_test_description(test_configuration, ready_fn): - cmd = ['ros2', test_configuration.command] - cmd.extend(test_configuration.arguments) - command_under_test = ExecuteProcess( - cmd=cmd, - name='cli', - output='screen', - additional_env={'PYTHONUNBUFFERED': '1'}, - sigterm_timeout=LaunchConfiguration( - 'sigterm_timeout', default=60 - ) - ) - actions = [command_under_test] - actions.append( - OpaqueFunction(function=lambda context: ready_fn()) - ) - launch_description = LaunchDescription([ - # Always restart daemon to isolate tests. - ExecuteProcess( - cmd=['ros2', 'daemon', 'stop'], - name='daemon-stop', - on_exit=[ - ExecuteProcess( - cmd=['ros2', 'daemon', 'start'], - name='daemon-start', - on_exit=[ - # Add test fixture actions. - *test_configuration.fixture_actions, - # Spin up command under test with a setup delay. - TimerAction( - period=test_configuration.setup_delay or @TEST_SETUP_DELAY@, - actions=actions, - ) - ] - ) - ] - ), - ]) - return launch_description, locals() - - -class TestCommandFinishesInAFiniteAmountOfTime(unittest.TestCase): - - def @TEST_NAME@(self, proc_info, command_under_test, test_configuration): - """Test that the command under test finished in a finite amount of time.""" - success = proc_info.waitForShutdown( - process=command_under_test, timeout=test_configuration.timeout - ) - if test_configuration.self_terminates: - assert success, "Timed out waiting for process '{}' to finish".format( - command_under_test - ) - - -@launch_testing.post_shutdown_test() -class TestCommandOutput(unittest.TestCase): - - def @TEST_NAME@(self, proc_info, proc_output, command_under_test, test_configuration): - """Test that the command under test finished cleanly.""" - launch_testing.asserts.assertExitCodes( - proc_info, - test_configuration.exit_codes, - command_under_test - ) - processes_expected_output = test_configuration.expected_output - if not isinstance(processes_expected_output, dict): - processes_expected_output = {'cli': processes_expected_output} - for process_name, expected_output in processes_expected_output.items(): - try: - process = launch_testing.util.resolveProcesses( - info_obj=proc_output, process=process_name - )[0] - - process_output = ''.join( - output.text.decode() - for output in proc_output[process] - ) - - if test_configuration.output_filter is not None: - process_output = test_configuration.output_filter(process_output) - process_output_lines = process_output.splitlines() - print(process_output_lines, expected_output) - if expected_output: - assert process_output_lines - if not callable(expected_output): - output_lines_n_patterns = list(zip(process_output_lines, expected_output)) - assert len(output_lines_n_patterns) == len(process_output_lines) - for line, pattern in output_lines_n_patterns: - if callable(pattern): - assert pattern(line) - elif hasattr(pattern, 'match'): - assert pattern.match(line) is not None - else: - assert pattern in line - else: - assert expected_output(process_output_lines) - else: - assert not process_output_lines - except launch_testing.util.NoMatchingProcessException: - assert not expected_output