diff --git a/include/ignition/fuel_tools/FuelClient.hh b/include/ignition/fuel_tools/FuelClient.hh index f2d958c3..0b39dddf 100644 --- a/include/ignition/fuel_tools/FuelClient.hh +++ b/include/ignition/fuel_tools/FuelClient.hh @@ -20,6 +20,7 @@ #include #include +#include #include #include @@ -203,6 +204,31 @@ namespace ignition public: Result DownloadModel(const ModelIdentifier &_id, const std::vector &_headers); + /// \brief Download a model from ignition fuel. This will override an + /// existing local copy of the model. + /// \param[in] _id The model identifier. + /// \param[in] _headers Headers to set on the HTTP request. + /// \param[out] _dependencies List of models that this model depends on. + /// \return Result of the download operation + public: Result DownloadModel(const ModelIdentifier &_id, + const std::vector &_headers, + std::vector &_dependencies); + + /// \brief Retrieve the list of dependencies for a model. + /// \param[in] _id The model identifier. + /// \param[out] _dependencies The list of dependencies. + /// \return Result of the operation + public: Result ModelDependencies(const ModelIdentifier &_id, + std::vector &_dependencies); + + /// \brief Retrieve the list of dependencies for a list of models. + /// \param[in] _id The list of model identifiers. + /// \param[out] _dependencies The list of dependencies. + /// \return Result of the operation + public: Result ModelDependencies( + const std::vector &_id, + std::vector &_dependencies); + /// \brief Download a world from Ignition Fuel. This will override an /// existing local copy of the world. /// \param[out] _id The world identifier, with local path updated. @@ -227,6 +253,28 @@ namespace ignition public: Result DownloadWorld(const common::URI &_worldUrl, std::string &_path); + using ModelResult = std::tuple; + + /// \brief Download a list of models from ignition fuel. + /// \param[in] _ids The list of model ids to download. + /// This will also find all recursive dependencies of the models + /// \param[in] _jobs Number of parallel jobs to use to download models + /// \return Result of the download operation. + // The resulting vector will be at least the size of the _ids input + // vector, but may be larger depending on the number of depedencies + // downloaded + public: std::vector DownloadModels( + const std::vector &_ids, + size_t _jobs = 2); + + /// \brief Download a list of mworlds from ignition fuel. + /// \param[in] _ids The list of world ids to download. + /// \param[in] _jobs Number of parallel jobs to use to download worlds. + /// \return Result of the download operation. + public: Result DownloadWorlds( + const std::vector &_ids, + size_t _jobs = 2); + /// \brief Check if a model is already present in the local cache. /// \param[in] _modelUrl The unique URL of the model on a Fuel server. /// E.g.: https://fuel.ignitionrobotics.org/1.0/caguero/models/Beer diff --git a/src/ClientConfig_TEST.cc b/src/ClientConfig_TEST.cc index 372b2005..24bb229e 100644 --- a/src/ClientConfig_TEST.cc +++ b/src/ClientConfig_TEST.cc @@ -64,7 +64,6 @@ std::string homePath() /// \ToDo: Move this function to ignition::common::Filesystem std::string cachePath() { - std::string cachePath; #ifndef _WIN32 return std::string("/tmp/ignition/fuel"); #else diff --git a/src/FuelClient.cc b/src/FuelClient.cc index 21f249bd..a5e72ca6 100644 --- a/src/FuelClient.cc +++ b/src/FuelClient.cc @@ -564,6 +564,28 @@ Result FuelClient::DownloadModel(const ModelIdentifier &_id) ////////////////////////////////////////////////// Result FuelClient::DownloadModel(const ModelIdentifier &_id, const std::vector &_headers) +{ + std::vector dependencies; + auto res = this->DownloadModel(_id, _headers, dependencies); + + if(!res) + return res; + + for (auto dep : dependencies) + { + auto dep_res = this->DownloadModel(dep, _headers); + + if(!dep_res) + return dep_res; + } + + return res; +} + +////////////////////////////////////////////////// +Result FuelClient::DownloadModel(const ModelIdentifier &_id, + const std::vector &_headers, + std::vector &_dependencies) { // Server config if (!_id.Server().Url().Valid() || _id.Server().Version().empty()) @@ -626,10 +648,19 @@ Result FuelClient::DownloadModel(const ModelIdentifier &_id, if (!this->dataPtr->cache->SaveModel(newId, resp.data, true)) return Result(ResultType::FETCH_ERROR); + return this->ModelDependencies(_id, _dependencies); +} + +////////////////////////////////////////////////// +Result FuelClient::ModelDependencies(const ModelIdentifier &_id, + std::vector &_dependencies) +{ + _dependencies.clear(); + // Locate any dependencies from the input model and download them. std::string path; ignition::msgs::FuelMetadata meta; - if (this->CachedModel(ignition::common::URI(newId.UniqueName()), path)) + if (this->CachedModel(ignition::common::URI(_id.UniqueName()), path)) { std::string metadataPath = ignition::common::joinPaths(path, "metadata.pbtxt"); @@ -664,20 +695,61 @@ Result FuelClient::DownloadModel(const ModelIdentifier &_id, for (int i = 0; i < meta.dependencies_size(); ++i) { - std::string dependencyPath; ignition::common::URI dependencyURI(meta.dependencies(i).uri()); - // If the model is not already cached, download it; this prevents - // any sort of cyclic dependencies from running infinitely - if (!this->CachedModel(dependencyURI, dependencyPath)) - this->DownloadModel(dependencyURI, dependencyPath); + ModelIdentifier dependencyID; + if(!this->ParseModelUrl(dependencyURI, dependencyID)) + { + // There is a potential that depdencies are specified via + // [model://model_name], which is valid, but not something that we + // can fetch from Fuel. In that case, warn the user so they have + // a chance to update their specified dependencies. + ignwarn << "Error resolving URL for dependency [" << + meta.dependencies(i).uri() << "] of model [" << + _id.UniqueName() <<"]: Skipping" << std::endl; + } else { + _dependencies.push_back(dependencyID); + } + } + } + } + + return Result(ResultType::FETCH); +} + +////////////////////////////////////////////////// +Result FuelClient::ModelDependencies( + const std::vector &_ids, + std::vector &_dependencies) +{ + std::vector newDeps; + for (auto modelId : _ids) + { + std::vector modelDeps; + auto result = this->ModelDependencies(modelId, modelDeps); + + if (!modelDeps.empty()) + { + std::vector recursiveDeps; + this->ModelDependencies(modelDeps, recursiveDeps); + + for (auto dep : modelDeps) + { + newDeps.push_back(dep); + } + + for (auto dep : recursiveDeps) + { + newDeps.push_back(dep); } } } + _dependencies = std::vector(newDeps.begin(), newDeps.end()); return Result(ResultType::FETCH); } + ////////////////////////////////////////////////// Result FuelClient::DownloadWorld(WorldIdentifier &_id) { @@ -745,6 +817,176 @@ Result FuelClient::DownloadWorld(WorldIdentifier &_id) return Result(ResultType::FETCH); } +////////////////////////////////////////////////// +namespace std +{ + template<> struct hash + { + std::size_t operator()(const ModelIdentifier &_id) const noexcept + { + return std::hash{}(_id.AsString()); + } + }; +} + +////////////////////////////////////////////////// +std::vector FuelClient::DownloadModels( + const std::vector &_ids, + size_t _jobs) +{ + std::mutex resultMutex; + std::vector result; + + std::mutex idsMutex; + std::deque idsToDownload(_ids.begin(), _ids.end()); + std::unordered_set uniqueIds(_ids.begin(), _ids.end()); + + std::atomic running = true; + + auto downloadWorker = [&](){ + ModelIdentifier id; + + while(running) + { + // Pop the next ID off the queue + { + std::lock_guard lock(idsMutex); + + if (idsToDownload.empty()) + { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + continue; + } + + id = idsToDownload.front(); + idsToDownload.pop_front(); + } + + std::vector dependencies; + auto modelResult = this->DownloadModel(id, {}, dependencies); + + { + std::lock_guard lock(resultMutex); + result.push_back(std::make_tuple(id, modelResult)); + } + + if (!dependencies.empty()) + { + std::lock_guard lock(idsMutex); + igndbg << "Adding " << dependencies.size() + << " model dependencies to queue from " << id.Name() << "\n"; + for (auto dep : dependencies) + { + if (uniqueIds.count(dep) == 0) + { + idsToDownload.push_back(dep); + uniqueIds.insert(dep); + } + } + } + } + }; + + std::vector workers; + + for (size_t ii = 0; ii < _jobs; ++ii) + { + workers.push_back(std::thread(downloadWorker)); + } + + ignmsg << "Preparing to download " + << idsToDownload.size() << " models with " + << _jobs << " worker threads\n"; + + + while (running) + { + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + + if(idsToDownload.empty()) + { + running = false; + } + } + + for (auto& worker : workers) + { + worker.join(); + } + + ignmsg << "Finished, downloaded " << result.size() << " models in total\n"; + + return result; +} + +////////////////////////////////////////////////// +Result FuelClient::DownloadWorlds( + const std::vector &_ids, size_t _jobs) +{ + std::deque> tasks; + // Check for finished tasks by checking if the status of their futures is + // "ready". If a task is finished, check if it succeeded and print out an + // error message if it failed. When a task is finished, it gets erased from + // the tasks list to make room for other tasks to be added. + size_t itemCount = 0; + const size_t totalItemCount = _ids.size(); + + ignmsg << "Using " << _jobs << " jobs to download collection of " + << totalItemCount << " items" << std::endl; + + auto checkForFinishedTasks = [&itemCount, &totalItemCount, &tasks] { + auto finishedIt = + std::partition(tasks.begin(), tasks.end(), [](const auto &_task) + { + return std::future_status::ready != + _task.wait_for(std::chrono::milliseconds(100)); + }); + + if (finishedIt != tasks.end()) + { + for (auto taskIt = finishedIt; taskIt != tasks.end(); ++taskIt) + { + ignition::fuel_tools::Result result = taskIt->get(); + if (result) + { + ++itemCount; + } + else + { + ignerr << result.ReadableResult() << std::endl; + } + } + + tasks.erase(finishedIt, tasks.end()); + ignmsg << "Downloaded: " << itemCount << " / " << totalItemCount + << std::endl; + } + }; + + // We need a mutable worldId because DownloadWorld modifies it + for (auto& id : _ids) + { + while (tasks.size() >= _jobs) + { + checkForFinishedTasks(); + } + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + auto handle = std::async(std::launch::async, [&id, this] + { + WorldIdentifier tempId = id; + return this->DownloadWorld(tempId); + }); + tasks.push_back(std::move(handle)); + } + + while (!tasks.empty()) + { + checkForFinishedTasks(); + } + + return Result(ResultType::FETCH); +} + ////////////////////////////////////////////////// bool FuelClient::ParseModelUrl(const common::URI &_modelUrl, ModelIdentifier &_id) diff --git a/src/FuelClient_TEST.cc b/src/FuelClient_TEST.cc index 002e5a7e..384a9a6c 100644 --- a/src/FuelClient_TEST.cc +++ b/src/FuelClient_TEST.cc @@ -590,6 +590,80 @@ TEST_F(FuelClientTest, DownloadModel) } } +///////////////////////////////////////////////// +// Windows doesn't support colons in filenames +// https://github.com/ignitionrobotics/ign-fuel-tools/issues/106 +TEST_F(FuelClientTest, IGN_UTILS_TEST_DISABLED_ON_WIN32(ModelDependencies)) +{ + // Configure to use binary path as cache + ASSERT_EQ(0, ChangeDirectory(PROJECT_BINARY_PATH)); + common::removeAll("test_cache"); + common::createDirectories("test_cache"); + ClientConfig config; + config.SetCacheLocation(common::joinPaths(common::cwd(), "test_cache")); + + // Create client + FuelClient client(config); + EXPECT_EQ(config.CacheLocation(), client.Config().CacheLocation()); + + // Download model with a dependency specified within its `metadata.pbtxt` + { + common::URI url{ + "https://fuel.ignitionrobotics.org/1.0/JShep1/models/hatchback_red_1"}; + common::URI depUrl{ + "https://fuel.ignitionrobotics.org/1.0/JShep1/models/hatchback_1"}; + + ModelIdentifier id; + ModelIdentifier depId; + + ASSERT_TRUE(client.ParseModelUrl(url, id)); + ASSERT_TRUE(client.ParseModelUrl(depUrl, depId)); + + // Check it is not cached + std::string cachedPath; + Result res1 = client.CachedModel(url, cachedPath); + EXPECT_FALSE(res1); + EXPECT_EQ(Result(ResultType::FETCH_ERROR), res1); + + // Check the dependency is not cached + Result res2 = client.CachedModel(depUrl, cachedPath); + EXPECT_FALSE(res2); + EXPECT_EQ(Result(ResultType::FETCH_ERROR), res2); + + // Download on the model, do not download dependencies + { + std::vector dependencies; + Result res3 = client.DownloadModel(id, {}, dependencies); + EXPECT_TRUE(res3); + EXPECT_EQ(Result(ResultType::FETCH_ALREADY_EXISTS), res3); + EXPECT_EQ(1u, dependencies.size()); + } + + // Check that the model is cached + { + Result res4 = client.CachedModel(url, cachedPath); + EXPECT_TRUE(res4); + EXPECT_EQ(Result(ResultType::FETCH_ALREADY_EXISTS), res4); + } + + // Check the dependency is not cached + { + Result res5 = client.CachedModel(depUrl, cachedPath); + EXPECT_FALSE(res5); + EXPECT_EQ(Result(ResultType::FETCH_ERROR), res5); + } + + // Check that the dependencies are populated + { + std::vector dependencies; + Result res6 = client.ModelDependencies(id, dependencies); + EXPECT_TRUE(res6); + EXPECT_EQ(1u, dependencies.size()); + } + } +} + + ///////////////////////////////////////////////// // Windows doesn't support colons in filenames // https://github.com/ignitionrobotics/ign-fuel-tools/issues/106 diff --git a/src/ign.cc b/src/ign.cc index 74402dcf..a3b65e3d 100644 --- a/src/ign.cc +++ b/src/ign.cc @@ -630,93 +630,17 @@ extern "C" IGNITION_FUEL_TOOLS_VISIBLE int downloadUrl(const char *_url, return false; } - size_t jobs = std::max(1, _jobs); - - ignmsg << "Using " << jobs << " jobs to download collection of " - << totalItemCount << " items" << std::endl; - - std::deque> tasks; - - // Check for finished tasks by checking if the status of their futures is - // "ready". If a task is finished, check if it succeeded and print out an - // error message if it failed. When a task is finished, it gets erased from - // the tasks list to make room for other tasks to be added. - size_t itemCount = 0; - auto checkForFinishedTasks = [&itemCount, &totalItemCount, &tasks] { - auto finishedIt = - std::partition(tasks.begin(), tasks.end(), [](const auto &_task) - { - return std::future_status::ready != - _task.wait_for(std::chrono::milliseconds(100)); - }); - - if (finishedIt != tasks.end()) - { - for (auto taskIt = finishedIt; taskIt != tasks.end(); ++taskIt) - { - ignition::fuel_tools::Result result = taskIt->get(); - if (result) - { - ++itemCount; - } - else - { - ignerr << result.ReadableResult() << std::endl; - } - } - - tasks.erase(finishedIt, tasks.end()); - ignmsg << "Downloaded: " << itemCount << " / " << totalItemCount - << std::endl; - } - }; - - // Here we use std::async to download items in parallel. The download task - // is started asynchronously and gets added to the task list which is - // monitored for completion. if (downloadModels) { - for (const auto &modelId : modelIds) - { - // Check if any of the tasks are done. Don't start a new task until the - // number of tasks in the tasks lists is below the number of jobs - // specified by the user. - while (tasks.size() >= jobs) - { - checkForFinishedTasks(); - } - std::this_thread::sleep_for(std::chrono::milliseconds(10)); - auto handle = std::async(std::launch::async, [&modelId, &client] - { - return client.DownloadModel(modelId); - }); - tasks.push_back(std::move(handle)); - } + auto result = client.DownloadModels(modelIds, _jobs); } if (downloadWorlds) { - // We need a mutable worldId because DownloadWorld modifies it - for (auto &worldId : worldIds) - { - // Check if any of the tasks are done - while (tasks.size() >= jobs) - { - checkForFinishedTasks(); - } - std::this_thread::sleep_for(std::chrono::milliseconds(10)); - auto handle = std::async(std::launch::async, [&worldId, &client] - { - return client.DownloadWorld(worldId); - }); - tasks.push_back(std::move(handle)); - } - } - - // All the tasks have been queued. Now wait for them to finish - while (!tasks.empty()) - { - checkForFinishedTasks(); + auto result = client.DownloadWorlds(worldIds, _jobs); + ignerr << "Failed to download worlds for collection [" + << collection.Name() + << "]" << std::endl; } } else