Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: introduce GcsSnapshotStorage #4004

Merged
merged 1 commit into from
Oct 28, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion src/server/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ endif()

cxx_link(dfly_transaction dfly_core strings_lib TRDP::fast_float)
cxx_link(dragonfly_lib dfly_transaction dfly_facade redis_lib ${AWS_LIB} jsonpath
strings_lib html_lib
strings_lib html_lib gcp_lib
http_client_lib absl::random_random TRDP::jsoncons ${ZSTD_LIB} TRDP::lz4
TRDP::croncpp TRDP::flatbuffers)

Expand Down
125 changes: 120 additions & 5 deletions src/server/detail/snapshot_storage.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "base/logging.h"
#include "io/file_util.h"
#include "server/engine_shard_set.h"
#include "util/cloud/gcp/gcs_file.h"
#include "util/fibers/fiber_file.h"

namespace dfly {
Expand All @@ -32,9 +33,19 @@ namespace detail {
using namespace util;
using namespace std;

// Returns bucket_name, obj_path for an s3 path.
optional<pair<string, string>> GetBucketPath(string_view path) {
std::string_view clean = absl::StripPrefix(path, kS3Prefix);
inline bool IsGcsPath(string_view path) {
return absl::StartsWith(path, kGCSPrefix);
}

constexpr string_view kSummarySuffix = "summary.dfs"sv;

pair<string, string> GetBucketPath(string_view path) {
string_view clean = path;
if (absl::StartsWith(clean, kS3Prefix)) {
clean = absl::StripPrefix(clean, kS3Prefix);
} else {
clean = absl::StripPrefix(clean, kGCSPrefix);
}
Comment on lines +44 to +48
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd prefer to write it in a more generic way
const auto& prefix = absl::StartsWith(clean, kS3Prefix) ? kS3Prefix : kGCSPrefix;
clean = absl::StripPrefix(clean, prefix).

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍🏼 will do that in the next PR.


size_t pos = clean.find('/');
if (pos == string_view::npos) {
Expand All @@ -43,7 +54,8 @@ optional<pair<string, string>> GetBucketPath(string_view path) {

string bucket_name{clean.substr(0, pos)};
string obj_path{clean.substr(pos + 1)};
return std::make_pair(std::move(bucket_name), std::move(obj_path));

return make_pair(std::move(bucket_name), std::move(obj_path));
}

#ifdef __linux__
Expand Down Expand Up @@ -156,7 +168,7 @@ io::Result<std::string, GenericError> FileSnapshotStorage::LoadPath(std::string_
return std::difftime(l.last_modified, r.last_modified) < 0;
});
auto it = std::find_if(short_vec->rbegin(), short_vec->rend(), [](const auto& stat) {
return absl::EndsWith(stat.name, ".rdb") || absl::EndsWith(stat.name, "summary.dfs");
return absl::EndsWith(stat.name, ".rdb") || absl::EndsWith(stat.name, kSummarySuffix);
});
if (it != short_vec->rend())
return it->name;
Expand Down Expand Up @@ -192,6 +204,109 @@ error_code FileSnapshotStorage::CheckPath(const string& path) {
return ec;
}

GcsSnapshotStorage::~GcsSnapshotStorage() {
}

error_code GcsSnapshotStorage::Init(unsigned connect_ms) {
fb2::ProactorBase* proactor = fb2::ProactorBase::me();
CHECK(proactor);
error_code ec = creds_provider_.Init(connect_ms, proactor);
if (ec)
return ec;

ctx_ = util::http::TlsClient::CreateSslContext();
return ec;
}

io::Result<std::pair<io::Sink*, uint8_t>, GenericError> GcsSnapshotStorage::OpenWriteFile(
const std::string& path) {
CHECK(ctx_);

pair<string, string> bucket_path = GetBucketPath(path);
fb2::ProactorBase* proactor = fb2::ProactorBase::me();
unique_ptr<http::ClientPool> conn_pool = cloud::GCS::CreateApiConnectionPool(ctx_, proactor);
cloud::GcsWriteFileOptions opts;
opts.creds_provider = &creds_provider_;
opts.pool = conn_pool.release();
opts.pool_owned = true;

io::Result<io::WriteFile*> dest_res =
cloud::OpenWriteGcsFile(bucket_path.first, bucket_path.second, opts);
if (!dest_res) {
return nonstd::make_unexpected(GenericError(dest_res.error(), "Could not open file"));
}

return std::pair(*dest_res, FileType::CLOUD);
}

io::ReadonlyFileOrError GcsSnapshotStorage::OpenReadFile(const std::string& path) {
if (!IsGcsPath(path))
return nonstd::make_unexpected(GenericError("Invalid GCS path"));

auto [bucket, key] = GetBucketPath(path);
fb2::ProactorBase* proactor = fb2::ProactorBase::me();
unique_ptr<http::ClientPool> conn_pool = cloud::GCS::CreateApiConnectionPool(ctx_, proactor);
cloud::GcsReadFileOptions opts;
opts.creds_provider = &creds_provider_;
opts.pool = conn_pool.release();
opts.pool_owned = true;

return cloud::OpenReadGcsFile(bucket, key, opts);
}

io::Result<std::string, GenericError> GcsSnapshotStorage::LoadPath(std::string_view dir,
std::string_view dbfilename) {
LOG(ERROR) << "Load snapshot: Searching for snapshot in GCS path: " << dir << "/" << dbfilename;
return nonstd::make_unexpected(GenericError("Not supported"));
}

io::Result<vector<string>, GenericError> GcsSnapshotStorage::ExpandFromPath(
const string& load_path) {
if (!IsGcsPath(load_path))
return nonstd::make_unexpected(
GenericError(make_error_code(errc::invalid_argument), "Invalid GCS path"));

if (!absl::EndsWith(load_path, kSummarySuffix))
return vector<string>{};

const auto [bucket_name, obj_path] = GetBucketPath(load_path);
regex re(absl::StrReplaceAll(obj_path, {{"summary", "[0-9]{4}"}}));
string_view prefix = absl::StripSuffix(obj_path, kSummarySuffix);

// Find snapshot shard files if we're loading DFS.
fb2::ProactorBase* proactor = shard_set->pool()->GetNextProactor();
auto paths = proactor->Await([&]() -> io::Result<vector<string>, GenericError> {
vector<string> res;
cloud::GCS gcs(&creds_provider_, ctx_, proactor);

error_code ec = gcs.List(bucket_name, prefix, false, [&](const cloud::GCS::ObjectItem& item) {
std::smatch m;
string key{item.key};
if (std::regex_match(key, m, re)) {
res.push_back(absl::StrCat(kGCSPrefix, bucket_name, "/", item.key));
}
});

if (ec) {
return nonstd::make_unexpected(ec);
}

return res;
});

if (!paths || paths->empty()) {
return nonstd::make_unexpected(
GenericError{std::make_error_code(std::errc::no_such_file_or_directory),
"Cound not find DFS snapshot shard files"});
}

return *paths;
}

error_code GcsSnapshotStorage::CheckPath(const std::string& path) {
return {};
}

#ifdef WITH_AWS
AwsS3SnapshotStorage::AwsS3SnapshotStorage(const std::string& endpoint, bool https,
bool ec2_metadata, bool sign_payload) {
Expand Down
26 changes: 26 additions & 0 deletions src/server/detail/snapshot_storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

#include "io/io.h"
#include "server/common.h"
#include "util/cloud/gcp/gcp_creds_provider.h"
#include "util/cloud/gcp/gcs.h"
#include "util/fibers/fiberqueue_threadpool.h"
#include "util/fibers/uring_file.h"

Expand All @@ -23,6 +25,7 @@ namespace detail {
namespace fs = std::filesystem;

constexpr std::string_view kS3Prefix = "s3://";
constexpr std::string_view kGCSPrefix = "gs://";

const size_t kBucketConnectMs = 2000;

Expand Down Expand Up @@ -77,6 +80,29 @@ class FileSnapshotStorage : public SnapshotStorage {
util::fb2::FiberQueueThreadPool* fq_threadpool_;
};

class GcsSnapshotStorage : public SnapshotStorage {
public:
~GcsSnapshotStorage();

std::error_code Init(unsigned connect_ms);

io::Result<std::pair<io::Sink*, uint8_t>, GenericError> OpenWriteFile(
const std::string& path) override;

io::ReadonlyFileOrError OpenReadFile(const std::string& path) override;

io::Result<std::string, GenericError> LoadPath(std::string_view dir,
std::string_view dbfilename) override;

private:
io::Result<std::vector<std::string>, GenericError> ExpandFromPath(const std::string& path) final;

std::error_code CheckPath(const std::string& path) final;

util::cloud::GCPCredsProvider creds_provider_;
SSL_CTX* ctx_ = NULL;
};

#ifdef WITH_AWS
class AwsS3SnapshotStorage : public SnapshotStorage {
public:
Expand Down
Loading