Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix syncer bug causing wrong frame order #8378

Merged
merged 9 commits into from
Feb 23, 2021
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/core/streaming.h
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ namespace librealsense
{
frame_interface* frame;

frame_interface* operator->()
frame_interface* operator->() const
{
return frame;
}
Expand Down
12 changes: 2 additions & 10 deletions src/l500/l500-depth.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -184,25 +184,17 @@ namespace librealsense

std::vector<stream_interface*> streams = { _depth_stream.get(), _ir_stream.get(), _confidence_stream.get() };

// TODO
for (auto& s : streams)
{
depth_matchers.push_back(std::make_shared<identity_matcher>(s->get_unique_id(), s->get_stream_type()));
}
std::vector<std::shared_ptr<matcher>> matchers;
if (!frame.frame->supports_frame_metadata(RS2_FRAME_METADATA_FRAME_COUNTER))
{
matchers.push_back(std::make_shared<timestamp_composite_matcher>(depth_matchers));
}
else
{
matchers.push_back(std::make_shared<timestamp_composite_matcher>(depth_matchers));
}
matchers.push_back(std::make_shared<timestamp_composite_matcher>(depth_matchers));

return std::make_shared<timestamp_composite_matcher>(matchers);
}




// If the user did not ask for IR, The open function will add it anyway.
Expand Down
4 changes: 2 additions & 2 deletions src/l500/l500-device.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ namespace librealsense
[=]() {
auto z16rot = std::make_shared<rotation_transform>(RS2_FORMAT_Z16, RS2_STREAM_DEPTH, RS2_EXTENSION_DEPTH_FRAME);
auto y8rot = std::make_shared<rotation_transform>(RS2_FORMAT_Y8, RS2_STREAM_INFRARED, RS2_EXTENSION_VIDEO_FRAME);
auto sync = std::make_shared<syncer_process_unit>(); // is_zo_enabled_opt );
auto sync = std::make_shared<syncer_process_unit>(nullptr, false); // disable logging on this internal syncer

auto cpb = std::make_shared<composite_processing_block>();
cpb->add(z16rot);
Expand All @@ -310,7 +310,7 @@ namespace librealsense
auto z16rot = std::make_shared<rotation_transform>(RS2_FORMAT_Z16, RS2_STREAM_DEPTH, RS2_EXTENSION_DEPTH_FRAME);
auto y8rot = std::make_shared<rotation_transform>(RS2_FORMAT_Y8, RS2_STREAM_INFRARED, RS2_EXTENSION_VIDEO_FRAME);
auto conf = std::make_shared<confidence_rotation_transform>();
auto sync = std::make_shared<syncer_process_unit>(); // is_zo_enabled_opt );
auto sync = std::make_shared<syncer_process_unit>(nullptr, false); // disable logging on this internal syncer

auto cpb = std::make_shared<composite_processing_block>();
cpb->add(z16rot);
Expand Down
132 changes: 90 additions & 42 deletions src/proc/syncer-processing-block.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -10,63 +10,111 @@

namespace librealsense
{
syncer_process_unit::syncer_process_unit( std::initializer_list< bool_option::ptr > enable_opts )
: processing_block("syncer"), _matcher((new timestamp_composite_matcher({})))
, _enable_opts( enable_opts.begin(), enable_opts.end() )
{
_matcher->set_callback([this](frame_holder f, syncronization_environment env)
syncer_process_unit::syncer_process_unit( std::initializer_list< bool_option::ptr > enable_opts,
bool log )
: processing_block( "syncer" )
, _enable_opts( enable_opts.begin(), enable_opts.end() )
{
// This callback gets called by the previous processing block when it is done with a frame. We
// call the matchers the frame and eventually call the next callback in the list using frame_ready().
auto f = [this, log]( frame_holder frame, synthetic_source_interface * source ) {
// if the syncer is disabled passthrough the frame
bool enabled = false;
size_t n_opts = 0;
for( auto & wopt : _enable_opts )
{
std::stringstream ss;
ss << "SYNCED: ";
auto composite = dynamic_cast<composite_frame*>(f.frame);
for (int i = 0; i < composite->get_embedded_frames_count(); i++)
auto opt = wopt.lock();
if( opt )
{
auto matched = composite->get_frame(i);
ss << matched->get_stream()->get_stream_type() << " " << matched->get_frame_number() << ", "<<std::fixed<< matched->get_frame_timestamp()<<" ";
++n_opts;
if( opt->is_true() )
{
enabled = true;
break;
}
}
}
if( n_opts && ! enabled )
{
get_source().frame_ready( std::move( frame ) );
return;
}

LOG_DEBUG(ss.str());
env.matches.enqueue(std::move(f));
});

auto f = [&](frame_holder frame, synthetic_source_interface* source)
{
// if the syncer is disabled passthrough the frame
bool enabled = false;
size_t n_opts = 0;
for( auto& wopt : _enable_opts )
std::lock_guard< std::mutex > lock( _mutex );

if( !_matcher )
{
auto opt = wopt.lock();
if( opt )
{
++n_opts;
if( opt->is_true() )
{
enabled = true;
break;
}
}
if(!create_matcher( frame ))
get_source().frame_ready(std::move(frame));
}
if( n_opts && ! enabled )
else
{
get_source().frame_ready( std::move( frame ) );
return;
auto env = syncronization_environment{ source, _matches, log };
_matcher->dispatch(std::move(frame), env);
}
}

single_consumer_frame_queue<frame_holder> matches;
frame_holder f;
{
// Another thread has the lock, meaning will get into the following loop and dequeue all
// the frames. So there's nothing for us to do...
std::unique_lock< std::mutex > lock(_callback_mutex, std::try_to_lock);
if (!lock.owns_lock())
return;

while( _matches.try_dequeue( &f ) )
{
std::lock_guard<std::mutex> lock(_mutex);
_matcher->dispatch(std::move(frame), { source, matches });
get_source().frame_ready( std::move( f ) );
}
}
};

set_processing_callback( std::shared_ptr< rs2_frame_processor_callback >(
new internal_frame_processor_callback< decltype( f ) >( f ) ) );
}

frame_holder f;
while (matches.try_dequeue(&f))
get_source().frame_ready(std::move(f));
bool syncer_process_unit::create_matcher( const frame_holder & frame )
{
try
{
auto sensor = frame.frame->get_sensor().get();
if (!sensor)
{
LOG_DEBUG("Sensor is not exist any more cannot create matcher the frame will passthrough ");
return false;
}

};
const device_interface * dev = nullptr;
dev = sensor->get_device().shared_from_this().get();
if (dev)
{
_matcher = dev->create_matcher(frame);

_matcher->set_callback([this](frame_holder f, const syncronization_environment& env) {
if (env.log)
{
LOG_DEBUG("SYNCED: " << frame_to_string(f));
}

// We get here from within a dispatch() call, already protected by a mutex -- so only one thread can enqueue!
env.matches.enqueue(std::move(f));
});
}
else
{
LOG_DEBUG("Device is not exist any more cannot create matcher, the frame will passthrough ");
return false;
}

set_processing_callback(std::shared_ptr<rs2_frame_processor_callback>(
new internal_frame_processor_callback<decltype(f)>(f)));
}
catch( const std::bad_weak_ptr & )
{
LOG_DEBUG("Device was destryed while trying get shared ptr of it, couldn't create matcher, the frame will passthrough ");
return false;
}

return true;

}
} // namespace librealsense
13 changes: 9 additions & 4 deletions src/proc/syncer-processing-block.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,10 @@ namespace librealsense
class syncer_process_unit : public processing_block
{
public:
syncer_process_unit( std::initializer_list< bool_option::ptr > enable_opts );
syncer_process_unit(std::initializer_list< bool_option::ptr > enable_opts, bool log = true);

syncer_process_unit( bool_option::ptr is_enabled_opt = nullptr )
: syncer_process_unit( { is_enabled_opt } ) {}
syncer_process_unit( bool_option::ptr is_enabled_opt = nullptr, bool log = true)
: syncer_process_unit( { is_enabled_opt }, log) {}

void add_enabling_option( bool_option::ptr is_enabled_opt )
{
Expand All @@ -34,7 +34,12 @@ namespace librealsense
_matcher.reset();
}
private:
std::unique_ptr<timestamp_composite_matcher> _matcher;
bool create_matcher(const frame_holder& frame);

std::shared_ptr<matcher> _matcher;
std::vector< std::weak_ptr<bool_option> > _enable_opts;

single_consumer_frame_queue<frame_holder> _matches;
std::mutex _callback_mutex;
};
}
Loading