From c2710604deecba94ba7b5f5467fb17417e159cd2 Mon Sep 17 00:00:00 2001 From: Roman Gershman Date: Mon, 28 Oct 2024 10:08:10 +0200 Subject: [PATCH] chore: refactor snapshot expanding logic (#4003) S3 and file expansion logic had some duplicate code. this PR refactors it before adding GCS support. Signed-off-by: Roman Gershman --- src/server/detail/snapshot_storage.cc | 152 +++++++++++++------------- src/server/detail/snapshot_storage.h | 26 +++-- src/server/server_family.cc | 2 +- 3 files changed, 95 insertions(+), 85 deletions(-) diff --git a/src/server/detail/snapshot_storage.cc b/src/server/detail/snapshot_storage.cc index c10280d87cda..b92ad090b82f 100644 --- a/src/server/detail/snapshot_storage.cc +++ b/src/server/detail/snapshot_storage.cc @@ -30,17 +30,19 @@ namespace dfly { namespace detail { using namespace util; +using namespace std; -std::optional> GetBucketPath(std::string_view path) { +// Returns bucket_name, obj_path for an s3 path. +optional> GetBucketPath(string_view path) { std::string_view clean = absl::StripPrefix(path, kS3Prefix); size_t pos = clean.find('/'); - if (pos == std::string_view::npos) { - return std::make_pair(std::string(clean), ""); + if (pos == string_view::npos) { + return make_pair(string(clean), ""); } - std::string bucket_name{clean.substr(0, pos)}; - std::string obj_path{clean.substr(pos + 1)}; + string bucket_name{clean.substr(0, pos)}; + string obj_path{clean.substr(pos + 1)}; return std::make_pair(std::move(bucket_name), std::move(obj_path)); } @@ -48,6 +50,30 @@ std::optional> GetBucketPath(std::string_vie const int kRdbWriteFlags = O_CREAT | O_WRONLY | O_TRUNC | O_CLOEXEC | O_DIRECT; #endif +io::Result, GenericError> SnapshotStorage::ExpandSnapshot(const string& load_path) { + if (!(absl::EndsWith(load_path, ".rdb") || absl::EndsWith(load_path, "summary.dfs"))) { + return nonstd::make_unexpected( + GenericError(std::make_error_code(std::errc::invalid_argument), "Bad filename extension")); + } + + error_code ec = CheckPath(load_path); + if (ec) { + return nonstd::make_unexpected(GenericError(ec, "File not found")); + } + + vector paths{{load_path}}; + + // Collect all other files in case we're loading dfs. + if (absl::EndsWith(load_path, "summary.dfs")) { + auto res = ExpandFromPath(load_path); + if (!res) { + return res; + } + paths.insert(paths.end(), res->begin(), res->end()); + } + return paths; +} + FileSnapshotStorage::FileSnapshotStorage(fb2::FiberQueueThreadPool* fq_threadpool) : fq_threadpool_{fq_threadpool} { } @@ -143,43 +169,29 @@ io::Result FileSnapshotStorage::LoadPath(std::string_ std::make_error_code(std::errc::no_such_file_or_directory), "Snapshot not found")); } -io::Result, GenericError> FileSnapshotStorage::LoadPaths( - const std::string& load_path) { - if (!(absl::EndsWith(load_path, ".rdb") || absl::EndsWith(load_path, "summary.dfs"))) { - return nonstd::make_unexpected( - GenericError(std::make_error_code(std::errc::invalid_argument), "Bad filename extension")); - } - - std::vector paths{{load_path}}; - - // Collect all other files in case we're loading dfs. - if (absl::EndsWith(load_path, "summary.dfs")) { - std::string glob = absl::StrReplaceAll(load_path, {{"summary", "????"}}); - io::Result files = io::StatFiles(glob); - - if (files && files->size() == 0) { - return nonstd::make_unexpected( - GenericError(std::make_error_code(std::errc::no_such_file_or_directory), - "Cound not find DFS shard files")); - } +io::Result, GenericError> FileSnapshotStorage::ExpandFromPath(const string& path) { + string glob = absl::StrReplaceAll(path, {{"summary", "????"}}); + io::Result files = io::StatFiles(glob); - for (auto& fstat : *files) { - paths.push_back(std::move(fstat.name)); - } + if (!files || files->size() == 0) { + return nonstd::make_unexpected(GenericError(make_error_code(errc::no_such_file_or_directory), + "Cound not find DFS shard files")); } - // Check all paths are valid. - for (const auto& path : paths) { - std::error_code ec; - (void)fs::canonical(path, ec); - if (ec) { - return nonstd::make_unexpected(ec); - } + vector paths; + for (auto& fstat : *files) { + paths.push_back(std::move(fstat.name)); } return paths; } +error_code FileSnapshotStorage::CheckPath(const string& path) { + error_code ec; + std::ignore = fs::canonical(path, ec); + return ec; +} + #ifdef WITH_AWS AwsS3SnapshotStorage::AwsS3SnapshotStorage(const std::string& endpoint, bool https, bool ec2_metadata, bool sign_payload) { @@ -288,54 +300,48 @@ io::Result AwsS3SnapshotStorage::LoadPath(std::string }); } -io::Result, GenericError> AwsS3SnapshotStorage::LoadPaths( - const std::string& load_path) { - if (!(absl::EndsWith(load_path, ".rdb") || absl::EndsWith(load_path, "summary.dfs"))) { +io::Result, GenericError> AwsS3SnapshotStorage::ExpandFromPath( + const string& load_path) { + fb2::ProactorBase* proactor = shard_set->pool()->GetNextProactor(); + optional> bucket_path = GetBucketPath(load_path); + if (!bucket_path) { return nonstd::make_unexpected( - GenericError(std::make_error_code(std::errc::invalid_argument), "Bad filename extension")); + GenericError{std::make_error_code(std::errc::invalid_argument), "Invalid S3 path"}); } + const auto [bucket_name, obj_path] = *bucket_path; + const std::regex re(absl::StrReplaceAll(obj_path, {{"summary", "[0-9]{4}"}})); - // Find snapshot shard files if we're loading DFS. - if (absl::EndsWith(load_path, "summary.dfs")) { - fb2::ProactorBase* proactor = shard_set->pool()->GetNextProactor(); - return proactor->Await([&]() -> io::Result, GenericError> { - std::vector paths{{load_path}}; - - std::optional> bucket_path = GetBucketPath(load_path); - if (!bucket_path) { - return nonstd::make_unexpected( - GenericError{std::make_error_code(std::errc::invalid_argument), "Invalid S3 path"}); - } - const auto [bucket_name, obj_path] = *bucket_path; + // Limit prefix to objects in the same 'directory' as load_path. + const size_t pos = obj_path.find_last_of('/'); + const std::string prefix = (pos == std::string_view::npos) ? "" : obj_path.substr(0, pos); - const std::regex re(absl::StrReplaceAll(obj_path, {{"summary", "[0-9]{4}"}})); - - // Limit prefix to objects in the same 'directory' as load_path. - const size_t pos = obj_path.find_last_of('/'); - const std::string prefix = (pos == std::string_view::npos) ? "" : obj_path.substr(0, pos); - const io::Result, GenericError> keys = ListObjects(bucket_name, prefix); - if (!keys) { - return nonstd::make_unexpected(keys.error()); - } - - for (const SnapStat& key : *keys) { - std::smatch m; - if (std::regex_match(key.name, m, re)) { - paths.push_back(std::string(kS3Prefix) + bucket_name + "/" + key.name); - } + auto paths = proactor->Await([&]() -> io::Result, GenericError> { + const io::Result, GenericError> keys = ListObjects(bucket_name, prefix); + if (!keys) { + return nonstd::make_unexpected(keys.error()); + } + vector res; + for (const SnapStat& key : *keys) { + std::smatch m; + if (std::regex_match(key.name, m, re)) { + res.push_back(std::string(kS3Prefix) + bucket_name + "/" + key.name); } + } - if (paths.size() <= 1) { - return nonstd::make_unexpected( - GenericError{std::make_error_code(std::errc::no_such_file_or_directory), - "Cound not find DFS snapshot shard files"}); - } + return res; + }); - return paths; - }); + if (!paths || paths->empty()) { + return nonstd::make_unexpected( + GenericError{std::make_error_code(std::errc::no_such_file_or_directory), + "Cound not find DFS snapshot shard files"}); } - return std::vector{{load_path}}; + return *paths; +} + +error_code AwsS3SnapshotStorage::CheckPath(const std::string& path) { + return {}; } io::Result, GenericError> diff --git a/src/server/detail/snapshot_storage.h b/src/server/detail/snapshot_storage.h index 83b2bd49661a..cda401b52168 100644 --- a/src/server/detail/snapshot_storage.h +++ b/src/server/detail/snapshot_storage.h @@ -48,9 +48,14 @@ class SnapshotStorage { virtual io::Result LoadPath(std::string_view dir, std::string_view dbfilename) = 0; - // Returns the snapshot paths given the RDB file or DFS summary file path. - virtual io::Result, GenericError> LoadPaths( - const std::string& load_path) = 0; + // Searches for all the relevant snapshot files given the RDB file or DFS summary file path. + io::Result, GenericError> ExpandSnapshot(const std::string& load_path); + + protected: + virtual io::Result, GenericError> ExpandFromPath( + const std::string& path) = 0; + + virtual std::error_code CheckPath(const std::string& path) = 0; }; class FileSnapshotStorage : public SnapshotStorage { @@ -65,10 +70,10 @@ class FileSnapshotStorage : public SnapshotStorage { io::Result LoadPath(std::string_view dir, std::string_view dbfilename) override; - io::Result, GenericError> LoadPaths( - const std::string& load_path) override; - private: + io::Result, GenericError> ExpandFromPath(const std::string& path) final; + + std::error_code CheckPath(const std::string& path) final; util::fb2::FiberQueueThreadPool* fq_threadpool_; }; @@ -86,10 +91,11 @@ class AwsS3SnapshotStorage : public SnapshotStorage { io::Result LoadPath(std::string_view dir, std::string_view dbfilename) override; - io::Result, GenericError> LoadPaths( - const std::string& load_path) override; - private: + io::Result, GenericError> ExpandFromPath(const std::string& path) final; + + std::error_code CheckPath(const std::string& path) final; + struct SnapStat { SnapStat(std::string file_name, int64_t ts) : name(std::move(file_name)), last_modified(std::move(ts)) { @@ -105,8 +111,6 @@ class AwsS3SnapshotStorage : public SnapshotStorage { std::shared_ptr s3_; }; -// Returns bucket_name, obj_path for an s3 path. -std::optional> GetBucketPath(std::string_view path); #endif #ifdef __linux__ diff --git a/src/server/server_family.cc b/src/server/server_family.cc index 76b6e0a2dd22..f781e880aba0 100644 --- a/src/server/server_family.cc +++ b/src/server/server_family.cc @@ -1061,7 +1061,7 @@ std::optional> ServerFamily::Load(string_view load_pat return future; } - auto paths_result = snapshot_storage_->LoadPaths(path); + auto paths_result = snapshot_storage_->ExpandSnapshot(path); if (!paths_result) { LOG(ERROR) << "Failed to load snapshot: " << paths_result.error().Format();