Skip to content

Commit

Permalink
Merge pull request #2360 from matkatz/playback_realtime_fix
Browse files Browse the repository at this point in the history
Playback realtime fix
  • Loading branch information
ev-mp authored Sep 17, 2018
2 parents ff4f5e5 + 06b1a38 commit 3231974
Show file tree
Hide file tree
Showing 17 changed files with 347 additions and 176 deletions.
16 changes: 11 additions & 5 deletions src/archive.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ namespace librealsense
rs2_time_t backend_timestamp = 0;
rs2_time_t last_timestamp = 0;
unsigned long long last_frame_number = 0;
bool is_blocking = false;

frame_additional_data() {};

Expand All @@ -44,14 +45,16 @@ namespace librealsense
const uint8_t* md_buf,
double backend_time,
rs2_time_t last_timestamp,
unsigned long long last_frame_number)
unsigned long long last_frame_number,
bool in_is_blocking)
: timestamp(in_timestamp),
frame_number(in_frame_number),
system_time(in_system_time),
metadata_size(md_size),
backend_timestamp(backend_time),
last_timestamp(last_timestamp),
last_frame_number(last_frame_number)
last_frame_number(last_frame_number),
is_blocking(in_is_blocking)
{
// Copy up to 255 bytes to preserve metadata as raw data
if (metadata_size)
Expand Down Expand Up @@ -156,6 +159,9 @@ namespace librealsense
void mark_fixed() override { _fixed = true; }
bool is_fixed() const override { return _fixed; }

void set_blocking(bool state) override { additional_data.is_blocking = state; }
bool is_blocking() const override { return additional_data.is_blocking; }

private:
// TODO: check boost::intrusive_ptr or an alternative
std::atomic<int> ref_count; // the reference count is on how many times this placeholder has been observed (not lifetime, not content)
Expand Down Expand Up @@ -303,7 +309,7 @@ namespace librealsense
return((depth_frame*)_original.frame)->get_distance(x, y);

uint64_t pixel = 0;
switch (get_bpp()/8) // bits per pixel
switch (get_bpp() / 8) // bits per pixel
{
case 1: pixel = get_frame_data()[y*get_width() + x]; break;
case 2: pixel = reinterpret_cast<const uint16_t*>(get_frame_data())[y*get_width() + x]; break;
Expand Down Expand Up @@ -353,7 +359,7 @@ namespace librealsense
try
{
auto depth_sensor = As<librealsense::depth_sensor>(sensor);
if(depth_sensor != nullptr)
if (depth_sensor != nullptr)
{
return depth_sensor->get_depth_scale();
}
Expand Down Expand Up @@ -484,4 +490,4 @@ namespace librealsense

MAP_EXTENSION(RS2_EXTENSION_POSE_FRAME, librealsense::pose_frame);

}
}
183 changes: 130 additions & 53 deletions src/concurrency.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,113 +14,181 @@ const int QUEUE_MAX_SIZE = 10;
template<class T>
class single_consumer_queue
{
std::deque<T> q;
std::mutex mutex;
std::condition_variable cv; // not empty signal
unsigned int cap;
bool accepting;
std::deque<T> _queue;
std::mutex _mutex;
std::condition_variable _deq_cv; // not empty signal
std::condition_variable _enq_cv; // not empty signal

unsigned int _cap;
bool _accepting;

// flush mechanism is required to abort wait on cv
// when need to stop
std::atomic<bool> need_to_flush;
std::atomic<bool> was_flushed;
std::condition_variable was_flushed_cv;
std::mutex was_flushed_mutex;
std::atomic<bool> _need_to_flush;
std::atomic<bool> _was_flushed;
public:
explicit single_consumer_queue<T>(unsigned int cap = QUEUE_MAX_SIZE)
: q(), mutex(), cv(), cap(cap), need_to_flush(false), was_flushed(false), accepting(true)
: _queue(), _mutex(), _deq_cv(), _enq_cv(), _cap(cap), _need_to_flush(false), _was_flushed(false), _accepting(true)
{}

void enqueue(T&& item)
{
std::unique_lock<std::mutex> lock(mutex);
if (accepting)
std::unique_lock<std::mutex> lock(_mutex);
if (_accepting)
{
q.push_back(std::move(item));
if (q.size() > cap)
_queue.push_back(std::move(item));
if (_queue.size() > _cap)
{
q.pop_front();
_queue.pop_front();
}
}
lock.unlock();
cv.notify_one();
_deq_cv.notify_one();
}

bool dequeue(T* item ,unsigned int timeout_ms = 5000)
void blocking_enqueue(T&& item)
{
std::unique_lock<std::mutex> lock(mutex);
accepting = true;
was_flushed = false;
const auto ready = [this]() { return (q.size() > 0) || need_to_flush; };
if (!ready() && !cv.wait_for(lock, std::chrono::milliseconds(timeout_ms), ready))
{
return false;
}
auto pred = [this]()->bool { return _queue.size() <= _cap; };

if (q.size() <= 0)
std::unique_lock<std::mutex> lock(_mutex);
if (_accepting)
{
return false;
_enq_cv.wait(lock, pred);
_queue.push_back(std::move(item));
}
*item = std::move(q.front());
q.pop_front();
return true;
lock.unlock();
_deq_cv.notify_one();
}

bool peek(T** item)

bool dequeue(T* item ,unsigned int timeout_ms = 5000)
{
std::unique_lock<std::mutex> lock(mutex);
std::unique_lock<std::mutex> lock(_mutex);
_accepting = true;
_was_flushed = false;
const auto ready = [this]() { return (_queue.size() > 0) || _need_to_flush; };
if (!ready() && !_deq_cv.wait_for(lock, std::chrono::milliseconds(timeout_ms), ready))
{
return false;
}

if (q.size() <= 0)
if (_queue.size() <= 0)
{
return false;
}
*item = &q.front();
*item = std::move(_queue.front());
_queue.pop_front();
_enq_cv.notify_one();
return true;
}

bool try_dequeue(T* item)
{
std::unique_lock<std::mutex> lock(mutex);
accepting = true;
if (q.size() > 0)
std::unique_lock<std::mutex> lock(_mutex);
_accepting = true;
if (_queue.size() > 0)
{
auto val = std::move(q.front());
q.pop_front();
auto val = std::move(_queue.front());
_queue.pop_front();
*item = std::move(val);
_enq_cv.notify_one();
return true;
}
return false;
}

bool peek(T** item)
{
std::unique_lock<std::mutex> lock(_mutex);

if (_queue.size() <= 0)
{
return false;
}
*item = &_queue.front();
return true;
}

void clear()
{
std::unique_lock<std::mutex> lock(mutex);
std::unique_lock<std::mutex> lock(_mutex);

accepting = false;
need_to_flush = true;
_accepting = false;
_need_to_flush = true;

while (q.size() > 0)
while (_queue.size() > 0)
{
auto item = std::move(q.front());
q.pop_front();
auto item = std::move(_queue.front());
_queue.pop_front();
}
cv.notify_all();
_deq_cv.notify_all();
}

void start()
{
std::unique_lock<std::mutex> lock(mutex);
need_to_flush = false;
accepting = true;
std::unique_lock<std::mutex> lock(_mutex);
_need_to_flush = false;
_accepting = true;
}

size_t size()
{
std::unique_lock<std::mutex> lock(mutex);
return q.size();
std::unique_lock<std::mutex> lock(_mutex);
return _queue.size();
}
};

template<class T>
class single_consumer_frame_queue
{
single_consumer_queue<T> _queue;

public:
single_consumer_frame_queue<T>(unsigned int cap = QUEUE_MAX_SIZE) : _queue(cap) {}

void enqueue(T&& item)
{
if (item.is_blocking())
_queue.blocking_enqueue(std::move(item));
else
_queue.enqueue(std::move(item));
}

bool dequeue(T* item, unsigned int timeout_ms = 5000)
{
return _queue.dequeue(item, timeout_ms);
}

bool peek(T** item)
{
return _queue.peek(item);
}

bool try_dequeue(T* item)
{
return _queue.try_dequeue(item);
}

void clear()
{
_queue.clear();
}

void start()
{
_queue.start();
}

void flush()
{
_queue.flush();
}

size_t size()
{
return _queue.size();
}
};

class dispatcher
{
Expand Down Expand Up @@ -181,11 +249,14 @@ class dispatcher
}

template<class T>
void invoke(T item)
void invoke(T item, bool is_blocking = false)
{
if (!_was_stopped)
{
_queue.enqueue(std::move(item));
if(is_blocking)
_queue.blocking_enqueue(std::move(item));
else
_queue.enqueue(std::move(item));
}
}

Expand Down Expand Up @@ -248,6 +319,12 @@ class dispatcher
*wait_sucess = cv.wait_for(locker, std::chrono::seconds(10), [&]() { return invoked || _was_stopped; });
return *wait_sucess;
}

bool empty()
{
return _queue.size() == 0;
}

private:
friend cancellable_timer;
single_consumer_queue<std::function<void(cancellable_timer)>> _queue;
Expand Down
46 changes: 44 additions & 2 deletions src/core/streaming.h
Original file line number Diff line number Diff line change
Expand Up @@ -111,12 +111,54 @@ namespace librealsense

virtual void mark_fixed() = 0;
virtual bool is_fixed() const = 0;
virtual void set_blocking(bool state) = 0;
virtual bool is_blocking() const = 0;

virtual void keep() = 0;

virtual ~frame_interface() = default;
};

struct frame_holder
{
frame_interface* frame;

frame_interface* operator->()
{
return frame;
}

operator bool() const { return frame != nullptr; }

operator frame_interface*() const { return frame; }

frame_holder(frame_interface* f)
{
frame = f;
}

~frame_holder();

frame_holder(frame_holder&& other)
: frame(other.frame)
{
other.frame = nullptr;
}

frame_holder() : frame(nullptr) {}


frame_holder& operator=(frame_holder&& other);

frame_holder clone() const;

bool is_blocking() const { return frame->is_blocking(); };

private:
frame_holder& operator=(const frame_holder& other) = delete;
frame_holder(const frame_holder& other);
};

using on_frame = std::function<void(frame_interface*)>;
using stream_profiles = std::vector<std::shared_ptr<stream_profile_interface>>;

Expand Down Expand Up @@ -221,7 +263,7 @@ namespace librealsense

MAP_EXTENSION(RS2_EXTENSION_DEPTH_STEREO_SENSOR, librealsense::depth_stereo_sensor);

class depth_stereo_sensor_snapshot : public depth_stereo_sensor, public depth_sensor_snapshot
class depth_stereo_sensor_snapshot : public depth_stereo_sensor, public depth_sensor_snapshot
{
public:
depth_stereo_sensor_snapshot(float depth_units, float stereo_bl_mm) :
Expand Down Expand Up @@ -256,4 +298,4 @@ namespace librealsense
private:
float m_stereo_baseline_mm;
};
}
}
Loading

0 comments on commit 3231974

Please sign in to comment.