From 7644ceabacc99c54b96976d9af24ff18ffa3a07a Mon Sep 17 00:00:00 2001 From: Semyon Yentsov Date: Tue, 17 Dec 2024 13:57:43 +0300 Subject: [PATCH 1/8] share schemas between CS on same node --- ydb/core/tx/columnshard/columnshard_impl.cpp | 17 +++++---- .../engines/changes/indexation.cpp | 4 +-- .../engines/column_engine_logs.cpp | 10 +++--- .../columnshard/engines/column_engine_logs.h | 8 +++-- .../columnshard/engines/scheme/index_info.cpp | 4 +-- .../engines/scheme/objects_cache.cpp | 12 +++++++ .../engines/scheme/objects_cache.h | 35 +++++++++++++++++-- .../scheme/versions/snapshot_scheme.cpp | 18 +++++----- .../engines/scheme/versions/snapshot_scheme.h | 8 ++--- .../scheme/versions/versioned_index.cpp | 12 +++---- .../engines/scheme/versions/versioned_index.h | 2 +- ydb/core/tx/columnshard/loading/stages.cpp | 3 +- .../columnshard/normalizer/portion/chunks.cpp | 3 +- .../normalizer/portion/leaked_blobs.cpp | 4 +-- .../normalizer/portion/normalizer.cpp | 3 +- .../portion/restore_portion_from_chunks.cpp | 3 +- .../operations/batch_builder/merger.cpp | 16 ++++----- ydb/core/tx/columnshard/tables_manager.cpp | 13 ++++--- ydb/core/tx/columnshard/tables_manager.h | 4 ++- 19 files changed, 117 insertions(+), 62 deletions(-) diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp index 033705facc03..9983915c314f 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.cpp +++ b/ydb/core/tx/columnshard/columnshard_impl.cpp @@ -84,7 +84,8 @@ TColumnShard::TColumnShard(TTabletStorageInfo* info, const TActorId& tablet) , PeriodicWakeupActivationPeriod(NYDBTest::TControllers::GetColumnShardController()->GetPeriodicWakeupActivationPeriod()) , StatsReportInterval(NYDBTest::TControllers::GetColumnShardController()->GetStatsReportInterval()) , InFlightReadsTracker(StoragesManager, Counters.GetRequestsTracingCounters()) - , TablesManager(StoragesManager, std::make_shared(nullptr), info->TabletID) + , TablesManager(StoragesManager, std::make_shared(nullptr), + std::make_shared(), info->TabletID) , Subscribers(std::make_shared(*this)) , PipeClientCache(NTabletPipe::CreateBoundedClientCache(new NTabletPipe::TBoundedClientCacheConfig(), GetPipeClientConfig())) , InsertTable(std::make_unique()) @@ -354,15 +355,13 @@ void TColumnShard::RunInit(const NKikimrTxColumnShard::TInitShard& proto, const NIceDb::TNiceDb db(txc.DB); - if (proto.HasOwnerPathId()) { - OwnerPathId = proto.GetOwnerPathId(); - Schema::SaveSpecialValue(db, Schema::EValueIds::OwnerPathId, OwnerPathId); - } + AFL_VERIFY(proto.HasOwnerPathId()); + OwnerPathId = proto.GetOwnerPathId(); + Schema::SaveSpecialValue(db, Schema::EValueIds::OwnerPathId, OwnerPathId); - if (proto.HasOwnerPath()) { - OwnerPath = proto.GetOwnerPath(); - Schema::SaveSpecialValue(db, Schema::EValueIds::OwnerPath, OwnerPath); - } + AFL_VERIFY(proto.HasOwnerPathId()); + OwnerPath = proto.GetOwnerPath(); + Schema::SaveSpecialValue(db, Schema::EValueIds::OwnerPath, OwnerPath); for (auto& createTable : proto.GetTables()) { RunEnsureTable(createTable, version, txc); diff --git a/ydb/core/tx/columnshard/engines/changes/indexation.cpp b/ydb/core/tx/columnshard/engines/changes/indexation.cpp index fe62107aa05f..3d3d5fb8c6c9 100644 --- a/ydb/core/tx/columnshard/engines/changes/indexation.cpp +++ b/ydb/core/tx/columnshard/engines/changes/indexation.cpp @@ -108,7 +108,7 @@ class TPathFieldsInfo { if (!Schemas.contains(data.GetSchemaVersion())) { Schemas.emplace(data.GetSchemaVersion(), blobSchema); } - auto columnIds = blobSchema->GetIndexInfo().GetColumnIds(false); + TColumnIdsView columnIds = blobSchema->GetIndexInfo().GetColumnIds(false); std::vector filteredIds = data.GetMeta().GetSchemaSubset().Apply(columnIds.begin(), columnIds.end()); if (data.GetMeta().GetModificationType() == NEvWrite::EModificationType::Delete) { filteredIds.emplace_back((ui32)IIndexInfo::ESpecialColumn::DELETE_FLAG); @@ -247,7 +247,7 @@ TConclusionStatus TInsertColumnEngineChanges::DoConstructBlobs(TConstructionCont { const auto blobData = Blobs.Extract(IStoragesManager::DefaultStorageId, blobRange); - auto blobSchemaView = blobSchema->GetIndexInfo().ArrowSchema(); + NArrow::TSchemaLiteView blobSchemaView = blobSchema->GetIndexInfo().ArrowSchema(); auto batchSchema = std::make_shared(inserted.GetMeta().GetSchemaSubset().Apply(blobSchemaView.begin(), blobSchemaView.end())); batch = std::make_shared(NArrow::DeserializeBatch(blobData, batchSchema)); diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp index d25abc4941fd..3e7ea824d26e 100644 --- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp +++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp @@ -26,12 +26,13 @@ namespace NKikimr::NOlap { -TColumnEngineForLogs::TColumnEngineForLogs(const ui64 tabletId, +TColumnEngineForLogs::TColumnEngineForLogs(const ui64 tabletId, const std::shared_ptr& schemaCache, const std::shared_ptr& dataAccessorsManager, const std::shared_ptr& storagesManager, const TSnapshot& snapshot, const TSchemaInitializationData& schema) : GranulesStorage(std::make_shared(SignalCounters, dataAccessorsManager, storagesManager)) , DataAccessorsManager(dataAccessorsManager) , StoragesManager(storagesManager) + , SchemaObjectsCache(schemaCache) , TabletId(tabletId) , LastPortion(0) , LastGranule(0) { @@ -39,12 +40,13 @@ TColumnEngineForLogs::TColumnEngineForLogs(const ui64 tabletId, RegisterSchemaVersion(snapshot, schema); } -TColumnEngineForLogs::TColumnEngineForLogs(const ui64 tabletId, +TColumnEngineForLogs::TColumnEngineForLogs(const ui64 tabletId, const std::shared_ptr& schemaCache, const std::shared_ptr& dataAccessorsManager, const std::shared_ptr& storagesManager, const TSnapshot& snapshot, TIndexInfo&& schema) : GranulesStorage(std::make_shared(SignalCounters, dataAccessorsManager, storagesManager)) , DataAccessorsManager(dataAccessorsManager) , StoragesManager(storagesManager) + , SchemaObjectsCache(schemaCache) , TabletId(tabletId) , LastPortion(0) , LastGranule(0) { @@ -150,7 +152,7 @@ void TColumnEngineForLogs::RegisterSchemaVersion(const TSnapshot& snapshot, TInd } const bool isCriticalScheme = indexInfo.GetSchemeNeedActualization(); - auto* indexInfoActual = VersionedIndex.AddIndex(snapshot, std::move(indexInfo)); + auto* indexInfoActual = VersionedIndex.AddIndex(snapshot, SchemaObjectsCache->GetIndexInfoCache(std::move(indexInfo))); if (isCriticalScheme) { StartActualization({}); for (auto&& i : GranulesStorage->GetTables()) { @@ -215,7 +217,7 @@ void TColumnEngineForLogs::RegisterOldSchemaVersion(const TSnapshot& snapshot, c } AFL_VERIFY(indexInfoOptional); - VersionedIndex.AddIndex(snapshot, std::move(*indexInfoOptional)); + VersionedIndex.AddIndex(snapshot, SchemaObjectsCache->GetIndexInfoCache(std::move(*indexInfoOptional))); } std::shared_ptr TColumnEngineForLogs::BuildLoader(const std::shared_ptr& dsGroupSelector) { diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.h b/ydb/core/tx/columnshard/engines/column_engine_logs.h index 70ec8a852ba7..d260b522bcaa 100644 --- a/ydb/core/tx/columnshard/engines/column_engine_logs.h +++ b/ydb/core/tx/columnshard/engines/column_engine_logs.h @@ -61,7 +61,7 @@ class TColumnEngineForLogs: public IColumnEngine { std::shared_ptr StoragesManager; std::shared_ptr ActualizationController; - std::shared_ptr SchemaObjectsCache = std::make_shared(); + std::shared_ptr SchemaObjectsCache; TVersionedIndex VersionedIndex; std::shared_ptr VersionedIndexCopy; @@ -98,9 +98,11 @@ class TColumnEngineForLogs: public IColumnEngine { ADD, }; - TColumnEngineForLogs(const ui64 tabletId, const std::shared_ptr& dataAccessorsManager, + TColumnEngineForLogs(const ui64 tabletId, const std::shared_ptr& schemaCache, + const std::shared_ptr& dataAccessorsManager, const std::shared_ptr& storagesManager, const TSnapshot& snapshot, const TSchemaInitializationData& schema); - TColumnEngineForLogs(const ui64 tabletId, const std::shared_ptr& dataAccessorsManager, + TColumnEngineForLogs(const ui64 tabletId, const std::shared_ptr& schemaCache, + const std::shared_ptr& dataAccessorsManager, const std::shared_ptr& storagesManager, const TSnapshot& snapshot, TIndexInfo&& schema); void OnTieringModified(const std::optional& ttl, const ui64 pathId) override; diff --git a/ydb/core/tx/columnshard/engines/scheme/index_info.cpp b/ydb/core/tx/columnshard/engines/scheme/index_info.cpp index 9974e027bfa3..29d40e0032ee 100644 --- a/ydb/core/tx/columnshard/engines/scheme/index_info.cpp +++ b/ydb/core/tx/columnshard/engines/scheme/index_info.cpp @@ -76,7 +76,7 @@ std::vector TIndexInfo::GetColumnNames(const std::vector& ids) co } std::vector TIndexInfo::GetColumnSTLNames(const bool withSpecial) const { - const auto ids = GetColumnIds(withSpecial); + const TColumnIdsView ids = GetColumnIds(withSpecial); std::vector out; out.reserve(ids.size()); for (ui32 id : ids) { @@ -457,7 +457,7 @@ std::shared_ptr TIndexInfo::GetIndexMetaC } std::vector TIndexInfo::GetEntityIds() const { - const auto columnIds = GetColumnIds(true); + const TColumnIdsView columnIds = GetColumnIds(true); std::vector result(columnIds.begin(), columnIds.end()); for (auto&& i : Indexes) { result.emplace_back(i.first); diff --git a/ydb/core/tx/columnshard/engines/scheme/objects_cache.cpp b/ydb/core/tx/columnshard/engines/scheme/objects_cache.cpp index 9b3da8861191..63801ec5fec6 100644 --- a/ydb/core/tx/columnshard/engines/scheme/objects_cache.cpp +++ b/ydb/core/tx/columnshard/engines/scheme/objects_cache.cpp @@ -1,5 +1,17 @@ #include "objects_cache.h" +#include + namespace NKikimr::NOlap { +std::shared_ptr TSchemaObjectsCache::GetIndexInfoCache(TIndexInfo&& indexInfo) { + const ui64 schemaVersion = indexInfo.GetVersion(); + std::unique_lock lock(SchemasMutex); + auto* findSchema = SchemasByVersion.FindPtr(schemaVersion); + if (!findSchema || findSchema->expired()) { + SchemasByVersion[schemaVersion] = std::make_shared(std::move(indexInfo)); + } + return findSchema->lock(); +} + } // namespace NKikimr::NOlap diff --git a/ydb/core/tx/columnshard/engines/scheme/objects_cache.h b/ydb/core/tx/columnshard/engines/scheme/objects_cache.h index a203623b6025..f8c968a214ff 100644 --- a/ydb/core/tx/columnshard/engines/scheme/objects_cache.h +++ b/ydb/core/tx/columnshard/engines/scheme/objects_cache.h @@ -10,10 +10,18 @@ namespace NKikimr::NOlap { class TSchemaObjectsCache { private: THashMap> Fields; - THashMap> ColumnFeatures; - THashSet StringsCache; mutable ui64 AcceptionFieldsCount = 0; + mutable TMutex FieldsMutex; + + THashMap> ColumnFeatures; mutable ui64 AcceptionFeaturesCount = 0; + mutable TMutex FeaturesMutex; + + THashMap> SchemasByVersion; + mutable TMutex SchemasMutex; + + THashSet StringsCache; + mutable TMutex StringsMutex; public: const TString& GetStringCache(const TString& original) { @@ -26,13 +34,16 @@ class TSchemaObjectsCache { void RegisterField(const TString& fingerprint, const std::shared_ptr& f) { AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "register_field")("fp", fingerprint)("f", f->ToString()); + std::unique_lock lock(FieldsMutex); AFL_VERIFY(Fields.emplace(fingerprint, f).second); } void RegisterColumnFeatures(const TString& fingerprint, const std::shared_ptr& f) { AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "register_column_features")("fp", fingerprint)("info", f->DebugString()); + std::unique_lock lock(FeaturesMutex); AFL_VERIFY(ColumnFeatures.emplace(fingerprint, f).second); } std::shared_ptr GetField(const TString& fingerprint) const { + std::unique_lock lock(FieldsMutex); auto it = Fields.find(fingerprint); if (it == Fields.end()) { AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "get_field_miss")("fp", fingerprint)("count", Fields.size())( @@ -47,6 +58,7 @@ class TSchemaObjectsCache { } template TConclusion> GetOrCreateColumnFeatures(const TString& fingerprint, const TConstructor& constructor) { + std::unique_lock lock(FeaturesMutex); auto it = ColumnFeatures.find(fingerprint); if (it == ColumnFeatures.end()) { AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "get_column_features_miss")("fp", UrlEscapeRet(fingerprint))( @@ -65,6 +77,25 @@ class TSchemaObjectsCache { } return it->second; } + + std::shared_ptr GetIndexInfoCache(TIndexInfo&& indexInfo); +}; + +class TSchemaCachesManager { +private: + THashMap> CacheByTableOwner; + TMutex Mutex; + +public: + std::shared_ptr GetCache(const ui64 ownerPathId) { + AFL_VERIFY(ownerPathId); + std::unique_lock lock(Mutex); + auto findCache = CacheByTableOwner.FindPtr(ownerPathId); + if (findCache) { + return *findCache; + } + return CacheByTableOwner.emplace(ownerPathId, std::make_shared()).first->second; + } }; } // namespace NKikimr::NOlap diff --git a/ydb/core/tx/columnshard/engines/scheme/versions/snapshot_scheme.cpp b/ydb/core/tx/columnshard/engines/scheme/versions/snapshot_scheme.cpp index 05277b7b8967..6904a117cbdf 100644 --- a/ydb/core/tx/columnshard/engines/scheme/versions/snapshot_scheme.cpp +++ b/ydb/core/tx/columnshard/engines/scheme/versions/snapshot_scheme.cpp @@ -2,31 +2,31 @@ namespace NKikimr::NOlap { -TSnapshotSchema::TSnapshotSchema(TIndexInfo&& indexInfo, const TSnapshot& snapshot) +TSnapshotSchema::TSnapshotSchema(const std::shared_ptr& indexInfo, const TSnapshot& snapshot) : IndexInfo(std::move(indexInfo)) - , Schema(IndexInfo.ArrowSchemaWithSpecials()) + , Schema(IndexInfo->ArrowSchemaWithSpecials()) , Snapshot(snapshot) { } TColumnSaver TSnapshotSchema::GetColumnSaver(const ui32 columnId) const { - return IndexInfo.GetColumnSaver(columnId); + return IndexInfo->GetColumnSaver(columnId); } std::shared_ptr TSnapshotSchema::GetColumnLoaderOptional(const ui32 columnId) const { - return IndexInfo.GetColumnLoaderOptional(columnId); + return IndexInfo->GetColumnLoaderOptional(columnId); } std::optional TSnapshotSchema::GetColumnIdOptional(const std::string& columnName) const { - return IndexInfo.GetColumnIdOptional(columnName); + return IndexInfo->GetColumnIdOptional(columnName); } ui32 TSnapshotSchema::GetColumnIdVerified(const std::string& columnName) const { - return IndexInfo.GetColumnIdVerified(columnName); + return IndexInfo->GetColumnIdVerified(columnName); } int TSnapshotSchema::GetFieldIndex(const ui32 columnId) const { - return IndexInfo.GetColumnIndexOptional(columnId).value_or(-1); + return IndexInfo->GetColumnIndexOptional(columnId).value_or(-1); } const std::shared_ptr& TSnapshotSchema::GetSchema() const { @@ -34,7 +34,7 @@ const std::shared_ptr& TSnapshotSchema::GetSchema() const { } const TIndexInfo& TSnapshotSchema::GetIndexInfo() const { - return IndexInfo; + return *IndexInfo; } const TSnapshot& TSnapshotSchema::GetSnapshot() const { @@ -46,7 +46,7 @@ ui32 TSnapshotSchema::GetColumnsCount() const { } ui64 TSnapshotSchema::GetVersion() const { - return IndexInfo.GetVersion(); + return IndexInfo->GetVersion(); } } diff --git a/ydb/core/tx/columnshard/engines/scheme/versions/snapshot_scheme.h b/ydb/core/tx/columnshard/engines/scheme/versions/snapshot_scheme.h index 5246d3926750..04e175ceb609 100644 --- a/ydb/core/tx/columnshard/engines/scheme/versions/snapshot_scheme.h +++ b/ydb/core/tx/columnshard/engines/scheme/versions/snapshot_scheme.h @@ -8,7 +8,7 @@ namespace NKikimr::NOlap { class TSnapshotSchema: public ISnapshotSchema { private: - TIndexInfo IndexInfo; + std::shared_ptr IndexInfo; std::shared_ptr Schema; TSnapshot Snapshot; protected: @@ -16,15 +16,15 @@ class TSnapshotSchema: public ISnapshotSchema { return TStringBuilder() << "(" "schema=" << Schema->ToString() << ";" << "snapshot=" << Snapshot.DebugString() << ";" << - "index_info=" << IndexInfo.DebugString() << ";" << + "index_info=" << IndexInfo->DebugString() << ";" << ")" ; } public: - TSnapshotSchema(TIndexInfo&& indexInfo, const TSnapshot& snapshot); + TSnapshotSchema(const std::shared_ptr& indexInfo, const TSnapshot& snapshot); virtual TColumnIdsView GetColumnIds() const override { - return IndexInfo.GetColumnIds(); + return IndexInfo->GetColumnIds(); } TColumnSaver GetColumnSaver(const ui32 columnId) const override; diff --git a/ydb/core/tx/columnshard/engines/scheme/versions/versioned_index.cpp b/ydb/core/tx/columnshard/engines/scheme/versions/versioned_index.cpp index d9a858f349c3..43df84d588f2 100644 --- a/ydb/core/tx/columnshard/engines/scheme/versions/versioned_index.cpp +++ b/ydb/core/tx/columnshard/engines/scheme/versions/versioned_index.cpp @@ -6,16 +6,16 @@ namespace NKikimr::NOlap { -const TIndexInfo* TVersionedIndex::AddIndex(const TSnapshot& snapshot, TIndexInfo&& indexInfo) { +const TIndexInfo* TVersionedIndex::AddIndex(const TSnapshot& snapshot, const std::shared_ptr& indexInfo) { if (Snapshots.empty()) { - PrimaryKey = indexInfo.GetPrimaryKey(); + PrimaryKey = indexInfo->GetPrimaryKey(); } else { - Y_ABORT_UNLESS(PrimaryKey->Equals(indexInfo.GetPrimaryKey())); + Y_ABORT_UNLESS(PrimaryKey->Equals(indexInfo->GetPrimaryKey())); } - const bool needActualization = indexInfo.GetSchemeNeedActualization(); - auto newVersion = indexInfo.GetVersion(); - auto itVersion = SnapshotByVersion.emplace(newVersion, std::make_shared(std::move(indexInfo), snapshot)); + const bool needActualization = indexInfo->GetSchemeNeedActualization(); + auto newVersion = indexInfo->GetVersion(); + auto itVersion = SnapshotByVersion.emplace(newVersion, std::make_shared(indexInfo, snapshot)); if (!itVersion.second) { AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("message", "Skip registered version")("version", LastSchemaVersion); } else if (needActualization) { diff --git a/ydb/core/tx/columnshard/engines/scheme/versions/versioned_index.h b/ydb/core/tx/columnshard/engines/scheme/versions/versioned_index.h index e4e22071f1ed..fe49b1715eb6 100644 --- a/ydb/core/tx/columnshard/engines/scheme/versions/versioned_index.h +++ b/ydb/core/tx/columnshard/engines/scheme/versions/versioned_index.h @@ -123,7 +123,7 @@ class TVersionedIndex { return PrimaryKey; } - const TIndexInfo* AddIndex(const TSnapshot& snapshot, TIndexInfo&& indexInfo); + const TIndexInfo* AddIndex(const TSnapshot& snapshot, const std::shared_ptr& indexInfo); bool LoadShardingInfo(IDbWrapper& db); }; diff --git a/ydb/core/tx/columnshard/loading/stages.cpp b/ydb/core/tx/columnshard/loading/stages.cpp index ac4ab092d4db..622d9faa0fc7 100644 --- a/ydb/core/tx/columnshard/loading/stages.cpp +++ b/ydb/core/tx/columnshard/loading/stages.cpp @@ -194,7 +194,8 @@ bool TSpecialValuesInitializer::DoPrecharge(NTabletFlatExecutor::TTransactionCon bool TTablesManagerInitializer::DoExecute(NTabletFlatExecutor::TTransactionContext& txc, const TActorContext& /*ctx*/) { NIceDb::TNiceDb db(txc.DB); - TTablesManager tablesManagerLocal(Self->StoragesManager, Self->DataAccessorsManager.GetObjectPtrVerified(), Self->TabletID()); + TTablesManager tablesManagerLocal(Self->StoragesManager, Self->DataAccessorsManager.GetObjectPtrVerified(), + Singleton()->GetCache(Self->OwnerPathId), Self->TabletID()); { TMemoryProfileGuard g("TTxInit/TTablesManager"); if (!tablesManagerLocal.InitFromDB(db)) { diff --git a/ydb/core/tx/columnshard/normalizer/portion/chunks.cpp b/ydb/core/tx/columnshard/normalizer/portion/chunks.cpp index 21a184f7fd99..e96a18d5e871 100644 --- a/ydb/core/tx/columnshard/normalizer/portion/chunks.cpp +++ b/ydb/core/tx/columnshard/normalizer/portion/chunks.cpp @@ -140,7 +140,8 @@ TConclusion> TChunksNormalizer::DoInit( return tasks; } - TTablesManager tablesManager(controller.GetStoragesManager(), std::make_shared(nullptr), 0); + TTablesManager tablesManager(controller.GetStoragesManager(), std::make_shared(nullptr), + std::make_shared(), 0); if (!tablesManager.InitFromDB(db)) { ACFL_TRACE("normalizer", "TChunksNormalizer")("error", "can't initialize tables manager"); return TConclusionStatus::Fail("Can't load index"); diff --git a/ydb/core/tx/columnshard/normalizer/portion/leaked_blobs.cpp b/ydb/core/tx/columnshard/normalizer/portion/leaked_blobs.cpp index 811cae0957ae..7f4745f7bbc4 100644 --- a/ydb/core/tx/columnshard/normalizer/portion/leaked_blobs.cpp +++ b/ydb/core/tx/columnshard/normalizer/portion/leaked_blobs.cpp @@ -181,8 +181,8 @@ TConclusion> TLeakedBlobsNormalizer::DoInit( return TConclusionStatus::Fail("Not ready"); } - NColumnShard::TTablesManager tablesManager( - controller.GetStoragesManager(), std::make_shared(nullptr), TabletId); + NColumnShard::TTablesManager tablesManager(controller.GetStoragesManager(), std::make_shared(nullptr), + std::make_shared(), TabletId); if (!tablesManager.InitFromDB(db)) { ACFL_TRACE("normalizer", "TPortionsNormalizer")("error", "can't initialize tables manager"); diff --git a/ydb/core/tx/columnshard/normalizer/portion/normalizer.cpp b/ydb/core/tx/columnshard/normalizer/portion/normalizer.cpp index bf4a6f5e89e1..f1ce6882b727 100644 --- a/ydb/core/tx/columnshard/normalizer/portion/normalizer.cpp +++ b/ydb/core/tx/columnshard/normalizer/portion/normalizer.cpp @@ -25,7 +25,8 @@ TConclusion> TPortionsNormalizerBase::DoInit( return TConclusionStatus::Fail("Not ready"); } - NColumnShard::TTablesManager tablesManager(controller.GetStoragesManager(), std::make_shared(nullptr), 0); + NColumnShard::TTablesManager tablesManager(controller.GetStoragesManager(), std::make_shared(nullptr), + std::make_shared(), 0); if (!tablesManager.InitFromDB(db)) { ACFL_TRACE("normalizer", "TPortionsNormalizer")("error", "can't initialize tables manager"); return TConclusionStatus::Fail("Can't load index"); diff --git a/ydb/core/tx/columnshard/normalizer/portion/restore_portion_from_chunks.cpp b/ydb/core/tx/columnshard/normalizer/portion/restore_portion_from_chunks.cpp index a2c382e5dd68..b6681250975c 100644 --- a/ydb/core/tx/columnshard/normalizer/portion/restore_portion_from_chunks.cpp +++ b/ydb/core/tx/columnshard/normalizer/portion/restore_portion_from_chunks.cpp @@ -72,7 +72,8 @@ TConclusion> TNormalizer::DoInit( return TConclusionStatus::Fail("Not ready"); } - TTablesManager tablesManager(controller.GetStoragesManager(), std::make_shared(nullptr), 0); + TTablesManager tablesManager(controller.GetStoragesManager(), std::make_shared(nullptr), + std::make_shared(), 0); if (!tablesManager.InitFromDB(db)) { ACFL_TRACE("normalizer", "TChunksNormalizer")("error", "can't initialize tables manager"); return TConclusionStatus::Fail("Can't load index"); diff --git a/ydb/core/tx/columnshard/operations/batch_builder/merger.cpp b/ydb/core/tx/columnshard/operations/batch_builder/merger.cpp index 829f1cbe36a2..ceb87eb027e5 100644 --- a/ydb/core/tx/columnshard/operations/batch_builder/merger.cpp +++ b/ydb/core/tx/columnshard/operations/batch_builder/merger.cpp @@ -48,14 +48,14 @@ NKikimr::TConclusionStatus TUpdateMerger::OnEqualKeys(const NArrow::NMerger::TSo AFL_VERIFY(Schema->GetIndexInfo().GetColumnIds(false).size() == exists.GetData().GetColumns().size()) ("index", Schema->GetIndexInfo().GetColumnIds(false).size())("exists", exists.GetData().GetColumns().size()); for (i32 columnIdx = 0; columnIdx < Schema->GetIndexInfo().ArrowSchema().num_fields(); ++columnIdx) { - const std::optional& incomingColumnIdx = IncomingColumnRemap[columnIdx]; - if (incomingColumnIdx && HasIncomingDataFlags[*incomingColumnIdx]->GetView(incoming.GetPosition())) { - const ui32 idxChunk = incoming.GetData().GetPositionInChunk(*incomingColumnIdx, incoming.GetPosition()); - rGuard.Add(*incoming.GetData().GetPositionAddress(*incomingColumnIdx).GetArray(), idxChunk); - } else { - const ui32 idxChunk = exists.GetData().GetPositionInChunk(columnIdx, exists.GetPosition()); - rGuard.Add(*exists.GetData().GetPositionAddress(columnIdx).GetArray(), idxChunk); - } + const std::optional& incomingColumnIdx = IncomingColumnRemap[columnIdx]; + if (incomingColumnIdx && HasIncomingDataFlags[*incomingColumnIdx]->GetView(incoming.GetPosition())) { + const ui32 idxChunk = incoming.GetData().GetPositionInChunk(*incomingColumnIdx, incoming.GetPosition()); + rGuard.Add(*incoming.GetData().GetPositionAddress(*incomingColumnIdx).GetArray(), idxChunk); + } else { + const ui32 idxChunk = exists.GetData().GetPositionInChunk(columnIdx, exists.GetPosition()); + rGuard.Add(*exists.GetData().GetPositionAddress(columnIdx).GetArray(), idxChunk); + } } return TConclusionStatus::Success(); } diff --git a/ydb/core/tx/columnshard/tables_manager.cpp b/ydb/core/tx/columnshard/tables_manager.cpp index 410c3ca0b108..e1669dfe2bde 100644 --- a/ydb/core/tx/columnshard/tables_manager.cpp +++ b/ydb/core/tx/columnshard/tables_manager.cpp @@ -178,8 +178,8 @@ bool TTablesManager::InitFromDB(NIceDb::TNiceDb& db) { "version", info.GetSchema().GetVersion()); NOlap::IColumnEngine::TSchemaInitializationData schemaInitializationData(info); if (!PrimaryIndex) { - PrimaryIndex = std::make_unique(TabletId, DataAccessorsManager, StoragesManager, - version, schemaInitializationData); + PrimaryIndex = std::make_unique( + TabletId, SchemaObjectsCache, DataAccessorsManager, StoragesManager, version, schemaInitializationData); } else if (PrimaryIndex->GetVersionedIndex().IsEmpty() || info.GetSchema().GetVersion() > PrimaryIndex->GetVersionedIndex().GetLastSchema()->GetVersion()) { PrimaryIndex->RegisterSchemaVersion(version, schemaInitializationData); @@ -290,8 +290,8 @@ void TTablesManager::AddSchemaVersion(const ui32 presetId, const NOlap::TSnapsho Schema::SaveSchemaPresetVersionInfo(db, presetId, version, versionInfo); if (!PrimaryIndex) { - PrimaryIndex = std::make_unique( - TabletId, DataAccessorsManager, StoragesManager, version, NOlap::IColumnEngine::TSchemaInitializationData(versionInfo)); + PrimaryIndex = std::make_unique(TabletId, SchemaObjectsCache, DataAccessorsManager, StoragesManager, + version, NOlap::IColumnEngine::TSchemaInitializationData(versionInfo)); for (auto&& i : Tables) { PrimaryIndex->RegisterTable(i.first); } @@ -355,11 +355,14 @@ void TTablesManager::AddTableVersion(const ui64 pathId, const NOlap::TSnapshot& } TTablesManager::TTablesManager(const std::shared_ptr& storagesManager, - const std::shared_ptr& dataAccessorsManager, const ui64 tabletId) + const std::shared_ptr& dataAccessorsManager, + const std::shared_ptr& schemaCache, const ui64 tabletId) : StoragesManager(storagesManager) , DataAccessorsManager(dataAccessorsManager) , LoadTimeCounters(std::make_unique()) + , SchemaObjectsCache(schemaCache) , TabletId(tabletId) { + AFL_VERIFY(SchemaObjectsCache); } bool TTablesManager::TryFinalizeDropPathOnExecute(NTable::TDatabase& dbTable, const ui64 pathId) const { diff --git a/ydb/core/tx/columnshard/tables_manager.h b/ydb/core/tx/columnshard/tables_manager.h index f44ca4c872ce..d9894dc06d94 100644 --- a/ydb/core/tx/columnshard/tables_manager.h +++ b/ydb/core/tx/columnshard/tables_manager.h @@ -145,13 +145,15 @@ class TTablesManager { std::shared_ptr StoragesManager; std::shared_ptr DataAccessorsManager; std::unique_ptr LoadTimeCounters; + std::shared_ptr SchemaObjectsCache; ui64 TabletId = 0; public: friend class TTxInit; TTablesManager(const std::shared_ptr& storagesManager, - const std::shared_ptr& dataAccessorsManager, const ui64 tabletId); + const std::shared_ptr& dataAccessorsManager, + const std::shared_ptr& schemaCache, const ui64 tabletId); const std::unique_ptr& GetLoadTimeCounters() const { return LoadTimeCounters; From 3c4da1e152b6f35b53878e95d9fe1653832f5c58 Mon Sep 17 00:00:00 2001 From: Semyon Yentsov Date: Tue, 17 Dec 2024 14:43:43 +0300 Subject: [PATCH 2/8] fix build --- .../tx/columnshard/engines/ut/ut_logs_engine.cpp | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp b/ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp index 6d23f3de94a4..bab1841c61f9 100644 --- a/ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp +++ b/ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp @@ -524,7 +524,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { // load TSnapshot indexSnapshot(1, 1); TColumnEngineForLogs engine( - 0, NDataAccessorControl::TLocalManager::BuildForTests(), CommonStoragesManager, indexSnapshot, TIndexInfo(tableInfo)); + 0, std::make_shared(), NDataAccessorControl::TLocalManager::BuildForTests(), CommonStoragesManager, indexSnapshot, TIndexInfo(tableInfo)); for (auto&& i : paths) { engine.RegisterTable(i); } @@ -609,7 +609,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { TSnapshot indexSnapshot(1, 1); TColumnEngineForLogs engine( - 0, NDataAccessorControl::TLocalManager::BuildForTests(), CommonStoragesManager, indexSnapshot, TIndexInfo(tableInfo)); + 0, std::make_shared(), NDataAccessorControl::TLocalManager::BuildForTests(), CommonStoragesManager, indexSnapshot, TIndexInfo(tableInfo)); engine.RegisterTable(pathId); engine.TestingLoad(db); @@ -710,7 +710,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { TSnapshot indexSnapshot(1, 1); TColumnEngineForLogs engine( - 0, NDataAccessorControl::TLocalManager::BuildForTests(), CommonStoragesManager, indexSnapshot, TIndexInfo(tableInfo)); + 0, std::make_shared(), NDataAccessorControl::TLocalManager::BuildForTests(), CommonStoragesManager, indexSnapshot, TIndexInfo(tableInfo)); engine.RegisterTable(pathId); engine.TestingLoad(db); @@ -736,7 +736,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { { // check it's overloaded after reload TColumnEngineForLogs tmpEngine( - 0, NDataAccessorControl::TLocalManager::BuildForTests(), CommonStoragesManager, TSnapshot::Zero(), TIndexInfo(tableInfo)); + 0, std::make_shared(), NDataAccessorControl::TLocalManager::BuildForTests(), CommonStoragesManager, TSnapshot::Zero(), TIndexInfo(tableInfo)); tmpEngine.RegisterTable(pathId); tmpEngine.TestingLoad(db); } @@ -768,7 +768,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { { // check it's not overloaded after reload TColumnEngineForLogs tmpEngine( - 0, NDataAccessorControl::TLocalManager::BuildForTests(), CommonStoragesManager, TSnapshot::Zero(), TIndexInfo(tableInfo)); + 0, std::make_shared(), NDataAccessorControl::TLocalManager::BuildForTests(), CommonStoragesManager, TSnapshot::Zero(), TIndexInfo(tableInfo)); tmpEngine.RegisterTable(pathId); tmpEngine.TestingLoad(db); } @@ -789,7 +789,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { TSnapshot indexSnapshot(1, 1); { TColumnEngineForLogs engine( - 0, NDataAccessorControl::TLocalManager::BuildForTests(), CommonStoragesManager, indexSnapshot, TIndexInfo(tableInfo)); + 0, std::make_shared(), NDataAccessorControl::TLocalManager::BuildForTests(), CommonStoragesManager, indexSnapshot, TIndexInfo(tableInfo)); engine.RegisterTable(pathId); engine.TestingLoad(db); @@ -868,7 +868,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { { // load TColumnEngineForLogs engine( - 0, NDataAccessorControl::TLocalManager::BuildForTests(), CommonStoragesManager, indexSnapshot, TIndexInfo(tableInfo)); + 0, std::make_shared(), NDataAccessorControl::TLocalManager::BuildForTests(), CommonStoragesManager, indexSnapshot, TIndexInfo(tableInfo)); engine.RegisterTable(pathId); engine.TestingLoad(db); From 05333805b5561c4426e92cc818a1edd3c0fc8d1f Mon Sep 17 00:00:00 2001 From: Semyon Yentsov Date: Tue, 17 Dec 2024 16:58:04 +0300 Subject: [PATCH 3/8] default cache --- ydb/core/tx/columnshard/engines/scheme/objects_cache.h | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/ydb/core/tx/columnshard/engines/scheme/objects_cache.h b/ydb/core/tx/columnshard/engines/scheme/objects_cache.h index f8c968a214ff..42d5368201df 100644 --- a/ydb/core/tx/columnshard/engines/scheme/objects_cache.h +++ b/ydb/core/tx/columnshard/engines/scheme/objects_cache.h @@ -88,7 +88,9 @@ class TSchemaCachesManager { public: std::shared_ptr GetCache(const ui64 ownerPathId) { - AFL_VERIFY(ownerPathId); + if (!ownerPathId) { + return std::make_shared(); + } std::unique_lock lock(Mutex); auto findCache = CacheByTableOwner.FindPtr(ownerPathId); if (findCache) { From fe1cc5a86eec4f72557381546f6a93d0aded0f2e Mon Sep 17 00:00:00 2001 From: Semyon Yentsov Date: Tue, 17 Dec 2024 17:52:59 +0300 Subject: [PATCH 4/8] fix --- ydb/core/tx/columnshard/columnshard_impl.cpp | 14 ++++++++------ .../columnshard/engines/scheme/objects_cache.cpp | 11 ++++++++--- 2 files changed, 16 insertions(+), 9 deletions(-) diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp index 9983915c314f..4e7a8636dd61 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.cpp +++ b/ydb/core/tx/columnshard/columnshard_impl.cpp @@ -355,13 +355,15 @@ void TColumnShard::RunInit(const NKikimrTxColumnShard::TInitShard& proto, const NIceDb::TNiceDb db(txc.DB); - AFL_VERIFY(proto.HasOwnerPathId()); - OwnerPathId = proto.GetOwnerPathId(); - Schema::SaveSpecialValue(db, Schema::EValueIds::OwnerPathId, OwnerPathId); + if (proto.HasOwnerPathId()) { + OwnerPathId = proto.GetOwnerPathId(); + Schema::SaveSpecialValue(db, Schema::EValueIds::OwnerPathId, OwnerPathId); + } - AFL_VERIFY(proto.HasOwnerPathId()); - OwnerPath = proto.GetOwnerPath(); - Schema::SaveSpecialValue(db, Schema::EValueIds::OwnerPath, OwnerPath); + if (proto.HasOwnerPath()) { + OwnerPath = proto.GetOwnerPath(); + Schema::SaveSpecialValue(db, Schema::EValueIds::OwnerPath, OwnerPath); + } for (auto& createTable : proto.GetTables()) { RunEnsureTable(createTable, version, txc); diff --git a/ydb/core/tx/columnshard/engines/scheme/objects_cache.cpp b/ydb/core/tx/columnshard/engines/scheme/objects_cache.cpp index 63801ec5fec6..a53f0a7e404c 100644 --- a/ydb/core/tx/columnshard/engines/scheme/objects_cache.cpp +++ b/ydb/core/tx/columnshard/engines/scheme/objects_cache.cpp @@ -8,10 +8,15 @@ std::shared_ptr TSchemaObjectsCache::GetIndexInfoCache(TIndexI const ui64 schemaVersion = indexInfo.GetVersion(); std::unique_lock lock(SchemasMutex); auto* findSchema = SchemasByVersion.FindPtr(schemaVersion); - if (!findSchema || findSchema->expired()) { - SchemasByVersion[schemaVersion] = std::make_shared(std::move(indexInfo)); + std::shared_ptr cachedSchema; + if (findSchema) { + cachedSchema = findSchema->lock(); } - return findSchema->lock(); + if (!cachedSchema) { + cachedSchema = std::make_shared(std::move(indexInfo)); + SchemasByVersion[schemaVersion] = cachedSchema; + } + return cachedSchema; } } // namespace NKikimr::NOlap From 5a3e9bea5ab2995e81bd341272d3d654e82e1ed2 Mon Sep 17 00:00:00 2001 From: Semyon Yentsov Date: Tue, 17 Dec 2024 21:23:52 +0300 Subject: [PATCH 5/8] fix --- ydb/core/tx/columnshard/engines/scheme/objects_cache.h | 1 + 1 file changed, 1 insertion(+) diff --git a/ydb/core/tx/columnshard/engines/scheme/objects_cache.h b/ydb/core/tx/columnshard/engines/scheme/objects_cache.h index 42d5368201df..6f5441c32f82 100644 --- a/ydb/core/tx/columnshard/engines/scheme/objects_cache.h +++ b/ydb/core/tx/columnshard/engines/scheme/objects_cache.h @@ -25,6 +25,7 @@ class TSchemaObjectsCache { public: const TString& GetStringCache(const TString& original) { + std::unique_lock lock(StringsCache); auto it = StringsCache.find(original); if (it == StringsCache.end()) { it = StringsCache.emplace(original).first; From 91d9b8ec8899124b6720bfd81569b238f5b1dcaf Mon Sep 17 00:00:00 2001 From: Semyon Yentsov Date: Tue, 17 Dec 2024 22:38:20 +0300 Subject: [PATCH 6/8] fix --- ydb/core/tx/columnshard/engines/scheme/objects_cache.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ydb/core/tx/columnshard/engines/scheme/objects_cache.h b/ydb/core/tx/columnshard/engines/scheme/objects_cache.h index 6f5441c32f82..b5e5001346ff 100644 --- a/ydb/core/tx/columnshard/engines/scheme/objects_cache.h +++ b/ydb/core/tx/columnshard/engines/scheme/objects_cache.h @@ -25,7 +25,7 @@ class TSchemaObjectsCache { public: const TString& GetStringCache(const TString& original) { - std::unique_lock lock(StringsCache); + std::unique_lock lock(StringsMutex); auto it = StringsCache.find(original); if (it == StringsCache.end()) { it = StringsCache.emplace(original).first; From cac49bc46f87d15128ff44a8c0b19bb1e81d6e2f Mon Sep 17 00:00:00 2001 From: Semyon Yentsov Date: Wed, 18 Dec 2024 15:41:12 +0300 Subject: [PATCH 7/8] identify schema by presetId+schemaVersion --- .../transactions/tx_data_from_source.cpp | 2 +- .../tx/columnshard/engines/column_engine.h | 6 +-- .../engines/column_engine_logs.cpp | 20 ++++---- .../columnshard/engines/column_engine_logs.h | 11 +++-- .../engines/scheme/objects_cache.cpp | 10 ++-- .../engines/scheme/objects_cache.h | 47 +++++++++++++++---- .../columnshard/engines/ut/ut_logs_engine.cpp | 14 +++--- ydb/core/tx/columnshard/loading/stages.cpp | 2 +- ydb/core/tx/columnshard/tables_manager.cpp | 10 ++-- 9 files changed, 75 insertions(+), 47 deletions(-) diff --git a/ydb/core/tx/columnshard/data_sharing/destination/transactions/tx_data_from_source.cpp b/ydb/core/tx/columnshard/data_sharing/destination/transactions/tx_data_from_source.cpp index d8f33c2906e1..28b13a7eebd0 100644 --- a/ydb/core/tx/columnshard/data_sharing/destination/transactions/tx_data_from_source.cpp +++ b/ydb/core/tx/columnshard/data_sharing/destination/transactions/tx_data_from_source.cpp @@ -15,7 +15,7 @@ bool TTxDataFromSource::DoExecute(NTabletFlatExecutor::TTransactionContext& txc, auto& index = Self->TablesManager.MutablePrimaryIndexAsVerified(); for (auto& info : SchemeHistory) { - index.RegisterOldSchemaVersion(info.GetSnapshot(), info.GetSchema()); + index.RegisterOldSchemaVersion(info.GetSnapshot(), info.GetProto().GetId(), info.GetSchema()); } TDbWrapper dbWrapper(txc.DB, nullptr); diff --git a/ydb/core/tx/columnshard/engines/column_engine.h b/ydb/core/tx/columnshard/engines/column_engine.h index 13d5c954920b..b3beefc25aa9 100644 --- a/ydb/core/tx/columnshard/engines/column_engine.h +++ b/ydb/core/tx/columnshard/engines/column_engine.h @@ -354,9 +354,9 @@ class IColumnEngine { const std::shared_ptr& dataLocksManager, const ui64 memoryUsageLimit) noexcept = 0; virtual bool ApplyChangesOnTxCreate(std::shared_ptr changes, const TSnapshot& snapshot) noexcept = 0; virtual bool ApplyChangesOnExecute(IDbWrapper& db, std::shared_ptr changes, const TSnapshot& snapshot) noexcept = 0; - virtual void RegisterSchemaVersion(const TSnapshot& snapshot, TIndexInfo&& info) = 0; - virtual void RegisterSchemaVersion(const TSnapshot& snapshot, const TSchemaInitializationData& schema) = 0; - virtual void RegisterOldSchemaVersion(const TSnapshot& snapshot, const TSchemaInitializationData& schema) = 0; + virtual void RegisterSchemaVersion(const TSnapshot& snapshot, const ui64 presetId, TIndexInfo&& info) = 0; + virtual void RegisterSchemaVersion(const TSnapshot& snapshot, const ui64 presetId, const TSchemaInitializationData& schema) = 0; + virtual void RegisterOldSchemaVersion(const TSnapshot& snapshot, const ui64 presetId, const TSchemaInitializationData& schema) = 0; virtual const TMap>& GetStats() const = 0; virtual const TColumnEngineStats& GetTotalStats() = 0; diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp index 3e7ea824d26e..aa588965e964 100644 --- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp +++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp @@ -28,7 +28,7 @@ namespace NKikimr::NOlap { TColumnEngineForLogs::TColumnEngineForLogs(const ui64 tabletId, const std::shared_ptr& schemaCache, const std::shared_ptr& dataAccessorsManager, - const std::shared_ptr& storagesManager, const TSnapshot& snapshot, const TSchemaInitializationData& schema) + const std::shared_ptr& storagesManager, const TSnapshot& snapshot, const ui64 presetId, const TSchemaInitializationData& schema) : GranulesStorage(std::make_shared(SignalCounters, dataAccessorsManager, storagesManager)) , DataAccessorsManager(dataAccessorsManager) , StoragesManager(storagesManager) @@ -37,12 +37,12 @@ TColumnEngineForLogs::TColumnEngineForLogs(const ui64 tabletId, const std::share , LastPortion(0) , LastGranule(0) { ActualizationController = std::make_shared(); - RegisterSchemaVersion(snapshot, schema); + RegisterSchemaVersion(snapshot, presetId, schema); } TColumnEngineForLogs::TColumnEngineForLogs(const ui64 tabletId, const std::shared_ptr& schemaCache, const std::shared_ptr& dataAccessorsManager, - const std::shared_ptr& storagesManager, const TSnapshot& snapshot, TIndexInfo&& schema) + const std::shared_ptr& storagesManager, const TSnapshot& snapshot, const ui64 presetId, TIndexInfo&& schema) : GranulesStorage(std::make_shared(SignalCounters, dataAccessorsManager, storagesManager)) , DataAccessorsManager(dataAccessorsManager) , StoragesManager(storagesManager) @@ -51,7 +51,7 @@ TColumnEngineForLogs::TColumnEngineForLogs(const ui64 tabletId, const std::share , LastPortion(0) , LastGranule(0) { ActualizationController = std::make_shared(); - RegisterSchemaVersion(snapshot, std::move(schema)); + RegisterSchemaVersion(snapshot, presetId, std::move(schema)); } const TMap>& TColumnEngineForLogs::GetStats() const { @@ -140,7 +140,7 @@ void TColumnEngineForLogs::UpdatePortionStats( } } -void TColumnEngineForLogs::RegisterSchemaVersion(const TSnapshot& snapshot, TIndexInfo&& indexInfo) { +void TColumnEngineForLogs::RegisterSchemaVersion(const TSnapshot& snapshot, const ui64 presetId, TIndexInfo&& indexInfo) { AFL_VERIFY(DataAccessorsManager); bool switchOptimizer = false; bool switchAccessorsManager = false; @@ -152,7 +152,7 @@ void TColumnEngineForLogs::RegisterSchemaVersion(const TSnapshot& snapshot, TInd } const bool isCriticalScheme = indexInfo.GetSchemeNeedActualization(); - auto* indexInfoActual = VersionedIndex.AddIndex(snapshot, SchemaObjectsCache->GetIndexInfoCache(std::move(indexInfo))); + auto* indexInfoActual = VersionedIndex.AddIndex(snapshot, SchemaObjectsCache->UpsertIndexInfo(presetId, std::move(indexInfo))); if (isCriticalScheme) { StartActualization({}); for (auto&& i : GranulesStorage->GetTables()) { @@ -172,7 +172,7 @@ void TColumnEngineForLogs::RegisterSchemaVersion(const TSnapshot& snapshot, TInd } } -void TColumnEngineForLogs::RegisterSchemaVersion(const TSnapshot& snapshot, const TSchemaInitializationData& schema) { +void TColumnEngineForLogs::RegisterSchemaVersion(const TSnapshot& snapshot, const ui64 presetId, const TSchemaInitializationData& schema) { AFL_VERIFY(VersionedIndex.IsEmpty() || schema.GetVersion() >= VersionedIndex.GetLastSchema()->GetVersion())("empty", VersionedIndex.IsEmpty())("current", schema.GetVersion())( "last", VersionedIndex.GetLastSchema()->GetVersion()); @@ -186,10 +186,10 @@ void TColumnEngineForLogs::RegisterSchemaVersion(const TSnapshot& snapshot, cons indexInfoOptional = NOlap::TIndexInfo::BuildFromProto(schema.GetSchemaVerified(), StoragesManager, SchemaObjectsCache); } AFL_VERIFY(indexInfoOptional); - RegisterSchemaVersion(snapshot, std::move(*indexInfoOptional)); + RegisterSchemaVersion(snapshot, presetId, std::move(*indexInfoOptional)); } -void TColumnEngineForLogs::RegisterOldSchemaVersion(const TSnapshot& snapshot, const TSchemaInitializationData& schema) { +void TColumnEngineForLogs::RegisterOldSchemaVersion(const TSnapshot& snapshot, const ui64 presetId, const TSchemaInitializationData& schema) { AFL_VERIFY(!VersionedIndex.IsEmpty()); ui64 version = schema.GetVersion(); @@ -217,7 +217,7 @@ void TColumnEngineForLogs::RegisterOldSchemaVersion(const TSnapshot& snapshot, c } AFL_VERIFY(indexInfoOptional); - VersionedIndex.AddIndex(snapshot, SchemaObjectsCache->GetIndexInfoCache(std::move(*indexInfoOptional))); + VersionedIndex.AddIndex(snapshot, SchemaObjectsCache->UpsertIndexInfo(presetId, std::move(*indexInfoOptional))); } std::shared_ptr TColumnEngineForLogs::BuildLoader(const std::shared_ptr& dsGroupSelector) { diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.h b/ydb/core/tx/columnshard/engines/column_engine_logs.h index d260b522bcaa..b62ce870dc30 100644 --- a/ydb/core/tx/columnshard/engines/column_engine_logs.h +++ b/ydb/core/tx/columnshard/engines/column_engine_logs.h @@ -100,10 +100,11 @@ class TColumnEngineForLogs: public IColumnEngine { TColumnEngineForLogs(const ui64 tabletId, const std::shared_ptr& schemaCache, const std::shared_ptr& dataAccessorsManager, - const std::shared_ptr& storagesManager, const TSnapshot& snapshot, const TSchemaInitializationData& schema); + const std::shared_ptr& storagesManager, const TSnapshot& snapshot, const ui64 presetId, + const TSchemaInitializationData& schema); TColumnEngineForLogs(const ui64 tabletId, const std::shared_ptr& schemaCache, const std::shared_ptr& dataAccessorsManager, - const std::shared_ptr& storagesManager, const TSnapshot& snapshot, TIndexInfo&& schema); + const std::shared_ptr& storagesManager, const TSnapshot& snapshot, const ui64 presetId, TIndexInfo&& schema); void OnTieringModified(const std::optional& ttl, const ui64 pathId) override; void OnTieringModified(const THashMap& ttl) override; @@ -159,9 +160,9 @@ class TColumnEngineForLogs: public IColumnEngine { virtual bool ApplyChangesOnExecute( IDbWrapper& db, std::shared_ptr indexChanges, const TSnapshot& snapshot) noexcept override; - void RegisterSchemaVersion(const TSnapshot& snapshot, TIndexInfo&& info) override; - void RegisterSchemaVersion(const TSnapshot& snapshot, const TSchemaInitializationData& schema) override; - void RegisterOldSchemaVersion(const TSnapshot& snapshot, const TSchemaInitializationData& schema) override; + void RegisterSchemaVersion(const TSnapshot& snapshot, const ui64 presetId, TIndexInfo&& info) override; + void RegisterSchemaVersion(const TSnapshot& snapshot, const ui64 presetId, const TSchemaInitializationData& schema) override; + void RegisterOldSchemaVersion(const TSnapshot& snapshot, const ui64 presetId, const TSchemaInitializationData& schema) override; std::shared_ptr Select( ui64 pathId, TSnapshot snapshot, const TPKRangesFilter& pkRangesFilter, const bool withUncommitted) const override; diff --git a/ydb/core/tx/columnshard/engines/scheme/objects_cache.cpp b/ydb/core/tx/columnshard/engines/scheme/objects_cache.cpp index a53f0a7e404c..a044fa284bd6 100644 --- a/ydb/core/tx/columnshard/engines/scheme/objects_cache.cpp +++ b/ydb/core/tx/columnshard/engines/scheme/objects_cache.cpp @@ -4,17 +4,17 @@ namespace NKikimr::NOlap { -std::shared_ptr TSchemaObjectsCache::GetIndexInfoCache(TIndexInfo&& indexInfo) { - const ui64 schemaVersion = indexInfo.GetVersion(); - std::unique_lock lock(SchemasMutex); - auto* findSchema = SchemasByVersion.FindPtr(schemaVersion); +std::shared_ptr TSchemaObjectsCache::UpsertIndexInfo(const ui64 presetId, TIndexInfo&& indexInfo) { + const TSchemaVersionId versionId(presetId, indexInfo.GetVersion()); + TGuard lock(SchemasMutex); + auto* findSchema = SchemasByVersion.FindPtr(versionId); std::shared_ptr cachedSchema; if (findSchema) { cachedSchema = findSchema->lock(); } if (!cachedSchema) { cachedSchema = std::make_shared(std::move(indexInfo)); - SchemasByVersion[schemaVersion] = cachedSchema; + SchemasByVersion[versionId] = cachedSchema; } return cachedSchema; } diff --git a/ydb/core/tx/columnshard/engines/scheme/objects_cache.h b/ydb/core/tx/columnshard/engines/scheme/objects_cache.h index b5e5001346ff..06ae8d249fce 100644 --- a/ydb/core/tx/columnshard/engines/scheme/objects_cache.h +++ b/ydb/core/tx/columnshard/engines/scheme/objects_cache.h @@ -8,6 +8,29 @@ namespace NKikimr::NOlap { class TSchemaObjectsCache { +public: + class TSchemaVersionId { + private: + YDB_READONLY_DEF(ui64, PresetId); + YDB_READONLY_DEF(ui64, Version); + + public: + struct THash { + ui64 operator()(const TSchemaVersionId& object) const { + return CombineHashes(object.PresetId, object.Version); + } + }; + + bool operator==(const TSchemaVersionId& other) const { + return std::tie(PresetId, Version) == std::tie(other.PresetId, other.Version); + } + + TSchemaVersionId(const ui64 presetId, const ui64 version) + : PresetId(presetId) + , Version(version) { + } + }; + private: THashMap> Fields; mutable ui64 AcceptionFieldsCount = 0; @@ -17,7 +40,7 @@ class TSchemaObjectsCache { mutable ui64 AcceptionFeaturesCount = 0; mutable TMutex FeaturesMutex; - THashMap> SchemasByVersion; + THashMap, TSchemaVersionId::THash> SchemasByVersion; mutable TMutex SchemasMutex; THashSet StringsCache; @@ -25,7 +48,7 @@ class TSchemaObjectsCache { public: const TString& GetStringCache(const TString& original) { - std::unique_lock lock(StringsMutex); + TGuard lock(StringsMutex); auto it = StringsCache.find(original); if (it == StringsCache.end()) { it = StringsCache.emplace(original).first; @@ -35,16 +58,16 @@ class TSchemaObjectsCache { void RegisterField(const TString& fingerprint, const std::shared_ptr& f) { AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "register_field")("fp", fingerprint)("f", f->ToString()); - std::unique_lock lock(FieldsMutex); + TGuard lock(FieldsMutex); AFL_VERIFY(Fields.emplace(fingerprint, f).second); } void RegisterColumnFeatures(const TString& fingerprint, const std::shared_ptr& f) { AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "register_column_features")("fp", fingerprint)("info", f->DebugString()); - std::unique_lock lock(FeaturesMutex); + TGuard lock(FeaturesMutex); AFL_VERIFY(ColumnFeatures.emplace(fingerprint, f).second); } std::shared_ptr GetField(const TString& fingerprint) const { - std::unique_lock lock(FieldsMutex); + TGuard lock(FieldsMutex); auto it = Fields.find(fingerprint); if (it == Fields.end()) { AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "get_field_miss")("fp", fingerprint)("count", Fields.size())( @@ -59,7 +82,7 @@ class TSchemaObjectsCache { } template TConclusion> GetOrCreateColumnFeatures(const TString& fingerprint, const TConstructor& constructor) { - std::unique_lock lock(FeaturesMutex); + TGuard lock(FeaturesMutex); auto it = ColumnFeatures.find(fingerprint); if (it == ColumnFeatures.end()) { AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "get_column_features_miss")("fp", UrlEscapeRet(fingerprint))( @@ -79,7 +102,7 @@ class TSchemaObjectsCache { return it->second; } - std::shared_ptr GetIndexInfoCache(TIndexInfo&& indexInfo); + std::shared_ptr UpsertIndexInfo(const ui64 presetId, TIndexInfo&& indexInfo); }; class TSchemaCachesManager { @@ -87,18 +110,22 @@ class TSchemaCachesManager { THashMap> CacheByTableOwner; TMutex Mutex; -public: - std::shared_ptr GetCache(const ui64 ownerPathId) { + std::shared_ptr GetCacheImpl(const ui64 ownerPathId) { if (!ownerPathId) { return std::make_shared(); } - std::unique_lock lock(Mutex); + TGuard lock(Mutex); auto findCache = CacheByTableOwner.FindPtr(ownerPathId); if (findCache) { return *findCache; } return CacheByTableOwner.emplace(ownerPathId, std::make_shared()).first->second; } + +public: + static std::shared_ptr GetCache(const ui64 ownerPathId) { + return Singleton()->GetCacheImpl(ownerPathId); + } }; } // namespace NKikimr::NOlap diff --git a/ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp b/ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp index bab1841c61f9..9635c24b3325 100644 --- a/ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp +++ b/ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp @@ -524,7 +524,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { // load TSnapshot indexSnapshot(1, 1); TColumnEngineForLogs engine( - 0, std::make_shared(), NDataAccessorControl::TLocalManager::BuildForTests(), CommonStoragesManager, indexSnapshot, TIndexInfo(tableInfo)); + 0, std::make_shared(), NDataAccessorControl::TLocalManager::BuildForTests(), CommonStoragesManager, indexSnapshot, 0, TIndexInfo(tableInfo)); for (auto&& i : paths) { engine.RegisterTable(i); } @@ -609,7 +609,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { TSnapshot indexSnapshot(1, 1); TColumnEngineForLogs engine( - 0, std::make_shared(), NDataAccessorControl::TLocalManager::BuildForTests(), CommonStoragesManager, indexSnapshot, TIndexInfo(tableInfo)); + 0, std::make_shared(), NDataAccessorControl::TLocalManager::BuildForTests(), CommonStoragesManager, indexSnapshot, 0, TIndexInfo(tableInfo)); engine.RegisterTable(pathId); engine.TestingLoad(db); @@ -710,7 +710,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { TSnapshot indexSnapshot(1, 1); TColumnEngineForLogs engine( - 0, std::make_shared(), NDataAccessorControl::TLocalManager::BuildForTests(), CommonStoragesManager, indexSnapshot, TIndexInfo(tableInfo)); + 0, std::make_shared(), NDataAccessorControl::TLocalManager::BuildForTests(), CommonStoragesManager, indexSnapshot, 0, TIndexInfo(tableInfo)); engine.RegisterTable(pathId); engine.TestingLoad(db); @@ -736,7 +736,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { { // check it's overloaded after reload TColumnEngineForLogs tmpEngine( - 0, std::make_shared(), NDataAccessorControl::TLocalManager::BuildForTests(), CommonStoragesManager, TSnapshot::Zero(), TIndexInfo(tableInfo)); + 0, std::make_shared(), NDataAccessorControl::TLocalManager::BuildForTests(), CommonStoragesManager, TSnapshot::Zero(), 0, TIndexInfo(tableInfo)); tmpEngine.RegisterTable(pathId); tmpEngine.TestingLoad(db); } @@ -768,7 +768,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { { // check it's not overloaded after reload TColumnEngineForLogs tmpEngine( - 0, std::make_shared(), NDataAccessorControl::TLocalManager::BuildForTests(), CommonStoragesManager, TSnapshot::Zero(), TIndexInfo(tableInfo)); + 0, std::make_shared(), NDataAccessorControl::TLocalManager::BuildForTests(), CommonStoragesManager, TSnapshot::Zero(), 0, TIndexInfo(tableInfo)); tmpEngine.RegisterTable(pathId); tmpEngine.TestingLoad(db); } @@ -789,7 +789,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { TSnapshot indexSnapshot(1, 1); { TColumnEngineForLogs engine( - 0, std::make_shared(), NDataAccessorControl::TLocalManager::BuildForTests(), CommonStoragesManager, indexSnapshot, TIndexInfo(tableInfo)); + 0, std::make_shared(), NDataAccessorControl::TLocalManager::BuildForTests(), CommonStoragesManager, indexSnapshot, 0, TIndexInfo(tableInfo)); engine.RegisterTable(pathId); engine.TestingLoad(db); @@ -868,7 +868,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { { // load TColumnEngineForLogs engine( - 0, std::make_shared(), NDataAccessorControl::TLocalManager::BuildForTests(), CommonStoragesManager, indexSnapshot, TIndexInfo(tableInfo)); + 0, std::make_shared(), NDataAccessorControl::TLocalManager::BuildForTests(), CommonStoragesManager, indexSnapshot, 0, TIndexInfo(tableInfo)); engine.RegisterTable(pathId); engine.TestingLoad(db); diff --git a/ydb/core/tx/columnshard/loading/stages.cpp b/ydb/core/tx/columnshard/loading/stages.cpp index 622d9faa0fc7..a0db85a87aaf 100644 --- a/ydb/core/tx/columnshard/loading/stages.cpp +++ b/ydb/core/tx/columnshard/loading/stages.cpp @@ -195,7 +195,7 @@ bool TSpecialValuesInitializer::DoPrecharge(NTabletFlatExecutor::TTransactionCon bool TTablesManagerInitializer::DoExecute(NTabletFlatExecutor::TTransactionContext& txc, const TActorContext& /*ctx*/) { NIceDb::TNiceDb db(txc.DB); TTablesManager tablesManagerLocal(Self->StoragesManager, Self->DataAccessorsManager.GetObjectPtrVerified(), - Singleton()->GetCache(Self->OwnerPathId), Self->TabletID()); + NOlap::TSchemaCachesManager::GetCache(Self->OwnerPathId), Self->TabletID()); { TMemoryProfileGuard g("TTxInit/TTablesManager"); if (!tablesManagerLocal.InitFromDB(db)) { diff --git a/ydb/core/tx/columnshard/tables_manager.cpp b/ydb/core/tx/columnshard/tables_manager.cpp index e1669dfe2bde..f50eeedd1ffe 100644 --- a/ydb/core/tx/columnshard/tables_manager.cpp +++ b/ydb/core/tx/columnshard/tables_manager.cpp @@ -179,12 +179,12 @@ bool TTablesManager::InitFromDB(NIceDb::TNiceDb& db) { NOlap::IColumnEngine::TSchemaInitializationData schemaInitializationData(info); if (!PrimaryIndex) { PrimaryIndex = std::make_unique( - TabletId, SchemaObjectsCache, DataAccessorsManager, StoragesManager, version, schemaInitializationData); + TabletId, SchemaObjectsCache, DataAccessorsManager, StoragesManager, version, preset->Id, schemaInitializationData); } else if (PrimaryIndex->GetVersionedIndex().IsEmpty() || info.GetSchema().GetVersion() > PrimaryIndex->GetVersionedIndex().GetLastSchema()->GetVersion()) { - PrimaryIndex->RegisterSchemaVersion(version, schemaInitializationData); + PrimaryIndex->RegisterSchemaVersion(version, preset->Id, schemaInitializationData); } else { - PrimaryIndex->RegisterOldSchemaVersion(version, schemaInitializationData); + PrimaryIndex->RegisterOldSchemaVersion(version, preset->Id, schemaInitializationData); } if (!rowset.Next()) { @@ -291,7 +291,7 @@ void TTablesManager::AddSchemaVersion(const ui32 presetId, const NOlap::TSnapsho Schema::SaveSchemaPresetVersionInfo(db, presetId, version, versionInfo); if (!PrimaryIndex) { PrimaryIndex = std::make_unique(TabletId, SchemaObjectsCache, DataAccessorsManager, StoragesManager, - version, NOlap::IColumnEngine::TSchemaInitializationData(versionInfo)); + version, presetId, NOlap::IColumnEngine::TSchemaInitializationData(versionInfo)); for (auto&& i : Tables) { PrimaryIndex->RegisterTable(i.first); } @@ -299,7 +299,7 @@ void TTablesManager::AddSchemaVersion(const ui32 presetId, const NOlap::TSnapsho PrimaryIndex->OnTieringModified(Ttl); } } else { - PrimaryIndex->RegisterSchemaVersion(version, NOlap::IColumnEngine::TSchemaInitializationData(versionInfo)); + PrimaryIndex->RegisterSchemaVersion(version, presetId, NOlap::IColumnEngine::TSchemaInitializationData(versionInfo)); } } From 0f8246dfd7cfc85e8801d8e04e89ae922916bbd9 Mon Sep 17 00:00:00 2001 From: Semyon Yentsov Date: Thu, 19 Dec 2024 15:03:15 +0300 Subject: [PATCH 8/8] clean up unused cache entries --- .../scheme/abstract/schema_version.cpp | 3 + .../engines/scheme/abstract/schema_version.h | 38 ++++++++++ .../engines/scheme/abstract/ya.make | 1 + .../engines/scheme/common/cache.cpp | 3 + .../columnshard/engines/scheme/common/cache.h | 70 +++++++++++++++++++ .../columnshard/engines/scheme/common/ya.make | 13 ++++ .../engines/scheme/objects_cache.cpp | 14 +--- .../engines/scheme/objects_cache.h | 32 ++------- .../scheme/versions/snapshot_scheme.cpp | 6 +- .../engines/scheme/versions/snapshot_scheme.h | 6 +- .../scheme/versions/versioned_index.cpp | 4 +- .../engines/scheme/versions/versioned_index.h | 5 +- .../engines/scheme/versions/ya.make | 1 + .../tx/columnshard/engines/scheme/ya.make | 1 + 14 files changed, 151 insertions(+), 46 deletions(-) create mode 100644 ydb/core/tx/columnshard/engines/scheme/abstract/schema_version.cpp create mode 100644 ydb/core/tx/columnshard/engines/scheme/abstract/schema_version.h create mode 100644 ydb/core/tx/columnshard/engines/scheme/common/cache.cpp create mode 100644 ydb/core/tx/columnshard/engines/scheme/common/cache.h create mode 100644 ydb/core/tx/columnshard/engines/scheme/common/ya.make diff --git a/ydb/core/tx/columnshard/engines/scheme/abstract/schema_version.cpp b/ydb/core/tx/columnshard/engines/scheme/abstract/schema_version.cpp new file mode 100644 index 000000000000..8f630cd397d1 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/scheme/abstract/schema_version.cpp @@ -0,0 +1,3 @@ +#include "schema_version.h" + +namespace NKikimr::NOlap {} diff --git a/ydb/core/tx/columnshard/engines/scheme/abstract/schema_version.h b/ydb/core/tx/columnshard/engines/scheme/abstract/schema_version.h new file mode 100644 index 000000000000..0f759d9c47c7 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/scheme/abstract/schema_version.h @@ -0,0 +1,38 @@ +#pragma once + +#include + +#include + +namespace NKikimr::NOlap { + +class TSchemaVersionId { +private: + YDB_READONLY_DEF(ui64, PresetId); + YDB_READONLY_DEF(ui64, Version); + +public: + struct THash { + ui64 operator()(const TSchemaVersionId& object) const { + return CombineHashes(object.PresetId, object.Version); + } + }; + + bool operator==(const TSchemaVersionId& other) const { + return std::tie(PresetId, Version) == std::tie(other.PresetId, other.Version); + } + + TSchemaVersionId(const ui64 presetId, const ui64 version) + : PresetId(presetId) + , Version(version) { + } +}; + +} + +template <> +struct THash { + inline size_t operator()(const NKikimr::NOlap::TSchemaVersionId& key) const { + return CombineHashes(key.GetPresetId(), key.GetVersion()); + } +}; diff --git a/ydb/core/tx/columnshard/engines/scheme/abstract/ya.make b/ydb/core/tx/columnshard/engines/scheme/abstract/ya.make index 709793c4e38d..bf3aac5302b7 100644 --- a/ydb/core/tx/columnshard/engines/scheme/abstract/ya.make +++ b/ydb/core/tx/columnshard/engines/scheme/abstract/ya.make @@ -3,6 +3,7 @@ LIBRARY() SRCS( index_info.cpp column_ids.cpp + schema_version.cpp ) PEERDIR( diff --git a/ydb/core/tx/columnshard/engines/scheme/common/cache.cpp b/ydb/core/tx/columnshard/engines/scheme/common/cache.cpp new file mode 100644 index 000000000000..9be4dd958459 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/scheme/common/cache.cpp @@ -0,0 +1,3 @@ +#include "cache.h" + +namespace NKikimr::NOlap {} diff --git a/ydb/core/tx/columnshard/engines/scheme/common/cache.h b/ydb/core/tx/columnshard/engines/scheme/common/cache.h new file mode 100644 index 000000000000..3c277894641f --- /dev/null +++ b/ydb/core/tx/columnshard/engines/scheme/common/cache.h @@ -0,0 +1,70 @@ +#pragma once + +#include +#include +#include + +#include + +namespace NKikimr::NOlap { + +template +class TObjectCache { +private: + THashMap> Objects; + mutable TMutex Mutex; + +public: + class TEntryGuard { + private: + TKey Key; + std::shared_ptr Object; + TObjectCache& Cache; + + public: + TEntryGuard(TKey key, const std::shared_ptr object, TObjectCache& cache) + : Key(key) + , Object(object) + , Cache(cache) { + } + + const TObject* operator->() const { + return Object.get(); + } + const TObject& operator*() const { + return *Object; + } + + ~TEntryGuard() { + Object.reset(); + Cache.TryFree(Key); + } + }; + +public: + TEntryGuard Upsert(TKey key, TObject&& object) { + TGuard lock(Mutex); + auto* findSchema = Objects.FindPtr(key); + std::shared_ptr cachedObject; + if (findSchema) { + cachedObject = findSchema->lock(); + } + if (!cachedObject) { + cachedObject = std::make_shared(std::move(object)); + Objects[key] = cachedObject; + } + return TEntryGuard(std::move(key), cachedObject, *this); + } + + void TryFree(const TKey& key) { + TGuard lock(Mutex); + auto findObject = Objects.FindPtr(key); + if (findObject) { + if (findObject->expired()) { + Objects.erase(key); + } + } + } +}; + +} // namespace NKikimr::NOlap diff --git a/ydb/core/tx/columnshard/engines/scheme/common/ya.make b/ydb/core/tx/columnshard/engines/scheme/common/ya.make new file mode 100644 index 000000000000..6d84704af41d --- /dev/null +++ b/ydb/core/tx/columnshard/engines/scheme/common/ya.make @@ -0,0 +1,13 @@ +LIBRARY() + +SRCS( + cache.cpp +) + +PEERDIR( + ydb/library/actors/core +) + +YQL_LAST_ABI_VERSION() + +END() diff --git a/ydb/core/tx/columnshard/engines/scheme/objects_cache.cpp b/ydb/core/tx/columnshard/engines/scheme/objects_cache.cpp index a044fa284bd6..8b8d2f44b022 100644 --- a/ydb/core/tx/columnshard/engines/scheme/objects_cache.cpp +++ b/ydb/core/tx/columnshard/engines/scheme/objects_cache.cpp @@ -4,19 +4,9 @@ namespace NKikimr::NOlap { -std::shared_ptr TSchemaObjectsCache::UpsertIndexInfo(const ui64 presetId, TIndexInfo&& indexInfo) { +TSchemaObjectsCache::TSchemasCache::TEntryGuard TSchemaObjectsCache::UpsertIndexInfo(const ui64 presetId, TIndexInfo&& indexInfo) { const TSchemaVersionId versionId(presetId, indexInfo.GetVersion()); - TGuard lock(SchemasMutex); - auto* findSchema = SchemasByVersion.FindPtr(versionId); - std::shared_ptr cachedSchema; - if (findSchema) { - cachedSchema = findSchema->lock(); - } - if (!cachedSchema) { - cachedSchema = std::make_shared(std::move(indexInfo)); - SchemasByVersion[versionId] = cachedSchema; - } - return cachedSchema; + return SchemasByVersion.Upsert(versionId, std::move(indexInfo)); } } // namespace NKikimr::NOlap diff --git a/ydb/core/tx/columnshard/engines/scheme/objects_cache.h b/ydb/core/tx/columnshard/engines/scheme/objects_cache.h index 06ae8d249fce..fabd90894383 100644 --- a/ydb/core/tx/columnshard/engines/scheme/objects_cache.h +++ b/ydb/core/tx/columnshard/engines/scheme/objects_cache.h @@ -1,6 +1,9 @@ #pragma once #include "column_features.h" +#include +#include + #include #include #include @@ -8,29 +11,6 @@ namespace NKikimr::NOlap { class TSchemaObjectsCache { -public: - class TSchemaVersionId { - private: - YDB_READONLY_DEF(ui64, PresetId); - YDB_READONLY_DEF(ui64, Version); - - public: - struct THash { - ui64 operator()(const TSchemaVersionId& object) const { - return CombineHashes(object.PresetId, object.Version); - } - }; - - bool operator==(const TSchemaVersionId& other) const { - return std::tie(PresetId, Version) == std::tie(other.PresetId, other.Version); - } - - TSchemaVersionId(const ui64 presetId, const ui64 version) - : PresetId(presetId) - , Version(version) { - } - }; - private: THashMap> Fields; mutable ui64 AcceptionFieldsCount = 0; @@ -40,8 +20,8 @@ class TSchemaObjectsCache { mutable ui64 AcceptionFeaturesCount = 0; mutable TMutex FeaturesMutex; - THashMap, TSchemaVersionId::THash> SchemasByVersion; - mutable TMutex SchemasMutex; + using TSchemasCache = TObjectCache; + TSchemasCache SchemasByVersion; THashSet StringsCache; mutable TMutex StringsMutex; @@ -102,7 +82,7 @@ class TSchemaObjectsCache { return it->second; } - std::shared_ptr UpsertIndexInfo(const ui64 presetId, TIndexInfo&& indexInfo); + TSchemasCache::TEntryGuard UpsertIndexInfo(const ui64 presetId, TIndexInfo&& indexInfo); }; class TSchemaCachesManager { diff --git a/ydb/core/tx/columnshard/engines/scheme/versions/snapshot_scheme.cpp b/ydb/core/tx/columnshard/engines/scheme/versions/snapshot_scheme.cpp index 6904a117cbdf..451145bee0f4 100644 --- a/ydb/core/tx/columnshard/engines/scheme/versions/snapshot_scheme.cpp +++ b/ydb/core/tx/columnshard/engines/scheme/versions/snapshot_scheme.cpp @@ -1,12 +1,12 @@ #include "snapshot_scheme.h" +#include namespace NKikimr::NOlap { -TSnapshotSchema::TSnapshotSchema(const std::shared_ptr& indexInfo, const TSnapshot& snapshot) +TSnapshotSchema::TSnapshotSchema(TObjectCache::TEntryGuard&& indexInfo, const TSnapshot& snapshot) : IndexInfo(std::move(indexInfo)) , Schema(IndexInfo->ArrowSchemaWithSpecials()) - , Snapshot(snapshot) -{ + , Snapshot(snapshot) { } TColumnSaver TSnapshotSchema::GetColumnSaver(const ui32 columnId) const { diff --git a/ydb/core/tx/columnshard/engines/scheme/versions/snapshot_scheme.h b/ydb/core/tx/columnshard/engines/scheme/versions/snapshot_scheme.h index 04e175ceb609..0cf6aa147d66 100644 --- a/ydb/core/tx/columnshard/engines/scheme/versions/snapshot_scheme.h +++ b/ydb/core/tx/columnshard/engines/scheme/versions/snapshot_scheme.h @@ -4,11 +4,13 @@ #include +#include + namespace NKikimr::NOlap { class TSnapshotSchema: public ISnapshotSchema { private: - std::shared_ptr IndexInfo; + TObjectCache::TEntryGuard IndexInfo; std::shared_ptr Schema; TSnapshot Snapshot; protected: @@ -21,7 +23,7 @@ class TSnapshotSchema: public ISnapshotSchema { ; } public: - TSnapshotSchema(const std::shared_ptr& indexInfo, const TSnapshot& snapshot); + TSnapshotSchema(TObjectCache::TEntryGuard&& indexInfo, const TSnapshot& snapshot); virtual TColumnIdsView GetColumnIds() const override { return IndexInfo->GetColumnIds(); diff --git a/ydb/core/tx/columnshard/engines/scheme/versions/versioned_index.cpp b/ydb/core/tx/columnshard/engines/scheme/versions/versioned_index.cpp index 43df84d588f2..1642449df51f 100644 --- a/ydb/core/tx/columnshard/engines/scheme/versions/versioned_index.cpp +++ b/ydb/core/tx/columnshard/engines/scheme/versions/versioned_index.cpp @@ -6,7 +6,7 @@ namespace NKikimr::NOlap { -const TIndexInfo* TVersionedIndex::AddIndex(const TSnapshot& snapshot, const std::shared_ptr& indexInfo) { +const TIndexInfo* TVersionedIndex::AddIndex(const TSnapshot& snapshot, TObjectCache::TEntryGuard&& indexInfo) { if (Snapshots.empty()) { PrimaryKey = indexInfo->GetPrimaryKey(); } else { @@ -15,7 +15,7 @@ const TIndexInfo* TVersionedIndex::AddIndex(const TSnapshot& snapshot, const std const bool needActualization = indexInfo->GetSchemeNeedActualization(); auto newVersion = indexInfo->GetVersion(); - auto itVersion = SnapshotByVersion.emplace(newVersion, std::make_shared(indexInfo, snapshot)); + auto itVersion = SnapshotByVersion.emplace(newVersion, std::make_shared(std::move(indexInfo), snapshot)); if (!itVersion.second) { AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("message", "Skip registered version")("version", LastSchemaVersion); } else if (needActualization) { diff --git a/ydb/core/tx/columnshard/engines/scheme/versions/versioned_index.h b/ydb/core/tx/columnshard/engines/scheme/versions/versioned_index.h index fe49b1715eb6..81a57cd65eab 100644 --- a/ydb/core/tx/columnshard/engines/scheme/versions/versioned_index.h +++ b/ydb/core/tx/columnshard/engines/scheme/versions/versioned_index.h @@ -1,5 +1,8 @@ #pragma once #include "abstract_scheme.h" + +#include +#include #include namespace NKikimr::NOlap { @@ -123,7 +126,7 @@ class TVersionedIndex { return PrimaryKey; } - const TIndexInfo* AddIndex(const TSnapshot& snapshot, const std::shared_ptr& indexInfo); + const TIndexInfo* AddIndex(const TSnapshot& snapshot, TObjectCache::TEntryGuard&& indexInfo); bool LoadShardingInfo(IDbWrapper& db); }; diff --git a/ydb/core/tx/columnshard/engines/scheme/versions/ya.make b/ydb/core/tx/columnshard/engines/scheme/versions/ya.make index 63dc44a74899..5b9cc7eff7c5 100644 --- a/ydb/core/tx/columnshard/engines/scheme/versions/ya.make +++ b/ydb/core/tx/columnshard/engines/scheme/versions/ya.make @@ -9,6 +9,7 @@ SRCS( PEERDIR( ydb/core/tx/columnshard/engines/scheme/abstract + ydb/core/tx/columnshard/engines/scheme/common ) END() diff --git a/ydb/core/tx/columnshard/engines/scheme/ya.make b/ydb/core/tx/columnshard/engines/scheme/ya.make index a8b2572ac574..295da3556bb9 100644 --- a/ydb/core/tx/columnshard/engines/scheme/ya.make +++ b/ydb/core/tx/columnshard/engines/scheme/ya.make @@ -22,6 +22,7 @@ PEERDIR( ydb/core/tx/columnshard/engines/scheme/versions ydb/core/tx/columnshard/engines/scheme/tiering ydb/core/tx/columnshard/engines/scheme/column + ydb/core/tx/columnshard/engines/scheme/common ydb/core/tx/columnshard/engines/scheme/defaults ydb/core/formats/arrow/accessor ydb/core/tx/columnshard/blobs_action/abstract