diff --git a/tf2_sensor_msgs/CMakeLists.txt b/tf2_sensor_msgs/CMakeLists.txt
index 4c02c8126..766c0ab35 100644
--- a/tf2_sensor_msgs/CMakeLists.txt
+++ b/tf2_sensor_msgs/CMakeLists.txt
@@ -12,7 +12,6 @@ endif()
find_package(ament_cmake_auto REQUIRED)
set(required_dependencies
"sensor_msgs"
- #"python_orocos_kdl"
"tf2"
"tf2_ros"
)
@@ -27,6 +26,8 @@ ament_export_dependencies(Eigen3)
ament_export_dependencies(tf2)
ament_export_dependencies(tf2_ros)
+ament_python_install_package(${PROJECT_NAME})
+
if(BUILD_TESTING)
find_package(ament_cmake_gtest REQUIRED)
find_package(ament_lint_auto REQUIRED)
@@ -37,9 +38,8 @@ if(BUILD_TESTING)
set(ament_cmake_cppcheck_LANGUAGE c++)
ament_lint_auto_find_test_dependencies()
- # TODO(ros2/orocos_kinematics_dynamics): reenable when PyKDL is ready to use
- #find_package(ament_cmake_pytest REQUIRED)
- #ament_add_pytest_test(test_tf2_sensor_msgs_py test/test_tf2_sensor_msgs.py)
+ find_package(ament_cmake_pytest REQUIRED)
+ ament_add_pytest_test(test_tf2_sensor_msgs_py test/test_tf2_sensor_msgs.py)
ament_add_gtest(test_tf2_sensor_msgs_cpp test/test_tf2_sensor_msgs.cpp)
if(TARGET test_tf2_sensor_msgs_cpp)
diff --git a/tf2_sensor_msgs/package.xml b/tf2_sensor_msgs/package.xml
index 8b624e9c7..c2eaee37b 100644
--- a/tf2_sensor_msgs/package.xml
+++ b/tf2_sensor_msgs/package.xml
@@ -20,7 +20,11 @@
eigen
+ geometry_msgs
+ python3-numpy
sensor_msgs
+ sensor_msgs_py
+ std_msgs
tf2
tf2_ros
diff --git a/tf2_sensor_msgs/src/tf2_sensor_msgs/tf2_sensor_msgs.py b/tf2_sensor_msgs/src/tf2_sensor_msgs/tf2_sensor_msgs.py
deleted file mode 100644
index a3014ce36..000000000
--- a/tf2_sensor_msgs/src/tf2_sensor_msgs/tf2_sensor_msgs.py
+++ /dev/null
@@ -1,70 +0,0 @@
-# Copyright 2008 Willow Garage, Inc.
-#
-# Redistribution and use in source and binary forms, with or without
-# modification, are permitted provided that the following conditions are met:
-#
-# * Redistributions of source code must retain the above copyright
-# notice, this list of conditions and the following disclaimer.
-#
-# * Redistributions in binary form must reproduce the above copyright
-# notice, this list of conditions and the following disclaimer in the
-# documentation and/or other materials provided with the distribution.
-#
-# * Neither the name of the Willow Garage, Inc. nor the names of its
-# contributors may be used to endorse or promote products derived from
-# this software without specific prior written permission.
-#
-# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
-# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
-# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
-# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
-# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
-# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
-# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
-# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
-# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
-# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
-# POSSIBILITY OF SUCH DAMAGE.
-
-import PyKDL
-from sensor_msgs.msg import PointCloud2
-from sensor_msgs.point_cloud2 import create_cloud, read_points
-import rospy # noqa(E401)
-import tf2_ros
-
-
-def to_msg_msg(msg):
- return msg
-
-
-tf2_ros.ConvertRegistration().add_to_msg(PointCloud2, to_msg_msg)
-
-
-def from_msg_msg(msg):
- return msg
-
-
-tf2_ros.ConvertRegistration().add_from_msg(PointCloud2, from_msg_msg)
-
-
-def transform_to_kdl(t):
- return PyKDL.Frame(PyKDL.Rotation.Quaternion(
- t.transform.rotation.x, t.transform.rotation.y,
- t.transform.rotation.z, t.transform.rotation.w),
- PyKDL.Vector(t.transform.translation.x,
- t.transform.translation.y,
- t.transform.translation.z))
-
-
-# PointStamped
-def do_transform_cloud(cloud, transform):
- t_kdl = transform_to_kdl(transform)
- points_out = []
- for p_in in read_points(cloud):
- p_out = t_kdl * PyKDL.Vector(p_in[0], p_in[1], p_in[2])
- points_out.append(p_out)
- res = create_cloud(transform.header, cloud.fields, points_out)
- return res
-
-
-tf2_ros.TransformRegistration().add(PointCloud2, do_transform_cloud)
diff --git a/tf2_sensor_msgs/test/test_tf2_sensor_msgs.py b/tf2_sensor_msgs/test/test_tf2_sensor_msgs.py
index 665f08e04..93f0c8211 100644
--- a/tf2_sensor_msgs/test/test_tf2_sensor_msgs.py
+++ b/tf2_sensor_msgs/test/test_tf2_sensor_msgs.py
@@ -29,61 +29,185 @@
# POSSIBILITY OF SUCH DAMAGE.
import copy
-import struct
import unittest
-from sensor_msgs import point_cloud2
+from geometry_msgs.msg import Transform, TransformStamped
+import numpy as np
from sensor_msgs.msg import PointField
-from tf2_ros import TransformStamped
-import tf2_sensor_msgs
+from sensor_msgs_py.point_cloud2 import create_cloud, create_cloud_xyz32, read_points_numpy
+from std_msgs.msg import Header
+import tf2_ros
+from tf2_sensor_msgs import do_transform_cloud, transform_points
-# A sample python unit test
class PointCloudConversions(unittest.TestCase):
def setUp(self):
- self.point_cloud_in = point_cloud2.PointCloud2()
- self.point_cloud_in.fields = [
- PointField('x', 0, PointField.FLOAT32, 1),
- PointField('y', 4, PointField.FLOAT32, 1),
- PointField('z', 8, PointField.FLOAT32, 1)]
-
- self.point_cloud_in.point_step = 4 * 3
- self.point_cloud_in.height = 1
- # we add two points (with x, y, z to the cloud)
- self.point_cloud_in.width = 2
- self.point_cloud_in.row_step = \
- self.point_cloud_in.point_step * self.point_cloud_in.width
-
- points = [1, 2, 0, 10, 20, 30]
- self.point_cloud_in.data = struct.pack('%sf' % len(points), *points)
-
- self.transform_translate_xyz_300 = TransformStamped()
- self.transform_translate_xyz_300.transform.translation.x = 300
- self.transform_translate_xyz_300.transform.translation.y = 300
- self.transform_translate_xyz_300.transform.translation.z = 300
- # no rotation so we only set w
- self.transform_translate_xyz_300.transform.rotation.w = 1
+ self.points = np.array([[1, 2, 0], [10, 20, 30]], dtype=np.float32)
- assert(list(point_cloud2.read_points(self.point_cloud_in)) ==
- [(1.0, 2.0, 0.0), (10.0, 20.0, 30.0)])
+ self.point_cloud_in = create_cloud_xyz32(
+ Header(frame_id='test'),
+ self.points)
def test_simple_transform(self):
- # deepcopy is not required here because we have a str for now
+ k = 300.0
+ transform_translate_xyz = TransformStamped()
+ transform_translate_xyz.transform.translation.x = k
+ transform_translate_xyz.transform.translation.y = k
+ transform_translate_xyz.transform.translation.z = k
+ # no rotation so we only set w
+ transform_translate_xyz.transform.rotation.w = 1.0
+
+ # Copy current state of the original message for later check
old_data = copy.deepcopy(self.point_cloud_in.data)
- point_cloud_transformed = tf2_sensor_msgs.do_transform_cloud(
- self.point_cloud_in, self.transform_translate_xyz_300)
-
- k = 300
- expected_coordinates = [(1+k, 2+k, 0+k), (10+k, 20+k, 30+k)]
- new_points = list(point_cloud2.read_points(point_cloud_transformed))
- print('new_points are %s' % new_points)
- assert(expected_coordinates == new_points)
+
+ # Apply transform
+ point_cloud_transformed = do_transform_cloud(
+ self.point_cloud_in, transform_translate_xyz)
+
+ # Expected output
+ expected_coordinates = self.points + k
+
+ new_points = read_points_numpy(point_cloud_transformed)
+ self.assertTrue(np.allclose(expected_coordinates, new_points))
+
# checking no modification in input cloud
- assert(old_data == self.point_cloud_in.data)
+ self.assertEqual(old_data, self.point_cloud_in.data)
+
+ def test_simple_rotation_transform(self):
+ # Create a transform containing only a 180° rotation around the x-axis
+ transform_rot = TransformStamped()
+ transform_rot.transform.rotation.x = 1.0
+ transform_rot.transform.rotation.w = 0.0
+
+ # Apply transform
+ point_cloud_transformed = do_transform_cloud(self.point_cloud_in, transform_rot)
+
+ # Expected output
+ expected_coordinates = np.array([[1, -2, 0], [10, -20, -30]], dtype=np.float32)
+
+ # Compare to ground truth
+ new_points = read_points_numpy(point_cloud_transformed)
+ self.assertTrue(np.allclose(expected_coordinates, new_points))
+
+ def test_rotation_and_translation_transform(self):
+ # Create a transform combining a 100m z translation with
+ # a 180° rotation around the x-axis
+ transform = TransformStamped()
+ transform.transform.translation.z = 100.0
+ transform.transform.rotation.x = 1.0
+ transform.transform.rotation.w = 0.0
+
+ # Apply transform
+ point_cloud_transformed = do_transform_cloud(
+ self.point_cloud_in, transform)
+
+ # Expected output
+ expected_coordinates = np.array([[1, -2, 100], [10, -20, 70]], dtype=np.float32)
+
+ # Compare to ground truth
+ new_points = read_points_numpy(point_cloud_transformed)
+ self.assertTrue(np.allclose(expected_coordinates, new_points))
+
+ def test_direct_transform(self):
+ # Create a transform combining a 100m z translation with
+ # a 180° rotation around the x-axis
+ transform = Transform()
+ transform.translation.z = 100.0
+ transform.rotation.x = 1.0
+ transform.rotation.w = 0.0
+
+ # Transform points
+ points = transform_points(self.points, transform)
+
+ # Expected output
+ expected_coordinates = np.array([[1, -2, 100], [10, -20, 70]], dtype=np.float32)
+
+ # Compare to ground truth
+ self.assertTrue(np.allclose(expected_coordinates, points))
+
+ def test_tf2_ros_transform(self):
+ # Our target frame
+ target_frame_name = 'base_footprint'
+
+ # We need to create a local test tf buffer
+ tf_buffer = tf2_ros.Buffer()
+
+ # We need to fill this tf_buffer with a possible transform
+ # So we create a transform with a 100m z translation
+ transform = TransformStamped()
+ transform.header.frame_id = 'test'
+ transform.child_frame_id = target_frame_name
+ transform.transform.translation.z = 100.0
+ transform.transform.rotation.w = 1.0
+
+ # Set the new transform in our local tf_buffer
+ tf_buffer.set_transform_static(transform, 'unittest')
+
+ point_cloud_transformed = tf_buffer.transform(self.point_cloud_in, target_frame_name)
+
+ # Check if our pointloud is in the correct frame
+ self.assertEqual(point_cloud_transformed.header.frame_id, target_frame_name)
+
+ # Check if the points are viewed from the target frame (inverse of the transform above)
+ self.assertTrue(np.allclose(
+ read_points_numpy(point_cloud_transformed),
+ self.points -
+ np.array([
+ transform.transform.translation.x,
+ transform.transform.translation.y,
+ transform.transform.translation.z])))
+
+ def test_non_coordinate_fields(self):
+ # Test if point clouds with additional fields (non xyz) behave as desired
+
+ # Create points with an additional field in addition to the x, y, and z ones
+ points = np.array([[1, 2, 0, 9], [10, 20, 30, 9]], dtype=np.float32)
+
+ # Create the field layout
+ fields = [
+ PointField(name='x', offset=0, datatype=PointField.FLOAT32, count=1),
+ PointField(name='y', offset=4, datatype=PointField.FLOAT32, count=1),
+ PointField(name='z', offset=8, datatype=PointField.FLOAT32, count=1),
+ PointField(name='a', offset=12, datatype=PointField.FLOAT32, count=1)]
+
+ # Create cloud with four fields
+ point_cloud = create_cloud(
+ Header(frame_id='test'),
+ fields,
+ points)
+
+ # Create the test transformation
+ k = 300.0
+ transform_translate_xyz = TransformStamped()
+ transform_translate_xyz.transform.translation.x = k
+ transform_translate_xyz.transform.translation.y = k
+ transform_translate_xyz.transform.translation.z = k
+ # no rotation so we only set w
+ transform_translate_xyz.transform.rotation.w = 1.0
+
+ # Copy current state of the original message for later check
+ old_data = copy.deepcopy(point_cloud.data)
+
+ # Check if the created point cloud contains our points
+ self.assertTrue(np.allclose(
+ read_points_numpy(point_cloud),
+ points))
+
+ # Apply transform
+ point_cloud_transformed = do_transform_cloud(
+ point_cloud, transform_translate_xyz)
+
+ # Expected output, the last field should be unaltered
+ expected_coordinates = points + np.array([k, k, k, 0])
+
+ # Check if this is the case
+ new_points = read_points_numpy(point_cloud_transformed)
+ self.assertTrue(np.allclose(expected_coordinates, new_points))
+
+ # Checking for no modification in input cloud
+ self.assertEqual(old_data, point_cloud.data)
if __name__ == '__main__':
- import rosunit
- rosunit.unitrun('test_tf2_sensor_msgs', 'test_point_cloud_conversion',
- PointCloudConversions)
+ unittest.main()
diff --git a/tf2_sensor_msgs/src/tf2_sensor_msgs/__init__.py b/tf2_sensor_msgs/tf2_sensor_msgs/__init__.py
similarity index 90%
rename from tf2_sensor_msgs/src/tf2_sensor_msgs/__init__.py
rename to tf2_sensor_msgs/tf2_sensor_msgs/__init__.py
index 6b4eefa5f..44d1d37d9 100644
--- a/tf2_sensor_msgs/src/tf2_sensor_msgs/__init__.py
+++ b/tf2_sensor_msgs/tf2_sensor_msgs/__init__.py
@@ -26,4 +26,10 @@
# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
# POSSIBILITY OF SUCH DAMAGE.
-from tf2_sensor_msgs import * # noqa(E401)
+from .tf2_sensor_msgs import do_transform_cloud
+from .tf2_sensor_msgs import transform_points
+
+__all__ = [
+ 'do_transform_cloud',
+ 'transform_points',
+]
diff --git a/tf2_sensor_msgs/tf2_sensor_msgs/tf2_sensor_msgs.py b/tf2_sensor_msgs/tf2_sensor_msgs/tf2_sensor_msgs.py
new file mode 100644
index 000000000..0b8703e6e
--- /dev/null
+++ b/tf2_sensor_msgs/tf2_sensor_msgs/tf2_sensor_msgs.py
@@ -0,0 +1,171 @@
+# Copyright 2008 Willow Garage, Inc.
+#
+# Redistribution and use in source and binary forms, with or without
+# modification, are permitted provided that the following conditions are met:
+#
+# * Redistributions of source code must retain the above copyright
+# notice, this list of conditions and the following disclaimer.
+#
+# * Redistributions in binary form must reproduce the above copyright
+# notice, this list of conditions and the following disclaimer in the
+# documentation and/or other materials provided with the distribution.
+#
+# * Neither the name of the Willow Garage, Inc. nor the names of its
+# contributors may be used to endorse or promote products derived from
+# this software without specific prior written permission.
+#
+# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
+# AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
+# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
+# ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE
+# LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR
+# CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF
+# SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
+# INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN
+# CONTRACT, STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE)
+# ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
+# POSSIBILITY OF SUCH DAMAGE.
+
+from typing import Union
+
+from geometry_msgs.msg import Transform, TransformStamped
+import numpy as np
+try:
+ from numpy.lib.recfunctions import (structured_to_unstructured, unstructured_to_structured)
+except ImportError:
+ # Fix for RHEL because its NumPy version does not include these functions
+ from sensor_msgs_py.numpy_compat import (structured_to_unstructured,
+ unstructured_to_structured)
+from sensor_msgs.msg import PointCloud2
+from sensor_msgs_py.point_cloud2 import create_cloud, read_points
+from std_msgs.msg import Header
+import tf2_ros
+
+
+def to_msg_msg(msg):
+ return msg
+
+
+tf2_ros.ConvertRegistration().add_to_msg(PointCloud2, to_msg_msg)
+
+
+def from_msg_msg(msg):
+ return msg
+
+
+tf2_ros.ConvertRegistration().add_from_msg(PointCloud2, from_msg_msg)
+
+
+def transform_points(
+ point_cloud: np.ndarray,
+ transform: Transform) -> np.ndarray:
+ """
+ Transform a bulk of points from an numpy array using a provided `Transform`.
+
+ :param point_cloud: nx3 Array of points where n is the number of points
+ :param transform: TF2 transform used for the transformation
+ :returns: Array with the same shape as the input array, but with the transformation applied
+ """
+ # Build affine transformation
+ transform_translation = np.array([
+ transform.translation.x,
+ transform.translation.y,
+ transform.translation.z
+ ])
+ transform_rotation_matrix = _get_mat_from_quat(
+ np.array([
+ transform.rotation.w,
+ transform.rotation.x,
+ transform.rotation.y,
+ transform.rotation.z
+ ]))
+
+ # "Batched" matmul meaning a matmul for each point
+ # First we offset all points by the translation part
+ # followed by a rotation using the rotation matrix
+ return np.einsum(
+ 'ij, pj -> pi',
+ transform_rotation_matrix,
+ point_cloud) + transform_translation
+
+
+def do_transform_cloud(
+ cloud: PointCloud2,
+ transform: Union[Transform, TransformStamped]) -> PointCloud2:
+ """
+ Apply a `Transform` or `TransformStamped` on a `PointCloud2`.
+
+ The x, y, and z values are transformed into a different frame,
+ while the rest of the cloud is kept untouched.
+
+ :param cloud: The point cloud that should be transformed
+ :param transform: The transform which will applied to the point cloud
+ :returns: The transformed point cloud
+ """
+ # Create new Header so original header is not altered
+ new_header = Header(
+ stamp=cloud.header.stamp,
+ frame_id=cloud.header.frame_id)
+
+ # Check if we have a TransformStamped and are able to update the frame_id
+ if isinstance(transform, TransformStamped):
+ new_header.frame_id = transform.header.frame_id
+ transform = transform.transform
+
+ # Check if xyz are a subset of the field names
+ required_fields = set('xyz')
+ present_fields = {field.name for field in cloud.fields}
+ assert required_fields <= present_fields, \
+ 'Point cloud needs the fields x, y, and z for the transformation'
+
+ # Read points as structured NumPy array
+ points = read_points(cloud)
+
+ # Transform xyz part of the pointcloud using the given transform
+ transformed_xyz = transform_points(
+ structured_to_unstructured(points[['x', 'y', 'z']]),
+ transform)
+
+ # Check if there are additional fields that need to be merged with the transformed coordinates
+ if required_fields != present_fields:
+ # Merge original array including non coordinate fields with the transformed coordinates
+ # The copy is needed as the original message would be altered otherwise
+ points = points.copy()
+ points[['x', 'y', 'z']] = unstructured_to_structured(transformed_xyz)
+ else:
+ points = transformed_xyz
+
+ # Serialize pointcloud message
+ return create_cloud(new_header, cloud.fields, points)
+
+
+tf2_ros.TransformRegistration().add(PointCloud2, do_transform_cloud)
+
+
+def _get_mat_from_quat(quaternion: np.ndarray) -> np.ndarray:
+ """
+ Convert a quaternion to a rotation matrix.
+
+ This method is currently needed because transforms3d is not released as a `.dep` and
+ would require user interaction to set up.
+
+ For reference see: https://github.com/matthew-brett/transforms3d/blob/
+ f185e866ecccb66c545559bc9f2e19cb5025e0ab/transforms3d/quaternions.py#L101
+
+ :param quaternion: A numpy array containing the w, x, y, and z components of the quaternion
+ :returns: An array containing an X, Y, and Z translation component
+ """
+ Nq = np.sum(np.square(quaternion))
+ if Nq < np.finfo(np.float64).eps:
+ return np.eye(3)
+
+ XYZ = quaternion[1:] * 2.0 / Nq
+ wXYZ = XYZ * quaternion[0]
+ xXYZ = XYZ * quaternion[1]
+ yYZ = XYZ[1:] * quaternion[2]
+ zZ = XYZ[2] * quaternion[3]
+
+ return np.array(
+ [[1.0-(yYZ[0]+zZ), xXYZ[1]-wXYZ[2], xXYZ[2]+wXYZ[1]],
+ [xXYZ[1]+wXYZ[2], 1.0-(xXYZ[0]+zZ), yYZ[1]-wXYZ[0]],
+ [xXYZ[2]-wXYZ[1], yYZ[1]+wXYZ[0], 1.0-(xXYZ[0]+yYZ[0])]])