Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Deserialize the cdr message #473

Closed
anacsousa1 opened this issue Jul 27, 2020 · 23 comments
Closed

Deserialize the cdr message #473

anacsousa1 opened this issue Jul 27, 2020 · 23 comments
Labels
enhancement New feature or request

Comments

@anacsousa1
Copy link

Hello, thank you for the repository and for the instructions. I have been using the rosbag2 play, record, and info successfully.

However, I need to save data in another format (along with the bag files). And I have not found a python-API for data deserialization.

I innocently thought I could create a python API to open the CDR file and read it as messages. What I have done so far was
Opening and closing the bag file;
Finding if a specific topic is in the database;
Getting all messages in the topic;
Getting all topics names;
Getting all message types;
Getting message types for particular topics.

So, with that, I can get each message sent. However, I am at the moment of actual deserialization, and I couldn't fully understand how the cdr file is constructed.

When I print a message, I get, for example:
b'\x00\x01\x00\x00\x0f\x00\x00\x00Fake ergometer\x00\x00\x00\x00\x00\x00\xcd\xcc\xcc\xcc\xcc\xcc<@\xfd\xff\xff\xff\xff\xffA@'

I know that the first field is a string with the value "Fake ergometer," so I know I am on the right path. But how do I go from there? What does the header mean, for example?
\x00\x01\x00\x00\x0f\x00\x00\x00

I have been trying to read the CDR documentation (from OMG). Still, it is not direct, and sometimes it mentions different versions of GIOP, so I get confused.

I appreciate any help.

Bellow are the functions for the python-API that I created:

import sqlite3

def connect(sqlite_file):
    """ Make connection to an SQLite database file. """
    conn = sqlite3.connect(sqlite_file)
    c = conn.cursor()
    return conn, c

def close(conn):
    """ Close connection to the database. """
    conn.close()

def countRows(cursor, table_name, print_out=False):
    """ Returns the total number of rows in the database. """
    cursor.execute('SELECT COUNT(*) FROM {}'.format(table_name))
    count = cursor.fetchall()
    if print_out:
        print('\nTotal rows: {}'.format(count[0][0]))
    return count[0][0]

def getHeaders(cursor, table_name, print_out=False):
    """ Returns a list of tuples with column informations:
    (id, name, type, notnull, default_value, primary_key)
    """
    # Get headers from table "table_name"
    cursor.execute('PRAGMA TABLE_INFO({})'.format(table_name))
    info = cursor.fetchall()
    if print_out:
        print("\nColumn Info:\nID, Name, Type, NotNull, DefaultVal, PrimaryKey")
        for col in info:
            print(col)
    return info

def getAllElements(cursor, table_name, print_out=False):
    """ Returns a dictionary with all elements of the table database.
    """
    # Get elements from table "table_name"
    cursor.execute('SELECT * from({})'.format(table_name))
    records = cursor.fetchall()
    if print_out:
        print("\nAll elements:")
        for row in records:
            print(row)
    return records

def isTopic(cursor, topic_name, print_out=False):
    """ Returns topic_name header if it exists. If it doesn't, returns empty.
        It returns the last topic found with this name.
    """
    boolIsTopic = False
    topicFound = []

    # Get all records for 'topics'
    records = getAllElements(cursor, 'topics', print_out=False)

    # Look for specific 'topic_name' in 'records'
    for row in records:
        if(row[1] == topic_name): # 1 is 'name' TODO
            boolIsTopic = True
            topicFound = row
    if print_out:
        if boolIsTopic:
             # 1 is 'name', 0 is 'id' TODO
            print('\nTopic named', topicFound[1], ' exists at id ', topicFound[0] ,'\n')
        else:
            print('\nTopic', topic_name ,'could not be found. \n')

    return topicFound

def getAllMessagesInTopic(cursor, topic_name, print_out=False):
    """ Returns all timestamps and messages at that topic.
    There is no deserialization for the BLOB data.
    """
    count = 0
    timestamps = []
    messages = []

    # Find if topic exists and its id
    topicFound = isTopic(cursor, topic_name, print_out=False)

    # If not find return empty
    if not topicFound:
        print('Topic', topic_name ,'could not be found. \n')
    else:
        records = getAllElements(cursor, 'messages', print_out=False)

        # Look for message with the same id from the topic
        for row in records:
            if row[1] == topicFound[0]:     # 1 and 0 is 'topic_id' TODO
                count = count + 1           # count messages for this topic
                timestamps.append(row[2])   # 2 is for timestamp TODO
                messages.append(row[3])     # 3 is for all messages

        # Print
        if print_out:
            print('\nThere are ', count, 'messages in ', topicFound[1])

    return timestamps, messages

def getAllTopicsNames(cursor, print_out=False):
    """ Returns all topics names.
    """
    topicNames = []
    # Get all records for 'topics'
    records = getAllElements(cursor, 'topics', print_out=False)

    # Save all topics names
    for row in records:
        topicNames.append(row[1])  # 1 is for topic name TODO
    if print_out:
        print('\nTopics names are:')
        print(topicNames)

    return topicNames

def getAllMsgsTypes(cursor, print_out=False):
    """ Returns all messages types.
    """
    msgsTypes = []
    # Get all records for 'topics'
    records = getAllElements(cursor, 'topics', print_out=False)

    # Save all message types
    for row in records:
        msgsTypes.append(row[2])  # 2 is for message type TODO
    if print_out:
        print('\nMessages types are:')
        print(msgsTypes)

    return msgsTypes

def getMsgType(cursor, topic_name, print_out=False):
    """ Returns the message type of that specific topic.
    """
    msg_type = []
    # Get all topics names and all message types
    topic_names = getAllTopicsNames(cursor, print_out=False)
    msgs_types = getAllMsgsTypes(cursor, print_out=False)

    # look for topic at the topic_names list, and find its index
    for index, element in enumerate(topic_names):
        if element == topic_name:
            msg_type = msgs_types[index]
    if print_out:
        print('\nMessage type in', topic_name, 'is', msg_type)

    return msg_type

@emersonknapp
Copy link
Collaborator

@mabelzhang is this issue related to #476 ? Either way, do you have insight on how this functionality can be accomplished via the python API?

@mabelzhang
Copy link
Contributor

I have very limited understanding of CDR (zero understanding) and message deserialization (a little more than zero), so I will try to answer what I can and hope something hits.

Looks like this implementation deals with sqlite3 directly. If getAllMessagesInTopic() returns the same type as data in #308 here https://github.com/ros2/rosbag2/pull/308/files#diff-4f92c776f8e6c0d824f1b53120785d3dR45 (basically the std::shared_ptr<rcutils_uint8_array_t> serialized_data field of rosbag2_storage::SerializedBagMessage returned from Reader.read_next()), then you can use rosidl_runtime_py.utilities.get_message() and rclpy.serialization.deserialize_message() as in the linked file to deserialize. In that case, yes this is what #476 wants to add to make deserialization friendlier.

I think that's what you're asking - deserializing the ROS messages, as opposed to CDR? If that's not what you're asking, and you're asking about CDR, then I have no clue.

@emersonknapp
Copy link
Collaborator

From my understanding the goal here is "read data out of rosbag into an understandable format so that it can be saved out in a different format" - I think currently this is talking about CDR because that's the data that was discovered to be available, not because it's a desired format to work with. I'm assuming that if there were a Python API available to read out the bag as deserialized Python message objects then that would solve the problem. @anacsousa1 can you confirm if this is what you're trying to accomplish?

@emersonknapp emersonknapp added the enhancement New feature or request label Jul 31, 2020
@emersonknapp
Copy link
Collaborator

We might have rmw_deserialize methods in rclpy, need to double check

@mabelzhang
Copy link
Contributor

rmw_deserialize is exactly what's done in rclpy.serialization.deserialize_message() https://github.com/ros2/rclpy/pull/495/files#diff-80dda03110b5be606b823d6b0558b5c2R4775

@Karsten1987
Copy link
Collaborator

@anacsousa1 it's great that you've come already thus far and the link Mabel posted should do exactly what you're missing, namely taking a buffer with serialized CDR data and convert it into a python ROS2 message. Please don't go down that rabbit hole of implementing a CDR parser in python. ROS2 has code in place to do that for you ;-)

@anacsousa1
Copy link
Author

Hey! Thank you all for the responses! I won't be able to do any further tests until the beginning of the next week. But I wanted to make sure I understood the idea:
Using the rmw_deserialize (in rclpy), I may create a node that opens a bag file, and then deserialize that?
Is it possible to give me a basic example of that?

I really did not intend to create the API, if there is an easier path, I definitely prefer that!

@mabelzhang
Copy link
Contributor

mabelzhang commented Jul 31, 2020

Use rosidl_runtime_py.utilities.get_message() and rclpy.serialization.deserialize_message(). They wrap around rmw_deserialize and will save you a lot of time. See example usage here https://github.com/ros2/rosbag2/pull/308/files#diff-4f92c776f8e6c0d824f1b53120785d3dR46

If you want to try another API, you can try the branch in this PR #308 . It is almost ready to merge, I just need to add one more feature and look into a compiler warning on Windows. Everything is working.

@anacsousa1
Copy link
Author

Hey @mabelzhang , thanks a lot for the answer. I believe I am too noob in this. If you could help me with more "baby steps" I would really appreciate it.
I have foxy distribution installed in Ubuntu 20.04, and then I installed rosbag2 as Debian package as described at the readme file. I am able to record and play data successfully.

But I am having trouble using the functions you suggested. A few things I tried:

  1. Cloning the branch https://github.com/mabelzhang/rosbag2/tree/rosbag2_py: git clone https://github.com/mabelzhang/rosbag2/tree/rosbag2_py. It fails

    fatal: repository '[https://github.com/mabelzhang/rosbag2/tree/rosbag2_py/](https://github.com/mabelzhang/rosbag2/tree/rosbag2_py/)' not found
  2. Cloning the master, and building it from source (so I could use the cpp examples):

    mkdir -p ~/rosbag_ws/src
    cd ~/rosbag_ws/src
    git clone https://github.com/ros2/rosbag2.git
    colcon build

    If fails:

    CMake Error at CMakeLists.txt:109 (find_package):
      By not providing "Findtest_msgs.cmake" in CMAKE_MODULE_PATH this project
      has asked CMake to find a package configuration file provided by
      "test_msgs", but CMake did not find one.
    
      Could not find a package configuration file provided by "test_msgs" with
      any of the following names:
    
        test_msgsConfig.cmake
        test_msgs-config.cmake
    
      Add the installation prefix of "test_msgs" to CMAKE_PREFIX_PATH or set
      "test_msgs_DIR" to a directory containing one of the above files.  If
      "test_msgs" provides a separate development package or SDK, be sure it has
      been installed.
    
    ---
    Failed   <<< rosbag2_cpp [1.40s, exited with code 1]

    It also fails for colcon build --packages-select rosbag2_cpp

  3. I also tried to add the test_msgs and rcl_interfaces as suggested here. But it also fails:

    CMake Error at CMakeLists.txt:21 (find_package):
      By not providing "Findtest_interface_files.cmake" in CMAKE_MODULE_PATH this
      project has asked CMake to find a package configuration file provided by
      "test_interface_files", but CMake did not find one.
    
      Could not find a package configuration file provided by
      "test_interface_files" with any of the following names:
    
        test_interface_filesConfig.cmake
        test_interface_files-config.cmake
    
      Add the installation prefix of "test_interface_files" to CMAKE_PREFIX_PATH
      or set "test_interface_files_DIR" to a directory containing one of the
      above files.  If "test_interface_files" provides a separate development
      package or SDK, be sure it has been installed.

As you can see, I am totally lost...

@Karsten1987
Copy link
Collaborator

all you'd need to do is to install the test msgs package - we have binary packages for this, so you don't need to compile it yourself. You could either use rosdep install see an example here or simply install the test_msgs package needed for rosbag2 (sudo apt-get install ros-foxy-test-msgs).

@anacsousa1
Copy link
Author

Thanks a lot for the answer, I installed the test msgs and build it successfully!

sudo apt-get install ros-foxy-test-msgs
colcon build --packages-select rosbag2_cpp rosbag2_test_common shared_queues_vendor sqlite3_vendor rosbag2_storage rosbag2_storage_default_plugins

I expected to find an executable at the CMake, as it does not have one (so it is not just running ros2 run rosbag2_tests test), how do I run the code?

I thank you for the patience with this type of questions.

@mabelzhang
Copy link
Contributor

mabelzhang commented Aug 5, 2020

Cloning the branch https://github.com/mabelzhang/rosbag2/tree/rosbag2_py

Clone from the root (the URL from the green button on the repo page):

git clone https://github.com/mabelzhang/rosbag2.git
git checkout rosbag2_py

It also fails for colcon build --packages-select rosbag2_cpp

To simplify the packages you pass to --packages-select, use --packages-up-to to build all the dependencies of a package, like --packages-up-to rosbag2_py.

I expected to find an executable at the CMake

Which file are you trying to run? If you're looking to run the test Python scripts in rosbag2_py, you can do colcon test --packages-select rosbag2_py, and the log files will be in log/test_*/rosbag2_py/stdout.log. Or you can pass in --event-handlers console_direct+ to output verbosely to the terminal.

For your use case, another way to try out the deserialization is to copy those two lines get_message and deserialize_message from test_sequential_reader.py, with the relevant import lines, to your code and give it a try.

@anacsousa1
Copy link
Author

@mabelzhang, thanks a lot!

I used the get_message and deserialize_message as the example you showed me, and it is working perfectly with the functions I have already created:

import rosbag_api as bag
from rosidl_runtime_py.utilities import get_message
from rclpy.serialization import deserialize_message

bag_file = 'rosbag2_2020_07_13-18_13_16/rosbag2_2020_07_13-18_13_16_0.db3'
topic_name = '/control/stim'

### connect to the database
conn, c = bag.connect(bag_file)

### get all topics names and types
topic_names = bag.getAllTopicsNames(c, print_out=False)
topic_types = bag.getAllMsgsTypes(c, print_out=False)

# Create a map for quicker lookup
type_map = {topic_names[i]:topic_types[i] for i in range(len(topic_types))}

### get all timestamps and all messages
t, msgs = bag.getAllMessagesInTopic(c, topic_name, print_out=False)

### >>> deserialization
msg_type = get_message(type_map[topic_name])
x = deserialize_message(msgs[1], msg_type)
print(x)

### close connection to the database
bag.close(conn)

Side note: errors related to pybind11:

I cloned the repository, checkout to your branch, and build the code (thank you a lot for the --packages-up-to tip!). Then, I successfully tested the rosbag2_py and opened the log.

CMake Error at CMakeLists.txt:20 (find_package):
  By not providing "Findpybind11.cmake" in CMAKE_MODULE_PATH this project has
  asked CMake to find a package configuration file provided by "pybind11",
  but CMake did not find one.

I tried to fix with sudo apt install python3-pip and pip3 install pybind11 as mentioned here, but I was unable to install it through apt (E: Unable to locate package python-pybind11) and pip3 already installed it (Requirement already satisfied: pybind11 in /home/ana/.local/lib/python3.8/site-packages (2.5.0)). So, I had to install it from source as suggested here:

# Some prerequisites (but not all of them)
apt-get install cmake
pip3 install pytest

# Clone, build and install 
git clone https://github.com/pybind/pybind11.git 
cd pybind11 
mkdir build 
cd build 
cmake .. 
sudo make install

Issue closed

Anyway, now, I am able to get information from messages perfectly. =]

@mabelzhang
Copy link
Contributor

Thank you for the feedback! Glad it worked for you!

We added pybind11_vendor to ros2.repos a few days ago, as it is required for rosbag2_py. People should be able to check out pybind11_vendor, and building it will install pybind.

@justinberi
Copy link

For anyone landing here and using the foxy branch that doesn't have the rosbag2_py. Here is an example using sqlite3

import sqlite3
from rosidl_runtime_py.utilities import get_message
from rclpy.serialization import deserialize_message

import matplotlib.pyplot as plt

class BagFileParser():
    def __init__(self, bag_file):
        self.conn = sqlite3.connect(bag_file)
        self.cursor = self.conn.cursor()

        ## create a message type map
        topics_data = self.cursor.execute("SELECT id, name, type FROM topics").fetchall()
        self.topic_type = {name_of:type_of for id_of,name_of,type_of in topics_data}
        self.topic_id = {name_of:id_of for id_of,name_of,type_of in topics_data}
        self.topic_msg_message = {name_of:get_message(type_of) for id_of,name_of,type_of in topics_data}

    def __del__(self):
        self.conn.close()

    # Return [(timestamp0, message0), (timestamp1, message1), ...]
    def get_messages(self, topic_name):
        
        topic_id = self.topic_id[topic_name]
        # Get from the db
        rows = self.cursor.execute("SELECT timestamp, data FROM messages WHERE topic_id = {}".format(topic_id)).fetchall()
        # Deserialise all and timestamp them
        return [ (timestamp,deserialize_message(data, self.topic_msg_message[topic_name])) for timestamp,data in rows]



if __name__ == "__main__":

        bag_file = '/workspaces/foxy_leg_ws/rosbag2_2021_04_28-04_42_07/rosbag2_2021_04_28-04_42_07_0.db3'

        parser = BagFileParser(bag_file)

        trajectory = parser.get_messages("/joint_trajectory")[0][1] 
        p_des_1 = [trajectory.points[i].positions[0] for i in range(len(trajectory.points))]
        t_des = [trajectory.points[i].time_from_start.sec + trajectory.points[i].time_from_start.nanosec*1e-9 for i in range(len(trajectory.points))]

        actual = parser.get_messages("/joint_states")

        plt.plot(t_des, p_des_1)

        plt.show()

@lowbees
Copy link

lowbees commented Oct 29, 2021

How to deserialize cdr message without ros? (C++)

@cmd23333
Copy link

Such a method above were available in foxy as eloquent don't have rclpy.serialization
so can I do it with ros2 eloquent?

@reshmadhotre
Copy link

In Ros2, Is there an generic way of deserialising the messages as in ROS1, like
ros topic -b bagfile -p /topic
I am looking for generic way to convert ROS2 bag to csv

@ksatyaki
Copy link

I get a "string data is not null-terminated" on galactic when using the SequentialReader when deserializing messages.

@scch1092
Copy link

For anyone landing here and using the foxy branch that doesn't have the rosbag2_py. Here is an example using sqlite3

import sqlite3
from rosidl_runtime_py.utilities import get_message
from rclpy.serialization import deserialize_message
...

is there also the possibility to use own defined messages?

@weidezhang
Copy link

weidezhang commented Dec 15, 2023

For customized module, I get around it with using importlib directly. please see the following code:

while reader.has_next():
    topic, msg, t = reader.read_next()
    msg_type_str = "moduleprefix1.moduleprefix2." + type_map[topic]
    module_str, class_str = msg_type_str.rsplit(".", 1)
    module = importlib.import_module(module_str)
    msg_type = getattr(module, class_str)
    msg = deserialize_message(msg, msg_type)
    print(f'msg is {msg}')

For anyone landing here and using the foxy branch that doesn't have the rosbag2_py. Here is an example using sqlite3

import sqlite3
from rosidl_runtime_py.utilities import get_message
from rclpy.serialization import deserialize_message
...

is there also the possibility to use own defined messages?

@ksatyaki
Copy link

Is this obsolete with humble or rolling?

@nayak2358
Copy link

To add to the code of @anacsousa1 , here is the python script for anyone interested to get the values of each attribute of a topic in a CSV file:

import csv
import rosbag_api as bag
from rosidl_runtime_py.utilities import get_message
from rclpy.serialization import deserialize_message

bag_file = 'path/to/bag/file.db3'
topic_name = '/name_of_the_topic'
csv_file = '/output/deserialized/file.csv'

### connect to the database
conn, c = bag.connect(bag_file)

### get all topics names and types
topic_names = bag.getAllTopicsNames(c, print_out=False)
topic_types = bag.getAllMsgsTypes(c, print_out=False)

### Create a map for quicker lookup
type_map = {topic_names[i]:topic_types[i] for i in range(len(topic_types))}

### get all timestamps and all messages
t, msgs = bag.getAllMessagesInTopic(c, topic_name, print_out=False)

### To extract the attributes
def extract_fields(message):
    extracted_fields = []

    def recursive_extract(msg, parent_field=''):
        slots_without_underscores = [field[1:] if field.startswith('_') else field for field in msg.__slots__]
        for attr_name in slots_without_underscores:
            attr_value = getattr(msg, attr_name)
            if hasattr(attr_value, '__slots__'):
                recursive_extract(attr_value, f"{parent_field}.{attr_name}" if parent_field else attr_name)
            else:
                extracted_fields.append(f"{parent_field}.{attr_name}" if parent_field else attr_name)

    recursive_extract(message)
    
    return extracted_fields

### Open CSV file for writing
with open(csv_file, mode='w', newline='') as file:
    writer = csv.writer(file)

    ### Write headers
    ### Extract message structure to dynamically determine headers
    msg_type = get_message(type_map[topic_name])
    sample_msg = deserialize_message(msgs[0], msg_type)
    timestamp = sample_msg.header.stamp.sec + sample_msg.header.stamp.nanosec / 1e9
    
    headers = extract_fields(sample_msg)
    writer.writerow(['Time'] + headers)
    
    ### Debugging
    print(sample_msg)
    print(sample_msg.header.stamp.sec)
    print(eval(f"sample_msg.{headers[0]}"))
    
    ### Deserialize messages and write to CSV
    for msg in msgs:
        deserialized_msg = deserialize_message(msg, msg_type)
        timestamp = deserialized_msg.header.stamp.sec + deserialized_msg.header.stamp.nanosec / 1e9
        row = [timestamp] + [eval(f"sample_msg.{field}") for field in headers]
        writer.writerow(row)

### Close connection to the database
bag.close(conn)

print("Deserialized messages saved to", csv_file)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
enhancement New feature or request
Projects
None yet
Development

No branches or pull requests