Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(snapshot): add support for snapshot recovery from S3 #1839

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
177 changes: 155 additions & 22 deletions src/server/detail/snapshot_storage.cc
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,24 @@
#include <absl/strings/str_replace.h>
#include <absl/strings/strip.h>

#include <regex>

#include "base/logging.h"
#include "io/file_util.h"
#include "server/engine_shard_set.h"
#include "util/cloud/s3.h"
#include "util/fibers/fiber_file.h"
#include "util/uring/uring_file.h"

namespace dfly {
namespace detail {

namespace {

// Regular-expression fragment (a single capture group) that matches a timestamp written as
// YYYY-MM-DDTHH:MM:SS, e.g. 2023-09-13T10:30:00. Every field is fixed-width digits only; no
// range validation is performed (e.g. month "99" also matches).
const std::string kTimestampRegex = "([0-9]{4}-[0-9]{2}-[0-9]{2}T[0-9]{2}:[0-9]{2}:[0-9]{2})";

} // namespace

std::optional<std::pair<std::string, std::string>> GetBucketPath(std::string_view path) {
std::string_view clean = absl::StripPrefix(path, kS3Prefix);

Expand Down Expand Up @@ -76,8 +85,8 @@ io::ReadonlyFileOrError FileSnapshotStorage::OpenReadFile(const std::string& pat
#endif
}

std::string FileSnapshotStorage::LoadPath(const std::string_view& dir,
const std::string_view& dbfilename) {
io::Result<std::string, GenericError> FileSnapshotStorage::LoadPath(std::string_view dir,
std::string_view dbfilename) {
if (dbfilename.empty())
return "";

Expand All @@ -88,14 +97,14 @@ std::string FileSnapshotStorage::LoadPath(const std::string_view& dir,
std::error_code file_ec;
data_folder = fs::canonical(dir, file_ec);
if (file_ec) {
LOG(ERROR) << "Data directory error: " << file_ec.message() << " for dir " << dir;
return "";
return nonstd::make_unexpected(GenericError{file_ec, "Data directory error"});
}
}

LOG(INFO) << "Data directory is " << data_folder;
LOG(INFO) << "Load snapshot: Searching for snapshot in directory: " << data_folder;

fs::path fl_path = data_folder.append(dbfilename);
// If we've found an exact match we're done.
if (fs::exists(fl_path))
return fl_path.generic_string();

Expand All @@ -104,7 +113,6 @@ std::string FileSnapshotStorage::LoadPath(const std::string_view& dir,
fl_path += "*";
}
io::Result<io::StatShortVec> short_vec = io::StatFiles(fl_path.generic_string());

if (short_vec) {
// io::StatFiles returns a list of sorted files. Because our timestamp format has the same
// time order and lexicographic order we iterate from the end to find the latest snapshot.
Expand All @@ -114,15 +122,19 @@ std::string FileSnapshotStorage::LoadPath(const std::string_view& dir,
if (it != short_vec->rend())
return it->name;
} else {
LOG(WARNING) << "Could not stat " << fl_path << ", error " << short_vec.error().message();
return nonstd::make_unexpected(
GenericError(short_vec.error(), "Could not stat snapshot directory"));
}
return "";

return nonstd::make_unexpected(GenericError(
std::make_error_code(std::errc::no_such_file_or_directory), "Snapshot not found"));
}

io::Result<std::vector<std::string>> FileSnapshotStorage::LoadPaths(const std::string& load_path) {
io::Result<std::vector<std::string>, GenericError> FileSnapshotStorage::LoadPaths(
const std::string& load_path) {
if (!(absl::EndsWith(load_path, ".rdb") || absl::EndsWith(load_path, "summary.dfs"))) {
LOG(ERROR) << "Bad filename extension \"" << load_path << "\"";
return nonstd::make_unexpected(std::make_error_code(std::errc::invalid_argument));
return nonstd::make_unexpected(
GenericError(std::make_error_code(std::errc::invalid_argument), "Bad filename extension"));
}

std::vector<std::string> paths{{load_path}};
Expand All @@ -133,8 +145,9 @@ io::Result<std::vector<std::string>> FileSnapshotStorage::LoadPaths(const std::s
io::Result<io::StatShortVec> files = io::StatFiles(glob);

if (files && files->size() == 0) {
LOG(ERROR) << "Cound not find DFS snapshot shard files";
return nonstd::make_unexpected(std::make_error_code(std::errc::no_such_file_or_directory));
return nonstd::make_unexpected(
GenericError(std::make_error_code(std::errc::no_such_file_or_directory),
"Cound not find DFS shard files"));
}

for (auto& fstat : *files) {
Expand All @@ -147,7 +160,6 @@ io::Result<std::vector<std::string>> FileSnapshotStorage::LoadPaths(const std::s
std::error_code ec;
(void)fs::canonical(path, ec);
if (ec) {
LOG(ERROR) << "Error loading " << load_path << " " << ec.message();
return nonstd::make_unexpected(ec);
}
}
Expand Down Expand Up @@ -182,18 +194,139 @@ io::Result<std::pair<io::Sink*, uint8_t>, GenericError> AwsS3SnapshotStorage::Op
}

// Opens the S3 object identified by `path` for reading.
//
// The path is split by GetBucketPath into a bucket name and an object path, a connection to
// the bucket is established (bounded by kBucketConnectMs), and the object is then opened via
// S3Bucket::OpenReadFile. Each failing step returns an error describing which step failed.
io::ReadonlyFileOrError AwsS3SnapshotStorage::OpenReadFile(const std::string& path) {
// NOTE(review): this unconditional return looks like the pre-change line that the diff view
// displays next to its replacement. As written, it makes everything below unreachable.
return nonstd::make_unexpected(std::make_error_code(std::errc::not_supported));
// Split the path into (bucket, object path); a path GetBucketPath cannot parse is rejected.
std::optional<std::pair<std::string, std::string>> bucket_path = GetBucketPath(path);
if (!bucket_path) {
return nonstd::make_unexpected(GenericError("Invalid S3 path"));
}
auto [bucket_name, obj_path] = *bucket_path;

// Connect to the bucket before issuing the read request.
util::cloud::S3Bucket bucket{*aws_, bucket_name};
std::error_code ec = bucket.Connect(kBucketConnectMs);
if (ec) {
return nonstd::make_unexpected(GenericError(ec, "Couldn't connect to S3 bucket"));
}
// Open the object; on failure the underlying error is wrapped with a description.
auto res = bucket.OpenReadFile(obj_path);
if (!res) {
return nonstd::make_unexpected(GenericError(res.error(), "Couldn't open file for reading"));
}
return res;
}

// Finds the S3 snapshot (RDB file or DFS summary file) that should be loaded.
//
// `dir` is an S3 location that GetBucketPath splits into a bucket name and a key prefix;
// `dbfilename` is the configured snapshot file name.
//
// Returns:
//  - an empty string when `dbfilename` is empty (nothing to load),
//  - otherwise kS3Prefix + bucket + "/" + key for the lexicographically greatest object key
//    that matches the file-name pattern,
//  - an invalid_argument error when `dir` cannot be parsed as an S3 path,
//  - the error reported by ListObjects when listing fails,
//  - a no_such_file_or_directory error ("Snapshot not found") when no key matches.
io::Result<std::string, GenericError> AwsS3SnapshotStorage::LoadPath(std::string_view dir,
                                                                     std::string_view dbfilename) {
  if (dbfilename.empty())
    return "";

  const std::optional<std::pair<std::string, std::string>> bucket_path = GetBucketPath(dir);
  if (!bucket_path) {
    return nonstd::make_unexpected(
        GenericError{std::make_error_code(std::errc::invalid_argument), "Invalid S3 path"});
  }
  // Named references instead of structured bindings: the lambda below captures both by
  // reference, and capturing a structured binding in a lambda is only permitted since C++20.
  const std::string& bucket_name = bucket_path->first;
  const std::string& prefix = bucket_path->second;

  // ListObjects must run from a proactor, so the search is dispatched with Await. Await hands
  // back the lambda's result, so the by-reference captures are only used during this call.
  util::fb2::ProactorBase* proactor = shard_set->pool()->GetNextProactor();
  return proactor->Await([&]() -> io::Result<std::string, GenericError> {
    LOG(INFO) << "Load snapshot: Searching for snapshot in S3 path: " << kS3Prefix << bucket_name
              << "/" << prefix;

    // Create a regex to match the object keys, substituting the timestamp
    // and adding an extension if needed.
    // NOTE(review): `prefix` and `dbfilename` are still interpreted as regex syntax here, so
    // metacharacters in them (including '.') are not matched literally — confirm whether they
    // should be escaped before the substitution.
    fs::path fl_path{prefix};
    fl_path.append(dbfilename);
    SubstituteFilenameTsPlaceholder(&fl_path, kTimestampRegex);
    if (!fl_path.has_extension()) {
      // The dots are escaped so that only the literal extensions are accepted.
      fl_path += "(-summary\\.dfs|\\.rdb)";
    }
    const std::regex re(fl_path.string());

    io::Result<std::vector<std::string>, GenericError> keys = ListObjects(bucket_name, prefix);
    if (!keys) {
      return nonstd::make_unexpected(keys.error());
    }

    // Sort the keys in descending order. Since the timestamp format has the same
    // lexicographic and chronological order, the first matching key is the latest snapshot.
    std::sort(keys->rbegin(), keys->rend());
    for (const std::string& key : *keys) {
      if (std::regex_match(key, re)) {
        return std::string(kS3Prefix) + bucket_name + "/" + key;
      }
    }

    return nonstd::make_unexpected(GenericError(
        std::make_error_code(std::errc::no_such_file_or_directory), "Snapshot not found"));
  });
}

std::string AwsS3SnapshotStorage::LoadPath(const std::string_view& dir,
const std::string_view& dbfilename) {
LOG(WARNING) << "Loading snapshots from S3 is not supported";
return "";
// Expands the snapshot path returned by LoadPath into the full list of objects to load.
//
// `load_path` must end with ".rdb" or "summary.dfs"; anything else yields an
// invalid_argument error ("Bad filename extension").
//  - For an RDB snapshot the result is just {load_path}.
//  - For a DFS snapshot the result is load_path followed by the URL of every shard object,
//    i.e. every key equal to the summary key with the trailing "summary.dfs" replaced by
//    exactly four decimal digits and ".dfs". If no shard object exists a
//    no_such_file_or_directory error is returned.
io::Result<std::vector<std::string>, GenericError> AwsS3SnapshotStorage::LoadPaths(
    const std::string& load_path) {
  constexpr std::string_view kSummarySuffix = "summary.dfs";

  if (!(absl::EndsWith(load_path, ".rdb") || absl::EndsWith(load_path, kSummarySuffix))) {
    return nonstd::make_unexpected(
        GenericError(std::make_error_code(std::errc::invalid_argument), "Bad filename extension"));
  }

  // Find snapshot shard files if we're loading DFS.
  if (absl::EndsWith(load_path, kSummarySuffix)) {
    // ListObjects must run from a proactor, so the lookup is dispatched with Await.
    util::fb2::ProactorBase* proactor = shard_set->pool()->GetNextProactor();
    return proactor->Await([&]() -> io::Result<std::vector<std::string>, GenericError> {
      std::vector<std::string> paths{{load_path}};

      const std::optional<std::pair<std::string, std::string>> bucket_path =
          GetBucketPath(load_path);
      if (!bucket_path) {
        return nonstd::make_unexpected(
            GenericError{std::make_error_code(std::errc::invalid_argument), "Invalid S3 path"});
      }
      const std::string& bucket_name = bucket_path->first;
      const std::string& obj_path = bucket_path->second;

      // Everything in the summary key that precedes the trailing "summary.dfs". Only that
      // trailing occurrence varies between the summary and its shards, so it is stripped
      // explicitly instead of rewriting every "summary" substring of the key.
      std::string_view stem = obj_path;
      if (!absl::ConsumeSuffix(&stem, kSummarySuffix)) {
        return nonstd::make_unexpected(
            GenericError{std::make_error_code(std::errc::invalid_argument), "Invalid S3 path"});
      }

      // A shard key is <stem><4 decimal digits>.dfs. The comparison is literal, so characters
      // in the key that happen to be regex metacharacters cannot alter the match.
      const auto is_shard_key = [stem](std::string_view key) {
        if (!absl::ConsumePrefix(&key, stem) || !absl::ConsumeSuffix(&key, ".dfs")) {
          return false;
        }
        if (key.size() != 4) {
          return false;
        }
        for (const char c : key) {
          if (c < '0' || c > '9') {
            return false;
          }
        }
        return true;
      };

      // Limit prefix to objects in the same 'directory' as load_path.
      const size_t pos = obj_path.find_last_of('/');
      const std::string prefix = (pos == std::string::npos) ? "" : obj_path.substr(0, pos);
      const io::Result<std::vector<std::string>, GenericError> keys =
          ListObjects(bucket_name, prefix);
      if (!keys) {
        return nonstd::make_unexpected(keys.error());
      }

      for (const std::string& key : *keys) {
        if (is_shard_key(key)) {
          paths.push_back(std::string(kS3Prefix) + bucket_name + "/" + key);
        }
      }

      // `paths` always holds the summary itself; a size of one means no shard was found.
      if (paths.size() <= 1) {
        return nonstd::make_unexpected(
            GenericError{std::make_error_code(std::errc::no_such_file_or_directory),
                         "Cound not find DFS snapshot shard files"});
      }

      return paths;
    });
  }

  return {{load_path}};
}

io::Result<std::vector<std::string>> AwsS3SnapshotStorage::LoadPaths(const std::string& load_path) {
LOG(WARNING) << "Loading snapshots from S3 is not supported";
return nonstd::make_unexpected(std::make_error_code(std::errc::invalid_argument));
// Collects the keys of the objects in bucket `bucket_name` that are reported by
// S3Bucket::ListAllObjects for the given `prefix`.
//
// Returns the keys in the order they were reported, or an error if either the connection to
// the bucket (bounded by kBucketConnectMs) or the listing itself fails. As stated in the
// class declaration, this must run from a proactor.
io::Result<std::vector<std::string>, GenericError> AwsS3SnapshotStorage::ListObjects(
    std::string_view bucket_name, std::string_view prefix) {
  util::cloud::S3Bucket s3_bucket(*aws_, bucket_name);

  std::error_code status = s3_bucket.Connect(kBucketConnectMs);
  if (status) {
    return nonstd::make_unexpected(GenericError{status, "Couldn't connect to S3 bucket"});
  }

  std::vector<std::string> object_keys;
  // The reported object size is not needed; only the key is recorded.
  status = s3_bucket.ListAllObjects(prefix, [&object_keys](size_t, std::string_view object_key) {
    object_keys.emplace_back(object_key);
  });
  if (status) {
    return nonstd::make_unexpected(GenericError{status, "Couldn't list objects in S3 bucket"});
  }

  return object_keys;
}

io::Result<size_t> LinuxWriteWrapper::WriteSome(const iovec* v, uint32_t len) {
Expand Down
23 changes: 17 additions & 6 deletions src/server/detail/snapshot_storage.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,12 @@ class SnapshotStorage {
virtual io::ReadonlyFileOrError OpenReadFile(const std::string& path) = 0;

// Returns the path of the RDB file or DFS summary file to load.
virtual std::string LoadPath(const std::string_view& dir, const std::string_view& dbfilename) = 0;
virtual io::Result<std::string, GenericError> LoadPath(std::string_view dir,
std::string_view dbfilename) = 0;

// Returns the snapshot paths given the RDB file or DFS summary file path.
virtual io::Result<std::vector<std::string>> LoadPaths(const std::string& load_path) = 0;
virtual io::Result<std::vector<std::string>, GenericError> LoadPaths(
const std::string& load_path) = 0;
};

class FileSnapshotStorage : public SnapshotStorage {
Expand All @@ -57,9 +59,11 @@ class FileSnapshotStorage : public SnapshotStorage {

io::ReadonlyFileOrError OpenReadFile(const std::string& path) override;

std::string LoadPath(const std::string_view& dir, const std::string_view& dbfilename) override;
io::Result<std::string, GenericError> LoadPath(std::string_view dir,
std::string_view dbfilename) override;

io::Result<std::vector<std::string>> LoadPaths(const std::string& load_path) override;
io::Result<std::vector<std::string>, GenericError> LoadPaths(
const std::string& load_path) override;

private:
util::fb2::FiberQueueThreadPool* fq_threadpool_;
Expand All @@ -74,11 +78,18 @@ class AwsS3SnapshotStorage : public SnapshotStorage {

io::ReadonlyFileOrError OpenReadFile(const std::string& path) override;

std::string LoadPath(const std::string_view& dir, const std::string_view& dbfilename) override;
io::Result<std::string, GenericError> LoadPath(std::string_view dir,
std::string_view dbfilename) override;

io::Result<std::vector<std::string>> LoadPaths(const std::string& load_path) override;
io::Result<std::vector<std::string>, GenericError> LoadPaths(
const std::string& load_path) override;

private:
// List the objects in the given bucket with the given prefix. This must
// run from a proactor.
io::Result<std::vector<std::string>, GenericError> ListObjects(std::string_view bucket_name,
std::string_view prefix);

util::cloud::AWS* aws_;
};

Expand Down
27 changes: 19 additions & 8 deletions src/server/server_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -441,9 +441,18 @@ void ServerFamily::Init(util::AcceptServer* acceptor, std::vector<facade::Listen
snapshot_storage_ = std::make_shared<detail::FileSnapshotStorage>(nullptr);
}

string load_path = snapshot_storage_->LoadPath(flag_dir, GetFlag(FLAGS_dbfilename));
if (!load_path.empty()) {
load_result_ = Load(load_path);
const auto load_path_result = snapshot_storage_->LoadPath(flag_dir, GetFlag(FLAGS_dbfilename));
if (load_path_result) {
const std::string load_path = *load_path_result;
if (!load_path.empty()) {
load_result_ = Load(load_path);
}
} else {
if (std::error_code(load_path_result.error()) == std::errc::no_such_file_or_directory) {
LOG(WARNING) << "Load snapshot: No snapshot found";
} else {
LOG(ERROR) << "Failed to load snapshot: " << load_path_result.error().Format();
}
}

snapshot_schedule_fb_ =
Expand Down Expand Up @@ -498,10 +507,12 @@ struct AggregateLoadResult {
// Load starts as many fibers as there are files to load each one separately.
// It starts one more fiber that waits for all load fibers to finish and returns the first
// error (if any occurred) with a future.
Future<std::error_code> ServerFamily::Load(const std::string& load_path) {
io::Result<std::vector<std::string>> paths_result = snapshot_storage_->LoadPaths(load_path);
Future<GenericError> ServerFamily::Load(const std::string& load_path) {
auto paths_result = snapshot_storage_->LoadPaths(load_path);
if (!paths_result) {
Promise<std::error_code> ec_promise;
LOG(ERROR) << "Failed to load snapshot: " << paths_result.error().Format();

Promise<GenericError> ec_promise;
ec_promise.set_value(paths_result.error());
return ec_promise.get_future();
}
Expand Down Expand Up @@ -543,8 +554,8 @@ Future<std::error_code> ServerFamily::Load(const std::string& load_path) {
load_fibers.push_back(proactor->LaunchFiber(std::move(load_fiber)));
}

Promise<std::error_code> ec_promise;
Future<std::error_code> ec_future = ec_promise.get_future();
Promise<GenericError> ec_promise;
Future<GenericError> ec_future = ec_promise.get_future();

// Run fiber that empties the channel and sets ec_promise.
auto load_join_fiber = [this, aggregated_result, load_fibers = std::move(load_fibers),
Expand Down
4 changes: 2 additions & 2 deletions src/server/server_family.h
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ class ServerFamily {

// Load snapshot from file (.rdb file or summary.dfs file) and return
// a future holding the resulting error (if any).
Future<std::error_code> Load(const std::string& file_name);
Future<GenericError> Load(const std::string& file_name);

// used within tests.
bool IsSaving() const {
Expand Down Expand Up @@ -220,7 +220,7 @@ class ServerFamily {
void SnapshotScheduling();

Fiber snapshot_schedule_fb_;
Future<std::error_code> load_result_;
Future<GenericError> load_result_;

uint32_t stats_caching_task_ = 0;
Service& service_;
Expand Down