diff --git a/src/server/command_registry.cc b/src/server/command_registry.cc index 6d577907f9ce..d488bbeb869b 100644 --- a/src/server/command_registry.cc +++ b/src/server/command_registry.cc @@ -111,6 +111,8 @@ const char* OptName(CO::CommandOpt fl) { return "global-trans"; case VARIADIC_KEYS: return "variadic-keys"; + case NO_AUTOJOURNAL: + return "custom-journal"; } return "unknown"; } diff --git a/src/server/command_registry.h b/src/server/command_registry.h index 7dc9e8204626..a081a9d8ffff 100644 --- a/src/server/command_registry.h +++ b/src/server/command_registry.h @@ -19,20 +19,21 @@ class ConnectionContext; namespace CO { enum CommandOpt : uint32_t { - READONLY = 1, - FAST = 2, - WRITE = 4, - LOADING = 8, - DENYOOM = 0x10, // use-memory in redis. - REVERSE_MAPPING = 0x20, - - // arg 2 determines number of keys. Relevant for ZUNIONSTORE, EVAL etc. - VARIADIC_KEYS = 0x40, - - ADMIN = 0x80, // implies NOSCRIPT, - NOSCRIPT = 0x100, - BLOCKING = 0x200, // implies REVERSE_MAPPING - GLOBAL_TRANS = 0x1000, + READONLY = 1U << 0, + FAST = 1U << 1, + WRITE = 1U << 2, + LOADING = 1U << 3, + DENYOOM = 1U << 4, // use-memory in redis. + REVERSE_MAPPING = 1U << 5, + + VARIADIC_KEYS = 1U << 6, // arg 2 determines number of keys. Relevant for ZUNIONSTORE, EVAL etc. + + ADMIN = 1U << 7, // implies NOSCRIPT, + NOSCRIPT = 1U << 8, + BLOCKING = 1U << 9, // implies REVERSE_MAPPING + GLOBAL_TRANS = 1U << 12, + + NO_AUTOJOURNAL = 1U << 15, // Skip automatically logging command to journal inside transaction. 
}; const char* OptName(CommandOpt fl); diff --git a/src/server/common.cc b/src/server/common.cc index 7f4b2da7389e..6e8379153d70 100644 --- a/src/server/common.cc +++ b/src/server/common.cc @@ -19,8 +19,11 @@ extern "C" { } #include "base/logging.h" #include "core/compact_object.h" +#include "server/engine_shard_set.h" #include "server/error.h" +#include "server/journal/journal.h" #include "server/server_state.h" +#include "server/transaction.h" namespace dfly { @@ -190,6 +193,10 @@ bool ParseDouble(string_view src, double* value) { return true; } +void OpArgs::RecordJournal(string_view cmd, ArgSlice args) const { + tx->LogJournalOnShard(shard, make_pair(cmd, args)); +} + #define ADD(x) (x) += o.x TieredStats& TieredStats::operator+=(const TieredStats& o) { diff --git a/src/server/common.h b/src/server/common.h index 3e57c3ed271a..0e4b761fb527 100644 --- a/src/server/common.h +++ b/src/server/common.h @@ -81,14 +81,18 @@ struct DbContext { struct OpArgs { EngineShard* shard; - TxId txid; + const Transaction* tx; DbContext db_cntx; - OpArgs() : shard(nullptr), txid(0) { + OpArgs() : shard(nullptr), tx(nullptr) { } - OpArgs(EngineShard* s, TxId i, const DbContext& cntx) : shard(s), txid(i), db_cntx(cntx) { + OpArgs(EngineShard* s, const Transaction* tx, const DbContext& cntx) + : shard(s), tx(tx), db_cntx(cntx) { } + + // Log single-shard journal command with own txid and dbid. 
+ void RecordJournal(std::string_view cmd, ArgSlice args) const; }; struct TieredStats { diff --git a/src/server/db_slice.cc b/src/server/db_slice.cc index 5e9375aeaad1..e03fe697b862 100644 --- a/src/server/db_slice.cc +++ b/src/server/db_slice.cc @@ -13,6 +13,7 @@ extern "C" { #include "base/logging.h" #include "server/engine_shard_set.h" +#include "server/journal/journal.h" #include "server/server_state.h" #include "server/tiered_storage.h" #include "util/fiber_sched_algo.h" @@ -584,27 +585,33 @@ PrimeIterator DbSlice::AddNew(const Context& cntx, string_view key, PrimeValue o return it; } -OpStatus DbSlice::UpdateExpire(const Context& cntx, PrimeIterator prime_it, - ExpireIterator expire_it, const ExpireParams& params) { +pair<int64_t, int64_t> DbSlice::ExpireParams::Calculate(int64_t now_ms) const { + int64_t msec = (unit == TimeUnit::SEC) ? value * 1000 : value; + int64_t now_msec = now_ms; + int64_t rel_msec = absolute ? msec - now_msec : msec; + return make_pair(rel_msec, now_msec + rel_msec); +} + +OpResult<int64_t> DbSlice::UpdateExpire(const Context& cntx, PrimeIterator prime_it, + ExpireIterator expire_it, const ExpireParams& params) { DCHECK(params.IsDefined()); DCHECK(IsValid(prime_it)); - int64_t msec = (params.unit == TimeUnit::SEC) ? params.value * 1000 : params.value; - int64_t now_msec = cntx.time_now_ms; - int64_t rel_msec = params.absolute ? msec - now_msec : msec; + auto [rel_msec, abs_msec] = params.Calculate(cntx.time_now_ms); if (rel_msec > kMaxExpireDeadlineSec * 1000) { return OpStatus::OUT_OF_RANGE; } if (rel_msec <= 0 && !params.persist) { CHECK(Del(cntx.db_index, prime_it)); + return -1; } else if (IsValid(expire_it)) { - expire_it->second = FromAbsoluteTime(now_msec + rel_msec); + expire_it->second = FromAbsoluteTime(abs_msec); + return abs_msec; } else { - UpdateExpire(cntx.db_index, prime_it, params.persist ? 0 : rel_msec + now_msec); + UpdateExpire(cntx.db_index, prime_it, params.persist ? 0 : abs_msec); + return params.persist ?
0 : abs_msec; } - - return OpStatus::OK; } std::pair DbSlice::AddOrUpdateInternal(const Context& cntx, diff --git a/src/server/db_slice.h b/src/server/db_slice.h index 8655deb23142..49499a190c2a 100644 --- a/src/server/db_slice.h +++ b/src/server/db_slice.h @@ -97,6 +97,9 @@ class DbSlice { bool IsDefined() const { return persist || value > INT64_MIN; } + + // Calculate relative and absolute timepoints. + std::pair<int64_t, int64_t> Calculate(int64_t now_msec) const; }; DbSlice(uint32_t index, bool caching_mode, EngineShard* owner); @@ -171,8 +174,10 @@ class DbSlice { PrimeIterator AddNew(const Context& cntx, std::string_view key, PrimeValue obj, uint64_t expire_at_ms) noexcept(false); - facade::OpStatus UpdateExpire(const Context& cntx, PrimeIterator prime_it, ExpireIterator exp_it, - const ExpireParams& params); + // Update entry expiration. Return expiration timepoint in abs milliseconds, or -1 if the entry + // already expired and was deleted. + facade::OpResult<int64_t> UpdateExpire(const Context& cntx, PrimeIterator prime_it, + ExpireIterator exp_it, const ExpireParams& params); // Adds expiry information.
void AddExpire(DbIndex db_ind, PrimeIterator main_it, uint64_t at); diff --git a/src/server/generic_family.cc b/src/server/generic_family.cc index 925e2bc21148..66d5c620b6bd 100644 --- a/src/server/generic_family.cc +++ b/src/server/generic_family.cc @@ -19,6 +19,7 @@ extern "C" { #include "server/container_utils.h" #include "server/engine_shard_set.h" #include "server/error.h" +#include "server/journal/journal.h" #include "server/rdb_extensions.h" #include "server/rdb_load.h" #include "server/rdb_save.h" @@ -563,7 +564,6 @@ uint64_t ScanGeneric(uint64_t cursor, const ScanOpts& scan_opts, StringVec* keys do { ess->Await(sid, [&] { OpArgs op_args{EngineShard::tlocal(), 0, db_cntx}; - OpScan(op_args, scan_opts, &cursor, keys); }); if (cursor == 0) { @@ -588,7 +588,21 @@ OpStatus OpExpire(const OpArgs& op_args, string_view key, const DbSlice::ExpireP if (!IsValid(it)) return OpStatus::KEY_NOTFOUND; - return db_slice.UpdateExpire(op_args.db_cntx, it, expire_it, params); + auto res = db_slice.UpdateExpire(op_args.db_cntx, it, expire_it, params); + + // If the value was deleted, replicate as DEL. + // Else, replicate as PEXPIREAT with exact time. + if (auto journal = op_args.shard->journal(); journal && res.ok()) { + if (res.value() == -1) { + op_args.RecordJournal("DEL"sv, ArgSlice{key}); + } else { + auto time = absl::StrCat(res.value()); + // Note: Don't forget to change this when adding arguments to expire commands. 
+ op_args.RecordJournal("PEXPIREAT"sv, ArgSlice{key, time}); + } + } + + return res.status(); } } // namespace @@ -1398,12 +1412,13 @@ void GenericFamily::Register(CommandRegistry* registry) { << CI{"ECHO", CO::LOADING | CO::FAST, 2, 0, 0, 0}.HFUNC(Echo) << CI{"EXISTS", CO::READONLY | CO::FAST, -2, 1, -1, 1}.HFUNC(Exists) << CI{"TOUCH", CO::READONLY | CO::FAST, -2, 1, -1, 1}.HFUNC(Exists) - << CI{"EXPIRE", CO::WRITE | CO::FAST, 3, 1, 1, 1}.HFUNC(Expire) - << CI{"EXPIREAT", CO::WRITE | CO::FAST, 3, 1, 1, 1}.HFUNC(ExpireAt) + << CI{"EXPIRE", CO::WRITE | CO::FAST | CO::NO_AUTOJOURNAL, 3, 1, 1, 1}.HFUNC(Expire) + << CI{"EXPIREAT", CO::WRITE | CO::FAST | CO::NO_AUTOJOURNAL, 3, 1, 1, 1}.HFUNC(ExpireAt) << CI{"PERSIST", CO::WRITE | CO::FAST, 2, 1, 1, 1}.HFUNC(Persist) << CI{"KEYS", CO::READONLY, 2, 0, 0, 0}.HFUNC(Keys) - << CI{"PEXPIREAT", CO::WRITE | CO::FAST, 3, 1, 1, 1}.HFUNC(PexpireAt) - << CI{"PEXPIRE", CO::WRITE | CO::FAST, 3, 1, 1, 1}.HFUNC(Pexpire) + << CI{"PEXPIREAT", CO::WRITE | CO::FAST | CO::NO_AUTOJOURNAL, 3, 1, 1, 1}.HFUNC( + PexpireAt) + << CI{"PEXPIRE", CO::WRITE | CO::FAST | CO::NO_AUTOJOURNAL, 3, 1, 1, 1}.HFUNC(Pexpire) << CI{"RENAME", CO::WRITE, 3, 1, 2, 1}.HFUNC(Rename) << CI{"RENAMENX", CO::WRITE, 3, 1, 2, 1}.HFUNC(RenameNx) << CI{"SELECT", kSelectOpts, 2, 0, 0, 0}.HFUNC(Select) diff --git a/src/server/journal/journal.cc b/src/server/journal/journal.cc index 4c0496e965fe..c63926cc535f 100644 --- a/src/server/journal/journal.cc +++ b/src/server/journal/journal.cc @@ -111,8 +111,9 @@ bool Journal::EnterLameDuck() { return res; } -void Journal::RecordEntry(const Entry& entry) { - journal_slice.AddLogRecord(entry); +void Journal::RecordEntry(TxId txid, Op opcode, DbIndex dbid, unsigned shard_cnt, + Entry::Payload payload) { + journal_slice.AddLogRecord(Entry{txid, opcode, dbid, shard_cnt, std::move(payload)}); } TxId Journal::GetLastTxId() { diff --git a/src/server/journal/journal.h b/src/server/journal/journal.h index 2631f968efaf..f86eaea32478 100644 ---
a/src/server/journal/journal.h +++ b/src/server/journal/journal.h @@ -53,7 +53,8 @@ class Journal { */ LSN GetLsn() const; - void RecordEntry(const Entry& entry); + void RecordEntry(TxId txid, Op opcode, DbIndex dbid, unsigned shard_cnt, Entry::Payload payload); + TxId GetLastTxId(); private: diff --git a/src/server/set_family.cc b/src/server/set_family.cc index 15c923b8fec3..3f54bbddd229 100644 --- a/src/server/set_family.cc +++ b/src/server/set_family.cc @@ -20,6 +20,7 @@ extern "C" { #include "server/container_utils.h" #include "server/engine_shard_set.h" #include "server/error.h" +#include "server/journal/journal.h" #include "server/transaction.h" ABSL_DECLARE_FLAG(bool, use_set2); @@ -959,8 +960,13 @@ OpResult OpPop(const OpArgs& op_args, string_view key, unsigned count return true; }); - /* Delete the set as it is now empty */ + // Delete the set as it is now empty CHECK(db_slice.Del(op_args.db_cntx.db_index, it)); + + // Replicate as DEL. + if (auto journal = op_args.shard->journal(); journal) { + op_args.RecordJournal("DEL"sv, ArgSlice{key}); + } } else { SetType st{it->second.RObjPtr(), it->second.Encoding()}; db_slice.PreUpdate(op_args.db_cntx.db_index, it); @@ -979,6 +985,15 @@ OpResult OpPop(const OpArgs& op_args, string_view key, unsigned count } else { result = PopStrSet(op_args.db_cntx, count, st); } + + // Replicate as SREM with removed keys, because SPOP is not deterministic. 
+ if (auto journal = op_args.shard->journal(); journal) { + vector<string_view> mapped(result.size() + 1); + mapped[0] = key; + std::copy(result.begin(), result.end(), mapped.begin() + 1); + op_args.RecordJournal("SREM"sv, mapped); + } + db_slice.PostUpdate(op_args.db_cntx.db_index, it, key); } return result; @@ -1545,7 +1560,7 @@ void SetFamily::Register(CommandRegistry* registry) { << CI{"SMOVE", CO::FAST | CO::WRITE, 4, 1, 2, 1}.HFUNC(SMove) << CI{"SREM", CO::WRITE | CO::FAST | CO::DENYOOM, -3, 1, 1, 1}.HFUNC(SRem) << CI{"SCARD", CO::READONLY | CO::FAST, 2, 1, 1, 1}.HFUNC(SCard) - << CI{"SPOP", CO::WRITE | CO::FAST, -2, 1, 1, 1}.HFUNC(SPop) + << CI{"SPOP", CO::WRITE | CO::FAST | CO::NO_AUTOJOURNAL, -2, 1, 1, 1}.HFUNC(SPop) << CI{"SUNION", CO::READONLY, -2, 1, -1, 1}.HFUNC(SUnion) << CI{"SUNIONSTORE", CO::WRITE | CO::DENYOOM, -3, 1, -1, 1}.HFUNC(SUnionStore) << CI{"SSCAN", CO::READONLY, -3, 1, 1, 1}.HFUNC(SScan); diff --git a/src/server/string_family.cc b/src/server/string_family.cc index 0067968e01af..0f5e334c863b 100644 --- a/src/server/string_family.cc +++ b/src/server/string_family.cc @@ -211,7 +211,7 @@ OpResult OpGet(const OpArgs& op_args, string_view key, bool del_hit = fa if (exp_params.IsDefined()) { DVLOG(1) << "Expire: " << key; auto& db_slice = op_args.shard->db_slice(); - OpStatus status = db_slice.UpdateExpire(op_args.db_cntx, it, it_expire, exp_params); + OpStatus status = db_slice.UpdateExpire(op_args.db_cntx, it, it_expire, exp_params).status(); if (status != OpStatus::OK) return status; } diff --git a/src/server/transaction.cc b/src/server/transaction.cc index 6571e65f0c16..da4954e80445 100644 --- a/src/server/transaction.cc +++ b/src/server/transaction.cc @@ -373,7 +373,7 @@ bool Transaction::RunInShard(EngineShard* shard) { /*************************************************************************/ if (!was_suspended && is_concluding) // Check last hop & non suspended.
- LogJournalOnShard(shard); + LogAutoJournalOnShard(shard); // at least the coordinator thread owns the reference. DCHECK_GE(use_count(), 1u); @@ -801,7 +801,7 @@ void Transaction::RunQuickie(EngineShard* shard) { LOG(FATAL) << "Unexpected exception " << e.what(); } - LogJournalOnShard(shard); + LogAutoJournalOnShard(shard); sd.local_mask &= ~ARMED; cb_ = nullptr; // We can do it because only a single shard runs the callback. @@ -1093,7 +1093,7 @@ void Transaction::UnwatchShardCb(ArgSlice wkeys, bool should_expire, EngineShard void Transaction::UnlockMultiShardCb(const std::vector& sharded_keys, EngineShard* shard) { auto journal = shard->journal(); if (journal != nullptr && journal->GetLastTxId() == txid_) { - journal->RecordEntry(journal::Entry{txid_, journal::Op::EXEC, db_index_, unique_shard_cnt_}); + journal->RecordEntry(txid_, journal::Op::EXEC, db_index_, unique_shard_cnt_, {}); } if (multi_->multi_opts & CO::GLOBAL_TRANS) { @@ -1216,12 +1216,13 @@ bool Transaction::NotifySuspended(TxId committed_txid, ShardId sid) { return false; } -void Transaction::LogJournalOnShard(EngineShard* shard) { +void Transaction::LogAutoJournalOnShard(EngineShard* shard) { // TODO: For now, we ignore non shard coordination. if (shard == nullptr) return; - if ((cid_->opt_mask() & CO::WRITE) == 0) + // Ignore non-write commands or ones with disabled autojournal. 
+ if ((cid_->opt_mask() & CO::WRITE) == 0 || (cid_->opt_mask() & CO::NO_AUTOJOURNAL) > 0) return; auto journal = shard->journal(); @@ -1234,15 +1235,18 @@ void Transaction::LogJournalOnShard(EngineShard* shard) { CHECK(!cmd_with_full_args_.empty()); entry_payload = cmd_with_full_args_; } else { - entry_payload = - make_pair(facade::ToSV(cmd_with_full_args_.front()), ShardArgsInShard(shard->shard_id())); - } - journal::Op opcode = journal::Op::COMMAND; - if (multi_) { - opcode = journal::Op::MULTI_COMMAND; + auto cmd = facade::ToSV(cmd_with_full_args_.front()); + entry_payload = make_pair(cmd, ShardArgsInShard(shard->shard_id())); } - journal->RecordEntry(journal::Entry{txid_, opcode, db_index_, unique_shard_cnt_, entry_payload}); + LogJournalOnShard(shard, std::move(entry_payload)); +} + +void Transaction::LogJournalOnShard(EngineShard* shard, journal::Entry::Payload&& payload) const { + auto journal = shard->journal(); + CHECK(journal); + auto opcode = multi_ ? journal::Op::MULTI_COMMAND : journal::Op::COMMAND; + journal->RecordEntry(txid_, opcode, db_index_, unique_shard_cnt_, std::move(payload)); } void Transaction::BreakOnShutdown() { diff --git a/src/server/transaction.h b/src/server/transaction.h index c635324cba62..7a6bb5f76a61 100644 --- a/src/server/transaction.h +++ b/src/server/transaction.h @@ -16,6 +16,7 @@ #include "core/tx_queue.h" #include "facade/op_status.h" #include "server/common.h" +#include "server/journal/types.h" #include "server/table.h" #include "util/fibers/fibers_ext.h" @@ -181,7 +182,7 @@ class Transaction { KeyLockArgs GetLockArgs(ShardId sid) const; OpArgs GetOpArgs(EngineShard* shard) const { - return OpArgs{shard, txid_, db_context()}; + return OpArgs{shard, this, db_context()}; } DbContext db_context() const { @@ -192,6 +193,9 @@ class Transaction { return db_index_; } + // Log a journal entry on shard with payload. 
+ void LogJournalOnShard(EngineShard* shard, journal::Entry::Payload&& payload) const; + private: struct LockCnt { unsigned cnt[2] = {0, 0}; @@ -245,9 +249,9 @@ class Transaction { return use_count_.load(std::memory_order_relaxed); } - // If needed, notify the jounral of the executed command on the given shard. + // Log command in the journal of a shard for write commands with auto-journaling enabled. // Should be called immediately after the last phase (hop). - void LogJournalOnShard(EngineShard* shard); + void LogAutoJournalOnShard(EngineShard* shard); struct PerShardData { uint32_t arg_start = 0; // Indices into args_ array. @@ -266,6 +270,7 @@ class Transaction { PerShardData() = default; }; + enum { kPerShardSize = sizeof(PerShardData) }; struct Multi { diff --git a/tests/dragonfly/replication_test.py b/tests/dragonfly/replication_test.py index 0049fadb168e..b75e13e3a124 100644 --- a/tests/dragonfly/replication_test.py +++ b/tests/dragonfly/replication_test.py @@ -31,6 +31,7 @@ (1, [1], dict(keys=100, dbcount=2)), ] + @pytest.mark.asyncio @pytest.mark.parametrize("t_master, t_replicas, seeder_config", replication_cases) async def test_replication_all(df_local_factory, df_seeder_factory, t_master, t_replicas, seeder_config): @@ -316,12 +317,12 @@ async def start_master(): assert await seeder.compare(capture, port=replica.port) - """ Test flushall command. Set data to master send flashall and set more data. Check replica keys at the end. 
""" + @pytest.mark.asyncio async def test_flushall(df_local_factory): master = df_local_factory.create(port=BASE_PORT, proactor_threads=4) @@ -335,6 +336,7 @@ async def test_flushall(df_local_factory): await c_replica.execute_command(f"REPLICAOF localhost {master.port}") n_keys = 1000 + def gen_test_data(start, end): for i in range(start, end): yield f"key-{i}", f"value-{i}" @@ -346,7 +348,8 @@ def gen_test_data(start, end): # flushall pipe.flushall() # Set simple keys n_keys..n_keys*2 on master - batch_fill_data(client=pipe, gen=gen_test_data(n_keys, n_keys*2), batch_size=3) + batch_fill_data(client=pipe, gen=gen_test_data( + n_keys, n_keys*2), batch_size=3) await pipe.execute() diff --git a/tests/dragonfly/utility.py b/tests/dragonfly/utility.py index 7d8998bcc393..9fe4d86a0734 100644 --- a/tests/dragonfly/utility.py +++ b/tests/dragonfly/utility.py @@ -189,8 +189,8 @@ def gen_shrink_cmd(self): ('SETRANGE {k} 10 {val}', ValueType.STRING), ('LPUSH {k} {val}', ValueType.LIST), ('LPOP {k}', ValueType.LIST), - # ('SADD {k} {val}', ValueType.SET), - # ('SPOP {k}', ValueType.SET), + ('SADD {k} {val}', ValueType.SET), + ('SPOP {k}', ValueType.SET), # ('HSETNX {k} v0 {val}', ValueType.HSET), # ('HINCRBY {k} v1 1', ValueType.HSET), # ('ZPOPMIN {k} 1', ValueType.ZSET),