Skip to content

Commit

Permalink
v2 portions usage only available (#12530)
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored Dec 12, 2024
1 parent 95f1072 commit 251a022
Show file tree
Hide file tree
Showing 14 changed files with 66 additions and 41 deletions.
6 changes: 6 additions & 0 deletions ydb/core/tx/columnshard/columnshard_schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -1070,6 +1070,12 @@ class TColumnChunkLoadContextV2 {
MetadataProto = rowset.template GetValue<NColumnShard::Schema::IndexColumnsV2::Metadata>();
}

// Builds a V2 load context directly from an in-memory portion accessor proto
// instead of from a database rowset.
// The proto is serialized to a string and kept in MetadataProto.
TColumnChunkLoadContextV2(const ui64 pathId, const ui64 portionId, const NKikimrTxColumnShard::TIndexPortionAccessor& proto)
: PathId(pathId)
, PortionId(portionId)
, MetadataProto(proto.SerializeAsString()) {
}

std::vector<TColumnChunkLoadContextV1> BuildRecordsV1() const {
std::vector<TColumnChunkLoadContextV1> records;
NKikimrTxColumnShard::TIndexPortionAccessor metaProto;
Expand Down
15 changes: 8 additions & 7 deletions ydb/core/tx/columnshard/engines/db_wrapper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,12 @@ bool TDbWrapper::Load(TInsertTableAccessor& insertTable, const TInstant& loadTim
}

void TDbWrapper::WriteColumn(const NOlap::TPortionInfo& portion, const TColumnRecord& row, const ui32 firstPKColumnId) {
if (!AppDataVerified().ColumnShardConfig.GetColumnChunksV1Usage() && !AppDataVerified().ColumnShardConfig.GetColumnChunksV0Usage()) {
return;
}
NIceDb::TNiceDb db(Database);
using IndexColumnsV1 = NColumnShard::Schema::IndexColumnsV1;
auto rowProto = row.GetMeta().SerializeToProto();
AFL_VERIFY(AppDataVerified().ColumnShardConfig.GetColumnChunksV1Usage() || AppDataVerified().ColumnShardConfig.GetColumnChunksV0Usage());
if (AppDataVerified().ColumnShardConfig.GetColumnChunksV1Usage()) {
db.Table<IndexColumnsV1>()
.Key(portion.GetPathId(), portion.GetPortionId(), row.ColumnId, row.Chunk)
Expand Down Expand Up @@ -118,16 +120,16 @@ void TDbWrapper::EraseColumn(const NOlap::TPortionInfo& portion, const TColumnRe
}
}

bool TDbWrapper::LoadColumns(const std::optional<ui64> pathId, const std::function<void(TColumnChunkLoadContextV1&&)>& callback) {
bool TDbWrapper::LoadColumns(const std::optional<ui64> pathId, const std::function<void(TColumnChunkLoadContextV2&&)>& callback) {
NIceDb::TNiceDb db(Database);
using IndexColumnsV1 = NColumnShard::Schema::IndexColumnsV1;
using IndexColumnsV2 = NColumnShard::Schema::IndexColumnsV2;
const auto pred = [&](auto& rowset) {
if (!rowset.IsReady()) {
return false;
}

while (!rowset.EndOfSet()) {
NOlap::TColumnChunkLoadContextV1 chunkLoadContext(rowset);
NOlap::TColumnChunkLoadContextV2 chunkLoadContext(rowset);
callback(std::move(chunkLoadContext));

if (!rowset.Next()) {
Expand All @@ -137,10 +139,10 @@ bool TDbWrapper::LoadColumns(const std::optional<ui64> pathId, const std::functi
return true;
};
if (pathId) {
auto rowset = db.Table<IndexColumnsV1>().Prefix(*pathId).Select();
auto rowset = db.Table<IndexColumnsV2>().Prefix(*pathId).Select();
return pred(rowset);
} else {
auto rowset = db.Table<IndexColumnsV1>().Select();
auto rowset = db.Table<IndexColumnsV2>().Select();
return pred(rowset);
}
}
Expand Down Expand Up @@ -290,7 +292,6 @@ TConclusion<THashMap<ui64, std::map<NOlap::TSnapshot, TGranuleShardingInfo>>> TD
void TDbWrapper::WriteColumns(const NOlap::TPortionInfo& portion, const NKikimrTxColumnShard::TIndexPortionAccessor& proto) {
NIceDb::TNiceDb db(Database);
using IndexColumnsV2 = NColumnShard::Schema::IndexColumnsV2;
AFL_VERIFY(AppDataVerified().ColumnShardConfig.GetColumnChunksV1Usage());
db.Table<IndexColumnsV2>()
.Key(portion.GetPathId(), portion.GetPortionId())
.Update(NIceDb::TUpdate<IndexColumnsV2::Metadata>(proto.SerializeAsString()));
Expand Down
6 changes: 3 additions & 3 deletions ydb/core/tx/columnshard/engines/db_wrapper.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ class TDatabase;

namespace NKikimr::NOlap {

class TColumnChunkLoadContextV1;
class TColumnChunkLoadContextV2;
class TIndexChunkLoadContext;
class TInsertedData;
class TCommittedData;
Expand Down Expand Up @@ -49,7 +49,7 @@ class IDbWrapper {

virtual void WriteColumn(const TPortionInfo& portion, const TColumnRecord& row, const ui32 firstPKColumnId) = 0;
virtual void EraseColumn(const TPortionInfo& portion, const TColumnRecord& row) = 0;
virtual bool LoadColumns(const std::optional<ui64> pathId, const std::function<void(TColumnChunkLoadContextV1&&)>& callback) = 0;
virtual bool LoadColumns(const std::optional<ui64> pathId, const std::function<void(TColumnChunkLoadContextV2&&)>& callback) = 0;

virtual void WritePortion(const NOlap::TPortionInfo& portion) = 0;
virtual void ErasePortion(const NOlap::TPortionInfo& portion) = 0;
Expand Down Expand Up @@ -89,7 +89,7 @@ class TDbWrapper : public IDbWrapper {
void WriteColumn(const NOlap::TPortionInfo& portion, const TColumnRecord& row, const ui32 firstPKColumnId) override;
void WriteColumns(const NOlap::TPortionInfo& portion, const NKikimrTxColumnShard::TIndexPortionAccessor& proto) override;
void EraseColumn(const NOlap::TPortionInfo& portion, const TColumnRecord& row) override;
bool LoadColumns(const std::optional<ui64> pathId, const std::function<void(TColumnChunkLoadContextV1&&)>& callback) override;
bool LoadColumns(const std::optional<ui64> pathId, const std::function<void(TColumnChunkLoadContextV2&&)>& callback) override;

virtual void WriteIndex(const TPortionInfo& portion, const TIndexChunk& row) override;
virtual void EraseIndex(const TPortionInfo& portion, const TIndexChunk& row) override;
Expand Down
6 changes: 4 additions & 2 deletions ydb/core/tx/columnshard/engines/storage/granule/granule.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -243,9 +243,11 @@ bool TGranuleMeta::TestingLoad(IDbWrapper& db, const TVersionedIndex& versionedI

{
TPortionInfo::TSchemaCursor schema(versionedIndex);
if (!db.LoadColumns(PathId, [&](TColumnChunkLoadContextV1&& loadContext) {
if (!db.LoadColumns(PathId, [&](TColumnChunkLoadContextV2&& loadContext) {
auto* constructor = constructors.GetConstructorVerified(loadContext.GetPortionId());
constructor->LoadRecord(std::move(loadContext));
for (auto&& i : loadContext.BuildRecordsV1()) {
constructor->LoadRecord(std::move(i));
}
})) {
return false;
}
Expand Down
4 changes: 2 additions & 2 deletions ydb/core/tx/columnshard/engines/storage/granule/stages.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,14 @@ bool TGranuleColumnsReader::DoExecute(NTabletFlatExecutor::TTransactionContext&
TDbWrapper db(txc.DB, &*DsGroupSelector);
TPortionInfo::TSchemaCursor schema(*VersionedIndex);
Context->ClearRecords();
return db.LoadColumns(Self->GetPathId(), [&](TColumnChunkLoadContextV1&& loadContext) {
return db.LoadColumns(Self->GetPathId(), [&](TColumnChunkLoadContextV2&& loadContext) {
Context->Add(std::move(loadContext));
});
}

// Precharges the column-chunk rows of this granule's path so that the
// following DoExecute can read them.
// Returns true when the selected rowset reports IsReady(), false otherwise.
bool TGranuleColumnsReader::DoPrecharge(NTabletFlatExecutor::TTransactionContext& txc, const TActorContext& /*ctx*/) {
    NIceDb::TNiceDb db(txc.DB);
    // Column chunks are loaded from IndexColumnsV2, so that is the table to precharge.
    // A stale IndexColumnsV1 return used to precede this one and made it unreachable.
    return db.Table<NColumnShard::Schema::IndexColumnsV2>().Prefix(Self->GetPathId()).Select().IsReady();
}

bool TGranuleIndexesReader::DoExecute(NTabletFlatExecutor::TTransactionContext& txc, const TActorContext& /*ctx*/) {
Expand Down
5 changes: 5 additions & 0 deletions ydb/core/tx/columnshard/engines/storage/granule/stages.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,11 @@ class TPortionsLoadContext {
auto& constructor = MutableConstructor(chunk.GetPortionId());
constructor.MutableRecords().emplace_back(std::move(chunk));
}
// Expands a V2 (per-portion) load context into its V1 (per-chunk) records
// and registers each of them through the V1 overload of Add.
void Add(TColumnChunkLoadContextV2&& chunk) {
    auto recordsV1 = chunk.BuildRecordsV1();
    for (auto& record : recordsV1) {
        Add(std::move(record));
    }
}
};

class TGranuleOnlyPortionsReader: public ITxReader {
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/columnshard/engines/ut/ut_insert_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ class TTestInsertTableDB : public IDbWrapper {
}
void EraseColumn(const TPortionInfo&, const TColumnRecord&) override {
}
bool LoadColumns(const std::optional<ui64> /*reqPathId*/, const std::function<void(TColumnChunkLoadContextV1&&)>&) override {
bool LoadColumns(const std::optional<ui64> /*reqPathId*/, const std::function<void(TColumnChunkLoadContextV2&&)>&) override {
return true;
}

Expand Down
30 changes: 13 additions & 17 deletions ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,11 +35,16 @@ std::shared_ptr<NDataLocks::TManager> EmptyDataLocksManager = std::make_shared<N

class TTestDbWrapper: public IDbWrapper {
private:
std::map<TPortionAddress, std::map<TChunkAddress, TColumnChunkLoadContextV1>> LoadContexts;
std::map<TPortionAddress, TColumnChunkLoadContextV2> LoadContexts;

public:
virtual void WriteColumns(const NOlap::TPortionInfo& /*portion*/, const NKikimrTxColumnShard::TIndexPortionAccessor& /*proto*/) override {

// Test double for the V2 column write: remembers the portion's accessor proto
// as a TColumnChunkLoadContextV2 keyed by the portion address.
// A later write for the same portion replaces the previously stored context.
virtual void WriteColumns(const NOlap::TPortionInfo& portion, const NKikimrTxColumnShard::TIndexPortionAccessor& proto) override {
    // insert_or_assign does the insert-or-overwrite in a single map lookup,
    // replacing the earlier find + emplace/assign pair.
    LoadContexts.insert_or_assign(
        portion.GetAddress(), TColumnChunkLoadContextV2(portion.GetPathId(), portion.GetPortionId(), proto));
}

virtual const IBlobGroupSelector* GetDsGroupSelector() const override {
Expand Down Expand Up @@ -124,11 +129,6 @@ class TTestDbWrapper: public IDbWrapper {
}

auto& data = Indices[0].Columns[portion.GetPathId()];
NOlap::TColumnChunkLoadContextV1 loadContext(portion.GetPathId(), portion.GetPortionId(), row.GetAddress(), row.BlobRange, rowProto);
auto itInsertInfo = LoadContexts[portion.GetAddress()].emplace(row.GetAddress(), loadContext);
if (!itInsertInfo.second) {
itInsertInfo.first->second = loadContext;
}
auto it = data.find(portion.GetPortionId());
if (it == data.end()) {
it = data.emplace(portion.GetPortionId(), TPortionInfoConstructor(portion, true, true)).first;
Expand Down Expand Up @@ -173,7 +173,7 @@ class TTestDbWrapper: public IDbWrapper {
portionLocal.TestMutableRecords().swap(filtered);
}

bool LoadColumns(const std::optional<ui64> reqPathId, const std::function<void(TColumnChunkLoadContextV1&&)>& callback) override {
bool LoadColumns(const std::optional<ui64> reqPathId, const std::function<void(TColumnChunkLoadContextV2&&)>& callback) override {
auto& columns = Indices[0].Columns;
for (auto& [pathId, portions] : columns) {
if (pathId && *reqPathId != pathId) {
Expand All @@ -182,14 +182,10 @@ class TTestDbWrapper: public IDbWrapper {
for (auto& [portionId, portionLocal] : portions) {
auto copy = portionLocal.MakeCopy();
copy.TestMutableRecords().clear();
for (const auto& rec : portionLocal.GetRecords()) {
auto address = copy.GetPortionConstructor().GetAddress();
auto itContextLoader = LoadContexts[address].find(rec.GetAddress());
Y_ABORT_UNLESS(itContextLoader != LoadContexts[address].end());
auto copy = itContextLoader->second;
callback(std::move(copy));
LoadContexts[address].erase(itContextLoader);
}
auto it = LoadContexts.find(portionLocal.GetPortionConstructor().GetAddress());
AFL_VERIFY(it != LoadContexts.end());
callback(std::move(it->second));
LoadContexts.erase(it);
}
}
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,10 @@ TConclusion<std::vector<INormalizerTask::TPtr>> TNormalizer::DoInit(
using namespace NColumnShard;
NIceDb::TNiceDb db(txc.DB);

if (!AppDataVerified().ColumnShardConfig.GetColumnChunksV0Usage()) {
return std::vector<INormalizerTask::TPtr>();
}

bool ready = true;
ready = ready & Schema::Precharge<Schema::IndexColumns>(db, txc.DB.GetScheme());
ready = ready & Schema::Precharge<Schema::IndexPortions>(db, txc.DB.GetScheme());
Expand Down
8 changes: 5 additions & 3 deletions ydb/core/tx/columnshard/normalizer/portion/clean_empty.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -234,13 +234,16 @@ std::optional<std::vector<std::vector<std::shared_ptr<IDBModifier>>>> GetPortion
std::vector<TPortionAddress> pack;
std::map<TPortionAddress, std::vector<TIterator>> iteration;
const bool v0Usage = AppDataVerified().ColumnShardConfig.GetColumnChunksV0Usage();
const ui32 SourcesCount = v0Usage ? 4 : 3;
const bool v1Usage = AppDataVerified().ColumnShardConfig.GetColumnChunksV1Usage();
ui32 SourcesCount = 2;
if (v0Usage) {
++SourcesCount;
if (v0Portions.size()) {
iteration[v0Portions.begin()->first].emplace_back(v0Portions);
}
}
{
if (v1Usage) {
++SourcesCount;
if (v1Portions.size()) {
iteration[v1Portions.begin()->first].emplace_back(v1Portions);
}
Expand Down Expand Up @@ -312,7 +315,6 @@ class TChanges: public INormalizerChanges {
TConclusion<std::vector<INormalizerTask::TPtr>> TCleanEmptyPortionsNormalizer::DoInit(
const TNormalizationController&, NTabletFlatExecutor::TTransactionContext& txc) {
using namespace NColumnShard;
AFL_VERIFY(AppDataVerified().ColumnShardConfig.GetColumnChunksV1Usage());
auto batchesToDelete = GetPortionsToDelete(txc, DsGroupSelector);
if (!batchesToDelete) {
return TConclusionStatus::Fail("Not ready");
Expand Down
6 changes: 4 additions & 2 deletions ydb/core/tx/columnshard/normalizer/portion/leaked_blobs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -221,9 +221,11 @@ TConclusionStatus TLeakedBlobsNormalizer::LoadPortionBlobIds(
}
if (Records.empty()) {
THashMap<ui64, std::vector<TColumnChunkLoadContextV1>> recordsLocal;
if (!wrapper.LoadColumns(std::nullopt, [&](TColumnChunkLoadContextV1&& chunk) {
if (!wrapper.LoadColumns(std::nullopt, [&](TColumnChunkLoadContextV2&& chunk) {
const ui64 portionId = chunk.GetPortionId();
recordsLocal[portionId].emplace_back(std::move(chunk));
for (auto&& i : chunk.BuildRecordsV1()) {
recordsLocal[portionId].emplace_back(std::move(i));
}
})) {
return TConclusionStatus::Fail("repeated read db");
}
Expand Down
8 changes: 5 additions & 3 deletions ydb/core/tx/columnshard/normalizer/portion/normalizer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ TConclusionStatus TPortionsNormalizerBase::InitColumns(
const NColumnShard::TTablesManager& tablesManager, NIceDb::TNiceDb& db, THashMap<ui64, TPortionAccessorConstructor>& portions) {
using namespace NColumnShard;
auto columnsFilter = GetColumnsFilter(tablesManager.GetPrimaryIndexSafe().GetVersionedIndex().GetLastSchema());
auto rowset = db.Table<Schema::IndexColumnsV1>().Select();
auto rowset = db.Table<Schema::IndexColumnsV2>().Select();
if (!rowset.IsReady()) {
return TConclusionStatus::Fail("Not ready");
}
Expand All @@ -126,8 +126,10 @@ TConclusionStatus TPortionsNormalizerBase::InitColumns(
};

while (!rowset.EndOfSet()) {
NOlap::TColumnChunkLoadContextV1 chunkLoadContext(rowset);
initPortion(std::move(chunkLoadContext));
NOlap::TColumnChunkLoadContextV2 chunkLoadContext(rowset);
for (auto&& i : chunkLoadContext.BuildRecordsV1()) {
initPortion(std::move(i));
}

if (!rowset.Next()) {
return TConclusionStatus::Fail("Not ready");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,7 @@ TConclusion<std::vector<INormalizerTask::TPtr>> TNormalizer::DoInit(
}

AFL_VERIFY(AppDataVerified().ColumnShardConfig.GetColumnChunksV0Usage());
AFL_VERIFY(AppDataVerified().ColumnShardConfig.GetColumnChunksV1Usage());
{
std::vector<TPatchItemRemoveV1> package;
for (auto&& [portionId, chunkInfo] : columns1Remove) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,6 @@ TConclusion<std::vector<INormalizerTask::TPtr>> TNormalizer::DoInit(
if (!ready) {
return TConclusionStatus::Fail("Not ready");
}
AFL_VERIFY(AppDataVerified().ColumnShardConfig.GetColumnChunksV1Usage());
THashSet<TPortionAddress> readyPortions;
THashMap<TPortionAddress, TV2BuildTask> buildPortions;
{
Expand Down Expand Up @@ -168,6 +167,11 @@ TConclusion<std::vector<INormalizerTask::TPtr>> TNormalizer::DoInit(
}

std::vector<INormalizerTask::TPtr> tasks;
if (buildPortions.empty()) {
return tasks;
}
AFL_VERIFY(AppDataVerified().ColumnShardConfig.GetColumnChunksV1Usage());

{
std::vector<TV2BuildTask> package;
for (auto&& [portionAddress, portionInfos] : buildPortions) {
Expand Down

0 comments on commit 251a022

Please sign in to comment.