Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

(generic family): journal support rename command #698

Merged
merged 1 commit into from
Jan 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion src/server/bitops_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -577,6 +577,10 @@ void BitOp(CmdArgList args, ConnectionContext* cntx) {
if (find_res == OpStatus::OK) {
operation.Commit(op_result);
}

if (shard->journal()) {
RecordJournal(t->GetOpArgs(shard), "SET", {dest_key, op_result});
}
}
return OpStatus::OK;
};
Expand Down Expand Up @@ -690,7 +694,7 @@ void BitOpsFamily::Register(CommandRegistry* registry) {
<< CI{"BITCOUNT", CO::READONLY, -2, 1, 1, 1}.SetHandler(&BitCount)
<< CI{"BITFIELD", CO::WRITE, -3, 1, 1, 1}.SetHandler(&BitField)
<< CI{"BITFIELD_RO", CO::READONLY, -5, 1, 1, 1}.SetHandler(&BitFieldRo)
<< CI{"BITOP", CO::WRITE, -4, 2, -1, 1}.SetHandler(&BitOp)
<< CI{"BITOP", CO::WRITE | CO::NO_AUTOJOURNAL, -4, 2, -1, 1}.SetHandler(&BitOp)
<< CI{"GETBIT", CO::READONLY | CO::FAST | CO::FAST, 3, 1, 1, 1}.SetHandler(&GetBit)
<< CI{"SETBIT", CO::WRITE, 4, 1, 1, 1}.SetHandler(&SetBit);
}
Expand Down
9 changes: 7 additions & 2 deletions src/server/common.cc
Original file line number Diff line number Diff line change
Expand Up @@ -193,8 +193,13 @@ bool ParseDouble(string_view src, double* value) {
return true;
}

void OpArgs::RecordJournal(string_view cmd, ArgSlice args) const {
tx->LogJournalOnShard(shard, make_pair(cmd, args), 1);
void RecordJournal(const OpArgs& op_args, string_view cmd, ArgSlice args, uint32_t shard_cnt,
bool multi_commands) {
op_args.tx->LogJournalOnShard(op_args.shard, make_pair(cmd, args), shard_cnt, multi_commands);
}

void RecordJournalFinish(const OpArgs& op_args, uint32_t shard_cnt) {
op_args.tx->FinishLogJournalOnShard(op_args.shard, shard_cnt);
}

#define ADD(x) (x) += o.x
Expand Down
10 changes: 7 additions & 3 deletions src/server/common.h
Original file line number Diff line number Diff line change
Expand Up @@ -90,11 +90,15 @@ struct OpArgs {
OpArgs(EngineShard* s, const Transaction* tx, const DbContext& cntx)
: shard(s), tx(tx), db_cntx(cntx) {
}

// Log single-shard journal command with own txid and dbid.
void RecordJournal(std::string_view cmd, ArgSlice args) const;
};

// Record non auto journal command with own txid and dbid.
void RecordJournal(const OpArgs& op_args, std::string_view cmd, ArgSlice args,
uint32_t shard_cnt = 1, bool multi_commands = false);

// Record non auto journal command finish. Call only when command translates to multi commands.
void RecordJournalFinish(const OpArgs& op_args, uint32_t shard_cnt);

struct TieredStats {
size_t tiered_reads = 0;
size_t tiered_writes = 0;
Expand Down
36 changes: 30 additions & 6 deletions src/server/generic_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -370,7 +370,11 @@ OpStatus Renamer::MoveSrc(Transaction* t, EngineShard* es) {
pv_ = std::move(it->second);
it->second.SetExpire(has_expire);
}

CHECK(es->db_slice().Del(t->GetDbIndex(), it)); // delete the entry with empty value in it.
if (es->journal()) {
RecordJournal(t->GetOpArgs(es), "DEL", ArgSlice{src_res_.key}, 2);
}
}

return OpStatus::OK;
Expand Down Expand Up @@ -406,6 +410,21 @@ OpStatus Renamer::UpdateDest(Transaction* t, EngineShard* es) {
if (!is_prior_list && dest_it->second.ObjType() == OBJ_LIST && es->blocking_controller()) {
es->blocking_controller()->AwakeWatched(t->GetDbIndex(), dest_key);
}
if (es->journal()) {
OpArgs op_args = t->GetOpArgs(es);
string scratch;
// todo insert under multi exec
RecordJournal(op_args, "SET"sv, ArgSlice{dest_key, dest_it->second.GetSlice(&scratch)}, 2,
true);
if (dest_it->first.IsSticky()) {
RecordJournal(op_args, "STICK"sv, ArgSlice{dest_key}, 2, true);
}
if (dest_it->second.HasExpire()) {
auto time = absl::StrCat(src_res_.expire_ts);
RecordJournal(op_args, "PEXPIREAT"sv, ArgSlice{time}, 2, true);
}
RecordJournalFinish(op_args, 2);
}
Comment on lines +413 to +427
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

So you send

  1. DEL with txid=T
  2. MUTLI(implied by opcode) SET EXPIREAT STICK EXEC with txid=T

In theory, do not DEL and SET have to be in one transaction (i.e. inside one multi)? But we cannot do such atomic operations on replica currently, right?

I'd also suggest assembling the latter command as one, because each RecordJournal puts pressure on the writing and receving side... But there is currently no option, because there is no stick parameter for set, right? Maybe we should add one in the future?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The DEL and SET are in one transaction because we shard_count=2 and the same txid. This is the same as multi shard commands which are not inside multi command transaction. Currently the replica just does not executes the transaction if data from all shard is not received i.e it will not execute the SET if the DEL was not received.
I use the multi just in the SET flow because we want to execute EXPIREAT and STICK in the same transaction, this also does not guarantees atomicity because currently we do the execution of the commands one by one.

We could do optimization for the SET flow for one command which will include the expire info and the stick info, but I am not sure its needed. Rename is probably not common command, and if this is the only flow we have this multi commands this is probably not worth it. Lets implement all the rename flows and see if this is needed

}

return OpStatus::OK;
Expand Down Expand Up @@ -592,13 +611,13 @@ OpStatus OpExpire(const OpArgs& op_args, string_view key, const DbSlice::ExpireP

// If the value was deleted, replicate as DEL.
// Else, replicate as PEXPIREAT with exact time.
if (auto journal = op_args.shard->journal(); journal && res.ok()) {
if (op_args.shard->journal() && res.ok()) {
if (res.value() == -1) {
op_args.RecordJournal("DEL"sv, ArgSlice{key});
RecordJournal(op_args, "DEL"sv, ArgSlice{key});
} else {
auto time = absl::StrCat(res.value());
// Note: Don't forget to change this when adding arguments to expire commands.
op_args.RecordJournal("PEXPIREAT"sv, ArgSlice{time});
RecordJournal(op_args, "PEXPIREAT"sv, ArgSlice{time});
}
}

Expand Down Expand Up @@ -1195,7 +1214,12 @@ OpResult<void> GenericFamily::RenameGeneric(CmdArgList args, bool skip_exist_des

if (transaction->GetUniqueShardCnt() == 1) {
auto cb = [&](Transaction* t, EngineShard* shard) {
return OpRen(t->GetOpArgs(shard), key[0], key[1], skip_exist_dest);
auto ec = OpRen(t->GetOpArgs(shard), key[0], key[1], skip_exist_dest);
// Incase of uniqe shard count we can use rename command in replica.
if (ec.ok() && shard->journal()) {
RecordJournal(t->GetOpArgs(shard), "RENAME", {key[0], key[1]});
}
return ec;
};
OpResult<void> result = transaction->ScheduleSingleHopT(std::move(cb));

Expand Down Expand Up @@ -1419,8 +1443,8 @@ void GenericFamily::Register(CommandRegistry* registry) {
<< CI{"PEXPIREAT", CO::WRITE | CO::FAST | CO::NO_AUTOJOURNAL, 3, 1, 1, 1}.HFUNC(
PexpireAt)
<< CI{"PEXPIRE", CO::WRITE | CO::FAST | CO::NO_AUTOJOURNAL, 3, 1, 1, 1}.HFUNC(Pexpire)
<< CI{"RENAME", CO::WRITE, 3, 1, 2, 1}.HFUNC(Rename)
<< CI{"RENAMENX", CO::WRITE, 3, 1, 2, 1}.HFUNC(RenameNx)
<< CI{"RENAME", CO::WRITE | CO::NO_AUTOJOURNAL, 3, 1, 2, 1}.HFUNC(Rename)
<< CI{"RENAMENX", CO::WRITE | CO::NO_AUTOJOURNAL, 3, 1, 2, 1}.HFUNC(RenameNx)
<< CI{"SELECT", kSelectOpts, 2, 0, 0, 0}.HFUNC(Select)
<< CI{"SCAN", CO::READONLY | CO::FAST | CO::LOADING, -2, 0, 0, 0}.HFUNC(Scan)
<< CI{"TTL", CO::READONLY | CO::FAST, 2, 1, 1, 1}.HFUNC(Ttl)
Expand Down
16 changes: 8 additions & 8 deletions src/server/set_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -539,7 +539,7 @@ OpResult<uint32_t> OpAdd(const OpArgs& op_args, std::string_view key, ArgSlice v
auto it = db_slice.FindExt(op_args.db_cntx, key).first;
db_slice.Del(op_args.db_cntx.db_index, it);
if (journal_update && op_args.shard->journal()) {
op_args.RecordJournal("DEL"sv, ArgSlice{key});
RecordJournal(op_args, "DEL"sv, ArgSlice{key});
}
return 0;
}
Expand Down Expand Up @@ -612,11 +612,11 @@ OpResult<uint32_t> OpAdd(const OpArgs& op_args, std::string_view key, ArgSlice v

db_slice.PostUpdate(op_args.db_cntx.db_index, it, key, !new_key);
if (journal_update && op_args.shard->journal()) {
op_args.RecordJournal("DEL"sv, ArgSlice{key});
RecordJournal(op_args, "DEL"sv, ArgSlice{key});
vector<string_view> mapped(vals.size() + 1);
mapped[0] = key;
std::copy(vals.begin(), vals.end(), mapped.begin() + 1);
op_args.RecordJournal("SADD"sv, mapped);
RecordJournal(op_args, "SADD"sv, mapped);
}
return res;
}
Expand Down Expand Up @@ -689,7 +689,7 @@ OpResult<uint32_t> OpRem(const OpArgs& op_args, string_view key, const ArgSlice&
vector<string_view> mapped(vals.size() + 1);
mapped[0] = key;
std::copy(vals.begin(), vals.end(), mapped.begin() + 1);
op_args.RecordJournal("SREM"sv, mapped);
RecordJournal(op_args, "SREM"sv, mapped);
}

return removed;
Expand Down Expand Up @@ -981,8 +981,8 @@ OpResult<StringVec> OpPop(const OpArgs& op_args, string_view key, unsigned count
CHECK(db_slice.Del(op_args.db_cntx.db_index, it));

// Replicate as DEL.
if (auto journal = op_args.shard->journal(); journal) {
op_args.RecordJournal("DEL"sv, ArgSlice{key});
if (op_args.shard->journal()) {
RecordJournal(op_args, "DEL"sv, ArgSlice{key});
}
} else {
SetType st{it->second.RObjPtr(), it->second.Encoding()};
Expand All @@ -1004,11 +1004,11 @@ OpResult<StringVec> OpPop(const OpArgs& op_args, string_view key, unsigned count
}

// Replicate as SREM with removed keys, because SPOP is not deterministic.
if (auto journal = op_args.shard->journal(); journal) {
if (op_args.shard->journal()) {
vector<string_view> mapped(result.size() + 1);
mapped[0] = key;
std::copy(result.begin(), result.end(), mapped.begin() + 1);
op_args.RecordJournal("SREM"sv, mapped);
RecordJournal(op_args, "SREM"sv, mapped);
}

db_slice.PostUpdate(op_args.db_cntx.db_index, it, key);
Expand Down
15 changes: 12 additions & 3 deletions src/server/transaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1238,17 +1238,26 @@ void Transaction::LogAutoJournalOnShard(EngineShard* shard) {
entry_payload = make_pair(cmd, GetShardArgs(shard->shard_id()));
}

LogJournalOnShard(shard, std::move(entry_payload), unique_shard_cnt_);
LogJournalOnShard(shard, std::move(entry_payload), unique_shard_cnt_, false);
}

void Transaction::LogJournalOnShard(EngineShard* shard, journal::Entry::Payload&& payload,
uint32_t shard_cnt) const {
uint32_t shard_cnt, bool multi_commands) const {
auto journal = shard->journal();
CHECK(journal);
auto opcode = multi_ ? journal::Op::MULTI_COMMAND : journal::Op::COMMAND;
auto opcode = (multi_ || multi_commands) ? journal::Op::MULTI_COMMAND : journal::Op::COMMAND;
journal->RecordEntry(txid_, opcode, db_index_, shard_cnt, std::move(payload));
}

void Transaction::FinishLogJournalOnShard(EngineShard* shard, uint32_t shard_cnt) const {
if (multi_) {
return;
}
auto journal = shard->journal();
CHECK(journal);
journal->RecordEntry(txid_, journal::Op::EXEC, db_index_, shard_cnt, {});
}

void Transaction::BreakOnShutdown() {
if (coordinator_state_ & COORD_BLOCKED) {
coordinator_state_ |= COORD_CANCELLED;
Expand Down
8 changes: 8 additions & 0 deletions src/server/transaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -199,6 +199,14 @@ class Transaction {

std::string DebugId() const;

// Write a journal entry to a shard journal with the given payload. When logging a non-automatic
// journal command, multiple journal entries may be necessary. In this case, call with set
// multi_commands to true and call the FinishLogJournalOnShard function after logging the final
// entry.
void LogJournalOnShard(EngineShard* shard, journal::Entry::Payload&& payload, uint32_t shard_cnt,
bool multi_commands) const;
void FinishLogJournalOnShard(EngineShard* shard, uint32_t shard_cnt) const;

private:
// Holds number of locks for each IntentLock::Mode: shared and exlusive.
struct LockCnt {
Expand Down