Skip to content

Commit

Permalink
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Merge 4b54ea9 into 47026cc
Browse files Browse the repository at this point in the history
gridnevvvit authored Nov 28, 2024
2 parents 47026cc + 4b54ea9 commit 927d961
Showing 3 changed files with 330 additions and 11 deletions.
32 changes: 25 additions & 7 deletions ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp
Original file line number Diff line number Diff line change
@@ -40,6 +40,7 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
, LockTxId(settings.HasLockTxId() ? settings.GetLockTxId() : TMaybe<ui64>())
, NodeLockId(settings.HasLockNodeId() ? settings.GetLockNodeId() : TMaybe<ui32>())
, SchemeCacheRequestTimeout(SCHEME_CACHE_REQUEST_TIMEOUT)
, LookupStrategy(settings.GetLookupStrategy())
, StreamLookupWorker(CreateStreamLookupWorker(std::move(settings), args.TypeEnv, args.HolderFactory, args.InputDesc))
, Counters(counters)
, LookupActorSpan(TWilsonKqp::LookupActor, std::move(args.TraceId), "LookupActor")
@@ -67,7 +68,7 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
return NKikimrServices::TActivity::KQP_STREAM_LOOKUP_ACTOR;
}

void FillExtraStats(NYql::NDqProto::TDqTaskStats* stats , bool last, const NYql::NDq::TDqMeteringStats*) override {
void FillExtraStats(NYql::NDqProto::TDqTaskStats* stats , bool last, const NYql::NDq::TDqMeteringStats* mstats) override {
if (last) {
NYql::NDqProto::TDqTableStats* tableStats = nullptr;
for (auto& table : *stats->MutableTables()) {
@@ -81,9 +82,25 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
tableStats->SetTablePath(StreamLookupWorker->GetTablePath());
}

ui64 rowsReadEstimate = ReadRowsCount;
ui64 bytesReadEstimate = ReadBytesCount;

if (mstats) {
switch(LookupStrategy) {
case NKqpProto::EStreamLookupStrategy::LOOKUP: {
// in lookup case we return as result actual data, that we read from the datashard.
rowsReadEstimate = mstats->Inputs[InputIndex]->RowsConsumed;
bytesReadEstimate = mstats->Inputs[InputIndex]->BytesConsumed;
break;
}
default:
;
}
}

// TODO: use evread statistics after KIKIMR-16924
tableStats->SetReadRows(tableStats->GetReadRows() + ReadRowsCount);
tableStats->SetReadBytes(tableStats->GetReadBytes() + ReadBytesCount);
tableStats->SetReadRows(tableStats->GetReadRows() + rowsReadEstimate);
tableStats->SetReadBytes(tableStats->GetReadBytes() + bytesReadEstimate);
tableStats->SetAffectedPartitions(tableStats->GetAffectedPartitions() + ReadsPerShard.size());

NKqpProto::TKqpTableExtraStats tableExtraStats;
@@ -148,7 +165,7 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
};

struct TEvRetryRead : public TEventLocal<TEvRetryRead, EvRetryRead> {
explicit TEvRetryRead(ui64 readId, ui64 lastSeqNo, bool instantStart = false)
explicit TEvRetryRead(ui64 readId, ui64 lastSeqNo, bool instantStart = false)
: ReadId(readId)
, LastSeqNo(lastSeqNo)
, InstantStart(instantStart) {
@@ -259,7 +276,7 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
void Handle(TEvTxProxySchemeCache::TEvResolveKeySetResult::TPtr& ev) {
CA_LOG_D("TEvResolveKeySetResult was received for table: " << StreamLookupWorker->GetTablePath());
if (ev->Get()->Request->ErrorCount > 0) {
TString errorMsg = TStringBuilder() << "Failed to get partitioning for table: "
TString errorMsg = TStringBuilder() << "Failed to get partitioning for table: "
<< StreamLookupWorker->GetTablePath();
LookupActorStateSpan.EndError(errorMsg);

@@ -419,7 +436,7 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
auto readIt = Reads.find(ev->Get()->ReadId);
YQL_ENSURE(readIt != Reads.end(), "Unexpected readId: " << ev->Get()->ReadId);
auto& read = readIt->second;

if (read.State == EReadState::Running && read.LastSeqNo <= ev->Get()->LastSeqNo) {
if (ev->Get()->InstantStart) {
read.SetFinished();
@@ -566,7 +583,7 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
keyColumnTypes, TVector<TKeyDesc::TColumnOp>{}));

Counters->IteratorsShardResolve->Inc();
LookupActorStateSpan = NWilson::TSpan(TWilsonKqp::LookupActorShardsResolve, LookupActorSpan.GetTraceId(),
LookupActorStateSpan = NWilson::TSpan(TWilsonKqp::LookupActorShardsResolve, LookupActorSpan.GetTraceId(),
"WaitForShardsResolve", NWilson::EFlags::AUTO_END);

Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvInvalidateTable(StreamLookupWorker->GetTableId(), {}));
@@ -625,6 +642,7 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
NActors::TActorId SchemeCacheRequestTimeoutTimer;
TVector<NKikimrDataEvents::TLock> Locks;
TVector<NKikimrDataEvents::TLock> BrokenLocks;
NKqpProto::EStreamLookupStrategy LookupStrategy;
std::unique_ptr<TKqpStreamLookupWorker> StreamLookupWorker;
ui64 ReadId = 0;
size_t TotalRetryAttempts = 0;
13 changes: 10 additions & 3 deletions ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp
Original file line number Diff line number Diff line change
@@ -361,13 +361,15 @@ class TKqpLookupRows : public TKqpStreamLookupWorker {
auto row = HolderFactory.CreateDirectArrayHolder(Columns.size(), rowItems);

i64 rowSize = 0;
i64 storageRowSize = 0;
for (size_t colIndex = 0, resultColIndex = 0; colIndex < Columns.size(); ++colIndex) {
const auto& column = Columns[colIndex];
if (IsSystemColumn(column.Name)) {
NMiniKQL::FillSystemColumn(rowItems[colIndex], result.ShardId, column.Id, column.PType);
rowSize += sizeof(NUdf::TUnboxedValue);
} else {
YQL_ENSURE(resultColIndex < resultRow.size());
storageRowSize += resultRow[resultColIndex].Size();
rowItems[colIndex] = NMiniKQL::GetCellValue(resultRow[resultColIndex], column.PType);
rowSize += NMiniKQL::GetUnboxedValueSize(rowItems[colIndex], column.PType).AllocatedBytes;
++resultColIndex;
@@ -382,10 +384,12 @@ class TKqpLookupRows : public TKqpStreamLookupWorker {

batch.push_back(std::move(row));

storageRowSize = std::max(storageRowSize, (i64)8);

resultStats.ReadRowsCount += 1;
resultStats.ReadBytesCount += rowSize;
resultStats.ReadBytesCount += storageRowSize;
resultStats.ResultRowsCount += 1;
resultStats.ResultBytesCount += rowSize;
resultStats.ResultBytesCount += storageRowSize;
}

if (result.UnprocessedResultRow == result.ReadResult->Get()->GetRowsCount()) {
@@ -907,6 +911,8 @@ class TKqpJoinRows : public TKqpStreamLookupWorker {
auto leftRowType = GetLeftRowType();
YQL_ENSURE(leftRowType);

i64 storageReadBytes = 0;

for (size_t i = 0; i < leftRowType->GetMembersCount(); ++i) {
auto columnTypeInfo = UnpackTypeInfo(leftRowType->GetMemberType(i));
leftRowSize += NMiniKQL::GetUnboxedValueSize(leftRowInfo.Row.GetElement(i), columnTypeInfo).AllocatedBytes;
@@ -928,6 +934,7 @@ class TKqpJoinRows : public TKqpStreamLookupWorker {
NMiniKQL::FillSystemColumn(rightRowItems[colIndex], *shardId, column.Id, column.PType);
rightRowSize += sizeof(NUdf::TUnboxedValue);
} else {
storageReadBytes += rightRow[std::distance(ReadColumns.begin(), it)].Size();
rightRowItems[colIndex] = NMiniKQL::GetCellValue(rightRow[std::distance(ReadColumns.begin(), it)],
column.PType);
rightRowSize += NMiniKQL::GetUnboxedValueSize(rightRowItems[colIndex], column.PType).AllocatedBytes;
@@ -939,7 +946,7 @@ class TKqpJoinRows : public TKqpStreamLookupWorker {

rowStats.ReadRowsCount += (leftRowInfo.RightRowExist ? 1 : 0);
// TODO: use datashard statistics KIKIMR-16924
rowStats.ReadBytesCount += rightRowSize;
rowStats.ReadBytesCount += storageReadBytes;
rowStats.ResultRowsCount += 1;
rowStats.ResultBytesCount += leftRowSize + rightRowSize;

296 changes: 295 additions & 1 deletion ydb/core/kqp/ut/cost/kqp_cost_ut.cpp
Original file line number Diff line number Diff line change
@@ -11,11 +11,12 @@ namespace NKqp {
using namespace NYdb;
using namespace NYdb::NTable;

static NKikimrConfig::TAppConfig GetAppConfig(bool sourceRead, bool streamLookup = true) {
static NKikimrConfig::TAppConfig GetAppConfig(bool sourceRead, bool streamLookup = true, bool streamLookupJoin = false) {
auto app = NKikimrConfig::TAppConfig();
app.MutableTableServiceConfig()->SetEnableKqpDataQuerySourceRead(sourceRead);
app.MutableTableServiceConfig()->SetEnableKqpScanQuerySourceRead(sourceRead);
app.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamLookup(streamLookup);
app.MutableTableServiceConfig()->SetEnableKqpDataQueryStreamIdxLookupJoin(streamLookupJoin);
return app;
}

@@ -25,6 +26,64 @@ static NYdb::NTable::TExecDataQuerySettings GetDataQuerySettings() {
return execSettings;
}


static void CreateSampleTables(TSession session) {
UNIT_ASSERT(session.ExecuteSchemeQuery(R"(
CREATE TABLE `/Root/Join1_1` (
Key Int32,
Fk21 Int32,
Fk22 String,
Value String,
PRIMARY KEY (Key)
);
CREATE TABLE `/Root/Join1_2` (
Key1 Int32,
Key2 String,
Fk3 String,
Value String,
PRIMARY KEY (Key1, Key2)
);
CREATE TABLE `/Root/Join1_3` (
Key String,
Value Int32,
PRIMARY KEY (Key)
);
)").GetValueSync().IsSuccess());

UNIT_ASSERT(session.ExecuteDataQuery(R"(
REPLACE INTO `/Root/Join1_1` (Key, Fk21, Fk22, Value) VALUES
(1, 101, "One", "Value1"),
(2, 102, "Two", "Value1"),
(3, 103, "One", "Value2"),
(4, 104, "Two", "Value2"),
(5, 105, "One", "Value3"),
(6, 106, "Two", "Value3"),
(7, 107, "One", "Value4"),
(8, 108, "One", "Value5");
REPLACE INTO `/Root/Join1_2` (Key1, Key2, Fk3, Value) VALUES
(101, "One", "Name1", "Value21"),
(101, "Two", "Name1", "Value22"),
(101, "Three", "Name3", "Value23"),
(102, "One", "Name2", "Value24"),
(103, "One", "Name1", "Value25"),
(104, "One", "Name3", "Value26"),
(105, "One", "Name2", "Value27"),
(105, "Two", "Name4", "Value28"),
(106, "One", "Name3", "Value29"),
(108, "One", NULL, "Value31"),
(109, "Four", NULL, "Value41");
REPLACE INTO `/Root/Join1_3` (Key, Value) VALUES
("Name1", 1001),
("Name2", 1002),
("Name4", 1004);
)", TTxControl::BeginTx().CommitTx()).GetValueSync().IsSuccess());
}


Y_UNIT_TEST_SUITE(KqpCost) {
void EnableDebugLogging(NActors::TTestActorRuntime * runtime) {
//runtime->SetLogPriority(NKikimrServices::FLAT_TX_SCHEMESHARD, NActors::NLog::PRI_DEBUG);
@@ -43,6 +102,203 @@ Y_UNIT_TEST_SUITE(KqpCost) {
//runtime->SetLogPriority(NKikimrServices::BLOB_CACHE, NActors::NLog::PRI_DEBUG);
//runtime->SetLogPriority(NKikimrServices::GRPC_SERVER, NActors::NLog::PRI_DEBUG);
}

Y_UNIT_TEST_TWIN(IndexLookup, StreamLookup) {
TKikimrRunner kikimr(GetAppConfig(true, StreamLookup));
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();

session.ExecuteSchemeQuery(R"(
--!syntax_v1
CREATE TABLE `/Root/SecondaryKeys` (
Key Int32,
Fk Int32,
Value String,
ValueInt Int32,
PRIMARY KEY (Key),
INDEX Index GLOBAL ON (Fk)
);
)").GetValueSync();

session.ExecuteDataQuery(R"(
REPLACE INTO `/Root/SecondaryKeys` (Key, Fk, Value, ValueInt) VALUES
(1, 1, "Payload1", 100),
(2, 2, "Payload2", 200),
(5, 5, "Payload5", 500),
(NULL, 6, "Payload6", 600),
(7, NULL, "Payload7", 700),
(NULL, NULL, "Payload8", 800);
)", TTxControl::BeginTx().CommitTx()).GetValueSync();

auto query = Q_(R"(
SELECT Value FROM `/Root/SecondaryKeys` VIEW Index WHERE Fk = 1;
)");

auto txControl = TTxControl::BeginTx().CommitTx();

auto result = session.ExecuteDataQuery(query, txControl, GetDataQuerySettings()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS);

CompareYson(R"(
[
[["Payload1"]]
]
)", NYdb::FormatResultSetYson(result.GetResultSet(0)));

auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats());

std::unordered_map<TString, std::pair<int, int>> readsByTable;
for(const auto& queryPhase : stats.query_phases()) {
for(const auto& tableAccess: queryPhase.table_access()) {
auto [it, success] = readsByTable.emplace(tableAccess.name(), std::make_pair(0, 0));
it->second.first += tableAccess.reads().rows();
it->second.second += tableAccess.reads().bytes();
}
}

for(const auto& [name, rowsAndBytes]: readsByTable) {
Cerr << name << " " << rowsAndBytes.first << " " << rowsAndBytes.second << Endl;
}

UNIT_ASSERT_VALUES_EQUAL(readsByTable.at("/Root/SecondaryKeys").first, 1);
UNIT_ASSERT_VALUES_EQUAL(readsByTable.at("/Root/SecondaryKeys").second, 8);

UNIT_ASSERT_VALUES_EQUAL(readsByTable.at("/Root/SecondaryKeys/Index/indexImplTable").first, 1);
UNIT_ASSERT_VALUES_EQUAL(readsByTable.at("/Root/SecondaryKeys/Index/indexImplTable").second, 8);
}

Y_UNIT_TEST_TWIN(IndexLookupAtLeast8BytesInStorage, StreamLookup) {
TKikimrRunner kikimr(GetAppConfig(true, StreamLookup));
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();

session.ExecuteSchemeQuery(R"(
--!syntax_v1
CREATE TABLE `/Root/SecondaryKeys` (
Key Int32,
Fk Int32,
Value String,
ValueInt Int32,
PRIMARY KEY (Key),
INDEX Index GLOBAL ON (Fk)
);
)").GetValueSync();

session.ExecuteDataQuery(R"(
REPLACE INTO `/Root/SecondaryKeys` (Key, Fk, Value, ValueInt) VALUES
(1, 1, "Payload1", 100),
(2, 2, "Payload2", 200),
(5, 5, "Payload5", 500),
(NULL, 6, "Payload6", 600),
(7, NULL, "Payload7", 700),
(NULL, NULL, "Payload8", 800);
)", TTxControl::BeginTx().CommitTx()).GetValueSync();

auto query = Q_(R"(
SELECT ValueInt FROM `/Root/SecondaryKeys` VIEW Index WHERE Fk = 1;
)");

auto txControl = TTxControl::BeginTx().CommitTx();

auto result = session.ExecuteDataQuery(query, txControl, GetDataQuerySettings()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS);

CompareYson(R"(
[
[[100]]
]
)", NYdb::FormatResultSetYson(result.GetResultSet(0)));

auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats());

std::unordered_map<TString, std::pair<int, int>> readsByTable;
for(const auto& queryPhase : stats.query_phases()) {
for(const auto& tableAccess: queryPhase.table_access()) {
auto [it, success] = readsByTable.emplace(tableAccess.name(), std::make_pair(0, 0));
it->second.first += tableAccess.reads().rows();
it->second.second += tableAccess.reads().bytes();
}
}

for(const auto& [name, rowsAndBytes]: readsByTable) {
Cerr << name << " " << rowsAndBytes.first << " " << rowsAndBytes.second << Endl;
}

UNIT_ASSERT_VALUES_EQUAL(readsByTable.at("/Root/SecondaryKeys").first, 1);
// 4 bytes is unexpected, because datashards has 8 bytes per row in storage.
UNIT_ASSERT_VALUES_EQUAL(readsByTable.at("/Root/SecondaryKeys").second, 8);

UNIT_ASSERT_VALUES_EQUAL(readsByTable.at("/Root/SecondaryKeys/Index/indexImplTable").first, 1);
UNIT_ASSERT_VALUES_EQUAL(readsByTable.at("/Root/SecondaryKeys/Index/indexImplTable").second, 8);
}

Y_UNIT_TEST_TWIN(IndexLookupAndTake, StreamLookup) {
TKikimrRunner kikimr(GetAppConfig(true, StreamLookup));
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();

session.ExecuteSchemeQuery(R"(
--!syntax_v1
CREATE TABLE `/Root/SecondaryKeys` (
Key Int32,
Fk Int32,
Value String,
ValueInt Int32,
PRIMARY KEY (Key),
INDEX Index GLOBAL ON (Fk)
);
)").GetValueSync();

session.ExecuteDataQuery(R"(
REPLACE INTO `/Root/SecondaryKeys` (Key, Fk, Value, ValueInt) VALUES
(1, 1, "Payload1", 100),
(2, 2, "Payload2", 200),
(5, 5, "Payload5", 500),
(NULL, 6, "Payload6", 600),
(7, NULL, "Payload7", 700),
(NULL, NULL, "Payload8", 800);
)", TTxControl::BeginTx().CommitTx()).GetValueSync();

auto query = Q_(R"(
SELECT Value FROM `/Root/SecondaryKeys` VIEW Index WHERE Fk >= 1 and Fk <= 2 AND StartsWith(Value, "Payload") LIMIT 1;
)");

auto txControl = TTxControl::BeginTx().CommitTx();

auto result = session.ExecuteDataQuery(query, txControl, GetDataQuerySettings()).ExtractValueSync();
UNIT_ASSERT_VALUES_EQUAL(result.GetStatus(), EStatus::SUCCESS);

CompareYson(R"(
[
[["Payload1"]]
]
)", NYdb::FormatResultSetYson(result.GetResultSet(0)));

auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats());

std::unordered_map<TString, std::pair<int, int>> readsByTable;
for(const auto& queryPhase : stats.query_phases()) {
for(const auto& tableAccess: queryPhase.table_access()) {
auto [it, success] = readsByTable.emplace(tableAccess.name(), std::make_pair(0, 0));
it->second.first += tableAccess.reads().rows();
it->second.second += tableAccess.reads().bytes();
}
}

for(const auto& [name, rowsAndBytes]: readsByTable) {
Cerr << name << " " << rowsAndBytes.first << " " << rowsAndBytes.second << Endl;
}

UNIT_ASSERT_VALUES_EQUAL(readsByTable.at("/Root/SecondaryKeys").first, 1);
UNIT_ASSERT_VALUES_EQUAL(readsByTable.at("/Root/SecondaryKeys").second, 8);

UNIT_ASSERT_VALUES_EQUAL(readsByTable.at("/Root/SecondaryKeys/Index/indexImplTable").first, 2);
UNIT_ASSERT_VALUES_EQUAL(readsByTable.at("/Root/SecondaryKeys/Index/indexImplTable").second, 16);
}

Y_UNIT_TEST_TWIN(PointLookup, SourceRead) {
TKikimrRunner kikimr(GetAppConfig(SourceRead, false));
auto db = kikimr.GetTableClient();
@@ -97,6 +353,44 @@ Y_UNIT_TEST_SUITE(KqpCost) {
UNIT_ASSERT_VALUES_EQUAL(stats.query_phases(phase).table_access(0).reads().bytes(), 40);
}

Y_UNIT_TEST_TWIN(IndexLookupJoin, StreamLookupJoin) {
TKikimrRunner kikimr(GetAppConfig(true, true, StreamLookupJoin));
auto db = kikimr.GetTableClient();
auto session = db.CreateSession().GetValueSync().GetSession();

CreateSampleTables(session);

auto result = session.ExecuteDataQuery(Q_(R"(
PRAGMA DisableSimpleColumns;
SELECT * FROM `/Root/Join1_1` AS t1
INNER JOIN `/Root/Join1_2` AS t2
ON t1.Fk21 = t2.Key1 AND t1.Fk22 = t2.Key2
WHERE t1.Value = 'Value3' AND t2.Value IS NOT NULL
)"), TTxControl::BeginTx().CommitTx(), GetDataQuerySettings()).ExtractValueSync();
UNIT_ASSERT(result.IsSuccess());

auto stats = NYdb::TProtoAccessor::GetProto(*result.GetStats());

std::unordered_map<TString, std::pair<int, int>> readsByTable;
for(const auto& queryPhase : stats.query_phases()) {
for(const auto& tableAccess: queryPhase.table_access()) {
auto [it, success] = readsByTable.emplace(tableAccess.name(), std::make_pair(0, 0));
it->second.first += tableAccess.reads().rows();
it->second.second += tableAccess.reads().bytes();
}
}

for(const auto& [name, rowsAndBytes]: readsByTable) {
Cerr << name << " " << rowsAndBytes.first << " " << rowsAndBytes.second << Endl;
}

UNIT_ASSERT_VALUES_EQUAL(readsByTable.at("/Root/Join1_2").first, 1);
UNIT_ASSERT_VALUES_EQUAL(readsByTable.at("/Root/Join1_2").second, 19);

UNIT_ASSERT_VALUES_EQUAL(readsByTable.at("/Root/Join1_1").first, 8);
UNIT_ASSERT_VALUES_EQUAL(readsByTable.at("/Root/Join1_1").second, 136);
}

Y_UNIT_TEST_TWIN(RangeFullScan, SourceRead) {
TKikimrRunner kikimr(GetAppConfig(SourceRead));

0 comments on commit 927d961

Please sign in to comment.