Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix stream lookup bytes calculation (#12026) #12120

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 25 additions & 7 deletions ydb/core/kqp/runtime/kqp_stream_lookup_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
, LockTxId(settings.HasLockTxId() ? settings.GetLockTxId() : TMaybe<ui64>())
, NodeLockId(settings.HasLockNodeId() ? settings.GetLockNodeId() : TMaybe<ui32>())
, SchemeCacheRequestTimeout(SCHEME_CACHE_REQUEST_TIMEOUT)
, LookupStrategy(settings.GetLookupStrategy())
, StreamLookupWorker(CreateStreamLookupWorker(std::move(settings), args.TypeEnv, args.HolderFactory, args.InputDesc))
, Counters(counters)
, LookupActorSpan(TWilsonKqp::LookupActor, std::move(args.TraceId), "LookupActor")
Expand Down Expand Up @@ -67,7 +68,7 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
return NKikimrServices::TActivity::KQP_STREAM_LOOKUP_ACTOR;
}

void FillExtraStats(NYql::NDqProto::TDqTaskStats* stats , bool last, const NYql::NDq::TDqMeteringStats*) override {
void FillExtraStats(NYql::NDqProto::TDqTaskStats* stats , bool last, const NYql::NDq::TDqMeteringStats* mstats) override {
if (last) {
NYql::NDqProto::TDqTableStats* tableStats = nullptr;
for (auto& table : *stats->MutableTables()) {
Expand All @@ -81,9 +82,25 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
tableStats->SetTablePath(StreamLookupWorker->GetTablePath());
}

ui64 rowsReadEstimate = ReadRowsCount;
ui64 bytesReadEstimate = ReadBytesCount;

if (mstats) {
switch(LookupStrategy) {
case NKqpProto::EStreamLookupStrategy::LOOKUP: {
// In the lookup case, the result we return is the actual data read from the datashard.
rowsReadEstimate = mstats->Inputs[InputIndex]->RowsConsumed;
bytesReadEstimate = mstats->Inputs[InputIndex]->BytesConsumed;
break;
}
default:
;
}
}

// TODO: use evread statistics after KIKIMR-16924
tableStats->SetReadRows(tableStats->GetReadRows() + ReadRowsCount);
tableStats->SetReadBytes(tableStats->GetReadBytes() + ReadBytesCount);
tableStats->SetReadRows(tableStats->GetReadRows() + rowsReadEstimate);
tableStats->SetReadBytes(tableStats->GetReadBytes() + bytesReadEstimate);
tableStats->SetAffectedPartitions(tableStats->GetAffectedPartitions() + ReadsPerShard.size());

NKqpProto::TKqpTableExtraStats tableExtraStats;
Expand Down Expand Up @@ -148,7 +165,7 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
};

struct TEvRetryRead : public TEventLocal<TEvRetryRead, EvRetryRead> {
explicit TEvRetryRead(ui64 readId, ui64 lastSeqNo, bool instantStart = false)
explicit TEvRetryRead(ui64 readId, ui64 lastSeqNo, bool instantStart = false)
: ReadId(readId)
, LastSeqNo(lastSeqNo)
, InstantStart(instantStart) {
Expand Down Expand Up @@ -259,7 +276,7 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
void Handle(TEvTxProxySchemeCache::TEvResolveKeySetResult::TPtr& ev) {
CA_LOG_D("TEvResolveKeySetResult was received for table: " << StreamLookupWorker->GetTablePath());
if (ev->Get()->Request->ErrorCount > 0) {
TString errorMsg = TStringBuilder() << "Failed to get partitioning for table: "
TString errorMsg = TStringBuilder() << "Failed to get partitioning for table: "
<< StreamLookupWorker->GetTablePath();
LookupActorStateSpan.EndError(errorMsg);

Expand Down Expand Up @@ -419,7 +436,7 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
auto readIt = Reads.find(ev->Get()->ReadId);
YQL_ENSURE(readIt != Reads.end(), "Unexpected readId: " << ev->Get()->ReadId);
auto& read = readIt->second;

if (read.State == EReadState::Running && read.LastSeqNo <= ev->Get()->LastSeqNo) {
if (ev->Get()->InstantStart) {
read.SetFinished();
Expand Down Expand Up @@ -566,7 +583,7 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
keyColumnTypes, TVector<TKeyDesc::TColumnOp>{}));

Counters->IteratorsShardResolve->Inc();
LookupActorStateSpan = NWilson::TSpan(TWilsonKqp::LookupActorShardsResolve, LookupActorSpan.GetTraceId(),
LookupActorStateSpan = NWilson::TSpan(TWilsonKqp::LookupActorShardsResolve, LookupActorSpan.GetTraceId(),
"WaitForShardsResolve", NWilson::EFlags::AUTO_END);

Send(MakeSchemeCacheID(), new TEvTxProxySchemeCache::TEvInvalidateTable(StreamLookupWorker->GetTableId(), {}));
Expand Down Expand Up @@ -625,6 +642,7 @@ class TKqpStreamLookupActor : public NActors::TActorBootstrapped<TKqpStreamLooku
NActors::TActorId SchemeCacheRequestTimeoutTimer;
TVector<NKikimrDataEvents::TLock> Locks;
TVector<NKikimrDataEvents::TLock> BrokenLocks;
NKqpProto::EStreamLookupStrategy LookupStrategy;
std::unique_ptr<TKqpStreamLookupWorker> StreamLookupWorker;
ui64 ReadId = 0;
size_t TotalRetryAttempts = 0;
Expand Down
13 changes: 10 additions & 3 deletions ydb/core/kqp/runtime/kqp_stream_lookup_worker.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -361,13 +361,15 @@ class TKqpLookupRows : public TKqpStreamLookupWorker {
auto row = HolderFactory.CreateDirectArrayHolder(Columns.size(), rowItems);

i64 rowSize = 0;
i64 storageRowSize = 0;
for (size_t colIndex = 0, resultColIndex = 0; colIndex < Columns.size(); ++colIndex) {
const auto& column = Columns[colIndex];
if (IsSystemColumn(column.Name)) {
NMiniKQL::FillSystemColumn(rowItems[colIndex], result.ShardId, column.Id, column.PType);
rowSize += sizeof(NUdf::TUnboxedValue);
} else {
YQL_ENSURE(resultColIndex < resultRow.size());
storageRowSize += resultRow[resultColIndex].Size();
rowItems[colIndex] = NMiniKQL::GetCellValue(resultRow[resultColIndex], column.PType);
rowSize += NMiniKQL::GetUnboxedValueSize(rowItems[colIndex], column.PType).AllocatedBytes;
++resultColIndex;
Expand All @@ -382,10 +384,12 @@ class TKqpLookupRows : public TKqpStreamLookupWorker {

batch.push_back(std::move(row));

storageRowSize = std::max(storageRowSize, (i64)8);

resultStats.ReadRowsCount += 1;
resultStats.ReadBytesCount += rowSize;
resultStats.ReadBytesCount += storageRowSize;
resultStats.ResultRowsCount += 1;
resultStats.ResultBytesCount += rowSize;
resultStats.ResultBytesCount += storageRowSize;
}

if (result.UnprocessedResultRow == result.ReadResult->Get()->GetRowsCount()) {
Expand Down Expand Up @@ -907,6 +911,8 @@ class TKqpJoinRows : public TKqpStreamLookupWorker {
auto leftRowType = GetLeftRowType();
YQL_ENSURE(leftRowType);

i64 storageReadBytes = 0;

for (size_t i = 0; i < leftRowType->GetMembersCount(); ++i) {
auto columnTypeInfo = UnpackTypeInfo(leftRowType->GetMemberType(i));
leftRowSize += NMiniKQL::GetUnboxedValueSize(leftRowInfo.Row.GetElement(i), columnTypeInfo).AllocatedBytes;
Expand All @@ -928,6 +934,7 @@ class TKqpJoinRows : public TKqpStreamLookupWorker {
NMiniKQL::FillSystemColumn(rightRowItems[colIndex], *shardId, column.Id, column.PType);
rightRowSize += sizeof(NUdf::TUnboxedValue);
} else {
storageReadBytes += rightRow[std::distance(ReadColumns.begin(), it)].Size();
rightRowItems[colIndex] = NMiniKQL::GetCellValue(rightRow[std::distance(ReadColumns.begin(), it)],
column.PType);
rightRowSize += NMiniKQL::GetUnboxedValueSize(rightRowItems[colIndex], column.PType).AllocatedBytes;
Expand All @@ -939,7 +946,7 @@ class TKqpJoinRows : public TKqpStreamLookupWorker {

rowStats.ReadRowsCount += (leftRowInfo.RightRowExist ? 1 : 0);
// TODO: use datashard statistics KIKIMR-16924
rowStats.ReadBytesCount += rightRowSize;
rowStats.ReadBytesCount += storageReadBytes;
rowStats.ResultRowsCount += 1;
rowStats.ResultBytesCount += leftRowSize + rightRowSize;

Expand Down
Loading
Loading