Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Parallelize State call in ECM #451

Merged
merged 35 commits into from
Jan 15, 2021
Merged
Show file tree
Hide file tree
Changes from 24 commits
Commits
Show all changes
35 commits
Select commit Hold shift + click to select a range
3904c21
draft
Oct 14, 2020
c526f78
vector to map
Oct 15, 2020
3976133
Finish conversion
Oct 15, 2020
67a3351
Finish implementation of optimization
Oct 16, 2020
93cdb81
Merge branch 'ign-gazebo3' into jshep1/scenebroadcaster_optimizations
Oct 16, 2020
b5e86b6
Clean up
Oct 16, 2020
31812ff
Clean up again
Oct 16, 2020
b74426d
Another pass
Oct 16, 2020
5adc0c6
Another one
Oct 16, 2020
8eda00c
Merge branch 'ign-gazebo3' into jshep1/scenebroadcaster_optimizations
Oct 20, 2020
afee595
requested fixes
Oct 21, 2020
b521bed
Merge branch 'jshep1/scenebroadcaster_optimizations' of https://githu…
Oct 21, 2020
95cf961
Merge branch 'ign-gazebo3' into jshep1/scenebroadcaster_optimizations
Oct 23, 2020
e7acbc8
Merge branch 'ign-gazebo3' into jshep1/scenebroadcaster_optimizations
Oct 27, 2020
755b88d
remove unneeded if statement
Oct 29, 2020
5e0a9cd
init commit
Oct 29, 2020
5a7806d
Add multithreading implementation
Nov 3, 2020
c729487
prototype version
Nov 6, 2020
f9281e6
Merge branch 'ign-gazebo3' into jshep1/parallelize_ecm
Nov 6, 2020
d5d7a5e
Merge branch 'ign-gazebo3' into jshep1/parallelize_ecm
Nov 12, 2020
32dbb2b
Merge branch 'ign-gazebo3' into jshep1/parallelize_ecm
Nov 17, 2020
18e38b7
clean up and optimize more
Nov 19, 2020
6e931ec
Merge branch 'ign-gazebo3' into jshep1/parallelize_ecm
Nov 19, 2020
a1bb525
Merge branch 'ign-gazebo3' into jshep1/parallelize_ecm
Dec 2, 2020
8fbcd5d
Merge branch 'ign-gazebo3' into jshep1/parallelize_ecm
Dec 10, 2020
23ab021
Merge branch 'ign-gazebo3' into jshep1/parallelize_ecm
Dec 15, 2020
8c274e1
Merge branch 'ign-gazebo3' into jshep1/parallelize_ecm
Dec 16, 2020
213071e
Merge branch 'ign-gazebo3' into jshep1/parallelize_ecm
Dec 17, 2020
0ba4684
fix left out components bug
Dec 17, 2020
45e97f2
Merge branch 'ign-gazebo3' into jshep1/parallelize_ecm
chapulina Dec 31, 2020
97d98fb
Merge branch 'ign-gazebo3' into jshep1/parallelize_ecm
Jan 7, 2021
2e25321
revert unused function, add more docs
Jan 8, 2021
72db3ed
Merge branch 'ign-gazebo3' into jshep1/parallelize_ecm
chapulina Jan 14, 2021
e376b1e
Suggestions to #451 (#560)
chapulina Jan 15, 2021
7343281
Merge branch 'ign-gazebo3' into jshep1/parallelize_ecm
Jan 15, 2021
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions include/ignition/gazebo/EntityComponentManager.hh
Original file line number Diff line number Diff line change
Expand Up @@ -727,6 +727,10 @@ namespace ignition
const std::unordered_set<ComponentTypeId> &_types = {},
bool _full = false) const;

/// \brief Allots the work for multiple threads prior to running
/// `AddEntityToMessage`.
private: void CalculateComponentThreadLoad() const;

// Make runners friends so that they can manage entity creation and
// removal. This should be safe since runners are internal
// to Gazebo.
Expand Down
151 changes: 142 additions & 9 deletions src/EntityComponentManager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,37 @@ class ignition::gazebo::EntityComponentManagerPrivate
/// \brief Flag that indicates if all entities should be removed.
public: bool removeAllEntities{false};

/// \brief True if the entityComponents map was changed. Primarily used
/// by the multithreading functionality in `State()` to allocate work to
/// each thread.
public: bool entityComponentsDirty{true};

/// \brief The number of threads to run entity component computations.
/// Initialized to 1, but calculated later based on the hardware thread
/// support and number of components each thread is allowed to handle, see
/// `componentsPerThread`
public: uint64_t numComponentThreads = 1;
chapulina marked this conversation as resolved.
Show resolved Hide resolved

/// \brief The minimum number of entity components each thread is allowed
/// to process. For instance, if `componentsPerThread` is 50, any number of
/// entity components below 50 would only spawn one worker thread, 100 would
/// spawn two, and so on. If the number of components is greater than
/// `componentsPerThread` * `numComponentThreads`, the work will be
/// distributed evenly between all threads.
public: const uint64_t componentsPerThread = 50;

/// \brief The set of components that each entity has
public: std::unordered_map<Entity,
std::unordered_map<ComponentTypeId, ComponentKey>> entityComponents;

/// \brief A vector of iterators to evenly distributed spots in the
/// `entityComponents` map. Threads use this vector for easy access of
/// their pre-allocated work. This vector is recalculated if
/// `entityComponents` is changed (when `entityComponentsDirty` == true).
public: std::vector<std::unordered_map<Entity,
std::unordered_map<ComponentTypeId, ComponentKey>>::iterator>
entityComponentIterators;

/// \brief A mutex to protect newly created entityes.
public: std::mutex entityCreatedMutex;

Expand Down Expand Up @@ -222,6 +249,7 @@ void EntityComponentManager::ProcessRemoveEntityRequests()
this->dataPtr->entities = EntityGraph();
this->dataPtr->entityComponents.clear();
this->dataPtr->toRemoveEntities.clear();
this->dataPtr->entityComponentsDirty = true;

for (std::pair<const ComponentTypeId,
std::unique_ptr<ComponentStorageBase>> &comp: this->dataPtr->components)
Expand Down Expand Up @@ -257,6 +285,7 @@ void EntityComponentManager::ProcessRemoveEntityRequests()

// Remove the entry in the entityComponent map
this->dataPtr->entityComponents.erase(entity);
this->dataPtr->entityComponentsDirty = true;
}

// Remove the entity from views.
Expand Down Expand Up @@ -295,6 +324,7 @@ bool EntityComponentManager::RemoveComponent(
this->dataPtr->entityComponents[_entity].erase(_key.first);
this->dataPtr->oneTimeChangedComponents.erase(_key);
this->dataPtr->periodicChangedComponents.erase(_key);
this->dataPtr->entityComponentsDirty = true;

this->UpdateViews(_entity);
return true;
Expand Down Expand Up @@ -463,6 +493,7 @@ ComponentKey EntityComponentManager::CreateComponentImplementation(
this->dataPtr->entityComponents[_entity].insert(
{_componentTypeId, componentKey});
this->dataPtr->oneTimeChangedComponents.insert(componentKey);
this->dataPtr->entityComponentsDirty = true;

if (componentIdPair.second)
this->RebuildViews();
Expand Down Expand Up @@ -894,24 +925,94 @@ void EntityComponentManager::ChangedState(
// TODO(anyone) New / removed / changed components
}

void EntityComponentManager::CalculateComponentThreadLoad() const
{
// If the entity component vector is dirty, we need to recalculate the
// threads and each threads work load
if (!this->dataPtr->entityComponentsDirty)
return;

this->dataPtr->entityComponentsDirty = false;
this->dataPtr->entityComponentIterators.clear();
auto startIt = this->dataPtr->entityComponents.begin();
int numComponents = this->dataPtr->entityComponents.size();

// Each thread runs `this->dataPtr->componentsPerThread` or more
// entity components, an arbitrary number
int numThreads = numComponents / this->dataPtr->componentsPerThread + 1;
int maxThreads = std::thread::hardware_concurrency();

// Set the number of threads to spawn to the min of the calculated thread
// count or max threads that the hardware supports
this->dataPtr->numComponentThreads = std::min(numThreads, maxThreads);
chapulina marked this conversation as resolved.
Show resolved Hide resolved
int componentsPerThread = numComponents / this->dataPtr->numComponentThreads;

// Push back the starting iterator
this->dataPtr->entityComponentIterators.push_back(startIt);
for (uint64_t i = 0; i < this->dataPtr->numComponentThreads; i++)
{
// If we have added all of the components to the iterator vector, we are
// done so push back the end iterator
numComponents -= componentsPerThread;
if (numComponents <= 0)
{
this->dataPtr->entityComponentIterators.push_back(
this->dataPtr->entityComponents.end());
break;
}

// Get the iterator to the next starting group of components
auto nextIt = std::next(startIt, componentsPerThread);
this->dataPtr->entityComponentIterators.push_back(nextIt);
startIt = nextIt;
}
}

//////////////////////////////////////////////////
ignition::msgs::SerializedState EntityComponentManager::State(
const std::unordered_set<Entity> &_entities,
const std::unordered_set<ComponentTypeId> &_types) const
{
std::mutex stateMapMutex;
std::vector<std::thread> workers;
ignition::msgs::SerializedState stateMsg;
for (const auto &it : this->dataPtr->entityComponents)
{
auto entity = it.first;
if (!_entities.empty() && _entities.find(entity) == _entities.end())

this->CalculateComponentThreadLoad();

auto functor = [&](auto itStart, auto itEnd) {
msgs::SerializedState threadStateMsg;
while (itStart != itEnd)
{
continue;
if (_entities.empty() ||
_entities.find((*itStart).first) != _entities.end())
{
this->AddEntityToMessage(threadStateMsg, (*itStart).first, _types);
}
itStart++;
}
std::lock_guard<std::mutex> lock(stateMapMutex);

for (auto &entity : threadStateMsg.entities())
{
auto entityMsg = stateMsg.add_entities();
entityMsg->set_id(entity.id());
entityMsg->set_remove(entity.remove());
JShep1 marked this conversation as resolved.
Show resolved Hide resolved
}
};

this->AddEntityToMessage(stateMsg, entity, _types);
// Spawn workers
uint64_t numThreads = this->dataPtr->entityComponentIterators.size() - 1;
for (uint64_t i = 0; i < numThreads; i++)
{
workers.push_back(std::thread(functor,
this->dataPtr->entityComponentIterators[i],
this->dataPtr->entityComponentIterators[i+1]));
}

// Wait for each thread to finish processing its components
std::for_each(workers.begin(), workers.end(),
[](std::thread &t){ t.join(); });

return stateMsg;
}

Expand All @@ -922,11 +1023,43 @@ void EntityComponentManager::State(
const std::unordered_set<ComponentTypeId> &_types,
bool _full) const
{
for (const auto &it : this->dataPtr->entityComponents)
std::mutex stateMapMutex;
std::vector<std::thread> workers;

this->CalculateComponentThreadLoad();

auto functor = [&](auto itStart, auto itEnd) {
msgs::SerializedStateMap threadMap;
while (itStart != itEnd)
{
if (_entities.empty() ||
_entities.find((*itStart).first) != _entities.end())
{
this->AddEntityToMessage(threadMap, (*itStart).first, _types, _full);
}
itStart++;
}
std::lock_guard<std::mutex> lock(stateMapMutex);

for (auto &entity : threadMap.entities())
{
(*_state.mutable_entities())[static_cast<uint64_t>(entity.first)] =
entity.second;
}
};

// Spawn workers
uint64_t numThreads = this->dataPtr->entityComponentIterators.size() - 1;
for (uint64_t i = 0; i < numThreads; i++)
{
if (_entities.empty() || _entities.find(it.first) != _entities.end())
this->AddEntityToMessage(_state, it.first, _types, _full);
workers.push_back(std::thread(functor,
this->dataPtr->entityComponentIterators[i],
this->dataPtr->entityComponentIterators[i+1]));
}

// Wait for each thread to finish processing its components
std::for_each(workers.begin(), workers.end(),
[](std::thread &t){ t.join(); });
}

//////////////////////////////////////////////////
Expand Down