Skip to content

Commit

Permalink
fix(server): Small fixes everywhere
Browse files Browse the repository at this point in the history
Signed-off-by: Vladislav Oleshko <[email protected]>
  • Loading branch information
dranikpg committed Nov 22, 2022
1 parent 893c741 commit da55c35
Show file tree
Hide file tree
Showing 9 changed files with 49 additions and 42 deletions.
10 changes: 10 additions & 0 deletions src/server/common.cc
Original file line number Diff line number Diff line change
Expand Up @@ -240,4 +240,14 @@ bool ScanOpts::Matches(std::string_view val_name) const {
return stringmatchlen(pattern.data(), pattern.size(), val_name.data(), val_name.size(), 0) == 1;
}

std::string GenericError::Format() const {
if (!ec_)
return "";

if (details_.empty())
return ec_.message();
else
return absl::StrCat(ec_.message(), ":", details_);
}

} // namespace dfly
7 changes: 3 additions & 4 deletions src/server/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -222,10 +222,6 @@ class GenericError {
GenericError(std::error_code ec, std::string details) : ec_{ec}, details_{std::move(details)} {
}

std::pair<std::error_code, const std::string&> Get() const {
return {ec_, details_};
}

std::error_code GetError() const {
return ec_;
}
Expand All @@ -238,6 +234,9 @@ class GenericError {
return bool(ec_);
}

// Get string representation of error.
std::string Format() const;

private:
std::error_code ec_;
std::string details_;
Expand Down
7 changes: 3 additions & 4 deletions src/server/debugcmd.cc
Original file line number Diff line number Diff line change
Expand Up @@ -151,18 +151,17 @@ void DebugCmd::Reload(CmdArgList args) {
}
}

error_code ec;

if (save) {
string err_details;
const CommandId* cid = sf_.service().FindCmd("SAVE");
CHECK_NOTNULL(cid);
intrusive_ptr<Transaction> trans(new Transaction{cid});
trans->InitByArgs(0, {});
VLOG(1) << "Performing save";
ec = sf_.DoSave(false, trans.get(), &err_details);

GenericError ec = sf_.DoSave(false, trans.get());
if (ec) {
return (*cntx_)->SendError(absl::StrCat(err_details, ec.message()));
return (*cntx_)->SendError(ec.Format());
}
}

Expand Down
2 changes: 1 addition & 1 deletion src/server/dflycmd.cc
Original file line number Diff line number Diff line change
Expand Up @@ -399,7 +399,7 @@ uint32_t DflyCmd::CreateSyncSession() {

unsigned flow_count = shard_set->size() + 1;
auto err_handler = [this, sync_id](const GenericError& err) {
LOG(INFO) << "Replication error: " << err.GetError().message() << " " << err.GetDetails();
LOG(INFO) << "Replication error: " << err.Format();

// Stop replication in case of error.
// StopReplication needs to run async to prevent blocking
Expand Down
7 changes: 2 additions & 5 deletions src/server/rdb_load.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1616,8 +1616,7 @@ error_code RdbLoader::Load(io::Source* src) {
} // main load loop

if (stop_early_) {
lock_guard lk(mu_);
return ec_;
return *ec_;
}

/* Verify the checksum if RDB version is >= 5 */
Expand Down Expand Up @@ -1811,9 +1810,7 @@ void RdbLoader::LoadItemsBuffer(DbIndex db_ind, const ItemsBuf& ib) {

for (const auto& item : ib) {
PrimeValue pv;
if (auto ec = Visit(item, &pv); ec) {
lock_guard lk(mu_);
ec_ = ec;
if (ec_ = Visit(item, &pv); ec_) {
stop_early_ = true;
break;
}
Expand Down
3 changes: 1 addition & 2 deletions src/server/rdb_load.h
Original file line number Diff line number Diff line change
Expand Up @@ -198,8 +198,7 @@ class RdbLoader : protected RdbLoaderBase {

DbIndex cur_db_index_ = 0;

::boost::fibers::mutex mu_;
std::error_code ec_; // guarded by mu_
AggregateError ec_;
std::atomic_bool stop_early_{false};

// Callback when receiving RDB_OPCODE_FULLSYNC_END
Expand Down
51 changes: 27 additions & 24 deletions src/server/server_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -543,16 +543,14 @@ void ServerFamily::SnapshotScheduling(const SnapshotSpec& spec) {
continue;
}

// do the save
string err_details;
error_code ec;
const CommandId* cid = service().FindCmd("SAVE");
CHECK_NOTNULL(cid);
boost::intrusive_ptr<Transaction> trans(new Transaction{cid});
trans->InitByArgs(0, {});
ec = DoSave(false, trans.get(), &err_details);

GenericError ec = DoSave(false, trans.get());
if (ec) {
LOG(WARNING) << "Failed to perform snapshot " << err_details;
LOG(WARNING) << "Failed to perform snapshot " << ec.Format();
}
}
}
Expand Down Expand Up @@ -783,11 +781,14 @@ static void RunStage(bool new_version, std::function<void(unsigned)> cb) {
}
};

using PartialSaveOpts =
tuple<const string& /*filename*/, const string& /*path*/, absl::Time /*start*/>;

// Start saving a single snapshot of a multi-file dfly snapshot.
// If shard is null, then this is the summary file.
error_code DoPartialSave(const string& filename, const string& path, absl::Time now,
const dfly::StringVec& scripts, RdbSnapshot* snapshot,
EngineShard* shard) {
error_code DoPartialSave(PartialSaveOpts opts, const dfly::StringVec& scripts,
RdbSnapshot* snapshot, EngineShard* shard) {
auto [filename, path, now] = opts;
// Construct resulting filename.
fs::path file = filename, abs_path = path;
if (shard == nullptr) {
Expand All @@ -809,24 +810,22 @@ error_code DoPartialSave(const string& filename, const string& path, absl::Time
return local_ec;
}

error_code ServerFamily::DoSave(bool new_version, Transaction* trans, string* err_details) {
GenericError ServerFamily::DoSave(bool new_version, Transaction* trans) {
fs::path dir_path(GetFlag(FLAGS_dir));
AggregateError ec;
AggregateGenericError ec;

// Check directory.
if (!dir_path.empty()) {
ec = CreateDirs(dir_path);
if (ec) {
*err_details = "create-dir ";
return *ec;
if (auto local_ec = CreateDirs(dir_path); local_ec) {
return {local_ec, "create-dir"};
}
}

// Manage global state.
GlobalState new_state = service_.SwitchState(GlobalState::ACTIVE, GlobalState::SAVING);
if (new_state != GlobalState::SAVING) {
*err_details = StrCat(GlobalStateName(new_state), " - can not save database");
return make_error_code(errc::operation_in_progress);
return {make_error_code(errc::operation_in_progress),
StrCat(GlobalStateName(new_state), " - can not save database")};
}
absl::Cleanup rev_state = [this] {
service_.SwitchState(GlobalState::SAVING, GlobalState::ACTIVE);
Expand Down Expand Up @@ -864,25 +863,29 @@ error_code ServerFamily::DoSave(bool new_version, Transaction* trans, string* er

// Start snapshots.
if (new_version) {
auto file_opts = make_tuple(cref(filename), cref(path), start);

// In the new version (.dfs) we store a file for every shard and one more summary file.
// Summary file is always last in snapshots array.
snapshots.resize(shard_set->size() + 1);

// Save summary file.
{
const auto scripts = script_mgr_->GetLuaScripts();
auto& summary_snapshot = snapshots[shard_set->size()];
summary_snapshot.reset(new RdbSnapshot(fq_threadpool_.get()));
if ((ec = DoPartialSave(filename, path, start, scripts, summary_snapshot.get(), nullptr))) {
summary_snapshot.reset();
auto& snapshot = snapshots[shard_set->size()];
snapshot.reset(new RdbSnapshot(fq_threadpool_.get()));
if (auto local_ec = DoPartialSave(file_opts, scripts, snapshot.get(), nullptr); local_ec) {
ec = local_ec;
snapshot.reset();
}
}

// Save shard files.
auto cb = [&](Transaction* t, EngineShard* shard) {
auto& snapshot = snapshots[shard->shard_id()];
snapshot.reset(new RdbSnapshot(fq_threadpool_.get()));
if ((ec = DoPartialSave(filename, path, start, {}, snapshot.get(), shard))) {
if (auto local_ec = DoPartialSave(file_opts, {}, snapshot.get(), shard); local_ec) {
ec = local_ec;
snapshot.reset();
}
return OpStatus::OK;
Expand Down Expand Up @@ -950,6 +953,7 @@ error_code ServerFamily::DoSave(bool new_version, Transaction* trans, string* er
// swap - to deallocate the old version outstide of the lock.
last_save_info_.swap(save_info);
}

return *ec;
}

Expand Down Expand Up @@ -1127,10 +1131,9 @@ void ServerFamily::Save(CmdArgList args, ConnectionContext* cntx) {
}
}

error_code ec = DoSave(new_version, cntx->transaction, &err_detail);

GenericError ec = DoSave(new_version, cntx->transaction);
if (ec) {
(*cntx)->SendError(absl::StrCat(err_detail, ec.message()));
(*cntx)->SendError(ec.Format());
} else {
(*cntx)->SendOk();
}
Expand Down
2 changes: 1 addition & 1 deletion src/server/server_family.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ class ServerFamily {
void StatsMC(std::string_view section, facade::ConnectionContext* cntx);

// if new_version is true, saves DF specific, non redis compatible snapshot.
std::error_code DoSave(bool new_version, Transaction* transaction, std::string* err_details);
GenericError DoSave(bool new_version, Transaction* transaction);

// Burns down and destroy all the data from the database.
// if kDbAll is passed, burns all the databases to the ground.
Expand Down
2 changes: 1 addition & 1 deletion tests/dragonfly/replication_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ async def check_replication(c_replica):
@pytest.mark.asyncio
@pytest.mark.parametrize("t_master, t_crash_fs, t_crash_ss, t_disonnect, n_keys", disconnect_cases)
async def test_disconnect(df_local_factory, t_master, t_crash_fs, t_crash_ss, t_disonnect, n_keys):
master = df_local_factory.create(port=BASE_PORT, proactor_threads=t_master,logtostdout="")
master = df_local_factory.create(port=BASE_PORT, proactor_threads=t_master)
replicas = [
(df_local_factory.create(
port=BASE_PORT+i+1, proactor_threads=t), crash_fs)
Expand Down

0 comments on commit da55c35

Please sign in to comment.