Skip to content

Commit

Permalink
CR fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
aangerma committed Feb 18, 2021
1 parent a49d873 commit 6d05f45
Show file tree
Hide file tree
Showing 6 changed files with 108 additions and 146 deletions.
2 changes: 1 addition & 1 deletion src/core/streaming.h
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ namespace librealsense
{
frame_interface* frame;

frame_interface* operator->()
frame_interface* operator->() const
{
return frame;
}
Expand Down
4 changes: 2 additions & 2 deletions src/l500/l500-device.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -283,7 +283,7 @@ namespace librealsense
[=]() {
auto z16rot = std::make_shared<rotation_transform>(RS2_FORMAT_Z16, RS2_STREAM_DEPTH, RS2_EXTENSION_DEPTH_FRAME);
auto y8rot = std::make_shared<rotation_transform>(RS2_FORMAT_Y8, RS2_STREAM_INFRARED, RS2_EXTENSION_VIDEO_FRAME);
auto sync = std::make_shared<syncer_process_unit>(nullptr, false); // is_zo_enabled_opt );
auto sync = std::make_shared<syncer_process_unit>(nullptr, false); // disable logging on this internal syncer

auto cpb = std::make_shared<composite_processing_block>();
cpb->add(z16rot);
Expand All @@ -310,7 +310,7 @@ namespace librealsense
auto z16rot = std::make_shared<rotation_transform>(RS2_FORMAT_Z16, RS2_STREAM_DEPTH, RS2_EXTENSION_DEPTH_FRAME);
auto y8rot = std::make_shared<rotation_transform>(RS2_FORMAT_Y8, RS2_STREAM_INFRARED, RS2_EXTENSION_VIDEO_FRAME);
auto conf = std::make_shared<confidence_rotation_transform>();
auto sync = std::make_shared<syncer_process_unit>(nullptr, false); // is_zo_enabled_opt );
auto sync = std::make_shared<syncer_process_unit>(nullptr, false); // disable logging on this internal syncer

auto cpb = std::make_shared<composite_processing_block>();
cpb->add(z16rot);
Expand Down
80 changes: 46 additions & 34 deletions src/proc/syncer-processing-block.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,17 +41,18 @@ syncer_process_unit::syncer_process_unit( std::initializer_list< bool_option::pt
{
std::lock_guard< std::mutex > lock( _mutex );

if( _matcher == nullptr )
if( !_matcher )
{
create_matcher( frame, log );
if(!create_matcher( frame, log ))
get_source().frame_ready(std::move(frame));
}

_matcher->dispatch( std::move( frame ), { source, matches, log } );
else
_matcher->dispatch( std::move( frame ), { source, matches, log } );
}

frame_holder f;
{
std::lock_guard< std::mutex > lock(callback_mutex);
std::lock_guard< std::mutex > lock(_callback_mutex);

while( matches.try_dequeue( &f ) )
{
Expand All @@ -64,44 +65,55 @@ syncer_process_unit::syncer_process_unit( std::initializer_list< bool_option::pt
new internal_frame_processor_callback< decltype( f ) >( f ) ) );
}

void syncer_process_unit::create_matcher( const frame_holder & frame, bool log )
bool syncer_process_unit::create_matcher( const frame_holder & frame, bool log )
{
auto sensor = frame.frame->get_sensor().get();
const device_interface * dev = nullptr;
try
{
auto sensor = frame.frame->get_sensor().get();
if (!sensor)
{
LOG_DEBUG("Sensor is not exist any more cannot create matcher the frame will passthrough ");
return false;
}

const device_interface * dev = nullptr;
dev = sensor->get_device().shared_from_this().get();
if (dev)
{
_matcher = dev->create_matcher(frame);

_matcher->set_callback([this](frame_holder f, const syncronization_environment& env) {
if (env.log)
{
std::stringstream ss;
ss << "SYNCED: ";
auto composite = dynamic_cast<composite_frame *>(f.frame);
for (int i = 0; i < composite->get_embedded_frames_count(); i++)
{
auto matched = composite->get_frame(i);
ss << matched->get_stream()->get_stream_type() << " " << matched->get_frame_number()
<< ", " << std::fixed << matched->get_frame_timestamp() << " ";
}

LOG_DEBUG(ss.str());
}

env.matches.enqueue(std::move(f));
});
}
else
{
LOG_DEBUG("Device is not exist any more cannot create matcher, the frame will passthrough ");
return false;
}

}
catch( const std::bad_weak_ptr & )
{
LOG_WARNING( "Device destroyed" );
}
if( dev )
{
_matcher = dev->create_matcher( frame );
}
else
{
_matcher = std::shared_ptr< matcher >( new timestamp_composite_matcher( {} ) );
}

_matcher->set_callback( [this, log]( frame_holder f, syncronization_environment env ) {
if( log )
{
std::stringstream ss;
ss << "SYNCED: ";
auto composite = dynamic_cast< composite_frame * >( f.frame );
for( int i = 0; i < composite->get_embedded_frames_count(); i++ )
{
auto matched = composite->get_frame( i );
ss << matched->get_stream()->get_stream_type() << " " << matched->get_frame_number()
<< ", " << std::fixed << matched->get_frame_timestamp() << " ";
}

LOG_DEBUG( ss.str() );
}

env.matches.enqueue( std::move( f ) );
} );
return true;

}
} // namespace librealsense
4 changes: 2 additions & 2 deletions src/proc/syncer-processing-block.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,13 +34,13 @@ namespace librealsense
_matcher.reset();
}
private:
void create_matcher(const frame_holder& frame, bool log = true);
bool create_matcher(const frame_holder& frame, bool log = true);

std::shared_ptr<matcher> _matcher;
std::vector< std::weak_ptr<bool_option> > _enable_opts;

single_consumer_frame_queue<frame_holder> matches;
std::mutex callback_mutex;
std::mutex _callback_mutex;
bool _log;
};
}
138 changes: 44 additions & 94 deletions src/sync.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ namespace librealsense
{
const int MAX_GAP = 1000;

std::string frame_to_string(frame_holder& f)
std::string frame_to_string(const frame_holder& f)
{
std::ostringstream s;
auto composite = dynamic_cast<composite_frame*>(f.frame);
Expand All @@ -32,7 +32,7 @@ namespace librealsense
return s.str();
}

void log_if_enable(std::string str, syncronization_environment env)
static void log_if_enable(std::string str, const syncronization_environment& env)
{
if (env.log)
{
Expand All @@ -43,7 +43,7 @@ namespace librealsense
matcher::matcher(std::vector<stream_id> streams_id)
: _streams_id(streams_id){}

void matcher::sync(frame_holder f, syncronization_environment env)
void matcher::sync(frame_holder f, const syncronization_environment& env)
{
auto cb = begin_callback();
_callback(std::move(f), env);
Expand Down Expand Up @@ -104,7 +104,7 @@ namespace librealsense
_name = "I " + std::string(rs2_stream_to_string(stream_type));
}

void identity_matcher::dispatch(frame_holder f, syncronization_environment env)
void identity_matcher::dispatch(frame_holder f, const syncronization_environment& env)
{
std::stringstream s;
s <<_name<<"--> "<< f->get_stream()->get_stream_type() << " " << f->get_frame_number() << ", "<<std::fixed<< f->get_frame_timestamp()<<"\n";
Expand Down Expand Up @@ -132,7 +132,7 @@ namespace librealsense
{
for (auto&& stream : matcher->get_streams())
{
matcher->set_callback([&](frame_holder f, syncronization_environment env)
matcher->set_callback([&](frame_holder f, const syncronization_environment& env)
{
sync(std::move(f), env);
});
Expand All @@ -148,7 +148,7 @@ namespace librealsense
_name = create_composite_name(matchers, name);
}

void composite_matcher::dispatch(frame_holder f, syncronization_environment env)
void composite_matcher::dispatch(frame_holder f, const syncronization_environment& env)
{

std::stringstream s;
Expand All @@ -157,8 +157,18 @@ namespace librealsense

clean_inactive_streams(f);
auto matcher = find_matcher(f);
update_last_arrived(f, matcher.get());
matcher->dispatch(std::move(f), env);

if (matcher)
{
update_last_arrived(f, matcher.get());
matcher->dispatch(std::move(f), env);
}
else
{
LOG_ERROR("didn't find any matcher for " << frame_to_string(f) << " will not be syncronyzed");
_callback(std::move(f), env);
}

}

std::shared_ptr<matcher> composite_matcher::find_matcher(const frame_holder& frame)
Expand All @@ -167,92 +177,15 @@ namespace librealsense
auto stream_id = frame.frame->get_stream()->get_unique_id();
auto stream_type = frame.frame->get_stream()->get_stream_type();

auto sensor = frame.frame->get_sensor().get(); //TODO: Potential deadlock if get_sensor() gets a hold of the last reference of that sensor

auto dev_exist = false;
matcher = _matchers[stream_id];

if (sensor)
{

const device_interface* dev = nullptr;
try
{
dev = sensor->get_device().shared_from_this().get();
}
catch (const std::bad_weak_ptr&)
{
LOG_WARNING("Device destroyed");
}
if (dev)
{
dev_exist = true;
matcher = _matchers[stream_id];
if (!matcher)
{
std::ostringstream ss;
for( auto const & it : _matchers )
ss << ' ' << it.first;
LOG_DEBUG( "stream id " << stream_id << " was not found; trying to create, existing streams=" << ss.str() );
matcher = dev->create_matcher(frame);

matcher->set_callback(
[&](frame_holder f, syncronization_environment env)
{
sync(std::move(f), env);
});

for (auto stream : matcher->get_streams())
{
if (_matchers[stream])
{
_frames_queue.erase(_matchers[stream].get());
}
_matchers[stream] = matcher;
_streams_id.push_back(stream);
}
for (auto stream : matcher->get_streams_types())
{
_streams_type.push_back(stream);
}

if (std::find(_streams_type.begin(), _streams_type.end(), stream_type) == _streams_type.end())
{
LOG_ERROR("Stream matcher not found! stream=" << rs2_stream_to_string(stream_type));
}
}
else if(!matcher->get_active())
{
matcher->set_active(true);
_frames_queue[matcher.get()].start();
}
}
}
else
if (!matcher->get_active())
{
LOG_DEBUG( "sensor does not exist" );
matcher->set_active(true);
_frames_queue[matcher.get()].start();
}

if(!dev_exist)
{
matcher = _matchers[stream_id];
// We don't know what device this frame came from, so just store it under device NULL with ID matcher
if (!matcher)
{
if (_matchers[stream_id])
{
_frames_queue.erase(_matchers[stream_id].get());
}
_matchers[stream_id] = std::make_shared<identity_matcher>(stream_id, stream_type);
_streams_id.push_back(stream_id);
_streams_type.push_back(stream_type);
matcher = _matchers[stream_id];

matcher->set_callback([&](frame_holder f, syncronization_environment env)
{
sync(std::move(f), env);
});
}
}
return matcher;
}

Expand All @@ -269,14 +202,21 @@ namespace librealsense
return str;
}

void composite_matcher::sync(frame_holder f, syncronization_environment env)
void composite_matcher::sync(frame_holder f, const syncronization_environment& env)
{
std::ostringstream s;
s <<"SYNC "<<_name<<"--> "<< frame_to_string(f)<<"\n";
log_if_enable(s.str(), env);

update_next_expected(f);
auto matcher = find_matcher(f);
if (!matcher)
{
LOG_ERROR("didn't find any matcher for " << frame_to_string(f) << " will not be syncronyzed");
_callback(std::move(f), env);
return;
}

_frames_queue[matcher.get()].enqueue(std::move(f));

std::vector<frame_holder*> frames_arrived;
Expand Down Expand Up @@ -451,7 +391,7 @@ namespace librealsense
}
}

bool frame_number_composite_matcher::skip_missing_stream(std::vector<matcher*> synced, matcher* missing, syncronization_environment env)
bool frame_number_composite_matcher::skip_missing_stream(std::vector<matcher*> synced, matcher* missing, const syncronization_environment& env)
{
frame_holder* synced_frame;

Expand All @@ -472,6 +412,12 @@ namespace librealsense
void frame_number_composite_matcher::update_next_expected(const frame_holder& f)
{
auto matcher = find_matcher(f);
if (!matcher)
{
LOG_ERROR("didn't find any matcher for " << frame_to_string(f) << " will not be syncronyzed");
return;
}

_next_expected[matcher.get()] = f.frame->get_frame_number()+1.;
}

Expand Down Expand Up @@ -542,7 +488,11 @@ namespace librealsense
auto gap = 1000.f / (float)fps;

auto matcher = find_matcher(f);

if (!matcher)
{
LOG_ERROR("didn't find any matcher for " << frame_to_string(f) );
return;
}
_next_expected[matcher.get()] = f.frame->get_frame_timestamp() + gap;
_next_expected_domain[matcher.get()] = f.frame->get_frame_timestamp_domain();
LOG_DEBUG(_name << frame_to_string(const_cast<frame_holder&>(f))<<"fps " <<fps<<" gap " <<gap<<" next_expected: "<< _next_expected[matcher.get()]);
Expand Down Expand Up @@ -581,7 +531,7 @@ namespace librealsense
}
}

bool timestamp_composite_matcher::skip_missing_stream(std::vector<matcher*> synced, matcher* missing, syncronization_environment env)
bool timestamp_composite_matcher::skip_missing_stream(std::vector<matcher*> synced, matcher* missing, const syncronization_environment& env)
{
if(!missing->get_active())
return true;
Expand Down Expand Up @@ -620,7 +570,7 @@ namespace librealsense
composite_identity_matcher::composite_identity_matcher(std::vector<std::shared_ptr<matcher>> matchers) :composite_matcher(matchers, "CI: ")
{}

void composite_identity_matcher::sync(frame_holder f, syncronization_environment env)
void composite_identity_matcher::sync(frame_holder f, const syncronization_environment& env)
{
LOG_DEBUG("by_pass_composite_matcher: " << _name << " " << frame_to_string(f));
_callback(std::move(f), env);
Expand Down
Loading

0 comments on commit 6d05f45

Please sign in to comment.