Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(server): Implement MOVE command #298

Merged
merged 4 commits into from
Sep 18, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/api_status.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ with respect to Memcached and Redis APIs.
- [X] EXPIRE
- [X] EXPIREAT
- [X] KEYS
- [X] MOVE
- [X] PING
- [X] RENAME
- [X] RENAMENX
Expand Down Expand Up @@ -105,7 +106,6 @@ with respect to Memcached and Redis APIs.
- [ ] BGREWRITEAOF
- [ ] MONITOR
- [ ] RANDOMKEY
- [ ] MOVE

### API 2
- [X] List Family
Expand Down
72 changes: 71 additions & 1 deletion src/server/generic_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -463,6 +463,38 @@ void GenericFamily::Stick(CmdArgList args, ConnectionContext* cntx) {
(*cntx)->SendLong(match_cnt);
}

void GenericFamily::Move(CmdArgList args, ConnectionContext* cntx) {
string_view key = ArgS(args, 1);
int64_t target_db;

if (!absl::SimpleAtoi(ArgS(args, 2), &target_db)) {
return (*cntx)->SendError(kInvalidIntErr);
}

if (target_db < 0 || target_db >= absl::GetFlag(FLAGS_dbnum)) {
return (*cntx)->SendError(kDbIndOutOfRangeErr);
}
dranikpg marked this conversation as resolved.
Show resolved Hide resolved

if (target_db == cntx->db_index()) {
return (*cntx)->SendError("source and destination objects are the same");
dranikpg marked this conversation as resolved.
Show resolved Hide resolved
}

OpStatus res = OpStatus::SKIPPED;
ShardId target_shard = Shard(key, shard_set->size());
auto cb = [&](Transaction* t, EngineShard* shard) {
// MOVE runs as a global transaction and is therefore scheduled on every shard.
if (target_shard == shard->shard_id()) {
res = OpMove(t->GetOpArgs(shard), key, target_db);
}
return OpStatus::OK;
};
dranikpg marked this conversation as resolved.
Show resolved Hide resolved

cntx->transaction->ScheduleSingleHop(std::move(cb));
// Exactly one shard will call OpMove.
DCHECK(res != OpStatus::SKIPPED);
dranikpg marked this conversation as resolved.
Show resolved Hide resolved
(*cntx)->SendLong(res == OpStatus::OK);
}

void GenericFamily::Rename(CmdArgList args, ConnectionContext* cntx) {
OpResult<void> st = RenameGeneric(args, false, cntx);
(*cntx)->SendError(st.status());
Expand Down Expand Up @@ -771,6 +803,43 @@ OpResult<uint32_t> GenericFamily::OpStick(const OpArgs& op_args, ArgSlice keys)
return res;
}

// OpMove touches multiple databases (op_args.db_idx, target_db), so it assumes it runs
// as a global transaction.
// TODO: Allow running OpMove without a global transaction.
OpStatus GenericFamily::OpMove(const OpArgs& op_args, string_view key, DbIndex target_db) {
auto& db_slice = op_args.shard->db_slice();

// Fetch value at key in current db.
auto [from_it, from_expire] = db_slice.FindExt(op_args.db_ind, key);
if (!IsValid(from_it))
return OpStatus::KEY_NOTFOUND;

// Fetch value at key in target db.
auto [to_it, _] = db_slice.FindExt(target_db, key);
if (IsValid(to_it))
return OpStatus::KEY_EXISTS;

// Ensure target database exists.
db_slice.ActivateDb(target_db);

bool sticky = from_it->first.IsSticky();
uint64_t exp_ts = db_slice.ExpireTime(from_expire);
PrimeValue from_obj = std::move(from_it->second);

// Restore expire flag after std::move.
from_it->second.SetExpire(IsValid(from_expire));
dranikpg marked this conversation as resolved.
Show resolved Hide resolved

CHECK(db_slice.Del(op_args.db_ind, from_it));
to_it = db_slice.AddNew(target_db, key, std::move(from_obj), exp_ts);
to_it->first.SetSticky(sticky);
dranikpg marked this conversation as resolved.
Show resolved Hide resolved

if (to_it->second.ObjType() == OBJ_LIST && op_args.shard->blocking_controller()) {
op_args.shard->blocking_controller()->AwakeWatched(target_db, key);
}

return OpStatus::OK;
romange marked this conversation as resolved.
Show resolved Hide resolved
}

using CI = CommandId;

#define HFUNC(x) SetHandler(&GenericFamily::x)
Expand Down Expand Up @@ -798,7 +867,8 @@ void GenericFamily::Register(CommandRegistry* registry) {
<< CI{"PTTL", CO::READONLY | CO::FAST, 2, 1, 1, 1}.HFUNC(Pttl)
<< CI{"TYPE", CO::READONLY | CO::FAST | CO::LOADING, 2, 1, 1, 1}.HFUNC(Type)
<< CI{"UNLINK", CO::WRITE, -2, 1, -1, 1}.HFUNC(Del)
<< CI{"STICK", CO::WRITE, -2, 1, -1, 1}.HFUNC(Stick);
<< CI{"STICK", CO::WRITE, -2, 1, -1, 1}.HFUNC(Stick)
<< CI{"MOVE", CO::WRITE | CO::GLOBAL_TRANS, 3, 1, 1, 1}.HFUNC(Move);
}

} // namespace dfly
2 changes: 2 additions & 0 deletions src/server/generic_family.h
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ class GenericFamily {
static void Keys(CmdArgList args, ConnectionContext* cntx);
static void PexpireAt(CmdArgList args, ConnectionContext* cntx);
static void Stick(CmdArgList args, ConnectionContext* cntx);
static void Move(CmdArgList args, ConnectionContext* cntx);

static void Rename(CmdArgList args, ConnectionContext* cntx);
static void RenameNx(CmdArgList args, ConnectionContext* cntx);
Expand All @@ -71,6 +72,7 @@ class GenericFamily {
static OpResult<void> OpRen(const OpArgs& op_args, std::string_view from, std::string_view to,
bool skip_exists);
static OpResult<uint32_t> OpStick(const OpArgs& op_args, ArgSlice keys);
static OpStatus OpMove(const OpArgs& op_args, std::string_view key, DbIndex target_db);
};

} // namespace dfly
45 changes: 45 additions & 0 deletions src/server/generic_family_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,51 @@ TEST_F(GenericFamilyTest, Stick) {
ASSERT_THAT(Run({"stick", "b"}), IntArg(0));
}

TEST_F(GenericFamilyTest, Move) {
// Check MOVE returns 0 on non-existent keys
ASSERT_THAT(Run({"move", "a", "1"}), IntArg(0));

// Check MOVE catches non-existent database indices
ASSERT_THAT(Run({"move", "a", "-1"}), ArgType(RespExpr::ERROR));
ASSERT_THAT(Run({"move", "a", "100500"}), ArgType(RespExpr::ERROR));

// Check MOVE moves value & expiry & stickyness
Run({"set", "a", "test"});
Run({"expire", "a", "1000"});
Run({"stick", "a"});
ASSERT_THAT(Run({"move", "a", "1"}), IntArg(1));
Run({"select", "1"});
ASSERT_THAT(Run({"get", "a"}), "test");
ASSERT_THAT(Run({"ttl", "a"}), testing::Not(IntArg(-1)));
ASSERT_THAT(Run({"stick", "a"}), IntArg(0));

// Check MOVE doesn't move if key exists
Run({"select", "1"});
Run({"set", "a", "test"});
Run({"select", "0"});
Run({"set", "a", "another test"});
ASSERT_THAT(Run({"move", "a", "1"}), IntArg(0)); // exists from test case above
Run({"select", "1"});
ASSERT_THAT(Run({"get", "a"}), "test");

// Check MOVE awakes blocking operations
auto fb_blpop = pp_->at(0)->LaunchFiber(fibers::launch::dispatch, [&] {
Run({"select", "1"});
auto resp = Run({"blpop", "l", "0"});
ASSERT_THAT(resp, ArgType(RespExpr::ARRAY));
EXPECT_THAT(resp.GetVec(), ElementsAre("l", "TestItem"));
});

WaitUntilLocked(1, "l");

pp_->at(1)->Await([&] {
Run({"select", "0"});
Run({"lpush", "l", "TestItem"});
Run({"move", "l", "1"});
});

fb_blpop.join();
}

using testing::AnyOf;
using testing::Each;
Expand Down
22 changes: 8 additions & 14 deletions src/server/list_family_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,6 @@ class ListFamilyTest : public BaseFamilyTest {
ListFamilyTest() {
num_threads_ = 4;
}

void WaitForLocked(string_view key) {
do {
this_fiber::sleep_for(30us);
} while (!IsLocked(0, key));
}
};

const char kKey1[] = "x";
Expand Down Expand Up @@ -187,7 +181,7 @@ TEST_F(ListFamilyTest, BLPopMultiPush) {
blpop_resp = Run({"blpop", kKey1, kKey2, kKey3, "0"});
});

WaitForLocked(kKey1);
WaitUntilLocked(0, kKey1);

auto p1_fb = pp_->at(1)->LaunchFiber([&] {
for (unsigned i = 0; i < 100; ++i) {
Expand Down Expand Up @@ -225,7 +219,7 @@ TEST_F(ListFamilyTest, BLPopSerialize) {
blpop_resp = Run({"blpop", kKey1, kKey2, kKey3, "0"});
});

WaitForLocked(kKey1);
WaitUntilLocked(0, kKey1);

LOG(INFO) << "Starting multi";

Expand Down Expand Up @@ -295,7 +289,7 @@ TEST_F(ListFamilyTest, WrongTypeDoesNotWake) {
blpop_resp = Run({"blpop", kKey1, "0"});
});

WaitForLocked(kKey1);
WaitUntilLocked(0, kKey1);

auto p1_fb = pp_->at(1)->LaunchFiber([&] {
Run({"multi"});
Expand Down Expand Up @@ -324,7 +318,7 @@ TEST_F(ListFamilyTest, BPopSameKeyTwice) {
ASSERT_THAT(watched, ArrLen(0));
});

WaitForLocked(kKey1);
WaitUntilLocked(0, kKey1);

pp_->at(1)->Await([&] { EXPECT_EQ(1, CheckedInt({"lpush", kKey1, "bar"})); });
pop_fb.join();
Expand All @@ -336,7 +330,7 @@ TEST_F(ListFamilyTest, BPopSameKeyTwice) {
blpop_resp = Run({"blpop", kKey1, kKey2, kKey2, kKey1, "0"});
});

WaitForLocked(kKey1);
WaitUntilLocked(0, kKey1);

pp_->at(1)->Await([&] { EXPECT_EQ(1, CheckedInt({"lpush", kKey2, "bar"})); });
pop_fb.join();
Expand All @@ -358,7 +352,7 @@ TEST_F(ListFamilyTest, BPopTwoKeysSameShard) {
ASSERT_THAT(watched, ArrLen(0));
});

WaitForLocked("x");
WaitUntilLocked(0, "x");

pp_->at(1)->Await([&] { EXPECT_EQ(1, CheckedInt({"lpush", "x", "bar"})); });
pop_fb.join();
Expand All @@ -377,7 +371,7 @@ TEST_F(ListFamilyTest, BPopRename) {
blpop_resp = Run({"blpop", kKey1, "0"});
});

WaitForLocked(kKey1);
WaitUntilLocked(0, kKey1);

pp_->at(1)->Await([&] {
EXPECT_EQ(1, CheckedInt({"lpush", "a", "bar"}));
Expand All @@ -395,7 +389,7 @@ TEST_F(ListFamilyTest, BPopFlush) {
blpop_resp = Run({"blpop", kKey1, "0"});
});

WaitForLocked(kKey1);
WaitUntilLocked(0, kKey1);

pp_->at(1)->Await([&] {
Run({"flushdb"});
Expand Down
10 changes: 10 additions & 0 deletions src/server/test_utils.cc
Original file line number Diff line number Diff line change
Expand Up @@ -157,6 +157,16 @@ void BaseFamilyTest::UpdateTime(uint64_t ms) {
shard_set->RunBriefInParallel(cb);
}

void BaseFamilyTest::WaitUntilLocked(DbIndex db_index, string_view key, double timeout) {
auto step = 50us;
auto timeout_micro = chrono::duration_cast<chrono::microseconds> (1000ms * timeout);
int64_t steps = timeout_micro.count() / step.count();
do {
::boost::this_fiber::sleep_for(step);
} while (!IsLocked(db_index, key) && --steps > 0);
CHECK(IsLocked(db_index, key));
}

RespExpr BaseFamilyTest::Run(ArgSlice list) {
if (!ProactorBase::IsProactorThread()) {
return pp_->at(0)->Await([&] { return this->Run(list); });
Expand Down
3 changes: 3 additions & 0 deletions src/server/test_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ class BaseFamilyTest : public ::testing::Test {
// ts is ms
void UpdateTime(uint64_t ms);

// Wait for a locked key to unlock. Aborts after timeout seconds passed.
void WaitUntilLocked(DbIndex db_index, std::string_view key, double timeout = 3);

std::string GetId() const;
size_t SubscriberMessagesLen(std::string_view conn_id) const;

Expand Down
15 changes: 8 additions & 7 deletions src/server/transaction.cc
Original file line number Diff line number Diff line change
Expand Up @@ -398,13 +398,14 @@ bool Transaction::RunInShard(EngineShard* shard) {
}
}
sd.local_mask &= ~OUT_OF_ORDER;

// It has 2 responsibilities.
// 1: to go over potential wakened keys, verify them and activate watch queues.
// 2: if this transaction was notified and finished running - to remove it from the head
// of the queue and notify the next one.
if (shard->blocking_controller())
shard->blocking_controller()->RunStep(awaked_prerun ? this : nullptr);
}
// It has 2 responsibilities.
// 1: to go over potential wakened keys, verify them and activate watch queues.
// 2: if this transaction was notified and finished running - to remove it from the head
// of the queue and notify the next one.
// RunStep is also called for global transactions because of commands like MOVE.
if (shard->blocking_controller()) {
shard->blocking_controller()->RunStep(awaked_prerun ? this : nullptr);
}
dranikpg marked this conversation as resolved.
Show resolved Hide resolved
}

Expand Down