Skip to content

Commit

Permalink
Fix hanging control (ydb-platform#12051)
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored and zverevgeny committed Jan 5, 2025
1 parent 0c35ddd commit 61b4546
Show file tree
Hide file tree
Showing 16 changed files with 101 additions and 51 deletions.
1 change: 1 addition & 0 deletions ydb/core/kqp/ut/common/kqp_ut_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -520,6 +520,7 @@ void TKikimrRunner::Initialize(const TKikimrSettings& settings) {
SetupLogLevelFromTestParam(NKikimrServices::KQP_BLOBS_STORAGE);
SetupLogLevelFromTestParam(NKikimrServices::KQP_WORKLOAD_SERVICE);
SetupLogLevelFromTestParam(NKikimrServices::TX_COLUMNSHARD);
SetupLogLevelFromTestParam(NKikimrServices::TX_COLUMNSHARD_SCAN);
SetupLogLevelFromTestParam(NKikimrServices::LOCAL_PGWIRE);

RunCall([this, domain = settings.DomainRoot]{
Expand Down
25 changes: 24 additions & 1 deletion ydb/core/tx/columnshard/counters/scan.h
Original file line number Diff line number Diff line change
Expand Up @@ -291,19 +291,40 @@ class TConcreteScanCounters: public TScanCounters {
private:
using TBase = TScanCounters;
std::shared_ptr<TAtomicCounter> FetchAccessorsCount;
std::shared_ptr<TAtomicCounter> FetchBlobsCount;
std::shared_ptr<TAtomicCounter> MergeTasksCount;
std::shared_ptr<TAtomicCounter> AssembleTasksCount;
std::shared_ptr<TAtomicCounter> ReadTasksCount;
std::shared_ptr<TAtomicCounter> ResourcesAllocationTasksCount;
std::shared_ptr<TAtomicCounter> ResultsForSourceCount;
std::shared_ptr<TAtomicCounter> ResultsForReplyGuard;

public:
TScanAggregations Aggregations;

// Builds a one-line textual dump of the current value of every per-scan task counter
// held by this object, formatted as consecutive "Name:value;" pairs.
// Only reads the counters via Val(); nothing is modified.
TString DebugString() const {
return TStringBuilder() << "FetchAccessorsCount:" << FetchAccessorsCount->Val() << ";"
<< "FetchBlobsCount:" << FetchBlobsCount->Val() << ";"
<< "MergeTasksCount:" << MergeTasksCount->Val() << ";"
<< "AssembleTasksCount:" << AssembleTasksCount->Val() << ";"
<< "ReadTasksCount:" << ReadTasksCount->Val() << ";"
<< "ResourcesAllocationTasksCount:" << ResourcesAllocationTasksCount->Val() << ";"
<< "ResultsForSourceCount:" << ResultsForSourceCount->Val() << ";"
<< "ResultsForReplyGuard:" << ResultsForReplyGuard->Val() << ";";
}

// Returns a TCounterGuard constructed over the shared ResultsForReplyGuard counter.
// NOTE(review): TCounterGuard is declared outside this view; presumably it keeps the
// counter raised for the guard's lifetime, so InWaiting() sees a non-zero value while
// any such guard is alive - confirm against its definition.
TCounterGuard GetResultsForReplyGuard() const {
return TCounterGuard(ResultsForReplyGuard);
}

// Returns a TCounterGuard constructed over the shared FetchAccessorsCount counter.
// NOTE(review): "Acessors" in the method name is misspelled, but callers reference it
// under this spelling, so the name is intentionally left unchanged here.
// NOTE(review): guard semantics (presumably raise on construction / lower on destruction)
// come from TCounterGuard, which is not visible here - confirm.
TCounterGuard GetFetcherAcessorsGuard() const {
return TCounterGuard(FetchAccessorsCount);
}

// Returns a TCounterGuard constructed over the shared FetchBlobsCount counter.
// NOTE(review): presumably the guard marks one blob-fetch task as in flight for as long
// as it lives; TCounterGuard is not visible here - confirm.
TCounterGuard GetFetchBlobsGuard() const {
return TCounterGuard(FetchBlobsCount);
}

// Returns a TCounterGuard constructed over the shared ResultsForSourceCount counter.
// NOTE(review): guard lifetime semantics are defined by TCounterGuard, which is outside
// this view - confirm before relying on them.
TCounterGuard GetResultsForSourceGuard() const {
return TCounterGuard(ResultsForSourceCount);
}
Expand All @@ -326,7 +347,7 @@ class TConcreteScanCounters: public TScanCounters {

bool InWaiting() const {
return MergeTasksCount->Val() || AssembleTasksCount->Val() || ReadTasksCount->Val() || ResourcesAllocationTasksCount->Val() ||
FetchAccessorsCount->Val() || ResultsForSourceCount->Val();
FetchAccessorsCount->Val() || ResultsForSourceCount->Val() || FetchBlobsCount->Val() || ResultsForReplyGuard->Val();
}

void OnBlobsWaitDuration(const TDuration d, const TDuration fullScanDuration) const {
Expand All @@ -337,11 +358,13 @@ class TConcreteScanCounters: public TScanCounters {
TConcreteScanCounters(const TScanCounters& counters)
: TBase(counters)
, FetchAccessorsCount(std::make_shared<TAtomicCounter>())
, FetchBlobsCount(std::make_shared<TAtomicCounter>())
, MergeTasksCount(std::make_shared<TAtomicCounter>())
, AssembleTasksCount(std::make_shared<TAtomicCounter>())
, ReadTasksCount(std::make_shared<TAtomicCounter>())
, ResourcesAllocationTasksCount(std::make_shared<TAtomicCounter>())
, ResultsForSourceCount(std::make_shared<TAtomicCounter>())
, ResultsForReplyGuard(std::make_shared<TAtomicCounter>())
, Aggregations(TBase::BuildAggregations())
{

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,14 @@
#include <ydb/core/tx/columnshard/blobs_action/abstract/storages_manager.h>
#include <ydb/core/tx/columnshard/counters/scan.h>
#include <ydb/core/tx/columnshard/data_accessor/manager.h>
#include <ydb/core/tx/columnshard/engines/reader/common/result.h>
#include <ydb/core/tx/columnshard/resource_subscriber/task.h>

#include <ydb/library/accessor/accessor.h>

namespace NKikimr::NOlap::NReader {

class TPartialReadResult;

class TComputeShardingPolicy {
private:
YDB_READONLY(ui32, ShardsCount, 0);
Expand Down
5 changes: 3 additions & 2 deletions ydb/core/tx/columnshard/engines/reader/actor/actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -304,8 +304,9 @@ void TColumnShardScan::ContinueProcessing() {
}
}
}
// AFL_VERIFY(!ScanIterator || !ChunksLimiter.HasMore() || ScanCountersPool.InWaiting())("scan_actor_id", ScanActorId)("tx_id", TxId)("scan_id", ScanId)(
// "gen", ScanGen)("tablet", TabletId)("debug", ScanIterator->DebugString());
AFL_VERIFY(!ScanIterator || !ChunksLimiter.HasMore() || ScanCountersPool.InWaiting())("scan_actor_id", ScanActorId)("tx_id", TxId)(
"scan_id", ScanId)("gen", ScanGen)("tablet", TabletId)("debug",
ScanIterator->DebugString())("counters", ScanCountersPool.DebugString());
}

void TColumnShardScan::MakeResult(size_t reserveRows /*= 0*/) {
Expand Down
19 changes: 18 additions & 1 deletion ydb/core/tx/columnshard/engines/reader/common/result.cpp
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
#include "result.h"

#include <ydb/core/tx/columnshard/engines/reader/abstract/read_context.h>

namespace NKikimr::NOlap::NReader {

class TCurrentBatch {
private:
std::vector<std::shared_ptr<TPartialReadResult>> Results;
ui64 RecordsCount = 0;

public:
ui64 GetRecordsCount() const {
return RecordsCount;
Expand Down Expand Up @@ -49,4 +52,18 @@ std::vector<std::shared_ptr<TPartialReadResult>> TPartialReadResult::SplitResult
return result;
}

}
// Constructs a partial read result from the given memory guards, record batch and scan cursor.
//   resourceGuards / gGuard   - stored as ResourceGuards / GroupGuard.
//   batch                     - stored as ResultBatch; must contain at least one record.
//   scanCursor                - stored as ScanCursor; must be non-null.
//   context                   - used here only to obtain the counters object from which the
//                               "results for reply" counter guard is taken.
//   notFinishedIntervalIdx    - stored as NotFinishedIntervalIdx.
// Aborts via Y_ABORT_UNLESS when the batch is empty or the cursor is null.
TPartialReadResult::TPartialReadResult(const std::vector<std::shared_ptr<NGroupedMemoryManager::TAllocationGuard>>& resourceGuards,
const std::shared_ptr<NGroupedMemoryManager::TGroupGuard>& gGuard, const NArrow::TShardedRecordBatch& batch,
const std::shared_ptr<IScanCursor>& scanCursor, const std::shared_ptr<TReadContext>& context,
const std::optional<ui32> notFinishedIntervalIdx)
: ResourceGuards(resourceGuards)
, GroupGuard(gGuard)
, ResultBatch(batch)
, ScanCursor(scanCursor)
, NotFinishedIntervalIdx(notFinishedIntervalIdx)
// NOTE(review): TValidator::CheckNotNull presumably fails hard on a null context before
// it is dereferenced - confirm against its definition.
, Guard(TValidator::CheckNotNull(context)->GetCounters().GetResultsForReplyGuard()) {
Y_ABORT_UNLESS(ResultBatch.GetRecordsCount());
Y_ABORT_UNLESS(ScanCursor);
}

} // namespace NKikimr::NOlap::NReader
19 changes: 8 additions & 11 deletions ydb/core/tx/columnshard/engines/reader/common/result.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,11 @@
#include <ydb/core/tx/program/program.h>

#include <ydb/library/yql/dq/actors/protos/dq_stats.pb.h>

namespace NKikimr::NOlap::NReader {

class TReadContext;

// Represents a batch of rows produced by ASC or DESC scan with applied filters and partial aggregation
class TPartialReadResult: public TNonCopyable {
private:
Expand All @@ -20,6 +23,7 @@ class TPartialReadResult: public TNonCopyable {
// NOTE: it might be different from the Key of last row in ResultBatch in case of filtering/aggregation/limit
std::shared_ptr<IScanCursor> ScanCursor;
YDB_READONLY_DEF(std::optional<ui32>, NotFinishedIntervalIdx);
const NColumnShard::TCounterGuard Guard;

public:
void Cut(const ui32 limit) {
Expand Down Expand Up @@ -56,19 +60,12 @@ class TPartialReadResult: public TNonCopyable {

explicit TPartialReadResult(const std::vector<std::shared_ptr<NGroupedMemoryManager::TAllocationGuard>>& resourceGuards,
const std::shared_ptr<NGroupedMemoryManager::TGroupGuard>& gGuard, const NArrow::TShardedRecordBatch& batch,
const std::shared_ptr<IScanCursor>& scanCursor, const std::optional<ui32> notFinishedIntervalIdx)
: ResourceGuards(resourceGuards)
, GroupGuard(gGuard)
, ResultBatch(batch)
, ScanCursor(scanCursor)
, NotFinishedIntervalIdx(notFinishedIntervalIdx) {
Y_ABORT_UNLESS(ResultBatch.GetRecordsCount());
Y_ABORT_UNLESS(ScanCursor);
}
const std::shared_ptr<IScanCursor>& scanCursor, const std::shared_ptr<TReadContext>& context,
const std::optional<ui32> notFinishedIntervalIdx);

explicit TPartialReadResult(const NArrow::TShardedRecordBatch& batch, const std::shared_ptr<IScanCursor>& scanCursor,
const std::optional<ui32> notFinishedIntervalIdx)
: TPartialReadResult({}, nullptr, batch, scanCursor, notFinishedIntervalIdx) {
const std::shared_ptr<TReadContext>& context, const std::optional<ui32> notFinishedIntervalIdx)
: TPartialReadResult({}, nullptr, batch, scanCursor, context, notFinishedIntervalIdx) {
}
};

Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
#include "plain_read_data.h"

#include <ydb/core/tx/columnshard/engines/reader/common/result.h>

namespace NKikimr::NOlap::NReader::NPlain {

TPlainReadData::TPlainReadData(const std::shared_ptr<TReadContext>& context)
: TBase(context)
, SpecialReadContext(std::make_shared<TSpecialReadContext>(context))
{
, SpecialReadContext(std::make_shared<TSpecialReadContext>(context)) {
ui32 sourceIdx = 0;
std::deque<std::shared_ptr<IDataSource>> sources;
const auto& portions = GetReadMetadata()->SelectInfo->PortionsOrderedPK;
Expand All @@ -28,8 +29,7 @@ TPlainReadData::TPlainReadData(const std::shared_ptr<TReadContext>& context)
if (GetReadMetadata()->IsMyUncommitted(i.GetInsertWriteId())) {
continue;
}
if (GetReadMetadata()->GetPKRangesFilter().CheckPoint(i.GetFirst()) ||
GetReadMetadata()->GetPKRangesFilter().CheckPoint(i.GetLast())) {
if (GetReadMetadata()->GetPKRangesFilter().CheckPoint(i.GetFirst()) || GetReadMetadata()->GetPKRangesFilter().CheckPoint(i.GetLast())) {
GetReadMetadata()->SetConflictedWriteId(i.GetInsertWriteId());
}
}
Expand All @@ -56,21 +56,21 @@ TPlainReadData::TPlainReadData(const std::shared_ptr<TReadContext>& context)
stats->CommittedPortionsBytes = committedPortionsBytes;
stats->InsertedPortionsBytes = insertedPortionsBytes;
stats->CompactedPortionsBytes = compactedPortionsBytes;

}

std::vector<std::shared_ptr<TPartialReadResult>> TPlainReadData::DoExtractReadyResults(const int64_t /*maxRowsInBatch*/) {
auto result = std::move(PartialResults);
PartialResults.clear();
// auto result = TPartialReadResult::SplitResults(std::move(PartialResults), maxRowsInBatch);
// auto result = TPartialReadResult::SplitResults(std::move(PartialResults), maxRowsInBatch);
ui32 count = 0;
for (auto&& r: result) {
for (auto&& r : result) {
count += r->GetRecordsCount();
}
AFL_VERIFY(count == ReadyResultsCount);
ReadyResultsCount = 0;

AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "DoExtractReadyResults")("result", result.size())("count", count)("finished", Scanner->IsFinished());
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "DoExtractReadyResults")("result", result.size())("count", count)(
"finished", Scanner->IsFinished());
return result;
}

Expand All @@ -79,9 +79,9 @@ TConclusion<bool> TPlainReadData::DoReadNextInterval() {
}

void TPlainReadData::OnIntervalResult(const std::shared_ptr<TPartialReadResult>& result) {
// result->GetResourcesGuardOnly()->Update(result->GetMemorySize());
// result->GetResourcesGuardOnly()->Update(result->GetMemorySize());
ReadyResultsCount += result->GetRecordsCount();
PartialResults.emplace_back(result);
}

}
} // namespace NKikimr::NOlap::NReader::NPlain
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#include "scanner.h"

#include <ydb/core/tx/columnshard/engines/reader/abstract/read_metadata.h>
#include <ydb/core/tx/columnshard/engines/reader/common/result.h>

#include <ydb/library/actors/core/log.h>

Expand Down Expand Up @@ -29,7 +30,7 @@ void TScanHead::OnIntervalResult(std::shared_ptr<NGroupedMemoryManager::TAllocat
}
std::vector<std::shared_ptr<NGroupedMemoryManager::TAllocationGuard>> guards = { std::move(allocationGuard) };
AFL_VERIFY(ReadyIntervals.emplace(intervalIdx, std::make_shared<TPartialReadResult>(guards, std::move(gGuard), *newBatch,
std::make_shared<TPlainScanCursor>(lastPK), callbackIdxSubscriver)).second);
std::make_shared<TPlainScanCursor>(lastPK), Context->GetCommonContext(), callbackIdxSubscriver)).second);
} else {
AFL_VERIFY(ReadyIntervals.emplace(intervalIdx, nullptr).second);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ class TPortionAccessorFetchingSubscriber: public IDataAccessorRequestsSubscriber
private:
TFetchingScriptCursor Step;
std::shared_ptr<IDataSource> Source;
const NColumnShard::TCounterGuard Guard;
virtual void DoOnRequestsFinished(TDataAccessorsResult&& result) override {
AFL_VERIFY(!result.HasErrors());
AFL_VERIFY(result.GetPortions().size() == 1)("count", result.GetPortions().size());
Expand All @@ -226,10 +227,10 @@ class TPortionAccessorFetchingSubscriber: public IDataAccessorRequestsSubscriber
public:
TPortionAccessorFetchingSubscriber(const TFetchingScriptCursor& step, const std::shared_ptr<IDataSource>& source)
: Step(step)
, Source(source) {
, Source(source)
, Guard(Source->GetContext()->GetCommonContext()->GetCounters().GetFetcherAcessorsGuard()) {
}
};

} // namespace

bool TPortionDataSource::DoStartFetchingAccessor(const std::shared_ptr<IDataSource>& sourcePtr, const TFetchingScriptCursor& step) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,14 @@ bool TBlobsFetcherTask::DoOnError(const TString& storageId, const TBlobRange& ra
return false;
}

// Constructs a blob-fetching task.
//   readActions, taskCustomer, externalTaskId - forwarded to the TBase constructor.
//   sourcePtr / step / context                - stored as Source / Step / Context.
// Additionally takes a "fetch blobs" counter guard from the scan counters reachable
// through the context and keeps it in Guard.
TBlobsFetcherTask::TBlobsFetcherTask(const std::vector<std::shared_ptr<IBlobsReadingAction>>& readActions,
const std::shared_ptr<IDataSource>& sourcePtr, const TFetchingScriptCursor& step, const std::shared_ptr<TSpecialReadContext>& context,
const TString& taskCustomer, const TString& externalTaskId)
: TBase(readActions, taskCustomer, externalTaskId)
, Source(sourcePtr)
, Step(step)
, Context(context)
// Reads the Context member (not the `context` parameter): this relies on Context being
// declared before Guard in the class so that it is already initialized at this point.
// NOTE(review): Context is dereferenced without a null check - confirm callers never pass null.
, Guard(Context->GetCommonContext()->GetCounters().GetFetchBlobsGuard()) {
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,13 @@ class TBlobsFetcherTask: public NBlobOperations::NRead::ITask, public NColumnSha
const std::shared_ptr<IDataSource> Source;
TFetchingScriptCursor Step;
const std::shared_ptr<TSpecialReadContext> Context;
const NColumnShard::TCounterGuard Guard;

virtual void DoOnDataReady(const std::shared_ptr<NResourceBroker::NSubscribe::TResourcesGuard>& resourcesGuard) override;
virtual bool DoOnError(const TString& storageId, const TBlobRange& range, const IBlobsReadingAction::TErrorStatus& status) override;
public:
TBlobsFetcherTask(const std::vector<std::shared_ptr<IBlobsReadingAction>>& readActions, const std::shared_ptr<IDataSource>& sourcePtr,
const TFetchingScriptCursor& step, const std::shared_ptr<TSpecialReadContext>& context, const TString& taskCustomer, const TString& externalTaskId)
: TBase(readActions, taskCustomer, externalTaskId)
, Source(sourcePtr)
, Step(step)
, Context(context)
{

}
const TFetchingScriptCursor& step, const std::shared_ptr<TSpecialReadContext>& context, const TString& taskCustomer, const TString& externalTaskId);
};

}
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ class TStepAction: public IDataTasksProcessor::ITask {
std::shared_ptr<IDataSource> Source;
TFetchingScriptCursor Cursor;
bool FinishedFlag = false;
NColumnShard::TCounterGuard CountersGuard;
const NColumnShard::TCounterGuard CountersGuard;

protected:
virtual bool DoApply(IDataReader& owner) const override;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
#include "plain_read_data.h"

#include <ydb/core/tx/columnshard/engines/reader/common/result.h>

namespace NKikimr::NOlap::NReader::NSimple {

TPlainReadData::TPlainReadData(const std::shared_ptr<TReadContext>& context)
: TBase(context)
, SpecialReadContext(std::make_shared<TSpecialReadContext>(context))
{
, SpecialReadContext(std::make_shared<TSpecialReadContext>(context)) {
ui32 sourceIdx = 0;
std::deque<std::shared_ptr<IDataSource>> sources;
const auto& portions = GetReadMetadata()->SelectInfo->PortionsOrderedPK;
Expand All @@ -27,21 +28,21 @@ TPlainReadData::TPlainReadData(const std::shared_ptr<TReadContext>& context)
stats->SchemaColumns = (*SpecialReadContext->GetProgramInputColumns() - *SpecialReadContext->GetSpecColumns()).GetColumnsCount();
stats->InsertedPortionsBytes = insertedPortionsBytes;
stats->CompactedPortionsBytes = compactedPortionsBytes;

}

std::vector<std::shared_ptr<TPartialReadResult>> TPlainReadData::DoExtractReadyResults(const int64_t /*maxRowsInBatch*/) {
auto result = std::move(PartialResults);
PartialResults.clear();
// auto result = TPartialReadResult::SplitResults(std::move(PartialResults), maxRowsInBatch);
// auto result = TPartialReadResult::SplitResults(std::move(PartialResults), maxRowsInBatch);
ui32 count = 0;
for (auto&& r: result) {
for (auto&& r : result) {
count += r->GetRecordsCount();
}
AFL_VERIFY(count == ReadyResultsCount);
ReadyResultsCount = 0;

AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "DoExtractReadyResults")("result", result.size())("count", count)("finished", Scanner->IsFinished());
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "DoExtractReadyResults")("result", result.size())("count", count)(
"finished", Scanner->IsFinished());
return result;
}

Expand All @@ -50,9 +51,9 @@ TConclusion<bool> TPlainReadData::DoReadNextInterval() {
}

void TPlainReadData::OnIntervalResult(const std::shared_ptr<TPartialReadResult>& result) {
// result->GetResourcesGuardOnly()->Update(result->GetMemorySize());
// result->GetResourcesGuardOnly()->Update(result->GetMemorySize());
ReadyResultsCount += result->GetRecordsCount();
PartialResults.emplace_back(result);
}

}
} // namespace NKikimr::NOlap::NReader::NSimple
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#include "scanner.h"

#include <ydb/core/tx/columnshard/engines/reader/abstract/read_metadata.h>
#include <ydb/core/tx/columnshard/engines/reader/common/result.h>

#include <ydb/library/actors/core/log.h>

Expand Down Expand Up @@ -36,8 +37,8 @@ void TScanHead::OnSourceReady(const std::shared_ptr<IDataSource>& source, std::s
"source_idx", frontSource->GetSourceIdx())("table", table->num_rows());
auto cursor =
std::make_shared<TSimpleScanCursor>(frontSource->GetStartPKRecordBatch(), frontSource->GetSourceId(), startIndex + recordsCount);
reader.OnIntervalResult(
std::make_shared<TPartialReadResult>(frontSource->GetResourceGuards(), frontSource->GetGroupGuard(), table, cursor, sourceIdxToContinue));
reader.OnIntervalResult(std::make_shared<TPartialReadResult>(frontSource->GetResourceGuards(), frontSource->GetGroupGuard(), table,
cursor, Context->GetCommonContext(), sourceIdxToContinue));
} else if (sourceIdxToContinue) {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "continue_source")("source_id", frontSource->GetSourceId())(
"source_idx", frontSource->GetSourceIdx());
Expand Down
Loading

0 comments on commit 61b4546

Please sign in to comment.