Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore(tiering): Range functions + small refactoring #3207

Merged
merged 7 commits into from
Jul 22, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 10 additions & 2 deletions src/server/db_slice.cc
Original file line number Diff line number Diff line change
Expand Up @@ -817,12 +817,19 @@ OpResult<DbSlice::ItAndUpdater> DbSlice::AddNew(const Context& cntx, string_view
.it = res.it, .exp_it = res.exp_it, .post_updater = std::move(res.post_updater)};
}

pair<int64_t, int64_t> DbSlice::ExpireParams::Calculate(int64_t now_ms) const {
int64_t DbSlice::ExpireParams::Cap(int64_t value, TimeUnit unit) {
return unit == TimeUnit::SEC ? min(value, kMaxExpireDeadlineSec)
: min(value, kMaxExpireDeadlineMs);
}

pair<int64_t, int64_t> DbSlice::ExpireParams::Calculate(uint64_t now_ms, bool cap) const {
if (persist)
return {0, 0};
int64_t msec = (unit == TimeUnit::SEC) ? value * 1000 : value;
int64_t now_msec = now_ms;
int64_t rel_msec = absolute ? msec - now_msec : msec;
if (cap)
rel_msec = Cap(rel_msec, TimeUnit::MSEC);
return make_pair(rel_msec, now_msec + rel_msec);
}

Expand All @@ -838,7 +845,7 @@ OpResult<int64_t> DbSlice::UpdateExpire(const Context& cntx, Iterator prime_it,
}

auto [rel_msec, abs_msec] = params.Calculate(cntx.time_now_ms);
if (rel_msec > kMaxExpireDeadlineSec * 1000) {
if (rel_msec > kMaxExpireDeadlineMs) {
return OpStatus::OUT_OF_RANGE;
}

Expand Down Expand Up @@ -1399,6 +1406,7 @@ void DbSlice::ClearEntriesOnFlush(absl::Span<const DbIndex> indices, const DbTab

// Wait for delete operations to finish in sync
while (!async && db_ptr->stats.tiered_entries > 0) {
VLOG(0) << db_ptr->stats.tiered_entries;
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

remove?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removed in subsequent

LOG_EVERY_T(ERROR, 0.5) << "Long wait for tiered entry delete on flush";
ThisFiber::SleepFor(1ms);
}
Expand Down
24 changes: 16 additions & 8 deletions src/server/db_slice.h
Original file line number Diff line number Diff line change
Expand Up @@ -204,19 +204,27 @@ class DbSlice {
std::function<void(std::string_view, const Context&, const PrimeValue& pv)>;

struct ExpireParams {
int64_t value = INT64_MIN; // undefined

bool absolute = false;
TimeUnit unit = TimeUnit::SEC;
bool persist = false;
int32_t expire_options = 0; // ExpireFlags

bool IsDefined() const {
return persist || value > INT64_MIN;
}

static int64_t Cap(int64_t value, TimeUnit unit);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's a bit hard for me to review with refactoring and fixes.
Can you please split to 2+ PRs with refactoring and then fixes?
or at least reshape commits in this PR into 2 distinct commits (refactoring , fix bug1, fix bug2...)


// Calculate relative and absolue timepoints.
std::pair<int64_t, int64_t> Calculate(int64_t now_msec) const;
std::pair<int64_t, int64_t> Calculate(uint64_t now_msec, bool cap = false) const;

// Return true if relative expiration is in the past
bool IsExpired(uint64_t now_msec) const {
return Calculate(now_msec, false).first < 0;
}

public:
int64_t value = INT64_MIN; // undefined
TimeUnit unit = TimeUnit::SEC;

bool absolute = false;
bool persist = false;
int32_t expire_options = 0; // ExpireFlags
};

DbSlice(uint32_t index, bool caching_mode, EngineShard* owner);
Expand Down
62 changes: 12 additions & 50 deletions src/server/string_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -248,20 +248,6 @@ OpResult<int64_t> OpIncrBy(const OpArgs& op_args, string_view key, int64_t incr,
return new_val;
}

int64_t AbsExpiryToTtl(int64_t abs_expiry_time, bool as_milli) {
using std::chrono::duration_cast;
using std::chrono::milliseconds;
using std::chrono::seconds;
using std::chrono::system_clock;

if (as_milli) {
return abs_expiry_time -
duration_cast<milliseconds>(system_clock::now().time_since_epoch()).count();
} else {
return abs_expiry_time - duration_cast<seconds>(system_clock::now().time_since_epoch()).count();
}
}

// Returns true if keys were set, false otherwise.
OpStatus OpMSet(const OpArgs& op_args, const ShardArgs& args) {
DCHECK(!args.Empty() && args.Size() % 2 == 0);
Expand Down Expand Up @@ -750,32 +736,17 @@ void StringFamily::Set(CmdArgList args, ConnectionContext* cntx) {
return builder->SendError(InvalidExpireTime("set"));
}

bool is_ms = (opt[0] == 'P');

// for []AT we need to take expiration time as absolute from the value
// given check here and if the time is in the past, return OK but don't
// set it Note that the time pass here for PXAT is in milliseconds, we
// must not change it!
if (absl::EndsWith(opt, "AT")) {
int_arg = AbsExpiryToTtl(int_arg, is_ms);
if (int_arg < 0) {
// this happened in the past, just return, for some reason Redis
// reports OK in this case
return builder->SendStored();
}
}
DbSlice::ExpireParams expiry{
.value = int_arg,
.unit = (opt[0] == 'P') ? TimeUnit::MSEC : TimeUnit::SEC,
.absolute = absl::EndsWith(opt, "AT"),
};

if (is_ms) {
if (int_arg > kMaxExpireDeadlineMs) {
int_arg = kMaxExpireDeadlineMs;
}
} else {
if (int_arg > kMaxExpireDeadlineSec) {
int_arg = kMaxExpireDeadlineSec;
}
int_arg *= 1000;
}
sparams.expire_after_ms = int_arg;
// Redis reports just OK in this case
if (expiry.IsExpired(GetCurrentTimeMs()))
return builder->SendStored();

tie(sparams.expire_after_ms, ignore) = expiry.Calculate(GetCurrentTimeMs(), true);
} else if (parser.Check("_MCFLAGS").ExpectTail(1)) {
sparams.memcache_flags = parser.Next<uint16_t>();
} else {
Expand Down Expand Up @@ -1124,17 +1095,8 @@ void StringFamily::SetExGeneric(bool seconds, CmdArgList args, ConnectionContext

SetCmd::SetParams sparams;
sparams.flags |= SetCmd::SET_EXPIRE_AFTER_MS;
if (seconds) {
if (unit_vals > kMaxExpireDeadlineSec) {
unit_vals = kMaxExpireDeadlineSec;
}
sparams.expire_after_ms = uint64_t(unit_vals) * 1000;
} else {
if (unit_vals > kMaxExpireDeadlineMs) {
unit_vals = kMaxExpireDeadlineMs;
}
sparams.expire_after_ms = unit_vals;
}
sparams.expire_after_ms =
DbSlice::ExpireParams::Cap(unit_vals * (seconds ? 1000 : 1), TimeUnit::MSEC);

cntx->SendError(SetGeneric(cntx, sparams, key, value));
}
Expand Down