Skip to content

Commit

Permalink
Merge 774044a into ae1af5f
Browse files Browse the repository at this point in the history
  • Loading branch information
gridnevvvit authored Nov 26, 2024
2 parents ae1af5f + 774044a commit aeb3121
Show file tree
Hide file tree
Showing 2 changed files with 137 additions and 2 deletions.
8 changes: 6 additions & 2 deletions ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -349,13 +349,15 @@ class TKqpLookupRows : public TKqpStreamLookupWorker {
auto row = HolderFactory.CreateDirectArrayHolder(Columns.size(), rowItems);

i64 rowSize = 0;
i64 storageRowSize = 0;
for (size_t colIndex = 0, resultColIndex = 0; colIndex < Columns.size(); ++colIndex) {
const auto& column = Columns[colIndex];
if (IsSystemColumn(column.Name)) {
NMiniKQL::FillSystemColumn(rowItems[colIndex], result.ShardId, column.Id, column.PType);
rowSize += sizeof(NUdf::TUnboxedValue);
} else {
YQL_ENSURE(resultColIndex < resultRow.size());
storageRowSize += resultRow[resultColIndex].Size();
rowItems[colIndex] = NMiniKQL::GetCellValue(resultRow[resultColIndex], column.PType);
rowSize += NMiniKQL::GetUnboxedValueSize(rowItems[colIndex], column.PType).AllocatedBytes;
++resultColIndex;
Expand All @@ -370,10 +372,12 @@ class TKqpLookupRows : public TKqpStreamLookupWorker {

batch.push_back(std::move(row));

storageRowSize = std::max(storageRowSize, (i64)8);

resultStats.ReadRowsCount += 1;
resultStats.ReadBytesCount += rowSize;
resultStats.ReadBytesCount += storageRowSize;
resultStats.ResultRowsCount += 1;
resultStats.ResultBytesCount += rowSize;
resultStats.ResultBytesCount += storageRowSize;
}

if (result.UnprocessedResultRow == result.ReadResult->Get()->GetRowsCount()) {
Expand Down
131 changes: 131 additions & 0 deletions ydb/core/kqp/ut/cost/kqp_cost_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,137 @@ Y_UNIT_TEST_SUITE(KqpCost) {
//runtime->SetLogPriority(NKikimrServices::GRPC_SERVER, NActors::NLog::PRI_DEBUG);
}

Y_UNIT_TEST_TWIN(IndexLookup, StreamLookup) {
TKikimrRunner kikimr(GetAppConfig(true, StreamLookup));
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();

session.ExecuteSchemeQuery(R"(
--!syntax_v1
CREATE TABLE `/Root/SecondaryKeys` (
Key Int32,
Fk Int32,
Value String,
ValueInt Int32,
PRIMARY KEY (Key),
INDEX Index GLOBAL ON (Fk)
);
)").GetValueSync();

session.ExecuteDataQuery(R"(
REPLACE INTO `/Root/SecondaryKeys` (Key, Fk, Value, ValueInt) VALUES
(1, 1, "Payload1", 100),
(2, 2, "Payload2", 200),
(5, 5, "Payload5", 500),
(NULL, 6, "Payload6", 600),
(7, NULL, "Payload7", 700),
(NULL, NULL, "Payload8", 800);
)", TTxControl::BeginTx().CommitTx()).GetValueSync();

auto query = Q_(R"(
SELECT Value FROM `/Root/SecondaryKeys` VIEW Index WHERE Fk = 1;
)");

auto txControl = TTxControl::BeginTx().CommitTx();

auto result = session.ExecuteDataQuery(query, txControl, GetDataQuerySettings()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS);

CompareYson(R"(
[
[["Payload1"]]
]
)", NYdb::FormatResultSetYson(result.GetResultSet(0)));

auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats());

std::unordered_map<TString, std::pair<int, int>> readsByTable;
for(const auto& queryPhase : stats.query_phases()) {
for(const auto& tableAccess: queryPhase.table_access()) {
auto [it, success] = readsByTable.emplace(tableAccess.name(), std::make_pair(0, 0));
it->second.first += tableAccess.reads().rows();
it->second.second += tableAccess.reads().bytes();
}
}

for(const auto& [name, rowsAndBytes]: readsByTable) {
Cerr << name << " " << rowsAndBytes.first << " " << rowsAndBytes.second << Endl;
}

UNIT_ASSERT_VALUES_EQUAL(readsByTable.at("/Root/SecondaryKeys").first, 1);
UNIT_ASSERT_VALUES_EQUAL(readsByTable.at("/Root/SecondaryKeys").second, 8);

UNIT_ASSERT_VALUES_EQUAL(readsByTable.at("/Root/SecondaryKeys/Index/indexImplTable").first, 1);
UNIT_ASSERT_VALUES_EQUAL(readsByTable.at("/Root/SecondaryKeys/Index/indexImplTable").second, 8);
}

Y_UNIT_TEST_TWIN(IndexLookupAtLeast8BytesInStorage, StreamLookup) {
TKikimrRunner kikimr(GetAppConfig(true, StreamLookup));
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();

session.ExecuteSchemeQuery(R"(
--!syntax_v1
CREATE TABLE `/Root/SecondaryKeys` (
Key Int32,
Fk Int32,
Value String,
ValueInt Int32,
PRIMARY KEY (Key),
INDEX Index GLOBAL ON (Fk)
);
)").GetValueSync();

session.ExecuteDataQuery(R"(
REPLACE INTO `/Root/SecondaryKeys` (Key, Fk, Value, ValueInt) VALUES
(1, 1, "Payload1", 100),
(2, 2, "Payload2", 200),
(5, 5, "Payload5", 500),
(NULL, 6, "Payload6", 600),
(7, NULL, "Payload7", 700),
(NULL, NULL, "Payload8", 800);
)", TTxControl::BeginTx().CommitTx()).GetValueSync();

auto query = Q_(R"(
SELECT ValueInt FROM `/Root/SecondaryKeys` VIEW Index WHERE Fk = 1;
)");

auto txControl = TTxControl::BeginTx().CommitTx();

auto result = session.ExecuteDataQuery(query, txControl, GetDataQuerySettings()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS);

CompareYson(R"(
[
[[100]]
]
)", NYdb::FormatResultSetYson(result.GetResultSet(0)));

auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats());

std::unordered_map<TString, std::pair<int, int>> readsByTable;
for(const auto& queryPhase : stats.query_phases()) {
for(const auto& tableAccess: queryPhase.table_access()) {
auto [it, success] = readsByTable.emplace(tableAccess.name(), std::make_pair(0, 0));
it->second.first += tableAccess.reads().rows();
it->second.second += tableAccess.reads().bytes();
}
}

for(const auto& [name, rowsAndBytes]: readsByTable) {
Cerr << name << " " << rowsAndBytes.first << " " << rowsAndBytes.second << Endl;
}

UNIT_ASSERT_VALUES_EQUAL(readsByTable.at("/Root/SecondaryKeys").first, 1);
// 4 bytes is unexpected, because datashards has 8 bytes per row in storage.
UNIT_ASSERT_VALUES_EQUAL(readsByTable.at("/Root/SecondaryKeys").second, 8);

UNIT_ASSERT_VALUES_EQUAL(readsByTable.at("/Root/SecondaryKeys/Index/indexImplTable").first, 1);
UNIT_ASSERT_VALUES_EQUAL(readsByTable.at("/Root/SecondaryKeys/Index/indexImplTable").second, 8);
}

Y_UNIT_TEST(PointLookup) {
TKikimrRunner kikimr(GetAppConfig(false, false));
auto db = kikimr.GetTableClient();
Expand Down

0 comments on commit aeb3121

Please sign in to comment.