Skip to content

Commit

Permalink
feat(snapshot): add support for loading snapshots from s3
Browse files Browse the repository at this point in the history
  • Loading branch information
andydunstall committed Sep 10, 2023
1 parent 5c95820 commit b84c68f
Show file tree
Hide file tree
Showing 2 changed files with 127 additions and 5 deletions.
128 changes: 123 additions & 5 deletions src/server/detail/snapshot_storage.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,24 @@
#include <absl/strings/str_replace.h>
#include <absl/strings/strip.h>

#include <regex>

#include "base/logging.h"
#include "io/file_util.h"
#include "server/engine_shard_set.h"
#include "util/cloud/s3.h"
#include "util/fibers/fiber_file.h"
#include "util/uring/uring_file.h"

namespace dfly {
namespace detail {

namespace {

const std::string kTimestampRegex = "([0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2})";

} // namespace

std::optional<std::pair<std::string, std::string>> GetBucketPath(std::string_view path) {
std::string_view clean = absl::StripPrefix(path, kS3Prefix);

Expand Down Expand Up @@ -182,18 +191,127 @@ io::Result<std::pair<io::Sink*, uint8_t>, GenericError> AwsS3SnapshotStorage::Op
}

io::ReadonlyFileOrError AwsS3SnapshotStorage::OpenReadFile(const std::string& path) {
return nonstd::make_unexpected(std::make_error_code(std::errc::not_supported));
std::optional<std::pair<std::string, std::string>> bucket_path = GetBucketPath(path);
if (!bucket_path) {
return nonstd::make_unexpected(GenericError("Invalid S3 path"));
}
auto [bucket_name, obj_path] = *bucket_path;

util::cloud::S3Bucket bucket{*aws_, bucket_name};
std::error_code ec = bucket.Connect(kBucketConnectMs);
if (ec) {
return nonstd::make_unexpected(GenericError(ec, "Couldn't connect to S3 bucket"));
}
auto res = bucket.OpenReadFile(obj_path);
if (!res) {
return nonstd::make_unexpected(GenericError(res.error(), "Couldn't open file for reading"));
}
return res;
}

std::string AwsS3SnapshotStorage::LoadPath(const std::string_view& dir,
const std::string_view& dbfilename) {
LOG(WARNING) << "Loading snapshots from S3 is not supported";
return "";
if (dbfilename.empty())
return "";

std::optional<std::pair<std::string, std::string>> bucket_path = GetBucketPath(dir);
if (!bucket_path) {
LOG(ERROR) << "Invalid S3 path: " << dir;
return "";
}
auto [bucket_name, prefix] = *bucket_path;

util::fb2::ProactorBase* proactor = shard_set->pool()->GetNextProactor();
return proactor->Await([&]() -> std::string {
LOG(INFO) << "Loading from S3 path s3://" << bucket_name << "/" << prefix;

// Create a regex to match the object keys, substituting the timestamp
// and adding an extension if needed.
fs::path fl_path{prefix};
fl_path.append(dbfilename);
SubstituteFilenameTsPlaceholder(&fl_path, kTimestampRegex);
if (!fl_path.has_extension()) {
fl_path += "(-summary.dfs|.rdb)";
}
const std::regex re(fl_path.string());

// Sort the keys in reverse so the first. Since the timestamp format
// has lexicographic order, the matching snapshot file will be the latest
// snapshot.
std::vector<std::string> keys = ListObjects(bucket_name, prefix);
std::sort(std::rbegin(keys), std::rend(keys));
for (const std::string& key : keys) {
std::smatch m;
if (std::regex_match(key, m, re)) {
return std::string(kS3Prefix) + bucket_name + "/" + key;
}
}

LOG(WARNING) << "Couldn't find a snapshot in S3 bucket " << dir;
return "";
});
}

io::Result<std::vector<std::string>> AwsS3SnapshotStorage::LoadPaths(const std::string& load_path) {
LOG(WARNING) << "Loading snapshots from S3 is not supported";
return nonstd::make_unexpected(std::make_error_code(std::errc::invalid_argument));
if (!(absl::EndsWith(load_path, ".rdb") || absl::EndsWith(load_path, "summary.dfs"))) {
LOG(ERROR) << "Bad filename extension \"" << load_path << "\"";
return nonstd::make_unexpected(std::make_error_code(std::errc::invalid_argument));
}

std::vector<std::string> paths{{load_path}};

// Find snapshot shard files if we're loading DFS.
if (absl::EndsWith(load_path, "summary.dfs")) {
util::fb2::ProactorBase* proactor = shard_set->pool()->GetNextProactor();
proactor->Await([&]() {
std::optional<std::pair<std::string, std::string>> bucket_path = GetBucketPath(load_path);
if (!bucket_path) {
LOG(ERROR) << "Invalid S3 path: " << load_path;
return;
}
const auto [bucket_name, obj_path] = *bucket_path;

const std::regex re(absl::StrReplaceAll(obj_path, {{"summary", "[0-9]{4}"}}));

// Limit prefix to objects in the same 'directory' as load_path.
const size_t pos = obj_path.find_last_of('/');
const std::string prefix = (pos == std::string_view::npos) ? "" : obj_path.substr(0, pos);
const std::vector<std::string> keys = ListObjects(bucket_name, prefix);
for (const std::string& key : keys) {
std::smatch m;
if (std::regex_match(key, m, re)) {
paths.push_back(std::string(kS3Prefix) + bucket_name + "/" + key);
}
}
});

if (paths.size() <= 1) {
LOG(ERROR) << "Cound not find DFS snapshot shard files";
return nonstd::make_unexpected(std::make_error_code(std::errc::no_such_file_or_directory));
}
}

return paths;
}

std::vector<std::string> AwsS3SnapshotStorage::ListObjects(const std::string& bucket_name,
const std::string& prefix) {
util::cloud::S3Bucket bucket(*aws_, bucket_name);
std::error_code ec = bucket.Connect(kBucketConnectMs);
if (ec) {
LOG(ERROR) << "Couldn't connect to S3 bucket: " << ec.message();
return {};
}

std::vector<std::string> keys;
ec = bucket.ListAllObjects(
prefix, [&](size_t sz, std::string_view name) { keys.push_back(std::string(name)); });
if (ec) {
LOG(ERROR) << "Couldn't list objects in S3 bucket: " << ec.message();
return {};
}

return keys;
}

io::Result<size_t> LinuxWriteWrapper::WriteSome(const iovec* v, uint32_t len) {
Expand Down
4 changes: 4 additions & 0 deletions src/server/detail/snapshot_storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,10 @@ class AwsS3SnapshotStorage : public SnapshotStorage {
io::Result<std::vector<std::string>> LoadPaths(const std::string& load_path) override;

private:
// List the objects in the given bucket with the given prefix. This must
// run from a proactor.
std::vector<std::string> ListObjects(const std::string& bucket_name, const std::string& prefix);

util::cloud::AWS* aws_;
};

Expand Down

0 comments on commit b84c68f

Please sign in to comment.