Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

commit processing fixes #12519

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 23 additions & 8 deletions ydb/core/tx/columnshard/columnshard__progress_tx.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,12 @@ class TColumnShard::TTxProgressTx: public TTransactionBase<TColumnShard> {
}

bool Execute(TTransactionContext& txc, const TActorContext& ctx) override {
NActors::TLogContextGuard logGuard =
NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", Self->TabletID())("tx_state", "execute");
Y_ABORT_UNLESS(Self->ProgressTxInFlight);
NActors::TLogContextGuard logGuard = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD_TX)("tablet_id", Self->TabletID())(
"tx_state", "TTxProgressTx::Execute")("tx_current", Self->ProgressTxInFlight);
if (!Self->ProgressTxInFlight) {
AbortedThroughRemoveExpired = true;
return true;
}
Self->Counters.GetTabletCounters()->SetCounter(COUNTER_TX_COMPLETE_LAG, Self->GetTxCompleteLag().MilliSeconds());

const size_t removedCount = Self->ProgressTxController->CleanExpiredTxs(txc);
Expand All @@ -45,15 +48,24 @@ class TColumnShard::TTxProgressTx: public TTransactionBase<TColumnShard> {
const auto plannedItem = Self->ProgressTxController->GetFirstPlannedTx();
if (!!plannedItem) {
PlannedQueueItem.emplace(plannedItem->PlanStep, plannedItem->TxId);
ui64 step = plannedItem->PlanStep;
ui64 txId = plannedItem->TxId;
const ui64 step = plannedItem->PlanStep;
const ui64 txId = plannedItem->TxId;
NActors::TLogContextGuard logGuardTx = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD_TX)("tx_id", txId);
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_TX)("event", "PlannedItemStart");
TxOperator = Self->ProgressTxController->GetTxOperatorVerified(txId);
if (auto txPrepare = TxOperator->BuildTxPrepareForProgress(Self)) {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_TX)("event", "PlannedItemStart")("details", "BuildTxPrepareForProgress");
AbortedThroughRemoveExpired = true;
Self->ProgressTxInFlight = txId;
Self->Execute(txPrepare.release(), ctx);
return true;
} else if (TxOperator->IsInProgress()) {
AbortedThroughRemoveExpired = true;
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_TX)("event", "PlannedItemContinue");
AFL_VERIFY(Self->ProgressTxInFlight == txId);
return true;
} else {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_TX)("event", "PlannedItemStart")("details", "PopFirstPlannedTx");
Self->ProgressTxController->PopFirstPlannedTx();
}
StartExecution = TMonotonic::Now();
Expand All @@ -80,8 +92,9 @@ class TColumnShard::TTxProgressTx: public TTransactionBase<TColumnShard> {
if (AbortedThroughRemoveExpired) {
return;
}
NActors::TLogContextGuard logGuard =
NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("tablet_id", Self->TabletID())("tx_state", "complete");
NActors::TLogContextGuard logGuard = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD_TX)(
"tablet_id", Self->TabletID())(
"tx_state", "TTxProgressTx::Complete");
if (TxOperator) {
TxOperator->ProgressOnComplete(*Self, ctx);
Self->RescheduleWaitingReads();
Expand All @@ -104,11 +117,13 @@ class TColumnShard::TTxProgressTx: public TTransactionBase<TColumnShard> {
};

void TColumnShard::EnqueueProgressTx(const TActorContext& ctx, const std::optional<ui64> continueTxId) {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD)("event", "EnqueueProgressTx")("tablet_id", TabletID());
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_TX)("event", "EnqueueProgressTx")("tablet_id", TabletID())("tx_id", continueTxId);
if (continueTxId) {
AFL_VERIFY(!ProgressTxInFlight || ProgressTxInFlight == continueTxId)("current", ProgressTxInFlight)("expected", continueTxId);
}
if (!ProgressTxInFlight || ProgressTxInFlight == continueTxId) {
AFL_DEBUG(NKikimrServices::TX_COLUMNSHARD_TX)("event", "EnqueueProgressTxStart")("tablet_id", TabletID())("tx_id", continueTxId)(
"tx_current", ProgressTxInFlight);
ProgressTxInFlight = continueTxId.value_or(0);
Execute(new TTxProgressTx(this), ctx);
}
Expand Down
10 changes: 5 additions & 5 deletions ydb/core/tx/columnshard/engines/column_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,10 @@ void IColumnEngine::FetchDataAccessors(const std::shared_ptr<TDataAccessorsReque

TSelectInfo::TStats TSelectInfo::Stats() const {
TStats out;
out.Portions = PortionsOrderedPK.size();
out.Portions = Portions.size();

THashSet<TUnifiedBlobId> uniqBlob;
for (auto& portionInfo : PortionsOrderedPK) {
for (auto& portionInfo : Portions) {
out.Rows += portionInfo->GetRecordsCount();
for (auto& blobId : portionInfo->GetBlobIds()) {
out.Bytes += blobId.BlobSize();
Expand All @@ -53,10 +53,10 @@ TSelectInfo::TStats TSelectInfo::Stats() const {

TString TSelectInfo::DebugString() const {
TStringBuilder result;
result << "count:" << PortionsOrderedPK.size() << ";";
if (PortionsOrderedPK.size()) {
result << "count:" << Portions.size() << ";";
if (Portions.size()) {
result << "portions:";
for (auto& portionInfo : PortionsOrderedPK) {
for (auto& portionInfo : Portions) {
result << portionInfo->DebugString();
}
}
Expand Down
2 changes: 1 addition & 1 deletion ydb/core/tx/columnshard/engines/column_engine.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ struct TSelectInfo {
}
};

std::vector<std::shared_ptr<TPortionInfo>> PortionsOrderedPK;
std::vector<std::shared_ptr<TPortionInfo>> Portions;

TStats Stats() const;

Expand Down
24 changes: 14 additions & 10 deletions ydb/core/tx/columnshard/engines/column_engine_logs.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -502,18 +502,22 @@ std::shared_ptr<TSelectInfo> TColumnEngineForLogs::Select(
return out;
}

if (withUncommitted) {
for (const auto& [_, portionInfo] : spg->GetInsertedPortions()) {
AFL_VERIFY(portionInfo->HasInsertWriteId());
AFL_VERIFY(!portionInfo->HasCommitSnapshot());
const bool skipPortion = !pkRangesFilter.IsPortionInUsage(*portionInfo);
AFL_TRACE(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", skipPortion ? "portion_skipped" : "portion_selected")("pathId", pathId)(
"portion", portionInfo->DebugString());
if (skipPortion) {
for (const auto& [_, portionInfo] : spg->GetInsertedPortions()) {
AFL_VERIFY(portionInfo->HasInsertWriteId());
if (withUncommitted) {
if (!portionInfo->IsVisible(snapshot, !withUncommitted)) {
continue;
}
out->PortionsOrderedPK.emplace_back(portionInfo);
} else if (!portionInfo->HasCommitSnapshot()) {
continue;
}
const bool skipPortion = !pkRangesFilter.IsPortionInUsage(*portionInfo);
AFL_TRACE(NKikimrServices::TX_COLUMNSHARD_SCAN)("event", skipPortion ? "portion_skipped" : "portion_selected")("pathId", pathId)(
"portion", portionInfo->DebugString());
if (skipPortion) {
continue;
}
out->Portions.emplace_back(portionInfo);
}
for (const auto& [_, portionInfo] : spg->GetPortions()) {
if (!portionInfo->IsVisible(snapshot, !withUncommitted)) {
Expand All @@ -525,7 +529,7 @@ std::shared_ptr<TSelectInfo> TColumnEngineForLogs::Select(
if (skipPortion) {
continue;
}
out->PortionsOrderedPK.emplace_back(portionInfo);
out->Portions.emplace_back(portionInfo);
}

return out;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ TConclusionStatus TReadMetadata::Init(

SelectInfo = dataAccessor.Select(readDescription, !!LockId);
if (LockId) {
for (auto&& i : SelectInfo->PortionsOrderedPK) {
for (auto&& i : SelectInfo->Portions) {
if (i->HasInsertWriteId() && !i->HasCommitSnapshot()) {
if (owner->HasLongTxWrites(i->GetInsertWriteIdVerified())) {
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ class TReadMetadata: public NCommon::TReadMetadata {
std::vector<TCommittedBlob> CommittedBlobs;
virtual bool Empty() const override {
Y_ABORT_UNLESS(SelectInfo);
return SelectInfo->PortionsOrderedPK.empty() && CommittedBlobs.empty();
return SelectInfo->Portions.empty() && CommittedBlobs.empty();
}

virtual std::shared_ptr<IDataReader> BuildReader(const std::shared_ptr<TReadContext>& context) const override;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ TPlainReadData::TPlainReadData(const std::shared_ptr<TReadContext>& context)
, SpecialReadContext(std::make_shared<TSpecialReadContext>(context)) {
ui32 sourceIdx = 0;
std::deque<std::shared_ptr<IDataSource>> sources;
const auto& portions = GetReadMetadata()->SelectInfo->PortionsOrderedPK;
const auto& portions = GetReadMetadata()->SelectInfo->Portions;
const auto& committed = GetReadMetadata()->CommittedBlobs;
ui64 compactedPortionsBytes = 0;
ui64 insertedPortionsBytes = 0;
Expand Down Expand Up @@ -49,7 +49,7 @@ TPlainReadData::TPlainReadData(const std::shared_ptr<TReadContext>& context)
Scanner = std::make_shared<TScanHead>(std::move(sources), SpecialReadContext);

auto& stats = GetReadMetadata()->ReadStats;
stats->IndexPortions = GetReadMetadata()->SelectInfo->PortionsOrderedPK.size();
stats->IndexPortions = GetReadMetadata()->SelectInfo->Portions.size();
stats->IndexBatches = GetReadMetadata()->NumIndexedBlobs();
stats->CommittedBatches = GetReadMetadata()->CommittedBlobs.size();
stats->SchemaColumns = (*SpecialReadContext->GetProgramInputColumns() - *SpecialReadContext->GetSpecColumns()).GetColumnsCount();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ class TReadMetadata: public NCommon::TReadMetadata {

virtual bool Empty() const override {
Y_ABORT_UNLESS(SelectInfo);
return SelectInfo->PortionsOrderedPK.empty();
return SelectInfo->Portions.empty();
}

virtual std::shared_ptr<IDataReader> BuildReader(const std::shared_ptr<TReadContext>& context) const override;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ TPlainReadData::TPlainReadData(const std::shared_ptr<TReadContext>& context)
, SpecialReadContext(std::make_shared<TSpecialReadContext>(context)) {
ui32 sourceIdx = 0;
std::deque<std::shared_ptr<IDataSource>> sources;
const auto& portions = GetReadMetadata()->SelectInfo->PortionsOrderedPK;
const auto& portions = GetReadMetadata()->SelectInfo->Portions;
ui64 compactedPortionsBytes = 0;
ui64 insertedPortionsBytes = 0;
for (auto&& i : portions) {
Expand All @@ -26,7 +26,7 @@ TPlainReadData::TPlainReadData(const std::shared_ptr<TReadContext>& context)
Scanner = std::make_shared<TScanHead>(std::move(sources), SpecialReadContext);

auto& stats = GetReadMetadata()->ReadStats;
stats->IndexPortions = GetReadMetadata()->SelectInfo->PortionsOrderedPK.size();
stats->IndexPortions = GetReadMetadata()->SelectInfo->Portions.size();
stats->IndexBatches = GetReadMetadata()->NumIndexedBlobs();
stats->SchemaColumns = (*SpecialReadContext->GetProgramInputColumns() - *SpecialReadContext->GetSpecColumns()).GetColumnsCount();
stats->InsertedPortionsBytes = insertedPortionsBytes;
Expand Down
22 changes: 11 additions & 11 deletions ydb/core/tx/columnshard/engines/ut/ut_logs_engine.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -566,28 +566,28 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
ui64 planStep = 1;
ui64 txId = 0;
auto selectInfo = engine.Select(paths[0], TSnapshot(planStep, txId), NOlap::TPKRangesFilter(false), false);
UNIT_ASSERT_VALUES_EQUAL(selectInfo->PortionsOrderedPK.size(), 0);
UNIT_ASSERT_VALUES_EQUAL(selectInfo->Portions.size(), 0);
}

{ // select from snap between insert (greater txId)
ui64 planStep = 1;
ui64 txId = 2;
auto selectInfo = engine.Select(paths[0], TSnapshot(planStep, txId), NOlap::TPKRangesFilter(false), false);
UNIT_ASSERT_VALUES_EQUAL(selectInfo->PortionsOrderedPK.size(), 0);
UNIT_ASSERT_VALUES_EQUAL(selectInfo->Portions.size(), 0);
}

{ // select from snap after insert (greater planStep)
ui64 planStep = 2;
ui64 txId = 1;
auto selectInfo = engine.Select(paths[0], TSnapshot(planStep, txId), NOlap::TPKRangesFilter(false), false);
UNIT_ASSERT_VALUES_EQUAL(selectInfo->PortionsOrderedPK.size(), 1);
UNIT_ASSERT_VALUES_EQUAL(selectInfo->Portions.size(), 1);
}

{ // select another pathId
ui64 planStep = 2;
ui64 txId = 1;
auto selectInfo = engine.Select(paths[1], TSnapshot(planStep, txId), NOlap::TPKRangesFilter(false), false);
UNIT_ASSERT_VALUES_EQUAL(selectInfo->PortionsOrderedPK.size(), 0);
UNIT_ASSERT_VALUES_EQUAL(selectInfo->Portions.size(), 0);
}
}

Expand Down Expand Up @@ -657,7 +657,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
{ // full scan
ui64 txId = 1;
auto selectInfo = engine.Select(pathId, TSnapshot(planStep, txId), NOlap::TPKRangesFilter(false), false);
UNIT_ASSERT_VALUES_EQUAL(selectInfo->PortionsOrderedPK.size(), 20);
UNIT_ASSERT_VALUES_EQUAL(selectInfo->Portions.size(), 20);
}

// predicates
Expand All @@ -671,7 +671,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
NOlap::TPKRangesFilter pkFilter(false);
Y_ABORT_UNLESS(pkFilter.Add(gt10k, nullptr, indexInfo.GetReplaceKey()));
auto selectInfo = engine.Select(pathId, TSnapshot(planStep, txId), pkFilter, false);
UNIT_ASSERT_VALUES_EQUAL(selectInfo->PortionsOrderedPK.size(), 10);
UNIT_ASSERT_VALUES_EQUAL(selectInfo->Portions.size(), 10);
}

{
Expand All @@ -683,7 +683,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
NOlap::TPKRangesFilter pkFilter(false);
Y_ABORT_UNLESS(pkFilter.Add(nullptr, lt10k, indexInfo.GetReplaceKey()));
auto selectInfo = engine.Select(pathId, TSnapshot(planStep, txId), pkFilter, false);
UNIT_ASSERT_VALUES_EQUAL(selectInfo->PortionsOrderedPK.size(), 9);
UNIT_ASSERT_VALUES_EQUAL(selectInfo->Portions.size(), 9);
}
}

Expand Down Expand Up @@ -841,7 +841,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
{ // full scan
ui64 txId = 1;
auto selectInfo = engine.Select(pathId, TSnapshot(planStep, txId), NOlap::TPKRangesFilter(false), false);
UNIT_ASSERT_VALUES_EQUAL(selectInfo->PortionsOrderedPK.size(), 20);
UNIT_ASSERT_VALUES_EQUAL(selectInfo->Portions.size(), 20);
}

// Cleanup
Expand All @@ -850,7 +850,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
{ // full scan
ui64 txId = 1;
auto selectInfo = engine.Select(pathId, TSnapshot(planStep, txId), NOlap::TPKRangesFilter(false), false);
UNIT_ASSERT_VALUES_EQUAL(selectInfo->PortionsOrderedPK.size(), 20);
UNIT_ASSERT_VALUES_EQUAL(selectInfo->Portions.size(), 20);
}

// TTL
Expand All @@ -866,7 +866,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
{ // full scan
ui64 txId = 1;
auto selectInfo = engine.Select(pathId, TSnapshot(planStep, txId), NOlap::TPKRangesFilter(false), false);
UNIT_ASSERT_VALUES_EQUAL(selectInfo->PortionsOrderedPK.size(), 10);
UNIT_ASSERT_VALUES_EQUAL(selectInfo->Portions.size(), 10);
}
}
{
Expand All @@ -882,7 +882,7 @@ Y_UNIT_TEST_SUITE(TColumnEngineTestLogs) {
{ // full scan
ui64 txId = 1;
auto selectInfo = engine.Select(pathId, TSnapshot(planStep, txId), NOlap::TPKRangesFilter(false), false);
UNIT_ASSERT_VALUES_EQUAL(selectInfo->PortionsOrderedPK.size(), 10);
UNIT_ASSERT_VALUES_EQUAL(selectInfo->Portions.size(), 10);
}
}
}
Expand Down
Loading
Loading