Skip to content

Commit

Permalink
actualize local db for columnshards (ydb-platform#11115)
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored Oct 31, 2024
1 parent b9d09c3 commit 21fa904
Show file tree
Hide file tree
Showing 33 changed files with 703 additions and 282 deletions.
1 change: 0 additions & 1 deletion .github/config/muted_ya.txt
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ ydb/core/kqp/ut/sysview KqpSystemView.PartitionStatsFollower
ydb/core/mind/hive/ut THiveTest.DrainWithHiveRestart
ydb/core/persqueue/ut [*/*] chunk chunk
ydb/core/quoter/ut QuoterWithKesusTest.PrefetchCoefficient
ydb/core/tx/columnshard/ut_rw Normalizers.CleanEmptyPortionsNormalizer
ydb/core/tx/schemeshard/ut_move_reboots TSchemeShardMoveRebootsTest.WithData
ydb/core/tx/schemeshard/ut_move_reboots TSchemeShardMoveRebootsTest.WithDataAndPersistentPartitionStats
ydb/core/tx/schemeshard/ut_pq_reboots TPqGroupTestReboots.AlterWithReboots-PQConfigTransactionsAtSchemeShard-false
Expand Down
67 changes: 53 additions & 14 deletions ydb/core/tx/columnshard/columnshard_schema.h
Original file line number Diff line number Diff line change
Expand Up @@ -901,57 +901,94 @@ struct Schema : NIceDb::Schema {
}

namespace NKikimr::NOlap {
class TPortionLoadContext {
private:
YDB_READONLY(ui64, PathId, 0);
YDB_READONLY(ui64, PortionId, 0);
YDB_READONLY_DEF(NKikimrTxColumnShard::TIndexPortionMeta, MetaProto);

public:
template <class TSource>
TPortionLoadContext(const TSource& rowset) {
PathId = rowset.template GetValue<NColumnShard::Schema::IndexPortions::PathId>();
PortionId = rowset.template GetValue<NColumnShard::Schema::IndexPortions::PortionId>();
const TString metadata = rowset.template GetValue<NColumnShard::Schema::IndexPortions::Metadata>();
AFL_VERIFY(MetaProto.ParseFromArray(metadata.data(), metadata.size()))("event", "cannot parse metadata as protobuf");
}
};

class TColumnChunkLoadContext {
private:
YDB_READONLY_DEF(TBlobRange, BlobRange);
TChunkAddress Address;
YDB_READONLY(ui64, PathId, 0);
YDB_READONLY(ui64, PortionId, 0);
YDB_READONLY_DEF(NKikimrTxColumnShard::TIndexColumnMeta, MetaProto);
YDB_READONLY(TSnapshot, RemoveSnapshot, TSnapshot::Zero());
YDB_READONLY(TSnapshot, MinSnapshotDeprecated, TSnapshot::Zero());

public:
const TChunkAddress& GetAddress() const {
return Address;
}

TColumnChunkLoadContext(const TChunkAddress& address, const TBlobRange& bRange, const NKikimrTxColumnShard::TIndexColumnMeta& metaProto)
TColumnChunkLoadContext(const ui64 pathId, const ui64 portionId, const TChunkAddress& address, const TBlobRange& bRange,
const NKikimrTxColumnShard::TIndexColumnMeta& metaProto)
: BlobRange(bRange)
, Address(address)
, MetaProto(metaProto)
{

, PathId(pathId)
, PortionId(portionId)
, MetaProto(metaProto) {
}

template <class TSource>
TColumnChunkLoadContext(const TSource& rowset, const IBlobGroupSelector* dsGroupSelector)
: Address(rowset.template GetValue<NColumnShard::Schema::IndexColumns::ColumnIdx>(), rowset.template GetValue<NColumnShard::Schema::IndexColumns::Chunk>()) {
: Address(rowset.template GetValue<NColumnShard::Schema::IndexColumns::ColumnIdx>(),
rowset.template GetValue<NColumnShard::Schema::IndexColumns::Chunk>())
, RemoveSnapshot(rowset.template GetValue<NColumnShard::Schema::IndexColumns::XPlanStep>(),
rowset.template GetValue<NColumnShard::Schema::IndexColumns::XTxId>())
, MinSnapshotDeprecated(rowset.template GetValue<NColumnShard::Schema::IndexColumns::PlanStep>(),
rowset.template GetValue<NColumnShard::Schema::IndexColumns::TxId>())
{
AFL_VERIFY(Address.GetColumnId())("event", "incorrect address")("address", Address.DebugString());
TString strBlobId = rowset.template GetValue<NColumnShard::Schema::IndexColumns::Blob>();
Y_ABORT_UNLESS(strBlobId.size() == sizeof(TLogoBlobID), "Size %" PRISZT " doesn't match TLogoBlobID", strBlobId.size());
TLogoBlobID logoBlobId((const ui64*)strBlobId.data());
BlobRange.BlobId = NOlap::TUnifiedBlobId(dsGroupSelector->GetGroup(logoBlobId), logoBlobId);
BlobRange.Offset = rowset.template GetValue<NColumnShard::Schema::IndexColumns::Offset>();
BlobRange.Size = rowset.template GetValue<NColumnShard::Schema::IndexColumns::Size>();
PathId = rowset.template GetValue<NColumnShard::Schema::IndexColumns::PathId>();
PortionId = rowset.template GetValue<NColumnShard::Schema::IndexColumns::Portion>();
AFL_VERIFY(BlobRange.BlobId.IsValid() && BlobRange.Size)("event", "incorrect blob")("blob", BlobRange.ToString());

const TString metadata = rowset.template GetValue<NColumnShard::Schema::IndexColumns::Metadata>();
AFL_VERIFY(MetaProto.ParseFromArray(metadata.data(), metadata.size()))("event", "cannot parse metadata as protobuf");
}

const NKikimrTxColumnShard::TIndexPortionMeta* GetPortionMeta() const {
if (MetaProto.HasPortionMeta()) {
return &MetaProto.GetPortionMeta();
} else {
return nullptr;
}
}
};

class TIndexChunkLoadContext {
private:
YDB_READONLY_DEF(std::optional<TBlobRange>, BlobRange);
YDB_READONLY_DEF(std::optional<TString>, BlobData);
YDB_READONLY(ui64, PathId, 0);
YDB_READONLY(ui64, PortionId, 0);
TChunkAddress Address;
const ui32 RecordsCount;
const ui32 RawBytes;
public:
ui32 GetRawBytes() const {
return RawBytes;
}

ui32 GetDataSize() const {
if (BlobRange) {
return BlobRange->GetSize();
} else {
AFL_VERIFY(!!BlobData);
return BlobData->size();
}
}

TIndexChunk BuildIndexChunk(const TBlobRangeLink16::TLinkId blobLinkId) const {
AFL_VERIFY(BlobRange);
return TIndexChunk(Address.GetColumnId(), Address.GetChunkIdx(), RecordsCount, RawBytes, BlobRange->BuildLink(blobLinkId));
Expand All @@ -964,7 +1001,9 @@ class TIndexChunkLoadContext {

template <class TSource>
TIndexChunkLoadContext(const TSource& rowset, const IBlobGroupSelector* dsGroupSelector)
: Address(rowset.template GetValue<NColumnShard::Schema::IndexIndexes::IndexId>(), rowset.template GetValue<NColumnShard::Schema::IndexIndexes::ChunkIdx>())
: PathId(rowset.template GetValue<NColumnShard::Schema::IndexIndexes::PathId>())
, PortionId(rowset.template GetValue<NColumnShard::Schema::IndexIndexes::PortionId>())
, Address(rowset.template GetValue<NColumnShard::Schema::IndexIndexes::IndexId>(), rowset.template GetValue<NColumnShard::Schema::IndexIndexes::ChunkIdx>())
, RecordsCount(rowset.template GetValue<NColumnShard::Schema::IndexIndexes::RecordsCount>())
, RawBytes(rowset.template GetValue<NColumnShard::Schema::IndexIndexes::RawBytes>())
{
Expand Down
8 changes: 8 additions & 0 deletions ydb/core/tx/columnshard/common/blob.h
Original file line number Diff line number Diff line change
Expand Up @@ -189,6 +189,14 @@ struct TBlobRange {
ui32 Offset;
ui32 Size;

ui32 GetSize() const {
return Size;
}

ui32 GetOffset() const {
return Offset;
}

TString GetData(const TString& blobData) const;

bool operator<(const TBlobRange& br) const {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ NKikimr::TConclusionStatus TDestinationSession::DataReceived(
AFL_VERIFY(it != PathIds.end())("path_id_undefined", i.first);
for (auto&& portion : i.second.DetachPortions()) {
portion.MutablePortionInfo().SetPathId(it->second);
index.AppendPortion(portion.GetPortionInfo());
index.AppendPortion(portion.MutablePortionInfoPtr());
}
}
return TConclusionStatus::Success();
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/columnshard/engines/changes/with_appended.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ void TChangesWithAppend::DoWriteIndexOnComplete(NColumnShard::TColumnShard* self
context.EngineLogs.AddCleanupPortion(i);
}
for (auto& portionBuilder : AppendedPortions) {
context.EngineLogs.AppendPortion(portionBuilder.GetPortionResult().GetPortionInfo());
context.EngineLogs.AppendPortion(portionBuilder.GetPortionResult().MutablePortionInfoPtr());
}
}
}
Expand Down
Loading

0 comments on commit 21fa904

Please sign in to comment.