Skip to content

Commit

Permalink
clean useless case for evwrite (ydb-platform#9716)
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored and zverevgeny committed Jan 5, 2025
1 parent 0aabe75 commit 7a3a76f
Show file tree
Hide file tree
Showing 13 changed files with 529 additions and 573 deletions.
11 changes: 0 additions & 11 deletions ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -97,17 +97,6 @@ bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) {
if (operation->GetBehaviour() == EOperationBehaviour::NoTxWrite) {
auto ev = NEvents::TDataEvents::TEvWriteResult::BuildCompleted(Self->TabletID());
Results.emplace_back(std::move(ev), writeMeta.GetSource(), operation->GetCookie());
} else if (operation->GetBehaviour() == EOperationBehaviour::InTxWrite) {
NKikimrTxColumnShard::TCommitWriteTxBody proto;
proto.SetLockId(operation->GetLockId());
TString txBody;
Y_ABORT_UNLESS(proto.SerializeToString(&txBody));
auto op = Self->GetProgressTxController().StartProposeOnExecute(
TTxController::TTxInfo(
NKikimrTxColumnShard::TX_KIND_COMMIT_WRITE, operation->GetLockId(), writeMeta.GetSource(), operation->GetCookie(), {}),
txBody, txc);
AFL_VERIFY(!op->IsFail());
ResultOperators.emplace_back(op);
} else {
auto& info = Self->OperationsManager->GetLockVerified(operation->GetLockId());
NKikimrDataEvents::TLock lock;
Expand Down
18 changes: 8 additions & 10 deletions ydb/core/tx/columnshard/columnshard__write.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -462,12 +462,12 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor
if (conclusionParse.IsFail()) {
sendError(conclusionParse.GetErrorMessage(), NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST);
} else {
if (commitOperation->NeedSyncLocks()) {
auto* lockInfo = OperationsManager->GetLockOptional(commitOperation->GetLockId());
if (!lockInfo) {
sendError("haven't lock for commit: " + ::ToString(commitOperation->GetLockId()),
NKikimrDataEvents::TEvWriteResult::STATUS_ABORTED);
} else {
auto* lockInfo = OperationsManager->GetLockOptional(commitOperation->GetLockId());
if (!lockInfo) {
sendError("haven't lock for commit: " + ::ToString(commitOperation->GetLockId()),
NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST);
} else {
if (commitOperation->NeedSyncLocks()) {
if (lockInfo->GetGeneration() != commitOperation->GetGeneration()) {
sendError("tablet lock have another generation: " + ::ToString(lockInfo->GetGeneration()) + " != " +
::ToString(commitOperation->GetGeneration()), NKikimrDataEvents::TEvWriteResult::STATUS_LOCKS_BROKEN);
Expand All @@ -479,9 +479,9 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor
} else {
Execute(new TProposeWriteTransaction(this, commitOperation, source, cookie), ctx);
}
} else {
Execute(new TProposeWriteTransaction(this, commitOperation, source, cookie), ctx);
}
} else {
Execute(new TProposeWriteTransaction(this, commitOperation, source, cookie), ctx);
}
}
return;
Expand Down Expand Up @@ -559,8 +559,6 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor
ui64 lockId = 0;
if (behaviour == EOperationBehaviour::NoTxWrite) {
lockId = BuildEphemeralTxId();
} else if (behaviour == EOperationBehaviour::InTxWrite) {
lockId = record.GetTxId();
} else {
lockId = record.GetLockTxId();
}
Expand Down
3 changes: 0 additions & 3 deletions ydb/core/tx/columnshard/operations/manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -255,9 +255,6 @@ TConclusion<EOperationBehaviour> TOperationsManager::GetBehaviour(const NEvents:
return EOperationBehaviour::NoTxWrite;
}

if (evWrite.Record.HasTxId() && evWrite.Record.GetTxMode() == NKikimrDataEvents::TEvWrite::MODE_PREPARE) {
return EOperationBehaviour::InTxWrite;
}
AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("proto", evWrite.Record.DebugString())("event", "undefined behaviour");
return TConclusionStatus::Fail("undefined request for detect tx type");
}
Expand Down
1 change: 0 additions & 1 deletion ydb/core/tx/columnshard/operations/write.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ enum class EOperationStatus : ui32 {

enum class EOperationBehaviour : ui32 {
Undefined = 1,
InTxWrite = 2,
WriteWithLock = 3,
CommitWriteLock = 4,
AbortWriteLock = 5,
Expand Down
6 changes: 3 additions & 3 deletions ydb/core/tx/columnshard/test_helper/columnshard_ut_common.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ bool ProposeSchemaTx(TTestBasicRuntime& runtime, TActorId& sender, const TString
return (res.GetStatus() == NKikimrTxColumnShard::PREPARED);
}

void PlanSchemaTx(TTestBasicRuntime& runtime, TActorId& sender, NOlap::TSnapshot snap) {
void PlanSchemaTx(TTestBasicRuntime& runtime, const TActorId& sender, NOlap::TSnapshot snap) {
auto plan = std::make_unique<TEvTxProcessing::TEvPlanStep>(snap.GetPlanStep(), 0, TTestTxConfig::TxTablet0);
auto tx = plan->Record.AddTransactions();
tx->SetTxId(snap.GetTxId());
Expand All @@ -78,7 +78,7 @@ void PlanSchemaTx(TTestBasicRuntime& runtime, TActorId& sender, NOlap::TSnapshot
UNIT_ASSERT_EQUAL(res.GetStatus(), NKikimrTxColumnShard::SUCCESS);
}

void PlanWriteTx(TTestBasicRuntime& runtime, TActorId& sender, NOlap::TSnapshot snap, bool waitResult) {
void PlanWriteTx(TTestBasicRuntime& runtime, const TActorId& sender, NOlap::TSnapshot snap, bool waitResult) {
auto plan = std::make_unique<TEvTxProcessing::TEvPlanStep>(snap.GetPlanStep(), 0, TTestTxConfig::TxTablet0);
auto tx = plan->Record.AddTransactions();
tx->SetTxId(snap.GetTxId());
Expand Down Expand Up @@ -229,7 +229,7 @@ void PlanCommit(TTestBasicRuntime& runtime, TActorId& sender, ui64 planStep, con
PlanCommit(runtime, sender, TTestTxConfig::TxTablet0, planStep, txIds);
}

void Wakeup(TTestBasicRuntime& runtime, TActorId& sender, const ui64 shardId) {
void Wakeup(TTestBasicRuntime& runtime, const TActorId& sender, const ui64 shardId) {
auto wakeup = std::make_unique<TEvPrivate::TEvPeriodicWakeup>(true);
ForwardToTablet(runtime, shardId, sender, wakeup.release());
}
Expand Down
6 changes: 3 additions & 3 deletions ydb/core/tx/columnshard/test_helper/columnshard_ut_common.h
Original file line number Diff line number Diff line change
Expand Up @@ -402,9 +402,9 @@ struct TTestSchema {

bool ProposeSchemaTx(TTestBasicRuntime& runtime, TActorId& sender, const TString& txBody, NOlap::TSnapshot snap);
void ProvideTieringSnapshot(TTestBasicRuntime& runtime, const TActorId& sender, NMetadata::NFetcher::ISnapshot::TPtr snapshot);
void PlanSchemaTx(TTestBasicRuntime& runtime, TActorId& sender, NOlap::TSnapshot snap);
void PlanSchemaTx(TTestBasicRuntime& runtime, const TActorId& sender, NOlap::TSnapshot snap);

void PlanWriteTx(TTestBasicRuntime& runtime, TActorId& sender, NOlap::TSnapshot snap, bool waitResult = true);
void PlanWriteTx(TTestBasicRuntime& runtime, const TActorId& sender, NOlap::TSnapshot snap, bool waitResult = true);

bool WriteData(TTestBasicRuntime& runtime, TActorId& sender, const ui64 shardId, const ui64 writeId, const ui64 tableId, const TString& data,
const std::vector<NArrow::NTest::TTestColumn>& ydbSchema, std::vector<ui64>* writeIds,
Expand Down Expand Up @@ -435,7 +435,7 @@ inline void PlanCommit(TTestBasicRuntime& runtime, TActorId& sender, ui64 planSt
PlanCommit(runtime, sender, planStep, ids);
}

void Wakeup(TTestBasicRuntime& runtime, TActorId& sender, const ui64 shardId);
void Wakeup(TTestBasicRuntime& runtime, const TActorId& sender, const ui64 shardId);

struct TTestBlobOptions {
THashSet<TString> NullColumns;
Expand Down
101 changes: 101 additions & 0 deletions ydb/core/tx/columnshard/test_helper/shard_reader.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
#include "shard_reader.h"

namespace NKikimr::NTxUT {

std::unique_ptr<NKikimr::TEvDataShard::TEvKqpScan> TShardReader::BuildStartEvent() const {
auto ev = std::make_unique<TEvDataShard::TEvKqpScan>();
ev->Record.SetLocalPathId(PathId);
ev->Record.MutableSnapshot()->SetStep(Snapshot.GetPlanStep());
ev->Record.MutableSnapshot()->SetTxId(Snapshot.GetTxId());

ev->Record.SetStatsMode(NYql::NDqProto::DQ_STATS_MODE_FULL);
ev->Record.SetTxId(Snapshot.GetTxId());

ev->Record.SetReverse(Reverse);
ev->Record.SetItemsLimit(Limit);

ev->Record.SetDataFormat(NKikimrDataEvents::FORMAT_ARROW);

auto protoRanges = ev->Record.MutableRanges();
protoRanges->Reserve(Ranges.size());
for (auto& range : Ranges) {
auto newRange = protoRanges->Add();
range.Serialize(*newRange);
}

if (ProgramProto) {
NKikimrSSA::TOlapProgram olapProgram;
{
TString programBytes;
TStringOutput stream(programBytes);
ProgramProto->SerializeToArcadiaStream(&stream);
olapProgram.SetProgram(programBytes);
}
{
TString programBytes;
TStringOutput stream(programBytes);
olapProgram.SerializeToArcadiaStream(&stream);
ev->Record.SetOlapProgram(programBytes);
}
ev->Record.SetOlapProgramType(NKikimrSchemeOp::EOlapProgramType::OLAP_PROGRAM_SSA_PROGRAM_WITH_PARAMETERS);
} else if (SerializedProgram) {
ev->Record.SetOlapProgram(*SerializedProgram);
ev->Record.SetOlapProgramType(NKikimrSchemeOp::EOlapProgramType::OLAP_PROGRAM_SSA_PROGRAM_WITH_PARAMETERS);
}

return ev;
}

NKikimr::NTxUT::TShardReader& TShardReader::SetReplyColumns(const std::vector<TString>& replyColumns) {
AFL_VERIFY(!SerializedProgram);
if (!ProgramProto) {
ProgramProto = NKikimrSSA::TProgram();
}
for (auto&& command : *ProgramProto->MutableCommand()) {
if (command.HasProjection()) {
NKikimrSSA::TProgram::TProjection proj;
for (auto&& i : replyColumns) {
proj.AddColumns()->SetName(i);
}
*command.MutableProjection() = proj;
return *this;
}
}
{
auto* command = ProgramProto->AddCommand();
NKikimrSSA::TProgram::TProjection proj;
for (auto&& i : replyColumns) {
proj.AddColumns()->SetName(i);
}
*command->MutableProjection() = proj;
}
return *this;
}

NKikimr::NTxUT::TShardReader& TShardReader::SetReplyColumnIds(const std::vector<ui32>& replyColumnIds) {
AFL_VERIFY(!SerializedProgram);
if (!ProgramProto) {
ProgramProto = NKikimrSSA::TProgram();
}
for (auto&& command : *ProgramProto->MutableCommand()) {
if (command.HasProjection()) {
NKikimrSSA::TProgram::TProjection proj;
for (auto&& i : replyColumnIds) {
proj.AddColumns()->SetId(i);
}
*command.MutableProjection() = proj;
return *this;
}
}
{
auto* command = ProgramProto->AddCommand();
NKikimrSSA::TProgram::TProjection proj;
for (auto&& i : replyColumnIds) {
proj.AddColumns()->SetId(i);
}
*command->MutableProjection() = proj;
}
return *this;
}

}
100 changes: 3 additions & 97 deletions ydb/core/tx/columnshard/test_helper/shard_reader.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,53 +28,7 @@ class TShardReader {
std::vector<TString> ReplyColumns;
std::vector<TSerializedTableRange> Ranges;

std::unique_ptr<TEvDataShard::TEvKqpScan> BuildStartEvent() const {
auto ev = std::make_unique<TEvDataShard::TEvKqpScan>();
ev->Record.SetLocalPathId(PathId);
ev->Record.MutableSnapshot()->SetStep(Snapshot.GetPlanStep());
ev->Record.MutableSnapshot()->SetTxId(Snapshot.GetTxId());

ev->Record.SetStatsMode(NYql::NDqProto::DQ_STATS_MODE_FULL);
ev->Record.SetTxId(Snapshot.GetTxId());

ev->Record.SetReverse(Reverse);
ev->Record.SetItemsLimit(Limit);

ev->Record.SetDataFormat(NKikimrDataEvents::FORMAT_ARROW);

auto protoRanges = ev->Record.MutableRanges();
protoRanges->Reserve(Ranges.size());
for (auto& range : Ranges) {
auto newRange = protoRanges->Add();
range.Serialize(*newRange);
}

if (ProgramProto) {
NKikimrSSA::TOlapProgram olapProgram;
{
TString programBytes;
TStringOutput stream(programBytes);
ProgramProto->SerializeToArcadiaStream(&stream);
olapProgram.SetProgram(programBytes);
}
{
TString programBytes;
TStringOutput stream(programBytes);
olapProgram.SerializeToArcadiaStream(&stream);
ev->Record.SetOlapProgram(programBytes);
}
ev->Record.SetOlapProgramType(
NKikimrSchemeOp::EOlapProgramType::OLAP_PROGRAM_SSA_PROGRAM_WITH_PARAMETERS
);
} else if (SerializedProgram) {
ev->Record.SetOlapProgram(*SerializedProgram);
ev->Record.SetOlapProgramType(
NKikimrSchemeOp::EOlapProgramType::OLAP_PROGRAM_SSA_PROGRAM_WITH_PARAMETERS
);
}

return ev;
}
std::unique_ptr<TEvDataShard::TEvKqpScan> BuildStartEvent() const;

std::vector<std::shared_ptr<arrow::RecordBatch>> ResultBatches;
YDB_READONLY(ui32, IterationsCount, 0);
Expand All @@ -100,57 +54,9 @@ class TShardReader {
return r ? r->num_rows() : 0;
}

TShardReader& SetReplyColumns(const std::vector<TString>& replyColumns) {
AFL_VERIFY(!SerializedProgram);
if (!ProgramProto) {
ProgramProto = NKikimrSSA::TProgram();
}
for (auto&& command : *ProgramProto->MutableCommand()) {
if (command.HasProjection()) {
NKikimrSSA::TProgram::TProjection proj;
for (auto&& i : replyColumns) {
proj.AddColumns()->SetName(i);
}
*command.MutableProjection() = proj;
return *this;
}
}
{
auto* command = ProgramProto->AddCommand();
NKikimrSSA::TProgram::TProjection proj;
for (auto&& i : replyColumns) {
proj.AddColumns()->SetName(i);
}
*command->MutableProjection() = proj;
}
return *this;
}
TShardReader& SetReplyColumns(const std::vector<TString>& replyColumns);

TShardReader& SetReplyColumnIds(const std::vector<ui32>& replyColumnIds) {
AFL_VERIFY(!SerializedProgram);
if (!ProgramProto) {
ProgramProto = NKikimrSSA::TProgram();
}
for (auto&& command : *ProgramProto->MutableCommand()) {
if (command.HasProjection()) {
NKikimrSSA::TProgram::TProjection proj;
for (auto&& i : replyColumnIds) {
proj.AddColumns()->SetId(i);
}
*command.MutableProjection() = proj;
return *this;
}
}
{
auto* command = ProgramProto->AddCommand();
NKikimrSSA::TProgram::TProjection proj;
for (auto&& i : replyColumnIds) {
proj.AddColumns()->SetId(i);
}
*command->MutableProjection() = proj;
}
return *this;
}
TShardReader& SetReplyColumnIds(const std::vector<ui32>& replyColumnIds);

TShardReader& SetProgram(const NKikimrSSA::TProgram& p) {
AFL_VERIFY(!ProgramProto);
Expand Down
Loading

0 comments on commit 7a3a76f

Please sign in to comment.