-
Notifications
You must be signed in to change notification settings - Fork 999
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(server): Rewrite journal commands (basic) #651
Changes from 4 commits
c139bc8
30c170a
00f3718
3490965
aa49656
9221c03
4f8bd48
4eeb6c8
208ba4b
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,7 +19,9 @@ extern "C" { | |
} | ||
#include "base/logging.h" | ||
#include "core/compact_object.h" | ||
#include "server/engine_shard_set.h" | ||
#include "server/error.h" | ||
#include "server/journal/journal.h" | ||
#include "server/server_state.h" | ||
|
||
namespace dfly { | ||
|
@@ -190,6 +192,12 @@ bool ParseDouble(string_view src, double* value) { | |
return true; | ||
} | ||
|
||
void OpArgs::RecordJournal(string_view key, ArgSlice args) const { | ||
auto journal = shard->journal(); | ||
CHECK(journal); | ||
journal->RecordEntry(txid, journal::Op::COMMAND, db_cntx.db_index, 1, make_pair(key, args)); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. this records can be part of multi transaction and in this case we should use MULTI_COMMAND |
||
} | ||
|
||
#define ADD(x) (x) += o.x | ||
|
||
TieredStats& TieredStats::operator+=(const TieredStats& o) { | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -13,6 +13,7 @@ extern "C" { | |
|
||
#include "base/logging.h" | ||
#include "server/engine_shard_set.h" | ||
#include "server/journal/journal.h" | ||
#include "server/server_state.h" | ||
#include "server/tiered_storage.h" | ||
#include "util/fiber_sched_algo.h" | ||
|
@@ -581,27 +582,33 @@ PrimeIterator DbSlice::AddNew(const Context& cntx, string_view key, PrimeValue o | |
return it; | ||
} | ||
|
||
OpStatus DbSlice::UpdateExpire(const Context& cntx, PrimeIterator prime_it, | ||
ExpireIterator expire_it, const ExpireParams& params) { | ||
pair<int64_t, int64_t> DbSlice::ExpireParams::Calculate(int64_t now_ms) const { | ||
int64_t msec = (unit == TimeUnit::SEC) ? value * 1000 : value; | ||
int64_t now_msec = now_ms; | ||
int64_t rel_msec = absolute ? msec - now_msec : msec; | ||
return make_pair(rel_msec, now_msec + rel_msec); | ||
} | ||
|
||
OpResult<int64_t> DbSlice::UpdateExpire(const Context& cntx, PrimeIterator prime_it, | ||
ExpireIterator expire_it, const ExpireParams& params) { | ||
DCHECK(params.IsDefined()); | ||
DCHECK(IsValid(prime_it)); | ||
|
||
int64_t msec = (params.unit == TimeUnit::SEC) ? params.value * 1000 : params.value; | ||
int64_t now_msec = cntx.time_now_ms; | ||
int64_t rel_msec = params.absolute ? msec - now_msec : msec; | ||
auto [rel_msec, abs_msec] = params.Calculate(cntx.time_now_ms); | ||
if (rel_msec > kMaxExpireDeadlineSec * 1000) { | ||
return OpStatus::OUT_OF_RANGE; | ||
} | ||
|
||
if (rel_msec <= 0 && !params.persist) { | ||
CHECK(Del(cntx.db_index, prime_it)); | ||
return -1; | ||
} else if (IsValid(expire_it)) { | ||
expire_it->second = FromAbsoluteTime(now_msec + rel_msec); | ||
expire_it->second = FromAbsoluteTime(abs_msec); | ||
return abs_msec; | ||
} else { | ||
UpdateExpire(cntx.db_index, prime_it, params.persist ? 0 : rel_msec + now_msec); | ||
UpdateExpire(cntx.db_index, prime_it, params.persist ? 0 : abs_msec); | ||
return params.persist ? 0 : abs_msec; | ||
} | ||
|
||
return OpStatus::OK; | ||
} | ||
|
||
std::pair<PrimeIterator, bool> DbSlice::AddOrUpdateInternal(const Context& cntx, | ||
|
@@ -784,6 +791,14 @@ pair<PrimeIterator, ExpireIterator> DbSlice::ExpireIfNeeded(const Context& cntx, | |
if (time_t(cntx.time_now_ms) < expire_time) | ||
return make_pair(it, expire_it); | ||
|
||
// Replicate any expiration as DEL command. | ||
// TODO: Pass optional key to skip decoding. | ||
if (auto journal = owner_->journal(); journal) { | ||
string scratch; | ||
auto payload = make_pair("DEL"sv, ArgSlice{it->first.GetSlice(&scratch)}); | ||
journal->RecordEntry(0, journal::Op::COMMAND, cntx.db_index, 1, payload); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do you know to tell if this flow will not happen while executing multi command? |
||
} | ||
|
||
PerformDeletion(it, expire_it, shard_owner(), db.get()); | ||
++events_.expired_keys; | ||
|
||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -19,6 +19,7 @@ extern "C" { | |
#include "server/container_utils.h" | ||
#include "server/engine_shard_set.h" | ||
#include "server/error.h" | ||
#include "server/journal/journal.h" | ||
#include "server/rdb_load.h" | ||
#include "server/rdb_save.h" | ||
#include "server/transaction.h" | ||
|
@@ -562,7 +563,6 @@ uint64_t ScanGeneric(uint64_t cursor, const ScanOpts& scan_opts, StringVec* keys | |
do { | ||
ess->Await(sid, [&] { | ||
OpArgs op_args{EngineShard::tlocal(), 0, db_cntx}; | ||
|
||
OpScan(op_args, scan_opts, &cursor, keys); | ||
}); | ||
if (cursor == 0) { | ||
|
@@ -587,7 +587,21 @@ OpStatus OpExpire(const OpArgs& op_args, string_view key, const DbSlice::ExpireP | |
if (!IsValid(it)) | ||
return OpStatus::KEY_NOTFOUND; | ||
|
||
return db_slice.UpdateExpire(op_args.db_cntx, it, expire_it, params); | ||
auto res = db_slice.UpdateExpire(op_args.db_cntx, it, expire_it, params); | ||
|
||
// If the value was deleted, replicate as DEL. | ||
// Else, replicate as PEXPIREAT with exact time. | ||
if (auto journal = op_args.shard->journal(); journal && res.ok()) { | ||
if (res.value() == -1) { | ||
op_args.RecordJournal("DEL"sv, ArgSlice{key}); | ||
} else { | ||
auto time = absl::StrCat(res.value()); | ||
// TODO: Don't forget to change this when adding arguments to expire commands. | ||
op_args.RecordJournal("PEXPIREAT"sv, ArgSlice{time}); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. where is the key in PEXPIREAT journal write? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Damn 😓 I'll fix it in the next PR |
||
} | ||
} | ||
|
||
return res.status(); | ||
} | ||
|
||
} // namespace | ||
|
@@ -1397,12 +1411,13 @@ void GenericFamily::Register(CommandRegistry* registry) { | |
<< CI{"ECHO", CO::LOADING | CO::FAST, 2, 0, 0, 0}.HFUNC(Echo) | ||
<< CI{"EXISTS", CO::READONLY | CO::FAST, -2, 1, -1, 1}.HFUNC(Exists) | ||
<< CI{"TOUCH", CO::READONLY | CO::FAST, -2, 1, -1, 1}.HFUNC(Exists) | ||
<< CI{"EXPIRE", CO::WRITE | CO::FAST, 3, 1, 1, 1}.HFUNC(Expire) | ||
<< CI{"EXPIREAT", CO::WRITE | CO::FAST, 3, 1, 1, 1}.HFUNC(ExpireAt) | ||
<< CI{"EXPIRE", CO::WRITE | CO::FAST | CO::NO_AUTOJOURNAL, 3, 1, 1, 1}.HFUNC(Expire) | ||
<< CI{"EXPIREAT", CO::WRITE | CO::FAST | CO::NO_AUTOJOURNAL, 3, 1, 1, 1}.HFUNC(ExpireAt) | ||
<< CI{"PERSIST", CO::WRITE | CO::FAST, 2, 1, 1, 1}.HFUNC(Persist) | ||
<< CI{"KEYS", CO::READONLY, 2, 0, 0, 0}.HFUNC(Keys) | ||
<< CI{"PEXPIREAT", CO::WRITE | CO::FAST, 3, 1, 1, 1}.HFUNC(PexpireAt) | ||
<< CI{"PEXPIRE", CO::WRITE | CO::FAST, 3, 1, 1, 1}.HFUNC(Pexpire) | ||
<< CI{"PEXPIREAT", CO::WRITE | CO::FAST | CO::NO_AUTOJOURNAL, 3, 1, 1, 1}.HFUNC( | ||
PexpireAt) | ||
<< CI{"PEXPIRE", CO::WRITE | CO::FAST | CO::NO_AUTOJOURNAL, 3, 1, 1, 1}.HFUNC(Pexpire) | ||
<< CI{"RENAME", CO::WRITE, 3, 1, 2, 1}.HFUNC(Rename) | ||
<< CI{"RENAMENX", CO::WRITE, 3, 1, 2, 1}.HFUNC(RenameNx) | ||
<< CI{"SELECT", kSelectOpts, 2, 0, 0, 0}.HFUNC(Select) | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
key -> command_name