Skip to content

Commit

Permalink
blobs fetcher unification (ydb-platform#12858)
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored and zverevgeny committed Jan 8, 2025
1 parent ad5fa84 commit a3ef3bb
Show file tree
Hide file tree
Showing 10 changed files with 105 additions and 96 deletions.
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
#include "constructor.h"
#include <ydb/core/tx/conveyor/usage/service.h>

#include <ydb/core/tx/columnshard/columnshard_private_events.h>
#include <ydb/core/tx/conveyor/usage/service.h>

namespace NKikimr::NOlap::NReader::NSimple {
namespace NKikimr::NOlap::NReader::NCommon {

void TBlobsFetcherTask::DoOnDataReady(const std::shared_ptr<NResourceBroker::NSubscribe::TResourcesGuard>& /*resourcesGuard*/) {
Source->MutableStageData().AddBlobs(Source->DecodeBlobAddresses(ExtractBlobsData()));
Expand All @@ -12,22 +13,23 @@ void TBlobsFetcherTask::DoOnDataReady(const std::shared_ptr<NResourceBroker::NSu
}

bool TBlobsFetcherTask::DoOnError(const TString& storageId, const TBlobRange& range, const IBlobsReadingAction::TErrorStatus& status) {
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD_SCAN)("error_on_blob_reading", range.ToString())("scan_actor_id", Context->GetCommonContext()->GetScanActorId())
("status", status.GetErrorMessage())("status_code", status.GetStatus())("storage_id", storageId);
NActors::TActorContext::AsActorContext().Send(Context->GetCommonContext()->GetScanActorId(),
std::make_unique<NColumnShard::TEvPrivate::TEvTaskProcessedResult>(TConclusionStatus::Fail("cannot read blob range " + range.ToString())));
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD_SCAN)("error_on_blob_reading", range.ToString())(
"scan_actor_id", Context->GetCommonContext()->GetScanActorId())("status", status.GetErrorMessage())("status_code", status.GetStatus())(
"storage_id", storageId);
NActors::TActorContext::AsActorContext().Send(
Context->GetCommonContext()->GetScanActorId(), std::make_unique<NColumnShard::TEvPrivate::TEvTaskProcessedResult>(
TConclusionStatus::Fail("cannot read blob range " + range.ToString())));
return false;
}

TBlobsFetcherTask::TBlobsFetcherTask(const std::vector<std::shared_ptr<IBlobsReadingAction>>& readActions,
const std::shared_ptr<NCommon::IDataSource>& sourcePtr, const TFetchingScriptCursor& step,
const std::shared_ptr<NCommon::TSpecialReadContext>& context,
const TString& taskCustomer, const TString& externalTaskId)
const std::shared_ptr<NCommon::TSpecialReadContext>& context, const TString& taskCustomer, const TString& externalTaskId)
: TBase(readActions, taskCustomer, externalTaskId)
, Source(sourcePtr)
, Step(step)
, Context(context)
, Guard(Context->GetCommonContext()->GetCounters().GetFetchBlobsGuard()) {
}

}
} // namespace NKikimr::NOlap::NReader::NCommon
Original file line number Diff line number Diff line change
@@ -1,33 +1,37 @@
#pragma once
#include <ydb/core/tx/columnshard/engines/reader/abstract/read_metadata.h>
#include <ydb/core/tx/columnshard/engines/reader/abstract/read_context.h>
#include <ydb/core/tx/columnshard/engines/portions/column_record.h>
#include <ydb/core/tx/columnshard/blobs_reader/task.h>
#include <ydb/core/tx/columnshard/blob.h>
#include "fetching.h"
#include "source.h"

namespace NKikimr::NOlap::NReader::NPlain {
#include <ydb/core/tx/columnshard/blob.h>
#include <ydb/core/tx/columnshard/blobs_reader/task.h>
#include <ydb/core/tx/columnshard/engines/portions/column_record.h>
#include <ydb/core/tx/columnshard/engines/reader/abstract/read_context.h>
#include <ydb/core/tx/columnshard/engines/reader/abstract/read_metadata.h>

namespace NKikimr::NOlap::NReader::NCommon {

class TBlobsFetcherTask: public NBlobOperations::NRead::ITask, public NColumnShard::TMonitoringObjectsCounter<TBlobsFetcherTask> {
private:
using TBase = NBlobOperations::NRead::ITask;
const std::shared_ptr<NCommon::IDataSource> Source;
const std::shared_ptr<IDataSource> Source;
TFetchingScriptCursor Step;
const std::shared_ptr<NCommon::TSpecialReadContext> Context;
const std::shared_ptr<TSpecialReadContext> Context;
const NColumnShard::TCounterGuard Guard;

virtual void DoOnDataReady(const std::shared_ptr<NResourceBroker::NSubscribe::TResourcesGuard>& resourcesGuard) override;
virtual bool DoOnError(const TString& storageId, const TBlobRange& range, const IBlobsReadingAction::TErrorStatus& status) override;

public:
TBlobsFetcherTask(const std::vector<std::shared_ptr<IBlobsReadingAction>>& readActions, const std::shared_ptr<NCommon::IDataSource>& sourcePtr,
template <class TSource>
TBlobsFetcherTask(const std::vector<std::shared_ptr<IBlobsReadingAction>>& readActions, const std::shared_ptr<TSource>& sourcePtr,
const TFetchingScriptCursor& step, const std::shared_ptr<NCommon::TSpecialReadContext>& context, const TString& taskCustomer,
const TString& externalTaskId)
: TBase(readActions, taskCustomer, externalTaskId)
, Source(sourcePtr)
, Step(step)
, Context(context)
{

: TBlobsFetcherTask(readActions, std::static_pointer_cast<IDataSource>(sourcePtr), step, context, taskCustomer, externalTaskId) {
}

TBlobsFetcherTask(const std::vector<std::shared_ptr<IBlobsReadingAction>>& readActions,
const std::shared_ptr<NCommon::IDataSource>& sourcePtr, const TFetchingScriptCursor& step,
const std::shared_ptr<NCommon::TSpecialReadContext>& context, const TString& taskCustomer, const TString& externalTaskId);
};

}
} // namespace NKikimr::NOlap::NReader::NCommon
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once
#include "columns_set.h"

#include <ydb/core/tx/columnshard/counters/common/owner.h>
#include <ydb/core/tx/columnshard/counters/scan.h>
#include <ydb/core/tx/columnshard/engines/reader/common/conveyor_task.h>

Expand All @@ -17,11 +18,58 @@ class IDataSource;
class TSpecialReadContext;
class TFetchingScriptCursor;

class IFetchingStep {
class TFetchingStepSignals: public NColumnShard::TCommonCountersOwner {
private:
using TBase = NColumnShard::TCommonCountersOwner;
NMonitoring::TDynamicCounters::TCounterPtr DurationCounter;
NMonitoring::TDynamicCounters::TCounterPtr BytesCounter;

public:
TFetchingStepSignals(NColumnShard::TCommonCountersOwner&& owner)
: TBase(std::move(owner))
, DurationCounter(TBase::GetDeriviative("duration_ms"))
, BytesCounter(TBase::GetDeriviative("bytes_ms")) {
}

void AddDuration(const TDuration d) const {
DurationCounter->Add(d.MilliSeconds());
}

void AddBytes(const ui32 v) const {
BytesCounter->Add(v);
}
};

class TFetchingStepsSignalsCollection: public NColumnShard::TCommonCountersOwner {
private:
using TBase = NColumnShard::TCommonCountersOwner;
TMutex Mutex;
THashMap<TString, TFetchingStepSignals> Collection;
TFetchingStepSignals GetSignalsImpl(const TString& name) {
TGuard<TMutex> g(Mutex);
auto it = Collection.find(name);
if (it == Collection.end()) {
it = Collection.emplace(name, TFetchingStepSignals(CreateSubGroup("step_name", name))).first;
}
return it->second;
}

public:
TFetchingStepsSignalsCollection()
: TBase("scan_steps") {
}

static TFetchingStepSignals GetSignals(const TString& name) {
return Singleton<TFetchingStepsSignalsCollection>()->GetSignalsImpl(name);
}
};

class IFetchingStep: public TNonCopyable {
private:
YDB_READONLY_DEF(TString, Name);
YDB_READONLY(TDuration, SumDuration, TDuration::Zero());
YDB_READONLY(ui64, SumSize, 0);
TFetchingStepSignals Signals;

protected:
virtual TConclusion<bool> DoExecuteInplace(const std::shared_ptr<IDataSource>& source, const TFetchingScriptCursor& step) const = 0;
Expand All @@ -32,9 +80,11 @@ class IFetchingStep {
public:
void AddDuration(const TDuration d) {
SumDuration += d;
Signals.AddDuration(d);
}
void AddDataSize(const ui64 size) {
SumSize += size;
Signals.AddBytes(size);
}

virtual ~IFetchingStep() = default;
Expand All @@ -48,7 +98,8 @@ class IFetchingStep {
}

IFetchingStep(const TString& name)
: Name(name) {
: Name(name)
, Signals(TFetchingStepsSignalsCollection::GetSignals(name)) {
}

TString DebugString() const;
Expand Down Expand Up @@ -195,9 +246,7 @@ class TStepAction: public IDataTasksProcessor::ITask {

template <class T>
TStepAction(const std::shared_ptr<T>& source, TFetchingScriptCursor&& cursor, const NActors::TActorId& ownerActorId)
: TStepAction(std::static_pointer_cast<IDataSource>(source), std::move(cursor), ownerActorId)
{

: TStepAction(std::static_pointer_cast<IDataSource>(source), std::move(cursor), ownerActorId) {
}
TStepAction(const std::shared_ptr<IDataSource>& source, TFetchingScriptCursor&& cursor, const NActors::TActorId& ownerActorId);
};
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
LIBRARY()

SRCS(
fetched_data.cpp
columns_set.cpp
iterator.cpp
constructor.cpp
context.cpp
source.cpp
fetching.cpp
fetch_steps.cpp
fetched_data.cpp
fetching.cpp
iterator.cpp
source.cpp
)

PEERDIR(
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,4 +1,3 @@
#include "constructor.h"
#include "fetched_data.h"
#include "interval.h"
#include "plain_read_data.h"
Expand All @@ -7,6 +6,7 @@
#include <ydb/core/tx/columnshard/blobs_reader/actor.h>
#include <ydb/core/tx/columnshard/blobs_reader/events.h>
#include <ydb/core/tx/columnshard/engines/portions/data_accessor.h>
#include <ydb/core/tx/columnshard/engines/reader/common_reader/iterator/constructor.h>
#include <ydb/core/tx/columnshard/hooks/abstract/abstract.h>
#include <ydb/core/tx/conveyor/usage/service.h>
#include <ydb/core/tx/limiter/grouped_memory/usage/service.h>
Expand All @@ -17,7 +17,7 @@ namespace NKikimr::NOlap::NReader::NPlain {

void IDataSource::InitFetchingPlan(const std::shared_ptr<TFetchingScript>& fetching) {
AFL_VERIFY(fetching);
// AFL_VERIFY(!FetchingPlan);
// AFL_VERIFY(!FetchingPlan);
FetchingPlan = fetching;
}

Expand Down Expand Up @@ -122,7 +122,8 @@ bool TPortionDataSource::DoStartFetchingColumns(
return false;
}

auto constructor = std::make_shared<TBlobsFetcherTask>(readActions, sourcePtr, step, GetContext(), "CS::READ::" + step.GetName(), "");
auto constructor =
std::make_shared<NCommon::TBlobsFetcherTask>(readActions, sourcePtr, step, GetContext(), "CS::READ::" + step.GetName(), "");
NActors::TActivationContext::AsActorContext().Register(new NOlap::NBlobOperations::NRead::TActor(constructor));
return true;
}
Expand Down Expand Up @@ -157,7 +158,8 @@ bool TPortionDataSource::DoStartFetchingIndexes(
return false;
}

auto constructor = std::make_shared<TBlobsFetcherTask>(readingActions, std::static_pointer_cast<NCommon::IDataSource>(sourcePtr), step, GetContext(), "CS::READ::" + step.GetName(), "");
auto constructor =
std::make_shared<NCommon::TBlobsFetcherTask>(readingActions, sourcePtr, step, GetContext(), "CS::READ::" + step.GetName(), "");
NActors::TActivationContext::AsActorContext().Register(new NOlap::NBlobOperations::NRead::TActor(constructor));
return true;
}
Expand Down Expand Up @@ -276,7 +278,7 @@ bool TCommittedDataSource::DoStartFetchingColumns(
readAction->AddRange(CommittedBlob.GetBlobRange());

std::vector<std::shared_ptr<IBlobsReadingAction>> actions = { readAction };
auto constructor = std::make_shared<TBlobsFetcherTask>(actions, sourcePtr, step, GetContext(), "CS::READ::" + step.GetName(), "");
auto constructor = std::make_shared<NCommon::TBlobsFetcherTask>(actions, sourcePtr, step, GetContext(), "CS::READ::" + step.GetName(), "");
NActors::TActivationContext::AsActorContext().Register(new NOlap::NBlobOperations::NRead::TActor(constructor));
return true;
}
Expand All @@ -290,7 +292,8 @@ void TCommittedDataSource::DoAssembleColumns(const std::shared_ptr<TColumnsSet>&
AFL_VERIFY(GetStageData().GetBlobs().size() == 1);
auto bData = MutableStageData().ExtractBlob(GetStageData().GetBlobs().begin()->first);
auto schema = GetContext()->GetReadMetadata()->GetBlobSchema(CommittedBlob.GetSchemaVersion());
auto rBatch = NArrow::DeserializeBatch(bData, std::make_shared<arrow::Schema>(CommittedBlob.GetSchemaSubset().Apply(schema.begin(), schema.end())));
auto rBatch = NArrow::DeserializeBatch(
bData, std::make_shared<arrow::Schema>(CommittedBlob.GetSchemaSubset().Apply(schema.begin(), schema.end())));
AFL_VERIFY(rBatch)("schema", schema.ToString());
auto batch = std::make_shared<NArrow::TGeneralContainer>(rBatch);
std::set<ui32> columnIdsToDelete = batchSchema->GetColumnIdsToDelete(resultSchema);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ LIBRARY()

SRCS(
scanner.cpp
constructor.cpp
source.cpp
interval.cpp
fetched_data.cpp
Expand Down

This file was deleted.

Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
#include "constructor.h"
#include "fetched_data.h"
#include "plain_read_data.h"
#include "source.h"

#include <ydb/core/tx/columnshard/blobs_reader/actor.h>
#include <ydb/core/tx/columnshard/blobs_reader/events.h>
#include <ydb/core/tx/columnshard/engines/portions/data_accessor.h>
#include <ydb/core/tx/columnshard/engines/reader/common_reader/iterator/constructor.h>
#include <ydb/core/tx/columnshard/hooks/abstract/abstract.h>
#include <ydb/core/tx/conveyor/usage/service.h>
#include <ydb/core/tx/limiter/grouped_memory/usage/service.h>
Expand All @@ -29,7 +29,7 @@ void IDataSource::StartProcessing(const std::shared_ptr<IDataSource>& sourcePtr)
GetContext()->GetProcessMemoryControlId(), GetContext()->GetCommonContext()->GetScanId());
SetMemoryGroupId(SourceGroupGuard->GetGroupId());
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("InitFetchingPlan", FetchingPlan->DebugString())("source_idx", GetSourceIdx());
// NActors::TLogContextGuard logGuard(NActors::TLogContextBuilder::Build()("source", SourceIdx)("method", "InitFetchingPlan"));
// NActors::TLogContextGuard logGuard(NActors::TLogContextBuilder::Build()("source", SourceIdx)("method", "InitFetchingPlan"));
TFetchingScriptCursor cursor(FetchingPlan, 0);
auto task = std::make_shared<TStepAction>(sourcePtr, std::move(cursor), GetContext()->GetCommonContext()->GetScanActorId());
NConveyor::TScanServiceOperator::SendTaskToExecute(task);
Expand Down Expand Up @@ -123,7 +123,8 @@ bool TPortionDataSource::DoStartFetchingColumns(
return false;
}

auto constructor = std::make_shared<TBlobsFetcherTask>(readActions, sourcePtr, step, GetContext(), "CS::READ::" + step.GetName(), "");
auto constructor =
std::make_shared<NCommon::TBlobsFetcherTask>(readActions, sourcePtr, step, GetContext(), "CS::READ::" + step.GetName(), "");
NActors::TActivationContext::AsActorContext().Register(new NOlap::NBlobOperations::NRead::TActor(constructor));
return true;
}
Expand Down Expand Up @@ -158,8 +159,8 @@ bool TPortionDataSource::DoStartFetchingIndexes(
return false;
}

auto constructor = std::make_shared<TBlobsFetcherTask>(
readingActions, std::static_pointer_cast<NCommon::IDataSource>(sourcePtr), step, GetContext(), "CS::READ::" + step.GetName(), "");
auto constructor =
std::make_shared<NCommon::TBlobsFetcherTask>(readingActions, sourcePtr, step, GetContext(), "CS::READ::" + step.GetName(), "");
NActors::TActivationContext::AsActorContext().Register(new NOlap::NBlobOperations::NRead::TActor(constructor));
return true;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ LIBRARY()

SRCS(
scanner.cpp
constructor.cpp
source.cpp
fetched_data.cpp
plain_read_data.cpp
Expand Down

0 comments on commit a3ef3bb

Please sign in to comment.