Skip to content

Commit

Permalink
commit processing fixes (ydb-platform#12519)
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored and zverevgeny committed Jan 5, 2025
1 parent 53b0ce9 commit ff30e1b
Show file tree
Hide file tree
Showing 15 changed files with 111 additions and 80 deletions.
31 changes: 23 additions & 8 deletions ydb/core/tx/columnshard/columnshard__progress_tx.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,12 @@ class TColumnShard::TTxProgressTx: public TTransactionBase<TColumnShard> {
}

bool Execute(TTransactionContext& txc, const TActorContext& ctx) override {
NActors::TLogContextGuard logGuard =
NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", Self->TabletID())("tx_state", "execute");
Y_ABORT_UNLESS(Self->ProgressTxInFlight);
NActors::TLogContextGuard logGuard = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD_TX)("tablet_id", Self->TabletID())(
"tx_state", "TTxProgressTx::Execute")("tx_current", Self->ProgressTxInFlight);
if (!Self->ProgressTxInFlight) {
AbortedThroughRemoveExpired = true;
return true;
}
Self->Counters.GetTabletCounters()->SetCounter(COUNTER_TX_COMPLETE_LAG, Self->GetTxCompleteLag().MilliSeconds());

const size_t removedCount = Self->ProgressTxController->CleanExpiredTxs(txc);
Expand All @@ -45,15 +48,24 @@ class TColumnShard::TTxProgressTx: public TTransactionBase<TColumnShard> {
const auto plannedItem = Self->ProgressTxController->GetFirstPlannedTx();
if (!!plannedItem) {
PlannedQueueItem.emplace(plannedItem->PlanStep, plannedItem->TxId);
ui64 step = plannedItem->PlanStep;
ui64 txId = plannedItem->TxId;
const ui64 step = plannedItem->PlanStep;
const ui64 txId = plannedItem->TxId;
NActors::TLogContextGuard logGuardTx = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD_TX)("tx_id", txId);
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_TX)("event", "PlannedItemStart");
TxOperator = Self->ProgressTxController->GetTxOperatorVerified(txId);
if (auto txPrepare = TxOperator->BuildTxPrepareForProgress(Self)) {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_TX)("event", "PlannedItemStart")("details", "BuildTxPrepareForProgress");
AbortedThroughRemoveExpired = true;
Self->ProgressTxInFlight = txId;
Self->Execute(txPrepare.release(), ctx);
return true;
} else if (TxOperator->IsInProgress()) {
AbortedThroughRemoveExpired = true;
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_TX)("event", "PlannedItemContinue");
AFL_VERIFY(Self->ProgressTxInFlight == txId);
return true;
} else {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_TX)("event", "PlannedItemStart")("details", "PopFirstPlannedTx");
Self->ProgressTxController->PopFirstPlannedTx();
}
StartExecution = TMonotonic::Now();
Expand All @@ -80,8 +92,9 @@ class TColumnShard::TTxProgressTx: public TTransactionBase<TColumnShard> {
if (AbortedThroughRemoveExpired) {
return;
}
NActors::TLogContextGuard logGuard =
NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", Self->TabletID())("tx_state", "complete");
NActors::TLogContextGuard logGuard = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD_TX)(
"tablet_id", Self->TabletID())(
"tx_state", "TTxProgressTx::Complete");
if (TxOperator) {
TxOperator->ProgressOnComplete(*Self, ctx);
Self->RescheduleWaitingReads();
Expand All @@ -104,11 +117,13 @@ class TColumnShard::TTxProgressTx: public TTransactionBase<TColumnShard> {
};

void TColumnShard::EnqueueProgressTx(const TActorContext& ctx, const std::optional<ui64> continueTxId) {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "EnqueueProgressTx")("tablet_id", TabletID());
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_TX)("event", "EnqueueProgressTx")("tablet_id", TabletID())("tx_id", continueTxId);
if (continueTxId) {
AFL_VERIFY(!ProgressTxInFlight || ProgressTxInFlight == continueTxId)("current", ProgressTxInFlight)("expected", continueTxId);
}
if (!ProgressTxInFlight || ProgressTxInFlight == continueTxId) {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_TX)("event", "EnqueueProgressTxStart")("tablet_id", TabletID())("tx_id", continueTxId)(
"tx_current", ProgressTxInFlight);
ProgressTxInFlight = continueTxId.value_or(0);
Execute(new TTxProgressTx(this), ctx);
}
Expand Down
10 changes: 5 additions & 5 deletions ydb/core/tx/columnshard/engines/column_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,10 @@ void IColumnEngine::FetchDataAccessors(const std::shared_ptr<TDataAccessorsReque

TSelectInfo::TStats TSelectInfo::Stats() const {
TStats out;
out.Portions = PortionsOrderedPK.size();
out.Portions = Portions.size();

THashSet<TUnifiedBlobId> uniqBlob;
for (auto& portionInfo : PortionsOrderedPK) {
for (auto& portionInfo : Portions) {
out.Rows += portionInfo->GetRecordsCount();
for (auto& blobId : portionInfo->GetBlobIds()) {
out.Bytes += blobId.BlobSize();
Expand All @@ -53,10 +53,10 @@ TSelectInfo::TStats TSelectInfo::Stats() const {

TString TSelectInfo::DebugString() const {
TStringBuilder result;
result << "count:" << PortionsOrderedPK.size() << ";";
if (PortionsOrderedPK.size()) {
result << "count:" << Portions.size() << ";";
if (Portions.size()) {
result << "portions:";
for (auto& portionInfo : PortionsOrderedPK) {
for (auto& portionInfo : Portions) {
result << portionInfo->DebugString();
}
}
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/columnshard/engines/column_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ struct TSelectInfo {
}
};

std::vector<std::shared_ptr<TPortionInfo>> PortionsOrderedPK;
std::vector<std::shared_ptr<TPortionInfo>> Portions;

TStats Stats() const;

Expand Down
24 changes: 14 additions & 10 deletions ydb/core/tx/columnshard/engines/column_engine_logs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -502,18 +502,22 @@ std::shared_ptr<TSelectInfo> TColumnEngineForLogs::Select(
return out;
}

if (withUncommitted) {
for (const auto& [_, portionInfo] : spg->GetInsertedPortions()) {
AFL_VERIFY(portionInfo->HasInsertWriteId());
AFL_VERIFY(!portionInfo->HasCommitSnapshot());
const bool skipPortion = !pkRangesFilter.IsPortionInUsage(*portionInfo);
AFL_TRACE(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", skipPortion ? "portion_skipped" : "portion_selected")("pathId", pathId)(
"portion", portionInfo->DebugString());
if (skipPortion) {
for (const auto& [_, portionInfo] : spg->GetInsertedPortions()) {
AFL_VERIFY(portionInfo->HasInsertWriteId());
if (withUncommitted) {
if (!portionInfo->IsVisible(snapshot, !withUncommitted)) {
continue;
}
out->PortionsOrderedPK.emplace_back(portionInfo);
} else if (!portionInfo->HasCommitSnapshot()) {
continue;
}
const bool skipPortion = !pkRangesFilter.IsPortionInUsage(*portionInfo);
AFL_TRACE(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", skipPortion ? "portion_skipped" : "portion_selected")("pathId", pathId)(
"portion", portionInfo->DebugString());
if (skipPortion) {
continue;
}
out->Portions.emplace_back(portionInfo);
}
for (const auto& [_, portionInfo] : spg->GetPortions()) {
if (!portionInfo->IsVisible(snapshot, !withUncommitted)) {
Expand All @@ -525,7 +529,7 @@ std::shared_ptr<TSelectInfo> TColumnEngineForLogs::Select(
if (skipPortion) {
continue;
}
out->PortionsOrderedPK.emplace_back(portionInfo);
out->Portions.emplace_back(portionInfo);
}

return out;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ TConclusionStatus TReadMetadata::Init(

SelectInfo = dataAccessor.Select(readDescription, !!LockId);
if (LockId) {
for (auto&& i : SelectInfo->PortionsOrderedPK) {
for (auto&& i : SelectInfo->Portions) {
if (i->HasInsertWriteId() && !i->HasCommitSnapshot()) {
if (owner->HasLongTxWrites(i->GetInsertWriteIdVerified())) {
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class TReadMetadata: public NCommon::TReadMetadata {
std::vector<TCommittedBlob> CommittedBlobs;
virtual bool Empty() const override {
Y_ABORT_UNLESS(SelectInfo);
return SelectInfo->PortionsOrderedPK.empty() && CommittedBlobs.empty();
return SelectInfo->Portions.empty() && CommittedBlobs.empty();
}

virtual std::shared_ptr<IDataReader> BuildReader(const std::shared_ptr<TReadContext>& context) const override;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ TPlainReadData::TPlainReadData(const std::shared_ptr<TReadContext>& context)
, SpecialReadContext(std::make_shared<TSpecialReadContext>(context)) {
ui32 sourceIdx = 0;
std::deque<std::shared_ptr<IDataSource>> sources;
const auto& portions = GetReadMetadata()->SelectInfo->PortionsOrderedPK;
const auto& portions = GetReadMetadata()->SelectInfo->Portions;
const auto& committed = GetReadMetadata()->CommittedBlobs;
ui64 compactedPortionsBytes = 0;
ui64 insertedPortionsBytes = 0;
Expand Down Expand Up @@ -49,7 +49,7 @@ TPlainReadData::TPlainReadData(const std::shared_ptr<TReadContext>& context)
Scanner = std::make_shared<TScanHead>(std::move(sources), SpecialReadContext);

auto& stats = GetReadMetadata()->ReadStats;
stats->IndexPortions = GetReadMetadata()->SelectInfo->PortionsOrderedPK.size();
stats->IndexPortions = GetReadMetadata()->SelectInfo->Portions.size();
stats->IndexBatches = GetReadMetadata()->NumIndexedBlobs();
stats->CommittedBatches = GetReadMetadata()->CommittedBlobs.size();
stats->SchemaColumns = (*SpecialReadContext->GetProgramInputColumns() - *SpecialReadContext->GetSpecColumns()).GetColumnsCount();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ class TReadMetadata: public NCommon::TReadMetadata {

virtual bool Empty() const override {
Y_ABORT_UNLESS(SelectInfo);
return SelectInfo->PortionsOrderedPK.empty();
return SelectInfo->Portions.empty();
}

virtual std::shared_ptr<IDataReader> BuildReader(const std::shared_ptr<TReadContext>& context) const override;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ TPlainReadData::TPlainReadData(const std::shared_ptr<TReadContext>& context)
, SpecialReadContext(std::make_shared<TSpecialReadContext>(context)) {
ui32 sourceIdx = 0;
std::deque<std::shared_ptr<IDataSource>> sources;
const auto& portions = GetReadMetadata()->SelectInfo->PortionsOrderedPK;
const auto& portions = GetReadMetadata()->SelectInfo->Portions;
ui64 compactedPortionsBytes = 0;
ui64 insertedPortionsBytes = 0;
for (auto&& i : portions) {
Expand All @@ -26,7 +26,7 @@ TPlainReadData::TPlainReadData(const std::shared_ptr<TReadContext>& context)
Scanner = std::make_shared<TScanHead>(std::move(sources), SpecialReadContext);

auto& stats = GetReadMetadata()->ReadStats;
stats->IndexPortions = GetReadMetadata()->SelectInfo->PortionsOrderedPK.size();
stats->IndexPortions = GetReadMetadata()->SelectInfo->Portions.size();
stats->IndexBatches = GetReadMetadata()->NumIndexedBlobs();
stats->SchemaColumns = (*SpecialReadContext->GetProgramInputColumns() - *SpecialReadContext->GetSpecColumns()).GetColumnsCount();
stats->InsertedPortionsBytes = insertedPortionsBytes;
Expand Down
22 changes: 11 additions & 11 deletions ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -566,28 +566,28 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
ui64 planStep = 1;
ui64 txId = 0;
auto selectInfo = engine.Select(paths[0], TSnapshot(planStep, txId), NOlap::TPKRangesFilter(false), false);
UNIT_ASSERT_VALUES_EQUAL(selectInfo->PortionsOrderedPK.size(), 0);
UNIT_ASSERT_VALUES_EQUAL(selectInfo->Portions.size(), 0);
}

{ // select from snap between insert (greater txId)
ui64 planStep = 1;
ui64 txId = 2;
auto selectInfo = engine.Select(paths[0], TSnapshot(planStep, txId), NOlap::TPKRangesFilter(false), false);
UNIT_ASSERT_VALUES_EQUAL(selectInfo->PortionsOrderedPK.size(), 0);
UNIT_ASSERT_VALUES_EQUAL(selectInfo->Portions.size(), 0);
}

{ // select from snap after insert (greater planStep)
ui64 planStep = 2;
ui64 txId = 1;
auto selectInfo = engine.Select(paths[0], TSnapshot(planStep, txId), NOlap::TPKRangesFilter(false), false);
UNIT_ASSERT_VALUES_EQUAL(selectInfo->PortionsOrderedPK.size(), 1);
UNIT_ASSERT_VALUES_EQUAL(selectInfo->Portions.size(), 1);
}

{ // select another pathId
ui64 planStep = 2;
ui64 txId = 1;
auto selectInfo = engine.Select(paths[1], TSnapshot(planStep, txId), NOlap::TPKRangesFilter(false), false);
UNIT_ASSERT_VALUES_EQUAL(selectInfo->PortionsOrderedPK.size(), 0);
UNIT_ASSERT_VALUES_EQUAL(selectInfo->Portions.size(), 0);
}
}

Expand Down Expand Up @@ -657,7 +657,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
{ // full scan
ui64 txId = 1;
auto selectInfo = engine.Select(pathId, TSnapshot(planStep, txId), NOlap::TPKRangesFilter(false), false);
UNIT_ASSERT_VALUES_EQUAL(selectInfo->PortionsOrderedPK.size(), 20);
UNIT_ASSERT_VALUES_EQUAL(selectInfo->Portions.size(), 20);
}

// predicates
Expand All @@ -671,7 +671,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
NOlap::TPKRangesFilter pkFilter(false);
Y_ABORT_UNLESS(pkFilter.Add(gt10k, nullptr, indexInfo.GetReplaceKey()));
auto selectInfo = engine.Select(pathId, TSnapshot(planStep, txId), pkFilter, false);
UNIT_ASSERT_VALUES_EQUAL(selectInfo->PortionsOrderedPK.size(), 10);
UNIT_ASSERT_VALUES_EQUAL(selectInfo->Portions.size(), 10);
}

{
Expand All @@ -683,7 +683,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
NOlap::TPKRangesFilter pkFilter(false);
Y_ABORT_UNLESS(pkFilter.Add(nullptr, lt10k, indexInfo.GetReplaceKey()));
auto selectInfo = engine.Select(pathId, TSnapshot(planStep, txId), pkFilter, false);
UNIT_ASSERT_VALUES_EQUAL(selectInfo->PortionsOrderedPK.size(), 9);
UNIT_ASSERT_VALUES_EQUAL(selectInfo->Portions.size(), 9);
}
}

Expand Down Expand Up @@ -841,7 +841,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
{ // full scan
ui64 txId = 1;
auto selectInfo = engine.Select(pathId, TSnapshot(planStep, txId), NOlap::TPKRangesFilter(false), false);
UNIT_ASSERT_VALUES_EQUAL(selectInfo->PortionsOrderedPK.size(), 20);
UNIT_ASSERT_VALUES_EQUAL(selectInfo->Portions.size(), 20);
}

// Cleanup
Expand All @@ -850,7 +850,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
{ // full scan
ui64 txId = 1;
auto selectInfo = engine.Select(pathId, TSnapshot(planStep, txId), NOlap::TPKRangesFilter(false), false);
UNIT_ASSERT_VALUES_EQUAL(selectInfo->PortionsOrderedPK.size(), 20);
UNIT_ASSERT_VALUES_EQUAL(selectInfo->Portions.size(), 20);
}

// TTL
Expand All @@ -866,7 +866,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
{ // full scan
ui64 txId = 1;
auto selectInfo = engine.Select(pathId, TSnapshot(planStep, txId), NOlap::TPKRangesFilter(false), false);
UNIT_ASSERT_VALUES_EQUAL(selectInfo->PortionsOrderedPK.size(), 10);
UNIT_ASSERT_VALUES_EQUAL(selectInfo->Portions.size(), 10);
}
}
{
Expand All @@ -882,7 +882,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
{ // full scan
ui64 txId = 1;
auto selectInfo = engine.Select(pathId, TSnapshot(planStep, txId), NOlap::TPKRangesFilter(false), false);
UNIT_ASSERT_VALUES_EQUAL(selectInfo->PortionsOrderedPK.size(), 10);
UNIT_ASSERT_VALUES_EQUAL(selectInfo->Portions.size(), 10);
}
}
}
Expand Down
Loading

0 comments on commit ff30e1b

Please sign in to comment.