Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(server): Increase common abstraction usage #511

Merged
merged 1 commit into from
Nov 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions src/server/common.cc
Original file line number Diff line number Diff line change
Expand Up @@ -240,4 +240,14 @@ bool ScanOpts::Matches(std::string_view val_name) const {
return stringmatchlen(pattern.data(), pattern.size(), val_name.data(), val_name.size(), 0) == 1;
}

std::string GenericError::Format() const {
if (!ec_)
return "";

if (details_.empty())
return ec_.message();
else
return absl::StrCat(ec_.message(), ":", details_);
}

} // namespace dfly
7 changes: 3 additions & 4 deletions src/server/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -222,10 +222,6 @@ class GenericError {
GenericError(std::error_code ec, std::string details) : ec_{ec}, details_{std::move(details)} {
}

std::pair<std::error_code, const std::string&> Get() const {
return {ec_, details_};
}

std::error_code GetError() const {
return ec_;
}
Expand All @@ -238,6 +234,9 @@ class GenericError {
return bool(ec_);
}

// Get string representation of error.
std::string Format() const;

private:
std::error_code ec_;
std::string details_;
Expand Down
7 changes: 3 additions & 4 deletions src/server/debugcmd.cc
Original file line number Diff line number Diff line change
Expand Up @@ -151,18 +151,17 @@ void DebugCmd::Reload(CmdArgList args) {
}
}

error_code ec;

if (save) {
string err_details;
const CommandId* cid = sf_.service().FindCmd("SAVE");
CHECK_NOTNULL(cid);
intrusive_ptr<Transaction> trans(new Transaction{cid});
trans->InitByArgs(0, {});
VLOG(1) << "Performing save";
ec = sf_.DoSave(false, trans.get(), &err_details);

GenericError ec = sf_.DoSave(false, trans.get());
if (ec) {
return (*cntx_)->SendError(absl::StrCat(err_details, ec.message()));
return (*cntx_)->SendError(ec.Format());
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/server/dflycmd.cc
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,7 @@ uint32_t DflyCmd::CreateSyncSession() {

unsigned flow_count = shard_set->size() + 1;
auto err_handler = [this, sync_id](const GenericError& err) {
LOG(INFO) << "Replication error: " << err.GetError().message() << " " << err.GetDetails();
LOG(INFO) << "Replication error: " << err.Format();

// Stop replication in case of error.
// StopReplication needs to run async to prevent blocking
Expand Down
7 changes: 2 additions & 5 deletions src/server/rdb_load.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1616,8 +1616,7 @@ error_code RdbLoader::Load(io::Source* src) {
} // main load loop

if (stop_early_) {
lock_guard lk(mu_);
return ec_;
return *ec_;
}

/* Verify the checksum if RDB version is >= 5 */
Expand Down Expand Up @@ -1811,9 +1810,7 @@ void RdbLoader::LoadItemsBuffer(DbIndex db_ind, const ItemsBuf& ib) {

for (const auto& item : ib) {
PrimeValue pv;
if (auto ec = Visit(item, &pv); ec) {
lock_guard lk(mu_);
ec_ = ec;
if (ec_ = Visit(item, &pv); ec_) {
stop_early_ = true;
break;
}
Expand Down
3 changes: 1 addition & 2 deletions src/server/rdb_load.h
Original file line number Diff line number Diff line change
Expand Up @@ -198,8 +198,7 @@ class RdbLoader : protected RdbLoaderBase {

DbIndex cur_db_index_ = 0;

::boost::fibers::mutex mu_;
std::error_code ec_; // guarded by mu_
AggregateError ec_;
std::atomic_bool stop_early_{false};

// Callback when receiving RDB_OPCODE_FULLSYNC_END
Expand Down
51 changes: 27 additions & 24 deletions src/server/server_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -543,16 +543,14 @@ void ServerFamily::SnapshotScheduling(const SnapshotSpec& spec) {
continue;
}

// do the save
string err_details;
error_code ec;
const CommandId* cid = service().FindCmd("SAVE");
CHECK_NOTNULL(cid);
boost::intrusive_ptr<Transaction> trans(new Transaction{cid});
trans->InitByArgs(0, {});
ec = DoSave(false, trans.get(), &err_details);

GenericError ec = DoSave(false, trans.get());
if (ec) {
LOG(WARNING) << "Failed to perform snapshot " << err_details;
LOG(WARNING) << "Failed to perform snapshot " << ec.Format();
}
}
}
Expand Down Expand Up @@ -783,11 +781,14 @@ static void RunStage(bool new_version, std::function<void(unsigned)> cb) {
}
};

using PartialSaveOpts =
romange marked this conversation as resolved.
Show resolved Hide resolved
tuple<const string& /*filename*/, const string& /*path*/, absl::Time /*start*/>;

// Start saving a single snapshot of a multi-file dfly snapshot.
// If shard is null, then this is the summary file.
error_code DoPartialSave(const string& filename, const string& path, absl::Time now,
const dfly::StringVec& scripts, RdbSnapshot* snapshot,
EngineShard* shard) {
error_code DoPartialSave(PartialSaveOpts opts, const dfly::StringVec& scripts,
RdbSnapshot* snapshot, EngineShard* shard) {
auto [filename, path, now] = opts;
// Construct resulting filename.
fs::path file = filename, abs_path = path;
if (shard == nullptr) {
Expand All @@ -809,24 +810,22 @@ error_code DoPartialSave(const string& filename, const string& path, absl::Time
return local_ec;
}

error_code ServerFamily::DoSave(bool new_version, Transaction* trans, string* err_details) {
GenericError ServerFamily::DoSave(bool new_version, Transaction* trans) {
fs::path dir_path(GetFlag(FLAGS_dir));
AggregateError ec;
AggregateGenericError ec;

// Check directory.
if (!dir_path.empty()) {
ec = CreateDirs(dir_path);
if (ec) {
*err_details = "create-dir ";
return *ec;
if (auto local_ec = CreateDirs(dir_path); local_ec) {
return {local_ec, "create-dir"};
}
}

// Manage global state.
GlobalState new_state = service_.SwitchState(GlobalState::ACTIVE, GlobalState::SAVING);
if (new_state != GlobalState::SAVING) {
*err_details = StrCat(GlobalStateName(new_state), " - can not save database");
return make_error_code(errc::operation_in_progress);
return {make_error_code(errc::operation_in_progress),
StrCat(GlobalStateName(new_state), " - can not save database")};
}
absl::Cleanup rev_state = [this] {
service_.SwitchState(GlobalState::SAVING, GlobalState::ACTIVE);
Expand Down Expand Up @@ -864,25 +863,29 @@ error_code ServerFamily::DoSave(bool new_version, Transaction* trans, string* er

// Start snapshots.
if (new_version) {
auto file_opts = make_tuple(cref(filename), cref(path), start);

// In the new version (.dfs) we store a file for every shard and one more summary file.
// Summary file is always last in snapshots array.
snapshots.resize(shard_set->size() + 1);

// Save summary file.
{
const auto scripts = script_mgr_->GetLuaScripts();
auto& summary_snapshot = snapshots[shard_set->size()];
summary_snapshot.reset(new RdbSnapshot(fq_threadpool_.get()));
if ((ec = DoPartialSave(filename, path, start, scripts, summary_snapshot.get(), nullptr))) {
summary_snapshot.reset();
auto& snapshot = snapshots[shard_set->size()];
snapshot.reset(new RdbSnapshot(fq_threadpool_.get()));
if (auto local_ec = DoPartialSave(file_opts, scripts, snapshot.get(), nullptr); local_ec) {
ec = local_ec;
snapshot.reset();
}
}

// Save shard files.
auto cb = [&](Transaction* t, EngineShard* shard) {
auto& snapshot = snapshots[shard->shard_id()];
snapshot.reset(new RdbSnapshot(fq_threadpool_.get()));
if ((ec = DoPartialSave(filename, path, start, {}, snapshot.get(), shard))) {
if (auto local_ec = DoPartialSave(file_opts, {}, snapshot.get(), shard); local_ec) {
ec = local_ec;
snapshot.reset();
}
return OpStatus::OK;
Expand Down Expand Up @@ -950,6 +953,7 @@ error_code ServerFamily::DoSave(bool new_version, Transaction* trans, string* er
// swap - to deallocate the old version outstide of the lock.
last_save_info_.swap(save_info);
}

return *ec;
}

Expand Down Expand Up @@ -1127,10 +1131,9 @@ void ServerFamily::Save(CmdArgList args, ConnectionContext* cntx) {
}
}

error_code ec = DoSave(new_version, cntx->transaction, &err_detail);

GenericError ec = DoSave(new_version, cntx->transaction);
if (ec) {
(*cntx)->SendError(absl::StrCat(err_detail, ec.message()));
(*cntx)->SendError(ec.Format());
} else {
(*cntx)->SendOk();
}
Expand Down
2 changes: 1 addition & 1 deletion src/server/server_family.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ class ServerFamily {
void StatsMC(std::string_view section, facade::ConnectionContext* cntx);

// if new_version is true, saves DF specific, non redis compatible snapshot.
std::error_code DoSave(bool new_version, Transaction* transaction, std::string* err_details);
GenericError DoSave(bool new_version, Transaction* transaction);

// Burns down and destroy all the data from the database.
// if kDbAll is passed, burns all the databases to the ground.
Expand Down
2 changes: 1 addition & 1 deletion tests/dragonfly/replication_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ async def check_replication(c_replica):
@pytest.mark.asyncio
@pytest.mark.parametrize("t_master, t_crash_fs, t_crash_ss, t_disonnect, n_keys", disconnect_cases)
async def test_disconnect(df_local_factory, t_master, t_crash_fs, t_crash_ss, t_disonnect, n_keys):
master = df_local_factory.create(port=BASE_PORT, proactor_threads=t_master,logtostdout="")
master = df_local_factory.create(port=BASE_PORT, proactor_threads=t_master)
replicas = [
(df_local_factory.create(
port=BASE_PORT+i+1, proactor_threads=t), crash_fs)
Expand Down