Skip to content

Commit

Permalink
feat(server): Implement MOVE command (#298)
Browse files Browse the repository at this point in the history
* feat(server): Implement MOVE command

Signed-off-by: Vladislav Oleshko <[email protected]>
Co-authored-by: Vladislav Oleshko <[email protected]>
  • Loading branch information
dranikpg and dranikpg authored Sep 18, 2022
1 parent 1aef3c1 commit 16b6b11
Show file tree
Hide file tree
Showing 8 changed files with 148 additions and 23 deletions.
2 changes: 1 addition & 1 deletion docs/api_status.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ with respect to Memcached and Redis APIs.
- [X] EXPIRE
- [X] EXPIREAT
- [X] KEYS
- [X] MOVE
- [X] PING
- [X] RENAME
- [X] RENAMENX
Expand Down Expand Up @@ -105,7 +106,6 @@ with respect to Memcached and Redis APIs.
- [ ] BGREWRITEAOF
- [ ] MONITOR
- [ ] RANDOMKEY
- [ ] MOVE

### API 2
- [X] List Family
Expand Down
72 changes: 71 additions & 1 deletion src/server/generic_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,38 @@ void GenericFamily::Stick(CmdArgList args, ConnectionContext* cntx) {
(*cntx)->SendLong(match_cnt);
}

void GenericFamily::Move(CmdArgList args, ConnectionContext* cntx) {
string_view key = ArgS(args, 1);
int64_t target_db;

if (!absl::SimpleAtoi(ArgS(args, 2), &target_db)) {
return (*cntx)->SendError(kInvalidIntErr);
}

if (target_db < 0 || target_db >= absl::GetFlag(FLAGS_dbnum)) {
return (*cntx)->SendError(kDbIndOutOfRangeErr);
}

if (target_db == cntx->db_index()) {
return (*cntx)->SendError("source and destination objects are the same");
}

OpStatus res = OpStatus::SKIPPED;
ShardId target_shard = Shard(key, shard_set->size());
auto cb = [&](Transaction* t, EngineShard* shard) {
// MOVE runs as a global transaction and is therefore scheduled on every shard.
if (target_shard == shard->shard_id()) {
res = OpMove(t->GetOpArgs(shard), key, target_db);
}
return OpStatus::OK;
};

cntx->transaction->ScheduleSingleHop(std::move(cb));
// Exactly one shard will call OpMove.
DCHECK(res != OpStatus::SKIPPED);
(*cntx)->SendLong(res == OpStatus::OK);
}

void GenericFamily::Rename(CmdArgList args, ConnectionContext* cntx) {
OpResult<void> st = RenameGeneric(args, false, cntx);
(*cntx)->SendError(st.status());
Expand Down Expand Up @@ -771,6 +803,43 @@ OpResult<uint32_t> GenericFamily::OpStick(const OpArgs& op_args, ArgSlice keys)
return res;
}

// OpMove touches multiple databases (op_args.db_idx, target_db), so it assumes it runs
// as a global transaction.
// TODO: Allow running OpMove without a global transaction.
OpStatus GenericFamily::OpMove(const OpArgs& op_args, string_view key, DbIndex target_db) {
auto& db_slice = op_args.shard->db_slice();

// Fetch value at key in current db.
auto [from_it, from_expire] = db_slice.FindExt(op_args.db_ind, key);
if (!IsValid(from_it))
return OpStatus::KEY_NOTFOUND;

// Fetch value at key in target db.
auto [to_it, _] = db_slice.FindExt(target_db, key);
if (IsValid(to_it))
return OpStatus::KEY_EXISTS;

// Ensure target database exists.
db_slice.ActivateDb(target_db);

bool sticky = from_it->first.IsSticky();
uint64_t exp_ts = db_slice.ExpireTime(from_expire);
PrimeValue from_obj = std::move(from_it->second);

// Restore expire flag after std::move.
from_it->second.SetExpire(IsValid(from_expire));

CHECK(db_slice.Del(op_args.db_ind, from_it));
to_it = db_slice.AddNew(target_db, key, std::move(from_obj), exp_ts);
to_it->first.SetSticky(sticky);

if (to_it->second.ObjType() == OBJ_LIST && op_args.shard->blocking_controller()) {
op_args.shard->blocking_controller()->AwakeWatched(target_db, key);
}

return OpStatus::OK;
}

using CI = CommandId;

#define HFUNC(x) SetHandler(&GenericFamily::x)
Expand Down Expand Up @@ -798,7 +867,8 @@ void GenericFamily::Register(CommandRegistry* registry) {
<< CI{"PTTL", CO::READONLY | CO::FAST, 2, 1, 1, 1}.HFUNC(Pttl)
<< CI{"TYPE", CO::READONLY | CO::FAST | CO::LOADING, 2, 1, 1, 1}.HFUNC(Type)
<< CI{"UNLINK", CO::WRITE, -2, 1, -1, 1}.HFUNC(Del)
<< CI{"STICK", CO::WRITE, -2, 1, -1, 1}.HFUNC(Stick);
<< CI{"STICK", CO::WRITE, -2, 1, -1, 1}.HFUNC(Stick)
<< CI{"MOVE", CO::WRITE | CO::GLOBAL_TRANS, 3, 1, 1, 1}.HFUNC(Move);
}

} // namespace dfly
2 changes: 2 additions & 0 deletions src/server/generic_family.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ class GenericFamily {
static void Keys(CmdArgList args, ConnectionContext* cntx);
static void PexpireAt(CmdArgList args, ConnectionContext* cntx);
static void Stick(CmdArgList args, ConnectionContext* cntx);
static void Move(CmdArgList args, ConnectionContext* cntx);

static void Rename(CmdArgList args, ConnectionContext* cntx);
static void RenameNx(CmdArgList args, ConnectionContext* cntx);
Expand All @@ -71,6 +72,7 @@ class GenericFamily {
static OpResult<void> OpRen(const OpArgs& op_args, std::string_view from, std::string_view to,
bool skip_exists);
static OpResult<uint32_t> OpStick(const OpArgs& op_args, ArgSlice keys);
static OpStatus OpMove(const OpArgs& op_args, std::string_view key, DbIndex target_db);
};

} // namespace dfly
45 changes: 45 additions & 0 deletions src/server/generic_family_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,51 @@ TEST_F(GenericFamilyTest, Stick) {
ASSERT_THAT(Run({"stick", "b"}), IntArg(0));
}

TEST_F(GenericFamilyTest, Move) {
// Check MOVE returns 0 on non-existent keys
ASSERT_THAT(Run({"move", "a", "1"}), IntArg(0));

// Check MOVE catches non-existent database indices
ASSERT_THAT(Run({"move", "a", "-1"}), ArgType(RespExpr::ERROR));
ASSERT_THAT(Run({"move", "a", "100500"}), ArgType(RespExpr::ERROR));

// Check MOVE moves value & expiry & stickyness
Run({"set", "a", "test"});
Run({"expire", "a", "1000"});
Run({"stick", "a"});
ASSERT_THAT(Run({"move", "a", "1"}), IntArg(1));
Run({"select", "1"});
ASSERT_THAT(Run({"get", "a"}), "test");
ASSERT_THAT(Run({"ttl", "a"}), testing::Not(IntArg(-1)));
ASSERT_THAT(Run({"stick", "a"}), IntArg(0));

// Check MOVE doesn't move if key exists
Run({"select", "1"});
Run({"set", "a", "test"});
Run({"select", "0"});
Run({"set", "a", "another test"});
ASSERT_THAT(Run({"move", "a", "1"}), IntArg(0)); // exists from test case above
Run({"select", "1"});
ASSERT_THAT(Run({"get", "a"}), "test");

// Check MOVE awakes blocking operations
auto fb_blpop = pp_->at(0)->LaunchFiber(fibers::launch::dispatch, [&] {
Run({"select", "1"});
auto resp = Run({"blpop", "l", "0"});
ASSERT_THAT(resp, ArgType(RespExpr::ARRAY));
EXPECT_THAT(resp.GetVec(), ElementsAre("l", "TestItem"));
});

WaitUntilLocked(1, "l");

pp_->at(1)->Await([&] {
Run({"select", "0"});
Run({"lpush", "l", "TestItem"});
Run({"move", "l", "1"});
});

fb_blpop.join();
}

using testing::AnyOf;
using testing::Each;
Expand Down
22 changes: 8 additions & 14 deletions src/server/list_family_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,6 @@ class ListFamilyTest : public BaseFamilyTest {
ListFamilyTest() {
num_threads_ = 4;
}

void WaitForLocked(string_view key) {
do {
this_fiber::sleep_for(30us);
} while (!IsLocked(0, key));
}
};

const char kKey1[] = "x";
Expand Down Expand Up @@ -187,7 +181,7 @@ TEST_F(ListFamilyTest, BLPopMultiPush) {
blpop_resp = Run({"blpop", kKey1, kKey2, kKey3, "0"});
});

WaitForLocked(kKey1);
WaitUntilLocked(0, kKey1);

auto p1_fb = pp_->at(1)->LaunchFiber([&] {
for (unsigned i = 0; i < 100; ++i) {
Expand Down Expand Up @@ -225,7 +219,7 @@ TEST_F(ListFamilyTest, BLPopSerialize) {
blpop_resp = Run({"blpop", kKey1, kKey2, kKey3, "0"});
});

WaitForLocked(kKey1);
WaitUntilLocked(0, kKey1);

LOG(INFO) << "Starting multi";

Expand Down Expand Up @@ -295,7 +289,7 @@ TEST_F(ListFamilyTest, WrongTypeDoesNotWake) {
blpop_resp = Run({"blpop", kKey1, "0"});
});

WaitForLocked(kKey1);
WaitUntilLocked(0, kKey1);

auto p1_fb = pp_->at(1)->LaunchFiber([&] {
Run({"multi"});
Expand Down Expand Up @@ -324,7 +318,7 @@ TEST_F(ListFamilyTest, BPopSameKeyTwice) {
ASSERT_THAT(watched, ArrLen(0));
});

WaitForLocked(kKey1);
WaitUntilLocked(0, kKey1);

pp_->at(1)->Await([&] { EXPECT_EQ(1, CheckedInt({"lpush", kKey1, "bar"})); });
pop_fb.join();
Expand All @@ -336,7 +330,7 @@ TEST_F(ListFamilyTest, BPopSameKeyTwice) {
blpop_resp = Run({"blpop", kKey1, kKey2, kKey2, kKey1, "0"});
});

WaitForLocked(kKey1);
WaitUntilLocked(0, kKey1);

pp_->at(1)->Await([&] { EXPECT_EQ(1, CheckedInt({"lpush", kKey2, "bar"})); });
pop_fb.join();
Expand All @@ -358,7 +352,7 @@ TEST_F(ListFamilyTest, BPopTwoKeysSameShard) {
ASSERT_THAT(watched, ArrLen(0));
});

WaitForLocked("x");
WaitUntilLocked(0, "x");

pp_->at(1)->Await([&] { EXPECT_EQ(1, CheckedInt({"lpush", "x", "bar"})); });
pop_fb.join();
Expand All @@ -377,7 +371,7 @@ TEST_F(ListFamilyTest, BPopRename) {
blpop_resp = Run({"blpop", kKey1, "0"});
});

WaitForLocked(kKey1);
WaitUntilLocked(0, kKey1);

pp_->at(1)->Await([&] {
EXPECT_EQ(1, CheckedInt({"lpush", "a", "bar"}));
Expand All @@ -395,7 +389,7 @@ TEST_F(ListFamilyTest, BPopFlush) {
blpop_resp = Run({"blpop", kKey1, "0"});
});

WaitForLocked(kKey1);
WaitUntilLocked(0, kKey1);

pp_->at(1)->Await([&] {
Run({"flushdb"});
Expand Down
10 changes: 10 additions & 0 deletions src/server/test_utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,16 @@ void BaseFamilyTest::UpdateTime(uint64_t ms) {
shard_set->RunBriefInParallel(cb);
}

void BaseFamilyTest::WaitUntilLocked(DbIndex db_index, string_view key, double timeout) {
auto step = 50us;
auto timeout_micro = chrono::duration_cast<chrono::microseconds> (1000ms * timeout);
int64_t steps = timeout_micro.count() / step.count();
do {
::boost::this_fiber::sleep_for(step);
} while (!IsLocked(db_index, key) && --steps > 0);
CHECK(IsLocked(db_index, key));
}

RespExpr BaseFamilyTest::Run(ArgSlice list) {
if (!ProactorBase::IsProactorThread()) {
return pp_->at(0)->Await([&] { return this->Run(list); });
Expand Down
3 changes: 3 additions & 0 deletions src/server/test_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ class BaseFamilyTest : public ::testing::Test {
// ts is ms
void UpdateTime(uint64_t ms);

// Wait for a locked key to unlock. Aborts after timeout seconds passed.
void WaitUntilLocked(DbIndex db_index, std::string_view key, double timeout = 3);

std::string GetId() const;
size_t SubscriberMessagesLen(std::string_view conn_id) const;

Expand Down
15 changes: 8 additions & 7 deletions src/server/transaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -398,13 +398,14 @@ bool Transaction::RunInShard(EngineShard* shard) {
}
}
sd.local_mask &= ~OUT_OF_ORDER;

// It has 2 responsibilities.
// 1: to go over potential wakened keys, verify them and activate watch queues.
// 2: if this transaction was notified and finished running - to remove it from the head
// of the queue and notify the next one.
if (shard->blocking_controller())
shard->blocking_controller()->RunStep(awaked_prerun ? this : nullptr);
}
// It has 2 responsibilities.
// 1: to go over potential wakened keys, verify them and activate watch queues.
// 2: if this transaction was notified and finished running - to remove it from the head
// of the queue and notify the next one.
// RunStep is also called for global transactions because of commands like MOVE.
if (shard->blocking_controller()) {
shard->blocking_controller()->RunStep(awaked_prerun ? this : nullptr);
}
}

Expand Down

0 comments on commit 16b6b11

Please sign in to comment.