Skip to content

Commit

Permalink
feat(list family): support blocking command for replication (#740)
Browse files Browse the repository at this point in the history
  • Loading branch information
adiholden authored Feb 2, 2023
1 parent 68edcf0 commit 69519b2
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 11 deletions.
31 changes: 22 additions & 9 deletions src/server/list_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -191,7 +191,7 @@ class BPopper {
}

private:
OpStatus Pop(Transaction* t, EngineShard* shard);
void Pop(Transaction* t, EngineShard* shard);

ListDir dir_;

Expand Down Expand Up @@ -267,13 +267,21 @@ OpStatus BPopper::Run(Transaction* t, unsigned msec) {
VLOG(1) << "Popping an element " << t->DebugId();
ff_result_ = move(result.value());

auto cb = [this](Transaction* t, EngineShard* shard) { return Pop(t, shard); };
auto cb = [this](Transaction* t, EngineShard* shard) {
Pop(t, shard);
OpArgs op_args = t->GetOpArgs(shard);
if (op_args.shard->journal()) {
string command = dir_ == ListDir::LEFT ? "LPOP" : "RPOP";
RecordJournal(op_args, command, ArgSlice{key_}, 1);
}
return OpStatus::OK;
};
t->Execute(std::move(cb), true);

return OpStatus::OK;
}

OpStatus BPopper::Pop(Transaction* t, EngineShard* shard) {
void BPopper::Pop(Transaction* t, EngineShard* shard) {
if (shard->shard_id() == ff_result_.sid) {
ff_result_.key.GetString(&key_);
auto& db_slice = shard->db_slice();
Expand All @@ -289,8 +297,6 @@ OpStatus BPopper::Pop(Transaction* t, EngineShard* shard) {
CHECK(shard->db_slice().Del(t->GetDbIndex(), it));
}
}

return OpStatus::OK;
}

OpResult<string> OpMoveSingleShard(const OpArgs& op_args, string_view src, string_view dest,
Expand Down Expand Up @@ -882,8 +888,13 @@ OpResult<string> BPopPusher::RunSingle(Transaction* t, time_point tp) {
OpResult<string> op_res;
bool is_multi = t->IsMulti();
auto cb_move = [&](Transaction* t, EngineShard* shard) {
op_res = OpMoveSingleShard(t->GetOpArgs(shard), pop_key_, push_key_, popdir_, pushdir_);
t->RenableAutoJournal(); // With single shard run auto journal flow.
OpArgs op_args = t->GetOpArgs(shard);
op_res = OpMoveSingleShard(op_args, pop_key_, push_key_, popdir_, pushdir_);
if (op_res) {
if (op_args.shard->journal()) {
RecordJournal(op_args, "RPOPLPUSH", ArgSlice{pop_key_, push_key_}, 1);
}
}
return OpStatus::OK;
};
t->Execute(cb_move, false);
Expand Down Expand Up @@ -1341,8 +1352,10 @@ void ListFamily::Register(CommandRegistry* registry) {
.SetHandler(RPopLPush)
<< CI{"BRPOPLPUSH", CO::WRITE | CO::NOSCRIPT | CO::BLOCKING | CO::NO_AUTOJOURNAL, 4, 1, 2, 1}
.SetHandler(BRPopLPush)
<< CI{"BLPOP", CO::WRITE | CO::NOSCRIPT | CO::BLOCKING, -3, 1, -2, 1}.HFUNC(BLPop)
<< CI{"BRPOP", CO::WRITE | CO::NOSCRIPT | CO::BLOCKING, -3, 1, -2, 1}.HFUNC(BRPop)
<< CI{"BLPOP", CO::WRITE | CO::NOSCRIPT | CO::BLOCKING | CO::NO_AUTOJOURNAL, -3, 1, -2, 1}
.HFUNC(BLPop)
<< CI{"BRPOP", CO::WRITE | CO::NOSCRIPT | CO::BLOCKING | CO::NO_AUTOJOURNAL, -3, 1, -2, 1}
.HFUNC(BRPop)
<< CI{"LLEN", CO::READONLY | CO::FAST, 2, 1, 1, 1}.HFUNC(LLen)
<< CI{"LPOS", CO::READONLY | CO::FAST, -3, 1, 1, 1}.HFUNC(LPos)
<< CI{"LINDEX", CO::READONLY, 3, 1, 1, 1}.HFUNC(LIndex)
Expand Down
8 changes: 6 additions & 2 deletions tests/dragonfly/replication_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -513,8 +513,12 @@ async def check_expire(key):

# Check there is no rewrite for RPOPLPUSH on single shard
await check("RPOPLPUSH list list", r"RPOPLPUSH list list")
# Check there is no rewrite for BRPOPLPUSH on single shard
await check("BRPOPLPUSH list list 0", r"BRPOPLPUSH list list 0")
# Check BRPOPLPUSH on single shard turns into RPOPLPUSH
await check("BRPOPLPUSH list list 0", r"RPOPLPUSH list list")
# Check BLPOP turns into LPOP
await check("BLPOP list 0", r"LPOP list")
# Check BRPOP turns into RPOP
await check("BRPOP list 0", r"RPOP list")


await c_master.lpush("list1s", "v1", "v2", "v3", "v4")
Expand Down

0 comments on commit 69519b2

Please sign in to comment.