Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

optimize memory allocation for schema versions on init #12533

Merged
merged 7 commits into from
Dec 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions ydb/core/tx/columnshard/engines/column_engine_logs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,9 @@ void TColumnEngineForLogs::RegisterSchemaVersion(const TSnapshot& snapshot, TInd
}

void TColumnEngineForLogs::RegisterSchemaVersion(const TSnapshot& snapshot, const TSchemaInitializationData& schema) {
AFL_VERIFY(VersionedIndex.IsEmpty() || schema.GetVersion() >= VersionedIndex.GetLastSchema()->GetVersion())("empty", VersionedIndex.IsEmpty())("current", schema.GetVersion())(
"last", VersionedIndex.GetLastSchema()->GetVersion());

std::optional<NOlap::TIndexInfo> indexInfoOptional;
if (schema.GetDiff()) {
AFL_VERIFY(!VersionedIndex.IsEmpty());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,14 +100,14 @@ class TVersionedIndex {
}

ISnapshotSchema::TPtr GetLastSchemaBeforeOrEqualSnapshotOptional(const ui64 version) const {
ISnapshotSchema::TPtr res = nullptr;
for (auto it = SnapshotByVersion.rbegin(); it != SnapshotByVersion.rend(); ++it) {
if (it->first <= version) {
res = it->second;
break;
}
if (SnapshotByVersion.empty()) {
return nullptr;
}
auto upperBound = SnapshotByVersion.upper_bound(version);
if (upperBound == SnapshotByVersion.begin()) {
return nullptr;
}
return res;
return std::prev(upperBound)->second;
}

ISnapshotSchema::TPtr GetLastSchema() const {
Expand Down
69 changes: 32 additions & 37 deletions ydb/core/tx/columnshard/tables_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ bool TTablesManager::FillMonitoringReport(NTabletFlatExecutor::TTransactionConte
}

bool TTablesManager::InitFromDB(NIceDb::TNiceDb& db) {
THashMap<ui32, TSchemaPreset> schemaPresets;
{
TLoadTimeSignals::TLoadTimer timer = LoadTimeCounters->TableLoadTimeCounters.StartGuard();
TMemoryProfileGuard g("TTablesManager/InitFromDB::Tables");
Expand Down Expand Up @@ -73,7 +72,7 @@ bool TTablesManager::InitFromDB(NIceDb::TNiceDb& db) {
}
}

bool isFakePresetOnly = true;
std::optional<TSchemaPreset> preset;
{
TLoadTimeSignals::TLoadTimer timer = LoadTimeCounters->SchemaPresetLoadTimeCounters.StartGuard();
TMemoryProfileGuard g("TTablesManager/InitFromDB::SchemaPresets");
Expand All @@ -83,23 +82,25 @@ bool TTablesManager::InitFromDB(NIceDb::TNiceDb& db) {
return false;
}

while (!rowset.EndOfSet()) {
TSchemaPreset preset;
preset.InitFromDB(rowset);
if (!rowset.EndOfSet()) {
preset = TSchemaPreset();
preset->InitFromDB(rowset);

if (preset.IsStandaloneTable()) {
Y_VERIFY_S(!preset.GetName(), "Preset name: " + preset.GetName());
if (preset->IsStandaloneTable()) {
Y_VERIFY_S(!preset->GetName(), "Preset name: " + preset->GetName());
AFL_VERIFY(!preset->Id);
} else {
Y_VERIFY_S(preset.GetName() == "default", "Preset name: " + preset.GetName());
isFakePresetOnly = false;
Y_VERIFY_S(preset->GetName() == "default", "Preset name: " + preset->GetName());
AFL_VERIFY(preset->Id);
}
AFL_VERIFY(schemaPresets.emplace(preset.GetId(), preset).second);
AFL_VERIFY(SchemaPresetsIds.emplace(preset.GetId()).second);
AFL_VERIFY(SchemaPresetsIds.emplace(preset->GetId()).second);
if (!rowset.Next()) {
timer.AddLoadingFail();
return false;
}
}

AFL_VERIFY(rowset.EndOfSet())("reson", "multiple_presets_not_supported");
}

{
Expand All @@ -122,7 +123,8 @@ bool TTablesManager::InitFromDB(NIceDb::TNiceDb& db) {
NKikimrTxColumnShard::TTableVersionInfo versionInfo;
Y_ABORT_UNLESS(versionInfo.ParseFromString(rowset.GetValue<Schema::TableVersionInfo::InfoProto>()));
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "load_table_version")("path_id", pathId)("snapshot", version);
Y_ABORT_UNLESS(schemaPresets.contains(versionInfo.GetSchemaPresetId()));
AFL_VERIFY(preset);
AFL_VERIFY(preset->Id == versionInfo.GetSchemaPresetId())("preset", preset->Id)("table", versionInfo.GetSchemaPresetId());

if (!table.IsDropped()) {
auto& ttlSettings = versionInfo.GetTtlSettings();
Expand Down Expand Up @@ -152,6 +154,7 @@ bool TTablesManager::InitFromDB(NIceDb::TNiceDb& db) {
{
TLoadTimeSignals::TLoadTimer timer = LoadTimeCounters->SchemaPresetVersionsLoadTimeCounters.StartGuard();
TMemoryProfileGuard g("TTablesManager/InitFromDB::PresetVersions");

auto rowset = db.Table<Schema::SchemaPresetVersionInfo>().Select();
if (!rowset.IsReady()) {
timer.AddLoadingFail();
Expand All @@ -160,46 +163,38 @@ bool TTablesManager::InitFromDB(NIceDb::TNiceDb& db) {

while (!rowset.EndOfSet()) {
const ui32 id = rowset.GetValue<Schema::SchemaPresetVersionInfo::Id>();
Y_ABORT_UNLESS(schemaPresets.contains(id));
auto& preset = schemaPresets[id];
AFL_VERIFY(preset);
AFL_VERIFY(preset->Id == id)("preset", preset->Id)("schema", id);
NOlap::TSnapshot version(
rowset.GetValue<Schema::SchemaPresetVersionInfo::SinceStep>(), rowset.GetValue<Schema::SchemaPresetVersionInfo::SinceTxId>());

TSchemaPreset::TSchemaPresetVersionInfo info;
Y_ABORT_UNLESS(info.ParseFromString(rowset.GetValue<Schema::SchemaPresetVersionInfo::InfoProto>()));
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "load_preset")("preset_id", id)("snapshot", version)(
"version", info.HasSchema() ? info.GetSchema().GetVersion() : -1);
preset.AddVersion(version, info);
if (!rowset.Next()) {
timer.AddLoadingFail();
return false;
}
}
}

TMemoryProfileGuard g("TTablesManager/InitFromDB::Other");
for (auto& [id, preset] : schemaPresets) {
if (isFakePresetOnly) {
Y_ABORT_UNLESS(id == 0);
} else {
Y_ABORT_UNLESS(id > 0);
}
for (auto it = preset.MutableVersionsById().begin(); it != preset.MutableVersionsById().end();) {
const auto version = it->first;
const auto& schemaInfo = it->second;
AFL_VERIFY(schemaInfo.HasSchema());
AFL_VERIFY(info.HasSchema());
AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "index_schema")("preset_id", id)("snapshot", version)(
"version", schemaInfo.GetSchema().GetVersion());
NOlap::IColumnEngine::TSchemaInitializationData schemaInitializationData(schemaInfo);
"version", info.GetSchema().GetVersion());
NOlap::IColumnEngine::TSchemaInitializationData schemaInitializationData(info);
if (!PrimaryIndex) {
PrimaryIndex = std::make_unique<NOlap::TColumnEngineForLogs>(TabletId, DataAccessorsManager, StoragesManager,
preset.GetMinVersionForId(schemaInfo.GetSchema().GetVersion()), schemaInitializationData);
version, schemaInitializationData);
} else if (PrimaryIndex->GetVersionedIndex().IsEmpty() ||
info.GetSchema().GetVersion() > PrimaryIndex->GetVersionedIndex().GetLastSchema()->GetVersion()) {
PrimaryIndex->RegisterSchemaVersion(version, schemaInitializationData);
} else {
PrimaryIndex->RegisterSchemaVersion(preset.GetMinVersionForId(schemaInfo.GetSchema().GetVersion()), schemaInitializationData);
PrimaryIndex->RegisterOldSchemaVersion(version, schemaInitializationData);
}

if (!rowset.Next()) {
timer.AddLoadingFail();
return false;
}
it = preset.MutableVersionsById().erase(it);
}
}

TMemoryProfileGuard g("TTablesManager/InitFromDB::Other");
for (auto&& i : Tables) {
PrimaryIndex->RegisterTable(i.first);
}
Expand Down
3 changes: 0 additions & 3 deletions ydb/core/tx/columnshard/ut_rw/ut_normalizer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,9 +124,6 @@ class TSchemaVersionsCleaner : public NYDBTest::ILocalDBModifier {
db.Table<Schema::TableVersionInfo>().Key(1, 5, 1).Update(
NIceDb::TUpdate<Schema::TableVersionInfo::InfoProto>(versionInfo.SerializeAsString()));
}

db.Table<Schema::SchemaPresetInfo>().Key(10).Update(NIceDb::TUpdate<Schema::SchemaPresetInfo::Name>("default"));

}
};

Expand Down
5 changes: 5 additions & 0 deletions ydb/core/tx/schemeshard/olap/store/store.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,11 @@ bool TOlapStoreInfo::ParseFromRequest(const NKikimrSchemeOp::TColumnStoreDescrip
return false;
}

if (descriptionProto.SchemaPresetsSize() > 1) {
errors.AddError("trying to create an OLAP store with multiple schema presets (not supported yet)");
return false;
}

Name = descriptionProto.GetName();
StorageConfig = descriptionProto.GetStorageConfig();
// Make it easier by having data channel count always specified internally
Expand Down
Loading