diff --git a/src/archive.h b/src/archive.h index 73e63edeac..b582e9e58a 100644 --- a/src/archive.h +++ b/src/archive.h @@ -34,6 +34,7 @@ namespace librealsense rs2_time_t backend_timestamp = 0; rs2_time_t last_timestamp = 0; unsigned long long last_frame_number = 0; + bool is_blocking = false; frame_additional_data() {}; @@ -44,14 +45,16 @@ namespace librealsense const uint8_t* md_buf, double backend_time, rs2_time_t last_timestamp, - unsigned long long last_frame_number) + unsigned long long last_frame_number, + bool in_is_blocking) : timestamp(in_timestamp), frame_number(in_frame_number), system_time(in_system_time), metadata_size(md_size), backend_timestamp(backend_time), last_timestamp(last_timestamp), - last_frame_number(last_frame_number) + last_frame_number(last_frame_number), + is_blocking(in_is_blocking) { // Copy up to 255 bytes to preserve metadata as raw data if (metadata_size) @@ -156,6 +159,9 @@ namespace librealsense void mark_fixed() override { _fixed = true; } bool is_fixed() const override { return _fixed; } + void set_blocking(bool state) override { additional_data.is_blocking = state; } + bool is_blocking() const override { return additional_data.is_blocking; } + private: // TODO: check boost::intrusive_ptr or an alternative std::atomic ref_count; // the reference count is on how many times this placeholder has been observed (not lifetime, not content) @@ -303,7 +309,7 @@ namespace librealsense return((depth_frame*)_original.frame)->get_distance(x, y); uint64_t pixel = 0; - switch (get_bpp()/8) // bits per pixel + switch (get_bpp() / 8) // bits per pixel { case 1: pixel = get_frame_data()[y*get_width() + x]; break; case 2: pixel = reinterpret_cast(get_frame_data())[y*get_width() + x]; break; @@ -353,7 +359,7 @@ namespace librealsense try { auto depth_sensor = As(sensor); - if(depth_sensor != nullptr) + if (depth_sensor != nullptr) { return depth_sensor->get_depth_scale(); } @@ -484,4 +490,4 @@ namespace librealsense MAP_EXTENSION(RS2_EXTENSION_POSE_FRAME, librealsense::pose_frame); -} +} \ No newline at end of file diff --git a/src/concurrency.h b/src/concurrency.h index 068620bd9f..d8b01c12bc 100644 --- a/src/concurrency.h +++ b/src/concurrency.h @@ -14,113 +14,181 @@ const int QUEUE_MAX_SIZE = 10; template class single_consumer_queue { - std::deque q; - std::mutex mutex; - std::condition_variable cv; // not empty signal - unsigned int cap; - bool accepting; + std::deque _queue; + std::mutex _mutex; + std::condition_variable _deq_cv; // not empty signal + std::condition_variable _enq_cv; // not empty signal + + unsigned int _cap; + bool _accepting; // flush mechanism is required to abort wait on cv // when need to stop - std::atomic need_to_flush; - std::atomic was_flushed; - std::condition_variable was_flushed_cv; - std::mutex was_flushed_mutex; + std::atomic _need_to_flush; + std::atomic _was_flushed; public: explicit single_consumer_queue(unsigned int cap = QUEUE_MAX_SIZE) - : q(), mutex(), cv(), cap(cap), need_to_flush(false), was_flushed(false), accepting(true) + : _queue(), _mutex(), _deq_cv(), _enq_cv(), _cap(cap), _need_to_flush(false), _was_flushed(false), _accepting(true) {} void enqueue(T&& item) { - std::unique_lock lock(mutex); - if (accepting) + std::unique_lock lock(_mutex); + if (_accepting) { - q.push_back(std::move(item)); - if (q.size() > cap) + _queue.push_back(std::move(item)); + if (_queue.size() > _cap) { - q.pop_front(); + _queue.pop_front(); } } lock.unlock(); - cv.notify_one(); + _deq_cv.notify_one(); } - bool dequeue(T* item ,unsigned int timeout_ms = 5000) + void blocking_enqueue(T&& item) { - std::unique_lock lock(mutex); - accepting = true; - was_flushed = false; - const auto ready = [this]() { return (q.size() > 0) || need_to_flush; }; - if (!ready() && !cv.wait_for(lock, std::chrono::milliseconds(timeout_ms), ready)) - { - return false; - } + auto pred = [this]()->bool { return _queue.size() <= _cap; }; - if (q.size() <= 0) + std::unique_lock lock(_mutex); + if (_accepting) { - return false; + _enq_cv.wait(lock, pred); + _queue.push_back(std::move(item)); } - *item = std::move(q.front()); - q.pop_front(); - return true; + lock.unlock(); + _deq_cv.notify_one(); } - bool peek(T** item) + + bool dequeue(T* item ,unsigned int timeout_ms = 5000) { - std::unique_lock lock(mutex); + std::unique_lock lock(_mutex); + _accepting = true; + _was_flushed = false; + const auto ready = [this]() { return (_queue.size() > 0) || _need_to_flush; }; + if (!ready() && !_deq_cv.wait_for(lock, std::chrono::milliseconds(timeout_ms), ready)) + { + return false; + } - if (q.size() <= 0) + if (_queue.size() <= 0) { return false; } - *item = &q.front(); + *item = std::move(_queue.front()); + _queue.pop_front(); + _enq_cv.notify_one(); return true; } bool try_dequeue(T* item) { - std::unique_lock lock(mutex); - accepting = true; - if (q.size() > 0) + std::unique_lock lock(_mutex); + _accepting = true; + if (_queue.size() > 0) { - auto val = std::move(q.front()); - q.pop_front(); + auto val = std::move(_queue.front()); + _queue.pop_front(); *item = std::move(val); + _enq_cv.notify_one(); return true; } return false; } + bool peek(T** item) + { + std::unique_lock lock(_mutex); + + if (_queue.size() <= 0) + { + return false; + } + *item = &_queue.front(); + return true; + } + void clear() { - std::unique_lock lock(mutex); + std::unique_lock lock(_mutex); - accepting = false; - need_to_flush = true; + _accepting = false; + _need_to_flush = true; - while (q.size() > 0) + while (_queue.size() > 0) { - auto item = std::move(q.front()); - q.pop_front(); + auto item = std::move(_queue.front()); + _queue.pop_front(); } - cv.notify_all(); + _deq_cv.notify_all(); } void start() { - std::unique_lock lock(mutex); - need_to_flush = false; - accepting = true; + std::unique_lock lock(_mutex); + _need_to_flush = false; + _accepting = true; } size_t size() { - std::unique_lock lock(mutex); - return q.size(); + std::unique_lock lock(_mutex); + return _queue.size(); } }; +template +class single_consumer_frame_queue +{ + single_consumer_queue _queue; + +public: + single_consumer_frame_queue(unsigned int cap = QUEUE_MAX_SIZE) : _queue(cap) {} + + void enqueue(T&& item) + { + if (item.is_blocking()) + _queue.blocking_enqueue(std::move(item)); + else + _queue.enqueue(std::move(item)); + } + + bool dequeue(T* item, unsigned int timeout_ms = 5000) + { + return _queue.dequeue(item, timeout_ms); + } + + bool peek(T** item) + { + return _queue.peek(item); + } + + bool try_dequeue(T* item) + { + return _queue.try_dequeue(item); + } + + void clear() + { + _queue.clear(); + } + + void start() + { + _queue.start(); + } + + void flush() + { + _queue.flush(); + } + + size_t size() + { + return _queue.size(); + } +}; class dispatcher { @@ -181,11 +249,14 @@ class dispatcher } template - void invoke(T item) + void invoke(T item, bool is_blocking = false) { if (!_was_stopped) { - _queue.enqueue(std::move(item)); + if(is_blocking) + _queue.blocking_enqueue(std::move(item)); + else + _queue.enqueue(std::move(item)); } } @@ -248,6 +319,12 @@ class dispatcher *wait_sucess = cv.wait_for(locker, std::chrono::seconds(10), [&]() { return invoked || _was_stopped; }); return *wait_sucess; } + + bool empty() + { + return _queue.size() == 0; + } + private: friend cancellable_timer; single_consumer_queue> _queue; diff --git a/src/core/streaming.h b/src/core/streaming.h index 4d0668d33d..6dadd6a6ae 100644 --- a/src/core/streaming.h +++ b/src/core/streaming.h @@ -111,12 +111,54 @@ namespace librealsense virtual void mark_fixed() = 0; virtual bool is_fixed() const = 0; + virtual void set_blocking(bool state) = 0; + virtual bool is_blocking() const = 0; virtual void keep() = 0; virtual ~frame_interface() = default; }; + struct frame_holder + { + frame_interface* frame; + + frame_interface* operator->() + { + return frame; + } + + operator bool() const { return frame != nullptr; } + + operator frame_interface*() const { return frame; } + + frame_holder(frame_interface* f) + { + frame = f; + } + + ~frame_holder(); + + frame_holder(frame_holder&& other) + : frame(other.frame) + { + other.frame = nullptr; + } + + frame_holder() : frame(nullptr) {} + + + frame_holder& operator=(frame_holder&& other); + + frame_holder clone() const; + + bool is_blocking() const { return frame->is_blocking(); }; + + private: + frame_holder& operator=(const frame_holder& other) = delete; + frame_holder(const frame_holder& other); + }; + using on_frame = std::function; using stream_profiles = std::vector>; @@ -221,7 +263,7 @@ namespace librealsense MAP_EXTENSION(RS2_EXTENSION_DEPTH_STEREO_SENSOR, librealsense::depth_stereo_sensor); - class depth_stereo_sensor_snapshot : public depth_stereo_sensor, public depth_sensor_snapshot + class depth_stereo_sensor_snapshot : public depth_stereo_sensor, public depth_sensor_snapshot { public: depth_stereo_sensor_snapshot(float depth_units, float stereo_bl_mm) : @@ -256,4 +298,4 @@ namespace librealsense private: float m_stereo_baseline_mm; }; -} +} \ No newline at end of file diff --git a/src/media/playback/playback_device.cpp b/src/media/playback/playback_device.cpp index 18fc2edaf0..5ba1144125 100644 --- a/src/media/playback/playback_device.cpp +++ b/src/media/playback/playback_device.cpp @@ -16,8 +16,9 @@ playback_device::playback_device(std::shared_ptr ctx, std::shared_ptr(std::numeric_limits::max()); }) { if (serializer == nullptr) @@ -238,7 +239,15 @@ void playback_device::seek_to_time(std::chrono::nanoseconds time) std::string error_msg = to_string() << "Unexpected sensor index while playing file (Read index = " << frame->stream_id.sensor_index << ")"; LOG_ERROR(error_msg); } - m_sensors.at(frame->stream_id.sensor_index)->handle_frame(std::move(frame->frame), m_real_time); + //push frame to the sensor (see handle_frame definition for more details) + m_sensors.at(frame->stream_id.sensor_index)->handle_frame(std::move(frame->frame), m_real_time, + []() { return device_serializer::nanoseconds(0); }, + []() { return false; }, + [this, time]() + { + std::lock_guard locker(m_last_published_timestamp_mutex); + m_last_published_timestamp = time; + }); } } } @@ -253,8 +262,8 @@ void playback_device::seek_to_time(std::chrono::nanoseconds time) rs2_playback_status playback_device::get_current_status() const { return m_is_started ? - m_is_paused ? RS2_PLAYBACK_STATUS_PAUSED : RS2_PLAYBACK_STATUS_PLAYING - : RS2_PLAYBACK_STATUS_STOPPED; + m_is_paused ? RS2_PLAYBACK_STATUS_PAUSED : RS2_PLAYBACK_STATUS_PLAYING + : RS2_PLAYBACK_STATUS_STOPPED; } uint64_t playback_device::get_duration() const @@ -275,21 +284,21 @@ void playback_device::pause() { LOG_DEBUG("Playback pause invoked"); - if (m_is_paused) - return; - - m_is_paused = true; - - if(m_is_started) - { - //Wait for any remaining sensor callbacks to return - for (auto sensor : m_sensors) - { - sensor.second->flush_pending_frames(); - } - } - LOG_DEBUG("Notifying RS2_PLAYBACK_STATUS_PAUSED"); - playback_status_changed(RS2_PLAYBACK_STATUS_PAUSED); + if (m_is_paused) + return; + + m_is_paused = true; + + if(m_is_started) + { + //Wait for any remaining sensor callbacks to return + for (auto sensor : m_sensors) + { + sensor.second->flush_pending_frames(); + } + } + LOG_DEBUG("Notifying RS2_PLAYBACK_STATUS_PAUSED"); + playback_status_changed(RS2_PLAYBACK_STATUS_PAUSED); }); if ((*m_read_thread)->flush() == false) { @@ -306,7 +315,14 @@ void playback_device::resume() { LOG_DEBUG("Playback resume invoked"); if (m_is_paused == false) - return; + return; + + auto total_duration = m_reader->query_duration(); + if (m_last_published_timestamp >= total_duration) + m_last_published_timestamp = device_serializer::nanoseconds(0); + m_reader->reset(); + m_reader->seek_to_time(m_last_published_timestamp); + while (m_last_published_timestamp != device_serializer::nanoseconds(0) && !m_reader->read_next_data()->is()); m_is_paused = false; catch_up(); @@ -356,6 +372,8 @@ void playback_device::update_time_base(device_serializer::nanoseconds base_times device_serializer::nanoseconds playback_device::calc_sleep_time(device_serializer::nanoseconds timestamp) const { + if (!m_real_time) + return device_serializer::nanoseconds(0); //The time to sleep returned here equals to the difference between the file recording time // and the playback time. auto now = std::chrono::high_resolution_clock::now(); @@ -367,9 +385,9 @@ device_serializer::nanoseconds playback_device::calc_sleep_time(device_serialize auto time_diff = timestamp - m_base_timestamp; auto recorded_time = std::chrono::duration_cast(time_diff / m_sample_rate.load()); - LOG_DEBUG("Time Now : " << now.time_since_epoch().count() << " , Time When Started: " << m_base_sys_time.time_since_epoch().count() << " , Diff: " << play_time.count() << " == " << (play_time.count()/1000)/1000 << "ms"); - LOG_DEBUG("Original Recording Delta: " << time_diff.count() << " == " << (time_diff.count() / 1000) / 1000 << "ms"); - LOG_DEBUG("Frame Time: " << timestamp.count() << " , First Frame: " << m_base_timestamp.count() << " , Diff: " << recorded_time.count() << " == " << (recorded_time.count() / 1000) / 1000 << "ms"); + LOG_DEBUG("Time Now : " << now.time_since_epoch().count() << " , Time When Started: " << m_base_sys_time.time_since_epoch().count() << " , Diff: " << play_time.count() << " == " << (play_time.count() * 1e-6) << "ms"); + LOG_DEBUG("Original Recording Delta: " << time_diff.count() << " == " << (time_diff.count() * 1e-6) << "ms"); + LOG_DEBUG("Frame Time: " << timestamp.count() << " , First Frame: " << m_base_timestamp.count() << " , Diff: " << recorded_time.count() << " == " << (recorded_time.count() * 1e-6) << "ms"); if(recorded_time < play_time) { @@ -377,7 +395,7 @@ device_serializer::nanoseconds playback_device::calc_sleep_time(device_serialize return device_serializer::nanoseconds(0); } auto sleep_time = (recorded_time - play_time); - LOG_DEBUG("Sleep Time: " << sleep_time.count() << " == " << (sleep_time.count() / 1000) / 1000 << " ms"); + LOG_DEBUG("Sleep Time: " << sleep_time.count() << " == " << (sleep_time.count() * 1e-6) << " ms"); return sleep_time; } @@ -393,7 +411,7 @@ void playback_device::start() LOG_DEBUG("playback start called"); if (m_is_started) - return ; //nothing to do + return; //nothing to do m_is_started = true; catch_up(); @@ -457,6 +475,9 @@ void playback_device::do_loop(T action) //On failure, exit thread if(action_succeeded == false) { + for (auto s : m_active_sensors) + s.second->flush_pending_frames(); + //Go over the sensors and stop them size_t active_sensors_count = m_active_sensors.size(); for (size_t i = 0; isecond->stop(false); } + + m_last_published_timestamp = device_serializer::nanoseconds(0); + //After all sensors were stopped, stop_internal() is called and flags m_is_started as false assert(m_is_started == false); } @@ -479,6 +503,16 @@ void playback_device::do_loop(T action) }); } +bool playback_device::prefetch_done() +{ + for (auto s : m_active_sensors) + { + if (!s.second->streams_contains_one_frame_or_more()) + return false; + } + return true; +} + void playback_device::try_looping() { //try_looping is called from start() or resume() @@ -519,17 +553,22 @@ void playback_device::try_looping() } //Calculate the duration for the reader to sleep (i.e wait for next frame) - auto sleep_time = calc_sleep_time(timestamp); - if (sleep_time.count() > 0) + if (m_real_time && prefetch_done()) { - if (m_sample_rate > 0) + auto sleep_time = calc_sleep_time(timestamp); + if (sleep_time.count() > 0) { - LOG_DEBUG("Sleeping for: " << (sleep_time.count() / 1000) / 1000); - std::this_thread::sleep_for(sleep_time); + if (m_sample_rate > 0) + { + LOG_DEBUG("Sleeping for: " << (sleep_time.count() * 1e-6)); + std::this_thread::sleep_for(sleep_time); + } } } + if (auto frame = data->as()) { + frame->frame.frame->set_blocking(!m_real_time); if (frame->stream_id.device_index != get_device_index() || frame->stream_id.sensor_index >= m_sensors.size()) { std::string error_msg = to_string() << "Unexpected sensor index while playing file (Read index = " << frame->stream_id.sensor_index << ")"; @@ -541,10 +580,17 @@ void playback_device::try_looping() if (data->is()) { LOG_WARNING("Bad frame from reader, ignoring"); - return true; + return true; } - //Dispatch frame to the relevant sensor - m_sensors.at(frame->stream_id.sensor_index)->handle_frame(std::move(frame->frame), m_real_time); + //Dispatch frame to the relevant sensor (see handle_frame definition for more details) + m_active_sensors.at(frame->stream_id.sensor_index)->handle_frame(std::move(frame->frame), m_real_time, + [this, timestamp]() { return calc_sleep_time(timestamp); }, + [this]() { return m_is_paused == true; }, + [this, timestamp]() + { + std::lock_guard locker(m_last_published_timestamp_mutex); + m_last_published_timestamp = timestamp; + }); return true; } diff --git a/src/media/playback/playback_device.h b/src/media/playback/playback_device.h index 29c96fc380..71a8a4211e 100644 --- a/src/media/playback/playback_device.h +++ b/src/media/playback/playback_device.h @@ -15,8 +15,8 @@ namespace librealsense { class playback_device : public device_interface, - public extendable_interface, - public info_container + public extendable_interface, + public info_container { public: playback_device(std::shared_ptr context, std::shared_ptr serializer); @@ -49,7 +49,7 @@ namespace librealsense bool is_valid() const override; std::vector get_profiles_tags() const override { return std::vector(); };//no hard-coded default streams for playback - void tag_profiles(stream_profiles profiles) const override + void tag_profiles(stream_profiles profiles) const override { for(auto profile : profiles) profile->tag_profile(profile_tag::PROFILE_TAG_DEFAULT | profile_tag::PROFILE_TAG_SUPERSET); @@ -69,6 +69,7 @@ namespace librealsense void register_device_info(const device_serializer::device_snapshot& device_description); void register_extrinsics(const device_serializer::device_snapshot& device_description); void update_extensions(const device_serializer::device_snapshot& device_description); + bool prefetch_done(); private: lazy> m_read_thread; @@ -86,7 +87,8 @@ namespace librealsense std::shared_ptr m_context; std::vector>> m_extrinsics_fetchers; std::map> m_extrinsics_map; - + device_serializer::nanoseconds m_last_published_timestamp; + std::mutex m_last_published_timestamp_mutex; }; MAP_EXTENSION(RS2_EXTENSION_PLAYBACK, playback_device); diff --git a/src/media/playback/playback_sensor.cpp b/src/media/playback/playback_sensor.cpp index 37dd0e10d2..5be3cd0260 100644 --- a/src/media/playback/playback_sensor.cpp +++ b/src/media/playback/playback_sensor.cpp @@ -29,7 +29,8 @@ playback_sensor::playback_sensor(const device_interface& parent_device, const de m_is_started(false), m_sensor_description(sensor_description), m_sensor_id(sensor_description.get_sensor_index()), - m_parent_device(parent_device) + m_parent_device(parent_device), + _default_queue_size(1) { register_sensor_streams(m_sensor_description.get_stream_profiles()); register_sensor_infos(m_sensor_description); @@ -40,6 +41,16 @@ playback_sensor::~playback_sensor() { } +bool playback_sensor::streams_contains_one_frame_or_more() +{ + for (auto d : m_dispatchers) + { + if (d.second->empty()) + return false; + } + return true; +} + stream_profiles playback_sensor::get_stream_profiles(int tag) const { if (tag == profile_tag::PROFILE_TAG_ANY) @@ -74,7 +85,7 @@ void playback_sensor::open(const stream_profiles& requests) //For each stream, create a dedicated dispatching thread for (auto&& profile : requests) { - m_dispatchers.emplace(std::make_pair(profile->get_unique_id(), std::make_shared(10))); //TODO: what size the queue should be? + m_dispatchers.emplace(std::make_pair(profile->get_unique_id(), std::make_shared(_default_queue_size))); m_dispatchers[profile->get_unique_id()]->start(); device_serializer::stream_identifier f{ get_device_index(), m_sensor_id, profile->get_stream_type(), static_cast(profile->get_stream_index()) }; opened_streams.push_back(f); @@ -114,7 +125,6 @@ notifications_callback_ptr playback_sensor::get_notifications_callback() const return _notifications_processor.get_callback(); } - void playback_sensor::start(frame_callback_ptr callback) { LOG_DEBUG("Start sensor " << m_sensor_id); @@ -125,10 +135,10 @@ void playback_sensor::start(frame_callback_ptr callback) m_user_callback = callback; } } + void playback_sensor::stop(bool invoke_required) { LOG_DEBUG("Stop sensor " << m_sensor_id); - if (m_is_started == true) { stopped(m_sensor_id, invoke_required); @@ -160,38 +170,11 @@ const device_interface& playback_sensor::get_device() return m_parent_device; } -void playback_sensor::handle_frame(frame_holder frame, bool is_real_time) -{ - if(frame == nullptr) - { - throw invalid_value_exception("null frame passed to handle_frame"); - } - if(m_is_started) - { - frame->get_owner()->set_sensor(shared_from_this()); - auto type = frame->get_stream()->get_stream_type(); - auto index = static_cast(frame->get_stream()->get_stream_index()); - frame->set_stream(m_streams[std::make_pair(type, index)]); - frame->set_sensor(shared_from_this()); - auto stream_id = frame.frame->get_stream()->get_unique_id(); - //TODO: Ziv, remove usage of shared_ptr when frame_holder is cpoyable - auto pf = std::make_shared(std::move(frame)); - m_dispatchers.at(stream_id)->invoke([this, pf](dispatcher::cancellable_timer t) - { - frame_interface* pframe = nullptr; - std::swap((*pf).frame, pframe); - m_user_callback->on_frame((rs2_frame*)pframe); - }); - if(is_real_time) - { - m_dispatchers.at(stream_id)->flush(); - } - } -} void playback_sensor::update_option(rs2_option id, std::shared_ptr