Skip to content

Commit

Permalink
YMQ tags (#12872)
Browse files Browse the repository at this point in the history
  • Loading branch information
qyryq authored Jan 15, 2025
1 parent e46360e commit 6bcf4a6
Show file tree
Hide file tree
Showing 59 changed files with 1,666 additions and 185 deletions.
3 changes: 3 additions & 0 deletions ydb/core/grpc_services/service_ymq.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,8 @@ void DoYmqSendMessageBatchRequest(std::unique_ptr<IRequestOpCtx> p, const IFacil
void DoYmqDeleteMessageBatchRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
void DoYmqChangeMessageVisibilityBatchRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
void DoYmqListDeadLetterSourceQueuesRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
void DoYmqListQueueTagsRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
void DoYmqTagQueueRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
void DoYmqUntagQueueRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
}
}
1 change: 1 addition & 0 deletions ydb/core/http_proxy/events.h
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ namespace NKikimr::NHttpProxy {
struct TEvGrpcRequestResult : public TEventLocal<TEvGrpcRequestResult, EvGrpcRequestResult> {
THolder<google::protobuf::Message> Message;
THolder<NYdb::TStatus> Status;
THolder<THashMap<TString, TString>> QueueTags;
};

struct TEvDiscoverDatabaseEndpointRequest : public TEventLocal<TEvDiscoverDatabaseEndpointRequest, EvDiscoverDatabaseEndpointRequest> {
Expand Down
51 changes: 18 additions & 33 deletions ydb/core/http_proxy/http_req.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,14 @@ namespace NKikimr::NHttpProxy {
NYdb::EStatus(response.operation().status()),
std::move(issues)
);
Ydb::Ymq::V1::QueueTags queueTags;
response.operation().metadata().UnpackTo(&queueTags);
for (const auto& [k, v] : queueTags.GetTags()) {
if (!result->QueueTags.Get()) {
result->QueueTags = MakeHolder<THashMap<TString, TString>>();
}
result->QueueTags->emplace(k, v);
}
actorSystem->Send(actorId, result.Release());
}
);
Expand Down Expand Up @@ -374,6 +382,9 @@ namespace NKikimr::NHttpProxy {
);
HttpContext.ResponseData.IsYmq = true;
HttpContext.ResponseData.YmqHttpCode = 200;
if (ev->Get()->QueueTags) {
HttpContext.ResponseData.QueueTags = std::move(*ev->Get()->QueueTags);
}
ReplyToHttpContext(ctx);
} else {
auto retryClass = NYdb::NTopic::GetRetryErrorClass(ev->Get()->Status->GetStatus());
Expand Down Expand Up @@ -510,40 +521,8 @@ namespace NKikimr::NHttpProxy {
SendGrpcRequestNoDriver(ctx);
} else {
auto requestHolder = MakeHolder<NKikimrClient::TSqsRequest>();
// TODO? action = NSQS::ActionFromString(Method);
NSQS::EAction action = NSQS::EAction::Unknown;
if (Method == "CreateQueue") {
action = NSQS::EAction::CreateQueue;
} else if (Method == "GetQueueUrl") {
action = NSQS::EAction::GetQueueUrl;
} else if (Method == "SendMessage") {
action = NSQS::EAction::SendMessage;
} else if (Method == "ReceiveMessage") {
action = NSQS::EAction::ReceiveMessage;
} else if (Method == "GetQueueAttributes") {
action = NSQS::EAction::GetQueueAttributes;
} else if (Method == "ListQueues") {
action = NSQS::EAction::ListQueues;
} else if (Method == "DeleteMessage") {
action = NSQS::EAction::DeleteMessage;
} else if (Method == "PurgeQueue") {
action = NSQS::EAction::PurgeQueue;
} else if (Method == "DeleteQueue") {
action = NSQS::EAction::DeleteQueue;
} else if (Method == "ChangeMessageVisibility") {
action = NSQS::EAction::ChangeMessageVisibility;
} else if (Method == "SetQueueAttributes") {
action = NSQS::EAction::SetQueueAttributes;
} else if (Method == "SendMessageBatch") {
action = NSQS::EAction::SendMessageBatch;
}else if (Method == "DeleteMessageBatch") {
action = NSQS::EAction::DeleteMessageBatch;
} else if (Method == "ChangeMessageVisibilityBatch") {
action = NSQS::EAction::ChangeMessageVisibilityBatch;
} else if (Method == "ListDeadLetterSourceQueues") {
action = NSQS::EAction::ListDeadLetterSourceQueues;
}

NSQS::EAction action = NSQS::ActionFromString(Method);
requestHolder->SetRequestId(HttpContext.RequestId);

NSQS::TAuthActorData data {
Expand Down Expand Up @@ -1081,6 +1060,9 @@ namespace NKikimr::NHttpProxy {
DECLARE_YMQ_PROCESSOR_QUEUE_KNOWN(DeleteMessageBatch);
DECLARE_YMQ_PROCESSOR_QUEUE_KNOWN(ChangeMessageVisibilityBatch);
DECLARE_YMQ_PROCESSOR_QUEUE_KNOWN(ListDeadLetterSourceQueues);
DECLARE_YMQ_PROCESSOR_QUEUE_KNOWN(ListQueueTags);
DECLARE_YMQ_PROCESSOR_QUEUE_KNOWN(TagQueue);
DECLARE_YMQ_PROCESSOR_QUEUE_KNOWN(UntagQueue);
#undef DECLARE_YMQ_PROCESSOR_QUEUE_KNOWN
}

Expand Down Expand Up @@ -1264,6 +1246,9 @@ namespace NKikimr::NHttpProxy {
requestAttributes.SourceAddress = SourceAddress;
requestAttributes.ResourceId = ResourceId;
requestAttributes.Action = NSQS::ActionFromString(MethodName);
for (const auto& [k, v] : ResponseData.QueueTags) {
requestAttributes.QueueTags[k] = v;
}

LOG_SP_DEBUG_S(
ctx,
Expand Down
1 change: 1 addition & 0 deletions ydb/core/http_proxy/http_req.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ struct THttpResponseData {
TString YmqStatusCode;
ui32 YmqHttpCode = 500;
bool YmqIsFifo = false;
THashMap<TString, TString> QueueTags;

TString DumpBody(MimeTypes contentType);
};
Expand Down
19 changes: 18 additions & 1 deletion ydb/core/http_proxy/ut/datastreams_fixture.h
Original file line number Diff line number Diff line change
Expand Up @@ -351,7 +351,9 @@ class THttpProxyTestMock : public NUnitTest::TBaseFixture {

NJson::TJsonMap SendJsonRequest(TString method, NJson::TJsonMap request, ui32 expectedHttpCode = 200) {
auto res = SendHttpRequest("/Root", TStringBuilder() << "AmazonSQS." << method, request, FormAuthorizationStr("ru-central1"));
UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, expectedHttpCode);
if (expectedHttpCode != 0) {
UNIT_ASSERT_VALUES_EQUAL(res.HttpCode, expectedHttpCode);
}
NJson::TJsonMap json;
UNIT_ASSERT(NJson::ReadJsonTree(res.Body, &json));
return json;
Expand Down Expand Up @@ -405,6 +407,18 @@ class THttpProxyTestMock : public NUnitTest::TBaseFixture {
return SendJsonRequest("SetQueueAttributes", request, expectedHttpCode);
}

NJson::TJsonMap ListQueueTags(NJson::TJsonMap request, ui32 expectedHttpCode = 200) {
return SendJsonRequest("ListQueueTags", request, expectedHttpCode);
}

NJson::TJsonMap TagQueue(NJson::TJsonMap request = {}, ui32 expectedHttpCode = 200) {
return SendJsonRequest("TagQueue", request, expectedHttpCode);
}

NJson::TJsonMap UntagQueue(NJson::TJsonMap request = {}, ui32 expectedHttpCode = 200) {
return SendJsonRequest("UntagQueue", request, expectedHttpCode);
}

void WaitQueueAttributes(TString queueUrl, size_t retries, NJson::TJsonMap attributes) {
WaitQueueAttributes(queueUrl, retries, [&attributes](NJson::TJsonMap json) {
for (const auto& [k, v] : attributes.GetMapSafe()) {
Expand Down Expand Up @@ -534,6 +548,7 @@ class THttpProxyTestMock : public NUnitTest::TBaseFixture {
ActorRuntime->SetLogPriority(NKikimrServices::HTTP_PROXY, NLog::PRI_DEBUG);
ActorRuntime->SetLogPriority(NActorsServices::EServiceCommon::HTTP, NLog::PRI_DEBUG);
ActorRuntime->SetLogPriority(NKikimrServices::TICKET_PARSER, NLog::PRI_TRACE);
ActorRuntime->SetLogPriority(NKikimrServices::SQS, NLog::PRI_TRACE);

if (enableMetering) {
ActorRuntime->RegisterService(
Expand Down Expand Up @@ -572,6 +587,7 @@ class THttpProxyTestMock : public NUnitTest::TBaseFixture {
"Columns { Name: \"Version\" Type: \"Uint64\"}"
"Columns { Name: \"DlqName\" Type: \"Utf8\"}"
"Columns { Name: \"TablesFormat\" Type: \"Uint32\"}"
"Columns { Name: \"Tags\" Type: \"Utf8\"}"
"KeyColumnNames: [\"Account\", \"QueueName\"]"
);

Expand Down Expand Up @@ -710,6 +726,7 @@ class THttpProxyTestMock : public NUnitTest::TBaseFixture {
"Columns { Name: \"CustomQueueName\" Type: \"Utf8\"}"
"Columns { Name: \"EventTimestamp\" Type: \"Uint64\"}"
"Columns { Name: \"FolderId\" Type: \"Utf8\"}"
"Columns { Name: \"Labels\" Type: \"Utf8\"}"
"KeyColumnNames: [\"Account\", \"QueueName\", \"EventType\"]"
);

Expand Down
Loading

0 comments on commit 6bcf4a6

Please sign in to comment.