Skip to content

Commit

Permalink
uORB: Remove dynamic queue size handling
Browse files Browse the repository at this point in the history
This changes the queue size handling to match the upstream commit

    "
      commit 57df7e3
      Author: Eric Katzfey <[email protected]>
      Date:   Fri Mar 8 13:28:24 2024 -0800

          uORB: make queue size (ORB_QUEUE_LENGTH) completely static (PX4#22815)
    "

Signed-off-by: Jukka Laitinen <[email protected]>
  • Loading branch information
Jukka Laitinen authored and jlaitine committed Feb 10, 2025
1 parent acbb2ae commit 460e27d
Show file tree
Hide file tree
Showing 6 changed files with 46 additions and 87 deletions.
4 changes: 2 additions & 2 deletions platforms/common/uORB/uORB.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,12 +77,12 @@ int orb_poll(orb_poll_struct_t *fds, unsigned int nfds, int timeout)

orb_advert_t orb_advertise(const struct orb_metadata *meta, const void *data)
{
return uORB::Manager::get_instance()->orb_advertise(meta, data, meta->o_queue);
return uORB::Manager::get_instance()->orb_advertise(meta, data);
}

orb_advert_t orb_advertise_multi(const struct orb_metadata *meta, const void *data, int *instance)
{
return uORB::Manager::get_instance()->orb_advertise_multi(meta, data, instance, meta->o_queue);
return uORB::Manager::get_instance()->orb_advertise_multi(meta, data, instance);
}

int orb_unadvertise(orb_advert_t handle)
Expand Down
5 changes: 2 additions & 3 deletions platforms/common/uORB/uORB.h
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,6 @@ typedef struct {
#ifndef CONFIG_BUILD_FLAT
void *data;
#endif
size_t data_size;
} orb_advert_t;

/**
Expand All @@ -144,9 +143,9 @@ typedef void *orb_sub_t;

static inline bool orb_advert_valid(orb_advert_t handle) {return handle.node != NULL;}
#ifndef CONFIG_BUILD_FLAT
static const orb_advert_t ORB_ADVERT_INVALID = {NULL, NULL, 0};
static const orb_advert_t ORB_ADVERT_INVALID = {NULL, NULL};
#else
static const orb_advert_t ORB_ADVERT_INVALID = {NULL, 0};
static const orb_advert_t ORB_ADVERT_INVALID = {NULL};
#endif
static inline bool orb_sub_valid(orb_sub_t handle) {return handle != NULL;}
static const orb_sub_t ORB_SUB_INVALID = NULL;
Expand Down
100 changes: 30 additions & 70 deletions platforms/common/uORB/uORBDeviceNode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ orb_advert_t uORB::DeviceNode::MappingCache::map_node(ORB_ID orb_id, uint8_t ins
}

#if !defined(CONFIG_BUILD_FLAT)
orb_advert_t uORB::DeviceNode::MappingCache::map_data(orb_advert_t handle, int shm_fd, size_t size, bool publisher)
orb_advert_t uORB::DeviceNode::MappingCache::map_data(orb_advert_t handle, int shm_fd, bool publisher)
{
lock();

Expand All @@ -129,32 +129,28 @@ orb_advert_t uORB::DeviceNode::MappingCache::map_data(orb_advert_t handle, int s

if (item != nullptr) {

if (item->handle.data != nullptr && item->handle.data_size == size) {
if (shm_fd >= 0 && item->handle.data != nullptr) {
// Mapped already, return the mapping
handle = item->handle;

} else {
// Drop any old mapping if exists
if (handle.data != nullptr) {
px4_munmap(handle.data, handle.data_size);
px4_munmap(handle.data, node(handle)->data_size());
}

// Map the data with new size
if (shm_fd >= 0 && size > 0) {
handle.data = px4_mmap(0, size, publisher ? PROT_WRITE : PROT_READ, MAP_SHARED, shm_fd, 0);
// Map the data
if (shm_fd >= 0) {
handle.data = px4_mmap(0, node(handle)->data_size(), publisher ? PROT_WRITE : PROT_READ, MAP_SHARED, shm_fd, 0);

if (handle.data == MAP_FAILED) {
handle.data = nullptr;
handle.data_size = 0;
PX4_ERR("MMAP fail\n");

} else {
handle.data_size = size;
}

} else {
handle.data = nullptr;
handle.data_size = 0;
}

item->handle = handle;
Expand Down Expand Up @@ -195,7 +191,7 @@ bool uORB::DeviceNode::MappingCache::del(const orb_advert_t &handle)
#ifndef CONFIG_BUILD_FLAT

if (handle.data) {
px4_munmap(handle.data, handle.data_size);
px4_munmap(handle.data, node(handle)->data_size());
}

#endif
Expand Down Expand Up @@ -312,7 +308,6 @@ int uORB::DeviceNode::nodeClose(orb_advert_t &handle)
}

if (node(handle)->_publisher_count == 0) {
node(handle)->_queue_size = 0;
node(handle)->_data_valid = false;

// Delete the data
Expand All @@ -321,7 +316,7 @@ int uORB::DeviceNode::nodeClose(orb_advert_t &handle)
node(handle)->_data = nullptr;
#else
shm_unlink(node(handle)->get_devname() + 1);
MappingCache::map_data(handle, -1, 0, false);
MappingCache::map_data(handle, -1, false);
#endif

// If there are no more subscribers, delete the node and its mapping
Expand All @@ -343,7 +338,7 @@ int uORB::DeviceNode::nodeClose(orb_advert_t &handle)
return PX4_OK;
}

orb_advert_t uORB::DeviceNode::orb_advertise(const ORB_ID id, int instance, unsigned queue_size,
orb_advert_t uORB::DeviceNode::orb_advertise(const ORB_ID id, int instance,
bool publisher)
{
/* Open the node, if it exists or create a new one */
Expand All @@ -352,13 +347,13 @@ orb_advert_t uORB::DeviceNode::orb_advertise(const ORB_ID id, int instance, unsi
handle = nodeOpen(id, instance, true);

if (orb_advert_valid(handle)) {
node(handle)->advertise(publisher, queue_size);
node(handle)->advertise(publisher);
}

return handle;
}

int uORB::DeviceNode::advertise(bool publisher, uint8_t queue_size)
int uORB::DeviceNode::advertise(bool publisher)
{
int ret = -1;

Expand All @@ -368,8 +363,6 @@ int uORB::DeviceNode::advertise(bool publisher, uint8_t queue_size)
ret = ++_publisher_count;
}

update_queue_size(queue_size);

return ret;
}

Expand Down Expand Up @@ -443,7 +436,7 @@ uORB::DeviceNode::~DeviceNode()

/* Map the node data to the memory space of publisher or subscriber */

void uORB::DeviceNode::remap_data(orb_advert_t &handle, size_t new_size, bool publisher)
void uORB::DeviceNode::map_data(orb_advert_t &handle, bool publisher)
{
// In NuttX flat and protected builds, just malloc the data (from user heap)
// and store
Expand All @@ -452,20 +445,9 @@ void uORB::DeviceNode::remap_data(orb_advert_t &handle, size_t new_size, bool pu

#ifdef CONFIG_BUILD_FLAT

// Data size has changed, re-allocate (remap) by publisher
// The remapping may happen only on the first write,
// when the handle.data_size==0

if (publisher && handle.data_size == 0) {
free(_data);
_data = zalloc(new_size);
}

if (_data != nullptr) {
handle.data_size = new_size;

} else {
handle.data_size = 0;
// Allocate the data by the first publisher
if (publisher && _data == nullptr) {
_data = zalloc(node(handle)->data_size());
}

#else
Expand All @@ -476,16 +458,15 @@ void uORB::DeviceNode::remap_data(orb_advert_t &handle, size_t new_size, bool pu

// and mmap it
if (shm_fd >= 0) {
// For the publisher, set the new data size
if (publisher && handle.data_size == 0) {
if (ftruncate(shm_fd, new_size) != 0) {
if (publisher && handle.data == nullptr) {
if (ftruncate(shm_fd, node(handle)->data_size()) != 0) {
::close(shm_fd);
PX4_ERR("Setting advertise size failed\n");
return;
}
}

handle = MappingCache::map_data(handle, shm_fd, new_size, publisher);
handle = MappingCache::map_data(handle, shm_fd, node(handle)->data_size(), publisher);

// Close the shm, there is no need to leave it open
::close(shm_fd);
Expand All @@ -512,20 +493,20 @@ bool uORB::DeviceNode::copy(void *dst, orb_advert_t &handle, unsigned &generatio
}

size_t o_size = get_meta()->o_size;
size_t data_size = _queue_size * o_size;
size_t o_queue = get_meta()->o_queue;

lock();

if (data_size != handle.data_size) {
remap_data(handle, data_size, false);
if (node_data(handle) == nullptr) {
map_data(handle, false);

if (node_data(handle) == nullptr) {
unlock();
return false;
}
}

if (_queue_size == 1) {
if (o_queue == 1) {
memcpy(dst, node_data(handle), o_size);
generation = _generation.load();

Expand All @@ -540,12 +521,12 @@ bool uORB::DeviceNode::copy(void *dst, orb_advert_t &handle, unsigned &generatio
}

/* Compatible with normal and overflow conditions */
if (current_generation - generation > _queue_size) {
if (current_generation - generation > o_queue) {
/* Reader is too far behind: some messages are lost */
generation = current_generation - _queue_size;
generation = current_generation - o_queue;
}

memcpy(dst, ((uint8_t *)node_data(handle)) + (o_size * (generation % _queue_size)), o_size);
memcpy(dst, ((uint8_t *)node_data(handle)) + (o_size * (generation % o_queue)), o_size);

++generation;
}
Expand All @@ -560,15 +541,13 @@ ssize_t
uORB::DeviceNode::write(const char *buffer, const orb_metadata *meta, orb_advert_t &handle)
{
size_t o_size = meta->o_size;

/* If data size has changed, re-map the data */
size_t data_size = _queue_size * o_size;
size_t o_queue = meta->o_queue;

/* Perform an atomic copy. */
lock();

if (data_size != handle.data_size) {
remap_data(handle, data_size, true);
if (node_data(handle) == nullptr) {
map_data(handle, true);
}

if (node_data(handle) == nullptr) {
Expand All @@ -579,7 +558,7 @@ uORB::DeviceNode::write(const char *buffer, const orb_metadata *meta, orb_advert
/* wrap-around happens after ~49 days, assuming a publisher rate of 1 kHz */
unsigned generation = _generation.fetch_add(1);

memcpy(((uint8_t *)node_data(handle)) + o_size * (generation % _queue_size), buffer, o_size);
memcpy(((uint8_t *)node_data(handle)) + o_size * (generation % o_queue), buffer, o_size);

/* Mark at least one data has been published */
_data_valid = true;
Expand Down Expand Up @@ -711,7 +690,7 @@ orb_advert_t uORB::DeviceNode::add_subscriber(ORB_ID orb_id, uint8_t instance,
orb_advert_t handle;

if (advertise) {
handle = orb_advertise(orb_id, instance, 0, false);
handle = orb_advertise(orb_id, instance, false);

} else {
handle = nodeOpen(orb_id, instance, false);
Expand Down Expand Up @@ -827,25 +806,6 @@ int16_t uORB::DeviceNode::process_received_message(orb_advert_t &handle, int32_t
}
#endif /* CONFIG_ORB_COMMUNICATOR */

int uORB::DeviceNode::update_queue_size(unsigned int queue_size)
{
// subscribers may advertise the node, but not set the queue_size
if (queue_size == 0) {
return PX4_OK;
}

queue_size = round_pow_of_two_8(queue_size);

// queue size is limited to 255 for the single reason that we use uint8 to store it
if (queue_size > 255) {
return PX4_ERROR;
}

_queue_size = queue_size;

return PX4_OK;
}

bool
uORB::DeviceNode::_register_callback(uORB::SubscriptionCallback *cb_sub,
int8_t poll_lock, hrt_abstime last_update, uint32_t interval_us, uorb_cb_handle_t &cb_handle)
Expand Down
12 changes: 7 additions & 5 deletions platforms/common/uORB/uORBDeviceNode.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,7 @@ class DeviceNode
*/
static ssize_t publish(const orb_metadata *meta, orb_advert_t &handle, const void *data);

static orb_advert_t orb_advertise(const ORB_ID id, int instance, unsigned queue_size, bool publisher);
static orb_advert_t orb_advertise(const ORB_ID id, int instance, bool publisher);

static int orb_unadvertise(orb_advert_t &handle, bool publisher);

Expand Down Expand Up @@ -178,7 +178,7 @@ class DeviceNode
*/
bool print_statistics(int max_topic_length);

uint8_t get_queue_size() const { return _queue_size; }
uint8_t get_queue_size() const { return get_orb_meta(_orb_id)->o_queue; }
/**
* Get count of subscribers for this topic
* @return number of active subscribers
Expand All @@ -200,6 +200,8 @@ class DeviceNode

const orb_metadata *get_meta() const { return get_orb_meta(_orb_id); }

size_t data_size() const { return get_meta()->o_size * get_meta()->o_queue; }

ORB_ID id() const { return _orb_id; }

const char *get_name() const { return get_orb_meta(_orb_id)->o_name; }
Expand Down Expand Up @@ -306,7 +308,7 @@ class DeviceNode
}
static orb_advert_t get(ORB_ID orb_id, uint8_t instance);
static orb_advert_t map_node(ORB_ID orb_id, uint8_t instance, int shm_fd);
static orb_advert_t map_data(orb_advert_t handle, int shm_fd, size_t size, bool publisher);
static orb_advert_t map_data(orb_advert_t handle, int shm_fd, bool publisher);
static bool del(const orb_advert_t &handle);

static void lock()
Expand All @@ -325,7 +327,7 @@ class DeviceNode

DeviceNode(const ORB_ID id, const uint8_t instance, const char *path);

int advertise(bool publisher, uint8_t queue_size);
int advertise(bool publisher);
int unadvertise(bool publisher);

/**
Expand All @@ -336,7 +338,7 @@ class DeviceNode

void _add_subscriber(unsigned *initial_generation);

void remap_data(orb_advert_t &handle, size_t new_size, bool advertiser);
void map_data(orb_advert_t &handle, bool advertiser);

inline static DeviceNode *node(const orb_advert_t &handle) { return static_cast<DeviceNode *>(handle.node); }
#ifdef CONFIG_BUILD_FLAT
Expand Down
5 changes: 2 additions & 3 deletions platforms/common/uORB/uORBManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -215,8 +215,7 @@ int uORB::Manager::orb_exists(const struct orb_metadata *meta, int instance)
return PX4_ERROR;
}

orb_advert_t uORB::Manager::orb_advertise_multi(const struct orb_metadata *meta, const void *data, int *instance,
uint8_t queue_size)
orb_advert_t uORB::Manager::orb_advertise_multi(const struct orb_metadata *meta, const void *data, int *instance)
{
#ifdef ORB_USE_PUBLISHER_RULES

Expand Down Expand Up @@ -263,7 +262,7 @@ orb_advert_t uORB::Manager::orb_advertise_multi(const struct orb_metadata *meta,
return ORB_ADVERT_INVALID;
}

orb_advert_t handle = uORB::DeviceNode::orb_advertise(static_cast<ORB_ID>(meta->o_id), group_tries, queue_size, true);
orb_advert_t handle = uORB::DeviceNode::orb_advertise(static_cast<ORB_ID>(meta->o_id), group_tries, true);

if (instance != nullptr) {
*instance = group_tries;
Expand Down
7 changes: 3 additions & 4 deletions platforms/common/uORB/uORBManager.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -119,9 +119,9 @@ class Manager
* ORB_DEFINE with no corresponding ORB_DECLARE)
* this function will return nullptr and set errno to ENOENT.
*/
orb_advert_t orb_advertise(const struct orb_metadata *meta, const void *data, unsigned int queue_size = 1)
orb_advert_t orb_advertise(const struct orb_metadata *meta, const void *data)
{
return orb_advertise_multi(meta, data, nullptr, queue_size);
return orb_advertise_multi(meta, data, nullptr);
}

/**
Expand Down Expand Up @@ -154,8 +154,7 @@ class Manager
* ORB_DEFINE with no corresponding ORB_DECLARE)
* this function will return nullptr and set errno to ENOENT.
*/
orb_advert_t orb_advertise_multi(const struct orb_metadata *meta, const void *data, int *instance,
uint8_t queue_size = 1);
orb_advert_t orb_advertise_multi(const struct orb_metadata *meta, const void *data, int *instance);

/**
* Unadvertise a topic.
Expand Down

0 comments on commit 460e27d

Please sign in to comment.