Skip to content

Commit

Permalink
share schemas between CS on same node
Browse files Browse the repository at this point in the history
  • Loading branch information
swalrus1 committed Dec 17, 2024
1 parent b2d56de commit a0c1c59
Show file tree
Hide file tree
Showing 13 changed files with 102 additions and 49 deletions.
17 changes: 8 additions & 9 deletions ydb/core/tx/columnshard/columnshard_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ TColumnShard::TColumnShard(TTabletStorageInfo* info, const TActorId& tablet)
, PeriodicWakeupActivationPeriod(NYDBTest::TControllers::GetColumnShardController()->GetPeriodicWakeupActivationPeriod())
, StatsReportInterval(NYDBTest::TControllers::GetColumnShardController()->GetStatsReportInterval())
, InFlightReadsTracker(StoragesManager, Counters.GetRequestsTracingCounters())
, TablesManager(StoragesManager, std::make_shared<NOlap::NDataAccessorControl::TLocalManager>(nullptr), info->TabletID)
, TablesManager(StoragesManager, std::make_shared<NOlap::NDataAccessorControl::TLocalManager>(nullptr),
std::make_shared<NOlap::TSchemaObjectsCache>(), info->TabletID)
, Subscribers(std::make_shared<NSubscriber::TManager>(*this))
, PipeClientCache(NTabletPipe::CreateBoundedClientCache(new NTabletPipe::TBoundedClientCacheConfig(), GetPipeClientConfig()))
, InsertTable(std::make_unique<NOlap::TInsertTable>())
Expand Down Expand Up @@ -354,15 +355,13 @@ void TColumnShard::RunInit(const NKikimrTxColumnShard::TInitShard& proto, const

NIceDb::TNiceDb db(txc.DB);

if (proto.HasOwnerPathId()) {
OwnerPathId = proto.GetOwnerPathId();
Schema::SaveSpecialValue(db, Schema::EValueIds::OwnerPathId, OwnerPathId);
}
AFL_VERIFY(proto.HasOwnerPathId());
OwnerPathId = proto.GetOwnerPathId();
Schema::SaveSpecialValue(db, Schema::EValueIds::OwnerPathId, OwnerPathId);

if (proto.HasOwnerPath()) {
OwnerPath = proto.GetOwnerPath();
Schema::SaveSpecialValue(db, Schema::EValueIds::OwnerPath, OwnerPath);
}
// NOTE(review): this check repeats HasOwnerPathId() although the statements below read and
// persist OwnerPath; HasOwnerPath() looks like the intended condition. Confirm that every
// init proto actually carries an owner path before tightening the check.
AFL_VERIFY(proto.HasOwnerPathId());
// Copy the owner path from the init proto and save it via Schema::SaveSpecialValue.
OwnerPath = proto.GetOwnerPath();
Schema::SaveSpecialValue(db, Schema::EValueIds::OwnerPath, OwnerPath);

for (auto& createTable : proto.GetTables()) {
RunEnsureTable(createTable, version, txc);
Expand Down
17 changes: 10 additions & 7 deletions ydb/core/tx/columnshard/engines/column_engine_logs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,25 +26,27 @@

namespace NKikimr::NOlap {

TColumnEngineForLogs::TColumnEngineForLogs(const ui64 tabletId,
TColumnEngineForLogs::TColumnEngineForLogs(const ui64 tabletId, const std::shared_ptr<TSchemaObjectsCache>& schemaCache,
const std::shared_ptr<NDataAccessorControl::IDataAccessorsManager>& dataAccessorsManager,
const std::shared_ptr<IStoragesManager>& storagesManager, const TSnapshot& snapshot, const TSchemaInitializationData& schema)
: GranulesStorage(std::make_shared<TGranulesStorage>(SignalCounters, dataAccessorsManager, storagesManager))
, DataAccessorsManager(dataAccessorsManager)
, StoragesManager(storagesManager)
, SchemaObjectsCache(schemaCache)
, TabletId(tabletId)
, LastPortion(0)
, LastGranule(0) {
ActualizationController = std::make_shared<NActualizer::TController>();
RegisterSchemaVersion(snapshot, schema);
}

TColumnEngineForLogs::TColumnEngineForLogs(const ui64 tabletId,
TColumnEngineForLogs::TColumnEngineForLogs(const ui64 tabletId, const std::shared_ptr<TSchemaObjectsCache>& schemaCache,
const std::shared_ptr<NDataAccessorControl::IDataAccessorsManager>& dataAccessorsManager,
const std::shared_ptr<IStoragesManager>& storagesManager, const TSnapshot& snapshot, TIndexInfo&& schema)
: GranulesStorage(std::make_shared<TGranulesStorage>(SignalCounters, dataAccessorsManager, storagesManager))
, DataAccessorsManager(dataAccessorsManager)
, StoragesManager(storagesManager)
, SchemaObjectsCache(schemaCache)
, TabletId(tabletId)
, LastPortion(0)
, LastGranule(0) {
Expand Down Expand Up @@ -139,18 +141,19 @@ void TColumnEngineForLogs::UpdatePortionStats(
}

void TColumnEngineForLogs::RegisterSchemaVersion(const TSnapshot& snapshot, TIndexInfo&& indexInfo) {
std::shared_ptr<const TIndexInfo> cachedIndexInfo = SchemaObjectsCache->GetIndexInfoCache(std::move(indexInfo));
AFL_VERIFY(DataAccessorsManager);
bool switchOptimizer = false;
bool switchAccessorsManager = false;
if (!VersionedIndex.IsEmpty()) {
const NOlap::TIndexInfo& lastIndexInfo = VersionedIndex.GetLastSchema()->GetIndexInfo();
Y_ABORT_UNLESS(lastIndexInfo.CheckCompatible(indexInfo));
switchOptimizer = !indexInfo.GetCompactionPlannerConstructor()->IsEqualTo(lastIndexInfo.GetCompactionPlannerConstructor());
switchAccessorsManager = !indexInfo.GetMetadataManagerConstructor()->IsEqualTo(*lastIndexInfo.GetMetadataManagerConstructor());
Y_ABORT_UNLESS(lastIndexInfo.CheckCompatible(*cachedIndexInfo));
switchOptimizer = !cachedIndexInfo->GetCompactionPlannerConstructor()->IsEqualTo(lastIndexInfo.GetCompactionPlannerConstructor());
switchAccessorsManager = !cachedIndexInfo->GetMetadataManagerConstructor()->IsEqualTo(*lastIndexInfo.GetMetadataManagerConstructor());
}

const bool isCriticalScheme = indexInfo.GetSchemeNeedActualization();
auto* indexInfoActual = VersionedIndex.AddIndex(snapshot, std::move(indexInfo));
const bool isCriticalScheme = cachedIndexInfo->GetSchemeNeedActualization();
auto* indexInfoActual = VersionedIndex.AddIndex(snapshot, std::move(cachedIndexInfo));
if (isCriticalScheme) {
StartActualization({});
for (auto&& i : GranulesStorage->GetTables()) {
Expand Down
8 changes: 5 additions & 3 deletions ydb/core/tx/columnshard/engines/column_engine_logs.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ class TColumnEngineForLogs: public IColumnEngine {
std::shared_ptr<IStoragesManager> StoragesManager;

std::shared_ptr<NActualizer::TController> ActualizationController;
std::shared_ptr<TSchemaObjectsCache> SchemaObjectsCache = std::make_shared<TSchemaObjectsCache>();
std::shared_ptr<TSchemaObjectsCache> SchemaObjectsCache;
TVersionedIndex VersionedIndex;
std::shared_ptr<TVersionedIndex> VersionedIndexCopy;

Expand Down Expand Up @@ -98,9 +98,11 @@ class TColumnEngineForLogs: public IColumnEngine {
ADD,
};

TColumnEngineForLogs(const ui64 tabletId, const std::shared_ptr<NDataAccessorControl::IDataAccessorsManager>& dataAccessorsManager,
TColumnEngineForLogs(const ui64 tabletId, const std::shared_ptr<TSchemaObjectsCache>& schemaCache,
const std::shared_ptr<NDataAccessorControl::IDataAccessorsManager>& dataAccessorsManager,
const std::shared_ptr<IStoragesManager>& storagesManager, const TSnapshot& snapshot, const TSchemaInitializationData& schema);
TColumnEngineForLogs(const ui64 tabletId, const std::shared_ptr<NDataAccessorControl::IDataAccessorsManager>& dataAccessorsManager,
TColumnEngineForLogs(const ui64 tabletId, const std::shared_ptr<TSchemaObjectsCache>& schemaCache,
const std::shared_ptr<NDataAccessorControl::IDataAccessorsManager>& dataAccessorsManager,
const std::shared_ptr<IStoragesManager>& storagesManager, const TSnapshot& snapshot, TIndexInfo&& schema);

void OnTieringModified(const std::optional<NOlap::TTiering>& ttl, const ui64 pathId) override;
Expand Down
12 changes: 12 additions & 0 deletions ydb/core/tx/columnshard/engines/scheme/objects_cache.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,17 @@
#include "objects_cache.h"

#include <ydb/core/tx/columnshard/engines/scheme/index_info.h>

namespace NKikimr::NOlap {

// Returns the shared TIndexInfo instance cached for indexInfo's schema version.
//
// If a live instance is already registered for that version it is returned and `indexInfo`
// is left untouched; otherwise `indexInfo` is moved into a new shared instance, which is
// recorded in SchemasByVersion and returned. The result is never null.
//
// The map holds weak_ptr entries only, so the cache itself does not keep an instance alive.
std::shared_ptr<const TIndexInfo> TSchemaObjectsCache::GetIndexInfoCache(TIndexInfo&& indexInfo) {
    const ui64 schemaVersion = indexInfo.GetVersion();
    std::unique_lock lock(SchemasMutex);

    // lock() yields nullptr for an expired entry, so "missing" and "expired" collapse into one case.
    std::shared_ptr<const TIndexInfo> cachedSchema;
    if (auto* findSchema = SchemasByVersion.FindPtr(schemaVersion)) {
        cachedSchema = findSchema->lock();
    }

    if (!cachedSchema) {
        // Keep a strong reference in a local before publishing the weak one: assigning the
        // make_shared temporary directly to the weak_ptr would destroy the object at the end
        // of the statement and leave an already-expired entry behind.
        cachedSchema = std::make_shared<TIndexInfo>(std::move(indexInfo));
        SchemasByVersion[schemaVersion] = cachedSchema;
    }
    return cachedSchema;
}

} // namespace NKikimr::NOlap
35 changes: 33 additions & 2 deletions ydb/core/tx/columnshard/engines/scheme/objects_cache.h
Original file line number Diff line number Diff line change
Expand Up @@ -10,10 +10,18 @@ namespace NKikimr::NOlap {
class TSchemaObjectsCache {
private:
THashMap<TString, std::shared_ptr<arrow::Field>> Fields;
THashMap<TString, std::shared_ptr<TColumnFeatures>> ColumnFeatures;
THashSet<TString> StringsCache;
mutable ui64 AcceptionFieldsCount = 0;
mutable TMutex FieldsMutex;

THashMap<TString, std::shared_ptr<TColumnFeatures>> ColumnFeatures;
mutable ui64 AcceptionFeaturesCount = 0;
mutable TMutex FeaturesMutex;

THashMap<ui64, std::weak_ptr<const TIndexInfo>> SchemasByVersion;
mutable TMutex SchemasMutex;

THashSet<TString> StringsCache;
mutable TMutex StringsMutex;

public:
const TString& GetStringCache(const TString& original) {
Expand All @@ -26,13 +34,16 @@ class TSchemaObjectsCache {

// Adds `f` to the Fields map under `fingerprint`. The insertion result is passed to
// AFL_VERIFY, so registering a fingerprint that is already present fails that check.
void RegisterField(const TString& fingerprint, const std::shared_ptr<arrow::Field>& f) {
// Trace the registration with the fingerprint and the field's string form (before the lock is taken).
AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "register_field")("fp", fingerprint)("f", f->ToString());
// Hold FieldsMutex while the Fields map is modified.
std::unique_lock lock(FieldsMutex);
AFL_VERIFY(Fields.emplace(fingerprint, f).second);
}
// Adds `f` to the ColumnFeatures map under `fingerprint`. The insertion result is passed to
// AFL_VERIFY, so registering a fingerprint that is already present fails that check.
void RegisterColumnFeatures(const TString& fingerprint, const std::shared_ptr<TColumnFeatures>& f) {
// Trace the registration with the fingerprint and the features' debug string (before the lock is taken).
AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "register_column_features")("fp", fingerprint)("info", f->DebugString());
// Hold FeaturesMutex while the ColumnFeatures map is modified.
std::unique_lock lock(FeaturesMutex);
AFL_VERIFY(ColumnFeatures.emplace(fingerprint, f).second);
}
std::shared_ptr<arrow::Field> GetField(const TString& fingerprint) const {
std::unique_lock lock(FieldsMutex);
auto it = Fields.find(fingerprint);
if (it == Fields.end()) {
AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "get_field_miss")("fp", fingerprint)("count", Fields.size())(
Expand All @@ -47,6 +58,7 @@ class TSchemaObjectsCache {
}
template <class TConstructor>
TConclusion<std::shared_ptr<TColumnFeatures>> GetOrCreateColumnFeatures(const TString& fingerprint, const TConstructor& constructor) {
std::unique_lock lock(FeaturesMutex);
auto it = ColumnFeatures.find(fingerprint);
if (it == ColumnFeatures.end()) {
AFL_TRACE(NKikimrServices::TX_COLUMNSHARD)("event", "get_column_features_miss")("fp", UrlEscapeRet(fingerprint))(
Expand All @@ -65,6 +77,25 @@ class TSchemaObjectsCache {
}
return it->second;
}

std::shared_ptr<const TIndexInfo> GetIndexInfoCache(TIndexInfo&& indexInfo);
};

// Registry of TSchemaObjectsCache instances keyed by owner path id. Lookups and insertions
// are performed while holding Mutex.
class TSchemaCachesManager {
private:
    THashMap<ui64, std::shared_ptr<TSchemaObjectsCache>> CacheByTableOwner;
    TMutex Mutex;

public:
    // Returns the cache registered for `ownerPathId`, creating and registering a new one on
    // first request. `ownerPathId` is checked with AFL_VERIFY and therefore must be non-zero.
    std::shared_ptr<TSchemaObjectsCache> GetCache(const ui64 ownerPathId) {
        AFL_VERIFY(ownerPathId);
        std::unique_lock guard(Mutex);

        const auto existing = CacheByTableOwner.find(ownerPathId);
        if (existing != CacheByTableOwner.end()) {
            return existing->second;
        }

        // First request for this owner: build the cache, remember it, hand it out.
        auto created = std::make_shared<TSchemaObjectsCache>();
        CacheByTableOwner.emplace(ownerPathId, created);
        return created;
    }
};

} // namespace NKikimr::NOlap
Original file line number Diff line number Diff line change
Expand Up @@ -2,39 +2,39 @@

namespace NKikimr::NOlap {

TSnapshotSchema::TSnapshotSchema(TIndexInfo&& indexInfo, const TSnapshot& snapshot)
TSnapshotSchema::TSnapshotSchema(const std::shared_ptr<const TIndexInfo>& indexInfo, const TSnapshot& snapshot)
: IndexInfo(std::move(indexInfo))
, Schema(IndexInfo.ArrowSchemaWithSpecials())
, Schema(IndexInfo->ArrowSchemaWithSpecials())
, Snapshot(snapshot)
{
}

TColumnSaver TSnapshotSchema::GetColumnSaver(const ui32 columnId) const {
return IndexInfo.GetColumnSaver(columnId);
return IndexInfo->GetColumnSaver(columnId);
}

std::shared_ptr<TColumnLoader> TSnapshotSchema::GetColumnLoaderOptional(const ui32 columnId) const {
return IndexInfo.GetColumnLoaderOptional(columnId);
return IndexInfo->GetColumnLoaderOptional(columnId);
}

std::optional<ui32> TSnapshotSchema::GetColumnIdOptional(const std::string& columnName) const {
return IndexInfo.GetColumnIdOptional(columnName);
return IndexInfo->GetColumnIdOptional(columnName);
}

ui32 TSnapshotSchema::GetColumnIdVerified(const std::string& columnName) const {
return IndexInfo.GetColumnIdVerified(columnName);
return IndexInfo->GetColumnIdVerified(columnName);
}

int TSnapshotSchema::GetFieldIndex(const ui32 columnId) const {
return IndexInfo.GetColumnIndexOptional(columnId).value_or(-1);
return IndexInfo->GetColumnIndexOptional(columnId).value_or(-1);
}

// Returns the Schema member by const reference (no copy of the shared_ptr is made here).
const std::shared_ptr<NArrow::TSchemaLite>& TSnapshotSchema::GetSchema() const {
return Schema;
}

const TIndexInfo& TSnapshotSchema::GetIndexInfo() const {
return IndexInfo;
return *IndexInfo;
}

const TSnapshot& TSnapshotSchema::GetSnapshot() const {
Expand All @@ -46,7 +46,7 @@ ui32 TSnapshotSchema::GetColumnsCount() const {
}

ui64 TSnapshotSchema::GetVersion() const {
return IndexInfo.GetVersion();
return IndexInfo->GetVersion();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -8,23 +8,23 @@ namespace NKikimr::NOlap {

class TSnapshotSchema: public ISnapshotSchema {
private:
TIndexInfo IndexInfo;
std::shared_ptr<const TIndexInfo> IndexInfo;
std::shared_ptr<NArrow::TSchemaLite> Schema;
TSnapshot Snapshot;
protected:
virtual TString DoDebugString() const override {
return TStringBuilder() << "("
"schema=" << Schema->ToString() << ";" <<
"snapshot=" << Snapshot.DebugString() << ";" <<
"index_info=" << IndexInfo.DebugString() << ";" <<
"index_info=" << IndexInfo->DebugString() << ";" <<
")"
;
}
public:
TSnapshotSchema(TIndexInfo&& indexInfo, const TSnapshot& snapshot);
TSnapshotSchema(const std::shared_ptr<const TIndexInfo>& indexInfo, const TSnapshot& snapshot);

virtual TColumnIdsView GetColumnIds() const override {
return IndexInfo.GetColumnIds();
return IndexInfo->GetColumnIds();
}

TColumnSaver GetColumnSaver(const ui32 columnId) const override;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,16 @@

namespace NKikimr::NOlap {

const TIndexInfo* TVersionedIndex::AddIndex(const TSnapshot& snapshot, TIndexInfo&& indexInfo) {
const TIndexInfo* TVersionedIndex::AddIndex(const TSnapshot& snapshot, const std::shared_ptr<const TIndexInfo>& indexInfo) {
if (Snapshots.empty()) {
PrimaryKey = indexInfo.GetPrimaryKey();
PrimaryKey = indexInfo->GetPrimaryKey();
} else {
Y_ABORT_UNLESS(PrimaryKey->Equals(indexInfo.GetPrimaryKey()));
Y_ABORT_UNLESS(PrimaryKey->Equals(indexInfo->GetPrimaryKey()));
}

const bool needActualization = indexInfo.GetSchemeNeedActualization();
auto newVersion = indexInfo.GetVersion();
auto itVersion = SnapshotByVersion.emplace(newVersion, std::make_shared<TSnapshotSchema>(std::move(indexInfo), snapshot));
const bool needActualization = indexInfo->GetSchemeNeedActualization();
auto newVersion = indexInfo->GetVersion();
auto itVersion = SnapshotByVersion.emplace(newVersion, std::make_shared<TSnapshotSchema>(indexInfo, snapshot));
if (!itVersion.second) {
AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("message", "Skip registered version")("version", LastSchemaVersion);
} else if (needActualization) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ class TVersionedIndex {
return PrimaryKey;
}

const TIndexInfo* AddIndex(const TSnapshot& snapshot, TIndexInfo&& indexInfo);
const TIndexInfo* AddIndex(const TSnapshot& snapshot, const std::shared_ptr<const TIndexInfo>& indexInfo);

bool LoadShardingInfo(IDbWrapper& db);
};
Expand Down
3 changes: 2 additions & 1 deletion ydb/core/tx/columnshard/loading/stages.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,8 @@ bool TSpecialValuesInitializer::DoPrecharge(NTabletFlatExecutor::TTransactionCon

bool TTablesManagerInitializer::DoExecute(NTabletFlatExecutor::TTransactionContext& txc, const TActorContext& /*ctx*/) {
NIceDb::TNiceDb db(txc.DB);
TTablesManager tablesManagerLocal(Self->StoragesManager, Self->DataAccessorsManager.GetObjectPtrVerified(), Self->TabletID());
TTablesManager tablesManagerLocal(Self->StoragesManager, Self->DataAccessorsManager.GetObjectPtrVerified(),
Singleton<NOlap::TSchemaCachesManager>()->GetCache(Self->OwnerPathId), Self->TabletID());
{
TMemoryProfileGuard g("TTxInit/TTablesManager");
if (!tablesManagerLocal.InitFromDB(db)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,8 @@ TConclusion<std::vector<INormalizerTask::TPtr>> TNormalizer::DoInit(
return TConclusionStatus::Fail("Not ready");
}

TTablesManager tablesManager(controller.GetStoragesManager(), std::make_shared<NDataAccessorControl::TLocalManager>(nullptr), 0);
TTablesManager tablesManager(controller.GetStoragesManager(), std::make_shared<NDataAccessorControl::TLocalManager>(nullptr),
std::make_shared<TSchemaObjectsCache>(), 0);
if (!tablesManager.InitFromDB(db)) {
ACFL_TRACE("normalizer", "TChunksNormalizer")("error", "can't initialize tables manager");
return TConclusionStatus::Fail("Can't load index");
Expand Down
12 changes: 7 additions & 5 deletions ydb/core/tx/columnshard/tables_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -178,8 +178,8 @@ bool TTablesManager::InitFromDB(NIceDb::TNiceDb& db) {
"version", info.GetSchema().GetVersion());
NOlap::IColumnEngine::TSchemaInitializationData schemaInitializationData(info);
if (!PrimaryIndex) {
PrimaryIndex = std::make_unique<NOlap::TColumnEngineForLogs>(TabletId, DataAccessorsManager, StoragesManager,
version, schemaInitializationData);
PrimaryIndex = std::make_unique<NOlap::TColumnEngineForLogs>(
TabletId, SchemaObjectsCache, DataAccessorsManager, StoragesManager, version, schemaInitializationData);
} else if (PrimaryIndex->GetVersionedIndex().IsEmpty() ||
info.GetSchema().GetVersion() > PrimaryIndex->GetVersionedIndex().GetLastSchema()->GetVersion()) {
PrimaryIndex->RegisterSchemaVersion(version, schemaInitializationData);
Expand Down Expand Up @@ -290,8 +290,8 @@ void TTablesManager::AddSchemaVersion(const ui32 presetId, const NOlap::TSnapsho

Schema::SaveSchemaPresetVersionInfo(db, presetId, version, versionInfo);
if (!PrimaryIndex) {
PrimaryIndex = std::make_unique<NOlap::TColumnEngineForLogs>(
TabletId, DataAccessorsManager, StoragesManager, version, NOlap::IColumnEngine::TSchemaInitializationData(versionInfo));
PrimaryIndex = std::make_unique<NOlap::TColumnEngineForLogs>(TabletId, SchemaObjectsCache, DataAccessorsManager, StoragesManager,
version, NOlap::IColumnEngine::TSchemaInitializationData(versionInfo));
for (auto&& i : Tables) {
PrimaryIndex->RegisterTable(i.first);
}
Expand Down Expand Up @@ -355,10 +355,12 @@ void TTablesManager::AddTableVersion(const ui64 pathId, const NOlap::TSnapshot&
}

TTablesManager::TTablesManager(const std::shared_ptr<NOlap::IStoragesManager>& storagesManager,
const std::shared_ptr<NOlap::NDataAccessorControl::IDataAccessorsManager>& dataAccessorsManager, const ui64 tabletId)
const std::shared_ptr<NOlap::NDataAccessorControl::IDataAccessorsManager>& dataAccessorsManager,
const std::shared_ptr<NOlap::TSchemaObjectsCache>& schemaCache, const ui64 tabletId)
: StoragesManager(storagesManager)
, DataAccessorsManager(dataAccessorsManager)
, LoadTimeCounters(std::make_unique<TTableLoadTimeCounters>())
, SchemaObjectsCache(schemaCache)
, TabletId(tabletId) {
}

Expand Down
4 changes: 3 additions & 1 deletion ydb/core/tx/columnshard/tables_manager.h
Original file line number Diff line number Diff line change
Expand Up @@ -145,13 +145,15 @@ class TTablesManager {
std::shared_ptr<NOlap::IStoragesManager> StoragesManager;
std::shared_ptr<NOlap::NDataAccessorControl::IDataAccessorsManager> DataAccessorsManager;
std::unique_ptr<TTableLoadTimeCounters> LoadTimeCounters;
std::shared_ptr<NOlap::TSchemaObjectsCache> SchemaObjectsCache;
ui64 TabletId = 0;

public:
friend class TTxInit;

TTablesManager(const std::shared_ptr<NOlap::IStoragesManager>& storagesManager,
const std::shared_ptr<NOlap::NDataAccessorControl::IDataAccessorsManager>& dataAccessorsManager, const ui64 tabletId);
const std::shared_ptr<NOlap::NDataAccessorControl::IDataAccessorsManager>& dataAccessorsManager,
const std::shared_ptr<NOlap::TSchemaObjectsCache>& schemaCache, const ui64 tabletId);

const std::unique_ptr<TTableLoadTimeCounters>& GetLoadTimeCounters() const {
return LoadTimeCounters;
Expand Down

0 comments on commit a0c1c59

Please sign in to comment.