Skip to content

Commit

Permalink
fix states usage for accessors request filling (ydb-platform#12553)
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored and zverevgeny committed Jan 8, 2025
1 parent 9fbe836 commit a17a8a7
Show file tree
Hide file tree
Showing 22 changed files with 109 additions and 46 deletions.
4 changes: 2 additions & 2 deletions ydb/core/tx/columnshard/columnshard__statistics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ class TColumnPortionsAccumulator {
return;
}
Result->AddWaitingTask();
std::shared_ptr<NOlap::TDataAccessorsRequest> request = std::make_shared<NOlap::TDataAccessorsRequest>();
std::shared_ptr<NOlap::TDataAccessorsRequest> request = std::make_shared<NOlap::TDataAccessorsRequest>("STATISTICS_FLUSH");
for (auto&& i : Portions) {
request->AddPortion(i);
}
Expand Down Expand Up @@ -227,7 +227,7 @@ void TColumnShard::Handle(NStat::TEvStatistics::TEvStatisticsRequest::TPtr& ev,
columnTagsRequested = std::set<ui32>(allColumnIds.begin(), allColumnIds.end());
}

NOlap::TDataAccessorsRequest request;
NOlap::TDataAccessorsRequest request("STATISTICS");
std::shared_ptr<TResultAccumulator> resultAccumulator =
std::make_shared<TResultAccumulator>(columnTagsRequested, ev->Sender, ev->Cookie, std::move(response));
auto versionedIndex = std::make_shared<NOlap::TVersionedIndex>(index.GetVersionedIndex());
Expand Down
19 changes: 12 additions & 7 deletions ydb/core/tx/columnshard/columnshard_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1405,12 +1405,15 @@ class TTxAskPortionChunks: public TTransactionBase<TColumnShard> {
std::shared_ptr<NOlap::NDataAccessorControl::IAccessorCallback> FetchCallback;
THashMap<ui64, std::vector<NOlap::TPortionInfo::TConstPtr>> PortionsByPath;
std::vector<TPortionConstructorV2> FetchedAccessors;
const TString Consumer;

public:
TTxAskPortionChunks(TColumnShard* self, const std::shared_ptr<NOlap::NDataAccessorControl::IAccessorCallback>& fetchCallback,
THashMap<ui64, NOlap::TPortionInfo::TConstPtr>&& portions)
THashMap<ui64, NOlap::TPortionInfo::TConstPtr>&& portions, const TString& consumer)
: TBase(self)
, FetchCallback(fetchCallback) {
, FetchCallback(fetchCallback)
, Consumer(consumer)
{
for (auto&& i : portions) {
PortionsByPath[i.second->GetPathId()].emplace_back(i.second);
}
Expand All @@ -1421,8 +1424,9 @@ class TTxAskPortionChunks: public TTransactionBase<TColumnShard> {

TBlobGroupSelector selector(Self->Info());
bool reask = false;
NActors::TLogContextGuard lGuard = NActors::TLogContextBuilder::Build()("consumer", Consumer)("event", "TTxAskPortionChunks::Execute");
for (auto&& i : PortionsByPath) {
AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "TTxAskPortionChunks::Execute")("size", i.second.size())("path_id", i.first);
AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("size", i.second.size())("path_id", i.first);
for (auto&& p : i.second) {
if (!p->GetSchema(Self->GetIndexAs<NOlap::TColumnEngineForLogs>().GetVersionedIndex())->GetIndexesCount()) {
continue;
Expand All @@ -1440,7 +1444,8 @@ class TTxAskPortionChunks: public TTransactionBase<TColumnShard> {
}

for (auto&& i : PortionsByPath) {
AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "TTxAskPortionChunks::Execute")("stage", "processing")("size", i.second.size())("path_id", i.first);
AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("stage", "processing")("size", i.second.size())(
"path_id", i.first);
while (i.second.size()) {
auto p = i.second.back();
TPortionConstructorV2 constructor(p);
Expand Down Expand Up @@ -1470,11 +1475,11 @@ class TTxAskPortionChunks: public TTransactionBase<TColumnShard> {
FetchedAccessors.emplace_back(std::move(constructor));
i.second.pop_back();
}
AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "TTxAskPortionChunks::Execute")("stage", "finished")("size", i.second.size())(
AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("stage", "finished")("size", i.second.size())(
"path_id", i.first);
}

AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "TTxAskPortionChunks::Execute")("stage", "finished");
AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("stage", "finished");
NConveyor::TInsertServiceOperator::AsyncTaskToExecute(std::make_shared<TAccessorsParsingTask>(FetchCallback, std::move(FetchedAccessors)));
return true;
}
Expand All @@ -1486,7 +1491,7 @@ class TTxAskPortionChunks: public TTransactionBase<TColumnShard> {
};

void TColumnShard::Handle(NOlap::NDataAccessorControl::TEvAskTabletDataAccessors::TPtr& ev, const TActorContext& /*ctx*/) {
Execute(new TTxAskPortionChunks(this, ev->Get()->GetCallback(), std::move(ev->Get()->MutablePortions())));
Execute(new TTxAskPortionChunks(this, ev->Get()->GetCallback(), std::move(ev->Get()->MutablePortions()), ev->Get()->GetConsumer()));
}

void TColumnShard::Handle(NOlap::NDataSharing::NEvents::TEvAckFinishFromInitiator::TPtr& ev, const TActorContext& ctx) {
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/tx/columnshard/data_accessor/abstract/collector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,9 @@
namespace NKikimr::NOlap::NDataAccessorControl {

THashMap<ui64, TPortionDataAccessor> IGranuleDataAccessor::AskData(
const std::vector<TPortionInfo::TConstPtr>& portions, const std::shared_ptr<IAccessorCallback>& callback) {
const std::vector<TPortionInfo::TConstPtr>& portions, const std::shared_ptr<IAccessorCallback>& callback, const TString& consumer) {
AFL_VERIFY(portions.size());
return DoAskData(portions, callback);
return DoAskData(portions, callback, consumer);
}

void TActorAccessorsCallback::OnAccessorsFetched(std::vector<TPortionDataAccessor>&& accessors) {
Expand Down
5 changes: 3 additions & 2 deletions ydb/core/tx/columnshard/data_accessor/abstract/collector.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ class IGranuleDataAccessor {
const ui64 PathId;

virtual THashMap<ui64, TPortionDataAccessor> DoAskData(
const std::vector<TPortionInfo::TConstPtr>& portions, const std::shared_ptr<IAccessorCallback>& callback) = 0;
const std::vector<TPortionInfo::TConstPtr>& portions, const std::shared_ptr<IAccessorCallback>& callback, const TString& consumer) = 0;
virtual void DoModifyPortions(const std::vector<TPortionDataAccessor>& add, const std::vector<ui64>& remove) = 0;

public:
Expand All @@ -39,7 +39,8 @@ class IGranuleDataAccessor {
: PathId(pathId) {
}

THashMap<ui64, TPortionDataAccessor> AskData(const std::vector<TPortionInfo::TConstPtr>& portions, const std::shared_ptr<IAccessorCallback>& callback);
THashMap<ui64, TPortionDataAccessor> AskData(
const std::vector<TPortionInfo::TConstPtr>& portions, const std::shared_ptr<IAccessorCallback>& callback, const TString& consumer);
void ModifyPortions(const std::vector<TPortionDataAccessor>& add, const std::vector<ui64>& remove) {
return DoModifyPortions(add, remove);
}
Expand Down
8 changes: 5 additions & 3 deletions ydb/core/tx/columnshard/data_accessor/events.h
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,14 @@ class TEvAskTabletDataAccessors: public NActors::TEventLocal<TEvAskTabletDataAcc
using TPortions = THashMap<ui64, TPortionInfo::TConstPtr>;
YDB_ACCESSOR_DEF(TPortions, Portions);
YDB_READONLY_DEF(std::shared_ptr<NDataAccessorControl::IAccessorCallback>, Callback);
YDB_READONLY_DEF(TString, Consumer);

public:
explicit TEvAskTabletDataAccessors(
const THashMap<ui64, TPortionInfo::TConstPtr>& portions, const std::shared_ptr<NDataAccessorControl::IAccessorCallback>& callback)
explicit TEvAskTabletDataAccessors(const THashMap<ui64, TPortionInfo::TConstPtr>& portions,
const std::shared_ptr<NDataAccessorControl::IAccessorCallback>& callback, const TString& consumer)
: Portions(portions)
, Callback(callback) {
, Callback(callback)
, Consumer(consumer) {
}
};

Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/columnshard/data_accessor/in_mem/collector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
namespace NKikimr::NOlap::NDataAccessorControl::NInMem {

THashMap<ui64, TPortionDataAccessor> TCollector::DoAskData(
const std::vector<TPortionInfo::TConstPtr>& portions, const std::shared_ptr<IAccessorCallback>& /*callback*/) {
const std::vector<TPortionInfo::TConstPtr>& portions, const std::shared_ptr<IAccessorCallback>& /*callback*/, const TString& /*consumer*/) {
THashMap<ui64, TPortionDataAccessor> accessors;
for (auto&& i : portions) {
auto it = Accessors.find(i->GetPortionId());
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/columnshard/data_accessor/in_mem/collector.h
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ class TCollector: public IGranuleDataAccessor {
using TBase = IGranuleDataAccessor;
THashMap<ui64, TPortionDataAccessor> Accessors;
virtual THashMap<ui64, TPortionDataAccessor> DoAskData(
const std::vector<TPortionInfo::TConstPtr>& portions, const std::shared_ptr<IAccessorCallback>& callback) override;
const std::vector<TPortionInfo::TConstPtr>& portions, const std::shared_ptr<IAccessorCallback>& callback, const TString& consumer) override;
virtual void DoModifyPortions(const std::vector<TPortionDataAccessor>& add,
const std::vector<ui64>& remove) override;

Expand Down
4 changes: 2 additions & 2 deletions ydb/core/tx/columnshard/data_accessor/local_db/collector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
namespace NKikimr::NOlap::NDataAccessorControl::NLocalDB {

THashMap<ui64, TPortionDataAccessor> TCollector::DoAskData(
const std::vector<TPortionInfo::TConstPtr>& portions, const std::shared_ptr<IAccessorCallback>& callback) {
const std::vector<TPortionInfo::TConstPtr>& portions, const std::shared_ptr<IAccessorCallback>& callback, const TString& consumer) {
THashMap<ui64, TPortionDataAccessor> accessors;
THashMap<ui64, TPortionInfo::TConstPtr> portionsToDirectAsk;
for (auto&& p : portions) {
Expand All @@ -17,7 +17,7 @@ THashMap<ui64, TPortionDataAccessor> TCollector::DoAskData(
}
if (portionsToDirectAsk.size()) {
NActors::TActivationContext::Send(
TabletActorId, std::make_unique<NDataAccessorControl::TEvAskTabletDataAccessors>(portionsToDirectAsk, callback));
TabletActorId, std::make_unique<NDataAccessorControl::TEvAskTabletDataAccessors>(portionsToDirectAsk, callback, consumer));
}
return accessors;
}
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/tx/columnshard/data_accessor/local_db/collector.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ class TCollector: public IGranuleDataAccessor {

TLRUCache<ui64, TPortionDataAccessor, TNoopDelete, TMetadataSizeProvider> AccessorsCache;
using TBase = IGranuleDataAccessor;
virtual THashMap<ui64, TPortionDataAccessor> DoAskData(
const std::vector<TPortionInfo::TConstPtr>& portions, const std::shared_ptr<IAccessorCallback>& callback) override;
virtual THashMap<ui64, TPortionDataAccessor> DoAskData(const std::vector<TPortionInfo::TConstPtr>& portions,
const std::shared_ptr<IAccessorCallback>& callback, const TString& consumer) override;
virtual void DoModifyPortions(const std::vector<TPortionDataAccessor>& add, const std::vector<ui64>& remove) override;

public:
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/columnshard/data_accessor/manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ class TLocalManager: public IDataAccessorsManager {
if (portionsAsk.empty()) {
continue;
}
auto accessors = it->second->AskData(portionsAsk, AccessorCallback);
auto accessors = it->second->AskData(portionsAsk, AccessorCallback, request->GetConsumer());
for (auto&& p : portionsAsk) {
auto itAccessor = accessors.find(p->GetPortionId());
if (itAccessor == accessors.end()) {
Expand Down
10 changes: 8 additions & 2 deletions ydb/core/tx/columnshard/data_accessor/request.h
Original file line number Diff line number Diff line change
Expand Up @@ -169,6 +169,7 @@ class TDataAccessorsRequest: public NColumnShard::TMonitoringObjectsCounter<TDat
static inline TAtomicCounter Counter = 0;
ui32 FetchStage = 0;
YDB_READONLY(ui64, RequestId, Counter.Inc());
YDB_READONLY_DEF(TString, Consumer);
THashSet<ui64> PortionIds;
THashMap<ui64, TPathFetchingState> PathIdStatus;
THashSet<ui64> PathIds;
Expand Down Expand Up @@ -208,7 +209,11 @@ class TDataAccessorsRequest: public NColumnShard::TMonitoringObjectsCounter<TDat
return sb;
}

TDataAccessorsRequest() = default;
TDataAccessorsRequest(const TString& consumer)
: Consumer(consumer)
{

}

ui64 PredictAccessorsMemory(const ISnapshotSchema::TPtr& schema) const {
ui64 result = 0;
Expand Down Expand Up @@ -276,7 +281,8 @@ class TDataAccessorsRequest: public NColumnShard::TMonitoringObjectsCounter<TDat
}

void AddError(const ui64 pathId, const TString& errorMessage) {
AFL_VERIFY(FetchStage == 1);
AFL_ERROR(NKikimrServices::TX_COLUMNSHARD)("error", errorMessage)("event", "ErrorOnFetching")("path_id", pathId);
AFL_VERIFY(FetchStage <= 1);
auto itStatus = PathIdStatus.find(pathId);
AFL_VERIFY(itStatus != PathIdStatus.end());
itStatus->second.OnError(errorMessage);
Expand Down
14 changes: 14 additions & 0 deletions ydb/core/tx/columnshard/data_locks/locks/composite.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,20 @@ class TCompositeLock: public ILock {
return Locks.empty();
}
public:
static std::shared_ptr<ILock> Build(const TString& lockName, const std::initializer_list<std::shared_ptr<ILock>>& locks) {
std::vector<std::shared_ptr<ILock>> locksUseful;
for (auto&& i : locks) {
if (i && !i->IsEmpty()) {
locksUseful.emplace_back(i);
}
}
if (locksUseful.size() == 1) {
return locksUseful.front();
} else {
return std::make_shared<TCompositeLock>(lockName, locksUseful);
}
}

TCompositeLock(const TString& lockName, const std::vector<std::shared_ptr<ILock>>& locks,
const ELockCategory category = NDataLocks::ELockCategory::Any, const bool readOnly = false)
: TBase(lockName, category, readOnly)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,7 +242,7 @@ class TColumnEngineChanges {
return DoBuildDataLock();
}

std::shared_ptr<TDataAccessorsRequest> PortionsToAccess = std::make_shared<TDataAccessorsRequest>();
std::shared_ptr<TDataAccessorsRequest> PortionsToAccess = std::make_shared<TDataAccessorsRequest>(TaskIdentifier);
virtual void OnDataAccessorsInitialized(const TDataAccessorsInitializationContext& context) = 0;

public:
Expand Down
33 changes: 33 additions & 0 deletions ydb/core/tx/columnshard/engines/changes/cleanup_portions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,19 @@ void TCleanupPortionsColumnEngineChanges::DoWriteIndexOnExecute(NColumnShard::TC
if (!self) {
return;
}
THashSet<ui64> usedPortionIds;
auto schemaPtr = context.EngineLogs.GetVersionedIndex().GetLastSchema();
for (auto&& i : PortionsToRemove) {
Y_ABORT_UNLESS(!i->HasRemoveSnapshot());
AFL_VERIFY(usedPortionIds.emplace(i->GetPortionId()).second)("portion_info", i->DebugString(true));
const auto pred = [&](TPortionInfo& portionCopy) {
portionCopy.SetRemoveSnapshot(context.Snapshot);
};
context.EngineLogs.GetGranuleVerified(i->GetPathId())
.ModifyPortionOnExecute(
context.DBWrapper, GetPortionDataAccessor(i->GetPortionId()), pred, schemaPtr->GetIndexInfo().GetPKFirstColumnId());
}

THashMap<TString, THashSet<TUnifiedBlobId>> blobIdsByStorage;
for (auto&& [_, p] : FetchedDataAccessors->GetPortions()) {
p.RemoveFromDatabase(context.DBWrapper);
Expand All @@ -37,12 +50,32 @@ void TCleanupPortionsColumnEngineChanges::DoWriteIndexOnExecute(NColumnShard::TC
}

void TCleanupPortionsColumnEngineChanges::DoWriteIndexOnComplete(NColumnShard::TColumnShard* self, TWriteIndexCompleteContext& context) {
{
auto g = context.EngineLogs.GranulesStorage->GetStats()->StartPackModification();
for (auto&& i : PortionsToRemove) {
Y_ABORT_UNLESS(!i->HasRemoveSnapshot());
const auto pred = [&](const std::shared_ptr<TPortionInfo>& portion) {
portion->SetRemoveSnapshot(context.Snapshot);
};
context.EngineLogs.ModifyPortionOnComplete(i, pred);
context.EngineLogs.AddCleanupPortion(i);
}
}
for (auto& portionInfo : PortionsToDrop) {
if (!context.EngineLogs.ErasePortion(*portionInfo)) {
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "Cannot erase portion")("portion", portionInfo->DebugString());
}
}
if (self) {
self->Counters.GetTabletCounters()->IncCounter(NColumnShard::COUNTER_PORTIONS_DEACTIVATED, PortionsToRemove.size());
for (auto& portionInfo : PortionsToRemove) {
self->Counters.GetTabletCounters()->IncCounter(NColumnShard::COUNTER_BLOBS_DEACTIVATED, portionInfo->GetBlobIdsCount());
for (auto& blobId : portionInfo->GetBlobIds()) {
self->Counters.GetTabletCounters()->IncCounter(NColumnShard::COUNTER_BYTES_DEACTIVATED, blobId.BlobSize());
}
self->Counters.GetTabletCounters()->IncCounter(NColumnShard::COUNTER_RAW_BYTES_DEACTIVATED, portionInfo->GetTotalRawBytes());
}

self->Counters.GetTabletCounters()->IncCounter(NColumnShard::COUNTER_PORTIONS_ERASED, PortionsToDrop.size());
for (auto&& p : PortionsToDrop) {
self->Counters.GetTabletCounters()->OnDropPortionEvent(p->GetTotalRawBytes(), p->GetTotalBlobBytes(), p->GetRecordsCount());
Expand Down
22 changes: 12 additions & 10 deletions ydb/core/tx/columnshard/engines/changes/cleanup_portions.h
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ class TCleanupPortionsColumnEngineChanges: public TColumnEngineChanges,
using TBase = TColumnEngineChanges;
THashMap<TString, std::vector<std::shared_ptr<TPortionInfo>>> StoragePortions;
std::vector<TPortionInfo::TConstPtr> PortionsToDrop;
std::vector<TPortionInfo::TConstPtr> PortionsToRemove;
THashSet<ui64> TablesToDrop;

protected:
Expand Down Expand Up @@ -37,16 +38,13 @@ class TCleanupPortionsColumnEngineChanges: public TColumnEngineChanges,
return NDataLocks::ELockCategory::Cleanup;
}
virtual std::shared_ptr<NDataLocks::ILock> DoBuildDataLock() const override {
auto portionsLock = std::make_shared<NDataLocks::TListPortionsLock>(
TypeString() + "::PORTIONS::" + GetTaskIdentifier(), PortionsToDrop, NDataLocks::ELockCategory::Cleanup);
if (TablesToDrop.size()) {
auto tablesLock = std::make_shared<NDataLocks::TListTablesLock>(
TypeString() + "::TABLES::" + GetTaskIdentifier(), TablesToDrop, NDataLocks::ELockCategory::Tables);
return std::shared_ptr<NDataLocks::TCompositeLock>(
new NDataLocks::TCompositeLock(TypeString() + "::COMPOSITE::" + GetTaskIdentifier(), { portionsLock, tablesLock }));
} else {
return portionsLock;
}
auto portionsDropLock = std::make_shared<NDataLocks::TListPortionsLock>(
TypeString() + "::PORTIONS_DROP::" + GetTaskIdentifier(), PortionsToDrop, NDataLocks::ELockCategory::Cleanup);
auto portionsRemoveLock = std::make_shared<NDataLocks::TListPortionsLock>(
TypeString() + "::PORTIONS_REMOVE::" + GetTaskIdentifier(), PortionsToRemove, NDataLocks::ELockCategory::Compaction);
auto tablesLock = std::make_shared<NDataLocks::TListTablesLock>(
TypeString() + "::TABLES::" + GetTaskIdentifier(), TablesToDrop, NDataLocks::ELockCategory::Tables);
return NDataLocks::TCompositeLock::Build(TypeString() + "::COMPOSITE::" + GetTaskIdentifier(), {portionsDropLock, portionsRemoveLock, tablesLock});
}

public:
Expand All @@ -68,6 +66,10 @@ class TCleanupPortionsColumnEngineChanges: public TColumnEngineChanges,
PortionsToAccess->AddPortion(portion);
}

void AddPortionToRemove(const TPortionInfo::TConstPtr& portion) {
PortionsToRemove.emplace_back(portion);
}

virtual ui32 GetWritePortionsCount() const override {
return 0;
}
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/columnshard/engines/column_engine_logs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -355,7 +355,7 @@ std::shared_ptr<TCleanupPortionsColumnEngineChanges> TColumnEngineForLogs::Start
limitExceeded = true;
break;
}
changes->AddPortionToDrop(info);
changes->AddPortionToRemove(info);
++portionsFromDrop;
}
changes->AddTableToDrop(pathId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -237,7 +237,7 @@ bool TPortionDataSource::DoStartFetchingAccessor(const std::shared_ptr<IDataSour
AFL_VERIFY(!StageData->HasPortionAccessor());
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", step.GetName())("fetching_info", step.DebugString());

std::shared_ptr<TDataAccessorsRequest> request = std::make_shared<TDataAccessorsRequest>();
std::shared_ptr<TDataAccessorsRequest> request = std::make_shared<TDataAccessorsRequest>("PLAIN::" + step.GetName());
request->AddPortion(Portion);
request->RegisterSubscriber(std::make_shared<TPortionAccessorFetchingSubscriber>(step, sourcePtr));
GetContext()->GetCommonContext()->GetDataAccessorsManager()->AskData(request);
Expand Down
Loading

0 comments on commit a17a8a7

Please sign in to comment.