Skip to content

Commit

Permalink
feat(server): Implement MOVE command
Browse files Browse the repository at this point in the history
Signed-off-by: Vladislav Oleshko <[email protected]>
  • Loading branch information
dranikpg committed Sep 14, 2022
1 parent 4522f2f commit 88f67b3
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 2 deletions.
2 changes: 1 addition & 1 deletion docs/api_status.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ with respect to Memcached and Redis APIs.
- [X] EXPIRE
- [X] EXPIREAT
- [X] KEYS
- [X] MOVE
- [X] PING
- [X] RENAME
- [X] RENAMENX
Expand Down Expand Up @@ -105,7 +106,6 @@ with respect to Memcached and Redis APIs.
- [ ] BGREWRITEAOF
- [ ] MONITOR
- [ ] RANDOMKEY
- [ ] MOVE

### API 2
- [X] List Family
Expand Down
63 changes: 62 additions & 1 deletion src/server/generic_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,33 @@ void GenericFamily::Stick(CmdArgList args, ConnectionContext* cntx) {
(*cntx)->SendLong(match_cnt);
}

void GenericFamily::Move(CmdArgList args, ConnectionContext* cntx) {
string_view key = ArgS(args, 1);
int64_t target_db;

if (!absl::SimpleAtoi(ArgS(args, 2), &target_db)) {
return (*cntx)->SendError(kInvalidIntErr);
}

if (target_db < 0 || target_db >= absl::GetFlag(FLAGS_dbnum)) {
return (*cntx)->SendError(kDbIndOutOfRangeErr);
}

OpStatus res = OpStatus::SKIPPED;
ShardId target_shard = Shard(key, shard_set->size());
auto cb = [&](Transaction* t, EngineShard* shard) {
// MOVE runs as a global transaction and is therefore scheduled on every shard.
if (target_shard == shard->shard_id()) {
res = OpMove(t->GetOpArgs(shard), key, target_db);
}
return OpStatus::OK;
};

cntx->transaction->ScheduleSingleHop(std::move(cb));
DCHECK(res != OpStatus::SKIPPED);
(*cntx)->SendLong(res == OpStatus::OK);
}

void GenericFamily::Rename(CmdArgList args, ConnectionContext* cntx) {
OpResult<void> st = RenameGeneric(args, false, cntx);
(*cntx)->SendError(st.status());
Expand Down Expand Up @@ -771,6 +798,39 @@ OpResult<uint32_t> GenericFamily::OpStick(const OpArgs& op_args, ArgSlice keys)
return res;
}

// OpMove touches multiple databases (op_args.db_idx, target_db), so it assumes it runs
// as a global transaction.
// TODO: Allow running OpMove without a global transaction.
OpStatus GenericFamily::OpMove(const OpArgs& op_args, string_view key, DbIndex target_db) {
auto& db_slice = op_args.shard->db_slice();

// Fetch value at key in current db.
auto [from_it, from_expire] = db_slice.FindExt(op_args.db_ind, key);
if (!IsValid(from_it))
return OpStatus::KEY_NOTFOUND;

// Fetch value at key in target db.
auto [to_it, _] = db_slice.FindExt(target_db, key);
if (IsValid(to_it))
return OpStatus::KEY_EXISTS;

// Ensure target database exists.
db_slice.ActivateDb(target_db);

bool sticky = from_it->first.IsSticky();
uint64_t exp_ts = db_slice.ExpireTime(from_expire);
PrimeValue from_obj = std::move(from_it->second);

// Restore expire flag after std::move.
from_it->second.SetExpire(IsValid(from_expire));

CHECK(db_slice.Del(op_args.db_ind, from_it));
to_it = db_slice.AddNew(target_db, key, std::move(from_obj), exp_ts);
to_it->first.SetSticky(sticky);

return OpStatus::OK;
}

using CI = CommandId;

#define HFUNC(x) SetHandler(&GenericFamily::x)
Expand Down Expand Up @@ -798,7 +858,8 @@ void GenericFamily::Register(CommandRegistry* registry) {
<< CI{"PTTL", CO::READONLY | CO::FAST, 2, 1, 1, 1}.HFUNC(Pttl)
<< CI{"TYPE", CO::READONLY | CO::FAST | CO::LOADING, 2, 1, 1, 1}.HFUNC(Type)
<< CI{"UNLINK", CO::WRITE, -2, 1, -1, 1}.HFUNC(Del)
<< CI{"STICK", CO::WRITE, -2, 1, -1, 1}.HFUNC(Stick);
<< CI{"STICK", CO::WRITE, -2, 1, -1, 1}.HFUNC(Stick)
<< CI{"MOVE", CO::WRITE | CO::GLOBAL_TRANS, 3, 1, 1, 1}.HFUNC(Move);
}

} // namespace dfly
2 changes: 2 additions & 0 deletions src/server/generic_family.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ class GenericFamily {
static void Keys(CmdArgList args, ConnectionContext* cntx);
static void PexpireAt(CmdArgList args, ConnectionContext* cntx);
static void Stick(CmdArgList args, ConnectionContext* cntx);
static void Move(CmdArgList args, ConnectionContext* cntx);

static void Rename(CmdArgList args, ConnectionContext* cntx);
static void RenameNx(CmdArgList args, ConnectionContext* cntx);
Expand All @@ -71,6 +72,7 @@ class GenericFamily {
static OpResult<void> OpRen(const OpArgs& op_args, std::string_view from, std::string_view to,
bool skip_exists);
static OpResult<uint32_t> OpStick(const OpArgs& op_args, ArgSlice keys);
static OpStatus OpMove(const OpArgs& op_args, std::string_view key, DbIndex target_db);
};

} // namespace dfly
25 changes: 25 additions & 0 deletions src/server/generic_family_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,31 @@ TEST_F(GenericFamilyTest, Stick) {
ASSERT_THAT(Run({"stick", "b"}), IntArg(0));
}

TEST_F(GenericFamilyTest, Move) {
// Check MOVE returns 0 on non-existent keys
ASSERT_THAT(Run({"move", "a", "1"}), IntArg(0));

// Check MOVE catches non-existent database indices
ASSERT_THAT(Run({"move", "a", "-1"}), ArgType(RespExpr::ERROR));
ASSERT_THAT(Run({"move", "a", "100500"}), ArgType(RespExpr::ERROR));

// Check MOVE moves value & expiry & stickyness
Run({"set", "a", "test"});
Run({"expire", "a", "100"});
Run({"stick", "a"});
ASSERT_THAT(Run({"move", "a", "1"}), IntArg(1));
Run({"select", "1"});
ASSERT_THAT(Run({"get", "a"}), "test");
ASSERT_THAT(Run({"ttl", "a"}), testing::Not(IntArg(-1)));
ASSERT_THAT(Run({"stick", "a"}), IntArg(0));

// Check MOVE doesn't move if key exists
Run({"select", "0"});
Run({"set", "a", "another test"});
ASSERT_THAT(Run({"move", "a", "1"}), IntArg(0)); // exists from test case above
Run({"select", "1"});
ASSERT_THAT(Run({"get", "a"}), "test");
}

using testing::AnyOf;
using testing::Each;
Expand Down

0 comments on commit 88f67b3

Please sign in to comment.