Skip to content

Commit

Permalink
Merge 65981ef into efaaa8a
Browse files Browse the repository at this point in the history
  • Loading branch information
swalrus1 authored Dec 19, 2024
2 parents efaaa8a + 65981ef commit b448be5
Show file tree
Hide file tree
Showing 30 changed files with 282 additions and 82 deletions.
3 changes: 2 additions & 1 deletion ydb/core/tx/columnshard/columnshard_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,8 @@ TColumnShard::TColumnShard(TTabletStorageInfo* info, const TActorId& tablet)
, PeriodicWakeupActivationPeriod(NYDBTest::TControllers::GetColumnShardController()->GetPeriodicWakeupActivationPeriod())
, StatsReportInterval(NYDBTest::TControllers::GetColumnShardController()->GetStatsReportInterval())
, InFlightReadsTracker(StoragesManager, Counters.GetRequestsTracingCounters())
, TablesManager(StoragesManager, std::make_shared<NOlap::NDataAccessorControl::TLocalManager>(nullptr), info->TabletID)
, TablesManager(StoragesManager, std::make_shared<NOlap::NDataAccessorControl::TLocalManager>(nullptr),
std::make_shared<NOlap::TSchemaObjectsCache>(), info->TabletID)
, Subscribers(std::make_shared<NSubscriber::TManager>(*this))
, PipeClientCache(NTabletPipe::CreateBoundedClientCache(new NTabletPipe::TBoundedClientCacheConfig(), GetPipeClientConfig()))
, InsertTable(std::make_unique<NOlap::TInsertTable>())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ bool TTxDataFromSource::DoExecute(NTabletFlatExecutor::TTransactionContext& txc,
auto& index = Self->TablesManager.MutablePrimaryIndexAsVerified<NOlap::TColumnEngineForLogs>();

for (auto& info : SchemeHistory) {
index.RegisterOldSchemaVersion(info.GetSnapshot(), info.GetSchema());
index.RegisterOldSchemaVersion(info.GetSnapshot(), info.GetProto().GetId(), info.GetSchema());
}

TDbWrapper dbWrapper(txc.DB, nullptr);
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/tx/columnshard/engines/changes/indexation.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ class TPathFieldsInfo {
if (!Schemas.contains(data.GetSchemaVersion())) {
Schemas.emplace(data.GetSchemaVersion(), blobSchema);
}
auto columnIds = blobSchema->GetIndexInfo().GetColumnIds(false);
TColumnIdsView columnIds = blobSchema->GetIndexInfo().GetColumnIds(false);
std::vector<ui32> filteredIds = data.GetMeta().GetSchemaSubset().Apply(columnIds.begin(), columnIds.end());
if (data.GetMeta().GetModificationType() == NEvWrite::EModificationType::Delete) {
filteredIds.emplace_back((ui32)IIndexInfo::ESpecialColumn::DELETE_FLAG);
Expand Down Expand Up @@ -247,7 +247,7 @@ TConclusionStatus TInsertColumnEngineChanges::DoConstructBlobs(TConstructionCont
{
const auto blobData = Blobs.Extract(IStoragesManager::DefaultStorageId, blobRange);

auto blobSchemaView = blobSchema->GetIndexInfo().ArrowSchema();
NArrow::TSchemaLiteView blobSchemaView = blobSchema->GetIndexInfo().ArrowSchema();
auto batchSchema =
std::make_shared<arrow::Schema>(inserted.GetMeta().GetSchemaSubset().Apply(blobSchemaView.begin(), blobSchemaView.end()));
batch = std::make_shared<NArrow::TGeneralContainer>(NArrow::DeserializeBatch(blobData, batchSchema));
Expand Down
6 changes: 3 additions & 3 deletions ydb/core/tx/columnshard/engines/column_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -354,9 +354,9 @@ class IColumnEngine {
const std::shared_ptr<NDataLocks::TManager>& dataLocksManager, const ui64 memoryUsageLimit) noexcept = 0;
virtual bool ApplyChangesOnTxCreate(std::shared_ptr<TColumnEngineChanges> changes, const TSnapshot& snapshot) noexcept = 0;
virtual bool ApplyChangesOnExecute(IDbWrapper& db, std::shared_ptr<TColumnEngineChanges> changes, const TSnapshot& snapshot) noexcept = 0;
virtual void RegisterSchemaVersion(const TSnapshot& snapshot, TIndexInfo&& info) = 0;
virtual void RegisterSchemaVersion(const TSnapshot& snapshot, const TSchemaInitializationData& schema) = 0;
virtual void RegisterOldSchemaVersion(const TSnapshot& snapshot, const TSchemaInitializationData& schema) = 0;
virtual void RegisterSchemaVersion(const TSnapshot& snapshot, const ui64 presetId, TIndexInfo&& info) = 0;
virtual void RegisterSchemaVersion(const TSnapshot& snapshot, const ui64 presetId, const TSchemaInitializationData& schema) = 0;
virtual void RegisterOldSchemaVersion(const TSnapshot& snapshot, const ui64 presetId, const TSchemaInitializationData& schema) = 0;

virtual const TMap<ui64, std::shared_ptr<TColumnEngineStats>>& GetStats() const = 0;
virtual const TColumnEngineStats& GetTotalStats() = 0;
Expand Down
26 changes: 14 additions & 12 deletions ydb/core/tx/columnshard/engines/column_engine_logs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,30 +26,32 @@

namespace NKikimr::NOlap {

TColumnEngineForLogs::TColumnEngineForLogs(const ui64 tabletId,
TColumnEngineForLogs::TColumnEngineForLogs(const ui64 tabletId, const std::shared_ptr<TSchemaObjectsCache>& schemaCache,
const std::shared_ptr<NDataAccessorControl::IDataAccessorsManager>& dataAccessorsManager,
const std::shared_ptr<IStoragesManager>& storagesManager, const TSnapshot& snapshot, const TSchemaInitializationData& schema)
const std::shared_ptr<IStoragesManager>& storagesManager, const TSnapshot& snapshot, const ui64 presetId, const TSchemaInitializationData& schema)
: GranulesStorage(std::make_shared<TGranulesStorage>(SignalCounters, dataAccessorsManager, storagesManager))
, DataAccessorsManager(dataAccessorsManager)
, StoragesManager(storagesManager)
, SchemaObjectsCache(schemaCache)
, TabletId(tabletId)
, LastPortion(0)
, LastGranule(0) {
ActualizationController = std::make_shared<NActualizer::TController>();
RegisterSchemaVersion(snapshot, schema);
RegisterSchemaVersion(snapshot, presetId, schema);
}

TColumnEngineForLogs::TColumnEngineForLogs(const ui64 tabletId,
TColumnEngineForLogs::TColumnEngineForLogs(const ui64 tabletId, const std::shared_ptr<TSchemaObjectsCache>& schemaCache,
const std::shared_ptr<NDataAccessorControl::IDataAccessorsManager>& dataAccessorsManager,
const std::shared_ptr<IStoragesManager>& storagesManager, const TSnapshot& snapshot, TIndexInfo&& schema)
const std::shared_ptr<IStoragesManager>& storagesManager, const TSnapshot& snapshot, const ui64 presetId, TIndexInfo&& schema)
: GranulesStorage(std::make_shared<TGranulesStorage>(SignalCounters, dataAccessorsManager, storagesManager))
, DataAccessorsManager(dataAccessorsManager)
, StoragesManager(storagesManager)
, SchemaObjectsCache(schemaCache)
, TabletId(tabletId)
, LastPortion(0)
, LastGranule(0) {
ActualizationController = std::make_shared<NActualizer::TController>();
RegisterSchemaVersion(snapshot, std::move(schema));
RegisterSchemaVersion(snapshot, presetId, std::move(schema));
}

const TMap<ui64, std::shared_ptr<TColumnEngineStats>>& TColumnEngineForLogs::GetStats() const {
Expand Down Expand Up @@ -138,7 +140,7 @@ void TColumnEngineForLogs::UpdatePortionStats(
}
}

void TColumnEngineForLogs::RegisterSchemaVersion(const TSnapshot& snapshot, TIndexInfo&& indexInfo) {
void TColumnEngineForLogs::RegisterSchemaVersion(const TSnapshot& snapshot, const ui64 presetId, TIndexInfo&& indexInfo) {
AFL_VERIFY(DataAccessorsManager);
bool switchOptimizer = false;
bool switchAccessorsManager = false;
Expand All @@ -150,7 +152,7 @@ void TColumnEngineForLogs::RegisterSchemaVersion(const TSnapshot& snapshot, TInd
}

const bool isCriticalScheme = indexInfo.GetSchemeNeedActualization();
auto* indexInfoActual = VersionedIndex.AddIndex(snapshot, std::move(indexInfo));
auto* indexInfoActual = VersionedIndex.AddIndex(snapshot, SchemaObjectsCache->UpsertIndexInfo(presetId, std::move(indexInfo)));
if (isCriticalScheme) {
StartActualization({});
for (auto&& i : GranulesStorage->GetTables()) {
Expand All @@ -170,7 +172,7 @@ void TColumnEngineForLogs::RegisterSchemaVersion(const TSnapshot& snapshot, TInd
}
}

void TColumnEngineForLogs::RegisterSchemaVersion(const TSnapshot& snapshot, const TSchemaInitializationData& schema) {
void TColumnEngineForLogs::RegisterSchemaVersion(const TSnapshot& snapshot, const ui64 presetId, const TSchemaInitializationData& schema) {
AFL_VERIFY(VersionedIndex.IsEmpty() || schema.GetVersion() >= VersionedIndex.GetLastSchema()->GetVersion())("empty", VersionedIndex.IsEmpty())("current", schema.GetVersion())(
"last", VersionedIndex.GetLastSchema()->GetVersion());

Expand All @@ -184,10 +186,10 @@ void TColumnEngineForLogs::RegisterSchemaVersion(const TSnapshot& snapshot, cons
indexInfoOptional = NOlap::TIndexInfo::BuildFromProto(schema.GetSchemaVerified(), StoragesManager, SchemaObjectsCache);
}
AFL_VERIFY(indexInfoOptional);
RegisterSchemaVersion(snapshot, std::move(*indexInfoOptional));
RegisterSchemaVersion(snapshot, presetId, std::move(*indexInfoOptional));
}

void TColumnEngineForLogs::RegisterOldSchemaVersion(const TSnapshot& snapshot, const TSchemaInitializationData& schema) {
void TColumnEngineForLogs::RegisterOldSchemaVersion(const TSnapshot& snapshot, const ui64 presetId, const TSchemaInitializationData& schema) {
AFL_VERIFY(!VersionedIndex.IsEmpty());

ui64 version = schema.GetVersion();
Expand Down Expand Up @@ -215,7 +217,7 @@ void TColumnEngineForLogs::RegisterOldSchemaVersion(const TSnapshot& snapshot, c
}

AFL_VERIFY(indexInfoOptional);
VersionedIndex.AddIndex(snapshot, std::move(*indexInfoOptional));
VersionedIndex.AddIndex(snapshot, SchemaObjectsCache->UpsertIndexInfo(presetId, std::move(*indexInfoOptional)));
}

std::shared_ptr<ITxReader> TColumnEngineForLogs::BuildLoader(const std::shared_ptr<IBlobGroupSelector>& dsGroupSelector) {
Expand Down
19 changes: 11 additions & 8 deletions ydb/core/tx/columnshard/engines/column_engine_logs.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ class TColumnEngineForLogs: public IColumnEngine {
std::shared_ptr<IStoragesManager> StoragesManager;

std::shared_ptr<NActualizer::TController> ActualizationController;
std::shared_ptr<TSchemaObjectsCache> SchemaObjectsCache = std::make_shared<TSchemaObjectsCache>();
std::shared_ptr<TSchemaObjectsCache> SchemaObjectsCache;
TVersionedIndex VersionedIndex;
std::shared_ptr<TVersionedIndex> VersionedIndexCopy;

Expand Down Expand Up @@ -98,10 +98,13 @@ class TColumnEngineForLogs: public IColumnEngine {
ADD,
};

TColumnEngineForLogs(const ui64 tabletId, const std::shared_ptr<NDataAccessorControl::IDataAccessorsManager>& dataAccessorsManager,
const std::shared_ptr<IStoragesManager>& storagesManager, const TSnapshot& snapshot, const TSchemaInitializationData& schema);
TColumnEngineForLogs(const ui64 tabletId, const std::shared_ptr<NDataAccessorControl::IDataAccessorsManager>& dataAccessorsManager,
const std::shared_ptr<IStoragesManager>& storagesManager, const TSnapshot& snapshot, TIndexInfo&& schema);
TColumnEngineForLogs(const ui64 tabletId, const std::shared_ptr<TSchemaObjectsCache>& schemaCache,
const std::shared_ptr<NDataAccessorControl::IDataAccessorsManager>& dataAccessorsManager,
const std::shared_ptr<IStoragesManager>& storagesManager, const TSnapshot& snapshot, const ui64 presetId,
const TSchemaInitializationData& schema);
TColumnEngineForLogs(const ui64 tabletId, const std::shared_ptr<TSchemaObjectsCache>& schemaCache,
const std::shared_ptr<NDataAccessorControl::IDataAccessorsManager>& dataAccessorsManager,
const std::shared_ptr<IStoragesManager>& storagesManager, const TSnapshot& snapshot, const ui64 presetId, TIndexInfo&& schema);

void OnTieringModified(const std::optional<NOlap::TTiering>& ttl, const ui64 pathId) override;
void OnTieringModified(const THashMap<ui64, NOlap::TTiering>& ttl) override;
Expand Down Expand Up @@ -157,9 +160,9 @@ class TColumnEngineForLogs: public IColumnEngine {
virtual bool ApplyChangesOnExecute(
IDbWrapper& db, std::shared_ptr<TColumnEngineChanges> indexChanges, const TSnapshot& snapshot) noexcept override;

void RegisterSchemaVersion(const TSnapshot& snapshot, TIndexInfo&& info) override;
void RegisterSchemaVersion(const TSnapshot& snapshot, const TSchemaInitializationData& schema) override;
void RegisterOldSchemaVersion(const TSnapshot& snapshot, const TSchemaInitializationData& schema) override;
void RegisterSchemaVersion(const TSnapshot& snapshot, const ui64 presetId, TIndexInfo&& info) override;
void RegisterSchemaVersion(const TSnapshot& snapshot, const ui64 presetId, const TSchemaInitializationData& schema) override;
void RegisterOldSchemaVersion(const TSnapshot& snapshot, const ui64 presetId, const TSchemaInitializationData& schema) override;

std::shared_ptr<TSelectInfo> Select(
ui64 pathId, TSnapshot snapshot, const TPKRangesFilter& pkRangesFilter, const bool withUncommitted) const override;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
#include "schema_version.h"

namespace NKikimr::NOlap {}
38 changes: 38 additions & 0 deletions ydb/core/tx/columnshard/engines/scheme/abstract/schema_version.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
#pragma once

#include <ydb/library/accessor/accessor.h>

#include <util/digest/numeric.h>

namespace NKikimr::NOlap {

class TSchemaVersionId {
private:
YDB_READONLY_DEF(ui64, PresetId);
YDB_READONLY_DEF(ui64, Version);

public:
struct THash {
ui64 operator()(const TSchemaVersionId& object) const {
return CombineHashes(object.PresetId, object.Version);
}
};

bool operator==(const TSchemaVersionId& other) const {
return std::tie(PresetId, Version) == std::tie(other.PresetId, other.Version);
}

TSchemaVersionId(const ui64 presetId, const ui64 version)
: PresetId(presetId)
, Version(version) {
}
};

}

template <>
struct THash<NKikimr::NOlap::TSchemaVersionId> {
inline size_t operator()(const NKikimr::NOlap::TSchemaVersionId& key) const {
return CombineHashes(key.GetPresetId(), key.GetVersion());
}
};
1 change: 1 addition & 0 deletions ydb/core/tx/columnshard/engines/scheme/abstract/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ LIBRARY()
SRCS(
index_info.cpp
column_ids.cpp
schema_version.cpp
)

PEERDIR(
Expand Down
3 changes: 3 additions & 0 deletions ydb/core/tx/columnshard/engines/scheme/common/cache.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
#include "cache.h"

namespace NKikimr::NOlap {}
72 changes: 72 additions & 0 deletions ydb/core/tx/columnshard/engines/scheme/common/cache.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
#pragma once

#include <util/generic/hash.h>
#include <util/system/guard.h>
#include <util/system/mutex.h>

#include <memory>

namespace NKikimr::NOlap {

template <typename TKey, typename TObject>
class TObjectCache : std::enable_shared_from_this<TObjectCache<TKey, TObject>> {
private:
THashMap<TKey, std::weak_ptr<const TObject>> Objects;
mutable TMutex Mutex;

public:
class TEntryGuard {
private:
TKey Key;
std::shared_ptr<const TObject> Object;
std::weak_ptr<TObjectCache> Cache;

public:
TEntryGuard(TKey key, const std::shared_ptr<const TObject> object, TObjectCache* cache)
: Key(key)
, Object(object)
, Cache(cache->weak_from_this()) {
}

const TObject* operator->() const {
return Object.get();
}
const TObject& operator*() const {
return *Object;
}

~TEntryGuard() {
Object.reset();
if (auto cache = Cache.lock()) {
cache->TryFree(Key);
}
}
};

public:
TEntryGuard Upsert(TKey key, TObject&& object) {
TGuard lock(Mutex);
auto* findSchema = Objects.FindPtr(key);
std::shared_ptr<const TObject> cachedObject;
if (findSchema) {
cachedObject = findSchema->lock();
}
if (!cachedObject) {
cachedObject = std::make_shared<const TObject>(std::move(object));
Objects[key] = cachedObject;
}
return TEntryGuard(std::move(key), cachedObject, this);
}

void TryFree(const TKey& key) {
TGuard lock(Mutex);
auto findObject = Objects.FindPtr(key);
if (findObject) {
if (findObject->expired()) {
Objects.erase(key);
}
}
}
};

} // namespace NKikimr::NOlap
13 changes: 13 additions & 0 deletions ydb/core/tx/columnshard/engines/scheme/common/ya.make
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
LIBRARY()

SRCS(
cache.cpp
)

PEERDIR(
ydb/library/actors/core
)

YQL_LAST_ABI_VERSION()

END()
4 changes: 2 additions & 2 deletions ydb/core/tx/columnshard/engines/scheme/index_info.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ std::vector<TString> TIndexInfo::GetColumnNames(const std::vector<ui32>& ids) co
}

std::vector<std::string> TIndexInfo::GetColumnSTLNames(const bool withSpecial) const {
const auto ids = GetColumnIds(withSpecial);
const TColumnIdsView ids = GetColumnIds(withSpecial);
std::vector<std::string> out;
out.reserve(ids.size());
for (ui32 id : ids) {
Expand Down Expand Up @@ -457,7 +457,7 @@ std::shared_ptr<NIndexes::NCountMinSketch::TIndexMeta> TIndexInfo::GetIndexMetaC
}

std::vector<ui32> TIndexInfo::GetEntityIds() const {
const auto columnIds = GetColumnIds(true);
const TColumnIdsView columnIds = GetColumnIds(true);
std::vector<ui32> result(columnIds.begin(), columnIds.end());
for (auto&& i : Indexes) {
result.emplace_back(i.first);
Expand Down
7 changes: 7 additions & 0 deletions ydb/core/tx/columnshard/engines/scheme/objects_cache.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,12 @@
#include "objects_cache.h"

#include <ydb/core/tx/columnshard/engines/scheme/index_info.h>

namespace NKikimr::NOlap {

TSchemaObjectsCache::TSchemasCache::TEntryGuard TSchemaObjectsCache::UpsertIndexInfo(const ui64 presetId, TIndexInfo&& indexInfo) {
const TSchemaVersionId versionId(presetId, indexInfo.GetVersion());
return SchemasByVersion.Upsert(versionId, std::move(indexInfo));
}

} // namespace NKikimr::NOlap
Loading

0 comments on commit b448be5

Please sign in to comment.