From 1132d7285655a10e1bf2796113b99b7275871956 Mon Sep 17 00:00:00 2001 From: Vladislav Oleshko Date: Sun, 4 Feb 2024 21:07:07 +0300 Subject: [PATCH] fix: fix replication v1.14 --- src/server/main_service.cc | 1 + src/server/replica.cc | 2 +- src/server/transaction.cc | 21 +++++++++++++++++-- src/server/transaction.h | 3 +++ tests/dragonfly/replication_test.py | 32 +++++++++++++++++++++++++++++ 5 files changed, 56 insertions(+), 3 deletions(-) diff --git a/src/server/main_service.cc b/src/server/main_service.cc index 75857bdb3f2d..1aef421d0cd7 100644 --- a/src/server/main_service.cc +++ b/src/server/main_service.cc @@ -1849,6 +1849,7 @@ void Service::EvalInternal(CmdArgList args, const EvalArgs& eval_args, Interpret cntx->transaction = stub_tx.get(); result = interpreter->RunFunction(eval_args.sha, &error); + cntx->transaction->FIX_ConcludeJournalExec(); // flush journal cntx->transaction = tx; return OpStatus::OK; diff --git a/src/server/replica.cc b/src/server/replica.cc index 3f4d525ea153..7254ba10409d 100644 --- a/src/server/replica.cc +++ b/src/server/replica.cc @@ -1203,7 +1203,7 @@ auto DflyShardReplica::TransactionReader::NextTxData(JournalReader* reader, Cont // Otherwise, continue building multi command. DCHECK(res->opcode == journal::Op::MULTI_COMMAND || res->opcode == journal::Op::EXEC); - DCHECK(res->txid > 0); + DCHECK(res->txid > 0 || res->shard_cnt == 1); auto txid = res->txid; auto& txdata = current_[txid]; diff --git a/src/server/transaction.cc b/src/server/transaction.cc index 557df1e41e04..9b268a7372a3 100644 --- a/src/server/transaction.cc +++ b/src/server/transaction.cc @@ -108,7 +108,9 @@ Transaction::Transaction(const Transaction* parent, ShardId shard_id, std::optio // Use squashing mechanism for inline execution of single-shard EVAL multi_->mode = LOCK_AHEAD; } + multi_->role = SQUASHED_STUB; + multi_->shard_journal_write.resize(1); time_now_ms_ = parent->time_now_ms_; @@ -972,6 +974,16 @@ void Transaction::IterateMultiLocks(ShardId sid, std::functionshard_journal_write.front()) + return; + + if (auto journal = EngineShard::tlocal()->journal(); journal != nullptr) { + journal->RecordEntry(txid_, journal::Op::EXEC, db_index_, 1, + unique_slot_checker_.GetUniqueSlotId(), {}, false); + } +} + void Transaction::EnableShard(ShardId sid) { unique_shard_cnt_ = 1; unique_shard_id_ = sid; @@ -1506,8 +1518,13 @@ void Transaction::LogJournalOnShard(EngineShard* shard, journal::Entry::Payload& bool allow_await) const { auto journal = shard->journal(); CHECK(journal); - if (multi_ && multi_->role != SQUASHED_STUB) - multi_->shard_journal_write[shard->shard_id()] = true; + + if (multi_) { + if (multi_->role != SQUASHED_STUB) + multi_->shard_journal_write[shard->shard_id()] = true; + else + multi_->shard_journal_write[0] = true; + } bool is_multi = multi_commands || IsAtomicMulti(); diff --git a/src/server/transaction.h b/src/server/transaction.h index ca5f817fa33a..666637fbc344 100644 --- a/src/server/transaction.h +++ b/src/server/transaction.h @@ -353,6 +353,9 @@ class Transaction { void IterateMultiLocks(ShardId sid, std::function cb) const; + // Send journal EXEC opcode after a series of MULTI commands on the currently active shard + void FIX_ConcludeJournalExec(); + private: // Holds number of locks for each IntentLock::Mode: shared and exlusive. struct LockCnt { diff --git a/tests/dragonfly/replication_test.py b/tests/dragonfly/replication_test.py index acac258124b8..4d919cddc279 100644 --- a/tests/dragonfly/replication_test.py +++ b/tests/dragonfly/replication_test.py @@ -769,6 +769,38 @@ async def test_expiry(df_local_factory: DflyInstanceFactory, n_keys=1000): assert all(v is None for v in res) +@dfly_args({"proactor_threads": 4}) +async def test_simple_scripts(df_local_factory: DflyInstanceFactory): + master = df_local_factory.create() + replicas = [df_local_factory.create() for _ in range(2)] + df_local_factory.start_all([master] + replicas) + + c_replicas = [replica.client() for replica in replicas] + c_master = master.client() + + # Connect replicas and wait for sync to finish + for c_replica in c_replicas: + await c_replica.execute_command(f"REPLICAOF localhost {master.port}") + await check_all_replicas_finished([c_replica], c_master) + + # Generate some scripts and run them + keys = ["a", "b", "c", "d", "e"] + for i in range(len(keys) + 1): + script = "" + subkeys = keys[:i] + for key in subkeys: + script += f"redis.call('INCR', '{key}')" + script += f"redis.call('INCR', '{key}')" + + await c_master.eval(script, len(subkeys), *subkeys) + + # Wait for replicas + await check_all_replicas_finished([c_replica], c_master) + + for c_replica in c_replicas: + assert (await c_replica.mget(keys)) == ["10", "8", "6", "4", "2"] + + """ Test script replication.