Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: fix replication v1.14 #2537

Merged
merged 1 commit into from
Feb 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/server/main_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1849,6 +1849,7 @@ void Service::EvalInternal(CmdArgList args, const EvalArgs& eval_args, Interpret
cntx->transaction = stub_tx.get();

result = interpreter->RunFunction(eval_args.sha, &error);
cntx->transaction->FIX_ConcludeJournalExec(); // flush journal

cntx->transaction = tx;
return OpStatus::OK;
Expand Down
2 changes: 1 addition & 1 deletion src/server/replica.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1203,7 +1203,7 @@ auto DflyShardReplica::TransactionReader::NextTxData(JournalReader* reader, Cont

// Otherwise, continue building multi command.
DCHECK(res->opcode == journal::Op::MULTI_COMMAND || res->opcode == journal::Op::EXEC);
DCHECK(res->txid > 0);
DCHECK(res->txid > 0 || res->shard_cnt == 1);

auto txid = res->txid;
auto& txdata = current_[txid];
Expand Down
21 changes: 19 additions & 2 deletions src/server/transaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,9 @@ Transaction::Transaction(const Transaction* parent, ShardId shard_id, std::optio
// Use squashing mechanism for inline execution of single-shard EVAL
multi_->mode = LOCK_AHEAD;
}

multi_->role = SQUASHED_STUB;
multi_->shard_journal_write.resize(1);

time_now_ms_ = parent->time_now_ms_;

Expand Down Expand Up @@ -972,6 +974,16 @@ void Transaction::IterateMultiLocks(ShardId sid, std::function<void(const std::s
}
}

void Transaction::FIX_ConcludeJournalExec() {
if (!multi_->shard_journal_write.front())
return;

if (auto journal = EngineShard::tlocal()->journal(); journal != nullptr) {
journal->RecordEntry(txid_, journal::Op::EXEC, db_index_, 1,
unique_slot_checker_.GetUniqueSlotId(), {}, false);
}
}

void Transaction::EnableShard(ShardId sid) {
unique_shard_cnt_ = 1;
unique_shard_id_ = sid;
Expand Down Expand Up @@ -1506,8 +1518,13 @@ void Transaction::LogJournalOnShard(EngineShard* shard, journal::Entry::Payload&
bool allow_await) const {
auto journal = shard->journal();
CHECK(journal);
if (multi_ && multi_->role != SQUASHED_STUB)
multi_->shard_journal_write[shard->shard_id()] = true;

if (multi_) {
if (multi_->role != SQUASHED_STUB)
multi_->shard_journal_write[shard->shard_id()] = true;
else
multi_->shard_journal_write[0] = true;
}

bool is_multi = multi_commands || IsAtomicMulti();

Expand Down
3 changes: 3 additions & 0 deletions src/server/transaction.h
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,9 @@ class Transaction {

void IterateMultiLocks(ShardId sid, std::function<void(const std::string&)> cb) const;

// Send journal EXEC opcode after a series of MULTI commands on the currently active shard
void FIX_ConcludeJournalExec();

private:
// Holds number of locks for each IntentLock::Mode: shared and exlusive.
struct LockCnt {
Expand Down
32 changes: 32 additions & 0 deletions tests/dragonfly/replication_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -769,6 +769,38 @@ async def test_expiry(df_local_factory: DflyInstanceFactory, n_keys=1000):
assert all(v is None for v in res)


@dfly_args({"proactor_threads": 4})
async def test_simple_scripts(df_local_factory: DflyInstanceFactory):
master = df_local_factory.create()
replicas = [df_local_factory.create() for _ in range(2)]
df_local_factory.start_all([master] + replicas)

c_replicas = [replica.client() for replica in replicas]
c_master = master.client()

# Connect replicas and wait for sync to finish
for c_replica in c_replicas:
await c_replica.execute_command(f"REPLICAOF localhost {master.port}")
await check_all_replicas_finished([c_replica], c_master)

# Generate some scripts and run them
keys = ["a", "b", "c", "d", "e"]
for i in range(len(keys) + 1):
script = ""
subkeys = keys[:i]
for key in subkeys:
script += f"redis.call('INCR', '{key}')"
script += f"redis.call('INCR', '{key}')"

await c_master.eval(script, len(subkeys), *subkeys)

# Wait for replicas
await check_all_replicas_finished([c_replica], c_master)

for c_replica in c_replicas:
assert (await c_replica.mget(keys)) == ["10", "8", "6", "4", "2"]


"""
Test script replication.

Expand Down