diff --git a/ros2bag/package.xml b/ros2bag/package.xml index fcf8d0f57..8e153ed44 100644 --- a/ros2bag/package.xml +++ b/ros2bag/package.xml @@ -13,6 +13,7 @@ ros2cli rosbag2_transport + rosbag2_py ament_copyright ament_flake8 diff --git a/ros2bag/ros2bag/verb/convert.py b/ros2bag/ros2bag/verb/convert.py new file mode 100644 index 000000000..8f76a0c77 --- /dev/null +++ b/ros2bag/ros2bag/verb/convert.py @@ -0,0 +1,140 @@ +# Copyright 2020 AIT Austrian Institute of Technology GmbH +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import datetime +import os +from ros2bag.api import print_error +from ros2bag.verb import VerbExtension + + +def get_rosbag_options(path, serialization_format='cdr'): + import rosbag2_py + storage_options = rosbag2_py.StorageOptions(uri=path, storage_id='sqlite3') + + converter_options = rosbag2_py.ConverterOptions( + input_serialization_format=serialization_format, + output_serialization_format=serialization_format) + + return storage_options, converter_options + + +def determine_compression_mode(path): + # TODO determine from metadata + return 'none' + + +def compression_mode_from_string(mode): + if mode == 'none': + return 0 + if mode == 'file': + return 1 + if mode == 'message': + return 2 + raise ValueError(f'invalid compression mode: {mode}') + + +class ConvertVerb(VerbExtension): + """ros2 bag convert.""" + + def add_arguments(self, parser, cli_name): # noqa: D102 + parser.add_argument( + 'bag_file', help='bag file to convert') + parser.add_argument( + '-o', '--output', + help='destination of the bagfile to create, \ + defaults to a timestamped folder in the current directory') + parser.add_argument( + '-s', '--in-storage', + help='storage identifier to be used for the input bag, defaults to "sqlite3"') + parser.add_argument( + '--out-storage', default='sqlite3', + help='storage identifier to be used for the output bag, defaults to "sqlite3"') + parser.add_argument( + '--compression-mode', type=str, default='none', + choices=['none', 'file', 'message'], + help="Determine whether to compress by file or message. Default is 'none'.") + parser.add_argument( + '--compression-format', type=str, default='', choices=['zstd'], + help='Specify the compression format/algorithm. Default is none.') + parser.add_argument( + '-f', '--serialization-format', default='', + help='rmw serialization format in which the messages are saved, defaults to the' + ' rmw currently in use') + + def main(self, *, args): # noqa: D102 + bag_file = args.bag_file + if not os.path.exists(bag_file): + return print_error("bag file '{}' does not exist!".format(bag_file)) + + uri = args.output or datetime.datetime.now().strftime('rosbag2_%Y_%m_%d-%H_%M_%S') + + if os.path.isdir(uri): + return print_error("Output folder '{}' already exists.".format(uri)) + + if args.compression_format and args.compression_mode == 'none': + return print_error('Invalid choice: Cannot specify compression format ' + 'without a compression mode.') + + # NOTE(hidmic): in merged install workspaces on Windows, Python entrypoint lookups + # combined with constrained environments (as imposed by colcon test) + # may result in DLL loading failures when attempting to import a C + # extension. Therefore, do not import rosbag2_transport at the module + # level but on demand, right before first use. + from rosbag2_py import ( + SequentialReader, + SequentialCompressionReader, + SequentialWriter, + SequentialCompressionWriter, + StorageOptions, + ConverterOptions, + # CompressionOptions, + ) + + if determine_compression_mode(bag_file) == 'none': + reader = SequentialReader() + else: + reader = SequentialCompressionReader() + + in_storage_options, in_converter_options = get_rosbag_options(bag_file) + if args.in_storage: + in_storage_options.storage = args.in_storage + reader.open(in_storage_options, in_converter_options) + + # out_compression_options = CompressionOptions() + # out_compression_options.compression_format = args.compression_format + # out_compression_options.compression_mode = compression_mode_from_string(args.compression_mode) + + if args.compression_mode != 'none': + writer = SequentialCompressionWriter() + else: + writer = SequentialWriter() + + out_storage_options = StorageOptions(uri=uri, storage_id=args.out_storage) + out_converter_options = ConverterOptions( + input_serialization_format=args.serialization_format, + output_serialization_format=args.serialization_format) + writer.open(out_storage_options, out_converter_options) + + for topic_metadata in reader.get_all_topics_and_types(): + writer.create_topic(topic_metadata) + + while reader.has_next(): + (topic, data, t) = reader.read_next() + writer.write(topic, data, t) + + del writer + del reader + + if os.path.isdir(uri) and not os.listdir(uri): + os.rmdir(uri) diff --git a/ros2bag/setup.py b/ros2bag/setup.py index f6efbec71..f38f02ea5 100644 --- a/ros2bag/setup.py +++ b/ros2bag/setup.py @@ -38,6 +38,7 @@ 'ros2bag.verb = ros2bag.verb:VerbExtension', ], 'ros2bag.verb': [ + 'convert = ros2bag.verb.convert:ConvertVerb', 'info = ros2bag.verb.info:InfoVerb', 'list = ros2bag.verb.list:ListVerb', 'play = ros2bag.verb.play:PlayVerb', diff --git a/ros2bag/test/test_convert.py b/ros2bag/test/test_convert.py new file mode 100644 index 000000000..972892475 --- /dev/null +++ b/ros2bag/test/test_convert.py @@ -0,0 +1,121 @@ +# Copyright 2020 AIT Austrian Institute of Technology +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +import contextlib +from pathlib import Path +import os +import re +import sys +import tempfile +import time + +import unittest + +from launch import LaunchDescription +from launch.actions import ExecuteProcess + +import launch_testing +import launch_testing.actions +import launch_testing.asserts +import launch_testing.markers +import launch_testing.tools +from launch_testing.asserts import EXIT_OK + +import pytest + + +RESOURCE_PATH = Path(__file__).parent / 'resources' +OUTPUT_WAIT_TIMEOUT = 2 +SHUTDOWN_TIMEOUT = 1 + + +@pytest.mark.rostest +@launch_testing.markers.keep_alive +def generate_test_description(): + return LaunchDescription([launch_testing.actions.ReadyToTest()]) + + +class TestRos2BagConvert(unittest.TestCase): + + @classmethod + def setUpClass(cls, launch_service, proc_info, proc_output): + @contextlib.contextmanager + def launch_bag_command(self, arguments, **kwargs): + pkg_command_action = ExecuteProcess( + cmd=['ros2', 'bag', 'convert', *arguments], + additional_env={'PYTHONUNBUFFERED': '1'}, + name='ros2bag-cli', + output='screen', + **kwargs + ) + with launch_testing.tools.launch_process( + launch_service, pkg_command_action, proc_info, proc_output + ) as pkg_command: + yield pkg_command + cls.launch_bag_command = launch_bag_command + cls.tmpdir = tempfile.TemporaryDirectory() + cls.allowed_exit_codes = [EXIT_OK, 1, 2] + + @classmethod + def tearDownClass(cls): + try: + cls.tmpdir.cleanup() + except OSError: + if sys.platform != 'win32': + raise + # HACK to allow Windows to close pending file handles + time.sleep(3) + cls.tmpdir.cleanup() + + def test_empty(self): + inbag_path = RESOURCE_PATH / 'empty_bag' + outbag_path = Path(self.tmpdir.name) / 'ros2bag_convert' + arguments = [inbag_path.as_posix(), + '--output', outbag_path.as_posix()] + expected_in_output_regex = re.compile( + r'\[rosbag2_storage]: Opened database .* for READ_ONLY') + expected_out_output_regex = re.compile( + r'\[rosbag2_storage]: Opened database .* for READ_WRITE') + with self.launch_bag_command(arguments=arguments) as bag_command: + bag_command.wait_for_output( + condition=lambda output: expected_in_output_regex.search(output) is not None, + timeout=OUTPUT_WAIT_TIMEOUT) + bag_command.wait_for_output( + condition=lambda output: expected_out_output_regex.search(output) is not None, + timeout=OUTPUT_WAIT_TIMEOUT) + bag_command.wait_for_shutdown(timeout=SHUTDOWN_TIMEOUT) + assert bag_command.terminated + assert bag_command.exit_code in self.allowed_exit_codes + + def test_compressed_output(self): + inbag_path = RESOURCE_PATH / 'empty_bag' + outbag_path = Path(self.tmpdir.name) / 'ros2bag_convert' + arguments = [inbag_path.as_posix(), + '--output', outbag_path.as_posix(), + '--compression-mode', 'file', + '--compression-format', 'zstd'] + expected_in_output_regex = re.compile( + r'\[rosbag2_storage]: Opened database .* for READ_ONLY') + expected_out_output_regex = re.compile( + r'\[rosbag2_storage]: Opened database .* for READ_WRITE') + with self.launch_bag_command(arguments=arguments) as bag_command: + bag_command.wait_for_output( + condition=lambda output: expected_in_output_regex.search(output) is not None, + timeout=OUTPUT_WAIT_TIMEOUT) + bag_command.wait_for_output( + condition=lambda output: expected_out_output_regex.search(output) is not None, + timeout=OUTPUT_WAIT_TIMEOUT) + bag_command.wait_for_shutdown(timeout=SHUTDOWN_TIMEOUT) + assert bag_command.terminated + assert bag_command.exit_code in self.allowed_exit_codes