Skip to content

Commit

Permalink
fix stream lookup bytes calculation (#12026)
Browse files Browse the repository at this point in the history
  • Loading branch information
gridnevvvit authored Nov 28, 2024
1 parent 97c9f43 commit 02c6b58
Show file tree
Hide file tree
Showing 3 changed files with 329 additions and 11 deletions.
32 changes: 25 additions & 7 deletions ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
, LockTxId(settings.HasLockTxId() ? settings.GetLockTxId() : TMaybe<ui64>())
, NodeLockId(settings.HasLockNodeId() ? settings.GetLockNodeId() : TMaybe<ui32>())
, SchemeCacheRequestTimeout(SCHEME_CACHE_REQUEST_TIMEOUT)
, LookupStrategy(settings.GetLookupStrategy())
, StreamLookupWorker(CreateStreamLookupWorker(std::move(settings), args.TypeEnv, args.HolderFactory, args.InputDesc))
, Counters(counters)
, LookupActorSpan(TWilsonKqp::LookupActor, std::move(args.TraceId), "LookupActor")
Expand Down Expand Up @@ -67,7 +68,7 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
return NKikimrServices::TActivity::KQP_STREAM_LOOKUP_ACTOR;
}

void FillExtraStats(NYql::NDqProto::TDqTaskStats* stats , bool last, const NYql::NDq::TDqMeteringStats*) override {
void FillExtraStats(NYql::NDqProto::TDqTaskStats* stats , bool last, const NYql::NDq::TDqMeteringStats* mstats) override {
if (last) {
NYql::NDqProto::TDqTableStats* tableStats = nullptr;
for (auto& table : *stats->MutableTables()) {
Expand All @@ -81,9 +82,25 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
tableStats->SetTablePath(StreamLookupWorker->GetTablePath());
}

ui64 rowsReadEstimate = ReadRowsCount;
ui64 bytesReadEstimate = ReadBytesCount;

if (mstats) {
switch(LookupStrategy) {
case NKqpProto::EStreamLookupStrategy::LOOKUP: {
// In the lookup case, the result we return is the actual data that we read from the datashard.
rowsReadEstimate = mstats->Inputs[InputIndex]->RowsConsumed;
bytesReadEstimate = mstats->Inputs[InputIndex]->BytesConsumed;
break;
}
default:
;
}
}

// TODO: use evread statistics after KIKIMR-16924
tableStats->SetReadRows(tableStats->GetReadRows() + ReadRowsCount);
tableStats->SetReadBytes(tableStats->GetReadBytes() + ReadBytesCount);
tableStats->SetReadRows(tableStats->GetReadRows() + rowsReadEstimate);
tableStats->SetReadBytes(tableStats->GetReadBytes() + bytesReadEstimate);
tableStats->SetAffectedPartitions(tableStats->GetAffectedPartitions() + ReadsPerShard.size());

NKqpProto::TKqpTableExtraStats tableExtraStats;
Expand Down Expand Up @@ -148,7 +165,7 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
};

struct TEvRetryRead : public TEventLocal<TEvRetryRead, EvRetryRead> {
explicit TEvRetryRead(ui64 readId, ui64 lastSeqNo, bool instantStart = false)
explicit TEvRetryRead(ui64 readId, ui64 lastSeqNo, bool instantStart = false)
: ReadId(readId)
, LastSeqNo(lastSeqNo)
, InstantStart(instantStart) {
Expand Down Expand Up @@ -259,7 +276,7 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
void Handle(TEvTxProxySchemeCache::TEvResolveKeySetResult::TPtr& ev) {
CA_LOG_D("TEvResolveKeySetResult was received for table: " << StreamLookupWorker->GetTablePath());
if (ev->Get()->Request->ErrorCount > 0) {
TString errorMsg = TStringBuilder() << "Failed to get partitioning for table: "
TString errorMsg = TStringBuilder() << "Failed to get partitioning for table: "
<< StreamLookupWorker->GetTablePath();
LookupActorStateSpan.EndError(errorMsg);

Expand Down Expand Up @@ -419,7 +436,7 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
auto readIt = Reads.find(ev->Get()->ReadId);
YQL_ENSURE(readIt != Reads.end(), "Unexpected readId: " << ev->Get()->ReadId);
auto& read = readIt->second;

if (read.State == EReadState::Running && read.LastSeqNo <= ev->Get()->LastSeqNo) {
if (ev->Get()->InstantStart) {
read.SetFinished();
Expand Down Expand Up @@ -566,7 +583,7 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
keyColumnTypes, TVector<TKeyDesc::TColumnOp>{}));

Counters->IteratorsShardResolve->Inc();
LookupActorStateSpan = NWilson::TSpan(TWilsonKqp::LookupActorShardsResolve, LookupActorSpan.GetTraceId(),
LookupActorStateSpan = NWilson::TSpan(TWilsonKqp::LookupActorShardsResolve, LookupActorSpan.GetTraceId(),
"WaitForShardsResolve", NWilson::EFlags::AUTO_END);

Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvInvalidateTable(StreamLookupWorker->GetTableId(), {}));
Expand Down Expand Up @@ -625,6 +642,7 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
NActors::TActorId SchemeCacheRequestTimeoutTimer;
TVector<NKikimrDataEvents::TLock> Locks;
TVector<NKikimrDataEvents::TLock> BrokenLocks;
NKqpProto::EStreamLookupStrategy LookupStrategy;
std::unique_ptr<TKqpStreamLookupWorker> StreamLookupWorker;
ui64 ReadId = 0;
size_t TotalRetryAttempts = 0;
Expand Down
13 changes: 10 additions & 3 deletions ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -349,13 +349,15 @@ class TKqpLookupRows : public TKqpStreamLookupWorker {
auto row = HolderFactory.CreateDirectArrayHolder(Columns.size(), rowItems);

i64 rowSize = 0;
i64 storageRowSize = 0;
for (size_t colIndex = 0, resultColIndex = 0; colIndex < Columns.size(); ++colIndex) {
const auto& column = Columns[colIndex];
if (IsSystemColumn(column.Name)) {
NMiniKQL::FillSystemColumn(rowItems[colIndex], result.ShardId, column.Id, column.PType);
rowSize += sizeof(NUdf::TUnboxedValue);
} else {
YQL_ENSURE(resultColIndex < resultRow.size());
storageRowSize += resultRow[resultColIndex].Size();
rowItems[colIndex] = NMiniKQL::GetCellValue(resultRow[resultColIndex], column.PType);
rowSize += NMiniKQL::GetUnboxedValueSize(rowItems[colIndex], column.PType).AllocatedBytes;
++resultColIndex;
Expand All @@ -370,10 +372,12 @@ class TKqpLookupRows : public TKqpStreamLookupWorker {

batch.push_back(std::move(row));

storageRowSize = std::max(storageRowSize, (i64)8);

resultStats.ReadRowsCount += 1;
resultStats.ReadBytesCount += rowSize;
resultStats.ReadBytesCount += storageRowSize;
resultStats.ResultRowsCount += 1;
resultStats.ResultBytesCount += rowSize;
resultStats.ResultBytesCount += storageRowSize;
}

if (result.UnprocessedResultRow == result.ReadResult->Get()->GetRowsCount()) {
Expand Down Expand Up @@ -895,6 +899,8 @@ class TKqpJoinRows : public TKqpStreamLookupWorker {
auto leftRowType = GetLeftRowType();
YQL_ENSURE(leftRowType);

i64 storageReadBytes = 0;

for (size_t i = 0; i < leftRowType->GetMembersCount(); ++i) {
auto columnTypeInfo = UnpackTypeInfo(leftRowType->GetMemberType(i));
leftRowSize += NMiniKQL::GetUnboxedValueSize(leftRowInfo.Row.GetElement(i), columnTypeInfo).AllocatedBytes;
Expand All @@ -916,6 +922,7 @@ class TKqpJoinRows : public TKqpStreamLookupWorker {
NMiniKQL::FillSystemColumn(rightRowItems[colIndex], *shardId, column.Id, column.PType);
rightRowSize += sizeof(NUdf::TUnboxedValue);
} else {
storageReadBytes += rightRow[std::distance(ReadColumns.begin(), it)].Size();
rightRowItems[colIndex] = NMiniKQL::GetCellValue(rightRow[std::distance(ReadColumns.begin(), it)],
column.PType);
rightRowSize += NMiniKQL::GetUnboxedValueSize(rightRowItems[colIndex], column.PType).AllocatedBytes;
Expand All @@ -927,7 +934,7 @@ class TKqpJoinRows : public TKqpStreamLookupWorker {

rowStats.ReadRowsCount += (leftRowInfo.RightRowExist ? 1 : 0);
// TODO: use datashard statistics KIKIMR-16924
rowStats.ReadBytesCount += rightRowSize;
rowStats.ReadBytesCount += storageReadBytes;
rowStats.ResultRowsCount += 1;
rowStats.ResultBytesCount += leftRowSize + rightRowSize;

Expand Down
Loading

0 comments on commit 02c6b58

Please sign in to comment.