Skip to content

Commit

Permalink
feat(server): implement LMOVE #369 (#391)
Browse files Browse the repository at this point in the history
Signed-off-by: chenyuxuan.allen <[email protected]>

Co-authored-by: chenyuxuan.allen <[email protected]>
  • Loading branch information
YuxuanChen98 and YuxuanChen98 authored Oct 16, 2022
1 parent fcb95be commit 15725f4
Show file tree
Hide file tree
Showing 4 changed files with 283 additions and 78 deletions.
1 change: 1 addition & 0 deletions CONTRIBUTORS.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
* **[Amir Alperin](https://github.com/iko1)**
* **[Philipp Born](https://github.com/tamcore)**
* Helm Chart
* **[Yuxuan Chen](https://github.com/YuxuanChen98)**
* **[Redha Lhimeur](https://github.com/redhal)**
* **[Braydn Moore](https://github.com/braydnm)**
* **[Logan Raarup](https://github.com/logandk)**
Expand Down
195 changes: 117 additions & 78 deletions src/server/list_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,8 @@ OpStatus BPopper::Pop(Transaction* t, EngineShard* shard) {
return OpStatus::OK;
}

OpResult<string> OpRPopLPushSingleShard(const OpArgs& op_args, string_view src, string_view dest) {
OpResult<string> OpMoveSingleShard(const OpArgs& op_args, string_view src, string_view dest,
ListDir src_dir, ListDir dest_dir) {
auto& db_slice = op_args.shard->db_slice();
auto src_res = db_slice.Find(op_args.db_cntx, src, OBJ_LIST);
if (!src_res)
Expand All @@ -287,9 +288,10 @@ OpResult<string> OpRPopLPushSingleShard(const OpArgs& op_args, string_view src,

if (src == dest) { // simple case.
db_slice.PreUpdate(op_args.db_cntx.db_index, src_it);
string val = ListPop(ListDir::RIGHT, src_ql);
string val = ListPop(src_dir, src_ql);

quicklistPushHead(src_ql, val.data(), val.size());
int pos = (dest_dir == ListDir::LEFT) ? QUICKLIST_HEAD : QUICKLIST_TAIL;
quicklistPush(src_ql, val.data(), val.size(), pos);
db_slice.PostUpdate(op_args.db_cntx.db_index, src_it, src);

return val;
Expand Down Expand Up @@ -323,8 +325,9 @@ OpResult<string> OpRPopLPushSingleShard(const OpArgs& op_args, string_view src,

db_slice.PreUpdate(op_args.db_cntx.db_index, src_it);

string val = ListPop(ListDir::RIGHT, src_ql);
quicklistPushHead(dest_ql, val.data(), val.size());
string val = ListPop(src_dir, src_ql);
int pos = (dest_dir == ListDir::LEFT) ? QUICKLIST_HEAD : QUICKLIST_TAIL;
quicklistPush(dest_ql, val.data(), val.size(), pos);

db_slice.PostUpdate(op_args.db_cntx.db_index, src_it, src);
db_slice.PostUpdate(op_args.db_cntx.db_index, dest_it, dest, !new_key);
Expand All @@ -336,9 +339,9 @@ OpResult<string> OpRPopLPushSingleShard(const OpArgs& op_args, string_view src,
return val;
}

// Read-only peek operation that determines wether the list exists and optionally
// returns the first from right value without popping it from the list.
OpResult<string> RPeek(const OpArgs& op_args, string_view key, bool fetch) {
// Read-only peek operation that determines whether the list exists and optionally
// returns the first from left/right value without popping it from the list.
OpResult<string> Peek(const OpArgs& op_args, string_view key, ListDir dir, bool fetch) {
auto it_res = op_args.shard->db_slice().Find(op_args.db_cntx, key, OBJ_LIST);
if (!it_res) {
return it_res.status();
Expand All @@ -349,7 +352,8 @@ OpResult<string> RPeek(const OpArgs& op_args, string_view key, bool fetch) {

quicklist* ql = GetQL(it_res.value()->second);
quicklistEntry entry = container_utils::QLEntry();
quicklistIter* iter = quicklistGetIterator(ql, AL_START_TAIL);
quicklistIter* iter = (dir == ListDir::LEFT) ? quicklistGetIterator(ql, AL_START_HEAD) :
quicklistGetIterator(ql, AL_START_TAIL);
CHECK(quicklistNext(iter, &entry));
quicklistReleaseIterator(iter);

Expand Down Expand Up @@ -480,74 +484,7 @@ void ListFamily::RPopLPush(CmdArgList args, ConnectionContext* cntx) {
string_view src = ArgS(args, 1);
string_view dest = ArgS(args, 2);

OpResult<string> result;

if (cntx->transaction->unique_shard_cnt() == 1) {
auto cb = [&](Transaction* t, EngineShard* shard) {
return OpRPopLPushSingleShard(t->GetOpArgs(shard), src, dest);
};

result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
} else {
CHECK_EQ(2u, cntx->transaction->unique_shard_cnt());

OpResult<string> find_res[2];

// Transaction is comprised of 2 hops:
// 1 - check for entries existence, their types and if possible -
// read the value we may rpop from the source list.
// 2. If everything is ok, rpop from source and lpush the peeked value into
// the destination.
//
cntx->transaction->Schedule();
auto cb = [&](Transaction* t, EngineShard* shard) {
auto args = t->ShardArgsInShard(shard->shard_id());
DCHECK_EQ(1u, args.size());
bool is_dest = args.front() == dest;
find_res[is_dest] = RPeek(t->GetOpArgs(shard), args.front(), !is_dest);
return OpStatus::OK;
};

cntx->transaction->Execute(move(cb), false);

if (!find_res[0] || find_res[1].status() == OpStatus::WRONG_TYPE) {
auto cb = [&](Transaction* t, EngineShard* shard) { return OpStatus::OK; };
cntx->transaction->Execute(move(cb), true);
result = find_res[0] ? find_res[1] : find_res[0];
} else {
// Everything is ok, lets proceed with the mutations.
auto cb = [&](Transaction* t, EngineShard* shard) {
auto args = t->ShardArgsInShard(shard->shard_id());
bool is_dest = args.front() == dest;
OpArgs op_args = t->GetOpArgs(shard);

if (is_dest) {
string_view val{find_res[0].value()};
absl::Span<string_view> span{&val, 1};
OpPush(op_args, args.front(), ListDir::LEFT, false, span);
} else {
OpPop(op_args, args.front(), ListDir::RIGHT, 1, false);
}
return OpStatus::OK;
};
cntx->transaction->Execute(move(cb), true);
result = std::move(find_res[0].value());
}
}

if (result) {
return (*cntx)->SendBulkString(*result);
}

switch (result.status()) {
case OpStatus::KEY_NOTFOUND:
(*cntx)->SendNull();
break;

default:
(*cntx)->SendError(result.status());
break;
}
MoveGeneric(cntx, src, dest, ListDir::RIGHT, ListDir::LEFT);
}

void ListFamily::LLen(CmdArgList args, ConnectionContext* cntx) {
Expand Down Expand Up @@ -767,6 +704,35 @@ void ListFamily::BRPop(CmdArgList args, ConnectionContext* cntx) {
BPopGeneric(ListDir::RIGHT, std::move(args), cntx);
}

void ListFamily::LMove(CmdArgList args, ConnectionContext* cntx) {
std::string_view src = ArgS(args, 1);
std::string_view dest = ArgS(args, 2);
std::string_view src_dir_str = ArgS(args, 3);
std::string_view dest_dir_str = ArgS(args, 4);

ToUpper(&args[3]);
ToUpper(&args[4]);

ListDir src_dir;
ListDir dest_dir;
if (src_dir_str == "LEFT") {
src_dir = ListDir::LEFT;
} else if (src_dir_str == "RIGHT") {
src_dir = ListDir::RIGHT;
} else {
return (*cntx)->SendError(kSyntaxErr);
}
if (dest_dir_str == "LEFT") {
dest_dir = ListDir::LEFT;
} else if (dest_dir_str == "RIGHT") {
dest_dir = ListDir::RIGHT;
} else {
return (*cntx)->SendError(kSyntaxErr);
}

MoveGeneric(cntx, src, dest, src_dir, dest_dir);
}

void ListFamily::BPopGeneric(ListDir dir, CmdArgList args, ConnectionContext* cntx) {
DCHECK_GE(args.size(), 3u);

Expand Down Expand Up @@ -876,6 +842,78 @@ void ListFamily::PopGeneric(ListDir dir, CmdArgList args, ConnectionContext* cnt
}
}

void ListFamily::MoveGeneric(ConnectionContext* cntx, string_view src, string_view dest,
ListDir src_dir, ListDir dest_dir) {
OpResult<string> result;

if (cntx->transaction->unique_shard_cnt() == 1) {
auto cb = [&](Transaction* t, EngineShard* shard) {
return OpMoveSingleShard(t->GetOpArgs(shard), src, dest, src_dir, dest_dir);
};

result = cntx->transaction->ScheduleSingleHopT(std::move(cb));
} else {
CHECK_EQ(2u, cntx->transaction->unique_shard_cnt());

OpResult<string> find_res[2];

// Transaction is comprised of 2 hops:
// 1 - check for entries existence, their types and if possible -
// read the value we may move from the source list.
// 2. If everything is ok, pop from source and push the peeked value into
// the destination.
//
cntx->transaction->Schedule();
auto cb = [&](Transaction* t, EngineShard* shard) {
auto args = t->ShardArgsInShard(shard->shard_id());
DCHECK_EQ(1u, args.size());
bool is_dest = args.front() == dest;
find_res[is_dest] = Peek(t->GetOpArgs(shard), args.front(), src_dir, !is_dest);
return OpStatus::OK;
};

cntx->transaction->Execute(move(cb), false);

if (!find_res[0] || find_res[1].status() == OpStatus::WRONG_TYPE) {
auto cb = [&](Transaction* t, EngineShard* shard) { return OpStatus::OK; };
cntx->transaction->Execute(move(cb), true);
result = find_res[0] ? find_res[1] : find_res[0];
} else {
// Everything is ok, lets proceed with the mutations.
auto cb = [&](Transaction* t, EngineShard* shard) {
auto args = t->ShardArgsInShard(shard->shard_id());
bool is_dest = args.front() == dest;
OpArgs op_args = t->GetOpArgs(shard);

if (is_dest) {
string_view val{find_res[0].value()};
absl::Span<string_view> span{&val, 1};
OpPush(op_args, args.front(), dest_dir, false, span);
} else {
OpPop(op_args, args.front(), src_dir, 1, false);
}
return OpStatus::OK;
};
cntx->transaction->Execute(move(cb), true);
result = std::move(find_res[0].value());
}
}

if (result) {
return (*cntx)->SendBulkString(*result);
}

switch (result.status()) {
case OpStatus::KEY_NOTFOUND:
(*cntx)->SendNull();
break;

default:
(*cntx)->SendError(result.status());
break;
}
}

OpResult<uint32_t> ListFamily::OpLen(const OpArgs& op_args, std::string_view key) {
auto res = op_args.shard->db_slice().Find(op_args.db_cntx, key, OBJ_LIST);
if (!res)
Expand Down Expand Up @@ -1153,7 +1191,8 @@ void ListFamily::Register(CommandRegistry* registry) {
<< CI{"LRANGE", CO::READONLY, 4, 1, 1, 1}.HFUNC(LRange)
<< CI{"LSET", CO::WRITE | CO::DENYOOM, 4, 1, 1, 1}.HFUNC(LSet)
<< CI{"LTRIM", CO::WRITE, 4, 1, 1, 1}.HFUNC(LTrim)
<< CI{"LREM", CO::WRITE, 4, 1, 1, 1}.HFUNC(LRem);
<< CI{"LREM", CO::WRITE, 4, 1, 1, 1}.HFUNC(LRem)
<< CI{"LMOVE", CO::WRITE | CO::DENYOOM, 5, 1, 2, 1}.HFUNC(LMove);
}

} // namespace dfly
3 changes: 3 additions & 0 deletions src/server/list_family.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,13 @@ class ListFamily {
static void LRem(CmdArgList args, ConnectionContext* cntx);
static void LSet(CmdArgList args, ConnectionContext* cntx);
static void RPopLPush(CmdArgList args, ConnectionContext* cntx);
static void LMove(CmdArgList args, ConnectionContext* cntx);

static void PopGeneric(ListDir dir, CmdArgList args, ConnectionContext* cntx);
static void PushGeneric(ListDir dir, bool skip_notexist, CmdArgList args,
ConnectionContext* cntx);
static void MoveGeneric(ConnectionContext* cntx, std::string_view src, std::string_view dest,
ListDir src_dir, ListDir dest_dir);

static void BPopGeneric(ListDir dir, CmdArgList args, ConnectionContext* cntx);

Expand Down
Loading

0 comments on commit 15725f4

Please sign in to comment.