From 1f5dcb873e6b6e4d2e14cbec2a9519c30b91d5f2 Mon Sep 17 00:00:00 2001 From: Roman Gershman Date: Mon, 28 Oct 2024 11:17:09 +0200 Subject: [PATCH] chore: introduce GcsSnapshotStorage It's not been used yet so no functional changes. Signed-off-by: Roman Gershman --- helio | 2 +- src/server/CMakeLists.txt | 2 +- src/server/detail/snapshot_storage.cc | 125 ++++++++++++++++++++++++-- src/server/detail/snapshot_storage.h | 26 ++++++ 4 files changed, 148 insertions(+), 7 deletions(-) diff --git a/helio b/helio index 9dd56595b6b4..f102aa0371c1 160000 --- a/helio +++ b/helio @@ -1 +1 @@ -Subproject commit 9dd56595b6b4cff71338b8c728eb12a8017c6b97 +Subproject commit f102aa0371c1e157c3cbeb3e59e196b05ea39a88 diff --git a/src/server/CMakeLists.txt b/src/server/CMakeLists.txt index 433b76724da2..670077b4c8d7 100644 --- a/src/server/CMakeLists.txt +++ b/src/server/CMakeLists.txt @@ -84,7 +84,7 @@ endif() cxx_link(dfly_transaction dfly_core strings_lib TRDP::fast_float) cxx_link(dragonfly_lib dfly_transaction dfly_facade redis_lib ${AWS_LIB} jsonpath - strings_lib html_lib + strings_lib html_lib gcp_lib http_client_lib absl::random_random TRDP::jsoncons ${ZSTD_LIB} TRDP::lz4 TRDP::croncpp TRDP::flatbuffers) diff --git a/src/server/detail/snapshot_storage.cc b/src/server/detail/snapshot_storage.cc index b92ad090b82f..454998eb56ab 100644 --- a/src/server/detail/snapshot_storage.cc +++ b/src/server/detail/snapshot_storage.cc @@ -24,6 +24,7 @@ #include "base/logging.h" #include "io/file_util.h" #include "server/engine_shard_set.h" +#include "util/cloud/gcp/gcs_file.h" #include "util/fibers/fiber_file.h" namespace dfly { @@ -32,9 +33,19 @@ namespace detail { using namespace util; using namespace std; -// Returns bucket_name, obj_path for an s3 path. -optional> GetBucketPath(string_view path) { - std::string_view clean = absl::StripPrefix(path, kS3Prefix); +inline bool IsGcsPath(string_view path) { + return absl::StartsWith(path, kGCSPrefix); +} + +constexpr string_view kSummarySuffix = "summary.dfs"sv; + +pair GetBucketPath(string_view path) { + string_view clean = path; + if (absl::StartsWith(clean, kS3Prefix)) { + clean = absl::StripPrefix(clean, kS3Prefix); + } else { + clean = absl::StripPrefix(clean, kGCSPrefix); + } size_t pos = clean.find('/'); if (pos == string_view::npos) { @@ -43,7 +54,8 @@ optional> GetBucketPath(string_view path) { string bucket_name{clean.substr(0, pos)}; string obj_path{clean.substr(pos + 1)}; - return std::make_pair(std::move(bucket_name), std::move(obj_path)); + + return make_pair(std::move(bucket_name), std::move(obj_path)); } #ifdef __linux__ @@ -156,7 +168,7 @@ io::Result FileSnapshotStorage::LoadPath(std::string_ return std::difftime(l.last_modified, r.last_modified) < 0; }); auto it = std::find_if(short_vec->rbegin(), short_vec->rend(), [](const auto& stat) { - return absl::EndsWith(stat.name, ".rdb") || absl::EndsWith(stat.name, "summary.dfs"); + return absl::EndsWith(stat.name, ".rdb") || absl::EndsWith(stat.name, kSummarySuffix); }); if (it != short_vec->rend()) return it->name; @@ -192,6 +204,109 @@ error_code FileSnapshotStorage::CheckPath(const string& path) { return ec; } +GcsSnapshotStorage::~GcsSnapshotStorage() { +} + +error_code GcsSnapshotStorage::Init(unsigned connect_ms) { + fb2::ProactorBase* proactor = fb2::ProactorBase::me(); + CHECK(proactor); + error_code ec = creds_provider_.Init(connect_ms, proactor); + if (ec) + return ec; + + ctx_ = util::http::TlsClient::CreateSslContext(); + return ec; +} + +io::Result, GenericError> GcsSnapshotStorage::OpenWriteFile( + const std::string& path) { + CHECK(ctx_); + + pair bucket_path = GetBucketPath(path); + fb2::ProactorBase* proactor = fb2::ProactorBase::me(); + unique_ptr conn_pool = cloud::GCS::CreateApiConnectionPool(ctx_, proactor); + cloud::GcsWriteFileOptions opts; + opts.creds_provider = &creds_provider_; + opts.pool = conn_pool.release(); + opts.pool_owned = true; + + io::Result dest_res = + cloud::OpenWriteGcsFile(bucket_path.first, bucket_path.second, opts); + if (!dest_res) { + return nonstd::make_unexpected(GenericError(dest_res.error(), "Could not open file")); + } + + return std::pair(*dest_res, FileType::CLOUD); +} + +io::ReadonlyFileOrError GcsSnapshotStorage::OpenReadFile(const std::string& path) { + if (!IsGcsPath(path)) + return nonstd::make_unexpected(GenericError("Invalid GCS path")); + + auto [bucket, key] = GetBucketPath(path); + fb2::ProactorBase* proactor = fb2::ProactorBase::me(); + unique_ptr conn_pool = cloud::GCS::CreateApiConnectionPool(ctx_, proactor); + cloud::GcsReadFileOptions opts; + opts.creds_provider = &creds_provider_; + opts.pool = conn_pool.release(); + opts.pool_owned = true; + + return cloud::OpenReadGcsFile(bucket, key, opts); +} + +io::Result GcsSnapshotStorage::LoadPath(std::string_view dir, + std::string_view dbfilename) { + LOG(ERROR) << "Load snapshot: Searching for snapshot in GCS path: " << dir << "/" << dbfilename; + return nonstd::make_unexpected(GenericError("Not supported")); +} + +io::Result, GenericError> GcsSnapshotStorage::ExpandFromPath( + const string& load_path) { + if (!IsGcsPath(load_path)) + return nonstd::make_unexpected( + GenericError(make_error_code(errc::invalid_argument), "Invalid GCS path")); + + if (!absl::EndsWith(load_path, kSummarySuffix)) + return vector{}; + + const auto [bucket_name, obj_path] = GetBucketPath(load_path); + regex re(absl::StrReplaceAll(obj_path, {{"summary", "[0-9]{4}"}})); + string_view prefix = absl::StripSuffix(obj_path, kSummarySuffix); + + // Find snapshot shard files if we're loading DFS. + fb2::ProactorBase* proactor = shard_set->pool()->GetNextProactor(); + auto paths = proactor->Await([&]() -> io::Result, GenericError> { + vector res; + cloud::GCS gcs(&creds_provider_, ctx_, proactor); + + error_code ec = gcs.List(bucket_name, prefix, false, [&](const cloud::GCS::ObjectItem& item) { + std::smatch m; + string key{item.key}; + if (std::regex_match(key, m, re)) { + res.push_back(absl::StrCat(kGCSPrefix, bucket_name, "/", item.key)); + } + }); + + if (ec) { + return nonstd::make_unexpected(ec); + } + + return res; + }); + + if (!paths || paths->empty()) { + return nonstd::make_unexpected( + GenericError{std::make_error_code(std::errc::no_such_file_or_directory), + "Cound not find DFS snapshot shard files"}); + } + + return *paths; +} + +error_code GcsSnapshotStorage::CheckPath(const std::string& path) { + return {}; +} + #ifdef WITH_AWS AwsS3SnapshotStorage::AwsS3SnapshotStorage(const std::string& endpoint, bool https, bool ec2_metadata, bool sign_payload) { diff --git a/src/server/detail/snapshot_storage.h b/src/server/detail/snapshot_storage.h index cda401b52168..e3aa4127ff6e 100644 --- a/src/server/detail/snapshot_storage.h +++ b/src/server/detail/snapshot_storage.h @@ -14,6 +14,8 @@ #include "io/io.h" #include "server/common.h" +#include "util/cloud/gcp/gcp_creds_provider.h" +#include "util/cloud/gcp/gcs.h" #include "util/fibers/fiberqueue_threadpool.h" #include "util/fibers/uring_file.h" @@ -23,6 +25,7 @@ namespace detail { namespace fs = std::filesystem; constexpr std::string_view kS3Prefix = "s3://"; +constexpr std::string_view kGCSPrefix = "gs://"; const size_t kBucketConnectMs = 2000; @@ -77,6 +80,29 @@ class FileSnapshotStorage : public SnapshotStorage { util::fb2::FiberQueueThreadPool* fq_threadpool_; }; +class GcsSnapshotStorage : public SnapshotStorage { + public: + ~GcsSnapshotStorage(); + + std::error_code Init(unsigned connect_ms); + + io::Result, GenericError> OpenWriteFile( + const std::string& path) override; + + io::ReadonlyFileOrError OpenReadFile(const std::string& path) override; + + io::Result LoadPath(std::string_view dir, + std::string_view dbfilename) override; + + private: + io::Result, GenericError> ExpandFromPath(const std::string& path) final; + + std::error_code CheckPath(const std::string& path) final; + + util::cloud::GCPCredsProvider creds_provider_; + SSL_CTX* ctx_ = NULL; +}; + #ifdef WITH_AWS class AwsS3SnapshotStorage : public SnapshotStorage { public: