Skip to content

Commit

Permalink
YMQ metering: add test (#12645)
Browse files Browse the repository at this point in the history
  • Loading branch information
qyryq authored Dec 17, 2024
1 parent 99fdf00 commit b2d56de
Showing 1 changed file with 110 additions and 6 deletions.
116 changes: 110 additions & 6 deletions ydb/core/http_proxy/ut/ymq_ut.h
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@

#include <library/cpp/json/writer/json_value.h>
#include <library/cpp/testing/unittest/registar.h>
#include <library/cpp/scheme/scheme.h>
#include <library/cpp/string_utils/base64/base64.h>
#include <ydb/core/ymq/actor/metering.h>

#include <chrono>
#include <thread>
Expand Down Expand Up @@ -182,18 +184,120 @@ Y_UNIT_TEST_SUITE(TestYmqHttpProxy) {
}

Y_UNIT_TEST_F(BillingRecordsForJsonApi, THttpProxyTestMockWithMetering) {
auto meteringLogFilePath = KikimrServer->ServerSettings->AppConfig->GetSqsConfig().GetMeteringLogFilePath();

// loadBillingRecords was copied from metering_ut.cpp.
// Probably, that file is a better place for this test.
auto loadBillingRecords = [](const TString& filepath) -> TVector<NSc::TValue> {
TString data = TFileInput(filepath).ReadAll();
auto rawRecords = SplitString(data, "\n");
TVector<NSc::TValue> records;
for (auto& record : rawRecords) {
records.push_back(NSc::TValue::FromJson(record));
}
return records;
};

TVector<NSc::TValue> records;
auto waitBillingRecords = [&]() {
static size_t expectedCount = 0;
expectedCount += 3; // 2 traffic records, 1 request record
while (records.size() != expectedCount) {
Sleep(TDuration::Seconds(1));
records = loadBillingRecords(meteringLogFilePath);
}
};

auto json = CreateQueue({{"QueueName", "ExampleQueueName"}});
auto queueUrl = GetByPath<TString>(json, "QueueUrl");
waitBillingRecords();

json = SendMessage({
SendMessage({
{"QueueUrl", queueUrl},
{"MessageBody", "MessageBody-0"}
{"MessageBody", TString(1_KB, 'x')}, // 1 request unit
});
waitBillingRecords();

json = ReceiveMessage({
{"QueueUrl", queueUrl},
{"WaitTimeSeconds", 20},
});
UNIT_ASSERT_VALUES_EQUAL(json["Messages"].GetArray().size(), 1);
waitBillingRecords();

SendMessage({
{"QueueUrl", queueUrl},
{"MessageBody", TString(150_KB, 'x')}, // 3 request units
});
waitBillingRecords();

json = ReceiveMessage({
{"QueueUrl", queueUrl},
{"WaitTimeSeconds", 20},
});
UNIT_ASSERT_VALUES_EQUAL(json["Messages"].GetArray().size(), 1);
waitBillingRecords();

// TODO:
// Sleep(TDuration::Seconds(500));
// TVector<NSc::TValue> records = LoadBillingRecords(sqsConfig.GetMeteringLogFilePath());
// CheckBillingRecord(records, expectedRecords);
auto makeTags = [](TVector<std::pair<TString, TString>> pairs) {
NSc::TValue tags;
tags.SetDict();
for (auto const& [k, v] : pairs) {
tags[k] = v;
}
return tags;
};
auto makeRecord = [&makeTags](TString type, TString resourceId, size_t quantity, TVector<std::pair<TString, TString>> tags) {
return NKikimr::NSQS::CreateMeteringBillingRecord(
"folder4",
resourceId,
type,
"fqdn",
TInstant::Now(),
quantity,
type == "ymq.traffic.v1" ? "byte" : "request",
makeTags(tags)
);
};
auto asExpected = [](NSc::TValue record, NSc::TValue expected) {
return record["folder_id"] == expected["folder_id"] &&
record["resource_id"] == expected["resource_id"] &&
record["schema"] == expected["schema"] &&
record["usage"]["unit"] == expected["usage"]["unit"] &&
(record["schema"] != "ymq.requests.v1" || record["usage"]["quantity"] == expected["usage"]["quantity"]) &&
record["tags"]["direction"] == expected["tags"]["direction"] &&
record["tags"]["type"] == expected["tags"]["type"] &&
record["tags"]["queue_type"] == expected["tags"]["queue_type"];
};

TVector<NSc::TValue> expectedRecords{
// CreateQueue
makeRecord("ymq.traffic.v1", "", 0, {{"direction", "ingress"}, {"type", "inet"}}),
makeRecord("ymq.traffic.v1", "", 0, {{"direction", "egress"}, {"type", "inet"}}),
makeRecord("ymq.requests.v1", "", 1, {{"queue_type", "other"}}),

// SendMessage 1 KB
makeRecord("ymq.traffic.v1", "000000000000000101v0", 0, {{"direction", "ingress"}, {"type", "inet"}}),
makeRecord("ymq.traffic.v1", "000000000000000101v0", 0, {{"direction", "egress"}, {"type", "inet"}}),
makeRecord("ymq.requests.v1", "000000000000000101v0", 1, {{"queue_type", "std"}}),

// ReceiveMessage 1 KB
makeRecord("ymq.traffic.v1", "000000000000000101v0", 0, {{"direction", "ingress"}, {"type", "inet"}}),
makeRecord("ymq.traffic.v1", "000000000000000101v0", 0, {{"direction", "egress"}, {"type", "inet"}}),
makeRecord("ymq.requests.v1", "000000000000000101v0", 1, {{"queue_type", "std"}}),

// SendMessage 150 KB
makeRecord("ymq.traffic.v1", "000000000000000101v0", 0, {{"direction", "ingress"}, {"type", "inet"}}),
makeRecord("ymq.traffic.v1", "000000000000000101v0", 0, {{"direction", "egress"}, {"type", "inet"}}),
makeRecord("ymq.requests.v1", "000000000000000101v0", 3, {{"queue_type", "std"}}),

// ReceiveMessage 150 KB
makeRecord("ymq.traffic.v1", "000000000000000101v0", 0, {{"direction", "ingress"}, {"type", "inet"}}),
makeRecord("ymq.traffic.v1", "000000000000000101v0", 0, {{"direction", "egress"}, {"type", "inet"}}),
makeRecord("ymq.requests.v1", "000000000000000101v0", 3, {{"queue_type", "std"}}),
};
for (size_t i = 0; i < records.size(); ++i) {
UNIT_ASSERT(asExpected(records[i], expectedRecords[i]));
}
}

Y_UNIT_TEST_F(TestSendMessageEmptyQueueUrl, THttpProxyTestMockForSQS) {
Expand Down

0 comments on commit b2d56de

Please sign in to comment.