Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(server): Rewrite journal commands (basic) #651

Merged
merged 9 commits into from
Jan 16, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions src/server/command_registry.cc
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,8 @@ const char* OptName(CO::CommandOpt fl) {
return "global-trans";
case VARIADIC_KEYS:
return "variadic-keys";
case NO_AUTOJOURNAL:
return "custom-journal";
}
return "unknown";
}
Expand Down
29 changes: 15 additions & 14 deletions src/server/command_registry.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,21 @@ class ConnectionContext;
namespace CO {

enum CommandOpt : uint32_t {
READONLY = 1,
FAST = 2,
WRITE = 4,
LOADING = 8,
DENYOOM = 0x10, // use-memory in redis.
REVERSE_MAPPING = 0x20,

// arg 2 determines number of keys. Relevant for ZUNIONSTORE, EVAL etc.
VARIADIC_KEYS = 0x40,

ADMIN = 0x80, // implies NOSCRIPT,
NOSCRIPT = 0x100,
BLOCKING = 0x200, // implies REVERSE_MAPPING
GLOBAL_TRANS = 0x1000,
READONLY = 1U << 0,
FAST = 1U << 1,
WRITE = 1U << 2,
LOADING = 1U << 3,
DENYOOM = 1U << 4, // use-memory in redis.
REVERSE_MAPPING = 1U << 5,

VARIADIC_KEYS = 1U << 6, // arg 2 determines number of keys. Relevant for ZUNIONSTORE, EVAL etc.

ADMIN = 1U << 7, // implies NOSCRIPT,
NOSCRIPT = 1U << 8,
BLOCKING = 1U << 9, // implies REVERSE_MAPPING
GLOBAL_TRANS = 1U << 12,

NO_AUTOJOURNAL = 1U << 15, // Skip automatically logging command to journal inside transaction.
};

const char* OptName(CommandOpt fl);
Expand Down
8 changes: 8 additions & 0 deletions src/server/common.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@ extern "C" {
}
#include "base/logging.h"
#include "core/compact_object.h"
#include "server/engine_shard_set.h"
#include "server/error.h"
#include "server/journal/journal.h"
#include "server/server_state.h"

namespace dfly {
Expand Down Expand Up @@ -190,6 +192,12 @@ bool ParseDouble(string_view src, double* value) {
return true;
}

void OpArgs::RecordJournal(string_view key, ArgSlice args) const {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

key -> command_name

auto journal = shard->journal();
CHECK(journal);
journal->RecordEntry(txid, journal::Op::COMMAND, db_cntx.db_index, 1, make_pair(key, args));
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These records can be part of a multi transaction, and in that case we should use MULTI_COMMAND.

}

#define ADD(x) (x) += o.x

TieredStats& TieredStats::operator+=(const TieredStats& o) {
Expand Down
5 changes: 4 additions & 1 deletion src/server/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -87,8 +87,11 @@ struct OpArgs {
OpArgs() : shard(nullptr), txid(0) {
}

OpArgs(EngineShard* s, TxId i, const DbContext& cntx) : shard(s), txid(i), db_cntx(cntx) {
OpArgs(EngineShard* s, TxId txid, const DbContext& cntx) : shard(s), txid(txid), db_cntx(cntx) {
}

// Log single-shard journal command with own txid and dbid.
void RecordJournal(std::string_view cmd, ArgSlice args) const;
};

struct TieredStats {
Expand Down
33 changes: 24 additions & 9 deletions src/server/db_slice.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ extern "C" {

#include "base/logging.h"
#include "server/engine_shard_set.h"
#include "server/journal/journal.h"
#include "server/server_state.h"
#include "server/tiered_storage.h"
#include "util/fiber_sched_algo.h"
Expand Down Expand Up @@ -581,27 +582,33 @@ PrimeIterator DbSlice::AddNew(const Context& cntx, string_view key, PrimeValue o
return it;
}

OpStatus DbSlice::UpdateExpire(const Context& cntx, PrimeIterator prime_it,
ExpireIterator expire_it, const ExpireParams& params) {
// Converts these expiry parameters into a pair of millisecond values:
//   first  - the expiry expressed as an offset relative to now_ms,
//   second - the expiry expressed as now_ms plus that offset.
// now_ms is the reference time in milliseconds supplied by the caller.
pair<int64_t, int64_t> DbSlice::ExpireParams::Calculate(int64_t now_ms) const {
  // Normalize the requested value to milliseconds.
  int64_t requested_ms = value;
  if (unit == TimeUnit::SEC) {
    requested_ms = value * 1000;
  }

  // When the request is flagged as absolute, turn it into an offset from now_ms.
  int64_t offset_ms = requested_ms;
  if (absolute) {
    offset_ms = requested_ms - now_ms;
  }

  return make_pair(offset_ms, now_ms + offset_ms);
}

OpResult<int64_t> DbSlice::UpdateExpire(const Context& cntx, PrimeIterator prime_it,
ExpireIterator expire_it, const ExpireParams& params) {
DCHECK(params.IsDefined());
DCHECK(IsValid(prime_it));

int64_t msec = (params.unit == TimeUnit::SEC) ? params.value * 1000 : params.value;
int64_t now_msec = cntx.time_now_ms;
int64_t rel_msec = params.absolute ? msec - now_msec : msec;
auto [rel_msec, abs_msec] = params.Calculate(cntx.time_now_ms);
if (rel_msec > kMaxExpireDeadlineSec * 1000) {
return OpStatus::OUT_OF_RANGE;
}

if (rel_msec <= 0 && !params.persist) {
CHECK(Del(cntx.db_index, prime_it));
return -1;
} else if (IsValid(expire_it)) {
expire_it->second = FromAbsoluteTime(now_msec + rel_msec);
expire_it->second = FromAbsoluteTime(abs_msec);
return abs_msec;
} else {
UpdateExpire(cntx.db_index, prime_it, params.persist ? 0 : rel_msec + now_msec);
UpdateExpire(cntx.db_index, prime_it, params.persist ? 0 : abs_msec);
return params.persist ? 0 : abs_msec;
}

return OpStatus::OK;
}

std::pair<PrimeIterator, bool> DbSlice::AddOrUpdateInternal(const Context& cntx,
Expand Down Expand Up @@ -784,6 +791,14 @@ pair<PrimeIterator, ExpireIterator> DbSlice::ExpireIfNeeded(const Context& cntx,
if (time_t(cntx.time_now_ms) < expire_time)
return make_pair(it, expire_it);

// Replicate any expiration as DEL command.
// TODO: Pass optional key to skip decoding.
if (auto journal = owner_->journal(); journal) {
string scratch;
auto payload = make_pair("DEL"sv, ArgSlice{it->first.GetSlice(&scratch)});
journal->RecordEntry(0, journal::Op::COMMAND, cntx.db_index, 1, payload);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do you know how to tell whether this flow is guaranteed not to happen while executing a multi command?
If it can happen, it will break the current implementation in the replica: when the replica gets a multi command from the journal, it keeps reading multi commands until it reaches exec. The replica assumes that no command op entry is inserted before the exec op is reached.

}

PerformDeletion(it, expire_it, shard_owner(), db.get());
++events_.expired_keys;

Expand Down
9 changes: 7 additions & 2 deletions src/server/db_slice.h
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,9 @@ class DbSlice {
bool IsDefined() const {
return persist || value > INT64_MIN;
}

// Calculate relative and absolute timepoints.
std::pair<int64_t, int64_t> Calculate(int64_t now_msec) const;
};

DbSlice(uint32_t index, bool caching_mode, EngineShard* owner);
Expand Down Expand Up @@ -171,8 +174,10 @@ class DbSlice {
PrimeIterator AddNew(const Context& cntx, std::string_view key, PrimeValue obj,
uint64_t expire_at_ms) noexcept(false);

facade::OpStatus UpdateExpire(const Context& cntx, PrimeIterator prime_it, ExpireIterator exp_it,
const ExpireParams& params);
// Update entry expiration. Return expiration timepoint in abs milliseconds, or -1 if the entry
// already expired and was deleted.
facade::OpResult<int64_t> UpdateExpire(const Context& cntx, PrimeIterator prime_it,
ExpireIterator exp_it, const ExpireParams& params);

// Adds expiry information.
void AddExpire(DbIndex db_ind, PrimeIterator main_it, uint64_t at);
Expand Down
10 changes: 8 additions & 2 deletions src/server/dflycmd.cc
Original file line number Diff line number Diff line change
Expand Up @@ -378,8 +378,14 @@ OpStatus DflyCmd::StartStableSyncInThread(FlowInfo* flow, Context* cntx, EngineS
writer = new JournalWriter{};
auto journal_cb = [flow, cntx, writer](const journal::Entry& je) mutable {
writer->Write(je);
if (auto ec = writer->Flush(flow->conn->socket()); ec)
cntx->ReportError(ec);

// REMOVE THIS AFTER ASYNC STREAMER IS MERGED.
::boost::fibers::fiber{[writer, flow, cntx]() {
if (auto ec = writer->Flush(flow->conn->socket()); ec) {
VLOG(0) << "Failed to flush???";
// cntx->ReportError(ec);
}
}}.detach();
dranikpg marked this conversation as resolved.
Show resolved Hide resolved
};
cb_id = sf_->journal()->RegisterOnChange(std::move(journal_cb));
}
Expand Down
27 changes: 21 additions & 6 deletions src/server/generic_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ extern "C" {
#include "server/container_utils.h"
#include "server/engine_shard_set.h"
#include "server/error.h"
#include "server/journal/journal.h"
#include "server/rdb_load.h"
#include "server/rdb_save.h"
#include "server/transaction.h"
Expand Down Expand Up @@ -562,7 +563,6 @@ uint64_t ScanGeneric(uint64_t cursor, const ScanOpts& scan_opts, StringVec* keys
do {
ess->Await(sid, [&] {
OpArgs op_args{EngineShard::tlocal(), 0, db_cntx};

OpScan(op_args, scan_opts, &cursor, keys);
});
if (cursor == 0) {
Expand All @@ -587,7 +587,21 @@ OpStatus OpExpire(const OpArgs& op_args, string_view key, const DbSlice::ExpireP
if (!IsValid(it))
return OpStatus::KEY_NOTFOUND;

return db_slice.UpdateExpire(op_args.db_cntx, it, expire_it, params);
auto res = db_slice.UpdateExpire(op_args.db_cntx, it, expire_it, params);

// If the value was deleted, replicate as DEL.
// Else, replicate as PEXPIREAT with exact time.
if (auto journal = op_args.shard->journal(); journal && res.ok()) {
if (res.value() == -1) {
op_args.RecordJournal("DEL"sv, ArgSlice{key});
} else {
auto time = absl::StrCat(res.value());
// TODO: Don't forget to change this when adding arguments to expire commands.
op_args.RecordJournal("PEXPIREAT"sv, ArgSlice{time});
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

where is the key in PEXPIREAT journal write?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Damn 😓 I'll fix it in the next PR

}
}

return res.status();
}

} // namespace
Expand Down Expand Up @@ -1397,12 +1411,13 @@ void GenericFamily::Register(CommandRegistry* registry) {
<< CI{"ECHO", CO::LOADING | CO::FAST, 2, 0, 0, 0}.HFUNC(Echo)
<< CI{"EXISTS", CO::READONLY | CO::FAST, -2, 1, -1, 1}.HFUNC(Exists)
<< CI{"TOUCH", CO::READONLY | CO::FAST, -2, 1, -1, 1}.HFUNC(Exists)
<< CI{"EXPIRE", CO::WRITE | CO::FAST, 3, 1, 1, 1}.HFUNC(Expire)
<< CI{"EXPIREAT", CO::WRITE | CO::FAST, 3, 1, 1, 1}.HFUNC(ExpireAt)
<< CI{"EXPIRE", CO::WRITE | CO::FAST | CO::NO_AUTOJOURNAL, 3, 1, 1, 1}.HFUNC(Expire)
<< CI{"EXPIREAT", CO::WRITE | CO::FAST | CO::NO_AUTOJOURNAL, 3, 1, 1, 1}.HFUNC(ExpireAt)
<< CI{"PERSIST", CO::WRITE | CO::FAST, 2, 1, 1, 1}.HFUNC(Persist)
<< CI{"KEYS", CO::READONLY, 2, 0, 0, 0}.HFUNC(Keys)
<< CI{"PEXPIREAT", CO::WRITE | CO::FAST, 3, 1, 1, 1}.HFUNC(PexpireAt)
<< CI{"PEXPIRE", CO::WRITE | CO::FAST, 3, 1, 1, 1}.HFUNC(Pexpire)
<< CI{"PEXPIREAT", CO::WRITE | CO::FAST | CO::NO_AUTOJOURNAL, 3, 1, 1, 1}.HFUNC(
PexpireAt)
<< CI{"PEXPIRE", CO::WRITE | CO::FAST | CO::NO_AUTOJOURNAL, 3, 1, 1, 1}.HFUNC(Pexpire)
<< CI{"RENAME", CO::WRITE, 3, 1, 2, 1}.HFUNC(Rename)
<< CI{"RENAMENX", CO::WRITE, 3, 1, 2, 1}.HFUNC(RenameNx)
<< CI{"SELECT", kSelectOpts, 2, 0, 0, 0}.HFUNC(Select)
Expand Down
5 changes: 3 additions & 2 deletions src/server/journal/journal.cc
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,9 @@ bool Journal::EnterLameDuck() {
return res;
}

void Journal::RecordEntry(const Entry& entry) {
journal_slice.AddLogRecord(entry);
void Journal::RecordEntry(TxId txid, Op opcode, DbIndex dbid, unsigned shard_cnt,
Entry::Payload payload) {
journal_slice.AddLogRecord(Entry{txid, opcode, dbid, shard_cnt, std::move(payload)});
}

TxId Journal::GetLastTxId() {
Expand Down
3 changes: 2 additions & 1 deletion src/server/journal/journal.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,8 @@ class Journal {
*/
LSN GetLsn() const;

void RecordEntry(const Entry& entry);
void RecordEntry(TxId txid, Op opcode, DbIndex dbid, unsigned shard_cnt, Entry::Payload payload);

TxId GetLastTxId();

private:
Expand Down
19 changes: 17 additions & 2 deletions src/server/set_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ extern "C" {
#include "server/container_utils.h"
#include "server/engine_shard_set.h"
#include "server/error.h"
#include "server/journal/journal.h"
#include "server/transaction.h"

ABSL_DECLARE_FLAG(bool, use_set2);
Expand Down Expand Up @@ -959,8 +960,13 @@ OpResult<StringVec> OpPop(const OpArgs& op_args, string_view key, unsigned count
return true;
});

/* Delete the set as it is now empty */
// Delete the set as it is now empty
CHECK(db_slice.Del(op_args.db_cntx.db_index, it));

// Replicate as DEL.
if (auto journal = op_args.shard->journal(); journal) {
op_args.RecordJournal("DEL"sv, ArgSlice{key});
}
} else {
SetType st{it->second.RObjPtr(), it->second.Encoding()};
db_slice.PreUpdate(op_args.db_cntx.db_index, it);
Expand All @@ -979,6 +985,15 @@ OpResult<StringVec> OpPop(const OpArgs& op_args, string_view key, unsigned count
} else {
result = PopStrSet(op_args.db_cntx, count, st);
}

// Replicate as SREM with removed keys, because SPOP is not deterministic.
if (auto journal = op_args.shard->journal(); journal) {
vector<string_view> mapped(result.size() + 1);
mapped[0] = key;
std::copy(result.begin(), result.end(), mapped.begin() + 1);
op_args.RecordJournal("SREM"sv, mapped);
}

db_slice.PostUpdate(op_args.db_cntx.db_index, it, key);
}
return result;
Expand Down Expand Up @@ -1545,7 +1560,7 @@ void SetFamily::Register(CommandRegistry* registry) {
<< CI{"SMOVE", CO::FAST | CO::WRITE, 4, 1, 2, 1}.HFUNC(SMove)
<< CI{"SREM", CO::WRITE | CO::FAST | CO::DENYOOM, -3, 1, 1, 1}.HFUNC(SRem)
<< CI{"SCARD", CO::READONLY | CO::FAST, 2, 1, 1, 1}.HFUNC(SCard)
<< CI{"SPOP", CO::WRITE | CO::FAST, -2, 1, 1, 1}.HFUNC(SPop)
<< CI{"SPOP", CO::WRITE | CO::FAST | CO::NO_AUTOJOURNAL, -2, 1, 1, 1}.HFUNC(SPop)
<< CI{"SUNION", CO::READONLY, -2, 1, -1, 1}.HFUNC(SUnion)
<< CI{"SUNIONSTORE", CO::WRITE | CO::DENYOOM, -3, 1, -1, 1}.HFUNC(SUnionStore)
<< CI{"SSCAN", CO::READONLY, -3, 1, 1, 1}.HFUNC(SScan);
Expand Down
2 changes: 1 addition & 1 deletion src/server/string_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -211,7 +211,7 @@ OpResult<string> OpGet(const OpArgs& op_args, string_view key, bool del_hit = fa
if (exp_params.IsDefined()) {
DVLOG(1) << "Expire: " << key;
auto& db_slice = op_args.shard->db_slice();
OpStatus status = db_slice.UpdateExpire(op_args.db_cntx, it, it_expire, exp_params);
OpStatus status = db_slice.UpdateExpire(op_args.db_cntx, it, it_expire, exp_params).status();
if (status != OpStatus::OK)
return status;
}
Expand Down
16 changes: 7 additions & 9 deletions src/server/transaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1114,7 +1114,7 @@ void Transaction::ExpireShardCb(EngineShard* shard) {
void Transaction::UnlockMultiShardCb(const std::vector<KeyList>& sharded_keys, EngineShard* shard) {
auto journal = shard->journal();
if (journal != nullptr && journal->GetLastTxId() == txid_) {
journal->RecordEntry(journal::Entry{txid_, journal::Op::EXEC, db_index_, unique_shard_cnt_});
journal->RecordEntry(txid_, journal::Op::EXEC, db_index_, unique_shard_cnt_, {});
}

if (multi_->multi_opts & CO::GLOBAL_TRANS) {
Expand Down Expand Up @@ -1242,7 +1242,8 @@ void Transaction::LogJournalOnShard(EngineShard* shard) {
if (shard == nullptr)
return;

if ((cid_->opt_mask() & CO::WRITE) == 0)
// Ignore non-write commands or ones with disabled autojournal.
if ((cid_->opt_mask() & CO::WRITE) == 0 || (cid_->opt_mask() & CO::NO_AUTOJOURNAL) > 0)
return;

auto journal = shard->journal();
Expand All @@ -1255,15 +1256,12 @@ void Transaction::LogJournalOnShard(EngineShard* shard) {
CHECK(!cmd_with_full_args_.empty());
entry_payload = cmd_with_full_args_;
} else {
entry_payload =
make_pair(facade::ToSV(cmd_with_full_args_.front()), ShardArgsInShard(shard->shard_id()));
}
journal::Op opcode = journal::Op::COMMAND;
if (multi_) {
opcode = journal::Op::MULTI_COMMAND;
auto cmd = facade::ToSV(cmd_with_full_args_.front());
entry_payload = make_pair(cmd, ShardArgsInShard(shard->shard_id()));
}

journal->RecordEntry(journal::Entry{txid_, opcode, db_index_, unique_shard_cnt_, entry_payload});
auto opcode = multi_ ? journal::Op::MULTI_COMMAND : journal::Op::COMMAND;
journal->RecordEntry(txid_, opcode, db_index_, unique_shard_cnt_, std::move(entry_payload));
}

void Transaction::BreakOnShutdown() {
Expand Down
1 change: 1 addition & 0 deletions src/server/transaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,7 @@ class Transaction {

PerShardData() = default;
};

enum { kPerShardSize = sizeof(PerShardData) };

struct Multi {
Expand Down
Loading