diff --git a/src/server/common.cc b/src/server/common.cc index 8f15890c8119..62046a5c4c6b 100644 --- a/src/server/common.cc +++ b/src/server/common.cc @@ -240,4 +240,14 @@ bool ScanOpts::Matches(std::string_view val_name) const { return stringmatchlen(pattern.data(), pattern.size(), val_name.data(), val_name.size(), 0) == 1; } +std::string GenericError::Format() const { + if (!ec_) + return ""; + + if (details_.empty()) + return ec_.message(); + else + return absl::StrCat(ec_.message(), ":", details_); +} + } // namespace dfly diff --git a/src/server/common.h b/src/server/common.h index 75dd77e4ddb3..e0e74bdfbfb5 100644 --- a/src/server/common.h +++ b/src/server/common.h @@ -222,10 +222,6 @@ class GenericError { GenericError(std::error_code ec, std::string details) : ec_{ec}, details_{std::move(details)} { } - std::pair Get() const { - return {ec_, details_}; - } - std::error_code GetError() const { return ec_; } @@ -238,6 +234,9 @@ class GenericError { return bool(ec_); } + // Get string representation of error. + std::string Format() const; + private: std::error_code ec_; std::string details_; diff --git a/src/server/debugcmd.cc b/src/server/debugcmd.cc index de14dade9362..1b8f7c532ce6 100644 --- a/src/server/debugcmd.cc +++ b/src/server/debugcmd.cc @@ -151,8 +151,6 @@ void DebugCmd::Reload(CmdArgList args) { } } - error_code ec; - if (save) { string err_details; const CommandId* cid = sf_.service().FindCmd("SAVE"); @@ -160,9 +158,10 @@ void DebugCmd::Reload(CmdArgList args) { intrusive_ptr trans(new Transaction{cid}); trans->InitByArgs(0, {}); VLOG(1) << "Performing save"; - ec = sf_.DoSave(false, trans.get(), &err_details); + + GenericError ec = sf_.DoSave(false, trans.get()); if (ec) { - return (*cntx_)->SendError(absl::StrCat(err_details, ec.message())); + return (*cntx_)->SendError(ec.Format()); } } diff --git a/src/server/dflycmd.cc b/src/server/dflycmd.cc index bbdb0a51f6fd..544119203fb6 100644 --- a/src/server/dflycmd.cc +++ b/src/server/dflycmd.cc @@ -399,7 +399,7 @@ uint32_t DflyCmd::CreateSyncSession() { unsigned flow_count = shard_set->size() + 1; auto err_handler = [this, sync_id](const GenericError& err) { - LOG(INFO) << "Replication error: " << err.GetError().message() << " " << err.GetDetails(); + LOG(INFO) << "Replication error: " << err.Format(); // Stop replication in case of error. // StopReplication needs to run async to prevent blocking diff --git a/src/server/rdb_load.cc b/src/server/rdb_load.cc index 8a1f85124b45..1f738187e100 100644 --- a/src/server/rdb_load.cc +++ b/src/server/rdb_load.cc @@ -1616,8 +1616,7 @@ error_code RdbLoader::Load(io::Source* src) { } // main load loop if (stop_early_) { - lock_guard lk(mu_); - return ec_; + return *ec_; } /* Verify the checksum if RDB version is >= 5 */ @@ -1811,9 +1810,7 @@ void RdbLoader::LoadItemsBuffer(DbIndex db_ind, const ItemsBuf& ib) { for (const auto& item : ib) { PrimeValue pv; - if (auto ec = Visit(item, &pv); ec) { - lock_guard lk(mu_); - ec_ = ec; + if (ec_ = Visit(item, &pv); ec_) { stop_early_ = true; break; } diff --git a/src/server/rdb_load.h b/src/server/rdb_load.h index b8d04bb6429c..ff998f06eb6f 100644 --- a/src/server/rdb_load.h +++ b/src/server/rdb_load.h @@ -198,8 +198,7 @@ class RdbLoader : protected RdbLoaderBase { DbIndex cur_db_index_ = 0; - ::boost::fibers::mutex mu_; - std::error_code ec_; // guarded by mu_ + AggregateError ec_; std::atomic_bool stop_early_{false}; // Callback when receiving RDB_OPCODE_FULLSYNC_END diff --git a/src/server/server_family.cc b/src/server/server_family.cc index c9cf6a95a33e..be3c92b128a2 100644 --- a/src/server/server_family.cc +++ b/src/server/server_family.cc @@ -543,16 +543,14 @@ void ServerFamily::SnapshotScheduling(const SnapshotSpec& spec) { continue; } - // do the save - string err_details; - error_code ec; const CommandId* cid = service().FindCmd("SAVE"); CHECK_NOTNULL(cid); boost::intrusive_ptr trans(new Transaction{cid}); trans->InitByArgs(0, {}); - ec = DoSave(false, trans.get(), &err_details); + + GenericError ec = DoSave(false, trans.get()); if (ec) { - LOG(WARNING) << "Failed to perform snapshot " << err_details; + LOG(WARNING) << "Failed to perform snapshot " << ec.Format(); } } } @@ -783,11 +781,14 @@ static void RunStage(bool new_version, std::function cb) { } }; +using PartialSaveOpts = + tuple; + // Start saving a single snapshot of a multi-file dfly snapshot. // If shard is null, then this is the summary file. -error_code DoPartialSave(const string& filename, const string& path, absl::Time now, - const dfly::StringVec& scripts, RdbSnapshot* snapshot, - EngineShard* shard) { +error_code DoPartialSave(PartialSaveOpts opts, const dfly::StringVec& scripts, + RdbSnapshot* snapshot, EngineShard* shard) { + auto [filename, path, now] = opts; // Construct resulting filename. fs::path file = filename, abs_path = path; if (shard == nullptr) { @@ -809,24 +810,22 @@ error_code DoPartialSave(const string& filename, const string& path, absl::Time return local_ec; } -error_code ServerFamily::DoSave(bool new_version, Transaction* trans, string* err_details) { +GenericError ServerFamily::DoSave(bool new_version, Transaction* trans) { fs::path dir_path(GetFlag(FLAGS_dir)); - AggregateError ec; + AggregateGenericError ec; // Check directory. if (!dir_path.empty()) { - ec = CreateDirs(dir_path); - if (ec) { - *err_details = "create-dir "; - return *ec; + if (auto local_ec = CreateDirs(dir_path); local_ec) { + return {local_ec, "create-dir"}; } } // Manage global state. GlobalState new_state = service_.SwitchState(GlobalState::ACTIVE, GlobalState::SAVING); if (new_state != GlobalState::SAVING) { - *err_details = StrCat(GlobalStateName(new_state), " - can not save database"); - return make_error_code(errc::operation_in_progress); + return {make_error_code(errc::operation_in_progress), + StrCat(GlobalStateName(new_state), " - can not save database")}; } absl::Cleanup rev_state = [this] { service_.SwitchState(GlobalState::SAVING, GlobalState::ACTIVE); @@ -864,6 +863,8 @@ error_code ServerFamily::DoSave(bool new_version, Transaction* trans, string* er // Start snapshots. if (new_version) { + auto file_opts = make_tuple(cref(filename), cref(path), start); + // In the new version (.dfs) we store a file for every shard and one more summary file. // Summary file is always last in snapshots array. snapshots.resize(shard_set->size() + 1); @@ -871,10 +872,11 @@ error_code ServerFamily::DoSave(bool new_version, Transaction* trans, string* er // Save summary file. { const auto scripts = script_mgr_->GetLuaScripts(); - auto& summary_snapshot = snapshots[shard_set->size()]; - summary_snapshot.reset(new RdbSnapshot(fq_threadpool_.get())); - if ((ec = DoPartialSave(filename, path, start, scripts, summary_snapshot.get(), nullptr))) { - summary_snapshot.reset(); + auto& snapshot = snapshots[shard_set->size()]; + snapshot.reset(new RdbSnapshot(fq_threadpool_.get())); + if (auto local_ec = DoPartialSave(file_opts, scripts, snapshot.get(), nullptr); local_ec) { + ec = local_ec; + snapshot.reset(); } } @@ -882,7 +884,8 @@ error_code ServerFamily::DoSave(bool new_version, Transaction* trans, string* er auto cb = [&](Transaction* t, EngineShard* shard) { auto& snapshot = snapshots[shard->shard_id()]; snapshot.reset(new RdbSnapshot(fq_threadpool_.get())); - if ((ec = DoPartialSave(filename, path, start, {}, snapshot.get(), shard))) { + if (auto local_ec = DoPartialSave(file_opts, {}, snapshot.get(), shard); local_ec) { + ec = local_ec; snapshot.reset(); } return OpStatus::OK; @@ -950,6 +953,7 @@ error_code ServerFamily::DoSave(bool new_version, Transaction* trans, string* er // swap - to deallocate the old version outstide of the lock. last_save_info_.swap(save_info); } + return *ec; } @@ -1127,10 +1131,9 @@ void ServerFamily::Save(CmdArgList args, ConnectionContext* cntx) { } } - error_code ec = DoSave(new_version, cntx->transaction, &err_detail); - + GenericError ec = DoSave(new_version, cntx->transaction); if (ec) { - (*cntx)->SendError(absl::StrCat(err_detail, ec.message())); + (*cntx)->SendError(ec.Format()); } else { (*cntx)->SendOk(); } diff --git a/src/server/server_family.h b/src/server/server_family.h index 8197a990a979..7490c8cf30cd 100644 --- a/src/server/server_family.h +++ b/src/server/server_family.h @@ -81,7 +81,7 @@ class ServerFamily { void StatsMC(std::string_view section, facade::ConnectionContext* cntx); // if new_version is true, saves DF specific, non redis compatible snapshot. - std::error_code DoSave(bool new_version, Transaction* transaction, std::string* err_details); + GenericError DoSave(bool new_version, Transaction* transaction); // Burns down and destroy all the data from the database. // if kDbAll is passed, burns all the databases to the ground. diff --git a/tests/dragonfly/replication_test.py b/tests/dragonfly/replication_test.py index 3eb02b8d3de7..0efb21fcfcf2 100644 --- a/tests/dragonfly/replication_test.py +++ b/tests/dragonfly/replication_test.py @@ -117,7 +117,7 @@ async def check_replication(c_replica): @pytest.mark.asyncio @pytest.mark.parametrize("t_master, t_crash_fs, t_crash_ss, t_disonnect, n_keys", disconnect_cases) async def test_disconnect(df_local_factory, t_master, t_crash_fs, t_crash_ss, t_disonnect, n_keys): - master = df_local_factory.create(port=BASE_PORT, proactor_threads=t_master,logtostdout="") + master = df_local_factory.create(port=BASE_PORT, proactor_threads=t_master) replicas = [ (df_local_factory.create( port=BASE_PORT+i+1, proactor_threads=t), crash_fs)