Skip to content

Commit

Permalink
Merge 4599eec into 41baadd
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored Nov 21, 2024
2 parents 41baadd + 4599eec commit 1d511c3
Show file tree
Hide file tree
Showing 8 changed files with 117 additions and 37 deletions.
2 changes: 1 addition & 1 deletion ydb/core/tx/columnshard/columnshard__write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -535,7 +535,7 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor
return;
}

auto schema = TablesManager.GetPrimaryIndex()->GetVersionedIndex().GetSchemaVerified(operation.GetTableId().GetSchemaVersion());
auto schema = TablesManager.GetPrimaryIndex()->GetVersionedIndex().GetSchemaOptional(operation.GetTableId().GetSchemaVersion());
if (!schema) {
Counters.GetTabletCounters()->IncCounter(COUNTER_WRITE_FAIL);
auto result = NEvents::TDataEvents::TEvWriteResult::BuildError(
Expand Down
100 changes: 78 additions & 22 deletions ydb/core/tx/columnshard/columnshard_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
#include "blobs_action/transaction/tx_remove_blobs.h"
#include "blobs_action/transaction/tx_gc_insert_table.h"
#include "blobs_action/transaction/tx_gc_indexed.h"
#include "blobs_reader/actor.h"
#include "bg_tasks/events/events.h"

#include "data_accessor/manager.h"
Expand Down Expand Up @@ -579,8 +580,13 @@ class TChangesReadTask: public NOlap::NBlobOperations::NRead::ITask {

protected:
virtual void DoOnDataReady(const std::shared_ptr<NOlap::NResourceBroker::NSubscribe::TResourcesGuard>& resourcesGuard) override {
if (!!resourcesGuard) {
AFL_VERIFY(!TxEvent->IndexChanges->ResourcesGuard);
TxEvent->IndexChanges->ResourcesGuard = resourcesGuard;
} else {
AFL_VERIFY(TxEvent->IndexChanges->ResourcesGuard);
}
TxEvent->IndexChanges->Blobs = ExtractBlobsData();
TxEvent->IndexChanges->ResourcesGuard = resourcesGuard;
const bool isInsert = !!dynamic_pointer_cast<NOlap::TInsertColumnEngineChanges>(TxEvent->IndexChanges);
std::shared_ptr<NConveyor::ITask> task = std::make_shared<TChangesTask>(std::move(TxEvent), Counters, TabletId, ParentActorId, LastCompletedTx);
if (isInsert) {
Expand Down Expand Up @@ -615,6 +621,7 @@ class TDataAccessorsSubscriber: public NOlap::IDataAccessorRequestsSubscriber {
const NActors::TActorId ShardActorId;
std::shared_ptr<NOlap::TColumnEngineChanges> Changes;
std::shared_ptr<NOlap::TVersionedIndex> VersionedIndex;
std::shared_ptr<NOlap::NResourceBroker::NSubscribe::TResourcesGuard> ResourcesGuard;

virtual void DoOnRequestsFinishedImpl() = 0;

Expand All @@ -624,6 +631,16 @@ class TDataAccessorsSubscriber: public NOlap::IDataAccessorRequestsSubscriber {
}

public:
void SetResourcesGuard(const std::shared_ptr<NOlap::NResourceBroker::NSubscribe::TResourcesGuard>& guard) {
AFL_VERIFY(!ResourcesGuard);
ResourcesGuard = guard;
}

std::shared_ptr<NOlap::NResourceBroker::NSubscribe::TResourcesGuard>&& ExtractResourcesGuard() {
AFL_VERIFY(ResourcesGuard);
return std::move(ResourcesGuard);
}

TDataAccessorsSubscriber(const NActors::TActorId& shardActorId, const std::shared_ptr<NOlap::TColumnEngineChanges>& changes,
const std::shared_ptr<NOlap::TVersionedIndex>& versionedIndex)
: ShardActorId(shardActorId)
Expand Down Expand Up @@ -801,6 +818,30 @@ void TColumnShard::SetupCompaction(const std::set<ui64>& pathIds) {
}
}

class TAccessorsMemorySubscriber: public NOlap::NResourceBroker::NSubscribe::ITask {
private:
using TBase = NOlap::NResourceBroker::NSubscribe::ITask;
std::shared_ptr<NOlap::TDataAccessorsRequest> Request;
std::shared_ptr<TDataAccessorsSubscriber> Subscriber;
std::shared_ptr<NOlap::NDataAccessorControl::IDataAccessorsManager> DataAccessorsManager;

virtual void DoOnAllocationSuccess(const std::shared_ptr<NOlap::NResourceBroker::NSubscribe::TResourcesGuard>& guard) override {
Subscriber->SetResourcesGuard(guard);
Request->RegisterSubscriber(Subscriber);
DataAccessorsManager->AskData(Request);
}

public:
TAccessorsMemorySubscriber(const ui64 memory, const TString& externalTaskId, const NOlap::NResourceBroker::NSubscribe::TTaskContext& context,
std::shared_ptr<NOlap::TDataAccessorsRequest>&& request, const std::shared_ptr<TDataAccessorsSubscriber>& subscriber,
const std::shared_ptr<NOlap::NDataAccessorControl::IDataAccessorsManager>& dataAccessorsManager)
: TBase(0, memory, externalTaskId, context)
, Request(std::move(request))
, Subscriber(subscriber)
, DataAccessorsManager(dataAccessorsManager) {
}
};

class TCompactionDataAccessorsSubscriber: public TDataAccessorsSubscriberWithRead {
private:
using TBase = TDataAccessorsSubscriberWithRead;
Expand All @@ -811,10 +852,9 @@ class TCompactionDataAccessorsSubscriber: public TDataAccessorsSubscriberWithRea
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "compaction")("external_task_id", externalTaskId);

auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(VersionedIndex, Changes, CacheDataAfterWrite);
auto readSubscriber = std::make_shared<NOlap::NBlobOperations::NRead::ITask::TReadSubscriber>(
std::make_shared<TCompactChangesReadTask>(std::move(ev), ShardActorId, ShardTabletId, Counters, SnapshotModification), 0,
Changes->CalcMemoryForUsage(), externalTaskId, TaskSubscriptionContext);
NOlap::NResourceBroker::NSubscribe::ITask::StartResourceSubscription(ResourceSubscribeActor, readSubscriber);
ev->IndexChanges->ResourcesGuard = ExtractResourcesGuard();
TActorContext::AsActorContext().Register(new NOlap::NBlobOperations::NRead::TActor(
std::make_shared<TCompactChangesReadTask>(std::move(ev), ShardActorId, ShardTabletId, Counters, SnapshotModification)));
}

public:
Expand All @@ -837,10 +877,14 @@ void TColumnShard::StartCompaction(const std::shared_ptr<NPrioritiesQueue::TAllo

auto actualIndexInfo = std::make_shared<NOlap::TVersionedIndex>(TablesManager.GetPrimaryIndex()->GetVersionedIndex());
auto request = compaction->ExtractDataAccessorsRequest();
request->RegisterSubscriber(std::make_shared<TCompactionDataAccessorsSubscriber>(ResourceSubscribeActor, indexChanges, actualIndexInfo,
const ui64 accessorsMemory = request->PredictAccessorsMemory(TablesManager.GetPrimaryIndex()->GetVersionedIndex().GetLastSchema()) +
indexChanges->CalcMemoryForUsage();
const auto subscriber = std::make_shared<TCompactionDataAccessorsSubscriber>(ResourceSubscribeActor, indexChanges, actualIndexInfo,
Settings.CacheDataAfterCompaction, SelfId(), TabletID(), Counters.GetCompactionCounters(), GetLastCompletedTx(),
CompactTaskSubscription));
TablesManager.GetPrimaryIndex()->FetchDataAccessors(request);
CompactTaskSubscription);
NOlap::NResourceBroker::NSubscribe::ITask::StartResourceSubscription(
ResourceSubscribeActor, std::make_shared<TAccessorsMemorySubscriber>(accessorsMemory, indexChanges->GetTaskIdentifier(),
CompactTaskSubscription, std::move(request), subscriber, DataAccessorsManager.GetObjectPtrVerified()));
}

class TWriteEvictPortionsDataAccessorsSubscriber: public TDataAccessorsSubscriberWithRead {
Expand All @@ -851,11 +895,9 @@ class TWriteEvictPortionsDataAccessorsSubscriber: public TDataAccessorsSubscribe
virtual void DoOnRequestsFinishedImpl() override {
ACFL_DEBUG("background", "ttl")("need_writes", true);
auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(VersionedIndex, Changes, false);
auto readSubscriber = std::make_shared<NOlap::NBlobOperations::NRead::ITask::TReadSubscriber>(
std::make_shared<TTTLChangesReadTask>(std::move(ev), ShardActorId, ShardTabletId, Counters, SnapshotModification), 0,
Changes->CalcMemoryForUsage(), Changes->GetTaskIdentifier(), TaskSubscriptionContext);

NOlap::NResourceBroker::NSubscribe::ITask::StartResourceSubscription(ResourceSubscribeActor, readSubscriber);
ev->IndexChanges->ResourcesGuard = ExtractResourcesGuard();
TActorContext::AsActorContext().Register(new NOlap::NBlobOperations::NRead::TActor(
std::make_shared<TTTLChangesReadTask>(std::move(ev), ShardActorId, ShardTabletId, Counters, SnapshotModification)));
}

public:
Expand Down Expand Up @@ -911,7 +953,8 @@ void TColumnShard::SetupMetadata() {
}

bool TColumnShard::SetupTtl(const THashMap<ui64, NOlap::TTiering>& pathTtls) {
if (!AppDataVerified().ColumnShardConfig.GetTTLEnabled() || !NYDBTest::TControllers::GetColumnShardController()->IsBackgroundEnabled(NYDBTest::ICSController::EBackground::TTL)) {
if (!AppDataVerified().ColumnShardConfig.GetTTLEnabled() ||
!NYDBTest::TControllers::GetColumnShardController()->IsBackgroundEnabled(NYDBTest::ICSController::EBackground::TTL)) {
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("event", "skip_ttl")("reason", "disabled");
return false;
}
Expand All @@ -922,7 +965,8 @@ bool TColumnShard::SetupTtl(const THashMap<ui64, NOlap::TTiering>& pathTtls) {
}

const ui64 memoryUsageLimit = HasAppData() ? AppDataVerified().ColumnShardConfig.GetTieringsMemoryLimit() : ((ui64)512 * 1024 * 1024);
std::vector<std::shared_ptr<NOlap::TTTLColumnEngineChanges>> indexChanges = TablesManager.MutablePrimaryIndex().StartTtl(eviction, DataLocksManager, memoryUsageLimit);
std::vector<std::shared_ptr<NOlap::TTTLColumnEngineChanges>> indexChanges =
TablesManager.MutablePrimaryIndex().StartTtl(eviction, DataLocksManager, memoryUsageLimit);

if (indexChanges.empty()) {
ACFL_DEBUG("background", "ttl")("skip_reason", "no_changes");
Expand All @@ -933,14 +977,21 @@ bool TColumnShard::SetupTtl(const THashMap<ui64, NOlap::TTiering>& pathTtls) {
for (auto&& i : indexChanges) {
i->Start(*this);
auto request = i->ExtractDataAccessorsRequest();
ui64 memoryUsage = 0;
std::shared_ptr<TDataAccessorsSubscriber> subscriber;
if (i->NeedConstruction()) {
request->RegisterSubscriber(std::make_shared<TWriteEvictPortionsDataAccessorsSubscriber>(ResourceSubscribeActor, i,
actualIndexInfo, Settings.CacheDataAfterCompaction, SelfId(), TabletID(), Counters.GetEvictionCounters(), GetLastCompletedTx(),
TTLTaskSubscription));
subscriber = std::make_shared<TWriteEvictPortionsDataAccessorsSubscriber>(ResourceSubscribeActor, i, actualIndexInfo,
Settings.CacheDataAfterCompaction, SelfId(), TabletID(), Counters.GetEvictionCounters(), GetLastCompletedTx(),
TTLTaskSubscription);
memoryUsage = i->CalcMemoryForUsage();
} else {
request->RegisterSubscriber(std::make_shared<TNoWriteEvictPortionsDataAccessorsSubscriber>(SelfId(), i, actualIndexInfo));
subscriber = std::make_shared<TNoWriteEvictPortionsDataAccessorsSubscriber>(SelfId(), i, actualIndexInfo);
}
TablesManager.GetPrimaryIndex()->FetchDataAccessors(request);
const ui64 accessorsMemory =
request->PredictAccessorsMemory(TablesManager.GetPrimaryIndex()->GetVersionedIndex().GetLastSchema()) + memoryUsage;
NOlap::NResourceBroker::NSubscribe::ITask::StartResourceSubscription(
ResourceSubscribeActor, std::make_shared<TAccessorsMemorySubscriber>(accessorsMemory, i->GetTaskIdentifier(), TTLTaskSubscription,
std::move(request), subscriber, DataAccessorsManager.GetObjectPtrVerified()));
}
return true;
}
Expand All @@ -953,6 +1004,7 @@ class TCleanupPortionsDataAccessorsSubscriber: public TDataAccessorsSubscriber {
virtual void DoOnRequestsFinishedImpl() override {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("background", "cleanup")("changes_info", Changes->DebugString());
auto ev = std::make_unique<TEvPrivate::TEvWriteIndex>(VersionedIndex, Changes, false);
ev->IndexChanges->ResourcesGuard = ExtractResourcesGuard();
ev->SetPutStatus(NKikimrProto::OK); // No new blobs to write
NActors::TActivationContext::Send(ShardActorId, std::move(ev));
}
Expand Down Expand Up @@ -982,8 +1034,12 @@ void TColumnShard::SetupCleanupPortions() {

auto request = changes->ExtractDataAccessorsRequest();
auto actualIndexInfo = std::make_shared<NOlap::TVersionedIndex>(TablesManager.GetPrimaryIndex()->GetVersionedIndex());
request->RegisterSubscriber(std::make_shared<TCleanupPortionsDataAccessorsSubscriber>(SelfId(), changes, actualIndexInfo));
TablesManager.GetPrimaryIndex()->FetchDataAccessors(request);
const ui64 accessorsMemory = request->PredictAccessorsMemory(TablesManager.GetPrimaryIndex()->GetVersionedIndex().GetLastSchema());
const auto subscriber = std::make_shared<TCleanupPortionsDataAccessorsSubscriber>(SelfId(), changes, actualIndexInfo);

NOlap::NResourceBroker::NSubscribe::ITask::StartResourceSubscription(
ResourceSubscribeActor, std::make_shared<TAccessorsMemorySubscriber>(accessorsMemory, changes->GetTaskIdentifier(), TTLTaskSubscription,
std::move(request), subscriber, DataAccessorsManager.GetObjectPtrVerified()));
}

void TColumnShard::SetupCleanupTables() {
Expand Down
10 changes: 10 additions & 0 deletions ydb/core/tx/columnshard/data_accessor/request.h
Original file line number Diff line number Diff line change
Expand Up @@ -208,6 +208,16 @@ class TDataAccessorsRequest: public NColumnShard::TMonitoringObjectsCounter<TDat

TDataAccessorsRequest() = default;

ui64 PredictAccessorsMemory(const ISnapshotSchema::TPtr& schema) const {
ui64 result = 0;
for (auto&& i : PathIdStatus) {
for (auto&& [_, p] : i.second.GetPortions()) {
result += p->PredictAccessorsMemory(schema);
}
}
return result;
}

bool HasSubscriber() const {
return !!Subscriber;
}
Expand Down
4 changes: 4 additions & 0 deletions ydb/core/tx/columnshard/engines/portions/portion_info.h
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,10 @@ class TPortionInfo {
TPortionInfo(TPortionInfo&&) = default;
TPortionInfo& operator=(TPortionInfo&&) = default;

ui32 PredictAccessorsMemory(const ISnapshotSchema::TPtr& schema) const {
return (GetRecordsCount() / 10000 + 1) * sizeof(TColumnRecord) * schema->GetColumnsCount() + schema->GetIndexesCount() * sizeof(TIndexChunk);
}

ui32 PredictMetadataMemorySize(const ui32 columnsCount) const {
return (GetRecordsCount() / 10000 + 1) * sizeof(TColumnRecord) * columnsCount;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -339,4 +339,8 @@ TConclusion<TWritePortionInfoWithBlobsResult> ISnapshotSchema::PrepareForWrite(c
return TWritePortionInfoWithBlobsResult(std::move(constructor));
}

ui32 ISnapshotSchema::GetIndexesCount() const {
return GetIndexInfo().GetIndexes().size();
}

} // namespace NKikimr::NOlap
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@ class ISnapshotSchema {
virtual const TSnapshot& GetSnapshot() const = 0;
virtual ui64 GetVersion() const = 0;
virtual ui32 GetColumnsCount() const = 0;
ui32 GetIndexesCount() const;

std::set<ui32> GetPkColumnsIds() const;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,15 @@ void TTieringActualizer::DoAddPortion(const TPortionInfo& portion, const TAddExt
if (MaxByPortionId.contains(portion.GetPortionId())) {
AddPortionImpl(portion, addContext.GetNow());
} else {
NewPortionIds.emplace(portion.GetPortionId());
auto schema = portion.GetSchema(VersionedIndex);
if (*TieringColumnId == schema->GetIndexInfo().GetPKColumnIds().front()) {
NYDBTest::TControllers::GetColumnShardController()->OnMaxValueUsage();
auto max = NArrow::TStatusValidator::GetValid(portion.GetMeta().GetFirstLastPK().GetFirst().Column(0).GetScalar(0));
AFL_VERIFY(MaxByPortionId.emplace(portion.GetPortionId(), max).second);
AddPortionImpl(portion, addContext.GetNow());
} else {
NewPortionIds.emplace(portion.GetPortionId());
}
}
}

Expand All @@ -102,15 +110,12 @@ void TTieringActualizer::ActualizePortionInfo(const TPortionDataAccessor& access
auto& portion = accessor.GetPortionInfo();
if (Tiering) {
std::shared_ptr<ISnapshotSchema> portionSchema = portion.GetSchema(VersionedIndex);
auto indexMeta = portionSchema->GetIndexInfo().GetIndexMetaMax(*TieringColumnId);
std::shared_ptr<arrow::Scalar> max;
if (indexMeta) {
AFL_VERIFY(*TieringColumnId != portionSchema->GetIndexInfo().GetPKColumnIds().front());
if (auto indexMeta = portionSchema->GetIndexInfo().GetIndexMetaMax(*TieringColumnId)) {
NYDBTest::TControllers::GetColumnShardController()->OnStatisticsUsage(NIndexes::TIndexMetaContainer(indexMeta));
const std::vector<TString> data = accessor.GetIndexInplaceDataVerified(indexMeta->GetIndexId());
max = indexMeta->GetMaxScalarVerified(data, portionSchema->GetIndexInfo().GetColumnFieldVerified(*TieringColumnId)->type());
} else if (*TieringColumnId == portionSchema->GetIndexInfo().GetPKColumnIds().front()) {
NYDBTest::TControllers::GetColumnShardController()->OnMaxValueUsage();
max = NArrow::TStatusValidator::GetValid(portion.GetMeta().GetFirstLastPK().GetFirst().Column(0).GetScalar(0));
}
AFL_VERIFY(MaxByPortionId.emplace(portion.GetPortionId(), max).second);
}
Expand Down
16 changes: 8 additions & 8 deletions ydb/core/tx/columnshard/ut_schema/ut_columnshard_schema.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -339,7 +339,7 @@ void TestTtl(bool reboots, bool internal, TTestSchema::TTableSpecials spec = {},
UNIT_ASSERT(CheckSame(rb, PORTION_ROWS, spec.TtlColumn, ts[0]));
}

if (spec.NeedTestStatistics()) {
if (spec.NeedTestStatistics() && spec.TtlColumn != "timestamp") {
AFL_VERIFY(csControllerGuard->GetStatisticsUsageCount().Val());
AFL_VERIFY(!csControllerGuard->GetMaxValueUsageCount().Val());
} else {
Expand Down Expand Up @@ -706,13 +706,13 @@ std::vector<std::pair<ui32, ui64>> TestTiers(bool reboots, const std::vector<TSt
}
}

if (specs[0].NeedTestStatistics()) {
AFL_VERIFY(csControllerGuard->GetStatisticsUsageCount().Val());
AFL_VERIFY(!csControllerGuard->GetMaxValueUsageCount().Val());
} else {
AFL_VERIFY(!csControllerGuard->GetStatisticsUsageCount().Val());
AFL_VERIFY(csControllerGuard->GetMaxValueUsageCount().Val());
}
// if (specs[0].NeedTestStatistics()) {
// AFL_VERIFY(csControllerGuard->GetStatisticsUsageCount().Val());
// AFL_VERIFY(!csControllerGuard->GetMaxValueUsageCount().Val());
// } else {
// AFL_VERIFY(!csControllerGuard->GetStatisticsUsageCount().Val());
// AFL_VERIFY(csControllerGuard->GetMaxValueUsageCount().Val());
// }

return specRowsBytes;
}
Expand Down

0 comments on commit 1d511c3

Please sign in to comment.