diff --git a/src/server/stream_family.cc b/src/server/stream_family.cc index 444701393982..07ce58444736 100644 --- a/src/server/stream_family.cc +++ b/src/server/stream_family.cc @@ -1797,19 +1797,18 @@ OpResult OpPending(const OpArgs& op_args, string_view key, const return result; } -void CreateGroup(CmdArgList args, string_view key, ConnectionContext* cntx) { - if (args.size() < 2) - return cntx->SendError(UnknownSubCmd("CREATE", "XGROUP")); +void CreateGroup(facade::CmdArgParser* parser, ConnectionContext* cntx) { + auto key = parser->Next(); CreateOpts opts; - opts.gname = ArgS(args, 0); - opts.id = ArgS(args, 1); - if (args.size() >= 3) { - ToUpper(&args[2]); - if (ArgS(args, 2) == "MKSTREAM") - opts.flags |= kCreateOptMkstream; + std::tie(opts.gname, opts.id) = parser->Next(); + if (parser->Check("MKSTREAM")) { + opts.flags |= kCreateOptMkstream; } + if (auto err = parser->Error(); err) + return cntx->SendError(err->MakeReply()); + auto cb = [&](Transaction* t, EngineShard* shard) { return OpCreate(t->GetOpArgs(shard), key, opts); }; @@ -1823,7 +1822,15 @@ void CreateGroup(CmdArgList args, string_view key, ConnectionContext* cntx) { } } -void DestroyGroup(string_view key, string_view gname, ConnectionContext* cntx) { +void DestroyGroup(facade::CmdArgParser* parser, ConnectionContext* cntx) { + auto [key, gname] = parser->Next(); + + if (auto err = parser->Error(); err) + return cntx->SendError(err->MakeReply()); + + if (parser->HasNext()) + return cntx->SendError(UnknownSubCmd("DESTROY", "XGROUP")); + auto cb = [&](Transaction* t, EngineShard* shard) { return OpDestroyGroup(t->GetOpArgs(shard), key, gname); }; @@ -1841,8 +1848,15 @@ void DestroyGroup(string_view key, string_view gname, ConnectionContext* cntx) { } } -void CreateConsumer(string_view key, string_view gname, string_view consumer, - ConnectionContext* cntx) { +void CreateConsumer(facade::CmdArgParser* parser, ConnectionContext* cntx) { + auto [key, gname, consumer] = parser->Next(); + + if (auto err = parser->Error(); err) + return cntx->SendError(err->MakeReply()); + + if (parser->HasNext()) + return cntx->SendError(UnknownSubCmd("CREATECONSUMER", "XGROUP")); + auto cb = [&](Transaction* t, EngineShard* shard) { return OpCreateConsumer(t->GetOpArgs(shard), key, gname, consumer); }; @@ -1862,8 +1876,15 @@ void CreateConsumer(string_view key, string_view gname, string_view consumer, } } -void DelConsumer(string_view key, string_view gname, string_view consumer, - ConnectionContext* cntx) { +void DelConsumer(facade::CmdArgParser* parser, ConnectionContext* cntx) { + auto [key, gname, consumer] = parser->Next(); + + if (auto err = parser->Error(); err) + return cntx->SendError(err->MakeReply()); + + if (parser->HasNext()) + return cntx->SendError(UnknownSubCmd("DELCONSUMER", "XGROUP")); + auto cb = [&](Transaction* t, EngineShard* shard) { return OpDelConsumer(t->GetOpArgs(shard), key, gname, consumer); }; @@ -1882,18 +1903,21 @@ void DelConsumer(string_view key, string_view gname, string_view consumer, } } -void SetId(string_view key, string_view gname, CmdArgList args, ConnectionContext* cntx) { - facade::CmdArgParser parser{args}; +void SetId(facade::CmdArgParser* parser, ConnectionContext* cntx) { + auto [key, gname, id] = parser->Next(); - string_view id = parser.Next(); - while (parser.HasNext()) { - if (parser.Check("ENTRIESREAD")) { + while (parser->HasNext()) { + if (parser->Check("ENTRIESREAD")) { // TODO: to support ENTRIESREAD. return cntx->SendError(kSyntaxErr); } else { return cntx->SendError(kSyntaxErr); } } + + if (auto err = parser->Error(); err) + return cntx->SendError(err->MakeReply()); + auto cb = [&](Transaction* t, EngineShard* shard) { return OpSetId(t->GetOpArgs(shard), key, gname, id); }; @@ -1910,20 +1934,23 @@ void SetId(string_view key, string_view gname, CmdArgList args, ConnectionContex } void XGroupHelp(CmdArgList args, ConnectionContext* cntx) { - string_view help_arr[] = { - "CREATE [option]", - " Create a new consumer group. Options are:", - " * MKSTREAM", - " Create the empty stream if it does not exist.", - "CREATECONSUMER ", - " Create a new consumer in the specified group.", - "DELCONSUMER ", - " Remove the specified consumer.", - "DESTROY ", - " Remove the specified group.", - "SETID ", - " Set the current group ID.", - }; + string_view help_arr[] = {"XGROUP [ [value] [opt] ...]. Subcommands are:", + "CREATE [option]", + " Create a new consumer group. Options are:", + " * MKSTREAM", + " Create the empty stream if it does not exist.", + " * ENTRIESREAD entries_read", + " Set the group's entries_read counter (internal use).", + "CREATECONSUMER ", + " Create a new consumer in the specified group.", + "DELCONSUMER ", + " Remove the specified consumer.", + "DESTROY ", + " Remove the specified group.", + "SETID [ENTRIESREAD entries_read]", + " Set the current group ID and entries_read counter.", + "HELP", + " Print this help."}; auto* rb = static_cast(cntx->reply_builder()); return rb->SendSimpleStrArr(help_arr); } @@ -2316,63 +2343,21 @@ void StreamFamily::XDel(CmdArgList args, ConnectionContext* cntx) { cntx->SendError(result.status()); } +void HelpSubCmd(facade::CmdArgParser* parser, ConnectionContext* cntx) { + XGroupHelp(parser->Tail(), cntx); +} + void StreamFamily::XGroup(CmdArgList args, ConnectionContext* cntx) { facade::CmdArgParser parser{args}; - string_view sub_cmd = parser.ToUpper().Next(); - if (sub_cmd == "HELP") { - string_view help[] = {"XGROUP [ [value] [opt] ...]. Subcommands are:", - "CREATE [option]", - " Create a new consumer group. Options are:", - " * MKSTREAM", - " Create the empty stream if it does not exist.", - " * ENTRIESREAD entries_read", - " Set the group's entries_read counter (internal use).", - "CREATECONSUMER ", - " Create a new consumer in the specified group.", - "DELCONSUMER ", - " Remove the specified consumer.", - "DESTROY ", - " Remove the specified group.", - "SETID [ENTRIESREAD entries_read]", - " Set the current group ID and entries_read counter.", - "HELP", - " Print this help."}; - auto* rb = static_cast(cntx->reply_builder()); - return rb->SendStringArr(help); - } - - if (!parser.HasAtLeast(2)) - return cntx->SendError(kSyntaxErr, kScriptErrType); - - string_view key = parser.Next(); - - if (sub_cmd == "CREATE") { - args.remove_prefix(2); - return CreateGroup(std::move(args), key, cntx); - } + const auto* sub_cmd_func = parser.MapNext("HELP", &HelpSubCmd, "CREATE", &CreateGroup, "DESTROY", + &DestroyGroup, "CREATECONSUMER", &CreateConsumer, + "DELCONSUMER", &DelConsumer, "SETID", &SetId); - string_view gname = parser.Next(); - if (sub_cmd == "DESTROY" && args.size() == 3) { - return DestroyGroup(key, gname, cntx); - } - - if (sub_cmd == "CREATECONSUMER" && args.size() == 4) { - string_view cname = parser.Next(); - return CreateConsumer(key, gname, cname, cntx); - } - - if (sub_cmd == "DELCONSUMER" && args.size() == 4) { - string_view cname = parser.Next(); - return DelConsumer(key, gname, cname, cntx); - } - - if (sub_cmd == "SETID" && args.size() >= 4) { - args.remove_prefix(3); - return SetId(key, gname, std::move(args), cntx); - } + if (auto err = parser.Error(); err) + return cntx->SendError(err->MakeReply()); - return cntx->SendError(UnknownSubCmd(sub_cmd, "XGROUP")); + sub_cmd_func(&parser, cntx); } void StreamFamily::XInfo(CmdArgList args, ConnectionContext* cntx) {