Skip to content

Commit

Permalink
refactor: use CmdArgParser for XGROUP command
Browse files Browse the repository at this point in the history
  • Loading branch information
BorysTheDev committed Sep 18, 2024
1 parent f122a19 commit 222e98a
Showing 1 changed file with 70 additions and 85 deletions.
155 changes: 70 additions & 85 deletions src/server/stream_family.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1797,19 +1797,18 @@ OpResult<PendingResult> OpPending(const OpArgs& op_args, string_view key, const
return result;
}

void CreateGroup(CmdArgList args, string_view key, ConnectionContext* cntx) {
if (args.size() < 2)
return cntx->SendError(UnknownSubCmd("CREATE", "XGROUP"));
void CreateGroup(facade::CmdArgParser* parser, ConnectionContext* cntx) {
auto key = parser->Next();

CreateOpts opts;
opts.gname = ArgS(args, 0);
opts.id = ArgS(args, 1);
if (args.size() >= 3) {
ToUpper(&args[2]);
if (ArgS(args, 2) == "MKSTREAM")
opts.flags |= kCreateOptMkstream;
std::tie(opts.gname, opts.id) = parser->Next<string_view, string_view>();
if (parser->Check("MKSTREAM")) {
opts.flags |= kCreateOptMkstream;
}

if (auto err = parser->Error(); err)
return cntx->SendError(err->MakeReply());

auto cb = [&](Transaction* t, EngineShard* shard) {
return OpCreate(t->GetOpArgs(shard), key, opts);
};
Expand All @@ -1823,7 +1822,15 @@ void CreateGroup(CmdArgList args, string_view key, ConnectionContext* cntx) {
}
}

void DestroyGroup(string_view key, string_view gname, ConnectionContext* cntx) {
void DestroyGroup(facade::CmdArgParser* parser, ConnectionContext* cntx) {
auto [key, gname] = parser->Next<string_view, string_view>();

if (auto err = parser->Error(); err)
return cntx->SendError(err->MakeReply());

if (parser->HasNext())
return cntx->SendError(UnknownSubCmd("DESTROY", "XGROUP"));

auto cb = [&](Transaction* t, EngineShard* shard) {
return OpDestroyGroup(t->GetOpArgs(shard), key, gname);
};
Expand All @@ -1841,8 +1848,15 @@ void DestroyGroup(string_view key, string_view gname, ConnectionContext* cntx) {
}
}

void CreateConsumer(string_view key, string_view gname, string_view consumer,
ConnectionContext* cntx) {
void CreateConsumer(facade::CmdArgParser* parser, ConnectionContext* cntx) {
auto [key, gname, consumer] = parser->Next<string_view, string_view, string_view>();

if (auto err = parser->Error(); err)
return cntx->SendError(err->MakeReply());

if (parser->HasNext())
return cntx->SendError(UnknownSubCmd("CREATECONSUMER", "XGROUP"));

auto cb = [&](Transaction* t, EngineShard* shard) {
return OpCreateConsumer(t->GetOpArgs(shard), key, gname, consumer);
};
Expand All @@ -1862,8 +1876,15 @@ void CreateConsumer(string_view key, string_view gname, string_view consumer,
}
}

void DelConsumer(string_view key, string_view gname, string_view consumer,
ConnectionContext* cntx) {
void DelConsumer(facade::CmdArgParser* parser, ConnectionContext* cntx) {
auto [key, gname, consumer] = parser->Next<string_view, string_view, string_view>();

if (auto err = parser->Error(); err)
return cntx->SendError(err->MakeReply());

if (parser->HasNext())
return cntx->SendError(UnknownSubCmd("DELCONSUMER", "XGROUP"));

auto cb = [&](Transaction* t, EngineShard* shard) {
return OpDelConsumer(t->GetOpArgs(shard), key, gname, consumer);
};
Expand All @@ -1882,18 +1903,21 @@ void DelConsumer(string_view key, string_view gname, string_view consumer,
}
}

void SetId(string_view key, string_view gname, CmdArgList args, ConnectionContext* cntx) {
facade::CmdArgParser parser{args};
void SetId(facade::CmdArgParser* parser, ConnectionContext* cntx) {
auto [key, gname, id] = parser->Next<string_view, string_view, string_view>();

string_view id = parser.Next();
while (parser.HasNext()) {
if (parser.Check("ENTRIESREAD")) {
while (parser->HasNext()) {
if (parser->Check("ENTRIESREAD")) {
// TODO: to support ENTRIESREAD.
return cntx->SendError(kSyntaxErr);
} else {
return cntx->SendError(kSyntaxErr);
}
}

if (auto err = parser->Error(); err)
return cntx->SendError(err->MakeReply());

auto cb = [&](Transaction* t, EngineShard* shard) {
return OpSetId(t->GetOpArgs(shard), key, gname, id);
};
Expand All @@ -1910,20 +1934,23 @@ void SetId(string_view key, string_view gname, CmdArgList args, ConnectionContex
}

void XGroupHelp(CmdArgList args, ConnectionContext* cntx) {
string_view help_arr[] = {
"CREATE <key> <groupname> <id|$> [option]",
" Create a new consumer group. Options are:",
" * MKSTREAM",
" Create the empty stream if it does not exist.",
"CREATECONSUMER <key> <groupname> <consumer>",
" Create a new consumer in the specified group.",
"DELCONSUMER <key> <groupname> <consumer>",
" Remove the specified consumer.",
"DESTROY <key> <groupname>",
" Remove the specified group.",
"SETID <key> <groupname> <id|$>",
" Set the current group ID.",
};
string_view help_arr[] = {"XGROUP <subcommand> [<arg> [value] [opt] ...]. Subcommands are:",
"CREATE <key> <groupname> <id|$> [option]",
" Create a new consumer group. Options are:",
" * MKSTREAM",
" Create the empty stream if it does not exist.",
" * ENTRIESREAD entries_read",
" Set the group's entries_read counter (internal use).",
"CREATECONSUMER <key> <groupname> <consumer>",
" Create a new consumer in the specified group.",
"DELCONSUMER <key> <groupname> <consumer>",
" Remove the specified consumer.",
"DESTROY <key> <groupname>",
" Remove the specified group.",
"SETID <key> <groupname> <id|$> [ENTRIESREAD entries_read]",
" Set the current group ID and entries_read counter.",
"HELP",
" Print this help."};
auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
return rb->SendSimpleStrArr(help_arr);
}
Expand Down Expand Up @@ -2316,63 +2343,21 @@ void StreamFamily::XDel(CmdArgList args, ConnectionContext* cntx) {
cntx->SendError(result.status());
}

void HelpSubCmd(facade::CmdArgParser* parser, ConnectionContext* cntx) {
XGroupHelp(parser->Tail(), cntx);
}

void StreamFamily::XGroup(CmdArgList args, ConnectionContext* cntx) {
facade::CmdArgParser parser{args};

string_view sub_cmd = parser.ToUpper().Next();
if (sub_cmd == "HELP") {
string_view help[] = {"XGROUP <subcommand> [<arg> [value] [opt] ...]. Subcommands are:",
"CREATE <key> <groupname> <id|$> [option]",
" Create a new consumer group. Options are:",
" * MKSTREAM",
" Create the empty stream if it does not exist.",
" * ENTRIESREAD entries_read",
" Set the group's entries_read counter (internal use).",
"CREATECONSUMER <key> <groupname> <consumer>",
" Create a new consumer in the specified group.",
"DELCONSUMER <key> <groupname> <consumer>",
" Remove the specified consumer.",
"DESTROY <key> <groupname>",
" Remove the specified group.",
"SETID <key> <groupname> <id|$> [ENTRIESREAD entries_read]",
" Set the current group ID and entries_read counter.",
"HELP",
" Print this help."};
auto* rb = static_cast<RedisReplyBuilder*>(cntx->reply_builder());
return rb->SendStringArr(help);
}

if (!parser.HasAtLeast(2))
return cntx->SendError(kSyntaxErr, kScriptErrType);

string_view key = parser.Next();

if (sub_cmd == "CREATE") {
args.remove_prefix(2);
return CreateGroup(std::move(args), key, cntx);
}
const auto* sub_cmd_func = parser.MapNext("HELP", &HelpSubCmd, "CREATE", &CreateGroup, "DESTROY",
&DestroyGroup, "CREATECONSUMER", &CreateConsumer,
"DELCONSUMER", &DelConsumer, "SETID", &SetId);

string_view gname = parser.Next();
if (sub_cmd == "DESTROY" && args.size() == 3) {
return DestroyGroup(key, gname, cntx);
}

if (sub_cmd == "CREATECONSUMER" && args.size() == 4) {
string_view cname = parser.Next();
return CreateConsumer(key, gname, cname, cntx);
}

if (sub_cmd == "DELCONSUMER" && args.size() == 4) {
string_view cname = parser.Next();
return DelConsumer(key, gname, cname, cntx);
}

if (sub_cmd == "SETID" && args.size() >= 4) {
args.remove_prefix(3);
return SetId(key, gname, std::move(args), cntx);
}
if (auto err = parser.Error(); err)
return cntx->SendError(err->MakeReply());

return cntx->SendError(UnknownSubCmd(sub_cmd, "XGROUP"));
sub_cmd_func(&parser, cntx);
}

void StreamFamily::XInfo(CmdArgList args, ConnectionContext* cntx) {
Expand Down

0 comments on commit 222e98a

Please sign in to comment.