Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor System functionality into SystemManager #1340

Merged
merged 6 commits into from
Feb 25, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ set (sources
ServerPrivate.cc
SimulationRunner.cc
SystemLoader.cc
SystemManager.cc
TestFixture.cc
Util.cc
World.cc
Expand All @@ -86,6 +87,7 @@ set (gtest_sources
Server_TEST.cc
SimulationRunner_TEST.cc
SystemLoader_TEST.cc
SystemManager_TEST.cc
System_TEST.cc
TestFixture_TEST.cc
Util_TEST.cc
Expand Down
149 changes: 46 additions & 103 deletions src/SimulationRunner.cc
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,6 @@ SimulationRunner::SimulationRunner(const sdf::World *_world,
// Keep world name
this->worldName = _world->Name();

// Keep system loader so plugins can be loaded at runtime
this->systemLoader = _systemLoader;

// Get the physics profile
// TODO(luca): remove duplicated logic in SdfEntityCreator and LevelManager
auto physics = _world->PhysicsByIndex(0);
Expand Down Expand Up @@ -115,6 +112,10 @@ SimulationRunner::SimulationRunner(const sdf::World *_world,
static_cast<int>(this->stepSize.count() / this->desiredRtf));
}

// Create the system manager
this->systemMgr = std::make_unique<SystemManager>(_systemLoader,
&this->entityCompMgr, &this->eventMgr);

this->pauseConn = this->eventMgr.Connect<events::Pause>(
std::bind(&SimulationRunner::SetPaused, this, std::placeholders::_1));

Expand Down Expand Up @@ -173,7 +174,7 @@ SimulationRunner::SimulationRunner(const sdf::World *_world,

// If we have reached this point and no systems have been loaded, then load
// a default set of systems.
if (this->systems.empty() && this->pendingSystems.empty())
if (this->systemMgr->TotalCount() == 0)
{
ignmsg << "No systems loaded from SDF, loading defaults" << std::endl;
bool isPlayback = !this->serverConfig.LogPlaybackPath().empty();
Expand Down Expand Up @@ -455,7 +456,9 @@ void SimulationRunner::AddSystem(const SystemPluginPtr &_system,
std::optional<Entity> _entity,
std::optional<std::shared_ptr<const sdf::Element>> _sdf)
{
this->AddSystemImpl(SystemInternal(_system), _entity, _sdf);
auto entity = _entity.value_or(worldEntity(this->entityCompMgr));
auto sdf = _sdf.value_or(this->sdfWorld->Element());
this->systemMgr->AddSystem(_system, entity, sdf);
}

//////////////////////////////////////////////////
Expand All @@ -464,104 +467,57 @@ void SimulationRunner::AddSystem(
std::optional<Entity> _entity,
std::optional<std::shared_ptr<const sdf::Element>> _sdf)
{
this->AddSystemImpl(SystemInternal(_system), _entity, _sdf);
}

//////////////////////////////////////////////////
void SimulationRunner::AddSystemImpl(
SystemInternal _system,
std::optional<Entity> _entity,
std::optional<std::shared_ptr<const sdf::Element>> _sdf)
{
// Call configure
if (_system.configure)
{
// Default to world entity and SDF
auto entity = _entity.has_value() ? _entity.value()
: worldEntity(this->entityCompMgr);
auto sdf = _sdf.has_value() ? _sdf.value() : this->sdfWorld->Element();

_system.configure->Configure(
entity, sdf,
this->entityCompMgr,
this->eventMgr);
}

// Update callbacks will be handled later, add to queue
std::lock_guard<std::mutex> lock(this->pendingSystemsMutex);
this->pendingSystems.push_back(_system);
auto entity = _entity.value_or(worldEntity(this->entityCompMgr));
auto sdf = _sdf.value_or(this->sdfWorld->Element());
this->systemMgr->AddSystem(_system, entity, sdf);
mjcarroll marked this conversation as resolved.
Show resolved Hide resolved
}

/////////////////////////////////////////////////
void SimulationRunner::AddSystemToRunner(SystemInternal _system)
void SimulationRunner::ProcessSystemQueue()
{
this->systems.push_back(_system);
auto pending = this->systemMgr->PendingCount();

if (_system.preupdate)
this->systemsPreupdate.push_back(_system.preupdate);
if (0 == pending)
return;

if (_system.update)
this->systemsUpdate.push_back(_system.update);
// If additional systems are to be added, stop the existing threads.
this->StopWorkerThreads();

if (_system.postupdate)
this->systemsPostupdate.push_back(_system.postupdate);
}
this->systemMgr->ActivatePendingSystems();

/////////////////////////////////////////////////
void SimulationRunner::ProcessSystemQueue()
{
std::lock_guard<std::mutex> lock(this->pendingSystemsMutex);
auto pending = this->pendingSystems.size();

if (pending > 0)
{
// If additional systems are to be added, stop the existing threads.
this->StopWorkerThreads();
}
auto threadCount = this->systemMgr->SystemsPostUpdate().size() + 1u;

for (const auto &system : this->pendingSystems)
{
this->AddSystemToRunner(system);
}
this->pendingSystems.clear();
igndbg << "Creating PostUpdate worker threads: "
<< threadCount << std::endl;

// If additional systems were added, recreate the worker threads.
if (pending > 0)
{
igndbg << "Creating PostUpdate worker threads: "
<< this->systemsPostupdate.size() + 1 << std::endl;
this->postUpdateStartBarrier = std::make_unique<Barrier>(threadCount);
this->postUpdateStopBarrier = std::make_unique<Barrier>(threadCount);

this->postUpdateStartBarrier =
std::make_unique<Barrier>(this->systemsPostupdate.size() + 1u);
this->postUpdateStopBarrier =
std::make_unique<Barrier>(this->systemsPostupdate.size() + 1u);
this->postUpdateThreadsRunning = true;
int id = 0;

this->postUpdateThreadsRunning = true;
int id = 0;
for (auto &system : this->systemMgr->SystemsPostUpdate())
{
igndbg << "Creating postupdate worker thread (" << id << ")" << std::endl;

for (auto &system : this->systemsPostupdate)
this->postUpdateThreads.push_back(std::thread([&, id]()
{
igndbg << "Creating postupdate worker thread (" << id << ")" << std::endl;

this->postUpdateThreads.push_back(std::thread([&, id]()
std::stringstream ss;
ss << "PostUpdateThread: " << id;
IGN_PROFILE_THREAD_NAME(ss.str().c_str());
while (this->postUpdateThreadsRunning)
{
std::stringstream ss;
ss << "PostUpdateThread: " << id;
IGN_PROFILE_THREAD_NAME(ss.str().c_str());
while (this->postUpdateThreadsRunning)
this->postUpdateStartBarrier->Wait();
if (this->postUpdateThreadsRunning)
{
this->postUpdateStartBarrier->Wait();
if (this->postUpdateThreadsRunning)
{
system->PostUpdate(this->currentInfo, this->entityCompMgr);
}
this->postUpdateStopBarrier->Wait();
system->PostUpdate(this->currentInfo, this->entityCompMgr);
}
igndbg << "Exiting postupdate worker thread ("
<< id << ")" << std::endl;
}));
id++;
}
this->postUpdateStopBarrier->Wait();
}
igndbg << "Exiting postupdate worker thread ("
<< id << ")" << std::endl;
}));
id++;
}
}

Expand All @@ -577,13 +533,13 @@ void SimulationRunner::UpdateSystems()

{
IGN_PROFILE("PreUpdate");
for (auto& system : this->systemsPreupdate)
for (auto& system : this->systemMgr->SystemsPreUpdate())
system->PreUpdate(this->currentInfo, this->entityCompMgr);
}

{
IGN_PROFILE("Update");
for (auto& system : this->systemsUpdate)
for (auto& system : this->systemMgr->SystemsUpdate())
system->Update(this->currentInfo, this->entityCompMgr);
}

Expand Down Expand Up @@ -903,19 +859,7 @@ void SimulationRunner::LoadPlugin(const Entity _entity,
const std::string &_name,
const sdf::ElementPtr &_sdf)
{
std::optional<SystemPluginPtr> system;
{
std::lock_guard<std::mutex> lock(this->systemLoaderMutex);
system = this->systemLoader->LoadPlugin(_fname, _name, _sdf);
}

// System correctly loaded from library
if (system)
{
this->AddSystem(system.value(), _entity, _sdf);
igndbg << "Loaded system [" << _name
<< "] for entity [" << _entity << "]" << std::endl;
}
this->systemMgr->LoadPlugin(_entity, _fname, _name, _sdf);
}

//////////////////////////////////////////////////
Expand Down Expand Up @@ -1085,8 +1029,7 @@ size_t SimulationRunner::EntityCount() const
/////////////////////////////////////////////////
size_t SimulationRunner::SystemCount() const
{
std::lock_guard<std::mutex> lock(this->pendingSystemsMutex);
return this->systems.size() + this->pendingSystems.size();
return this->systemMgr->TotalCount();
}

/////////////////////////////////////////////////
Expand Down
Loading