Skip to content

Commit

Permalink
Merge 66f4406 into 95f1072
Browse files Browse the repository at this point in the history
  • Loading branch information
swalrus1 authored Dec 12, 2024
2 parents 95f1072 + 66f4406 commit fc6e924
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 40 deletions.
1 change: 1 addition & 0 deletions ydb/core/tx/columnshard/engines/column_engine_logs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -171,6 +171,7 @@ void TColumnEngineForLogs::RegisterSchemaVersion(const TSnapshot& snapshot, TInd
}

void TColumnEngineForLogs::RegisterSchemaVersion(const TSnapshot& snapshot, const TSchemaInitializationData& schema) {
AFL_VERIFY(schema.GetVersion() >= VersionedIndex.GetLastSchema()->GetVersion())("current", schema.GetVersion())("last", VersionedIndex.GetLastSchema()->GetVersion());
std::optional<NOlap::TIndexInfo> indexInfoOptional;
if (schema.GetDiff()) {
AFL_VERIFY(!VersionedIndex.IsEmpty());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,14 +100,14 @@ class TVersionedIndex {
}

ISnapshotSchema::TPtr GetLastSchemaBeforeOrEqualSnapshotOptional(const ui64 version) const {
ISnapshotSchema::TPtr res = nullptr;
for (auto it = SnapshotByVersion.rbegin(); it != SnapshotByVersion.rend(); ++it) {
if (it->first <= version) {
res = it->second;
break;
}
if (SnapshotByVersion.empty()) {
return nullptr;
}
auto upperBound = SnapshotByVersion.upper_bound(version);
if (upperBound == SnapshotByVersion.begin()) {
return nullptr;
}
return res;
return std::prev(upperBound)->second;
}

ISnapshotSchema::TPtr GetLastSchema() const {
Expand Down
59 changes: 26 additions & 33 deletions ydb/core/tx/columnshard/tables_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,6 @@ bool TTablesManager::FillMonitoringReport(NTabletFlatExecutor::TTransactionConte
}

bool TTablesManager::InitFromDB(NIceDb::TNiceDb& db) {
THashMap<ui32, TSchemaPreset> schemaPresets;
{
TLoadTimeSignals::TLoadTimer timer = LoadTimeCounters->TableLoadTimeCounters.StartGuard();
TMemoryProfileGuard g("TTablesManager/InitFromDB::Tables");
Expand Down Expand Up @@ -73,7 +72,7 @@ bool TTablesManager::InitFromDB(NIceDb::TNiceDb& db) {
}
}

bool isFakePresetOnly = true;
std::optional<TSchemaPreset> preset;
{
TLoadTimeSignals::TLoadTimer timer = LoadTimeCounters->SchemaPresetLoadTimeCounters.StartGuard();
TMemoryProfileGuard g("TTablesManager/InitFromDB::SchemaPresets");
Expand All @@ -83,23 +82,24 @@ bool TTablesManager::InitFromDB(NIceDb::TNiceDb& db) {
return false;
}

while (!rowset.EndOfSet()) {
TSchemaPreset preset;
preset.InitFromDB(rowset);
if (!rowset.EndOfSet()) {
preset->InitFromDB(rowset);

if (preset.IsStandaloneTable()) {
Y_VERIFY_S(!preset.GetName(), "Preset name: " + preset.GetName());
if (preset->IsStandaloneTable()) {
Y_VERIFY_S(!preset->GetName(), "Preset name: " + preset->GetName());
AFL_VERIFY(!preset->Id);
} else {
Y_VERIFY_S(preset.GetName() == "default", "Preset name: " + preset.GetName());
isFakePresetOnly = false;
Y_VERIFY_S(preset->GetName() == "default", "Preset name: " + preset->GetName());
AFL_VERIFY(preset->Id);
}
AFL_VERIFY(schemaPresets.emplace(preset.GetId(), preset).second);
AFL_VERIFY(SchemaPresetsIds.emplace(preset.GetId()).second);
AFL_VERIFY(SchemaPresetsIds.emplace(preset->GetId()).second);
if (!rowset.Next()) {
timer.AddLoadingFail();
return false;
}
}

AFL_VERIFY(rowset.EndOfSet())("reson", "multiple_presets_not_supported");
}

{
Expand All @@ -122,7 +122,8 @@ bool TTablesManager::InitFromDB(NIceDb::TNiceDb& db) {
NKikimrTxColumnShard::TTableVersionInfo versionInfo;
Y_ABORT_UNLESS(versionInfo.ParseFromString(rowset.GetValue<Schema::TableVersionInfo::InfoProto>()));
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "load_table_version")("path_id", pathId)("snapshot", version);
Y_ABORT_UNLESS(schemaPresets.contains(versionInfo.GetSchemaPresetId()));
AFL_VERIFY(preset);
AFL_VERIFY(preset->Id == versionInfo.GetSchemaPresetId())("preset", preset->Id)("table", versionInfo.GetSchemaPresetId());

if (!table.IsDropped()) {
auto& ttlSettings = versionInfo.GetTtlSettings();
Expand Down Expand Up @@ -152,6 +153,7 @@ bool TTablesManager::InitFromDB(NIceDb::TNiceDb& db) {
{
TLoadTimeSignals::TLoadTimer timer = LoadTimeCounters->SchemaPresetVersionsLoadTimeCounters.StartGuard();
TMemoryProfileGuard g("TTablesManager/InitFromDB::PresetVersions");

auto rowset = db.Table<Schema::SchemaPresetVersionInfo>().Select();
if (!rowset.IsReady()) {
timer.AddLoadingFail();
Expand All @@ -160,46 +162,37 @@ bool TTablesManager::InitFromDB(NIceDb::TNiceDb& db) {

while (!rowset.EndOfSet()) {
const ui32 id = rowset.GetValue<Schema::SchemaPresetVersionInfo::Id>();
Y_ABORT_UNLESS(schemaPresets.contains(id));
auto& preset = schemaPresets[id];
AFL_VERIFY(preset);
AFL_VERIFY(preset->Id == id)("preset", preset->Id)("schema", id);
NOlap::TSnapshot version(
rowset.GetValue<Schema::SchemaPresetVersionInfo::SinceStep>(), rowset.GetValue<Schema::SchemaPresetVersionInfo::SinceTxId>());

TSchemaPreset::TSchemaPresetVersionInfo info;
Y_ABORT_UNLESS(info.ParseFromString(rowset.GetValue<Schema::SchemaPresetVersionInfo::InfoProto>()));
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "load_preset")("preset_id", id)("snapshot", version)(
"version", info.HasSchema() ? info.GetSchema().GetVersion() : -1);
preset.AddVersion(version, info);
if (!rowset.Next()) {
timer.AddLoadingFail();
return false;
}
}
}

TMemoryProfileGuard g("TTablesManager/InitFromDB::Other");
for (auto& [id, preset] : schemaPresets) {
if (isFakePresetOnly) {
Y_ABORT_UNLESS(id == 0);
} else {
Y_ABORT_UNLESS(id > 0);
}
for (auto it = preset.MutableVersionsById().begin(); it != preset.MutableVersionsById().end();) {
const auto version = it->first;
const auto& schemaInfo = it->second;
AFL_VERIFY(schemaInfo.HasSchema());
AFL_VERIFY(info.HasSchema());
AFL_INFO(NKikimrServices::TX_COLUMNSHARD)("event", "index_schema")("preset_id", id)("snapshot", version)(
"version", schemaInfo.GetSchema().GetVersion());
NOlap::IColumnEngine::TSchemaInitializationData schemaInitializationData(schemaInfo);
"version", info.GetSchema().GetVersion());
NOlap::IColumnEngine::TSchemaInitializationData schemaInitializationData(info);
if (!PrimaryIndex) {
PrimaryIndex = std::make_unique<NOlap::TColumnEngineForLogs>(TabletId, DataAccessorsManager, StoragesManager,
preset.GetMinVersionForId(schemaInfo.GetSchema().GetVersion()), schemaInitializationData);
version, schemaInitializationData);
} else if (PrimaryIndex->GetVersionedIndex().IsEmpty() ||
info.GetSchema().GetVersion() > PrimaryIndex->GetVersionedIndex().GetLastSchema()->GetVersion()) {
PrimaryIndex->RegisterSchemaVersion(version, schemaInitializationData);
} else {
PrimaryIndex->RegisterSchemaVersion(preset.GetMinVersionForId(schemaInfo.GetSchema().GetVersion()), schemaInitializationData);
PrimaryIndex->RegisterOldSchemaVersion(version, schemaInitializationData);
}
it = preset.MutableVersionsById().erase(it);
}
}

TMemoryProfileGuard g("TTablesManager/InitFromDB::Other");
for (auto&& i : Tables) {
PrimaryIndex->RegisterTable(i.first);
}
Expand Down
5 changes: 5 additions & 0 deletions ydb/core/tx/schemeshard/olap/store/store.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,11 @@ bool TOlapStoreInfo::ParseFromRequest(const NKikimrSchemeOp::TColumnStoreDescrip
return false;
}

if (descriptionProto.SchemaPresetsSize() > 1) {
errors.AddError("trying to create an OLAP store with multiple schema presets (not supported yet)");
return false;
}

Name = descriptionProto.GetName();
StorageConfig = descriptionProto.GetStorageConfig();
// Make it easier by having data channel count always specified internally
Expand Down

0 comments on commit fc6e924

Please sign in to comment.