Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix hanging control #12051

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions ydb/core/kqp/ut/common/kqp_ut_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -526,6 +526,7 @@ void TKikimrRunner::Initialize(const TKikimrSettings& settings) {
SetupLogLevelFromTestParam(NKikimrServices::KQP_BLOBS_STORAGE);
SetupLogLevelFromTestParam(NKikimrServices::KQP_WORKLOAD_SERVICE);
SetupLogLevelFromTestParam(NKikimrServices::TX_COLUMNSHARD);
SetupLogLevelFromTestParam(NKikimrServices::TX_COLUMNSHARD_SCAN);
SetupLogLevelFromTestParam(NKikimrServices::LOCAL_PGWIRE);

RunCall([this, domain = settings.DomainRoot]{
Expand Down
25 changes: 24 additions & 1 deletion ydb/core/tx/columnshard/counters/scan.h
Original file line number Diff line number Diff line change
Expand Up @@ -291,19 +291,40 @@ class TConcreteScanCounters: public TScanCounters {
private:
using TBase = TScanCounters;
std::shared_ptr<TAtomicCounter> FetchAccessorsCount;
std::shared_ptr<TAtomicCounter> FetchBlobsCount;
std::shared_ptr<TAtomicCounter> MergeTasksCount;
std::shared_ptr<TAtomicCounter> AssembleTasksCount;
std::shared_ptr<TAtomicCounter> ReadTasksCount;
std::shared_ptr<TAtomicCounter> ResourcesAllocationTasksCount;
std::shared_ptr<TAtomicCounter> ResultsForSourceCount;
std::shared_ptr<TAtomicCounter> ResultsForReplyGuard;

public:
TScanAggregations Aggregations;

TString DebugString() const {
return TStringBuilder() << "FetchAccessorsCount:" << FetchAccessorsCount->Val() << ";"
<< "FetchBlobsCount:" << FetchBlobsCount->Val() << ";"
<< "MergeTasksCount:" << MergeTasksCount->Val() << ";"
<< "AssembleTasksCount:" << AssembleTasksCount->Val() << ";"
<< "ReadTasksCount:" << ReadTasksCount->Val() << ";"
<< "ResourcesAllocationTasksCount:" << ResourcesAllocationTasksCount->Val() << ";"
<< "ResultsForSourceCount:" << ResultsForSourceCount->Val() << ";"
<< "ResultsForReplyGuard:" << ResultsForReplyGuard->Val() << ";";
}

TCounterGuard GetResultsForReplyGuard() const {
return TCounterGuard(ResultsForReplyGuard);
}

TCounterGuard GetFetcherAcessorsGuard() const {
return TCounterGuard(FetchAccessorsCount);
}

TCounterGuard GetFetchBlobsGuard() const {
return TCounterGuard(FetchBlobsCount);
}

TCounterGuard GetResultsForSourceGuard() const {
return TCounterGuard(ResultsForSourceCount);
}
Expand All @@ -326,7 +347,7 @@ class TConcreteScanCounters: public TScanCounters {

bool InWaiting() const {
return MergeTasksCount->Val() || AssembleTasksCount->Val() || ReadTasksCount->Val() || ResourcesAllocationTasksCount->Val() ||
FetchAccessorsCount->Val() || ResultsForSourceCount->Val();
FetchAccessorsCount->Val() || ResultsForSourceCount->Val() || FetchBlobsCount->Val() || ResultsForReplyGuard->Val();
}

void OnBlobsWaitDuration(const TDuration d, const TDuration fullScanDuration) const {
Expand All @@ -337,11 +358,13 @@ class TConcreteScanCounters: public TScanCounters {
TConcreteScanCounters(const TScanCounters& counters)
: TBase(counters)
, FetchAccessorsCount(std::make_shared<TAtomicCounter>())
, FetchBlobsCount(std::make_shared<TAtomicCounter>())
, MergeTasksCount(std::make_shared<TAtomicCounter>())
, AssembleTasksCount(std::make_shared<TAtomicCounter>())
, ReadTasksCount(std::make_shared<TAtomicCounter>())
, ResourcesAllocationTasksCount(std::make_shared<TAtomicCounter>())
, ResultsForSourceCount(std::make_shared<TAtomicCounter>())
, ResultsForReplyGuard(std::make_shared<TAtomicCounter>())
, Aggregations(TBase::BuildAggregations())
{

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,14 @@
#include <ydb/core/tx/columnshard/blobs_action/abstract/storages_manager.h>
#include <ydb/core/tx/columnshard/counters/scan.h>
#include <ydb/core/tx/columnshard/data_accessor/manager.h>
#include <ydb/core/tx/columnshard/engines/reader/common/result.h>
#include <ydb/core/tx/columnshard/resource_subscriber/task.h>

#include <ydb/library/accessor/accessor.h>

namespace NKikimr::NOlap::NReader {

class TPartialReadResult;

class TComputeShardingPolicy {
private:
YDB_READONLY(ui32, ShardsCount, 0);
Expand Down
5 changes: 3 additions & 2 deletions ydb/core/tx/columnshard/engines/reader/actor/actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -304,8 +304,9 @@ void TColumnShardScan::ContinueProcessing() {
}
}
}
// AFL_VERIFY(!ScanIterator || !ChunksLimiter.HasMore() || ScanCountersPool.InWaiting())("scan_actor_id", ScanActorId)("tx_id", TxId)("scan_id", ScanId)(
// "gen", ScanGen)("tablet", TabletId)("debug", ScanIterator->DebugString());
AFL_VERIFY(!ScanIterator || !ChunksLimiter.HasMore() || ScanCountersPool.InWaiting())("scan_actor_id", ScanActorId)("tx_id", TxId)(
"scan_id", ScanId)("gen", ScanGen)("tablet", TabletId)("debug",
ScanIterator->DebugString())("counters", ScanCountersPool.DebugString());
}

void TColumnShardScan::MakeResult(size_t reserveRows /*= 0*/) {
Expand Down
19 changes: 18 additions & 1 deletion ydb/core/tx/columnshard/engines/reader/common/result.cpp
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
#include "result.h"

#include <ydb/core/tx/columnshard/engines/reader/abstract/read_context.h>

namespace NKikimr::NOlap::NReader {

class TCurrentBatch {
private:
std::vector<std::shared_ptr<TPartialReadResult>> Results;
ui64 RecordsCount = 0;

public:
ui64 GetRecordsCount() const {
return RecordsCount;
Expand Down Expand Up @@ -49,4 +52,18 @@ std::vector<std::shared_ptr<TPartialReadResult>> TPartialReadResult::SplitResult
return result;
}

}
TPartialReadResult::TPartialReadResult(const std::vector<std::shared_ptr<NGroupedMemoryManager::TAllocationGuard>>& resourceGuards,
const std::shared_ptr<NGroupedMemoryManager::TGroupGuard>& gGuard, const NArrow::TShardedRecordBatch& batch,
const std::shared_ptr<IScanCursor>& scanCursor, const std::shared_ptr<TReadContext>& context,
const std::optional<ui32> notFinishedIntervalIdx)
: ResourceGuards(resourceGuards)
, GroupGuard(gGuard)
, ResultBatch(batch)
, ScanCursor(scanCursor)
, NotFinishedIntervalIdx(notFinishedIntervalIdx)
, Guard(TValidator::CheckNotNull(context)->GetCounters().GetResultsForReplyGuard()) {
Y_ABORT_UNLESS(ResultBatch.GetRecordsCount());
Y_ABORT_UNLESS(ScanCursor);
}

} // namespace NKikimr::NOlap::NReader
19 changes: 8 additions & 11 deletions ydb/core/tx/columnshard/engines/reader/common/result.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,11 @@
#include <ydb/core/tx/program/program.h>

#include <ydb/library/yql/dq/actors/protos/dq_stats.pb.h>

namespace NKikimr::NOlap::NReader {

class TReadContext;

// Represents a batch of rows produced by ASC or DESC scan with applied filters and partial aggregation
class TPartialReadResult: public TNonCopyable {
private:
Expand All @@ -20,6 +23,7 @@ class TPartialReadResult: public TNonCopyable {
// NOTE: it might be different from the Key of last row in ResulBatch in case of filtering/aggregation/limit
std::shared_ptr<IScanCursor> ScanCursor;
YDB_READONLY_DEF(std::optional<ui32>, NotFinishedIntervalIdx);
const NColumnShard::TCounterGuard Guard;

public:
void Cut(const ui32 limit) {
Expand Down Expand Up @@ -56,19 +60,12 @@ class TPartialReadResult: public TNonCopyable {

explicit TPartialReadResult(const std::vector<std::shared_ptr<NGroupedMemoryManager::TAllocationGuard>>& resourceGuards,
const std::shared_ptr<NGroupedMemoryManager::TGroupGuard>& gGuard, const NArrow::TShardedRecordBatch& batch,
const std::shared_ptr<IScanCursor>& scanCursor, const std::optional<ui32> notFinishedIntervalIdx)
: ResourceGuards(resourceGuards)
, GroupGuard(gGuard)
, ResultBatch(batch)
, ScanCursor(scanCursor)
, NotFinishedIntervalIdx(notFinishedIntervalIdx) {
Y_ABORT_UNLESS(ResultBatch.GetRecordsCount());
Y_ABORT_UNLESS(ScanCursor);
}
const std::shared_ptr<IScanCursor>& scanCursor, const std::shared_ptr<TReadContext>& context,
const std::optional<ui32> notFinishedIntervalIdx);

explicit TPartialReadResult(const NArrow::TShardedRecordBatch& batch, const std::shared_ptr<IScanCursor>& scanCursor,
const std::optional<ui32> notFinishedIntervalIdx)
: TPartialReadResult({}, nullptr, batch, scanCursor, notFinishedIntervalIdx) {
const std::shared_ptr<TReadContext>& context, const std::optional<ui32> notFinishedIntervalIdx)
: TPartialReadResult({}, nullptr, batch, scanCursor, context, notFinishedIntervalIdx) {
}
};

Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
#include "plain_read_data.h"

#include <ydb/core/tx/columnshard/engines/reader/common/result.h>

namespace NKikimr::NOlap::NReader::NPlain {

TPlainReadData::TPlainReadData(const std::shared_ptr<TReadContext>& context)
: TBase(context)
, SpecialReadContext(std::make_shared<TSpecialReadContext>(context))
{
, SpecialReadContext(std::make_shared<TSpecialReadContext>(context)) {
ui32 sourceIdx = 0;
std::deque<std::shared_ptr<IDataSource>> sources;
const auto& portions = GetReadMetadata()->SelectInfo->PortionsOrderedPK;
Expand All @@ -28,8 +29,7 @@ TPlainReadData::TPlainReadData(const std::shared_ptr<TReadContext>& context)
if (GetReadMetadata()->IsMyUncommitted(i.GetInsertWriteId())) {
continue;
}
if (GetReadMetadata()->GetPKRangesFilter().CheckPoint(i.GetFirst()) ||
GetReadMetadata()->GetPKRangesFilter().CheckPoint(i.GetLast())) {
if (GetReadMetadata()->GetPKRangesFilter().CheckPoint(i.GetFirst()) || GetReadMetadata()->GetPKRangesFilter().CheckPoint(i.GetLast())) {
GetReadMetadata()->SetConflictedWriteId(i.GetInsertWriteId());
}
}
Expand All @@ -56,21 +56,21 @@ TPlainReadData::TPlainReadData(const std::shared_ptr<TReadContext>& context)
stats->CommittedPortionsBytes = committedPortionsBytes;
stats->InsertedPortionsBytes = insertedPortionsBytes;
stats->CompactedPortionsBytes = compactedPortionsBytes;

}

std::vector<std::shared_ptr<TPartialReadResult>> TPlainReadData::DoExtractReadyResults(const int64_t /*maxRowsInBatch*/) {
auto result = std::move(PartialResults);
PartialResults.clear();
// auto result = TPartialReadResult::SplitResults(std::move(PartialResults), maxRowsInBatch);
// auto result = TPartialReadResult::SplitResults(std::move(PartialResults), maxRowsInBatch);
ui32 count = 0;
for (auto&& r: result) {
for (auto&& r : result) {
count += r->GetRecordsCount();
}
AFL_VERIFY(count == ReadyResultsCount);
ReadyResultsCount = 0;

AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "DoExtractReadyResults")("result", result.size())("count", count)("finished", Scanner->IsFinished());
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "DoExtractReadyResults")("result", result.size())("count", count)(
"finished", Scanner->IsFinished());
return result;
}

Expand All @@ -79,9 +79,9 @@ TConclusion<bool> TPlainReadData::DoReadNextInterval() {
}

void TPlainReadData::OnIntervalResult(const std::shared_ptr<TPartialReadResult>& result) {
// result->GetResourcesGuardOnly()->Update(result->GetMemorySize());
// result->GetResourcesGuardOnly()->Update(result->GetMemorySize());
ReadyResultsCount += result->GetRecordsCount();
PartialResults.emplace_back(result);
}

}
} // namespace NKikimr::NOlap::NReader::NPlain
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#include "scanner.h"

#include <ydb/core/tx/columnshard/engines/reader/abstract/read_metadata.h>
#include <ydb/core/tx/columnshard/engines/reader/common/result.h>

#include <ydb/library/actors/core/log.h>

Expand Down Expand Up @@ -29,7 +30,7 @@ void TScanHead::OnIntervalResult(std::shared_ptr<NGroupedMemoryManager::TAllocat
}
std::vector<std::shared_ptr<NGroupedMemoryManager::TAllocationGuard>> guards = { std::move(allocationGuard) };
AFL_VERIFY(ReadyIntervals.emplace(intervalIdx, std::make_shared<TPartialReadResult>(guards, std::move(gGuard), *newBatch,
std::make_shared<TPlainScanCursor>(lastPK), callbackIdxSubscriver)).second);
std::make_shared<TPlainScanCursor>(lastPK), Context->GetCommonContext(), callbackIdxSubscriver)).second);
} else {
AFL_VERIFY(ReadyIntervals.emplace(intervalIdx, nullptr).second);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ class TPortionAccessorFetchingSubscriber: public IDataAccessorRequestsSubscriber
private:
TFetchingScriptCursor Step;
std::shared_ptr<IDataSource> Source;
const NColumnShard::TCounterGuard Guard;
virtual void DoOnRequestsFinished(TDataAccessorsResult&& result) override {
AFL_VERIFY(!result.HasErrors());
AFL_VERIFY(result.GetPortions().size() == 1)("count", result.GetPortions().size());
Expand All @@ -226,10 +227,10 @@ class TPortionAccessorFetchingSubscriber: public IDataAccessorRequestsSubscriber
public:
TPortionAccessorFetchingSubscriber(const TFetchingScriptCursor& step, const std::shared_ptr<IDataSource>& source)
: Step(step)
, Source(source) {
, Source(source)
, Guard(Source->GetContext()->GetCommonContext()->GetCounters().GetFetcherAcessorsGuard()) {
}
};

} // namespace

bool TPortionDataSource::DoStartFetchingAccessor(const std::shared_ptr<IDataSource>& sourcePtr, const TFetchingScriptCursor& step) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,4 +19,14 @@ bool TBlobsFetcherTask::DoOnError(const TString& storageId, const TBlobRange& ra
return false;
}

TBlobsFetcherTask::TBlobsFetcherTask(const std::vector<std::shared_ptr<IBlobsReadingAction>>& readActions,
const std::shared_ptr<IDataSource>& sourcePtr, const TFetchingScriptCursor& step, const std::shared_ptr<TSpecialReadContext>& context,
const TString& taskCustomer, const TString& externalTaskId)
: TBase(readActions, taskCustomer, externalTaskId)
, Source(sourcePtr)
, Step(step)
, Context(context)
, Guard(Context->GetCommonContext()->GetCounters().GetFetchBlobsGuard()) {
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,13 @@ class TBlobsFetcherTask: public NBlobOperations::NRead::ITask, public NColumnSha
const std::shared_ptr<IDataSource> Source;
TFetchingScriptCursor Step;
const std::shared_ptr<TSpecialReadContext> Context;
const NColumnShard::TCounterGuard Guard;

virtual void DoOnDataReady(const std::shared_ptr<NResourceBroker::NSubscribe::TResourcesGuard>& resourcesGuard) override;
virtual bool DoOnError(const TString& storageId, const TBlobRange& range, const IBlobsReadingAction::TErrorStatus& status) override;
public:
TBlobsFetcherTask(const std::vector<std::shared_ptr<IBlobsReadingAction>>& readActions, const std::shared_ptr<IDataSource>& sourcePtr,
const TFetchingScriptCursor& step, const std::shared_ptr<TSpecialReadContext>& context, const TString& taskCustomer, const TString& externalTaskId)
: TBase(readActions, taskCustomer, externalTaskId)
, Source(sourcePtr)
, Step(step)
, Context(context)
{

}
const TFetchingScriptCursor& step, const std::shared_ptr<TSpecialReadContext>& context, const TString& taskCustomer, const TString& externalTaskId);
};

}
Original file line number Diff line number Diff line change
Expand Up @@ -163,7 +163,7 @@ class TStepAction: public IDataTasksProcessor::ITask {
std::shared_ptr<IDataSource> Source;
TFetchingScriptCursor Cursor;
bool FinishedFlag = false;
NColumnShard::TCounterGuard CountersGuard;
const NColumnShard::TCounterGuard CountersGuard;

protected:
virtual bool DoApply(IDataReader& owner) const override;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
#include "plain_read_data.h"

#include <ydb/core/tx/columnshard/engines/reader/common/result.h>

namespace NKikimr::NOlap::NReader::NSimple {

TPlainReadData::TPlainReadData(const std::shared_ptr<TReadContext>& context)
: TBase(context)
, SpecialReadContext(std::make_shared<TSpecialReadContext>(context))
{
, SpecialReadContext(std::make_shared<TSpecialReadContext>(context)) {
ui32 sourceIdx = 0;
std::deque<std::shared_ptr<IDataSource>> sources;
const auto& portions = GetReadMetadata()->SelectInfo->PortionsOrderedPK;
Expand All @@ -27,21 +28,21 @@ TPlainReadData::TPlainReadData(const std::shared_ptr<TReadContext>& context)
stats->SchemaColumns = (*SpecialReadContext->GetProgramInputColumns() - *SpecialReadContext->GetSpecColumns()).GetColumnsCount();
stats->InsertedPortionsBytes = insertedPortionsBytes;
stats->CompactedPortionsBytes = compactedPortionsBytes;

}

std::vector<std::shared_ptr<TPartialReadResult>> TPlainReadData::DoExtractReadyResults(const int64_t /*maxRowsInBatch*/) {
auto result = std::move(PartialResults);
PartialResults.clear();
// auto result = TPartialReadResult::SplitResults(std::move(PartialResults), maxRowsInBatch);
// auto result = TPartialReadResult::SplitResults(std::move(PartialResults), maxRowsInBatch);
ui32 count = 0;
for (auto&& r: result) {
for (auto&& r : result) {
count += r->GetRecordsCount();
}
AFL_VERIFY(count == ReadyResultsCount);
ReadyResultsCount = 0;

AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "DoExtractReadyResults")("result", result.size())("count", count)("finished", Scanner->IsFinished());
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", "DoExtractReadyResults")("result", result.size())("count", count)(
"finished", Scanner->IsFinished());
return result;
}

Expand All @@ -50,9 +51,9 @@ TConclusion<bool> TPlainReadData::DoReadNextInterval() {
}

void TPlainReadData::OnIntervalResult(const std::shared_ptr<TPartialReadResult>& result) {
// result->GetResourcesGuardOnly()->Update(result->GetMemorySize());
// result->GetResourcesGuardOnly()->Update(result->GetMemorySize());
ReadyResultsCount += result->GetRecordsCount();
PartialResults.emplace_back(result);
}

}
} // namespace NKikimr::NOlap::NReader::NSimple
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
#include "scanner.h"

#include <ydb/core/tx/columnshard/engines/reader/abstract/read_metadata.h>
#include <ydb/core/tx/columnshard/engines/reader/common/result.h>

#include <ydb/library/actors/core/log.h>

Expand Down Expand Up @@ -36,8 +37,8 @@ void TScanHead::OnSourceReady(const std::shared_ptr<IDataSource>& source, std::s
"source_idx", frontSource->GetSourceIdx())("table", table->num_rows());
auto cursor =
std::make_shared<TSimpleScanCursor>(frontSource->GetStartPKRecordBatch(), frontSource->GetSourceId(), startIndex + recordsCount);
reader.OnIntervalResult(
std::make_shared<TPartialReadResult>(frontSource->GetResourceGuards(), frontSource->GetGroupGuard(), table, cursor, sourceIdxToContinue));
reader.OnIntervalResult(std::make_shared<TPartialReadResult>(frontSource->GetResourceGuards(), frontSource->GetGroupGuard(), table,
cursor, Context->GetCommonContext(), sourceIdxToContinue));
} else if (sourceIdxToContinue) {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "continue_source")("source_id", frontSource->GetSourceId())(
"source_idx", frontSource->GetSourceIdx());
Expand Down
Loading
Loading