diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp index 7f94a524d3cc..a70c81ac5ae3 100644 --- a/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp +++ b/ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp @@ -40,6 +40,7 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped()) , NodeLockId(settings.HasLockNodeId() ? settings.GetLockNodeId() : TMaybe()) , SchemeCacheRequestTimeout(SCHEME_CACHE_REQUEST_TIMEOUT) + , LookupStrategy(settings.GetLookupStrategy()) , StreamLookupWorker(CreateStreamLookupWorker(std::move(settings), args.TypeEnv, args.HolderFactory, args.InputDesc)) , Counters(counters) , LookupActorSpan(TWilsonKqp::LookupActor, std::move(args.TraceId), "LookupActor") @@ -67,7 +68,7 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrappedMutableTables()) { @@ -81,9 +82,25 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrappedSetTablePath(StreamLookupWorker->GetTablePath()); } + ui64 rowsReadEstimate = ReadRowsCount; + ui64 bytesReadEstimate = ReadBytesCount; + + if (mstats) { + switch(LookupStrategy) { + case NKqpProto::EStreamLookupStrategy::LOOKUP: { + // in lookup case we return as result actual data, that we read from the datashard. + rowsReadEstimate = mstats->Inputs[InputIndex]->RowsConsumed; + bytesReadEstimate = mstats->Inputs[InputIndex]->BytesConsumed; + break; + } + default: + ; + } + } + // TODO: use evread statistics after KIKIMR-16924 - tableStats->SetReadRows(tableStats->GetReadRows() + ReadRowsCount); - tableStats->SetReadBytes(tableStats->GetReadBytes() + ReadBytesCount); + tableStats->SetReadRows(tableStats->GetReadRows() + rowsReadEstimate); + tableStats->SetReadBytes(tableStats->GetReadBytes() + bytesReadEstimate); tableStats->SetAffectedPartitions(tableStats->GetAffectedPartitions() + ReadsPerShard.size()); NKqpProto::TKqpTableExtraStats tableExtraStats; @@ -148,7 +165,7 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped { - explicit TEvRetryRead(ui64 readId, ui64 lastSeqNo, bool instantStart = false) + explicit TEvRetryRead(ui64 readId, ui64 lastSeqNo, bool instantStart = false) : ReadId(readId) , LastSeqNo(lastSeqNo) , InstantStart(instantStart) { @@ -259,7 +276,7 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrappedGetTablePath()); if (ev->Get()->Request->ErrorCount > 0) { - TString errorMsg = TStringBuilder() << "Failed to get partitioning for table: " + TString errorMsg = TStringBuilder() << "Failed to get partitioning for table: " << StreamLookupWorker->GetTablePath(); LookupActorStateSpan.EndError(errorMsg); @@ -419,7 +436,7 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrappedGet()->ReadId); YQL_ENSURE(readIt != Reads.end(), "Unexpected readId: " << ev->Get()->ReadId); auto& read = readIt->second; - + if (read.State == EReadState::Running && read.LastSeqNo <= ev->Get()->LastSeqNo) { if (ev->Get()->InstantStart) { read.SetFinished(); @@ -566,7 +583,7 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped{})); Counters->IteratorsShardResolve->Inc(); - LookupActorStateSpan = NWilson::TSpan(TWilsonKqp::LookupActorShardsResolve, LookupActorSpan.GetTraceId(), + LookupActorStateSpan = NWilson::TSpan(TWilsonKqp::LookupActorShardsResolve, LookupActorSpan.GetTraceId(), "WaitForShardsResolve", NWilson::EFlags::AUTO_END); Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvInvalidateTable(StreamLookupWorker->GetTableId(), {})); @@ -625,6 +642,7 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped Locks; TVector BrokenLocks; + NKqpProto::EStreamLookupStrategy LookupStrategy; std::unique_ptr StreamLookupWorker; ui64 ReadId = 0; size_t TotalRetryAttempts = 0; diff --git a/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp b/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp index fce412585df8..05b0e0863c03 100644 --- a/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp +++ b/ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp @@ -361,6 +361,7 @@ class TKqpLookupRows : public TKqpStreamLookupWorker { auto row = HolderFactory.CreateDirectArrayHolder(Columns.size(), rowItems); i64 rowSize = 0; + i64 storageRowSize = 0; for (size_t colIndex = 0, resultColIndex = 0; colIndex < Columns.size(); ++colIndex) { const auto& column = Columns[colIndex]; if (IsSystemColumn(column.Name)) { @@ -368,6 +369,7 @@ class TKqpLookupRows : public TKqpStreamLookupWorker { rowSize += sizeof(NUdf::TUnboxedValue); } else { YQL_ENSURE(resultColIndex < resultRow.size()); + storageRowSize += resultRow[resultColIndex].Size(); rowItems[colIndex] = NMiniKQL::GetCellValue(resultRow[resultColIndex], column.PType); rowSize += NMiniKQL::GetUnboxedValueSize(rowItems[colIndex], column.PType).AllocatedBytes; ++resultColIndex; @@ -382,10 +384,12 @@ class TKqpLookupRows : public TKqpStreamLookupWorker { batch.push_back(std::move(row)); + storageRowSize = std::max(storageRowSize, (i64)8); + resultStats.ReadRowsCount += 1; - resultStats.ReadBytesCount += rowSize; + resultStats.ReadBytesCount += storageRowSize; resultStats.ResultRowsCount += 1; - resultStats.ResultBytesCount += rowSize; + resultStats.ResultBytesCount += storageRowSize; } if (result.UnprocessedResultRow == result.ReadResult->Get()->GetRowsCount()) { @@ -907,6 +911,8 @@ class TKqpJoinRows : public TKqpStreamLookupWorker { auto leftRowType = GetLeftRowType(); YQL_ENSURE(leftRowType); + i64 storageReadBytes = 0; + for (size_t i = 0; i < leftRowType->GetMembersCount(); ++i) { auto columnTypeInfo = UnpackTypeInfo(leftRowType->GetMemberType(i)); leftRowSize += NMiniKQL::GetUnboxedValueSize(leftRowInfo.Row.GetElement(i), columnTypeInfo).AllocatedBytes; @@ -928,6 +934,7 @@ class TKqpJoinRows : public TKqpStreamLookupWorker { NMiniKQL::FillSystemColumn(rightRowItems[colIndex], *shardId, column.Id, column.PType); rightRowSize += sizeof(NUdf::TUnboxedValue); } else { + storageReadBytes += rightRow[std::distance(ReadColumns.begin(), it)].Size(); rightRowItems[colIndex] = NMiniKQL::GetCellValue(rightRow[std::distance(ReadColumns.begin(), it)], column.PType); rightRowSize += NMiniKQL::GetUnboxedValueSize(rightRowItems[colIndex], column.PType).AllocatedBytes; @@ -939,7 +946,7 @@ class TKqpJoinRows : public TKqpStreamLookupWorker { rowStats.ReadRowsCount += (leftRowInfo.RightRowExist ? 1 : 0); // TODO: use datashard statistics KIKIMR-16924 - rowStats.ReadBytesCount += rightRowSize; + rowStats.ReadBytesCount += storageReadBytes; rowStats.ResultRowsCount += 1; rowStats.ResultBytesCount += leftRowSize + rightRowSize; diff --git a/ydb/core/kqp/ut/cost/kqp_cost_ut.cpp b/ydb/core/kqp/ut/cost/kqp_cost_ut.cpp index 7be32f87aa9b..2077c10116e0 100644 --- a/ydb/core/kqp/ut/cost/kqp_cost_ut.cpp +++ b/ydb/core/kqp/ut/cost/kqp_cost_ut.cpp @@ -11,11 +11,12 @@ namespace NKqp { using namespace NYdb; using namespace NYdb::NTable; -static NKikimrConfig::TAppConfig GetAppConfig(bool sourceRead, bool streamLookup = true) { +static NKikimrConfig::TAppConfig GetAppConfig(bool sourceRead, bool streamLookup = true, bool streamLookupJoin = false) { auto app = NKikimrConfig::TAppConfig(); app.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(sourceRead); app.MutableTableServiceConfig()->SetEnableKqpScanQuerySourceRead(sourceRead); app.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(streamLookup); + app.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamIdxLookupJoin(streamLookupJoin); return app; } @@ -25,6 +26,64 @@ static NYdb::NTable::TExecDataQuerySettings GetDataQuerySettings() { return execSettings; } + +static void CreateSampleTables(TSession session) { + UNIT_ASSERT(session.ExecuteSchemeQuery(R"( + CREATE TABLE `/Root/Join1_1` ( + Key Int32, + Fk21 Int32, + Fk22 String, + Value String, + PRIMARY KEY (Key) + ); + CREATE TABLE `/Root/Join1_2` ( + Key1 Int32, + Key2 String, + Fk3 String, + Value String, + PRIMARY KEY (Key1, Key2) + ); + CREATE TABLE `/Root/Join1_3` ( + Key String, + Value Int32, + PRIMARY KEY (Key) + ); + )").GetValueSync().IsSuccess()); + + UNIT_ASSERT(session.ExecuteDataQuery(R"( + + REPLACE INTO `/Root/Join1_1` (Key, Fk21, Fk22, Value) VALUES + (1, 101, "One", "Value1"), + (2, 102, "Two", "Value1"), + (3, 103, "One", "Value2"), + (4, 104, "Two", "Value2"), + (5, 105, "One", "Value3"), + (6, 106, "Two", "Value3"), + (7, 107, "One", "Value4"), + (8, 108, "One", "Value5"); + + REPLACE INTO `/Root/Join1_2` (Key1, Key2, Fk3, Value) VALUES + (101, "One", "Name1", "Value21"), + (101, "Two", "Name1", "Value22"), + (101, "Three", "Name3", "Value23"), + (102, "One", "Name2", "Value24"), + (103, "One", "Name1", "Value25"), + (104, "One", "Name3", "Value26"), + (105, "One", "Name2", "Value27"), + (105, "Two", "Name4", "Value28"), + (106, "One", "Name3", "Value29"), + (108, "One", NULL, "Value31"), + (109, "Four", NULL, "Value41"); + + REPLACE INTO `/Root/Join1_3` (Key, Value) VALUES + ("Name1", 1001), + ("Name2", 1002), + ("Name4", 1004); + + )", TTxControl::BeginTx().CommitTx()).GetValueSync().IsSuccess()); +} + + Y_UNIT_TEST_SUITE(KqpCost) { void EnableDebugLogging(NActors::TTestActorRuntime * runtime) { //runtime->SetLogPriority(NKikimrServices::FLAT_TX_SCHEMESHARD, NActors::NLog::PRI_DEBUG); @@ -43,6 +102,203 @@ Y_UNIT_TEST_SUITE(KqpCost) { //runtime->SetLogPriority(NKikimrServices::BLOB_CACHE, NActors::NLog::PRI_DEBUG); //runtime->SetLogPriority(NKikimrServices::GRPC_SERVER, NActors::NLog::PRI_DEBUG); } + + Y_UNIT_TEST_TWIN(IndexLookup, StreamLookup) { + TKikimrRunner kikimr(GetAppConfig(true, StreamLookup)); + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + + session.ExecuteSchemeQuery(R"( + --!syntax_v1 + CREATE TABLE `/Root/SecondaryKeys` ( + Key Int32, + Fk Int32, + Value String, + ValueInt Int32, + PRIMARY KEY (Key), + INDEX Index GLOBAL ON (Fk) + ); + + )").GetValueSync(); + + session.ExecuteDataQuery(R"( + REPLACE INTO `/Root/SecondaryKeys` (Key, Fk, Value, ValueInt) VALUES + (1, 1, "Payload1", 100), + (2, 2, "Payload2", 200), + (5, 5, "Payload5", 500), + (NULL, 6, "Payload6", 600), + (7, NULL, "Payload7", 700), + (NULL, NULL, "Payload8", 800); + )", TTxControl::BeginTx().CommitTx()).GetValueSync(); + + auto query = Q_(R"( + SELECT Value FROM `/Root/SecondaryKeys` VIEW Index WHERE Fk = 1; + )"); + + auto txControl = TTxControl::BeginTx().CommitTx(); + + auto result = session.ExecuteDataQuery(query, txControl, GetDataQuerySettings()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS); + + CompareYson(R"( + [ + [["Payload1"]] + ] + )", NYdb::FormatResultSetYson(result.GetResultSet(0))); + + auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); + + std::unordered_map> readsByTable; + for(const auto& queryPhase : stats.query_phases()) { + for(const auto& tableAccess: queryPhase.table_access()) { + auto [it, success] = readsByTable.emplace(tableAccess.name(), std::make_pair(0, 0)); + it->second.first += tableAccess.reads().rows(); + it->second.second += tableAccess.reads().bytes(); + } + } + + for(const auto& [name, rowsAndBytes]: readsByTable) { + Cerr << name << " " << rowsAndBytes.first << " " << rowsAndBytes.second << Endl; + } + + UNIT_ASSERT_VALUES_EQUAL(readsByTable.at("/Root/SecondaryKeys").first, 1); + UNIT_ASSERT_VALUES_EQUAL(readsByTable.at("/Root/SecondaryKeys").second, 8); + + UNIT_ASSERT_VALUES_EQUAL(readsByTable.at("/Root/SecondaryKeys/Index/indexImplTable").first, 1); + UNIT_ASSERT_VALUES_EQUAL(readsByTable.at("/Root/SecondaryKeys/Index/indexImplTable").second, 8); + } + + Y_UNIT_TEST_TWIN(IndexLookupAtLeast8BytesInStorage, StreamLookup) { + TKikimrRunner kikimr(GetAppConfig(true, StreamLookup)); + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + + session.ExecuteSchemeQuery(R"( + --!syntax_v1 + CREATE TABLE `/Root/SecondaryKeys` ( + Key Int32, + Fk Int32, + Value String, + ValueInt Int32, + PRIMARY KEY (Key), + INDEX Index GLOBAL ON (Fk) + ); + + )").GetValueSync(); + + session.ExecuteDataQuery(R"( + REPLACE INTO `/Root/SecondaryKeys` (Key, Fk, Value, ValueInt) VALUES + (1, 1, "Payload1", 100), + (2, 2, "Payload2", 200), + (5, 5, "Payload5", 500), + (NULL, 6, "Payload6", 600), + (7, NULL, "Payload7", 700), + (NULL, NULL, "Payload8", 800); + )", TTxControl::BeginTx().CommitTx()).GetValueSync(); + + auto query = Q_(R"( + SELECT ValueInt FROM `/Root/SecondaryKeys` VIEW Index WHERE Fk = 1; + )"); + + auto txControl = TTxControl::BeginTx().CommitTx(); + + auto result = session.ExecuteDataQuery(query, txControl, GetDataQuerySettings()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS); + + CompareYson(R"( + [ + [[100]] + ] + )", NYdb::FormatResultSetYson(result.GetResultSet(0))); + + auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); + + std::unordered_map> readsByTable; + for(const auto& queryPhase : stats.query_phases()) { + for(const auto& tableAccess: queryPhase.table_access()) { + auto [it, success] = readsByTable.emplace(tableAccess.name(), std::make_pair(0, 0)); + it->second.first += tableAccess.reads().rows(); + it->second.second += tableAccess.reads().bytes(); + } + } + + for(const auto& [name, rowsAndBytes]: readsByTable) { + Cerr << name << " " << rowsAndBytes.first << " " << rowsAndBytes.second << Endl; + } + + UNIT_ASSERT_VALUES_EQUAL(readsByTable.at("/Root/SecondaryKeys").first, 1); + // 4 bytes is unexpected, because datashards has 8 bytes per row in storage. + UNIT_ASSERT_VALUES_EQUAL(readsByTable.at("/Root/SecondaryKeys").second, 8); + + UNIT_ASSERT_VALUES_EQUAL(readsByTable.at("/Root/SecondaryKeys/Index/indexImplTable").first, 1); + UNIT_ASSERT_VALUES_EQUAL(readsByTable.at("/Root/SecondaryKeys/Index/indexImplTable").second, 8); + } + + Y_UNIT_TEST_TWIN(IndexLookupAndTake, StreamLookup) { + TKikimrRunner kikimr(GetAppConfig(true, StreamLookup)); + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + + session.ExecuteSchemeQuery(R"( + --!syntax_v1 + CREATE TABLE `/Root/SecondaryKeys` ( + Key Int32, + Fk Int32, + Value String, + ValueInt Int32, + PRIMARY KEY (Key), + INDEX Index GLOBAL ON (Fk) + ); + + )").GetValueSync(); + + session.ExecuteDataQuery(R"( + REPLACE INTO `/Root/SecondaryKeys` (Key, Fk, Value, ValueInt) VALUES + (1, 1, "Payload1", 100), + (2, 2, "Payload2", 200), + (5, 5, "Payload5", 500), + (NULL, 6, "Payload6", 600), + (7, NULL, "Payload7", 700), + (NULL, NULL, "Payload8", 800); + )", TTxControl::BeginTx().CommitTx()).GetValueSync(); + + auto query = Q_(R"( + SELECT Value FROM `/Root/SecondaryKeys` VIEW Index WHERE Fk >= 1 and Fk <= 2 AND StartsWith(Value, "Payload") LIMIT 1; + )"); + + auto txControl = TTxControl::BeginTx().CommitTx(); + + auto result = session.ExecuteDataQuery(query, txControl, GetDataQuerySettings()).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS); + + CompareYson(R"( + [ + [["Payload1"]] + ] + )", NYdb::FormatResultSetYson(result.GetResultSet(0))); + + auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); + + std::unordered_map> readsByTable; + for(const auto& queryPhase : stats.query_phases()) { + for(const auto& tableAccess: queryPhase.table_access()) { + auto [it, success] = readsByTable.emplace(tableAccess.name(), std::make_pair(0, 0)); + it->second.first += tableAccess.reads().rows(); + it->second.second += tableAccess.reads().bytes(); + } + } + + for(const auto& [name, rowsAndBytes]: readsByTable) { + Cerr << name << " " << rowsAndBytes.first << " " << rowsAndBytes.second << Endl; + } + + UNIT_ASSERT_VALUES_EQUAL(readsByTable.at("/Root/SecondaryKeys").first, 1); + UNIT_ASSERT_VALUES_EQUAL(readsByTable.at("/Root/SecondaryKeys").second, 8); + + UNIT_ASSERT_VALUES_EQUAL(readsByTable.at("/Root/SecondaryKeys/Index/indexImplTable").first, 2); + UNIT_ASSERT_VALUES_EQUAL(readsByTable.at("/Root/SecondaryKeys/Index/indexImplTable").second, 16); + } + Y_UNIT_TEST_TWIN(PointLookup, SourceRead) { TKikimrRunner kikimr(GetAppConfig(SourceRead, false)); auto db = kikimr.GetTableClient(); @@ -97,6 +353,44 @@ Y_UNIT_TEST_SUITE(KqpCost) { UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(phase).table_access(0).reads().bytes(), 40); } + Y_UNIT_TEST_TWIN(IndexLookupJoin, StreamLookupJoin) { + TKikimrRunner kikimr(GetAppConfig(true, true, StreamLookupJoin)); + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + + CreateSampleTables(session); + + auto result = session.ExecuteDataQuery(Q_(R"( + PRAGMA DisableSimpleColumns; + SELECT * FROM `/Root/Join1_1` AS t1 + INNER JOIN `/Root/Join1_2` AS t2 + ON t1.Fk21 = t2.Key1 AND t1.Fk22 = t2.Key2 + WHERE t1.Value = 'Value3' AND t2.Value IS NOT NULL + )"), TTxControl::BeginTx().CommitTx(), GetDataQuerySettings()).ExtractValueSync(); + UNIT_ASSERT(result.IsSuccess()); + + auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats()); + + std::unordered_map> readsByTable; + for(const auto& queryPhase : stats.query_phases()) { + for(const auto& tableAccess: queryPhase.table_access()) { + auto [it, success] = readsByTable.emplace(tableAccess.name(), std::make_pair(0, 0)); + it->second.first += tableAccess.reads().rows(); + it->second.second += tableAccess.reads().bytes(); + } + } + + for(const auto& [name, rowsAndBytes]: readsByTable) { + Cerr << name << " " << rowsAndBytes.first << " " << rowsAndBytes.second << Endl; + } + + UNIT_ASSERT_VALUES_EQUAL(readsByTable.at("/Root/Join1_2").first, 1); + UNIT_ASSERT_VALUES_EQUAL(readsByTable.at("/Root/Join1_2").second, 19); + + UNIT_ASSERT_VALUES_EQUAL(readsByTable.at("/Root/Join1_1").first, 8); + UNIT_ASSERT_VALUES_EQUAL(readsByTable.at("/Root/Join1_1").second, 136); + } + Y_UNIT_TEST_TWIN(RangeFullScan, SourceRead) { TKikimrRunner kikimr(GetAppConfig(SourceRead));