Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Port pointcloud creation to numpy. #175

Merged
merged 20 commits into from
Mar 29, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,7 @@
*.exe
*.out
*.app

# Python temporary files
*.pyc
__pycache__/
1 change: 1 addition & 0 deletions sensor_msgs_py/package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
<author email="[email protected]">Sebastian Grans</author>

<exec_depend>sensor_msgs</exec_depend>
<exec_depend>python3-numpy</exec_depend>

<test_depend>ament_copyright</test_depend>
<test_depend>ament_flake8</test_depend>
Expand Down
313 changes: 209 additions & 104 deletions sensor_msgs_py/sensor_msgs_py/point_cloud2.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,27 +37,37 @@
sensor_msgs/src/sensor_msgs/point_cloud2.py
"""

import array
from collections import namedtuple
import ctypes
import math
import struct
import sys
from typing import Iterable, List, NamedTuple, Optional

import numpy as np
from numpy.lib.recfunctions import (structured_to_unstructured,
unstructured_to_structured)
from sensor_msgs.msg import PointCloud2, PointField
from std_msgs.msg import Header


_DATATYPES = {}
_DATATYPES[PointField.INT8] = ('b', 1)
_DATATYPES[PointField.UINT8] = ('B', 1)
_DATATYPES[PointField.INT16] = ('h', 2)
_DATATYPES[PointField.UINT16] = ('H', 2)
_DATATYPES[PointField.INT32] = ('i', 4)
_DATATYPES[PointField.UINT32] = ('I', 4)
_DATATYPES[PointField.FLOAT32] = ('f', 4)
_DATATYPES[PointField.FLOAT64] = ('d', 8)


def read_points(cloud, field_names=None, skip_nans=False, uvs=[]):
_DATATYPES[PointField.INT8] = np.dtype(np.int8)
_DATATYPES[PointField.UINT8] = np.dtype(np.uint8)
_DATATYPES[PointField.INT16] = np.dtype(np.int16)
_DATATYPES[PointField.UINT16] = np.dtype(np.uint16)
_DATATYPES[PointField.INT32] = np.dtype(np.int32)
_DATATYPES[PointField.UINT32] = np.dtype(np.uint32)
_DATATYPES[PointField.FLOAT32] = np.dtype(np.float32)
_DATATYPES[PointField.FLOAT64] = np.dtype(np.float64)

DUMMY_FIELD_PREFIX = 'unnamed_field'


def read_points(
cloud: PointCloud2,
field_names: Optional[List[str]] = None,
skip_nans: bool = False,
uvs: Optional[Iterable] = None,
reshape_organized_cloud: bool = False) -> np.ndarray:
"""
Read points from a sensor_msgs.PointCloud2 message.

Expand All @@ -67,56 +77,92 @@ def read_points(cloud, field_names=None, skip_nans=False, uvs=[]):
:param skip_nans: If True, then don't return any point with a NaN value.
(Type: Bool, Default: False)
:param uvs: If specified, then only return the points at the given
coordinates. (Type: Iterable, Default: empty list)
:return: Generator which yields a list of values for each point.
coordinates. (Type: Iterable, Default: None)
:param reshape_organized_cloud: Returns the array as an 2D organized point cloud if set.
:return: Structured NumPy array containing all points.
"""
assert isinstance(cloud, PointCloud2), \
'cloud is not a sensor_msgs.msg.PointCloud2'
fmt = _get_struct_fmt(cloud.is_bigendian, cloud.fields, field_names)
width, height, point_step, row_step, data, isnan = \
cloud.width, cloud.height, \
cloud.point_step, cloud.row_step, \
cloud.data, math.isnan

unpack_from = struct.Struct(fmt).unpack_from

if skip_nans:
if uvs:
for u, v in uvs:
p = unpack_from(data, (row_step * v) + (point_step * u))
has_nan = False
for pv in p:
if isnan(pv):
has_nan = True
break
if not has_nan:
yield p
else:
for v in range(height):
offset = row_step * v
for u in range(width):
p = unpack_from(data, offset)
has_nan = False
for pv in p:
if isnan(pv):
has_nan = True
break
if not has_nan:
yield p
offset += point_step
else:
if uvs:
for u, v in uvs:
yield unpack_from(data, (row_step * v) + (point_step * u))
else:
for v in range(height):
offset = row_step * v
for u in range(width):
yield unpack_from(data, offset)
offset += point_step
'Cloud is not a sensor_msgs.msg.PointCloud2'

# Cast bytes to numpy array
points = np.ndarray(
shape=(cloud.width * cloud.height, ),
dtype=dtype_from_fields(cloud.fields),
buffer=cloud.data)

# Keep only the requested fields
if field_names is not None:
assert all(field_name in points.dtype.names for field_name in field_names), \
'Requests field is not in the fields of the PointCloud!'
# Mask fields
points = points[list(field_names)]

# Swap array if byte order does not match
if bool(sys.byteorder != 'little') != bool(cloud.is_bigendian):
points = points.byteswap(inplace=True)

# Check if we want to drop points with nan values
if skip_nans and not cloud.is_dense:
# Init mask which selects all points
not_nan_mask = np.ones(len(points), dtype=bool)
for field_name in points.dtype.names:
# Only keep points without any non values in the mask
not_nan_mask = np.logical_and(
not_nan_mask, ~np.isnan(points[field_name]))
# Select these points
points = points[not_nan_mask]

# Select points indexed by the uvs field
if uvs is not None:
# Don't convert to numpy array if it is already one
if not isinstance(uvs, np.ndarray):
uvs = np.fromiter(uvs, int)
# Index requested points
points = points[uvs]

# Cast into 2d array if cloud is 'organized'
if reshape_organized_cloud and cloud.height > 1:
points = points.reshape(cloud.width, cloud.height)

return points


def read_points_numpy(
cloud: PointCloud2,
field_names: Optional[List[str]] = None,
skip_nans: bool = False,
uvs: Optional[Iterable] = None,
reshape_organized_cloud: bool = False) -> np.ndarray:
"""
Read equally typed fields from sensor_msgs.PointCloud2 message as a unstructured numpy array.

This method is better suited if one wants to perform math operations
on e.g. all x,y,z fields.
But it is limited to fields with the same dtype as unstructured numpy arrays
only contain one dtype.

def read_points_list(cloud, field_names=None, skip_nans=False, uvs=[]):
:param cloud: The point cloud to read from sensor_msgs.PointCloud2.
:param field_names: The names of fields to read. If None, read all fields.
(Type: Iterable, Default: None)
:param skip_nans: If True, then don't return any point with a NaN value.
(Type: Bool, Default: False)
:param uvs: If specified, then only return the points at the given
coordinates. (Type: Iterable, Default: None)
:param reshape_organized_cloud: Returns the array as an 2D organized point cloud if set.
:return: Numpy array containing all points.
"""
assert all(cloud.fields[0].datatype == field.datatype for field in cloud.fields[1:]), \
'All fields need to have the same datatype. Use `read_points()` otherwise.'
structured_numpy_array = read_points(
cloud, field_names, skip_nans, uvs, reshape_organized_cloud)
return structured_to_unstructured(structured_numpy_array)


def read_points_list(
cloud: PointCloud2,
field_names: Optional[List[str]] = None,
skip_nans: bool = False,
uvs: Optional[Iterable] = None) -> List[NamedTuple]:
"""
Read points from a sensor_msgs.PointCloud2 message.

Expand All @@ -129,7 +175,7 @@ def read_points_list(cloud, field_names=None, skip_nans=False, uvs=[]):
:param skip_nans: If True, then don't return any point with a NaN value.
(Type: Bool, Default: False)
:param uvs: If specified, then only return the points at the given
coordinates. (Type: Iterable, Default: empty list]
coordinates. (Type: Iterable, Default: None]
:return: List of namedtuples containing the values for each point
"""
assert isinstance(cloud, PointCloud2), \
Expand All @@ -144,7 +190,53 @@ def read_points_list(cloud, field_names=None, skip_nans=False, uvs=[]):
skip_nans, uvs)]


def create_cloud(header, fields, points):
def dtype_from_fields(fields: Iterable[PointField]) -> np.dtype:
"""
Convert a Iterable of sensor_msgs.msg.PointField messages to a np.dtype.

:param fields: The point cloud fields.
(Type: iterable of sensor_msgs.msg.PointField)
:returns: NumPy datatype
"""
# Create a lists containing the names, offsets and datatypes of all fields
field_names = []
field_offsets = []
field_datatypes = []
for i, field in enumerate(fields):
# Datatype as numpy datatype
datatype = _DATATYPES[field.datatype]
# Name field
if field.name == '':
name = f'{DUMMY_FIELD_PREFIX}_{i}'
else:
name = field.name
# Handle fields with count > 1 by creating subfields with a suffix consiting
# of "_" followed by the subfield counter [0 -> (count - 1)]
assert field.count > 0, "Can't process fields with count = 0."
for a in range(field.count):
# Add suffix if we have multiple subfields
if field.count > 1:
subfield_name = f'{name}_{a}'
else:
subfield_name = name
assert subfield_name not in field_names, 'Duplicate field names are not allowed!'
field_names.append(subfield_name)
# Create new offset that includes subfields
field_offsets.append(field.offset + a * datatype.itemsize)
field_datatypes.append(datatype.str)

# Create a tuple for each field containing name and data type
return np.dtype({
'names': field_names,
'formats': field_datatypes,
'offsets': field_offsets,
})


def create_cloud(
header: Header,
fields: Iterable[PointField],
points: Iterable) -> PointCloud2:
"""
Create a sensor_msgs.msg.PointCloud2 message.

Expand All @@ -157,28 +249,61 @@ def create_cloud(header, fields, points):
the fields parameter)
:return: The point cloud as sensor_msgs.msg.PointCloud2
"""
cloud_struct = struct.Struct(_get_struct_fmt(False, fields))

buff = ctypes.create_string_buffer(cloud_struct.size * len(points))

point_step, pack_into = cloud_struct.size, cloud_struct.pack_into
offset = 0
for p in points:
pack_into(buff, offset, *p)
offset += point_step

return PointCloud2(header=header,
height=1,
width=len(points),
is_dense=False,
is_bigendian=False,
fields=fields,
point_step=cloud_struct.size,
row_step=cloud_struct.size * len(points),
data=buff.raw)


def create_cloud_xyz32(header, points):
# Check if input is numpy array
if isinstance(points, np.ndarray):
# Check if this is an unstructured array
if points.dtype.names is None:
assert all(fields[0].datatype == field.datatype for field in fields[1:]), \
'All fields need to have the same datatype. Pass a structured NumPy array \
with multiple dtypes otherwise.'
# Convert unstructured to structured array
points = unstructured_to_structured(
points,
dtype=dtype_from_fields(fields))
else:
assert points.dtype == dtype_from_fields(fields), \
'PointFields and structured NumPy array dtype do not match for all fields! \
Check their field order, names and types.'
else:
# Cast python objects to structured NumPy array (slow)
points = np.array(
# Points need to be tuples in the structured array
list(map(tuple, points)),
dtype=dtype_from_fields(fields))

# Handle organized clouds
assert len(points.shape) <= 2, \
'Too many dimensions for organized cloud! \
Points can only be organized in max. two dimensional space'
height = 1
width = points.shape[0]
# Check if input points are an organized cloud (2D array of points)
if len(points.shape) == 2:
height = points.shape[1]

# Convert numpy points to array.array
memory_view = memoryview(points)
casted = memory_view.cast('B')
array_array = array.array('B')
array_array.frombytes(casted)

# Put everything together
cloud = PointCloud2(
header=header,
height=height,
width=width,
is_dense=False,
is_bigendian=sys.byteorder != 'little',
fields=fields,
point_step=points.dtype.itemsize,
row_step=(points.dtype.itemsize * width))
# Set cloud via property instead of the constructor because of the bug described in
# https://github.com/ros2/common_interfaces/issues/176
cloud.data = array_array
return cloud


def create_cloud_xyz32(header: Header, points: Iterable) -> PointCloud2:
"""
Create a sensor_msgs.msg.PointCloud2 message with (x, y, z) fields.

Expand All @@ -193,23 +318,3 @@ def create_cloud_xyz32(header, points):
PointField(name='z', offset=8,
datatype=PointField.FLOAT32, count=1)]
return create_cloud(header, fields, points)


def _get_struct_fmt(is_bigendian, fields, field_names=None):
fmt = '>' if is_bigendian else '<'

offset = 0
for field in (f for f in sorted(fields, key=lambda f: f.offset)
if field_names is None or f.name in field_names):
if offset < field.offset:
fmt += 'x' * (field.offset - offset)
offset = field.offset
if field.datatype not in _DATATYPES:
print('Skipping unknown PointField datatype [%d]' %
field.datatype, file=sys.stderr)
else:
datatype_fmt, datatype_length = _DATATYPES[field.datatype]
fmt += field.count * datatype_fmt
offset += field.count * datatype_length

return fmt
Loading