Skip to content

Commit

Permalink
Merge b001939 into cb1675a
Browse files Browse the repository at this point in the history
  • Loading branch information
gridnevvvit authored Nov 27, 2024
2 parents cb1675a + b001939 commit 144e0e0
Show file tree
Hide file tree
Showing 3 changed files with 212 additions and 9 deletions.
17 changes: 10 additions & 7 deletions ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
return NKikimrServices::TActivity::KQP_STREAM_LOOKUP_ACTOR;
}

void FillExtraStats(NYql::NDqProto::TDqTaskStats* stats , bool last, const NYql::NDq::TDqMeteringStats*) override {
void FillExtraStats(NYql::NDqProto::TDqTaskStats* stats , bool last, const NYql::NDq::TDqMeteringStats* mstats) override {
if (last) {
NYql::NDqProto::TDqTableStats* tableStats = nullptr;
for (auto& table : *stats->MutableTables()) {
Expand All @@ -81,9 +81,12 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
tableStats->SetTablePath(StreamLookupWorker->GetTablePath());
}

ui64 consumedRows = mstats ? mstats->Inputs[InputIndex]->RowsConsumed : ReadRowsCount;
ui64 consumedBytes = mstats ? mstats->Inputs[InputIndex]->BytesConsumed : ReadBytesCount;

// TODO: use evread statistics after KIKIMR-16924
tableStats->SetReadRows(tableStats->GetReadRows() + ReadRowsCount);
tableStats->SetReadBytes(tableStats->GetReadBytes() + ReadBytesCount);
tableStats->SetReadRows(tableStats->GetReadRows() + consumedRows);
tableStats->SetReadBytes(tableStats->GetReadBytes() + consumedBytes);
tableStats->SetAffectedPartitions(tableStats->GetAffectedPartitions() + ReadsPerShard.size());

NKqpProto::TKqpTableExtraStats tableExtraStats;
Expand Down Expand Up @@ -148,7 +151,7 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
};

struct TEvRetryRead : public TEventLocal<TEvRetryRead, EvRetryRead> {
explicit TEvRetryRead(ui64 readId, ui64 lastSeqNo, bool instantStart = false)
explicit TEvRetryRead(ui64 readId, ui64 lastSeqNo, bool instantStart = false)
: ReadId(readId)
, LastSeqNo(lastSeqNo)
, InstantStart(instantStart) {
Expand Down Expand Up @@ -259,7 +262,7 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
void Handle(TEvTxProxySchemeCache::TEvResolveKeySetResult::TPtr& ev) {
CA_LOG_D("TEvResolveKeySetResult was received for table: " << StreamLookupWorker->GetTablePath());
if (ev->Get()->Request->ErrorCount > 0) {
TString errorMsg = TStringBuilder() << "Failed to get partitioning for table: "
TString errorMsg = TStringBuilder() << "Failed to get partitioning for table: "
<< StreamLookupWorker->GetTablePath();
LookupActorStateSpan.EndError(errorMsg);

Expand Down Expand Up @@ -419,7 +422,7 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
auto readIt = Reads.find(ev->Get()->ReadId);
YQL_ENSURE(readIt != Reads.end(), "Unexpected readId: " << ev->Get()->ReadId);
auto& read = readIt->second;

if (read.State == EReadState::Running && read.LastSeqNo <= ev->Get()->LastSeqNo) {
if (ev->Get()->InstantStart) {
read.SetFinished();
Expand Down Expand Up @@ -566,7 +569,7 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
keyColumnTypes, TVector<TKeyDesc::TColumnOp>{}));

Counters->IteratorsShardResolve->Inc();
LookupActorStateSpan = NWilson::TSpan(TWilsonKqp::LookupActorShardsResolve, LookupActorSpan.GetTraceId(),
LookupActorStateSpan = NWilson::TSpan(TWilsonKqp::LookupActorShardsResolve, LookupActorSpan.GetTraceId(),
"WaitForShardsResolve", NWilson::EFlags::AUTO_END);

Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvInvalidateTable(StreamLookupWorker->GetTableId(), {}));
Expand Down
8 changes: 6 additions & 2 deletions ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -349,13 +349,15 @@ class TKqpLookupRows : public TKqpStreamLookupWorker {
auto row = HolderFactory.CreateDirectArrayHolder(Columns.size(), rowItems);

i64 rowSize = 0;
i64 storageRowSize = 0;
for (size_t colIndex = 0, resultColIndex = 0; colIndex < Columns.size(); ++colIndex) {
const auto& column = Columns[colIndex];
if (IsSystemColumn(column.Name)) {
NMiniKQL::FillSystemColumn(rowItems[colIndex], result.ShardId, column.Id, column.PType);
rowSize += sizeof(NUdf::TUnboxedValue);
} else {
YQL_ENSURE(resultColIndex < resultRow.size());
storageRowSize += resultRow[resultColIndex].Size();
rowItems[colIndex] = NMiniKQL::GetCellValue(resultRow[resultColIndex], column.PType);
rowSize += NMiniKQL::GetUnboxedValueSize(rowItems[colIndex], column.PType).AllocatedBytes;
++resultColIndex;
Expand All @@ -370,10 +372,12 @@ class TKqpLookupRows : public TKqpStreamLookupWorker {

batch.push_back(std::move(row));

storageRowSize = std::max(storageRowSize, (i64)8);

resultStats.ReadRowsCount += 1;
resultStats.ReadBytesCount += rowSize;
resultStats.ReadBytesCount += storageRowSize;
resultStats.ResultRowsCount += 1;
resultStats.ResultBytesCount += rowSize;
resultStats.ResultBytesCount += storageRowSize;
}

if (result.UnprocessedResultRow == result.ReadResult->Get()->GetRowsCount()) {
Expand Down
196 changes: 196 additions & 0 deletions ydb/core/kqp/ut/cost/kqp_cost_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,202 @@ Y_UNIT_TEST_SUITE(KqpCost) {
//runtime->SetLogPriority(NKikimrServices::GRPC_SERVER, NActors::NLog::PRI_DEBUG);
}

// Checks the read statistics (rows and bytes per table) reported for a point
// lookup through a global secondary index. The test is instantiated for both
// values of the StreamLookup flag, which is forwarded to GetAppConfig.
//
// Expected accounting for `WHERE Fk = 1`: one row / 8 bytes read from the main
// table and one row / 8 bytes read from the index implementation table.
Y_UNIT_TEST_TWIN(IndexLookup, StreamLookup) {
    TKikimrRunner kikimr(GetAppConfig(true, StreamLookup));
    auto db = kikimr.GetTableClient();
    auto session = db.CreateSession().GetValueSync().GetSession();

    // Fail fast if the schema cannot be created; otherwise the failure would
    // only surface later as an unrelated assertion on the statistics.
    {
        auto schemeResult = session.ExecuteSchemeQuery(R"(
            --!syntax_v1
            CREATE TABLE `/Root/SecondaryKeys` (
                Key Int32,
                Fk Int32,
                Value String,
                ValueInt Int32,
                PRIMARY KEY (Key),
                INDEX Index GLOBAL ON (Fk)
            );
        )").GetValueSync();
        UNIT_ASSERT_VALUES_EQUAL(schemeResult.GetStatus(), EStatus::SUCCESS);
    }

    // Same for the fixture data: the expectations below are meaningless if
    // the rows were not written.
    {
        auto fillResult = session.ExecuteDataQuery(R"(
            REPLACE INTO `/Root/SecondaryKeys` (Key, Fk, Value, ValueInt) VALUES
                (1, 1, "Payload1", 100),
                (2, 2, "Payload2", 200),
                (5, 5, "Payload5", 500),
                (NULL, 6, "Payload6", 600),
                (7, NULL, "Payload7", 700),
                (NULL, NULL, "Payload8", 800);
        )", TTxControl::BeginTx().CommitTx()).GetValueSync();
        UNIT_ASSERT_VALUES_EQUAL(fillResult.GetStatus(), EStatus::SUCCESS);
    }

    auto query = Q_(R"(
        SELECT Value FROM `/Root/SecondaryKeys` VIEW Index WHERE Fk = 1;
    )");

    auto txControl = TTxControl::BeginTx().CommitTx();

    auto result = session.ExecuteDataQuery(query, txControl, GetDataQuerySettings()).ExtractValueSync();
    UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS);

    CompareYson(R"(
        [
            [["Payload1"]]
        ]
    )", NYdb::FormatResultSetYson(result.GetResultSet(0)));

    auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats());

    // Table name -> (rows read, bytes read), summed over all query phases.
    // ui64 is used so the counters taken from the stats proto are not narrowed.
    std::unordered_map<TString, std::pair<ui64, ui64>> readsByTable;
    for (const auto& queryPhase : stats.query_phases()) {
        for (const auto& tableAccess : queryPhase.table_access()) {
            // emplace() is a no-op for an already known table and returns the
            // existing entry, so the counters accumulate across phases.
            auto& counters = readsByTable.emplace(
                tableAccess.name(), std::pair<ui64, ui64>{0, 0}).first->second;
            counters.first += tableAccess.reads().rows();
            counters.second += tableAccess.reads().bytes();
        }
    }

    // Dump the collected numbers to simplify debugging of a failed run.
    for (const auto& [name, rowsAndBytes] : readsByTable) {
        Cerr << name << " " << rowsAndBytes.first << " " << rowsAndBytes.second << Endl;
    }

    UNIT_ASSERT_VALUES_EQUAL(readsByTable.at("/Root/SecondaryKeys").first, 1);
    UNIT_ASSERT_VALUES_EQUAL(readsByTable.at("/Root/SecondaryKeys").second, 8);

    UNIT_ASSERT_VALUES_EQUAL(readsByTable.at("/Root/SecondaryKeys/Index/indexImplTable").first, 1);
    UNIT_ASSERT_VALUES_EQUAL(readsByTable.at("/Root/SecondaryKeys/Index/indexImplTable").second, 8);
}

// Checks that a row read through a global secondary index is accounted as at
// least 8 bytes, even when the only selected column (ValueInt, Int32) is
// smaller than that. The test is instantiated for both values of the
// StreamLookup flag, which is forwarded to GetAppConfig.
Y_UNIT_TEST_TWIN(IndexLookupAtLeast8BytesInStorage, StreamLookup) {
    TKikimrRunner kikimr(GetAppConfig(true, StreamLookup));
    auto db = kikimr.GetTableClient();
    auto session = db.CreateSession().GetValueSync().GetSession();

    // Fail fast if the schema cannot be created; otherwise the failure would
    // only surface later as an unrelated assertion on the statistics.
    {
        auto schemeResult = session.ExecuteSchemeQuery(R"(
            --!syntax_v1
            CREATE TABLE `/Root/SecondaryKeys` (
                Key Int32,
                Fk Int32,
                Value String,
                ValueInt Int32,
                PRIMARY KEY (Key),
                INDEX Index GLOBAL ON (Fk)
            );
        )").GetValueSync();
        UNIT_ASSERT_VALUES_EQUAL(schemeResult.GetStatus(), EStatus::SUCCESS);
    }

    // Same for the fixture data: the expectations below are meaningless if
    // the rows were not written.
    {
        auto fillResult = session.ExecuteDataQuery(R"(
            REPLACE INTO `/Root/SecondaryKeys` (Key, Fk, Value, ValueInt) VALUES
                (1, 1, "Payload1", 100),
                (2, 2, "Payload2", 200),
                (5, 5, "Payload5", 500),
                (NULL, 6, "Payload6", 600),
                (7, NULL, "Payload7", 700),
                (NULL, NULL, "Payload8", 800);
        )", TTxControl::BeginTx().CommitTx()).GetValueSync();
        UNIT_ASSERT_VALUES_EQUAL(fillResult.GetStatus(), EStatus::SUCCESS);
    }

    auto query = Q_(R"(
        SELECT ValueInt FROM `/Root/SecondaryKeys` VIEW Index WHERE Fk = 1;
    )");

    auto txControl = TTxControl::BeginTx().CommitTx();

    auto result = session.ExecuteDataQuery(query, txControl, GetDataQuerySettings()).ExtractValueSync();
    UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS);

    CompareYson(R"(
        [
            [[100]]
        ]
    )", NYdb::FormatResultSetYson(result.GetResultSet(0)));

    auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats());

    // Table name -> (rows read, bytes read), summed over all query phases.
    // ui64 is used so the counters taken from the stats proto are not narrowed.
    std::unordered_map<TString, std::pair<ui64, ui64>> readsByTable;
    for (const auto& queryPhase : stats.query_phases()) {
        for (const auto& tableAccess : queryPhase.table_access()) {
            // emplace() is a no-op for an already known table and returns the
            // existing entry, so the counters accumulate across phases.
            auto& counters = readsByTable.emplace(
                tableAccess.name(), std::pair<ui64, ui64>{0, 0}).first->second;
            counters.first += tableAccess.reads().rows();
            counters.second += tableAccess.reads().bytes();
        }
    }

    // Dump the collected numbers to simplify debugging of a failed run.
    for (const auto& [name, rowsAndBytes] : readsByTable) {
        Cerr << name << " " << rowsAndBytes.first << " " << rowsAndBytes.second << Endl;
    }

    UNIT_ASSERT_VALUES_EQUAL(readsByTable.at("/Root/SecondaryKeys").first, 1);
    // Reporting 4 bytes (the size of the Int32 value) would be wrong here:
    // a datashard accounts for at least 8 bytes per row in storage.
    UNIT_ASSERT_VALUES_EQUAL(readsByTable.at("/Root/SecondaryKeys").second, 8);

    UNIT_ASSERT_VALUES_EQUAL(readsByTable.at("/Root/SecondaryKeys/Index/indexImplTable").first, 1);
    UNIT_ASSERT_VALUES_EQUAL(readsByTable.at("/Root/SecondaryKeys/Index/indexImplTable").second, 8);
}

// Checks the read statistics for a range scan over a global secondary index
// combined with a filter on a main-table column and LIMIT 1. The test is
// instantiated for both values of the StreamLookup flag, which is forwarded
// to GetAppConfig.
//
// Expected accounting: two rows / 16 bytes read from the index implementation
// table (Fk in [1, 2]) but only one row / 8 bytes read from the main table.
Y_UNIT_TEST_TWIN(IndexLookupAndTake, StreamLookup) {
    TKikimrRunner kikimr(GetAppConfig(true, StreamLookup));
    auto db = kikimr.GetTableClient();
    auto session = db.CreateSession().GetValueSync().GetSession();

    // Fail fast if the schema cannot be created; otherwise the failure would
    // only surface later as an unrelated assertion on the statistics.
    {
        auto schemeResult = session.ExecuteSchemeQuery(R"(
            --!syntax_v1
            CREATE TABLE `/Root/SecondaryKeys` (
                Key Int32,
                Fk Int32,
                Value String,
                ValueInt Int32,
                PRIMARY KEY (Key),
                INDEX Index GLOBAL ON (Fk)
            );
        )").GetValueSync();
        UNIT_ASSERT_VALUES_EQUAL(schemeResult.GetStatus(), EStatus::SUCCESS);
    }

    // Same for the fixture data: the expectations below are meaningless if
    // the rows were not written.
    {
        auto fillResult = session.ExecuteDataQuery(R"(
            REPLACE INTO `/Root/SecondaryKeys` (Key, Fk, Value, ValueInt) VALUES
                (1, 1, "Payload1", 100),
                (2, 2, "Payload2", 200),
                (5, 5, "Payload5", 500),
                (NULL, 6, "Payload6", 600),
                (7, NULL, "Payload7", 700),
                (NULL, NULL, "Payload8", 800);
        )", TTxControl::BeginTx().CommitTx()).GetValueSync();
        UNIT_ASSERT_VALUES_EQUAL(fillResult.GetStatus(), EStatus::SUCCESS);
    }

    auto query = Q_(R"(
        SELECT Value FROM `/Root/SecondaryKeys` VIEW Index WHERE Fk >= 1 and Fk <= 2 AND StartsWith(Value, "Payload") LIMIT 1;
    )");

    auto txControl = TTxControl::BeginTx().CommitTx();

    auto result = session.ExecuteDataQuery(query, txControl, GetDataQuerySettings()).ExtractValueSync();
    UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS);

    CompareYson(R"(
        [
            [["Payload1"]]
        ]
    )", NYdb::FormatResultSetYson(result.GetResultSet(0)));

    auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats());

    // Table name -> (rows read, bytes read), summed over all query phases.
    // ui64 is used so the counters taken from the stats proto are not narrowed.
    std::unordered_map<TString, std::pair<ui64, ui64>> readsByTable;
    for (const auto& queryPhase : stats.query_phases()) {
        for (const auto& tableAccess : queryPhase.table_access()) {
            // emplace() is a no-op for an already known table and returns the
            // existing entry, so the counters accumulate across phases.
            auto& counters = readsByTable.emplace(
                tableAccess.name(), std::pair<ui64, ui64>{0, 0}).first->second;
            counters.first += tableAccess.reads().rows();
            counters.second += tableAccess.reads().bytes();
        }
    }

    // Dump the collected numbers to simplify debugging of a failed run.
    for (const auto& [name, rowsAndBytes] : readsByTable) {
        Cerr << name << " " << rowsAndBytes.first << " " << rowsAndBytes.second << Endl;
    }

    UNIT_ASSERT_VALUES_EQUAL(readsByTable.at("/Root/SecondaryKeys").first, 1);
    UNIT_ASSERT_VALUES_EQUAL(readsByTable.at("/Root/SecondaryKeys").second, 8);

    UNIT_ASSERT_VALUES_EQUAL(readsByTable.at("/Root/SecondaryKeys/Index/indexImplTable").first, 2);
    UNIT_ASSERT_VALUES_EQUAL(readsByTable.at("/Root/SecondaryKeys/Index/indexImplTable").second, 16);
}

Y_UNIT_TEST(PointLookup) {
TKikimrRunner kikimr(GetAppConfig(false, false));
auto db = kikimr.GetTableClient();
Expand Down

0 comments on commit 144e0e0

Please sign in to comment.