From 2f39e8918911221f2e5098c97832dce06e6f3c2d Mon Sep 17 00:00:00 2001 From: Borys Date: Fri, 3 Nov 2023 10:10:16 +0200 Subject: [PATCH] fix: add ability to set snapshot_cron flag during runtime (#2101) * fix: add validating for snapshot_cron flag during runtime * refactor: move warning log to upper level --- src/server/config_registry.cc | 5 +- src/server/server_family.cc | 110 +++++++++++++++++++++---------- src/server/server_family.h | 2 + tests/dragonfly/snapshot_test.py | 23 +++++++ 4 files changed, 104 insertions(+), 36 deletions(-) diff --git a/src/server/config_registry.cc b/src/server/config_registry.cc index 7eda2a5a45ea..5a8041589f58 100644 --- a/src/server/config_registry.cc +++ b/src/server/config_registry.cc @@ -28,9 +28,10 @@ auto ConfigRegistry::Set(std::string_view config_name, std::string_view value) - absl::CommandLineFlag* flag = absl::FindCommandLineFlag(config_name); CHECK(flag); - string error; - if (!flag->ParseFrom(value, &error)) + if (string error; !flag->ParseFrom(value, &error)) { + LOG(WARNING) << error; return SetResult::INVALID; + } bool success = !cb || cb(*flag); return success ? SetResult::OK : SetResult::INVALID; diff --git a/src/server/server_family.cc b/src/server/server_family.cc index 458a7fd283a9..1c64ca895d35 100644 --- a/src/server/server_family.cc +++ b/src/server/server_family.cc @@ -71,6 +71,14 @@ struct ReplicaOfFlag { static bool AbslParseFlag(std::string_view in, ReplicaOfFlag* flag, std::string* err); static std::string AbslUnparseFlag(const ReplicaOfFlag& flag); +struct CronExprFlag { + static constexpr std::string_view kCronPrefix = "0 "sv; + std::optional value; +}; + +static bool AbslParseFlag(std::string_view in, CronExprFlag* flag, std::string* err); +static std::string AbslUnparseFlag(const CronExprFlag& flag); + ABSL_FLAG(string, dir, "", "working directory"); ABSL_FLAG(string, dbfilename, "dump-{timestamp}", "the filename to save/load the DB"); ABSL_FLAG(string, requirepass, "", @@ -78,9 +86,8 @@ ABSL_FLAG(string, requirepass, "", "If empty can also be set with DFLY_PASSWORD environment variable."); ABSL_FLAG(uint32_t, maxclients, 64000, "Maximum number of concurrent clients allowed."); -ABSL_FLAG(string, save_schedule, "", - "glob spec for the UTC time to save a snapshot which matches HH:MM 24h time"); -ABSL_FLAG(string, snapshot_cron, "", +ABSL_FLAG(string, save_schedule, "", "the flag is deprecated, please use snapshot_cron instead"); +ABSL_FLAG(CronExprFlag, snapshot_cron, {}, "cron expression for the time to save a snapshot, crontab style"); ABSL_FLAG(bool, df_snapshot_format, true, "if true, save in dragonfly-specific snapshotting format"); @@ -161,6 +168,37 @@ std::string AbslUnparseFlag(const ReplicaOfFlag& flag) { return (flag.has_value()) ? absl::StrCat(flag.host, ":", flag.port) : ""; } +bool AbslParseFlag(std::string_view in, CronExprFlag* flag, std::string* err) { + if (in.empty()) { + flag->value = std::nullopt; + return true; + } + if (absl::StartsWith(in, "\"")) { + *err = absl::StrCat("Could it be that you put quotes in the flagfile?"); + + return false; + } + + std::string raw_cron_expr = absl::StrCat(CronExprFlag::kCronPrefix, in); + try { + VLOG(1) << "creating cron from: '" << raw_cron_expr << "'"; + flag->value = cron::make_cron(raw_cron_expr); + return true; + } catch (const cron::bad_cronexpr& ex) { + *err = ex.what(); + } + return false; +} + +std::string AbslUnparseFlag(const CronExprFlag& flag) { + if (flag.value) { + auto str_expr = to_cronstr(*flag.value); + DCHECK(absl::StartsWith(str_expr, CronExprFlag::kCronPrefix)); + return str_expr.substr(CronExprFlag::kCronPrefix.size()); + } + return ""; +} + void SlowLogGet(dfly::CmdArgList args, dfly::ConnectionContext* cntx, dfly::Service& service, std::string_view sub_cmd) { size_t requested_slow_log_length = UINT32_MAX; @@ -403,39 +441,30 @@ bool DoesTimeMatchSpecifier(const SnapshotSpec& spec, time_t now) { std::optional InferSnapshotCronExpr() { string save_time = GetFlag(FLAGS_save_schedule); - string snapshot_cron_exp = GetFlag(FLAGS_snapshot_cron); + auto cron_expr = GetFlag(FLAGS_snapshot_cron); - if (!snapshot_cron_exp.empty() && !save_time.empty()) { - LOG(ERROR) << "snapshot_cron and save_schedule flags should not be set simultaneously"; - exit(1); + if (cron_expr.value) { + if (!save_time.empty()) { + LOG(ERROR) << "snapshot_cron and save_schedule flags should not be set simultaneously"; + exit(1); + } + return std::move(cron_expr.value); } - string raw_cron_expr; if (!save_time.empty()) { - std::optional spec = ParseSaveSchedule(save_time); - - if (spec) { + if (std::optional spec = ParseSaveSchedule(save_time); spec) { // Setting snapshot to HH:mm everyday, as specified by `save_schedule` flag - raw_cron_expr = "0 " + spec.value().minute_spec + " " + spec.value().hour_spec + " * * *"; + string raw_cron_expr = absl::StrCat(CronExprFlag::kCronPrefix, spec.value().minute_spec, " ", + spec.value().hour_spec, " * * *"); + try { + VLOG(1) << "creating cron from: `" << raw_cron_expr << "`"; + return cron::make_cron(raw_cron_expr); + } catch (const cron::bad_cronexpr& ex) { + LOG(WARNING) << "Invalid cron expression: " << raw_cron_expr; + } } else { LOG(WARNING) << "Invalid snapshot time specifier " << save_time; } - } else if (!snapshot_cron_exp.empty()) { - if (absl::StartsWith(snapshot_cron_exp, "\"")) { - LOG(WARNING) << "Invalid snapshot cron expression `" << snapshot_cron_exp - << "`, could it be that you put quotes in the flagfile?"; - return nullopt; - } - raw_cron_expr = "0 " + snapshot_cron_exp; - } - - if (!raw_cron_expr.empty()) { - try { - VLOG(1) << "creating cron from: `" << raw_cron_expr << "`"; - return std::optional(cron::make_cron(raw_cron_expr)); - } catch (const cron::bad_cronexpr& ex) { - LOG(WARNING) << "Invalid cron expression: " << raw_cron_expr; - } } return std::nullopt; } @@ -576,8 +605,24 @@ void ServerFamily::Init(util::AcceptServer* acceptor, std::vectorLaunchFiber([this] { SnapshotScheduling(); }); + const auto create_snapshot_schedule_fb = [this] { + snapshot_schedule_fb_ = + service_.proactor_pool().GetNextProactor()->LaunchFiber([this] { SnapshotScheduling(); }); + }; + config_registry.RegisterMutable( + "snapshot_cron", [this, create_snapshot_schedule_fb](const absl::CommandLineFlag& flag) { + JoinSnapshotSchedule(); + create_snapshot_schedule_fb(); + return true; + }); + + create_snapshot_schedule_fb(); +} + +void ServerFamily::JoinSnapshotSchedule() { + schedule_done_.Notify(); + snapshot_schedule_fb_.JoinIfNeeded(); + schedule_done_.Reset(); } void ServerFamily::Shutdown() { @@ -586,10 +631,7 @@ void ServerFamily::Shutdown() { if (load_result_.valid()) load_result_.wait(); - schedule_done_.Notify(); - if (snapshot_schedule_fb_.IsJoinable()) { - snapshot_schedule_fb_.Join(); - } + JoinSnapshotSchedule(); if (save_on_shutdown_ && !absl::GetFlag(FLAGS_dbfilename).empty()) { shard_set->pool()->GetNextProactor()->Await([this] { diff --git a/src/server/server_family.h b/src/server/server_family.h index a79c0e9e4c91..c168b0878253 100644 --- a/src/server/server_family.h +++ b/src/server/server_family.h @@ -189,6 +189,8 @@ class ServerFamily { void Replicate(std::string_view host, std::string_view port); private: + void JoinSnapshotSchedule(); + uint32_t shard_count() const { return shard_set->size(); } diff --git a/tests/dragonfly/snapshot_test.py b/tests/dragonfly/snapshot_test.py index 87ea293c2008..94b94e739834 100644 --- a/tests/dragonfly/snapshot_test.py +++ b/tests/dragonfly/snapshot_test.py @@ -194,6 +194,29 @@ async def test_snapshot(self, df_seeder_factory, df_server): assert super().get_main_file("test-cron-summary.dfs") +@dfly_args({**BASIC_ARGS, "dbfilename": "test-set-snapshot_cron"}) +class TestSetsnapshot_cron(SnapshotTestBase): + """Test set snapshot_cron flag""" + + @pytest.fixture(autouse=True) + def setup(self, tmp_dir: Path): + super().setup(tmp_dir) + + @pytest.mark.asyncio + @pytest.mark.slow + async def test_snapshot(self, df_seeder_factory, async_client, df_server): + seeder = df_seeder_factory.create( + port=df_server.port, keys=10, multi_transaction_probability=0 + ) + await seeder.run(target_deviation=0.5) + + await async_client.execute_command("CONFIG", "SET", "snapshot_cron", "* * * * *") + + await super().wait_for_save("test-set-snapshot_cron-summary.dfs") + + assert super().get_main_file("test-set-snapshot_cron-summary.dfs") + + @dfly_args({**BASIC_ARGS}) class TestPathEscapes(SnapshotTestBase): """Test that we don't allow path escapes. We just check that df_server.start()