diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp index 033705facc03..9983915c314f 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.cpp +++ b/ydb/core/tx/columnshard/columnshard_impl.cpp @@ -84,7 +84,8 @@ TColumnShard::TColumnShard(TTabletStorageInfo* info, const TActorId& tablet) , PeriodicWakeupActivationPeriod(NYDBTest::TControllers::GetColumnShardController()->GetPeriodicWakeupActivationPeriod()) , StatsReportInterval(NYDBTest::TControllers::GetColumnShardController()->GetStatsReportInterval()) , InFlightReadsTracker(StoragesManager, Counters.GetRequestsTracingCounters()) - , TablesManager(StoragesManager, std::make_shared(nullptr), info->TabletID) + , TablesManager(StoragesManager, std::make_shared(nullptr), + std::make_shared(), info->TabletID) , Subscribers(std::make_shared(*this)) , PipeClientCache(NTabletPipe::CreateBoundedClientCache(new NTabletPipe::TBoundedClientCacheConfig(), GetPipeClientConfig())) , InsertTable(std::make_unique()) @@ -354,15 +355,13 @@ void TColumnShard::RunInit(const NKikimrTxColumnShard::TInitShard& proto, const NIceDb::TNiceDb db(txc.DB); - if (proto.HasOwnerPathId()) { - OwnerPathId = proto.GetOwnerPathId(); - Schema::SaveSpecialValue(db, Schema::EValueIds::OwnerPathId, OwnerPathId); - } + AFL_VERIFY(proto.HasOwnerPathId()); + OwnerPathId = proto.GetOwnerPathId(); + Schema::SaveSpecialValue(db, Schema::EValueIds::OwnerPathId, OwnerPathId); - if (proto.HasOwnerPath()) { - OwnerPath = proto.GetOwnerPath(); - Schema::SaveSpecialValue(db, Schema::EValueIds::OwnerPath, OwnerPath); - } + AFL_VERIFY(proto.HasOwnerPathId()); + OwnerPath = proto.GetOwnerPath(); + Schema::SaveSpecialValue(db, Schema::EValueIds::OwnerPath, OwnerPath); for (auto& createTable : proto.GetTables()) { RunEnsureTable(createTable, version, txc); diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp index d25abc4941fd..1c1c61a6b0ff 100644 --- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp +++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp @@ -26,12 +26,13 @@ namespace NKikimr::NOlap { -TColumnEngineForLogs::TColumnEngineForLogs(const ui64 tabletId, +TColumnEngineForLogs::TColumnEngineForLogs(const ui64 tabletId, const std::shared_ptr& schemaCache, const std::shared_ptr& dataAccessorsManager, const std::shared_ptr& storagesManager, const TSnapshot& snapshot, const TSchemaInitializationData& schema) : GranulesStorage(std::make_shared(SignalCounters, dataAccessorsManager, storagesManager)) , DataAccessorsManager(dataAccessorsManager) , StoragesManager(storagesManager) + , SchemaObjectsCache(schemaCache) , TabletId(tabletId) , LastPortion(0) , LastGranule(0) { @@ -39,12 +40,13 @@ TColumnEngineForLogs::TColumnEngineForLogs(const ui64 tabletId, RegisterSchemaVersion(snapshot, schema); } -TColumnEngineForLogs::TColumnEngineForLogs(const ui64 tabletId, +TColumnEngineForLogs::TColumnEngineForLogs(const ui64 tabletId, const std::shared_ptr& schemaCache, const std::shared_ptr& dataAccessorsManager, const std::shared_ptr& storagesManager, const TSnapshot& snapshot, TIndexInfo&& schema) : GranulesStorage(std::make_shared(SignalCounters, dataAccessorsManager, storagesManager)) , DataAccessorsManager(dataAccessorsManager) , StoragesManager(storagesManager) + , SchemaObjectsCache(schemaCache) , TabletId(tabletId) , LastPortion(0) , LastGranule(0) { @@ -139,18 +141,19 @@ void TColumnEngineForLogs::UpdatePortionStats( } void TColumnEngineForLogs::RegisterSchemaVersion(const TSnapshot& snapshot, TIndexInfo&& indexInfo) { + std::shared_ptr cachedIndexInfo = SchemaObjectsCache->GetIndexInfoCache(std::move(indexInfo)); AFL_VERIFY(DataAccessorsManager); bool switchOptimizer = false; bool switchAccessorsManager = false; if (!VersionedIndex.IsEmpty()) { const NOlap::TIndexInfo& lastIndexInfo = VersionedIndex.GetLastSchema()->GetIndexInfo(); - Y_ABORT_UNLESS(lastIndexInfo.CheckCompatible(indexInfo)); - switchOptimizer = !indexInfo.GetCompactionPlannerConstructor()->IsEqualTo(lastIndexInfo.GetCompactionPlannerConstructor()); - switchAccessorsManager = !indexInfo.GetMetadataManagerConstructor()->IsEqualTo(*lastIndexInfo.GetMetadataManagerConstructor()); + Y_ABORT_UNLESS(lastIndexInfo.CheckCompatible(*cachedIndexInfo)); + switchOptimizer = !cachedIndexInfo->GetCompactionPlannerConstructor()->IsEqualTo(lastIndexInfo.GetCompactionPlannerConstructor()); + switchAccessorsManager = !cachedIndexInfo->GetMetadataManagerConstructor()->IsEqualTo(*lastIndexInfo.GetMetadataManagerConstructor()); } - const bool isCriticalScheme = indexInfo.GetSchemeNeedActualization(); - auto* indexInfoActual = VersionedIndex.AddIndex(snapshot, std::move(indexInfo)); + const bool isCriticalScheme = cachedIndexInfo->GetSchemeNeedActualization(); + auto* indexInfoActual = VersionedIndex.AddIndex(snapshot, std::move(cachedIndexInfo)); if (isCriticalScheme) { StartActualization({}); for (auto&& i : GranulesStorage->GetTables()) { diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.h b/ydb/core/tx/columnshard/engines/column_engine_logs.h index 70ec8a852ba7..d260b522bcaa 100644 --- a/ydb/core/tx/columnshard/engines/column_engine_logs.h +++ b/ydb/core/tx/columnshard/engines/column_engine_logs.h @@ -61,7 +61,7 @@ class TColumnEngineForLogs: public IColumnEngine { std::shared_ptr StoragesManager; std::shared_ptr ActualizationController; - std::shared_ptr SchemaObjectsCache = std::make_shared(); + std::shared_ptr SchemaObjectsCache; TVersionedIndex VersionedIndex; std::shared_ptr VersionedIndexCopy; @@ -98,9 +98,11 @@ class TColumnEngineForLogs: public IColumnEngine { ADD, }; - TColumnEngineForLogs(const ui64 tabletId, const std::shared_ptr& dataAccessorsManager, + TColumnEngineForLogs(const ui64 tabletId, const std::shared_ptr& schemaCache, + const std::shared_ptr& dataAccessorsManager, const std::shared_ptr& storagesManager, const TSnapshot& snapshot, const TSchemaInitializationData& schema); - TColumnEngineForLogs(const ui64 tabletId, const std::shared_ptr& dataAccessorsManager, + TColumnEngineForLogs(const ui64 tabletId, const std::shared_ptr& schemaCache, + const std::shared_ptr& dataAccessorsManager, const std::shared_ptr& storagesManager, const TSnapshot& snapshot, TIndexInfo&& schema); void OnTieringModified(const std::optional& ttl, const ui64 pathId) override; diff --git a/ydb/core/tx/columnshard/engines/scheme/objects_cache.cpp b/ydb/core/tx/columnshard/engines/scheme/objects_cache.cpp index 9b3da8861191..63801ec5fec6 100644 --- a/ydb/core/tx/columnshard/engines/scheme/objects_cache.cpp +++ b/ydb/core/tx/columnshard/engines/scheme/objects_cache.cpp @@ -1,5 +1,17 @@ #include "objects_cache.h" +#include + namespace NKikimr::NOlap { +std::shared_ptr TSchemaObjectsCache::GetIndexInfoCache(TIndexInfo&& indexInfo) { + const ui64 schemaVersion = indexInfo.GetVersion(); + std::unique_lock lock(SchemasMutex); + auto* findSchema = SchemasByVersion.FindPtr(schemaVersion); + if (!findSchema || findSchema->expired()) { + SchemasByVersion[schemaVersion] = std::make_shared(std::move(indexInfo)); + } + return findSchema->lock(); +} + } // namespace NKikimr::NOlap diff --git a/ydb/core/tx/columnshard/engines/scheme/objects_cache.h b/ydb/core/tx/columnshard/engines/scheme/objects_cache.h index a203623b6025..f8c968a214ff 100644 --- a/ydb/core/tx/columnshard/engines/scheme/objects_cache.h +++ b/ydb/core/tx/columnshard/engines/scheme/objects_cache.h @@ -10,10 +10,18 @@ namespace NKikimr::NOlap { class TSchemaObjectsCache { private: THashMap> Fields; - THashMap> ColumnFeatures; - THashSet StringsCache; mutable ui64 AcceptionFieldsCount = 0; + mutable TMutex FieldsMutex; + + THashMap> ColumnFeatures; mutable ui64 AcceptionFeaturesCount = 0; + mutable TMutex FeaturesMutex; + + THashMap> SchemasByVersion; + mutable TMutex SchemasMutex; + + THashSet StringsCache; + mutable TMutex StringsMutex; public: const TString& GetStringCache(const TString& original) { @@ -26,13 +34,16 @@ class TSchemaObjectsCache { void RegisterField(const TString& fingerprint, const std::shared_ptr& f) { AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "register_field")("fp", fingerprint)("f", f->ToString()); + std::unique_lock lock(FieldsMutex); AFL_VERIFY(Fields.emplace(fingerprint, f).second); } void RegisterColumnFeatures(const TString& fingerprint, const std::shared_ptr& f) { AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "register_column_features")("fp", fingerprint)("info", f->DebugString()); + std::unique_lock lock(FeaturesMutex); AFL_VERIFY(ColumnFeatures.emplace(fingerprint, f).second); } std::shared_ptr GetField(const TString& fingerprint) const { + std::unique_lock lock(FieldsMutex); auto it = Fields.find(fingerprint); if (it == Fields.end()) { AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "get_field_miss")("fp", fingerprint)("count", Fields.size())( @@ -47,6 +58,7 @@ class TSchemaObjectsCache { } template TConclusion> GetOrCreateColumnFeatures(const TString& fingerprint, const TConstructor& constructor) { + std::unique_lock lock(FeaturesMutex); auto it = ColumnFeatures.find(fingerprint); if (it == ColumnFeatures.end()) { AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "get_column_features_miss")("fp", UrlEscapeRet(fingerprint))( @@ -65,6 +77,25 @@ class TSchemaObjectsCache { } return it->second; } + + std::shared_ptr GetIndexInfoCache(TIndexInfo&& indexInfo); +}; + +class TSchemaCachesManager { +private: + THashMap> CacheByTableOwner; + TMutex Mutex; + +public: + std::shared_ptr GetCache(const ui64 ownerPathId) { + AFL_VERIFY(ownerPathId); + std::unique_lock lock(Mutex); + auto findCache = CacheByTableOwner.FindPtr(ownerPathId); + if (findCache) { + return *findCache; + } + return CacheByTableOwner.emplace(ownerPathId, std::make_shared()).first->second; + } }; } // namespace NKikimr::NOlap diff --git a/ydb/core/tx/columnshard/engines/scheme/versions/snapshot_scheme.cpp b/ydb/core/tx/columnshard/engines/scheme/versions/snapshot_scheme.cpp index 05277b7b8967..6904a117cbdf 100644 --- a/ydb/core/tx/columnshard/engines/scheme/versions/snapshot_scheme.cpp +++ b/ydb/core/tx/columnshard/engines/scheme/versions/snapshot_scheme.cpp @@ -2,31 +2,31 @@ namespace NKikimr::NOlap { -TSnapshotSchema::TSnapshotSchema(TIndexInfo&& indexInfo, const TSnapshot& snapshot) +TSnapshotSchema::TSnapshotSchema(const std::shared_ptr& indexInfo, const TSnapshot& snapshot) : IndexInfo(std::move(indexInfo)) - , Schema(IndexInfo.ArrowSchemaWithSpecials()) + , Schema(IndexInfo->ArrowSchemaWithSpecials()) , Snapshot(snapshot) { } TColumnSaver TSnapshotSchema::GetColumnSaver(const ui32 columnId) const { - return IndexInfo.GetColumnSaver(columnId); + return IndexInfo->GetColumnSaver(columnId); } std::shared_ptr TSnapshotSchema::GetColumnLoaderOptional(const ui32 columnId) const { - return IndexInfo.GetColumnLoaderOptional(columnId); + return IndexInfo->GetColumnLoaderOptional(columnId); } std::optional TSnapshotSchema::GetColumnIdOptional(const std::string& columnName) const { - return IndexInfo.GetColumnIdOptional(columnName); + return IndexInfo->GetColumnIdOptional(columnName); } ui32 TSnapshotSchema::GetColumnIdVerified(const std::string& columnName) const { - return IndexInfo.GetColumnIdVerified(columnName); + return IndexInfo->GetColumnIdVerified(columnName); } int TSnapshotSchema::GetFieldIndex(const ui32 columnId) const { - return IndexInfo.GetColumnIndexOptional(columnId).value_or(-1); + return IndexInfo->GetColumnIndexOptional(columnId).value_or(-1); } const std::shared_ptr& TSnapshotSchema::GetSchema() const { @@ -34,7 +34,7 @@ const std::shared_ptr& TSnapshotSchema::GetSchema() const { } const TIndexInfo& TSnapshotSchema::GetIndexInfo() const { - return IndexInfo; + return *IndexInfo; } const TSnapshot& TSnapshotSchema::GetSnapshot() const { @@ -46,7 +46,7 @@ ui32 TSnapshotSchema::GetColumnsCount() const { } ui64 TSnapshotSchema::GetVersion() const { - return IndexInfo.GetVersion(); + return IndexInfo->GetVersion(); } } diff --git a/ydb/core/tx/columnshard/engines/scheme/versions/snapshot_scheme.h b/ydb/core/tx/columnshard/engines/scheme/versions/snapshot_scheme.h index 5246d3926750..04e175ceb609 100644 --- a/ydb/core/tx/columnshard/engines/scheme/versions/snapshot_scheme.h +++ b/ydb/core/tx/columnshard/engines/scheme/versions/snapshot_scheme.h @@ -8,7 +8,7 @@ namespace NKikimr::NOlap { class TSnapshotSchema: public ISnapshotSchema { private: - TIndexInfo IndexInfo; + std::shared_ptr IndexInfo; std::shared_ptr Schema; TSnapshot Snapshot; protected: @@ -16,15 +16,15 @@ class TSnapshotSchema: public ISnapshotSchema { return TStringBuilder() << "(" "schema=" << Schema->ToString() << ";" << "snapshot=" << Snapshot.DebugString() << ";" << - "index_info=" << IndexInfo.DebugString() << ";" << + "index_info=" << IndexInfo->DebugString() << ";" << ")" ; } public: - TSnapshotSchema(TIndexInfo&& indexInfo, const TSnapshot& snapshot); + TSnapshotSchema(const std::shared_ptr& indexInfo, const TSnapshot& snapshot); virtual TColumnIdsView GetColumnIds() const override { - return IndexInfo.GetColumnIds(); + return IndexInfo->GetColumnIds(); } TColumnSaver GetColumnSaver(const ui32 columnId) const override; diff --git a/ydb/core/tx/columnshard/engines/scheme/versions/versioned_index.cpp b/ydb/core/tx/columnshard/engines/scheme/versions/versioned_index.cpp index d9a858f349c3..43df84d588f2 100644 --- a/ydb/core/tx/columnshard/engines/scheme/versions/versioned_index.cpp +++ b/ydb/core/tx/columnshard/engines/scheme/versions/versioned_index.cpp @@ -6,16 +6,16 @@ namespace NKikimr::NOlap { -const TIndexInfo* TVersionedIndex::AddIndex(const TSnapshot& snapshot, TIndexInfo&& indexInfo) { +const TIndexInfo* TVersionedIndex::AddIndex(const TSnapshot& snapshot, const std::shared_ptr& indexInfo) { if (Snapshots.empty()) { - PrimaryKey = indexInfo.GetPrimaryKey(); + PrimaryKey = indexInfo->GetPrimaryKey(); } else { - Y_ABORT_UNLESS(PrimaryKey->Equals(indexInfo.GetPrimaryKey())); + Y_ABORT_UNLESS(PrimaryKey->Equals(indexInfo->GetPrimaryKey())); } - const bool needActualization = indexInfo.GetSchemeNeedActualization(); - auto newVersion = indexInfo.GetVersion(); - auto itVersion = SnapshotByVersion.emplace(newVersion, std::make_shared(std::move(indexInfo), snapshot)); + const bool needActualization = indexInfo->GetSchemeNeedActualization(); + auto newVersion = indexInfo->GetVersion(); + auto itVersion = SnapshotByVersion.emplace(newVersion, std::make_shared(indexInfo, snapshot)); if (!itVersion.second) { AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("message", "Skip registered version")("version", LastSchemaVersion); } else if (needActualization) { diff --git a/ydb/core/tx/columnshard/engines/scheme/versions/versioned_index.h b/ydb/core/tx/columnshard/engines/scheme/versions/versioned_index.h index e4e22071f1ed..fe49b1715eb6 100644 --- a/ydb/core/tx/columnshard/engines/scheme/versions/versioned_index.h +++ b/ydb/core/tx/columnshard/engines/scheme/versions/versioned_index.h @@ -123,7 +123,7 @@ class TVersionedIndex { return PrimaryKey; } - const TIndexInfo* AddIndex(const TSnapshot& snapshot, TIndexInfo&& indexInfo); + const TIndexInfo* AddIndex(const TSnapshot& snapshot, const std::shared_ptr& indexInfo); bool LoadShardingInfo(IDbWrapper& db); }; diff --git a/ydb/core/tx/columnshard/loading/stages.cpp b/ydb/core/tx/columnshard/loading/stages.cpp index ac4ab092d4db..622d9faa0fc7 100644 --- a/ydb/core/tx/columnshard/loading/stages.cpp +++ b/ydb/core/tx/columnshard/loading/stages.cpp @@ -194,7 +194,8 @@ bool TSpecialValuesInitializer::DoPrecharge(NTabletFlatExecutor::TTransactionCon bool TTablesManagerInitializer::DoExecute(NTabletFlatExecutor::TTransactionContext& txc, const TActorContext& /*ctx*/) { NIceDb::TNiceDb db(txc.DB); - TTablesManager tablesManagerLocal(Self->StoragesManager, Self->DataAccessorsManager.GetObjectPtrVerified(), Self->TabletID()); + TTablesManager tablesManagerLocal(Self->StoragesManager, Self->DataAccessorsManager.GetObjectPtrVerified(), + Singleton()->GetCache(Self->OwnerPathId), Self->TabletID()); { TMemoryProfileGuard g("TTxInit/TTablesManager"); if (!tablesManagerLocal.InitFromDB(db)) { diff --git a/ydb/core/tx/columnshard/normalizer/portion/restore_portion_from_chunks.cpp b/ydb/core/tx/columnshard/normalizer/portion/restore_portion_from_chunks.cpp index a2c382e5dd68..b6681250975c 100644 --- a/ydb/core/tx/columnshard/normalizer/portion/restore_portion_from_chunks.cpp +++ b/ydb/core/tx/columnshard/normalizer/portion/restore_portion_from_chunks.cpp @@ -72,7 +72,8 @@ TConclusion> TNormalizer::DoInit( return TConclusionStatus::Fail("Not ready"); } - TTablesManager tablesManager(controller.GetStoragesManager(), std::make_shared(nullptr), 0); + TTablesManager tablesManager(controller.GetStoragesManager(), std::make_shared(nullptr), + std::make_shared(), 0); if (!tablesManager.InitFromDB(db)) { ACFL_TRACE("normalizer", "TChunksNormalizer")("error", "can't initialize tables manager"); return TConclusionStatus::Fail("Can't load index"); diff --git a/ydb/core/tx/columnshard/tables_manager.cpp b/ydb/core/tx/columnshard/tables_manager.cpp index 410c3ca0b108..7c349b8f1228 100644 --- a/ydb/core/tx/columnshard/tables_manager.cpp +++ b/ydb/core/tx/columnshard/tables_manager.cpp @@ -178,8 +178,8 @@ bool TTablesManager::InitFromDB(NIceDb::TNiceDb& db) { "version", info.GetSchema().GetVersion()); NOlap::IColumnEngine::TSchemaInitializationData schemaInitializationData(info); if (!PrimaryIndex) { - PrimaryIndex = std::make_unique(TabletId, DataAccessorsManager, StoragesManager, - version, schemaInitializationData); + PrimaryIndex = std::make_unique( + TabletId, SchemaObjectsCache, DataAccessorsManager, StoragesManager, version, schemaInitializationData); } else if (PrimaryIndex->GetVersionedIndex().IsEmpty() || info.GetSchema().GetVersion() > PrimaryIndex->GetVersionedIndex().GetLastSchema()->GetVersion()) { PrimaryIndex->RegisterSchemaVersion(version, schemaInitializationData); @@ -290,8 +290,8 @@ void TTablesManager::AddSchemaVersion(const ui32 presetId, const NOlap::TSnapsho Schema::SaveSchemaPresetVersionInfo(db, presetId, version, versionInfo); if (!PrimaryIndex) { - PrimaryIndex = std::make_unique( - TabletId, DataAccessorsManager, StoragesManager, version, NOlap::IColumnEngine::TSchemaInitializationData(versionInfo)); + PrimaryIndex = std::make_unique(TabletId, SchemaObjectsCache, DataAccessorsManager, StoragesManager, + version, NOlap::IColumnEngine::TSchemaInitializationData(versionInfo)); for (auto&& i : Tables) { PrimaryIndex->RegisterTable(i.first); } @@ -355,10 +355,12 @@ void TTablesManager::AddTableVersion(const ui64 pathId, const NOlap::TSnapshot& } TTablesManager::TTablesManager(const std::shared_ptr& storagesManager, - const std::shared_ptr& dataAccessorsManager, const ui64 tabletId) + const std::shared_ptr& dataAccessorsManager, + const std::shared_ptr& schemaCache, const ui64 tabletId) : StoragesManager(storagesManager) , DataAccessorsManager(dataAccessorsManager) , LoadTimeCounters(std::make_unique()) + , SchemaObjectsCache(schemaCache) , TabletId(tabletId) { } diff --git a/ydb/core/tx/columnshard/tables_manager.h b/ydb/core/tx/columnshard/tables_manager.h index f44ca4c872ce..d9894dc06d94 100644 --- a/ydb/core/tx/columnshard/tables_manager.h +++ b/ydb/core/tx/columnshard/tables_manager.h @@ -145,13 +145,15 @@ class TTablesManager { std::shared_ptr StoragesManager; std::shared_ptr DataAccessorsManager; std::unique_ptr LoadTimeCounters; + std::shared_ptr SchemaObjectsCache; ui64 TabletId = 0; public: friend class TTxInit; TTablesManager(const std::shared_ptr& storagesManager, - const std::shared_ptr& dataAccessorsManager, const ui64 tabletId); + const std::shared_ptr& dataAccessorsManager, + const std::shared_ptr& schemaCache, const ui64 tabletId); const std::unique_ptr& GetLoadTimeCounters() const { return LoadTimeCounters;