diff --git a/ydb/core/tx/columnshard/columnshard_impl.cpp b/ydb/core/tx/columnshard/columnshard_impl.cpp index dd74874db49f..65e179c8c391 100644 --- a/ydb/core/tx/columnshard/columnshard_impl.cpp +++ b/ydb/core/tx/columnshard/columnshard_impl.cpp @@ -83,7 +83,8 @@ TColumnShard::TColumnShard(TTabletStorageInfo* info, const TActorId& tablet) , PeriodicWakeupActivationPeriod(NYDBTest::TControllers::GetColumnShardController()->GetPeriodicWakeupActivationPeriod()) , StatsReportInterval(NYDBTest::TControllers::GetColumnShardController()->GetStatsReportInterval()) , InFlightReadsTracker(StoragesManager, Counters.GetRequestsTracingCounters()) - , TablesManager(StoragesManager, std::make_shared(nullptr), info->TabletID) + , TablesManager(StoragesManager, std::make_shared(nullptr), + std::make_shared(), info->TabletID) , Subscribers(std::make_shared(*this)) , PipeClientCache(NTabletPipe::CreateBoundedClientCache(new NTabletPipe::TBoundedClientCacheConfig(), GetPipeClientConfig())) , InsertTable(std::make_unique()) diff --git a/ydb/core/tx/columnshard/data_sharing/destination/transactions/tx_data_from_source.cpp b/ydb/core/tx/columnshard/data_sharing/destination/transactions/tx_data_from_source.cpp index d8f33c2906e1..28b13a7eebd0 100644 --- a/ydb/core/tx/columnshard/data_sharing/destination/transactions/tx_data_from_source.cpp +++ b/ydb/core/tx/columnshard/data_sharing/destination/transactions/tx_data_from_source.cpp @@ -15,7 +15,7 @@ bool TTxDataFromSource::DoExecute(NTabletFlatExecutor::TTransactionContext& txc, auto& index = Self->TablesManager.MutablePrimaryIndexAsVerified(); for (auto& info : SchemeHistory) { - index.RegisterOldSchemaVersion(info.GetSnapshot(), info.GetSchema()); + index.RegisterOldSchemaVersion(info.GetSnapshot(), info.GetProto().GetId(), info.GetSchema()); } TDbWrapper dbWrapper(txc.DB, nullptr); diff --git a/ydb/core/tx/columnshard/engines/changes/indexation.cpp b/ydb/core/tx/columnshard/engines/changes/indexation.cpp index fe62107aa05f..3d3d5fb8c6c9 100644 --- a/ydb/core/tx/columnshard/engines/changes/indexation.cpp +++ b/ydb/core/tx/columnshard/engines/changes/indexation.cpp @@ -108,7 +108,7 @@ class TPathFieldsInfo { if (!Schemas.contains(data.GetSchemaVersion())) { Schemas.emplace(data.GetSchemaVersion(), blobSchema); } - auto columnIds = blobSchema->GetIndexInfo().GetColumnIds(false); + TColumnIdsView columnIds = blobSchema->GetIndexInfo().GetColumnIds(false); std::vector filteredIds = data.GetMeta().GetSchemaSubset().Apply(columnIds.begin(), columnIds.end()); if (data.GetMeta().GetModificationType() == NEvWrite::EModificationType::Delete) { filteredIds.emplace_back((ui32)IIndexInfo::ESpecialColumn::DELETE_FLAG); @@ -247,7 +247,7 @@ TConclusionStatus TInsertColumnEngineChanges::DoConstructBlobs(TConstructionCont { const auto blobData = Blobs.Extract(IStoragesManager::DefaultStorageId, blobRange); - auto blobSchemaView = blobSchema->GetIndexInfo().ArrowSchema(); + NArrow::TSchemaLiteView blobSchemaView = blobSchema->GetIndexInfo().ArrowSchema(); auto batchSchema = std::make_shared(inserted.GetMeta().GetSchemaSubset().Apply(blobSchemaView.begin(), blobSchemaView.end())); batch = std::make_shared(NArrow::DeserializeBatch(blobData, batchSchema)); diff --git a/ydb/core/tx/columnshard/engines/column_engine.h b/ydb/core/tx/columnshard/engines/column_engine.h index 13d5c954920b..b3beefc25aa9 100644 --- a/ydb/core/tx/columnshard/engines/column_engine.h +++ b/ydb/core/tx/columnshard/engines/column_engine.h @@ -354,9 +354,9 @@ class IColumnEngine { const std::shared_ptr& dataLocksManager, const ui64 memoryUsageLimit) noexcept = 0; virtual bool ApplyChangesOnTxCreate(std::shared_ptr changes, const TSnapshot& snapshot) noexcept = 0; virtual bool ApplyChangesOnExecute(IDbWrapper& db, std::shared_ptr changes, const TSnapshot& snapshot) noexcept = 0; - virtual void RegisterSchemaVersion(const TSnapshot& snapshot, TIndexInfo&& info) = 0; - virtual void RegisterSchemaVersion(const TSnapshot& snapshot, const TSchemaInitializationData& schema) = 0; - virtual void RegisterOldSchemaVersion(const TSnapshot& snapshot, const TSchemaInitializationData& schema) = 0; + virtual void RegisterSchemaVersion(const TSnapshot& snapshot, const ui64 presetId, TIndexInfo&& info) = 0; + virtual void RegisterSchemaVersion(const TSnapshot& snapshot, const ui64 presetId, const TSchemaInitializationData& schema) = 0; + virtual void RegisterOldSchemaVersion(const TSnapshot& snapshot, const ui64 presetId, const TSchemaInitializationData& schema) = 0; virtual const TMap>& GetStats() const = 0; virtual const TColumnEngineStats& GetTotalStats() = 0; diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp index d25abc4941fd..aa588965e964 100644 --- a/ydb/core/tx/columnshard/engines/column_engine_logs.cpp +++ b/ydb/core/tx/columnshard/engines/column_engine_logs.cpp @@ -26,30 +26,32 @@ namespace NKikimr::NOlap { -TColumnEngineForLogs::TColumnEngineForLogs(const ui64 tabletId, +TColumnEngineForLogs::TColumnEngineForLogs(const ui64 tabletId, const std::shared_ptr& schemaCache, const std::shared_ptr& dataAccessorsManager, - const std::shared_ptr& storagesManager, const TSnapshot& snapshot, const TSchemaInitializationData& schema) + const std::shared_ptr& storagesManager, const TSnapshot& snapshot, const ui64 presetId, const TSchemaInitializationData& schema) : GranulesStorage(std::make_shared(SignalCounters, dataAccessorsManager, storagesManager)) , DataAccessorsManager(dataAccessorsManager) , StoragesManager(storagesManager) + , SchemaObjectsCache(schemaCache) , TabletId(tabletId) , LastPortion(0) , LastGranule(0) { ActualizationController = std::make_shared(); - RegisterSchemaVersion(snapshot, schema); + RegisterSchemaVersion(snapshot, presetId, schema); } -TColumnEngineForLogs::TColumnEngineForLogs(const ui64 tabletId, +TColumnEngineForLogs::TColumnEngineForLogs(const ui64 tabletId, const std::shared_ptr& schemaCache, const std::shared_ptr& dataAccessorsManager, - const std::shared_ptr& storagesManager, const TSnapshot& snapshot, TIndexInfo&& schema) + const std::shared_ptr& storagesManager, const TSnapshot& snapshot, const ui64 presetId, TIndexInfo&& schema) : GranulesStorage(std::make_shared(SignalCounters, dataAccessorsManager, storagesManager)) , DataAccessorsManager(dataAccessorsManager) , StoragesManager(storagesManager) + , SchemaObjectsCache(schemaCache) , TabletId(tabletId) , LastPortion(0) , LastGranule(0) { ActualizationController = std::make_shared(); - RegisterSchemaVersion(snapshot, std::move(schema)); + RegisterSchemaVersion(snapshot, presetId, std::move(schema)); } const TMap>& TColumnEngineForLogs::GetStats() const { @@ -138,7 +140,7 @@ void TColumnEngineForLogs::UpdatePortionStats( } } -void TColumnEngineForLogs::RegisterSchemaVersion(const TSnapshot& snapshot, TIndexInfo&& indexInfo) { +void TColumnEngineForLogs::RegisterSchemaVersion(const TSnapshot& snapshot, const ui64 presetId, TIndexInfo&& indexInfo) { AFL_VERIFY(DataAccessorsManager); bool switchOptimizer = false; bool switchAccessorsManager = false; @@ -150,7 +152,7 @@ void TColumnEngineForLogs::RegisterSchemaVersion(const TSnapshot& snapshot, TInd } const bool isCriticalScheme = indexInfo.GetSchemeNeedActualization(); - auto* indexInfoActual = VersionedIndex.AddIndex(snapshot, std::move(indexInfo)); + auto* indexInfoActual = VersionedIndex.AddIndex(snapshot, SchemaObjectsCache->UpsertIndexInfo(presetId, std::move(indexInfo))); if (isCriticalScheme) { StartActualization({}); for (auto&& i : GranulesStorage->GetTables()) { @@ -170,7 +172,7 @@ void TColumnEngineForLogs::RegisterSchemaVersion(const TSnapshot& snapshot, TInd } } -void TColumnEngineForLogs::RegisterSchemaVersion(const TSnapshot& snapshot, const TSchemaInitializationData& schema) { +void TColumnEngineForLogs::RegisterSchemaVersion(const TSnapshot& snapshot, const ui64 presetId, const TSchemaInitializationData& schema) { AFL_VERIFY(VersionedIndex.IsEmpty() || schema.GetVersion() >= VersionedIndex.GetLastSchema()->GetVersion())("empty", VersionedIndex.IsEmpty())("current", schema.GetVersion())( "last", VersionedIndex.GetLastSchema()->GetVersion()); @@ -184,10 +186,10 @@ void TColumnEngineForLogs::RegisterSchemaVersion(const TSnapshot& snapshot, cons indexInfoOptional = NOlap::TIndexInfo::BuildFromProto(schema.GetSchemaVerified(), StoragesManager, SchemaObjectsCache); } AFL_VERIFY(indexInfoOptional); - RegisterSchemaVersion(snapshot, std::move(*indexInfoOptional)); + RegisterSchemaVersion(snapshot, presetId, std::move(*indexInfoOptional)); } -void TColumnEngineForLogs::RegisterOldSchemaVersion(const TSnapshot& snapshot, const TSchemaInitializationData& schema) { +void TColumnEngineForLogs::RegisterOldSchemaVersion(const TSnapshot& snapshot, const ui64 presetId, const TSchemaInitializationData& schema) { AFL_VERIFY(!VersionedIndex.IsEmpty()); ui64 version = schema.GetVersion(); @@ -215,7 +217,7 @@ void TColumnEngineForLogs::RegisterOldSchemaVersion(const TSnapshot& snapshot, c } AFL_VERIFY(indexInfoOptional); - VersionedIndex.AddIndex(snapshot, std::move(*indexInfoOptional)); + VersionedIndex.AddIndex(snapshot, SchemaObjectsCache->UpsertIndexInfo(presetId, std::move(*indexInfoOptional))); } std::shared_ptr TColumnEngineForLogs::BuildLoader(const std::shared_ptr& dsGroupSelector) { diff --git a/ydb/core/tx/columnshard/engines/column_engine_logs.h b/ydb/core/tx/columnshard/engines/column_engine_logs.h index 70ec8a852ba7..b62ce870dc30 100644 --- a/ydb/core/tx/columnshard/engines/column_engine_logs.h +++ b/ydb/core/tx/columnshard/engines/column_engine_logs.h @@ -61,7 +61,7 @@ class TColumnEngineForLogs: public IColumnEngine { std::shared_ptr StoragesManager; std::shared_ptr ActualizationController; - std::shared_ptr SchemaObjectsCache = std::make_shared(); + std::shared_ptr SchemaObjectsCache; TVersionedIndex VersionedIndex; std::shared_ptr VersionedIndexCopy; @@ -98,10 +98,13 @@ class TColumnEngineForLogs: public IColumnEngine { ADD, }; - TColumnEngineForLogs(const ui64 tabletId, const std::shared_ptr& dataAccessorsManager, - const std::shared_ptr& storagesManager, const TSnapshot& snapshot, const TSchemaInitializationData& schema); - TColumnEngineForLogs(const ui64 tabletId, const std::shared_ptr& dataAccessorsManager, - const std::shared_ptr& storagesManager, const TSnapshot& snapshot, TIndexInfo&& schema); + TColumnEngineForLogs(const ui64 tabletId, const std::shared_ptr& schemaCache, + const std::shared_ptr& dataAccessorsManager, + const std::shared_ptr& storagesManager, const TSnapshot& snapshot, const ui64 presetId, + const TSchemaInitializationData& schema); + TColumnEngineForLogs(const ui64 tabletId, const std::shared_ptr& schemaCache, + const std::shared_ptr& dataAccessorsManager, + const std::shared_ptr& storagesManager, const TSnapshot& snapshot, const ui64 presetId, TIndexInfo&& schema); void OnTieringModified(const std::optional& ttl, const ui64 pathId) override; void OnTieringModified(const THashMap& ttl) override; @@ -157,9 +160,9 @@ class TColumnEngineForLogs: public IColumnEngine { virtual bool ApplyChangesOnExecute( IDbWrapper& db, std::shared_ptr indexChanges, const TSnapshot& snapshot) noexcept override; - void RegisterSchemaVersion(const TSnapshot& snapshot, TIndexInfo&& info) override; - void RegisterSchemaVersion(const TSnapshot& snapshot, const TSchemaInitializationData& schema) override; - void RegisterOldSchemaVersion(const TSnapshot& snapshot, const TSchemaInitializationData& schema) override; + void RegisterSchemaVersion(const TSnapshot& snapshot, const ui64 presetId, TIndexInfo&& info) override; + void RegisterSchemaVersion(const TSnapshot& snapshot, const ui64 presetId, const TSchemaInitializationData& schema) override; + void RegisterOldSchemaVersion(const TSnapshot& snapshot, const ui64 presetId, const TSchemaInitializationData& schema) override; std::shared_ptr Select( ui64 pathId, TSnapshot snapshot, const TPKRangesFilter& pkRangesFilter, const bool withUncommitted) const override; diff --git a/ydb/core/tx/columnshard/engines/scheme/abstract/schema_version.cpp b/ydb/core/tx/columnshard/engines/scheme/abstract/schema_version.cpp new file mode 100644 index 000000000000..8f630cd397d1 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/scheme/abstract/schema_version.cpp @@ -0,0 +1,3 @@ +#include "schema_version.h" + +namespace NKikimr::NOlap {} diff --git a/ydb/core/tx/columnshard/engines/scheme/abstract/schema_version.h b/ydb/core/tx/columnshard/engines/scheme/abstract/schema_version.h new file mode 100644 index 000000000000..0f759d9c47c7 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/scheme/abstract/schema_version.h @@ -0,0 +1,38 @@ +#pragma once + +#include + +#include + +namespace NKikimr::NOlap { + +class TSchemaVersionId { +private: + YDB_READONLY_DEF(ui64, PresetId); + YDB_READONLY_DEF(ui64, Version); + +public: + struct THash { + ui64 operator()(const TSchemaVersionId& object) const { + return CombineHashes(object.PresetId, object.Version); + } + }; + + bool operator==(const TSchemaVersionId& other) const { + return std::tie(PresetId, Version) == std::tie(other.PresetId, other.Version); + } + + TSchemaVersionId(const ui64 presetId, const ui64 version) + : PresetId(presetId) + , Version(version) { + } +}; + +} + +template <> +struct THash { + inline size_t operator()(const NKikimr::NOlap::TSchemaVersionId& key) const { + return CombineHashes(key.GetPresetId(), key.GetVersion()); + } +}; diff --git a/ydb/core/tx/columnshard/engines/scheme/abstract/ya.make b/ydb/core/tx/columnshard/engines/scheme/abstract/ya.make index 709793c4e38d..bf3aac5302b7 100644 --- a/ydb/core/tx/columnshard/engines/scheme/abstract/ya.make +++ b/ydb/core/tx/columnshard/engines/scheme/abstract/ya.make @@ -3,6 +3,7 @@ LIBRARY() SRCS( index_info.cpp column_ids.cpp + schema_version.cpp ) PEERDIR( diff --git a/ydb/core/tx/columnshard/engines/scheme/common/cache.cpp b/ydb/core/tx/columnshard/engines/scheme/common/cache.cpp new file mode 100644 index 000000000000..9be4dd958459 --- /dev/null +++ b/ydb/core/tx/columnshard/engines/scheme/common/cache.cpp @@ -0,0 +1,3 @@ +#include "cache.h" + +namespace NKikimr::NOlap {} diff --git a/ydb/core/tx/columnshard/engines/scheme/common/cache.h b/ydb/core/tx/columnshard/engines/scheme/common/cache.h new file mode 100644 index 000000000000..3c277894641f --- /dev/null +++ b/ydb/core/tx/columnshard/engines/scheme/common/cache.h @@ -0,0 +1,70 @@ +#pragma once + +#include +#include +#include + +#include + +namespace NKikimr::NOlap { + +template +class TObjectCache { +private: + THashMap> Objects; + mutable TMutex Mutex; + +public: + class TEntryGuard { + private: + TKey Key; + std::shared_ptr Object; + TObjectCache& Cache; + + public: + TEntryGuard(TKey key, const std::shared_ptr object, TObjectCache& cache) + : Key(key) + , Object(object) + , Cache(cache) { + } + + const TObject* operator->() const { + return Object.get(); + } + const TObject& operator*() const { + return *Object; + } + + ~TEntryGuard() { + Object.reset(); + Cache.TryFree(Key); + } + }; + +public: + TEntryGuard Upsert(TKey key, TObject&& object) { + TGuard lock(Mutex); + auto* findSchema = Objects.FindPtr(key); + std::shared_ptr cachedObject; + if (findSchema) { + cachedObject = findSchema->lock(); + } + if (!cachedObject) { + cachedObject = std::make_shared(std::move(object)); + Objects[key] = cachedObject; + } + return TEntryGuard(std::move(key), cachedObject, *this); + } + + void TryFree(const TKey& key) { + TGuard lock(Mutex); + auto findObject = Objects.FindPtr(key); + if (findObject) { + if (findObject->expired()) { + Objects.erase(key); + } + } + } +}; + +} // namespace NKikimr::NOlap diff --git a/ydb/core/tx/columnshard/engines/scheme/common/ya.make b/ydb/core/tx/columnshard/engines/scheme/common/ya.make new file mode 100644 index 000000000000..6d84704af41d --- /dev/null +++ b/ydb/core/tx/columnshard/engines/scheme/common/ya.make @@ -0,0 +1,13 @@ +LIBRARY() + +SRCS( + cache.cpp +) + +PEERDIR( + ydb/library/actors/core +) + +YQL_LAST_ABI_VERSION() + +END() diff --git a/ydb/core/tx/columnshard/engines/scheme/index_info.cpp b/ydb/core/tx/columnshard/engines/scheme/index_info.cpp index 9974e027bfa3..29d40e0032ee 100644 --- a/ydb/core/tx/columnshard/engines/scheme/index_info.cpp +++ b/ydb/core/tx/columnshard/engines/scheme/index_info.cpp @@ -76,7 +76,7 @@ std::vector TIndexInfo::GetColumnNames(const std::vector& ids) co } std::vector TIndexInfo::GetColumnSTLNames(const bool withSpecial) const { - const auto ids = GetColumnIds(withSpecial); + const TColumnIdsView ids = GetColumnIds(withSpecial); std::vector out; out.reserve(ids.size()); for (ui32 id : ids) { @@ -457,7 +457,7 @@ std::shared_ptr TIndexInfo::GetIndexMetaC } std::vector TIndexInfo::GetEntityIds() const { - const auto columnIds = GetColumnIds(true); + const TColumnIdsView columnIds = GetColumnIds(true); std::vector result(columnIds.begin(), columnIds.end()); for (auto&& i : Indexes) { result.emplace_back(i.first); diff --git a/ydb/core/tx/columnshard/engines/scheme/objects_cache.cpp b/ydb/core/tx/columnshard/engines/scheme/objects_cache.cpp index 9b3da8861191..8b8d2f44b022 100644 --- a/ydb/core/tx/columnshard/engines/scheme/objects_cache.cpp +++ b/ydb/core/tx/columnshard/engines/scheme/objects_cache.cpp @@ -1,5 +1,12 @@ #include "objects_cache.h" +#include + namespace NKikimr::NOlap { +TSchemaObjectsCache::TSchemasCache::TEntryGuard TSchemaObjectsCache::UpsertIndexInfo(const ui64 presetId, TIndexInfo&& indexInfo) { + const TSchemaVersionId versionId(presetId, indexInfo.GetVersion()); + return SchemasByVersion.Upsert(versionId, std::move(indexInfo)); +} + } // namespace NKikimr::NOlap diff --git a/ydb/core/tx/columnshard/engines/scheme/objects_cache.h b/ydb/core/tx/columnshard/engines/scheme/objects_cache.h index a203623b6025..fabd90894383 100644 --- a/ydb/core/tx/columnshard/engines/scheme/objects_cache.h +++ b/ydb/core/tx/columnshard/engines/scheme/objects_cache.h @@ -1,6 +1,9 @@ #pragma once #include "column_features.h" +#include +#include + #include #include #include @@ -10,13 +13,22 @@ namespace NKikimr::NOlap { class TSchemaObjectsCache { private: THashMap> Fields; - THashMap> ColumnFeatures; - THashSet StringsCache; mutable ui64 AcceptionFieldsCount = 0; + mutable TMutex FieldsMutex; + + THashMap> ColumnFeatures; mutable ui64 AcceptionFeaturesCount = 0; + mutable TMutex FeaturesMutex; + + using TSchemasCache = TObjectCache; + TSchemasCache SchemasByVersion; + + THashSet StringsCache; + mutable TMutex StringsMutex; public: const TString& GetStringCache(const TString& original) { + TGuard lock(StringsMutex); auto it = StringsCache.find(original); if (it == StringsCache.end()) { it = StringsCache.emplace(original).first; @@ -26,13 +38,16 @@ class TSchemaObjectsCache { void RegisterField(const TString& fingerprint, const std::shared_ptr& f) { AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "register_field")("fp", fingerprint)("f", f->ToString()); + TGuard lock(FieldsMutex); AFL_VERIFY(Fields.emplace(fingerprint, f).second); } void RegisterColumnFeatures(const TString& fingerprint, const std::shared_ptr& f) { AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "register_column_features")("fp", fingerprint)("info", f->DebugString()); + TGuard lock(FeaturesMutex); AFL_VERIFY(ColumnFeatures.emplace(fingerprint, f).second); } std::shared_ptr GetField(const TString& fingerprint) const { + TGuard lock(FieldsMutex); auto it = Fields.find(fingerprint); if (it == Fields.end()) { AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "get_field_miss")("fp", fingerprint)("count", Fields.size())( @@ -47,6 +62,7 @@ class TSchemaObjectsCache { } template TConclusion> GetOrCreateColumnFeatures(const TString& fingerprint, const TConstructor& constructor) { + TGuard lock(FeaturesMutex); auto it = ColumnFeatures.find(fingerprint); if (it == ColumnFeatures.end()) { AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "get_column_features_miss")("fp", UrlEscapeRet(fingerprint))( @@ -65,6 +81,31 @@ class TSchemaObjectsCache { } return it->second; } + + TSchemasCache::TEntryGuard UpsertIndexInfo(const ui64 presetId, TIndexInfo&& indexInfo); +}; + +class TSchemaCachesManager { +private: + THashMap> CacheByTableOwner; + TMutex Mutex; + + std::shared_ptr GetCacheImpl(const ui64 ownerPathId) { + if (!ownerPathId) { + return std::make_shared(); + } + TGuard lock(Mutex); + auto findCache = CacheByTableOwner.FindPtr(ownerPathId); + if (findCache) { + return *findCache; + } + return CacheByTableOwner.emplace(ownerPathId, std::make_shared()).first->second; + } + +public: + static std::shared_ptr GetCache(const ui64 ownerPathId) { + return Singleton()->GetCacheImpl(ownerPathId); + } }; } // namespace NKikimr::NOlap diff --git a/ydb/core/tx/columnshard/engines/scheme/versions/snapshot_scheme.cpp b/ydb/core/tx/columnshard/engines/scheme/versions/snapshot_scheme.cpp index 05277b7b8967..451145bee0f4 100644 --- a/ydb/core/tx/columnshard/engines/scheme/versions/snapshot_scheme.cpp +++ b/ydb/core/tx/columnshard/engines/scheme/versions/snapshot_scheme.cpp @@ -1,32 +1,32 @@ #include "snapshot_scheme.h" +#include namespace NKikimr::NOlap { -TSnapshotSchema::TSnapshotSchema(TIndexInfo&& indexInfo, const TSnapshot& snapshot) +TSnapshotSchema::TSnapshotSchema(TObjectCache::TEntryGuard&& indexInfo, const TSnapshot& snapshot) : IndexInfo(std::move(indexInfo)) - , Schema(IndexInfo.ArrowSchemaWithSpecials()) - , Snapshot(snapshot) -{ + , Schema(IndexInfo->ArrowSchemaWithSpecials()) + , Snapshot(snapshot) { } TColumnSaver TSnapshotSchema::GetColumnSaver(const ui32 columnId) const { - return IndexInfo.GetColumnSaver(columnId); + return IndexInfo->GetColumnSaver(columnId); } std::shared_ptr TSnapshotSchema::GetColumnLoaderOptional(const ui32 columnId) const { - return IndexInfo.GetColumnLoaderOptional(columnId); + return IndexInfo->GetColumnLoaderOptional(columnId); } std::optional TSnapshotSchema::GetColumnIdOptional(const std::string& columnName) const { - return IndexInfo.GetColumnIdOptional(columnName); + return IndexInfo->GetColumnIdOptional(columnName); } ui32 TSnapshotSchema::GetColumnIdVerified(const std::string& columnName) const { - return IndexInfo.GetColumnIdVerified(columnName); + return IndexInfo->GetColumnIdVerified(columnName); } int TSnapshotSchema::GetFieldIndex(const ui32 columnId) const { - return IndexInfo.GetColumnIndexOptional(columnId).value_or(-1); + return IndexInfo->GetColumnIndexOptional(columnId).value_or(-1); } const std::shared_ptr& TSnapshotSchema::GetSchema() const { @@ -34,7 +34,7 @@ const std::shared_ptr& TSnapshotSchema::GetSchema() const { } const TIndexInfo& TSnapshotSchema::GetIndexInfo() const { - return IndexInfo; + return *IndexInfo; } const TSnapshot& TSnapshotSchema::GetSnapshot() const { @@ -46,7 +46,7 @@ ui32 TSnapshotSchema::GetColumnsCount() const { } ui64 TSnapshotSchema::GetVersion() const { - return IndexInfo.GetVersion(); + return IndexInfo->GetVersion(); } } diff --git a/ydb/core/tx/columnshard/engines/scheme/versions/snapshot_scheme.h b/ydb/core/tx/columnshard/engines/scheme/versions/snapshot_scheme.h index 5246d3926750..0cf6aa147d66 100644 --- a/ydb/core/tx/columnshard/engines/scheme/versions/snapshot_scheme.h +++ b/ydb/core/tx/columnshard/engines/scheme/versions/snapshot_scheme.h @@ -4,11 +4,13 @@ #include +#include + namespace NKikimr::NOlap { class TSnapshotSchema: public ISnapshotSchema { private: - TIndexInfo IndexInfo; + TObjectCache::TEntryGuard IndexInfo; std::shared_ptr Schema; TSnapshot Snapshot; protected: @@ -16,15 +18,15 @@ class TSnapshotSchema: public ISnapshotSchema { return TStringBuilder() << "(" "schema=" << Schema->ToString() << ";" << "snapshot=" << Snapshot.DebugString() << ";" << - "index_info=" << IndexInfo.DebugString() << ";" << + "index_info=" << IndexInfo->DebugString() << ";" << ")" ; } public: - TSnapshotSchema(TIndexInfo&& indexInfo, const TSnapshot& snapshot); + TSnapshotSchema(TObjectCache::TEntryGuard&& indexInfo, const TSnapshot& snapshot); virtual TColumnIdsView GetColumnIds() const override { - return IndexInfo.GetColumnIds(); + return IndexInfo->GetColumnIds(); } TColumnSaver GetColumnSaver(const ui32 columnId) const override; diff --git a/ydb/core/tx/columnshard/engines/scheme/versions/versioned_index.cpp b/ydb/core/tx/columnshard/engines/scheme/versions/versioned_index.cpp index d9a858f349c3..1642449df51f 100644 --- a/ydb/core/tx/columnshard/engines/scheme/versions/versioned_index.cpp +++ b/ydb/core/tx/columnshard/engines/scheme/versions/versioned_index.cpp @@ -6,15 +6,15 @@ namespace NKikimr::NOlap { -const TIndexInfo* TVersionedIndex::AddIndex(const TSnapshot& snapshot, TIndexInfo&& indexInfo) { +const TIndexInfo* TVersionedIndex::AddIndex(const TSnapshot& snapshot, TObjectCache::TEntryGuard&& indexInfo) { if (Snapshots.empty()) { - PrimaryKey = indexInfo.GetPrimaryKey(); + PrimaryKey = indexInfo->GetPrimaryKey(); } else { - Y_ABORT_UNLESS(PrimaryKey->Equals(indexInfo.GetPrimaryKey())); + Y_ABORT_UNLESS(PrimaryKey->Equals(indexInfo->GetPrimaryKey())); } - const bool needActualization = indexInfo.GetSchemeNeedActualization(); - auto newVersion = indexInfo.GetVersion(); + const bool needActualization = indexInfo->GetSchemeNeedActualization(); + auto newVersion = indexInfo->GetVersion(); auto itVersion = SnapshotByVersion.emplace(newVersion, std::make_shared(std::move(indexInfo), snapshot)); if (!itVersion.second) { AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("message", "Skip registered version")("version", LastSchemaVersion); diff --git a/ydb/core/tx/columnshard/engines/scheme/versions/versioned_index.h b/ydb/core/tx/columnshard/engines/scheme/versions/versioned_index.h index e4e22071f1ed..81a57cd65eab 100644 --- a/ydb/core/tx/columnshard/engines/scheme/versions/versioned_index.h +++ b/ydb/core/tx/columnshard/engines/scheme/versions/versioned_index.h @@ -1,5 +1,8 @@ #pragma once #include "abstract_scheme.h" + +#include +#include #include namespace NKikimr::NOlap { @@ -123,7 +126,7 @@ class TVersionedIndex { return PrimaryKey; } - const TIndexInfo* AddIndex(const TSnapshot& snapshot, TIndexInfo&& indexInfo); + const TIndexInfo* AddIndex(const TSnapshot& snapshot, TObjectCache::TEntryGuard&& indexInfo); bool LoadShardingInfo(IDbWrapper& db); }; diff --git a/ydb/core/tx/columnshard/engines/scheme/versions/ya.make b/ydb/core/tx/columnshard/engines/scheme/versions/ya.make index 63dc44a74899..5b9cc7eff7c5 100644 --- a/ydb/core/tx/columnshard/engines/scheme/versions/ya.make +++ b/ydb/core/tx/columnshard/engines/scheme/versions/ya.make @@ -9,6 +9,7 @@ SRCS( PEERDIR( ydb/core/tx/columnshard/engines/scheme/abstract + ydb/core/tx/columnshard/engines/scheme/common ) END() diff --git a/ydb/core/tx/columnshard/engines/scheme/ya.make b/ydb/core/tx/columnshard/engines/scheme/ya.make index a8b2572ac574..295da3556bb9 100644 --- a/ydb/core/tx/columnshard/engines/scheme/ya.make +++ b/ydb/core/tx/columnshard/engines/scheme/ya.make @@ -22,6 +22,7 @@ PEERDIR( ydb/core/tx/columnshard/engines/scheme/versions ydb/core/tx/columnshard/engines/scheme/tiering ydb/core/tx/columnshard/engines/scheme/column + ydb/core/tx/columnshard/engines/scheme/common ydb/core/tx/columnshard/engines/scheme/defaults ydb/core/formats/arrow/accessor ydb/core/tx/columnshard/blobs_action/abstract diff --git a/ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp b/ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp index 6d23f3de94a4..9635c24b3325 100644 --- a/ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp +++ b/ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp @@ -524,7 +524,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { // load TSnapshot indexSnapshot(1, 1); TColumnEngineForLogs engine( - 0, NDataAccessorControl::TLocalManager::BuildForTests(), CommonStoragesManager, indexSnapshot, TIndexInfo(tableInfo)); + 0, std::make_shared(), NDataAccessorControl::TLocalManager::BuildForTests(), CommonStoragesManager, indexSnapshot, 0, TIndexInfo(tableInfo)); for (auto&& i : paths) { engine.RegisterTable(i); } @@ -609,7 +609,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { TSnapshot indexSnapshot(1, 1); TColumnEngineForLogs engine( - 0, NDataAccessorControl::TLocalManager::BuildForTests(), CommonStoragesManager, indexSnapshot, TIndexInfo(tableInfo)); + 0, std::make_shared(), NDataAccessorControl::TLocalManager::BuildForTests(), CommonStoragesManager, indexSnapshot, 0, TIndexInfo(tableInfo)); engine.RegisterTable(pathId); engine.TestingLoad(db); @@ -710,7 +710,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { TSnapshot indexSnapshot(1, 1); TColumnEngineForLogs engine( - 0, NDataAccessorControl::TLocalManager::BuildForTests(), CommonStoragesManager, indexSnapshot, TIndexInfo(tableInfo)); + 0, std::make_shared(), NDataAccessorControl::TLocalManager::BuildForTests(), CommonStoragesManager, indexSnapshot, 0, TIndexInfo(tableInfo)); engine.RegisterTable(pathId); engine.TestingLoad(db); @@ -736,7 +736,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { { // check it's overloaded after reload TColumnEngineForLogs tmpEngine( - 0, NDataAccessorControl::TLocalManager::BuildForTests(), CommonStoragesManager, TSnapshot::Zero(), TIndexInfo(tableInfo)); + 0, std::make_shared(), NDataAccessorControl::TLocalManager::BuildForTests(), CommonStoragesManager, TSnapshot::Zero(), 0, TIndexInfo(tableInfo)); tmpEngine.RegisterTable(pathId); tmpEngine.TestingLoad(db); } @@ -768,7 +768,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { { // check it's not overloaded after reload TColumnEngineForLogs tmpEngine( - 0, NDataAccessorControl::TLocalManager::BuildForTests(), CommonStoragesManager, TSnapshot::Zero(), TIndexInfo(tableInfo)); + 0, std::make_shared(), NDataAccessorControl::TLocalManager::BuildForTests(), CommonStoragesManager, TSnapshot::Zero(), 0, TIndexInfo(tableInfo)); tmpEngine.RegisterTable(pathId); tmpEngine.TestingLoad(db); } @@ -789,7 +789,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { TSnapshot indexSnapshot(1, 1); { TColumnEngineForLogs engine( - 0, NDataAccessorControl::TLocalManager::BuildForTests(), CommonStoragesManager, indexSnapshot, TIndexInfo(tableInfo)); + 0, std::make_shared(), NDataAccessorControl::TLocalManager::BuildForTests(), CommonStoragesManager, indexSnapshot, 0, TIndexInfo(tableInfo)); engine.RegisterTable(pathId); engine.TestingLoad(db); @@ -868,7 +868,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) { { // load TColumnEngineForLogs engine( - 0, NDataAccessorControl::TLocalManager::BuildForTests(), CommonStoragesManager, indexSnapshot, TIndexInfo(tableInfo)); + 0, std::make_shared(), NDataAccessorControl::TLocalManager::BuildForTests(), CommonStoragesManager, indexSnapshot, 0, TIndexInfo(tableInfo)); engine.RegisterTable(pathId); engine.TestingLoad(db); diff --git a/ydb/core/tx/columnshard/loading/stages.cpp b/ydb/core/tx/columnshard/loading/stages.cpp index ac4ab092d4db..a0db85a87aaf 100644 --- a/ydb/core/tx/columnshard/loading/stages.cpp +++ b/ydb/core/tx/columnshard/loading/stages.cpp @@ -194,7 +194,8 @@ bool TSpecialValuesInitializer::DoPrecharge(NTabletFlatExecutor::TTransactionCon bool TTablesManagerInitializer::DoExecute(NTabletFlatExecutor::TTransactionContext& txc, const TActorContext& /*ctx*/) { NIceDb::TNiceDb db(txc.DB); - TTablesManager tablesManagerLocal(Self->StoragesManager, Self->DataAccessorsManager.GetObjectPtrVerified(), Self->TabletID()); + TTablesManager tablesManagerLocal(Self->StoragesManager, Self->DataAccessorsManager.GetObjectPtrVerified(), + NOlap::TSchemaCachesManager::GetCache(Self->OwnerPathId), Self->TabletID()); { TMemoryProfileGuard g("TTxInit/TTablesManager"); if (!tablesManagerLocal.InitFromDB(db)) { diff --git a/ydb/core/tx/columnshard/normalizer/portion/chunks.cpp b/ydb/core/tx/columnshard/normalizer/portion/chunks.cpp index 21a184f7fd99..e96a18d5e871 100644 --- a/ydb/core/tx/columnshard/normalizer/portion/chunks.cpp +++ b/ydb/core/tx/columnshard/normalizer/portion/chunks.cpp @@ -140,7 +140,8 @@ TConclusion> TChunksNormalizer::DoInit( return tasks; } - TTablesManager tablesManager(controller.GetStoragesManager(), std::make_shared(nullptr), 0); + TTablesManager tablesManager(controller.GetStoragesManager(), std::make_shared(nullptr), + std::make_shared(), 0); if (!tablesManager.InitFromDB(db)) { ACFL_TRACE("normalizer", "TChunksNormalizer")("error", "can't initialize tables manager"); return TConclusionStatus::Fail("Can't load index"); diff --git a/ydb/core/tx/columnshard/normalizer/portion/leaked_blobs.cpp b/ydb/core/tx/columnshard/normalizer/portion/leaked_blobs.cpp index 811cae0957ae..7f4745f7bbc4 100644 --- a/ydb/core/tx/columnshard/normalizer/portion/leaked_blobs.cpp +++ b/ydb/core/tx/columnshard/normalizer/portion/leaked_blobs.cpp @@ -181,8 +181,8 @@ TConclusion> TLeakedBlobsNormalizer::DoInit( return TConclusionStatus::Fail("Not ready"); } - NColumnShard::TTablesManager tablesManager( - controller.GetStoragesManager(), std::make_shared(nullptr), TabletId); + NColumnShard::TTablesManager tablesManager(controller.GetStoragesManager(), std::make_shared(nullptr), + std::make_shared(), TabletId); if (!tablesManager.InitFromDB(db)) { ACFL_TRACE("normalizer", "TPortionsNormalizer")("error", "can't initialize tables manager"); diff --git a/ydb/core/tx/columnshard/normalizer/portion/normalizer.cpp b/ydb/core/tx/columnshard/normalizer/portion/normalizer.cpp index bf4a6f5e89e1..f1ce6882b727 100644 --- a/ydb/core/tx/columnshard/normalizer/portion/normalizer.cpp +++ b/ydb/core/tx/columnshard/normalizer/portion/normalizer.cpp @@ -25,7 +25,8 @@ TConclusion> TPortionsNormalizerBase::DoInit( return TConclusionStatus::Fail("Not ready"); } - NColumnShard::TTablesManager tablesManager(controller.GetStoragesManager(), std::make_shared(nullptr), 0); + NColumnShard::TTablesManager tablesManager(controller.GetStoragesManager(), std::make_shared(nullptr), + std::make_shared(), 0); if (!tablesManager.InitFromDB(db)) { ACFL_TRACE("normalizer", "TPortionsNormalizer")("error", "can't initialize tables manager"); return TConclusionStatus::Fail("Can't load index"); diff --git a/ydb/core/tx/columnshard/normalizer/portion/restore_portion_from_chunks.cpp b/ydb/core/tx/columnshard/normalizer/portion/restore_portion_from_chunks.cpp index a2c382e5dd68..b6681250975c 100644 --- a/ydb/core/tx/columnshard/normalizer/portion/restore_portion_from_chunks.cpp +++ b/ydb/core/tx/columnshard/normalizer/portion/restore_portion_from_chunks.cpp @@ -72,7 +72,8 @@ TConclusion> TNormalizer::DoInit( return TConclusionStatus::Fail("Not ready"); } - TTablesManager tablesManager(controller.GetStoragesManager(), std::make_shared(nullptr), 0); + TTablesManager tablesManager(controller.GetStoragesManager(), std::make_shared(nullptr), + std::make_shared(), 0); if (!tablesManager.InitFromDB(db)) { ACFL_TRACE("normalizer", "TChunksNormalizer")("error", "can't initialize tables manager"); return TConclusionStatus::Fail("Can't load index"); diff --git a/ydb/core/tx/columnshard/operations/batch_builder/merger.cpp b/ydb/core/tx/columnshard/operations/batch_builder/merger.cpp index 829f1cbe36a2..ceb87eb027e5 100644 --- a/ydb/core/tx/columnshard/operations/batch_builder/merger.cpp +++ b/ydb/core/tx/columnshard/operations/batch_builder/merger.cpp @@ -48,14 +48,14 @@ NKikimr::TConclusionStatus TUpdateMerger::OnEqualKeys(const NArrow::NMerger::TSo AFL_VERIFY(Schema->GetIndexInfo().GetColumnIds(false).size() == exists.GetData().GetColumns().size()) ("index", Schema->GetIndexInfo().GetColumnIds(false).size())("exists", exists.GetData().GetColumns().size()); for (i32 columnIdx = 0; columnIdx < Schema->GetIndexInfo().ArrowSchema().num_fields(); ++columnIdx) { - const std::optional& incomingColumnIdx = IncomingColumnRemap[columnIdx]; - if (incomingColumnIdx && HasIncomingDataFlags[*incomingColumnIdx]->GetView(incoming.GetPosition())) { - const ui32 idxChunk = incoming.GetData().GetPositionInChunk(*incomingColumnIdx, incoming.GetPosition()); - rGuard.Add(*incoming.GetData().GetPositionAddress(*incomingColumnIdx).GetArray(), idxChunk); - } else { - const ui32 idxChunk = exists.GetData().GetPositionInChunk(columnIdx, exists.GetPosition()); - rGuard.Add(*exists.GetData().GetPositionAddress(columnIdx).GetArray(), idxChunk); - } + const std::optional& incomingColumnIdx = IncomingColumnRemap[columnIdx]; + if (incomingColumnIdx && HasIncomingDataFlags[*incomingColumnIdx]->GetView(incoming.GetPosition())) { + const ui32 idxChunk = incoming.GetData().GetPositionInChunk(*incomingColumnIdx, incoming.GetPosition()); + rGuard.Add(*incoming.GetData().GetPositionAddress(*incomingColumnIdx).GetArray(), idxChunk); + } else { + const ui32 idxChunk = exists.GetData().GetPositionInChunk(columnIdx, exists.GetPosition()); + rGuard.Add(*exists.GetData().GetPositionAddress(columnIdx).GetArray(), idxChunk); + } } return TConclusionStatus::Success(); } diff --git a/ydb/core/tx/columnshard/tables_manager.cpp b/ydb/core/tx/columnshard/tables_manager.cpp index a1aee2a6a42b..63a74e12fb2b 100644 --- a/ydb/core/tx/columnshard/tables_manager.cpp +++ b/ydb/core/tx/columnshard/tables_manager.cpp @@ -178,13 +178,13 @@ bool TTablesManager::InitFromDB(NIceDb::TNiceDb& db) { "version", info.GetSchema().GetVersion()); NOlap::IColumnEngine::TSchemaInitializationData schemaInitializationData(info); if (!PrimaryIndex) { - PrimaryIndex = std::make_unique(TabletId, DataAccessorsManager, StoragesManager, - version, schemaInitializationData); + PrimaryIndex = std::make_unique( + TabletId, SchemaObjectsCache, DataAccessorsManager, StoragesManager, version, preset->Id, schemaInitializationData); } else if (PrimaryIndex->GetVersionedIndex().IsEmpty() || info.GetSchema().GetVersion() > PrimaryIndex->GetVersionedIndex().GetLastSchema()->GetVersion()) { - PrimaryIndex->RegisterSchemaVersion(version, schemaInitializationData); + PrimaryIndex->RegisterSchemaVersion(version, preset->Id, schemaInitializationData); } else { - PrimaryIndex->RegisterOldSchemaVersion(version, schemaInitializationData); + PrimaryIndex->RegisterOldSchemaVersion(version, preset->Id, schemaInitializationData); } if (!rowset.Next()) { @@ -290,14 +290,14 @@ void TTablesManager::AddSchemaVersion( Schema::SaveSchemaPresetVersionInfo(db, presetId, version, versionInfo); if (!PrimaryIndex) { - PrimaryIndex = std::make_unique( - TabletId, DataAccessorsManager, StoragesManager, version, NOlap::IColumnEngine::TSchemaInitializationData(versionInfo)); + PrimaryIndex = std::make_unique(TabletId, SchemaObjectsCache, DataAccessorsManager, StoragesManager, + version, presetId, NOlap::IColumnEngine::TSchemaInitializationData(versionInfo)); for (auto&& i : Tables) { PrimaryIndex->RegisterTable(i.first); } PrimaryIndex->OnTieringModified(Ttl); } else { - PrimaryIndex->RegisterSchemaVersion(version, NOlap::IColumnEngine::TSchemaInitializationData(versionInfo)); + PrimaryIndex->RegisterSchemaVersion(version, presetId, NOlap::IColumnEngine::TSchemaInitializationData(versionInfo)); } } @@ -352,11 +352,14 @@ void TTablesManager::AddTableVersion(const ui64 pathId, const NOlap::TSnapshot& } TTablesManager::TTablesManager(const std::shared_ptr& storagesManager, - const std::shared_ptr& dataAccessorsManager, const ui64 tabletId) + const std::shared_ptr& dataAccessorsManager, + const std::shared_ptr& schemaCache, const ui64 tabletId) : StoragesManager(storagesManager) , DataAccessorsManager(dataAccessorsManager) , LoadTimeCounters(std::make_unique()) + , SchemaObjectsCache(schemaCache) , TabletId(tabletId) { + AFL_VERIFY(SchemaObjectsCache); } bool TTablesManager::TryFinalizeDropPathOnExecute(NTable::TDatabase& dbTable, const ui64 pathId) const { diff --git a/ydb/core/tx/columnshard/tables_manager.h b/ydb/core/tx/columnshard/tables_manager.h index 05f1872c9234..2f8d41832814 100644 --- a/ydb/core/tx/columnshard/tables_manager.h +++ b/ydb/core/tx/columnshard/tables_manager.h @@ -145,13 +145,15 @@ class TTablesManager { std::shared_ptr StoragesManager; std::shared_ptr DataAccessorsManager; std::unique_ptr LoadTimeCounters; + std::shared_ptr SchemaObjectsCache; ui64 TabletId = 0; public: friend class TTxInit; TTablesManager(const std::shared_ptr& storagesManager, - const std::shared_ptr& dataAccessorsManager, const ui64 tabletId); + const std::shared_ptr& dataAccessorsManager, + const std::shared_ptr& schemaCache, const ui64 tabletId); const std::unique_ptr& GetLoadTimeCounters() const { return LoadTimeCounters;