Skip to content

Commit

Permalink
fix: add ability to set snapshot_cron flag during runtime (#2101)
Browse files Browse the repository at this point in the history
* fix: add validating for snapshot_cron flag during runtime
* refactor: move warning log to upper level
  • Loading branch information
BorysTheDev authored Nov 3, 2023
1 parent 21cc7e9 commit 2f39e89
Show file tree
Hide file tree
Showing 4 changed files with 104 additions and 36 deletions.
5 changes: 3 additions & 2 deletions src/server/config_registry.cc
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,10 @@ auto ConfigRegistry::Set(std::string_view config_name, std::string_view value) -

absl::CommandLineFlag* flag = absl::FindCommandLineFlag(config_name);
CHECK(flag);
string error;
if (!flag->ParseFrom(value, &error))
if (string error; !flag->ParseFrom(value, &error)) {
LOG(WARNING) << error;
return SetResult::INVALID;
}

bool success = !cb || cb(*flag);
return success ? SetResult::OK : SetResult::INVALID;
Expand Down
110 changes: 76 additions & 34 deletions src/server/server_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -71,16 +71,23 @@ struct ReplicaOfFlag {
static bool AbslParseFlag(std::string_view in, ReplicaOfFlag* flag, std::string* err);
static std::string AbslUnparseFlag(const ReplicaOfFlag& flag);

struct CronExprFlag {
static constexpr std::string_view kCronPrefix = "0 "sv;
std::optional<cron::cronexpr> value;
};

static bool AbslParseFlag(std::string_view in, CronExprFlag* flag, std::string* err);
static std::string AbslUnparseFlag(const CronExprFlag& flag);

ABSL_FLAG(string, dir, "", "working directory");
ABSL_FLAG(string, dbfilename, "dump-{timestamp}", "the filename to save/load the DB");
ABSL_FLAG(string, requirepass, "",
"password for AUTH authentication. "
"If empty can also be set with DFLY_PASSWORD environment variable.");
ABSL_FLAG(uint32_t, maxclients, 64000, "Maximum number of concurrent clients allowed.");

ABSL_FLAG(string, save_schedule, "",
"glob spec for the UTC time to save a snapshot which matches HH:MM 24h time");
ABSL_FLAG(string, snapshot_cron, "",
ABSL_FLAG(string, save_schedule, "", "the flag is deprecated, please use snapshot_cron instead");
ABSL_FLAG(CronExprFlag, snapshot_cron, {},
"cron expression for the time to save a snapshot, crontab style");
ABSL_FLAG(bool, df_snapshot_format, true,
"if true, save in dragonfly-specific snapshotting format");
Expand Down Expand Up @@ -161,6 +168,37 @@ std::string AbslUnparseFlag(const ReplicaOfFlag& flag) {
return (flag.has_value()) ? absl::StrCat(flag.host, ":", flag.port) : "";
}

bool AbslParseFlag(std::string_view in, CronExprFlag* flag, std::string* err) {
if (in.empty()) {
flag->value = std::nullopt;
return true;
}
if (absl::StartsWith(in, "\"")) {
*err = absl::StrCat("Could it be that you put quotes in the flagfile?");

return false;
}

std::string raw_cron_expr = absl::StrCat(CronExprFlag::kCronPrefix, in);
try {
VLOG(1) << "creating cron from: '" << raw_cron_expr << "'";
flag->value = cron::make_cron(raw_cron_expr);
return true;
} catch (const cron::bad_cronexpr& ex) {
*err = ex.what();
}
return false;
}

std::string AbslUnparseFlag(const CronExprFlag& flag) {
if (flag.value) {
auto str_expr = to_cronstr(*flag.value);
DCHECK(absl::StartsWith(str_expr, CronExprFlag::kCronPrefix));
return str_expr.substr(CronExprFlag::kCronPrefix.size());
}
return "";
}

void SlowLogGet(dfly::CmdArgList args, dfly::ConnectionContext* cntx, dfly::Service& service,
std::string_view sub_cmd) {
size_t requested_slow_log_length = UINT32_MAX;
Expand Down Expand Up @@ -403,39 +441,30 @@ bool DoesTimeMatchSpecifier(const SnapshotSpec& spec, time_t now) {

std::optional<cron::cronexpr> InferSnapshotCronExpr() {
string save_time = GetFlag(FLAGS_save_schedule);
string snapshot_cron_exp = GetFlag(FLAGS_snapshot_cron);
auto cron_expr = GetFlag(FLAGS_snapshot_cron);

if (!snapshot_cron_exp.empty() && !save_time.empty()) {
LOG(ERROR) << "snapshot_cron and save_schedule flags should not be set simultaneously";
exit(1);
if (cron_expr.value) {
if (!save_time.empty()) {
LOG(ERROR) << "snapshot_cron and save_schedule flags should not be set simultaneously";
exit(1);
}
return std::move(cron_expr.value);
}

string raw_cron_expr;
if (!save_time.empty()) {
std::optional<SnapshotSpec> spec = ParseSaveSchedule(save_time);

if (spec) {
if (std::optional<SnapshotSpec> spec = ParseSaveSchedule(save_time); spec) {
// Setting snapshot to HH:mm everyday, as specified by `save_schedule` flag
raw_cron_expr = "0 " + spec.value().minute_spec + " " + spec.value().hour_spec + " * * *";
string raw_cron_expr = absl::StrCat(CronExprFlag::kCronPrefix, spec.value().minute_spec, " ",
spec.value().hour_spec, " * * *");
try {
VLOG(1) << "creating cron from: `" << raw_cron_expr << "`";
return cron::make_cron(raw_cron_expr);
} catch (const cron::bad_cronexpr& ex) {
LOG(WARNING) << "Invalid cron expression: " << raw_cron_expr;
}
} else {
LOG(WARNING) << "Invalid snapshot time specifier " << save_time;
}
} else if (!snapshot_cron_exp.empty()) {
if (absl::StartsWith(snapshot_cron_exp, "\"")) {
LOG(WARNING) << "Invalid snapshot cron expression `" << snapshot_cron_exp
<< "`, could it be that you put quotes in the flagfile?";
return nullopt;
}
raw_cron_expr = "0 " + snapshot_cron_exp;
}

if (!raw_cron_expr.empty()) {
try {
VLOG(1) << "creating cron from: `" << raw_cron_expr << "`";
return std::optional<cron::cronexpr>(cron::make_cron(raw_cron_expr));
} catch (const cron::bad_cronexpr& ex) {
LOG(WARNING) << "Invalid cron expression: " << raw_cron_expr;
}
}
return std::nullopt;
}
Expand Down Expand Up @@ -576,8 +605,24 @@ void ServerFamily::Init(util::AcceptServer* acceptor, std::vector<facade::Listen
}
}

snapshot_schedule_fb_ =
service_.proactor_pool().GetNextProactor()->LaunchFiber([this] { SnapshotScheduling(); });
const auto create_snapshot_schedule_fb = [this] {
snapshot_schedule_fb_ =
service_.proactor_pool().GetNextProactor()->LaunchFiber([this] { SnapshotScheduling(); });
};
config_registry.RegisterMutable(
"snapshot_cron", [this, create_snapshot_schedule_fb](const absl::CommandLineFlag& flag) {
JoinSnapshotSchedule();
create_snapshot_schedule_fb();
return true;
});

create_snapshot_schedule_fb();
}

void ServerFamily::JoinSnapshotSchedule() {
schedule_done_.Notify();
snapshot_schedule_fb_.JoinIfNeeded();
schedule_done_.Reset();
}

void ServerFamily::Shutdown() {
Expand All @@ -586,10 +631,7 @@ void ServerFamily::Shutdown() {
if (load_result_.valid())
load_result_.wait();

schedule_done_.Notify();
if (snapshot_schedule_fb_.IsJoinable()) {
snapshot_schedule_fb_.Join();
}
JoinSnapshotSchedule();

if (save_on_shutdown_ && !absl::GetFlag(FLAGS_dbfilename).empty()) {
shard_set->pool()->GetNextProactor()->Await([this] {
Expand Down
2 changes: 2 additions & 0 deletions src/server/server_family.h
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,8 @@ class ServerFamily {
void Replicate(std::string_view host, std::string_view port);

private:
void JoinSnapshotSchedule();

uint32_t shard_count() const {
return shard_set->size();
}
Expand Down
23 changes: 23 additions & 0 deletions tests/dragonfly/snapshot_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,29 @@ async def test_snapshot(self, df_seeder_factory, df_server):
assert super().get_main_file("test-cron-summary.dfs")


@dfly_args({**BASIC_ARGS, "dbfilename": "test-set-snapshot_cron"})
class TestSetsnapshot_cron(SnapshotTestBase):
"""Test set snapshot_cron flag"""

@pytest.fixture(autouse=True)
def setup(self, tmp_dir: Path):
super().setup(tmp_dir)

@pytest.mark.asyncio
@pytest.mark.slow
async def test_snapshot(self, df_seeder_factory, async_client, df_server):
seeder = df_seeder_factory.create(
port=df_server.port, keys=10, multi_transaction_probability=0
)
await seeder.run(target_deviation=0.5)

await async_client.execute_command("CONFIG", "SET", "snapshot_cron", "* * * * *")

await super().wait_for_save("test-set-snapshot_cron-summary.dfs")

assert super().get_main_file("test-set-snapshot_cron-summary.dfs")


@dfly_args({**BASIC_ARGS})
class TestPathEscapes(SnapshotTestBase):
"""Test that we don't allow path escapes. We just check that df_server.start()
Expand Down

0 comments on commit 2f39e89

Please sign in to comment.