Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

APIs for retrieving models in parallel #199

Merged
merged 9 commits into from
Nov 5, 2021
Merged
48 changes: 48 additions & 0 deletions include/ignition/fuel_tools/FuelClient.hh
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

#include <memory>
#include <string>
#include <tuple>
#include <vector>
#include <ignition/common/URI.hh>

Expand Down Expand Up @@ -203,6 +204,31 @@ namespace ignition
public: Result DownloadModel(const ModelIdentifier &_id,
const std::vector<std::string> &_headers);

/// \brief Download a model from ignition fuel. This will override an
/// existing local copy of the model.
/// \param[in] _id The model identifier.
/// \param[in] _headers Headers to set on the HTTP request.
/// \param[out] _dependencies List of models that this model depends on.
/// \return Result of the download operation
public: Result DownloadModel(const ModelIdentifier &_id,
const std::vector<std::string> &_headers,
std::vector<ModelIdentifier> &_dependencies);

/// \brief Retrieve the list of dependencies for a model.
/// \param[in] _id The model identifier.
/// \param[out] _dependencies The list of dependencies.
/// \return Result of the operation
public: Result ModelDependencies(const ModelIdentifier &_id,
std::vector<ModelIdentifier> &_dependencies);

/// \brief Retrieve the list of dependencies for a list of models.
/// \param[in] _id The list of model identifiers.
/// \param[out] _dependencies The list of dependencies.
/// \return Result of the operation
public: Result ModelDependencies(
const std::vector<ModelIdentifier> &_id,
std::vector<ModelIdentifier> &_dependencies);

/// \brief Download a world from Ignition Fuel. This will override an
/// existing local copy of the world.
/// \param[out] _id The world identifier, with local path updated.
Expand All @@ -227,6 +253,28 @@ namespace ignition
public: Result DownloadWorld(const common::URI &_worldUrl,
std::string &_path);

using ModelResult = std::tuple<ModelIdentifier, Result>;

/// \brief Download a list of models from ignition fuel.
/// \param[in] _ids The list of model ids to download.
/// This will also find all recursive dependencies of the models
/// \param[in] _jobs Number of parallel jobs to use to download models
/// \return Result of the download operation.
// The resulting vector will be at least the size of the _ids input
// vector, but may be larger depending on the number of depedencies
// downloaded
public: std::vector<ModelResult> DownloadModels(
const std::vector<ModelIdentifier> &_ids,
size_t _jobs = 2);

/// \brief Download a list of mworlds from ignition fuel.
/// \param[in] _ids The list of world ids to download.
/// \param[in] _jobs Number of parallel jobs to use to download worlds.
/// \return Result of the download operation.
public: Result DownloadWorlds(
const std::vector<WorldIdentifier> &_ids,
size_t _jobs = 2);

/// \brief Check if a model is already present in the local cache.
/// \param[in] _modelUrl The unique URL of the model on a Fuel server.
/// E.g.: https://fuel.ignitionrobotics.org/1.0/caguero/models/Beer
Expand Down
1 change: 0 additions & 1 deletion src/ClientConfig_TEST.cc
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,6 @@ std::string homePath()
/// \ToDo: Move this function to ignition::common::Filesystem
std::string cachePath()
{
std::string cachePath;
#ifndef _WIN32
return std::string("/tmp/ignition/fuel");
#else
Expand Down
254 changes: 248 additions & 6 deletions src/FuelClient.cc
Original file line number Diff line number Diff line change
Expand Up @@ -564,6 +564,28 @@ Result FuelClient::DownloadModel(const ModelIdentifier &_id)
//////////////////////////////////////////////////
Result FuelClient::DownloadModel(const ModelIdentifier &_id,
const std::vector<std::string> &_headers)
{
std::vector<ModelIdentifier> dependencies;
auto res = this->DownloadModel(_id, _headers, dependencies);

if(!res)
return res;

for (auto dep : dependencies)
{
auto dep_res = this->DownloadModel(dep, _headers);

if(!dep_res)
return dep_res;
}

return res;
}

//////////////////////////////////////////////////
Result FuelClient::DownloadModel(const ModelIdentifier &_id,
const std::vector<std::string> &_headers,
std::vector<ModelIdentifier> &_dependencies)
{
// Server config
if (!_id.Server().Url().Valid() || _id.Server().Version().empty())
Expand Down Expand Up @@ -626,10 +648,19 @@ Result FuelClient::DownloadModel(const ModelIdentifier &_id,
if (!this->dataPtr->cache->SaveModel(newId, resp.data, true))
return Result(ResultType::FETCH_ERROR);

return this->ModelDependencies(_id, _dependencies);
}

//////////////////////////////////////////////////
Result FuelClient::ModelDependencies(const ModelIdentifier &_id,
std::vector<ModelIdentifier> &_dependencies)
{
_dependencies.clear();

// Locate any dependencies from the input model and download them.
std::string path;
ignition::msgs::FuelMetadata meta;
if (this->CachedModel(ignition::common::URI(newId.UniqueName()), path))
if (this->CachedModel(ignition::common::URI(_id.UniqueName()), path))
{
std::string metadataPath =
ignition::common::joinPaths(path, "metadata.pbtxt");
Expand Down Expand Up @@ -664,20 +695,61 @@ Result FuelClient::DownloadModel(const ModelIdentifier &_id,

for (int i = 0; i < meta.dependencies_size(); ++i)
{
std::string dependencyPath;
ignition::common::URI dependencyURI(meta.dependencies(i).uri());

// If the model is not already cached, download it; this prevents
// any sort of cyclic dependencies from running infinitely
if (!this->CachedModel(dependencyURI, dependencyPath))
this->DownloadModel(dependencyURI, dependencyPath);
ModelIdentifier dependencyID;
if(!this->ParseModelUrl(dependencyURI, dependencyID))
{
// There is a potential that depdencies are specified via
// [model://model_name], which is valid, but not something that we
// can fetch from Fuel. In that case, warn the user so they have
// a chance to update their specified dependencies.
ignwarn << "Error resolving URL for dependency [" <<
meta.dependencies(i).uri() << "] of model [" <<
_id.UniqueName() <<"]: Skipping" << std::endl;
} else {
_dependencies.push_back(dependencyID);
}
}
}
}

return Result(ResultType::FETCH);
}

//////////////////////////////////////////////////
Result FuelClient::ModelDependencies(
const std::vector<ModelIdentifier> &_ids,
std::vector<ModelIdentifier> &_dependencies)
{
std::vector<ModelIdentifier> newDeps;
for (auto modelId : _ids)
{
std::vector<ModelIdentifier> modelDeps;
auto result = this->ModelDependencies(modelId, modelDeps);

if (!modelDeps.empty())
{
std::vector<ModelIdentifier> recursiveDeps;
this->ModelDependencies(modelDeps, recursiveDeps);

for (auto dep : modelDeps)
{
newDeps.push_back(dep);
}

for (auto dep : recursiveDeps)
{
newDeps.push_back(dep);
}
}
}

_dependencies = std::vector<ModelIdentifier>(newDeps.begin(), newDeps.end());
return Result(ResultType::FETCH);
}


//////////////////////////////////////////////////
Result FuelClient::DownloadWorld(WorldIdentifier &_id)
{
Expand Down Expand Up @@ -745,6 +817,176 @@ Result FuelClient::DownloadWorld(WorldIdentifier &_id)
return Result(ResultType::FETCH);
}

//////////////////////////////////////////////////
namespace std
{
template<> struct hash<ModelIdentifier>
{
std::size_t operator()(const ModelIdentifier &_id) const noexcept
{
return std::hash<std::string>{}(_id.AsString());
}
};
}

//////////////////////////////////////////////////
std::vector<FuelClient::ModelResult> FuelClient::DownloadModels(
const std::vector<ModelIdentifier> &_ids,
size_t _jobs)
{
std::mutex resultMutex;
std::vector<FuelClient::ModelResult> result;

std::mutex idsMutex;
std::deque<ModelIdentifier> idsToDownload(_ids.begin(), _ids.end());
std::unordered_set<ModelIdentifier> uniqueIds(_ids.begin(), _ids.end());

std::atomic<bool> running = true;

auto downloadWorker = [&](){
ModelIdentifier id;

while(running)
{
// Pop the next ID off the queue
{
std::lock_guard<std::mutex> lock(idsMutex);

if (idsToDownload.empty())
{
std::this_thread::sleep_for(std::chrono::milliseconds(10));
continue;
}

id = idsToDownload.front();
idsToDownload.pop_front();
}

std::vector<ModelIdentifier> dependencies;
auto modelResult = this->DownloadModel(id, {}, dependencies);

{
std::lock_guard<std::mutex> lock(resultMutex);
result.push_back(std::make_tuple(id, modelResult));
}

if (!dependencies.empty())
{
std::lock_guard<std::mutex> lock(idsMutex);
igndbg << "Adding " << dependencies.size()
<< " model dependencies to queue from " << id.Name() << "\n";
for (auto dep : dependencies)
{
if (uniqueIds.count(dep) == 0)
{
idsToDownload.push_back(dep);
uniqueIds.insert(dep);
}
}
}
}
};

std::vector<std::thread> workers;

for (size_t ii = 0; ii < _jobs; ++ii)
{
workers.push_back(std::thread(downloadWorker));
}

ignmsg << "Preparing to download "
<< idsToDownload.size() << " models with "
<< _jobs << " worker threads\n";


while (running)
{
std::this_thread::sleep_for(std::chrono::milliseconds(10));

if(idsToDownload.empty())
{
running = false;
}
}

for (auto& worker : workers)
{
worker.join();
}

ignmsg << "Finished, downloaded " << result.size() << " models in total\n";

return result;
}

//////////////////////////////////////////////////
Result FuelClient::DownloadWorlds(
const std::vector<WorldIdentifier> &_ids, size_t _jobs)
{
std::deque<std::future<ignition::fuel_tools::Result>> tasks;
// Check for finished tasks by checking if the status of their futures is
// "ready". If a task is finished, check if it succeeded and print out an
// error message if it failed. When a task is finished, it gets erased from
// the tasks list to make room for other tasks to be added.
size_t itemCount = 0;
const size_t totalItemCount = _ids.size();

ignmsg << "Using " << _jobs << " jobs to download collection of "
<< totalItemCount << " items" << std::endl;

auto checkForFinishedTasks = [&itemCount, &totalItemCount, &tasks] {
auto finishedIt =
std::partition(tasks.begin(), tasks.end(), [](const auto &_task)
{
return std::future_status::ready !=
_task.wait_for(std::chrono::milliseconds(100));
});

if (finishedIt != tasks.end())
{
for (auto taskIt = finishedIt; taskIt != tasks.end(); ++taskIt)
{
ignition::fuel_tools::Result result = taskIt->get();
if (result)
{
++itemCount;
}
else
{
ignerr << result.ReadableResult() << std::endl;
}
}

tasks.erase(finishedIt, tasks.end());
ignmsg << "Downloaded: " << itemCount << " / " << totalItemCount
<< std::endl;
}
};

// We need a mutable worldId because DownloadWorld modifies it
for (auto& id : _ids)
{
while (tasks.size() >= _jobs)
{
checkForFinishedTasks();
}
std::this_thread::sleep_for(std::chrono::milliseconds(10));
auto handle = std::async(std::launch::async, [&id, this]
{
WorldIdentifier tempId = id;
return this->DownloadWorld(tempId);
});
tasks.push_back(std::move(handle));
}

while (!tasks.empty())
{
checkForFinishedTasks();
}

return Result(ResultType::FETCH);
}

//////////////////////////////////////////////////
bool FuelClient::ParseModelUrl(const common::URI &_modelUrl,
ModelIdentifier &_id)
Expand Down
Loading