diff --git a/ros2bag/package.xml b/ros2bag/package.xml
index fcf8d0f57..8e153ed44 100644
--- a/ros2bag/package.xml
+++ b/ros2bag/package.xml
@@ -13,6 +13,7 @@
ros2cli
rosbag2_transport
+ rosbag2_py
ament_copyright
ament_flake8
diff --git a/ros2bag/ros2bag/verb/convert.py b/ros2bag/ros2bag/verb/convert.py
new file mode 100644
index 000000000..8f76a0c77
--- /dev/null
+++ b/ros2bag/ros2bag/verb/convert.py
@@ -0,0 +1,140 @@
+# Copyright 2020 AIT Austrian Institute of Technology GmbH
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import datetime
+import os
+from ros2bag.api import print_error
+from ros2bag.verb import VerbExtension
+
+
+def get_rosbag_options(path, serialization_format='cdr'):
+ import rosbag2_py
+ storage_options = rosbag2_py.StorageOptions(uri=path, storage_id='sqlite3')
+
+ converter_options = rosbag2_py.ConverterOptions(
+ input_serialization_format=serialization_format,
+ output_serialization_format=serialization_format)
+
+ return storage_options, converter_options
+
+
+def determine_compression_mode(path):
+ # TODO determine from metadata
+ return 'none'
+
+
+def compression_mode_from_string(mode):
+ if mode == 'none':
+ return 0
+ if mode == 'file':
+ return 1
+ if mode == 'message':
+ return 2
+ raise ValueError(f'invalid compression mode: {mode}')
+
+
+class ConvertVerb(VerbExtension):
+ """ros2 bag convert."""
+
+ def add_arguments(self, parser, cli_name): # noqa: D102
+ parser.add_argument(
+ 'bag_file', help='bag file to convert')
+ parser.add_argument(
+ '-o', '--output',
+ help='destination of the bagfile to create, \
+ defaults to a timestamped folder in the current directory')
+ parser.add_argument(
+ '-s', '--in-storage',
+ help='storage identifier to be used for the input bag, defaults to "sqlite3"')
+ parser.add_argument(
+ '--out-storage', default='sqlite3',
+ help='storage identifier to be used for the output bag, defaults to "sqlite3"')
+ parser.add_argument(
+ '--compression-mode', type=str, default='none',
+ choices=['none', 'file', 'message'],
+ help="Determine whether to compress by file or message. Default is 'none'.")
+ parser.add_argument(
+ '--compression-format', type=str, default='', choices=['zstd'],
+ help='Specify the compression format/algorithm. Default is none.')
+ parser.add_argument(
+ '-f', '--serialization-format', default='',
+ help='rmw serialization format in which the messages are saved, defaults to the'
+ ' rmw currently in use')
+
+ def main(self, *, args): # noqa: D102
+ bag_file = args.bag_file
+ if not os.path.exists(bag_file):
+ return print_error("bag file '{}' does not exist!".format(bag_file))
+
+ uri = args.output or datetime.datetime.now().strftime('rosbag2_%Y_%m_%d-%H_%M_%S')
+
+ if os.path.isdir(uri):
+ return print_error("Output folder '{}' already exists.".format(uri))
+
+ if args.compression_format and args.compression_mode == 'none':
+ return print_error('Invalid choice: Cannot specify compression format '
+ 'without a compression mode.')
+
+ # NOTE(hidmic): in merged install workspaces on Windows, Python entrypoint lookups
+ # combined with constrained environments (as imposed by colcon test)
+ # may result in DLL loading failures when attempting to import a C
+ # extension. Therefore, do not import rosbag2_transport at the module
+ # level but on demand, right before first use.
+ from rosbag2_py import (
+ SequentialReader,
+ SequentialCompressionReader,
+ SequentialWriter,
+ SequentialCompressionWriter,
+ StorageOptions,
+ ConverterOptions,
+ # CompressionOptions,
+ )
+
+ if determine_compression_mode(bag_file) == 'none':
+ reader = SequentialReader()
+ else:
+ reader = SequentialCompressionReader()
+
+ in_storage_options, in_converter_options = get_rosbag_options(bag_file)
+ if args.in_storage:
+ in_storage_options.storage = args.in_storage
+ reader.open(in_storage_options, in_converter_options)
+
+ # out_compression_options = CompressionOptions()
+ # out_compression_options.compression_format = args.compression_format
+ # out_compression_options.compression_mode = compression_mode_from_string(args.compression_mode)
+
+ if args.compression_mode != 'none':
+ writer = SequentialCompressionWriter()
+ else:
+ writer = SequentialWriter()
+
+ out_storage_options = StorageOptions(uri=uri, storage_id=args.out_storage)
+ out_converter_options = ConverterOptions(
+ input_serialization_format=args.serialization_format,
+ output_serialization_format=args.serialization_format)
+ writer.open(out_storage_options, out_converter_options)
+
+ for topic_metadata in reader.get_all_topics_and_types():
+ writer.create_topic(topic_metadata)
+
+ while reader.has_next():
+ (topic, data, t) = reader.read_next()
+ writer.write(topic, data, t)
+
+ del writer
+ del reader
+
+ if os.path.isdir(uri) and not os.listdir(uri):
+ os.rmdir(uri)
diff --git a/ros2bag/setup.py b/ros2bag/setup.py
index f6efbec71..f38f02ea5 100644
--- a/ros2bag/setup.py
+++ b/ros2bag/setup.py
@@ -38,6 +38,7 @@
'ros2bag.verb = ros2bag.verb:VerbExtension',
],
'ros2bag.verb': [
+ 'convert = ros2bag.verb.convert:ConvertVerb',
'info = ros2bag.verb.info:InfoVerb',
'list = ros2bag.verb.list:ListVerb',
'play = ros2bag.verb.play:PlayVerb',
diff --git a/ros2bag/test/test_convert.py b/ros2bag/test/test_convert.py
new file mode 100644
index 000000000..972892475
--- /dev/null
+++ b/ros2bag/test/test_convert.py
@@ -0,0 +1,121 @@
+# Copyright 2020 AIT Austrian Institute of Technology
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import contextlib
+from pathlib import Path
+import os
+import re
+import sys
+import tempfile
+import time
+
+import unittest
+
+from launch import LaunchDescription
+from launch.actions import ExecuteProcess
+
+import launch_testing
+import launch_testing.actions
+import launch_testing.asserts
+import launch_testing.markers
+import launch_testing.tools
+from launch_testing.asserts import EXIT_OK
+
+import pytest
+
+
+RESOURCE_PATH = Path(__file__).parent / 'resources'
+OUTPUT_WAIT_TIMEOUT = 2
+SHUTDOWN_TIMEOUT = 1
+
+
+@pytest.mark.rostest
+@launch_testing.markers.keep_alive
+def generate_test_description():
+ return LaunchDescription([launch_testing.actions.ReadyToTest()])
+
+
+class TestRos2BagConvert(unittest.TestCase):
+
+ @classmethod
+ def setUpClass(cls, launch_service, proc_info, proc_output):
+ @contextlib.contextmanager
+ def launch_bag_command(self, arguments, **kwargs):
+ pkg_command_action = ExecuteProcess(
+ cmd=['ros2', 'bag', 'convert', *arguments],
+ additional_env={'PYTHONUNBUFFERED': '1'},
+ name='ros2bag-cli',
+ output='screen',
+ **kwargs
+ )
+ with launch_testing.tools.launch_process(
+ launch_service, pkg_command_action, proc_info, proc_output
+ ) as pkg_command:
+ yield pkg_command
+ cls.launch_bag_command = launch_bag_command
+ cls.tmpdir = tempfile.TemporaryDirectory()
+ cls.allowed_exit_codes = [EXIT_OK, 1, 2]
+
+ @classmethod
+ def tearDownClass(cls):
+ try:
+ cls.tmpdir.cleanup()
+ except OSError:
+ if sys.platform != 'win32':
+ raise
+ # HACK to allow Windows to close pending file handles
+ time.sleep(3)
+ cls.tmpdir.cleanup()
+
+ def test_empty(self):
+ inbag_path = RESOURCE_PATH / 'empty_bag'
+ outbag_path = Path(self.tmpdir.name) / 'ros2bag_convert'
+ arguments = [inbag_path.as_posix(),
+ '--output', outbag_path.as_posix()]
+ expected_in_output_regex = re.compile(
+ r'\[rosbag2_storage]: Opened database .* for READ_ONLY')
+ expected_out_output_regex = re.compile(
+ r'\[rosbag2_storage]: Opened database .* for READ_WRITE')
+ with self.launch_bag_command(arguments=arguments) as bag_command:
+ bag_command.wait_for_output(
+ condition=lambda output: expected_in_output_regex.search(output) is not None,
+ timeout=OUTPUT_WAIT_TIMEOUT)
+ bag_command.wait_for_output(
+ condition=lambda output: expected_out_output_regex.search(output) is not None,
+ timeout=OUTPUT_WAIT_TIMEOUT)
+ bag_command.wait_for_shutdown(timeout=SHUTDOWN_TIMEOUT)
+ assert bag_command.terminated
+ assert bag_command.exit_code in self.allowed_exit_codes
+
+ def test_compressed_output(self):
+ inbag_path = RESOURCE_PATH / 'empty_bag'
+ outbag_path = Path(self.tmpdir.name) / 'ros2bag_convert'
+ arguments = [inbag_path.as_posix(),
+ '--output', outbag_path.as_posix(),
+ '--compression-mode', 'file',
+ '--compression-format', 'zstd']
+ expected_in_output_regex = re.compile(
+ r'\[rosbag2_storage]: Opened database .* for READ_ONLY')
+ expected_out_output_regex = re.compile(
+ r'\[rosbag2_storage]: Opened database .* for READ_WRITE')
+ with self.launch_bag_command(arguments=arguments) as bag_command:
+ bag_command.wait_for_output(
+ condition=lambda output: expected_in_output_regex.search(output) is not None,
+ timeout=OUTPUT_WAIT_TIMEOUT)
+ bag_command.wait_for_output(
+ condition=lambda output: expected_out_output_regex.search(output) is not None,
+ timeout=OUTPUT_WAIT_TIMEOUT)
+ bag_command.wait_for_shutdown(timeout=SHUTDOWN_TIMEOUT)
+ assert bag_command.terminated
+ assert bag_command.exit_code in self.allowed_exit_codes