diff --git a/ros2bag/ros2bag/api/__init__.py b/ros2bag/ros2bag/api/__init__.py index d4d4f7363..b96e81f8b 100644 --- a/ros2bag/ros2bag/api/__init__.py +++ b/ros2bag/ros2bag/api/__init__.py @@ -44,6 +44,9 @@ def dict_to_duration(time_dict: Optional[Dict[str, int]]) -> Duration: """Convert a QoS duration profile from YAML into an rclpy Duration.""" if time_dict: try: + if (Duration(seconds=time_dict['sec'], nanoseconds=time_dict['nsec']) < + Duration(seconds=0)): + raise ValueError('Time duration may not be a negative value.') return Duration(seconds=time_dict['sec'], nanoseconds=time_dict['nsec']) except KeyError: raise ValueError( @@ -61,6 +64,8 @@ def interpret_dict_as_qos_profile(qos_profile_dict: Dict) -> QoSProfile: elif policy_key in _QOS_POLICY_FROM_SHORT_NAME: new_profile_dict[policy_key] = _QOS_POLICY_FROM_SHORT_NAME[policy_key](policy_value) elif policy_key in _VALUE_KEYS: + if policy_value < 0: + raise ValueError('`{}` may not be a negative value.'.format(policy_key)) new_profile_dict[policy_key] = policy_value else: raise ValueError('Unexpected key `{}` for QoS profile.'.format(policy_key)) diff --git a/ros2bag/test/test_api.py b/ros2bag/test/test_api.py index e01992110..f90a9dbe4 100644 --- a/ros2bag/test/test_api.py +++ b/ros2bag/test/test_api.py @@ -74,3 +74,17 @@ def test_convert_yaml_to_qos_profile(self): assert qos_profiles[topic_name_2].avoid_ros_namespace_conventions == expected_convention assert qos_profiles[topic_name_2].history == \ QoSHistoryPolicy.RMW_QOS_POLICY_HISTORY_KEEP_ALL + + def test_interpret_dict_as_qos_profile_negative(self): + qos_dict = {'history': 'keep_all', 'depth': -1} + with self.assertRaises(ValueError): + interpret_dict_as_qos_profile(qos_dict) + qos_dict = {'history': 'keep_all', 'deadline': {'sec': -1, 'nsec': -1}} + with self.assertRaises(ValueError): + interpret_dict_as_qos_profile(qos_dict) + qos_dict = {'history': 'keep_all', 'lifespan': {'sec': -1, 'nsec': -1}} + with self.assertRaises(ValueError): + interpret_dict_as_qos_profile(qos_dict) + qos_dict = {'history': 'keep_all', 'liveliness_lease_duration': {'sec': -1, 'nsec': -1}} + with self.assertRaises(ValueError): + interpret_dict_as_qos_profile(qos_dict)