Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

share schemas between CS on same node #12673

Merged
merged 10 commits into from
Dec 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion ydb/core/tx/columnshard/columnshard_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,8 @@ TColumnShard::TColumnShard(TTabletStorageInfo* info, const TActorId& tablet)
, PeriodicWakeupActivationPeriod(NYDBTest::TControllers::GetColumnShardController()->GetPeriodicWakeupActivationPeriod())
, StatsReportInterval(NYDBTest::TControllers::GetColumnShardController()->GetStatsReportInterval())
, InFlightReadsTracker(StoragesManager, Counters.GetRequestsTracingCounters())
, TablesManager(StoragesManager, std::make_shared<NOlap::NDataAccessorControl::TLocalManager>(nullptr), info->TabletID)
, TablesManager(StoragesManager, std::make_shared<NOlap::NDataAccessorControl::TLocalManager>(nullptr),
std::make_shared<NOlap::TSchemaObjectsCache>(), info->TabletID)
, Subscribers(std::make_shared<NSubscriber::TManager>(*this))
, PipeClientCache(NTabletPipe::CreateBoundedClientCache(new NTabletPipe::TBoundedClientCacheConfig(), GetPipeClientConfig()))
, InsertTable(std::make_unique<NOlap::TInsertTable>())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ bool TTxDataFromSource::DoExecute(NTabletFlatExecutor::TTransactionContext& txc,
auto& index = Self->TablesManager.MutablePrimaryIndexAsVerified<NOlap::TColumnEngineForLogs>();

for (auto& info : SchemeHistory) {
index.RegisterOldSchemaVersion(info.GetSnapshot(), info.GetSchema());
index.RegisterOldSchemaVersion(info.GetSnapshot(), info.GetProto().GetId(), info.GetSchema());
}

TDbWrapper dbWrapper(txc.DB, nullptr);
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/tx/columnshard/engines/changes/indexation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ class TPathFieldsInfo {
if (!Schemas.contains(data.GetSchemaVersion())) {
Schemas.emplace(data.GetSchemaVersion(), blobSchema);
}
auto columnIds = blobSchema->GetIndexInfo().GetColumnIds(false);
TColumnIdsView columnIds = blobSchema->GetIndexInfo().GetColumnIds(false);
std::vector<ui32> filteredIds = data.GetMeta().GetSchemaSubset().Apply(columnIds.begin(), columnIds.end());
if (data.GetMeta().GetModificationType() == NEvWrite::EModificationType::Delete) {
filteredIds.emplace_back((ui32)IIndexInfo::ESpecialColumn::DELETE_FLAG);
Expand Down Expand Up @@ -247,7 +247,7 @@ TConclusionStatus TInsertColumnEngineChanges::DoConstructBlobs(TConstructionCont
{
const auto blobData = Blobs.Extract(IStoragesManager::DefaultStorageId, blobRange);

auto blobSchemaView = blobSchema->GetIndexInfo().ArrowSchema();
NArrow::TSchemaLiteView blobSchemaView = blobSchema->GetIndexInfo().ArrowSchema();
auto batchSchema =
std::make_shared<arrow::Schema>(inserted.GetMeta().GetSchemaSubset().Apply(blobSchemaView.begin(), blobSchemaView.end()));
batch = std::make_shared<NArrow::TGeneralContainer>(NArrow::DeserializeBatch(blobData, batchSchema));
Expand Down
6 changes: 3 additions & 3 deletions ydb/core/tx/columnshard/engines/column_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -354,9 +354,9 @@ class IColumnEngine {
const std::shared_ptr<NDataLocks::TManager>& dataLocksManager, const ui64 memoryUsageLimit) noexcept = 0;
virtual bool ApplyChangesOnTxCreate(std::shared_ptr<TColumnEngineChanges> changes, const TSnapshot& snapshot) noexcept = 0;
virtual bool ApplyChangesOnExecute(IDbWrapper& db, std::shared_ptr<TColumnEngineChanges> changes, const TSnapshot& snapshot) noexcept = 0;
virtual void RegisterSchemaVersion(const TSnapshot& snapshot, TIndexInfo&& info) = 0;
virtual void RegisterSchemaVersion(const TSnapshot& snapshot, const TSchemaInitializationData& schema) = 0;
virtual void RegisterOldSchemaVersion(const TSnapshot& snapshot, const TSchemaInitializationData& schema) = 0;
virtual void RegisterSchemaVersion(const TSnapshot& snapshot, const ui64 presetId, TIndexInfo&& info) = 0;
virtual void RegisterSchemaVersion(const TSnapshot& snapshot, const ui64 presetId, const TSchemaInitializationData& schema) = 0;
virtual void RegisterOldSchemaVersion(const TSnapshot& snapshot, const ui64 presetId, const TSchemaInitializationData& schema) = 0;

virtual const TMap<ui64, std::shared_ptr<TColumnEngineStats>>& GetStats() const = 0;
virtual const TColumnEngineStats& GetTotalStats() = 0;
Expand Down
26 changes: 14 additions & 12 deletions ydb/core/tx/columnshard/engines/column_engine_logs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,30 +26,32 @@

namespace NKikimr::NOlap {

TColumnEngineForLogs::TColumnEngineForLogs(const ui64 tabletId,
TColumnEngineForLogs::TColumnEngineForLogs(const ui64 tabletId, const std::shared_ptr<TSchemaObjectsCache>& schemaCache,
const std::shared_ptr<NDataAccessorControl::IDataAccessorsManager>& dataAccessorsManager,
const std::shared_ptr<IStoragesManager>& storagesManager, const TSnapshot& snapshot, const TSchemaInitializationData& schema)
const std::shared_ptr<IStoragesManager>& storagesManager, const TSnapshot& snapshot, const ui64 presetId, const TSchemaInitializationData& schema)
: GranulesStorage(std::make_shared<TGranulesStorage>(SignalCounters, dataAccessorsManager, storagesManager))
, DataAccessorsManager(dataAccessorsManager)
, StoragesManager(storagesManager)
, SchemaObjectsCache(schemaCache)
, TabletId(tabletId)
, LastPortion(0)
, LastGranule(0) {
ActualizationController = std::make_shared<NActualizer::TController>();
RegisterSchemaVersion(snapshot, schema);
RegisterSchemaVersion(snapshot, presetId, schema);
}

TColumnEngineForLogs::TColumnEngineForLogs(const ui64 tabletId,
TColumnEngineForLogs::TColumnEngineForLogs(const ui64 tabletId, const std::shared_ptr<TSchemaObjectsCache>& schemaCache,
const std::shared_ptr<NDataAccessorControl::IDataAccessorsManager>& dataAccessorsManager,
const std::shared_ptr<IStoragesManager>& storagesManager, const TSnapshot& snapshot, TIndexInfo&& schema)
const std::shared_ptr<IStoragesManager>& storagesManager, const TSnapshot& snapshot, const ui64 presetId, TIndexInfo&& schema)
: GranulesStorage(std::make_shared<TGranulesStorage>(SignalCounters, dataAccessorsManager, storagesManager))
, DataAccessorsManager(dataAccessorsManager)
, StoragesManager(storagesManager)
, SchemaObjectsCache(schemaCache)
, TabletId(tabletId)
, LastPortion(0)
, LastGranule(0) {
ActualizationController = std::make_shared<NActualizer::TController>();
RegisterSchemaVersion(snapshot, std::move(schema));
RegisterSchemaVersion(snapshot, presetId, std::move(schema));
}

const TMap<ui64, std::shared_ptr<TColumnEngineStats>>& TColumnEngineForLogs::GetStats() const {
Expand Down Expand Up @@ -138,7 +140,7 @@ void TColumnEngineForLogs::UpdatePortionStats(
}
}

void TColumnEngineForLogs::RegisterSchemaVersion(const TSnapshot& snapshot, TIndexInfo&& indexInfo) {
void TColumnEngineForLogs::RegisterSchemaVersion(const TSnapshot& snapshot, const ui64 presetId, TIndexInfo&& indexInfo) {
AFL_VERIFY(DataAccessorsManager);
bool switchOptimizer = false;
bool switchAccessorsManager = false;
Expand All @@ -150,7 +152,7 @@ void TColumnEngineForLogs::RegisterSchemaVersion(const TSnapshot& snapshot, TInd
}

const bool isCriticalScheme = indexInfo.GetSchemeNeedActualization();
auto* indexInfoActual = VersionedIndex.AddIndex(snapshot, std::move(indexInfo));
auto* indexInfoActual = VersionedIndex.AddIndex(snapshot, SchemaObjectsCache->UpsertIndexInfo(presetId, std::move(indexInfo)));
if (isCriticalScheme) {
StartActualization({});
for (auto&& i : GranulesStorage->GetTables()) {
Expand All @@ -170,7 +172,7 @@ void TColumnEngineForLogs::RegisterSchemaVersion(const TSnapshot& snapshot, TInd
}
}

void TColumnEngineForLogs::RegisterSchemaVersion(const TSnapshot& snapshot, const TSchemaInitializationData& schema) {
void TColumnEngineForLogs::RegisterSchemaVersion(const TSnapshot& snapshot, const ui64 presetId, const TSchemaInitializationData& schema) {
AFL_VERIFY(VersionedIndex.IsEmpty() || schema.GetVersion() >= VersionedIndex.GetLastSchema()->GetVersion())("empty", VersionedIndex.IsEmpty())("current", schema.GetVersion())(
"last", VersionedIndex.GetLastSchema()->GetVersion());

Expand All @@ -184,10 +186,10 @@ void TColumnEngineForLogs::RegisterSchemaVersion(const TSnapshot& snapshot, cons
indexInfoOptional = NOlap::TIndexInfo::BuildFromProto(schema.GetSchemaVerified(), StoragesManager, SchemaObjectsCache);
}
AFL_VERIFY(indexInfoOptional);
RegisterSchemaVersion(snapshot, std::move(*indexInfoOptional));
RegisterSchemaVersion(snapshot, presetId, std::move(*indexInfoOptional));
}

void TColumnEngineForLogs::RegisterOldSchemaVersion(const TSnapshot& snapshot, const TSchemaInitializationData& schema) {
void TColumnEngineForLogs::RegisterOldSchemaVersion(const TSnapshot& snapshot, const ui64 presetId, const TSchemaInitializationData& schema) {
AFL_VERIFY(!VersionedIndex.IsEmpty());

ui64 version = schema.GetVersion();
Expand Down Expand Up @@ -215,7 +217,7 @@ void TColumnEngineForLogs::RegisterOldSchemaVersion(const TSnapshot& snapshot, c
}

AFL_VERIFY(indexInfoOptional);
VersionedIndex.AddIndex(snapshot, std::move(*indexInfoOptional));
VersionedIndex.AddIndex(snapshot, SchemaObjectsCache->UpsertIndexInfo(presetId, std::move(*indexInfoOptional)));
}

std::shared_ptr<ITxReader> TColumnEngineForLogs::BuildLoader(const std::shared_ptr<IBlobGroupSelector>& dsGroupSelector) {
Expand Down
19 changes: 11 additions & 8 deletions ydb/core/tx/columnshard/engines/column_engine_logs.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ class TColumnEngineForLogs: public IColumnEngine {
std::shared_ptr<IStoragesManager> StoragesManager;

std::shared_ptr<NActualizer::TController> ActualizationController;
std::shared_ptr<TSchemaObjectsCache> SchemaObjectsCache = std::make_shared<TSchemaObjectsCache>();
std::shared_ptr<TSchemaObjectsCache> SchemaObjectsCache;
TVersionedIndex VersionedIndex;
std::shared_ptr<TVersionedIndex> VersionedIndexCopy;

Expand Down Expand Up @@ -98,10 +98,13 @@ class TColumnEngineForLogs: public IColumnEngine {
ADD,
};

TColumnEngineForLogs(const ui64 tabletId, const std::shared_ptr<NDataAccessorControl::IDataAccessorsManager>& dataAccessorsManager,
const std::shared_ptr<IStoragesManager>& storagesManager, const TSnapshot& snapshot, const TSchemaInitializationData& schema);
TColumnEngineForLogs(const ui64 tabletId, const std::shared_ptr<NDataAccessorControl::IDataAccessorsManager>& dataAccessorsManager,
const std::shared_ptr<IStoragesManager>& storagesManager, const TSnapshot& snapshot, TIndexInfo&& schema);
TColumnEngineForLogs(const ui64 tabletId, const std::shared_ptr<TSchemaObjectsCache>& schemaCache,
const std::shared_ptr<NDataAccessorControl::IDataAccessorsManager>& dataAccessorsManager,
const std::shared_ptr<IStoragesManager>& storagesManager, const TSnapshot& snapshot, const ui64 presetId,
const TSchemaInitializationData& schema);
TColumnEngineForLogs(const ui64 tabletId, const std::shared_ptr<TSchemaObjectsCache>& schemaCache,
const std::shared_ptr<NDataAccessorControl::IDataAccessorsManager>& dataAccessorsManager,
const std::shared_ptr<IStoragesManager>& storagesManager, const TSnapshot& snapshot, const ui64 presetId, TIndexInfo&& schema);

void OnTieringModified(const std::optional<NOlap::TTiering>& ttl, const ui64 pathId) override;
void OnTieringModified(const THashMap<ui64, NOlap::TTiering>& ttl) override;
Expand Down Expand Up @@ -157,9 +160,9 @@ class TColumnEngineForLogs: public IColumnEngine {
virtual bool ApplyChangesOnExecute(
IDbWrapper& db, std::shared_ptr<TColumnEngineChanges> indexChanges, const TSnapshot& snapshot) noexcept override;

void RegisterSchemaVersion(const TSnapshot& snapshot, TIndexInfo&& info) override;
void RegisterSchemaVersion(const TSnapshot& snapshot, const TSchemaInitializationData& schema) override;
void RegisterOldSchemaVersion(const TSnapshot& snapshot, const TSchemaInitializationData& schema) override;
void RegisterSchemaVersion(const TSnapshot& snapshot, const ui64 presetId, TIndexInfo&& info) override;
void RegisterSchemaVersion(const TSnapshot& snapshot, const ui64 presetId, const TSchemaInitializationData& schema) override;
void RegisterOldSchemaVersion(const TSnapshot& snapshot, const ui64 presetId, const TSchemaInitializationData& schema) override;

std::shared_ptr<TSelectInfo> Select(
ui64 pathId, TSnapshot snapshot, const TPKRangesFilter& pkRangesFilter, const bool withUncommitted) const override;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
#include "schema_version.h"

namespace NKikimr::NOlap {}
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
#pragma once

#include <ydb/library/accessor/accessor.h>

#include <util/digest/numeric.h>

namespace NKikimr::NOlap {

/// Identifies a schema by the pair (preset id, schema version).
/// Used as a hash-map key, so it provides equality alongside the THash specialization below.
class TSchemaVersionId {
private:
    YDB_READONLY_DEF(ui64, PresetId);
    YDB_READONLY_DEF(ui64, Version);

public:
    /// @param presetId  id of the preset the schema belongs to
    /// @param version   schema version within that preset
    TSchemaVersionId(const ui64 presetId, const ui64 version)
        : PresetId(presetId)
        , Version(version) {
    }

    /// Two ids are equal only when both the preset id and the version match.
    bool operator==(const TSchemaVersionId& other) const {
        return PresetId == other.PresetId && Version == other.Version;
    }
};

}

/// Hash support for TSchemaVersionId: combines the preset id and the version into one value.
template <>
struct THash<NKikimr::NOlap::TSchemaVersionId> {
    size_t operator()(const NKikimr::NOlap::TSchemaVersionId& id) const {
        const auto presetId = id.GetPresetId();
        const auto version = id.GetVersion();
        return CombineHashes(presetId, version);
    }
};
1 change: 1 addition & 0 deletions ydb/core/tx/columnshard/engines/scheme/abstract/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ LIBRARY()
SRCS(
index_info.cpp
column_ids.cpp
schema_version.cpp
)

PEERDIR(
Expand Down
3 changes: 3 additions & 0 deletions ydb/core/tx/columnshard/engines/scheme/common/cache.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
#include "cache.h"

namespace NKikimr::NOlap {}
72 changes: 72 additions & 0 deletions ydb/core/tx/columnshard/engines/scheme/common/cache.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
#pragma once

#include <util/generic/hash.h>
#include <util/system/guard.h>
#include <util/system/mutex.h>

#include <memory>

namespace NKikimr::NOlap {

/// Cache that deduplicates immutable objects by key.
///
/// The map holds only std::weak_ptr references, so the cache never keeps an object alive
/// by itself: ownership lives in the TEntryGuard handles returned by Upsert().
/// Upsert() and TryFree() serialize on an internal mutex.
///
/// The enable_shared_from_this base must be PUBLIC. std::shared_ptr initializes the
/// internal weak reference only when the base is accessible; with private inheritance
/// weak_from_this() always returns an empty pointer, guards can never reach the cache,
/// and expired map entries are never erased.
///
/// NOTE(review): eviction from guards works only when the cache instance itself is owned
/// by a std::shared_ptr. For an instance held by value weak_from_this() is empty and
/// guards silently skip TryFree() — confirm how owners hold this object.
template <typename TKey, typename TObject>
class TObjectCache: public std::enable_shared_from_this<TObjectCache<TKey, TObject>> {
private:
    // Non-owning index of the objects currently handed out.
    THashMap<TKey, std::weak_ptr<const TObject>> Objects;
    mutable TMutex Mutex;

public:
    /// Owning handle to a cached object.
    /// On destruction it releases its reference and asks the cache (if it is still
    /// reachable) to drop the map entry when no other owner remains.
    class TEntryGuard {
    private:
        TKey Key;
        std::shared_ptr<const TObject> Object;
        std::weak_ptr<TObjectCache> Cache;

    public:
        /// @param key     key the object is registered under
        /// @param object  shared ownership of the cached object
        /// @param cache   cache that produced this guard; must not be null
        TEntryGuard(TKey key, std::shared_ptr<const TObject> object, TObjectCache* cache)
            : Key(std::move(key))
            , Object(std::move(object))
            , Cache(cache->weak_from_this()) {
        }

        const TObject* operator->() const {
            return Object.get();
        }
        const TObject& operator*() const {
            return *Object;
        }

        ~TEntryGuard() {
            // Release our reference first so that TryFree() can observe expiration.
            Object.reset();
            if (auto cache = Cache.lock()) {
                cache->TryFree(Key);
            }
        }
    };

public:
    /// Returns a guard for the object stored under `key`.
    /// If a live object is already cached for that key it is reused and `object` is left
    /// untouched; otherwise `object` is moved into a new shared instance and registered.
    TEntryGuard Upsert(TKey key, TObject&& object) {
        TGuard lock(Mutex);
        auto* findSchema = Objects.FindPtr(key);
        std::shared_ptr<const TObject> cachedObject;
        if (findSchema) {
            cachedObject = findSchema->lock();
        }
        if (!cachedObject) {
            // Either the key is unknown or the previous object has already expired.
            cachedObject = std::make_shared<const TObject>(std::move(object));
            Objects[key] = cachedObject;
        }
        return TEntryGuard(std::move(key), cachedObject, this);
    }

    /// Erases the entry for `key` if it exists and its object has expired.
    /// An entry that has been re-populated in the meantime is left alone.
    void TryFree(const TKey& key) {
        TGuard lock(Mutex);
        auto findObject = Objects.FindPtr(key);
        if (findObject) {
            if (findObject->expired()) {
                Objects.erase(key);
            }
        }
    }
};

} // namespace NKikimr::NOlap
13 changes: 13 additions & 0 deletions ydb/core/tx/columnshard/engines/scheme/common/ya.make
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
LIBRARY()

SRCS(
cache.cpp
)

PEERDIR(
ydb/library/actors/core
)

YQL_LAST_ABI_VERSION()

END()
4 changes: 2 additions & 2 deletions ydb/core/tx/columnshard/engines/scheme/index_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ std::vector<TString> TIndexInfo::GetColumnNames(const std::vector<ui32>& ids) co
}

std::vector<std::string> TIndexInfo::GetColumnSTLNames(const bool withSpecial) const {
const auto ids = GetColumnIds(withSpecial);
const TColumnIdsView ids = GetColumnIds(withSpecial);
std::vector<std::string> out;
out.reserve(ids.size());
for (ui32 id : ids) {
Expand Down Expand Up @@ -457,7 +457,7 @@ std::shared_ptr<NIndexes::NCountMinSketch::TIndexMeta> TIndexInfo::GetIndexMetaC
}

std::vector<ui32> TIndexInfo::GetEntityIds() const {
const auto columnIds = GetColumnIds(true);
const TColumnIdsView columnIds = GetColumnIds(true);
std::vector<ui32> result(columnIds.begin(), columnIds.end());
for (auto&& i : Indexes) {
result.emplace_back(i.first);
Expand Down
7 changes: 7 additions & 0 deletions ydb/core/tx/columnshard/engines/scheme/objects_cache.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
#include "objects_cache.h"

#include <ydb/core/tx/columnshard/engines/scheme/index_info.h>

namespace NKikimr::NOlap {

/// Registers `indexInfo` in the schema cache under the key (presetId, indexInfo.GetVersion())
/// and returns the guard produced by the cache for that key.
TSchemaObjectsCache::TSchemasCache::TEntryGuard TSchemaObjectsCache::UpsertIndexInfo(const ui64 presetId, TIndexInfo&& indexInfo) {
    // Read the version before indexInfo is handed over to the cache.
    const ui64 schemaVersion = indexInfo.GetVersion();
    return SchemasByVersion.Upsert(TSchemaVersionId(presetId, schemaVersion), std::move(indexInfo));
}

} // namespace NKikimr::NOlap
Loading
Loading