bug(replica): execute expire within multi command only if it belongs … (#766)

Signed-off-by: adi_holden <[email protected]>
adiholden authored Feb 8, 2023
1 parent b2958b5 commit 41c1eba
Showing 3 changed files with 66 additions and 28 deletions.
26 changes: 20 additions & 6 deletions src/server/replica.cc
@@ -808,15 +808,15 @@ void Replica::StableSyncDflyReadFb(Context* cntx) {
io::PrefixSource ps{prefix, &ss};

JournalReader reader{&ps, 0};

TransactionReader tx_reader{};
while (!cntx->IsCancelled()) {
waker_.await([&]() {
return ((trans_data_queue_.size() < kYieldAfterItemsInQueue) || cntx->IsCancelled());
});
if (cntx->IsCancelled())
break;

auto tx_data = TransactionData::ReadNext(&reader, cntx);
auto tx_data = tx_reader.NextTxData(&reader, cntx);
if (!tx_data)
break;

@@ -1187,6 +1187,7 @@ bool Replica::TransactionData::AddEntry(journal::ParsedEntry&& entry) {
return true;
case journal::Op::MULTI_COMMAND:
commands.push_back(std::move(entry.cmd));
dbid = entry.dbid;
return false;
default:
DCHECK(false) << "Unsupported opcode";
@@ -1198,18 +1199,31 @@ bool Replica::TransactionData::IsGlobalCmd() const {
return commands.size() == 1 && commands.front().cmd_args.size() == 1;
}

auto Replica::TransactionData::ReadNext(JournalReader* reader, Context* cntx)
// Expired entries within a MULTI...EXEC sequence that belong to a different database
// should be executed outside the multi transaction.
bool Replica::TransactionReader::ReturnEntryOOO(const journal::ParsedEntry& entry) {
return !tx_data_.commands.empty() && entry.opcode == journal::Op::EXPIRED &&
tx_data_.dbid != entry.dbid;
}

auto Replica::TransactionReader::NextTxData(JournalReader* reader, Context* cntx)
-> optional<TransactionData> {
TransactionData out;
io::Result<journal::ParsedEntry> res;
do {
if (res = reader->ReadEntry(); !res) {
cntx->ReportError(res.error());
return std::nullopt;
}
} while (!cntx->IsCancelled() && !out.AddEntry(std::move(*res)));

return cntx->IsCancelled() ? std::nullopt : make_optional(std::move(out));
if (ReturnEntryOOO(*res)) {
TransactionData tmp_tx;
CHECK(tmp_tx.AddEntry(std::move(*res)));
return tmp_tx;
}

} while (!cntx->IsCancelled() && !tx_data_.AddEntry(std::move(*res)));

return cntx->IsCancelled() ? std::nullopt : make_optional(std::move(tx_data_));
}

} // namespace dfly
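
To make the new out-of-order path concrete, here is a minimal, self-contained sketch of the reader's behavior. Entry, TxData and Reader are hypothetical simplifications of journal::ParsedEntry, TransactionData and TransactionReader (Add, ReturnOOO and Next condense AddEntry, ReturnEntryOOO and NextTxData); only the dbid bookkeeping relevant to this fix is modeled:

#include <cassert>
#include <cstdint>
#include <optional>
#include <utility>
#include <vector>

enum class Op { MULTI_COMMAND, EXEC, EXPIRED };

struct Entry {
  Op opcode;
  uint32_t dbid;
};

struct TxData {
  uint32_t dbid = 0;
  std::vector<Entry> commands;

  // Mirrors TransactionData::AddEntry: returns true once the unit is
  // complete, false while a MULTI is still accumulating commands.
  bool Add(const Entry& e) {
    commands.push_back(e);
    dbid = e.dbid;
    return e.opcode != Op::MULTI_COMMAND;
  }
};

struct Reader {
  TxData pending_;  // partially accumulated MULTI, like tx_data_

  // Mirrors ReturnEntryOOO: an EXPIRED entry for a different db than
  // the pending MULTI must not be folded into that transaction.
  bool ReturnOOO(const Entry& e) const {
    return !pending_.commands.empty() && e.opcode == Op::EXPIRED &&
           pending_.dbid != e.dbid;
  }

  std::optional<TxData> Next(const Entry& e) {
    if (ReturnOOO(e)) {
      TxData tmp;
      tmp.Add(e);
      return tmp;  // returned on its own, outside the MULTI
    }
    if (pending_.Add(e)) {
      TxData out = std::move(pending_);
      pending_ = TxData{};
      return out;
    }
    return std::nullopt;  // still inside a MULTI
  }
};

int main() {
  Reader r;
  // A MULTI on db 0 starts accumulating...
  assert(!r.Next({Op::MULTI_COMMAND, 0}));
  // ...when an expiration fires on db 1: it is returned immediately as
  // its own transaction instead of being merged into the db-0 MULTI.
  auto expired = r.Next({Op::EXPIRED, 1});
  assert(expired && expired->dbid == 1);
  // The MULTI itself completes later, on EXEC.
  auto multi = r.Next({Op::EXEC, 0});
  assert(multi && multi->dbid == 0);
}

In this simplified Add, any non-MULTI_COMMAND entry completes the pending unit; the real AddEntry distinguishes more opcodes, as the diff above shows.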
13 changes: 9 additions & 4 deletions src/server/replica.h
@@ -59,16 +59,21 @@ class Replica {
bool AddEntry(journal::ParsedEntry&& entry);

bool IsGlobalCmd() const;

// Collect next complete transaction data from journal reader.
static std::optional<TransactionData> ReadNext(JournalReader* reader, Context* cntx);

TxId txid;
DbIndex dbid;
uint32_t shard_cnt;
std::vector<journal::ParsedEntry::CmdData> commands;
};

// Utility for reading TransactionData from a journal reader.
struct TransactionReader {
std::optional<TransactionData> NextTxData(JournalReader* reader, Context* cntx);
bool ReturnEntryOOO(const journal::ParsedEntry& entry);

private:
TransactionData tx_data_{};
};

// Coordinator for multi-shard execution.
struct MultiShardExecution {
boost::fibers::mutex map_mu;
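
Since the reader now buffers a partially read MULTI in tx_data_ between calls, a single TransactionReader instance must live across read iterations. A condensed sketch of the consumer side, mirroring the StableSyncDflyReadFb change above:

TransactionReader tx_reader{};
while (!cntx->IsCancelled()) {
  auto tx_data = tx_reader.NextTxData(&reader, cntx);
  if (!tx_data)
    break;
  // ... push *tx_data onto trans_data_queue_ for execution ...
}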
55 changes: 37 additions & 18 deletions tests/dragonfly/replication_test.py
@@ -29,7 +29,7 @@
(8, [2, 2, 2, 2], dict(keys=4_000, dbcount=4)),
(4, [8, 8], dict(keys=4_000, dbcount=4)),
(4, [1] * 8, dict(keys=500, dbcount=2)),
#(1, [1], dict(keys=100, dbcount=2)),
(1, [1], dict(keys=100, dbcount=2)),
]


@@ -85,19 +85,22 @@ async def run_replication(c_replica):
async def check_replica_finished_exec(c_replica):
info_stats = await c_replica.execute_command("INFO")
tc1 = info_stats['total_commands_processed']
await asyncio.sleep(0.1)
await asyncio.sleep(0.5)
info_stats = await c_replica.execute_command("INFO")
tc2 = info_stats['total_commands_processed']
return tc1+1 == tc2 # Replica processed only the INFO command during the above sleep.
# Replica processed only the INFO command during the above sleep.
return tc1+1 == tc2


async def check_all_replicas_finished(c_replicas):
while True:
await asyncio.sleep(1.0)
is_finished_arr = await asyncio.gather(*(asyncio.create_task(check_replica_finished_exec(c))
for c in c_replicas))
for c in c_replicas))
if all(is_finished_arr):
break


async def check_data(seeder, replicas, c_replicas):
capture = await seeder.capture()
for (replica, c_replica) in zip(replicas, c_replicas):
@@ -439,7 +442,7 @@ async def check_list_ooo(cmd, rx_list):
expected_cmds = len(rx_list)
for i in range(expected_cmds):
mcmd = (await get_next_command())
#check command matches one regex from list
# check command matches one regex from list
match_rx = list(filter(lambda rx: re.match(rx, mcmd), rx_list))
assert len(match_rx) == 1
rx_list.remove(match_rx[0])
@@ -527,7 +530,6 @@ async def check_expire(key):
# Check BRPOP turns into RPOP
await check("BRPOP list 0", r"RPOP list")


await c_master.lpush("list1s", "v1", "v2", "v3", "v4")
await skip_cmd()
# Check LMOVE turns into LPUSH LPOP on multi shard
@@ -578,30 +580,47 @@ async def test_expiry(df_local_factory, n_keys=1000):
res = await c_replica.mget(k for k, _ in gen_test_data(n_keys))
assert all(v is not None for v in res)

# Set keys to expire in 500ms
# Set different expiry times (in ms) for the keys
pipe = c_master.pipeline(transaction=True)
for k, _ in gen_test_data(n_keys):
pipe.pexpire(k, 500)
ms = random.randint(100, 500)
pipe.pexpire(k, ms)
await pipe.execute()

# Wait two seconds for heartbeat to pick them up
await asyncio.sleep(2.0)

assert len(await c_master.keys()) == 0
assert len(await c_replica.keys()) == 0
# Send more traffic to different dbs while keys are expiring
for i in range(8):
is_multi = i % 2
c_master_db = aioredis.Redis(port=master.port, db=i)
pipe = c_master_db.pipeline(transaction=is_multi)
# Set simple keys n_keys..n_keys*2 on master
start_key = n_keys*(i+1)
end_key = start_key + n_keys
batch_fill_data(client=pipe, gen=gen_test_data(
end_key, start_key), batch_size=20)

await pipe.execute()

# Wait for master to expire keys
await asyncio.sleep(3.0)

# Check all keys with expiry have been deleted
res = await c_master.mget(k for k, _ in gen_test_data(n_keys))
assert all(v is None for v in res)
# Check replica finished executing the replicated commands
await check_all_replicas_finished([c_replica])
res = await c_replica.mget(k for k, _ in gen_test_data(n_keys))
assert all(v is None for v in res)

# Set keys
# Set expired keys again
pipe = c_master.pipeline(transaction=False)
batch_fill_data(pipe, gen_test_data(n_keys))
for k, _ in gen_test_data(n_keys):
pipe.pexpire(k, 500)
await pipe.execute()

await asyncio.sleep(1.0)

# Disconnect from master
await c_replica.execute_command("REPLICAOF NO ONE")

# Check replica expires keys on its own
await asyncio.sleep(1.0)
assert len(await c_replica.keys()) == 0
res = await c_replica.mget(k for k, _ in gen_test_data(n_keys))
assert all(v is None for v in res)
