Skip to content

Commit

Permalink
fix v1 chunks processing (ydb-platform#11180)
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored and zverevgeny committed Jan 8, 2025
1 parent 4883088 commit d37fdbb
Showing 1 changed file with 25 additions and 16 deletions.
41 changes: 25 additions & 16 deletions ydb/core/tx/columnshard/engines/db_wrapper.cpp
Original file line number Diff line number Diff line change
@@ -1,8 +1,10 @@
#include "defs.h"
#include "db_wrapper.h"
#include "defs.h"

#include "portions/constructor.h"
#include <ydb/core/protos/flat_scheme_op.pb.h>

#include <ydb/core/protos/config.pb.h>
#include <ydb/core/protos/flat_scheme_op.pb.h>
#include <ydb/core/tx/columnshard/columnshard_schema.h>
#include <ydb/core/tx/sharding/sharding.h>

Expand Down Expand Up @@ -38,8 +40,7 @@ void TDbWrapper::EraseAborted(const TInsertedData& data) {
NColumnShard::Schema::InsertTable_EraseAborted(db, data);
}

bool TDbWrapper::Load(TInsertTableAccessor& insertTable,
const TInstant& loadTime) {
bool TDbWrapper::Load(TInsertTableAccessor& insertTable, const TInstant& loadTime) {
NIceDb::TNiceDb db(Database);
return NColumnShard::Schema::InsertTable_Load(db, DsGroupSelector, insertTable, loadTime);
}
Expand All @@ -54,8 +55,7 @@ void TDbWrapper::WriteColumn(const NOlap::TPortionInfo& portion, const TColumnRe
.Key(portion.GetPathId(), portion.GetPortionId(), row.ColumnId, row.Chunk)
.Update(NIceDb::TUpdate<IndexColumnsV1::BlobIdx>(row.GetBlobRange().GetBlobIdxVerified()),
NIceDb::TUpdate<IndexColumnsV1::Metadata>(rowProto.SerializeAsString()),
NIceDb::TUpdate<IndexColumnsV1::Offset>(row.BlobRange.Offset),
NIceDb::TUpdate<IndexColumnsV1::Size>(row.BlobRange.Size));
NIceDb::TUpdate<IndexColumnsV1::Offset>(row.BlobRange.Offset), NIceDb::TUpdate<IndexColumnsV1::Size>(row.BlobRange.Size));
}
if (AppDataVerified().ColumnShardConfig.GetColumnChunksV0Usage()) {
if (row.GetChunkIdx() == 0 && row.GetColumnId() == firstPKColumnId) {
Expand Down Expand Up @@ -105,9 +105,17 @@ void TDbWrapper::ErasePortion(const NOlap::TPortionInfo& portion) {

void TDbWrapper::EraseColumn(const NOlap::TPortionInfo& portion, const TColumnRecord& row) {
NIceDb::TNiceDb db(Database);
using IndexColumns = NColumnShard::Schema::IndexColumns;
db.Table<IndexColumns>().Key(0, 0, row.ColumnId,
portion.GetMinSnapshotDeprecated().GetPlanStep(), portion.GetMinSnapshotDeprecated().GetTxId(), portion.GetPortionId(), row.Chunk).Delete();
if (AppDataVerified().ColumnShardConfig.GetColumnChunksV1Usage()) {
using IndexColumnsV1 = NColumnShard::Schema::IndexColumnsV1;
db.Table<IndexColumnsV1>().Key(portion.GetPathId(), portion.GetPortionId(), row.ColumnId, row.Chunk).Delete();
}
if (AppDataVerified().ColumnShardConfig.GetColumnChunksV0Usage()) {
using IndexColumns = NColumnShard::Schema::IndexColumns;
db.Table<IndexColumns>()
.Key(0, 0, row.ColumnId, portion.GetMinSnapshotDeprecated().GetPlanStep(), portion.GetMinSnapshotDeprecated().GetTxId(),
portion.GetPortionId(), row.Chunk)
.Delete();
}
}

bool TDbWrapper::LoadColumns(const std::function<void(const TColumnChunkLoadContextV1&)>& callback) {
Expand All @@ -129,7 +137,8 @@ bool TDbWrapper::LoadColumns(const std::function<void(const TColumnChunkLoadCont
return true;
}

bool TDbWrapper::LoadPortions(const std::function<void(NOlap::TPortionInfoConstructor&&, const NKikimrTxColumnShard::TIndexPortionMeta&)>& callback) {
bool TDbWrapper::LoadPortions(
const std::function<void(NOlap::TPortionInfoConstructor&&, const NKikimrTxColumnShard::TIndexPortionMeta&)>& callback) {
NIceDb::TNiceDb db(Database);
using IndexPortions = NColumnShard::Schema::IndexPortions;
auto rowset = db.Table<IndexPortions>().Select();
Expand All @@ -154,8 +163,7 @@ bool TDbWrapper::LoadPortions(const std::function<void(NOlap::TPortionInfoConstr
}
if (rowset.GetValueOrDefault<IndexPortions::CommitPlanStep>(0)) {
AFL_VERIFY(rowset.GetValueOrDefault<IndexPortions::CommitTxId>(0));
portion.SetCommitSnapshot(
TSnapshot(rowset.GetValue<IndexPortions::CommitPlanStep>(), rowset.GetValue<IndexPortions::CommitTxId>()));
portion.SetCommitSnapshot(TSnapshot(rowset.GetValue<IndexPortions::CommitPlanStep>(), rowset.GetValue<IndexPortions::CommitTxId>()));
} else {
AFL_VERIFY(!rowset.GetValueOrDefault<IndexPortions::CommitTxId>(0));
}
Expand Down Expand Up @@ -185,8 +193,8 @@ void TDbWrapper::WriteIndex(const TPortionInfo& portion, const TIndexChunk& row)
} else if (auto bData = row.GetBlobDataOptional()) {
db.Table<IndexIndexes>()
.Key(portion.GetPathId(), portion.GetPortionId(), row.GetIndexId(), row.GetChunkIdx())
.Update(NIceDb::TUpdate<IndexIndexes::BlobData>(*bData),
NIceDb::TUpdate<IndexIndexes::RecordsCount>(row.GetRecordsCount()), NIceDb::TUpdate<IndexIndexes::RawBytes>(row.GetRawBytes()));
.Update(NIceDb::TUpdate<IndexIndexes::BlobData>(*bData), NIceDb::TUpdate<IndexIndexes::RecordsCount>(row.GetRecordsCount()),
NIceDb::TUpdate<IndexIndexes::RawBytes>(row.GetRawBytes()));
} else {
AFL_VERIFY(false);
}
Expand Down Expand Up @@ -240,7 +248,8 @@ TConclusion<THashMap<ui64, std::map<NOlap::TSnapshot, TGranuleShardingInfo>>> TD
snapshot.DeserializeFromString(rowset.GetValue<Schema::ShardingInfo::Snapshot>()).Validate();
NSharding::TGranuleShardingLogicContainer logic;
logic.DeserializeFromString(rowset.GetValue<Schema::ShardingInfo::Logic>()).Validate();
TGranuleShardingInfo gShardingInfo(logic, snapshot, rowset.GetValue<Schema::ShardingInfo::VersionId>(), rowset.GetValue<Schema::ShardingInfo::PathId>());
TGranuleShardingInfo gShardingInfo(
logic, snapshot, rowset.GetValue<Schema::ShardingInfo::VersionId>(), rowset.GetValue<Schema::ShardingInfo::PathId>());
AFL_VERIFY(result[gShardingInfo.GetPathId()].emplace(gShardingInfo.GetSinceSnapshot(), gShardingInfo).second);

if (!rowset.Next()) {
Expand All @@ -250,4 +259,4 @@ TConclusion<THashMap<ui64, std::map<NOlap::TSnapshot, TGranuleShardingInfo>>> TD
return result;
}

}
} // namespace NKikimr::NOlap

0 comments on commit d37fdbb

Please sign in to comment.