Skip to content

Commit

Permalink
uORB Subscription callbacks with WorkItem scheduling on publication
Browse files Browse the repository at this point in the history
  • Loading branch information
dagar committed Jun 10, 2019
1 parent b0176dc commit 571dc35
Show file tree
Hide file tree
Showing 12 changed files with 401 additions and 7 deletions.
26 changes: 26 additions & 0 deletions src/include/containers/IntrusiveQueue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,32 @@ class IntrusiveQueue
return ret;
}

bool remove(T removeNode)
{
// base case
if (removeNode == _head) {
_head = nullptr;
return true;
}

for (T node = _head; node != nullptr; node = node->next_intrusive_queue_node()) {
// is sibling the node to remove?
if (node->next_intrusive_queue_node() == removeNode) {
// replace sibling
if (node->next_intrusive_queue_node() != nullptr) {
node->set_next_intrusive_queue_node(node->next_intrusive_queue_node()->next_intrusive_queue_node());

} else {
node->set_next_intrusive_queue_node(nullptr);
}

return true;
}
}

return false;
}

private:

T _head{nullptr};
Expand Down
2 changes: 2 additions & 0 deletions src/modules/uORB/Subscription.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ class Subscription
uint8_t get_instance() const { return _instance; }
orb_id_t get_topic() const { return _meta; }

DeviceNode *get_node() { return _node; }

protected:

bool subscribe();
Expand Down
137 changes: 137 additions & 0 deletions src/modules/uORB/SubscriptionCallback.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
/****************************************************************************
*
* Copyright (c) 2019 PX4 Development Team. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in
* the documentation and/or other materials provided with the
* distribution.
* 3. Neither the name PX4 nor the names of its contributors may be
* used to endorse or promote products derived from this software
* without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
* FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
* COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
* INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
* BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS
* OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
* AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
* ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*
****************************************************************************/

/**
* @file SubscriptionCallback.hpp
*
*/

#pragma once

#include <uORB/SubscriptionInterval.hpp>
#include <containers/List.hpp>
#include <px4_work_queue/WorkItem.hpp>

namespace uORB
{

// Subscription wrapper class with callbacks on new publications
class SubscriptionCallback : public SubscriptionInterval, public ListNode<SubscriptionCallback *>
{
public:
/**
* Constructor
*
* @param meta The uORB metadata (usually from the ORB_ID() macro) for the topic.
* @param instance The instance for multi sub.
*/
SubscriptionCallback(const orb_metadata *meta, uint8_t interval_ms, uint8_t instance = 0) :
SubscriptionInterval(meta, interval_ms, instance)
{
}

virtual ~SubscriptionCallback()
{
unregister_callback();
};

bool register_callback()
{
if (_subscription.get_node() && _subscription.get_node()->register_callback(this)) {
// registered
_callback_registered = true;

} else {
// force topic creation by subscribing with old API
int fd = orb_subscribe_multi(_subscription.get_topic(), _subscription.get_instance());

// try to register callback again
if (_subscription.forceInit() && _subscription.get_node()->register_callback(this)) {
_callback_registered = true;
}

orb_unsubscribe(fd);
}

return _callback_registered;
}

void unregister_callback()
{
if (_subscription.get_node()) {
_subscription.get_node()->unregister_callback(this);
}

_callback_registered = false;
}

virtual void call() = 0;

protected:

bool _callback_registered{false};

};

// Subscription with callback that schedules a WorkItem
class SubscriptionCallbackWorkItem : public SubscriptionCallback
{
public:
/**
* Constructor
*
* @param work_item The WorkItem that will be scheduled immediately on new publications.
* @param meta The uORB metadata (usually from the ORB_ID() macro) for the topic.
* @param instance The instance for multi sub.
*/
SubscriptionCallbackWorkItem(px4::WorkItem *work_item, const orb_metadata *meta, uint8_t instance = 0) :
SubscriptionCallback(meta, instance),
_work_item(work_item)
{
}

virtual ~SubscriptionCallbackWorkItem() = default;

void call() override
{
// schedule immediately if no interval, otherwise check time elapsed
if ((_interval == 0) || (hrt_absolute_time() >= (_last_update + (_interval * 1000)))) {
_work_item->ScheduleNow();
}
}

private:
px4::WorkItem *_work_item;
};

} // namespace uORB
135 changes: 135 additions & 0 deletions src/modules/uORB/SubscriptionInterval.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,135 @@
/****************************************************************************
*
* Copyright (c) 2019 PX4 Development Team. All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions
* are met:
*
* 1. Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* 2. Redistributions in binary form must reproduce the above copyright
* notice, this list of conditions and the following disclaimer in
* the documentation and/or other materials provided with the
* distribution.
* 3. Neither the name PX4 nor the names of its contributors may be
* used to endorse or promote products derived from this software
* without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS
* FOR A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE
* COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT,
* INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING,
* BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS
* OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED
* AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT
* LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN
* ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
* POSSIBILITY OF SUCH DAMAGE.
*
****************************************************************************/

/**
* @file SubscriptionInterval.hpp
*
*/

#pragma once

#include <uORB/uORB.h>
#include <px4_defines.h>

#include "uORBDeviceNode.hpp"
#include "uORBManager.hpp"
#include "uORBUtils.hpp"

#include "Subscription.hpp"

namespace uORB
{

// Base subscription wrapper class
class SubscriptionInterval
{
public:

/**
* Constructor
*
* @param meta The uORB metadata (usually from the ORB_ID() macro) for the topic.
* @param interval The requested maximum update interval in milliseconds.
* @param instance The instance for multi sub.
*/
SubscriptionInterval(const orb_metadata *meta, uint16_t interval_ms = 0, uint8_t instance = 0) :
_subscription{meta, instance},
_interval(interval_ms)
{}

SubscriptionInterval() : _subscription{nullptr} {}

~SubscriptionInterval() = default;

bool published() { return _subscription.published(); }

/**
* Check if there is a new update.
* */
bool updated()
{
const bool elapsed = (hrt_elapsed_time(&_last_update) >= (_interval * 1000));

if (published() && elapsed) {
return _subscription.updated();
}

return false;
}

/**
* Copy the struct if updated.
* @param dst The destination pointer where the struct will be copied.
* @return true only if topic was updated and copied successfully.
*/
bool update(void *dst)
{
if (updated()) {
return copy(dst);
}

return false;
}

/**
* Copy the struct
* @param dst The destination pointer where the struct will be copied.
* @return true only if topic was copied successfully.
*/
bool copy(void *dst)
{
if (_subscription.copy(dst)) {
_last_update = hrt_absolute_time();
return true;
}

return false;
}

bool valid() const { return _subscription.valid(); }

uint8_t get_instance() const { return _subscription.get_instance(); }
orb_id_t get_topic() const { return _subscription.get_topic(); }
uint16_t get_interval() const { return _interval; }

void set_interval(uint16_t interval) { _interval = interval; }

protected:

Subscription _subscription;
uint64_t _last_update{0}; // last update in microseconds
uint16_t _interval{0}; // maximum update interval in milliseconds

};

} // namespace uORB
37 changes: 37 additions & 0 deletions src/modules/uORB/uORBDeviceNode.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@
#include "uORBUtils.hpp"
#include "uORBManager.hpp"

#include "SubscriptionCallback.hpp"

#ifdef ORB_COMMUNICATOR
#include "uORBCommunicator.hpp"
#endif /* ORB_COMMUNICATOR */
Expand Down Expand Up @@ -317,6 +319,11 @@ uORB::DeviceNode::write(cdev::file_t *filp, const char *buffer, size_t buflen)

_published = true;

// callbacks
for (auto item : _callbacks) {
item->call();
}

ATOMIC_LEAVE;

/* notify any poll waiters */
Expand Down Expand Up @@ -673,3 +680,33 @@ int uORB::DeviceNode::update_queue_size(unsigned int queue_size)
_queue_size = queue_size;
return PX4_OK;
}

bool
uORB::DeviceNode::register_callback(uORB::SubscriptionCallback *callback_sub)
{
if (callback_sub != nullptr) {
ATOMIC_ENTER;

// prevent duplicate registrations
for (auto existing_callbacks : _callbacks) {
if (callback_sub == existing_callbacks) {
ATOMIC_LEAVE;
return true;
}
}

_callbacks.add(callback_sub);
ATOMIC_LEAVE;
return true;
}

return false;
}

void
uORB::DeviceNode::unregister_callback(uORB::SubscriptionCallback *callback_sub)
{
ATOMIC_ENTER;
_callbacks.remove(callback_sub);
ATOMIC_LEAVE;
}
8 changes: 8 additions & 0 deletions src/modules/uORB/uORBDeviceNode.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ namespace uORB
class DeviceNode;
class DeviceMaster;
class Manager;
class SubscriptionCallback;
}

/**
Expand Down Expand Up @@ -231,6 +232,12 @@ class uORB::DeviceNode : public cdev::CDev, public ListNode<uORB::DeviceNode *>
*/
uint64_t copy_and_get_timestamp(void *dst, unsigned &generation);

// add item to list of work items to schedule on node update
bool register_callback(SubscriptionCallback *callback_sub);

// remove item from list of work items
void unregister_callback(SubscriptionCallback *callback_sub);

protected:

pollevent_t poll_state(cdev::file_t *filp) override;
Expand Down Expand Up @@ -269,6 +276,7 @@ class uORB::DeviceNode : public cdev::CDev, public ListNode<uORB::DeviceNode *>
uint8_t *_data{nullptr}; /**< allocated object buffer */
hrt_abstime _last_update{0}; /**< time the object was last updated */
volatile unsigned _generation{0}; /**< object generation count */
List<uORB::SubscriptionCallback *> _callbacks;
uint8_t _priority; /**< priority of the topic */
bool _published{false}; /**< has ever data been published */
uint8_t _queue_size; /**< maximum number of elements in the queue */
Expand Down
Loading

0 comments on commit 571dc35

Please sign in to comment.