From 7a3a76f0bfe7039e0a65df4a919f12e7c821fa8b Mon Sep 17 00:00:00 2001 From: ivanmorozov333 Date: Wed, 25 Sep 2024 13:37:48 +0300 Subject: [PATCH] clean useless case for evwrite (#9716) --- .../blobs_action/transaction/tx_write.cpp | 11 - .../tx/columnshard/columnshard__write.cpp | 18 +- .../tx/columnshard/operations/manager.cpp | 3 - ydb/core/tx/columnshard/operations/write.h | 1 - .../test_helper/columnshard_ut_common.cpp | 6 +- .../test_helper/columnshard_ut_common.h | 6 +- .../columnshard/test_helper/shard_reader.cpp | 101 +++ .../tx/columnshard/test_helper/shard_reader.h | 100 +-- .../columnshard/test_helper/shard_writer.cpp | 63 ++ .../tx/columnshard/test_helper/shard_writer.h | 43 ++ ydb/core/tx/columnshard/test_helper/ya.make | 2 + .../ut_rw/ut_columnshard_read_write.cpp | 620 +++++++----------- .../tx/columnshard/ut_rw/ut_normalizer.cpp | 128 ++-- 13 files changed, 529 insertions(+), 573 deletions(-) create mode 100644 ydb/core/tx/columnshard/test_helper/shard_reader.cpp create mode 100644 ydb/core/tx/columnshard/test_helper/shard_writer.cpp create mode 100644 ydb/core/tx/columnshard/test_helper/shard_writer.h diff --git a/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp b/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp index 3fab87ca4b5f..5b66a0587b5e 100644 --- a/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp +++ b/ydb/core/tx/columnshard/blobs_action/transaction/tx_write.cpp @@ -97,17 +97,6 @@ bool TTxWrite::Execute(TTransactionContext& txc, const TActorContext&) { if (operation->GetBehaviour() == EOperationBehaviour::NoTxWrite) { auto ev = NEvents::TDataEvents::TEvWriteResult::BuildCompleted(Self->TabletID()); Results.emplace_back(std::move(ev), writeMeta.GetSource(), operation->GetCookie()); - } else if (operation->GetBehaviour() == EOperationBehaviour::InTxWrite) { - NKikimrTxColumnShard::TCommitWriteTxBody proto; - proto.SetLockId(operation->GetLockId()); - TString txBody; - Y_ABORT_UNLESS(proto.SerializeToString(&txBody)); - auto op = Self->GetProgressTxController().StartProposeOnExecute( - TTxController::TTxInfo( - NKikimrTxColumnShard::TX_KIND_COMMIT_WRITE, operation->GetLockId(), writeMeta.GetSource(), operation->GetCookie(), {}), - txBody, txc); - AFL_VERIFY(!op->IsFail()); - ResultOperators.emplace_back(op); } else { auto& info = Self->OperationsManager->GetLockVerified(operation->GetLockId()); NKikimrDataEvents::TLock lock; diff --git a/ydb/core/tx/columnshard/columnshard__write.cpp b/ydb/core/tx/columnshard/columnshard__write.cpp index bab3ca229fdd..bc430cda234d 100644 --- a/ydb/core/tx/columnshard/columnshard__write.cpp +++ b/ydb/core/tx/columnshard/columnshard__write.cpp @@ -462,12 +462,12 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor if (conclusionParse.IsFail()) { sendError(conclusionParse.GetErrorMessage(), NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST); } else { - if (commitOperation->NeedSyncLocks()) { - auto* lockInfo = OperationsManager->GetLockOptional(commitOperation->GetLockId()); - if (!lockInfo) { - sendError("haven't lock for commit: " + ::ToString(commitOperation->GetLockId()), - NKikimrDataEvents::TEvWriteResult::STATUS_ABORTED); - } else { + auto* lockInfo = OperationsManager->GetLockOptional(commitOperation->GetLockId()); + if (!lockInfo) { + sendError("haven't lock for commit: " + ::ToString(commitOperation->GetLockId()), + NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST); + } else { + if (commitOperation->NeedSyncLocks()) { if (lockInfo->GetGeneration() != commitOperation->GetGeneration()) { sendError("tablet lock have another generation: " + ::ToString(lockInfo->GetGeneration()) + " != " + ::ToString(commitOperation->GetGeneration()), NKikimrDataEvents::TEvWriteResult::STATUS_LOCKS_BROKEN); @@ -479,9 +479,9 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor } else { Execute(new TProposeWriteTransaction(this, commitOperation, source, cookie), ctx); } + } else { + Execute(new TProposeWriteTransaction(this, commitOperation, source, cookie), ctx); } - } else { - Execute(new TProposeWriteTransaction(this, commitOperation, source, cookie), ctx); } } return; @@ -559,8 +559,6 @@ void TColumnShard::Handle(NEvents::TDataEvents::TEvWrite::TPtr& ev, const TActor ui64 lockId = 0; if (behaviour == EOperationBehaviour::NoTxWrite) { lockId = BuildEphemeralTxId(); - } else if (behaviour == EOperationBehaviour::InTxWrite) { - lockId = record.GetTxId(); } else { lockId = record.GetLockTxId(); } diff --git a/ydb/core/tx/columnshard/operations/manager.cpp b/ydb/core/tx/columnshard/operations/manager.cpp index 1527ec5d028d..2fdb5d0e181b 100644 --- a/ydb/core/tx/columnshard/operations/manager.cpp +++ b/ydb/core/tx/columnshard/operations/manager.cpp @@ -255,9 +255,6 @@ TConclusion TOperationsManager::GetBehaviour(const NEvents: return EOperationBehaviour::NoTxWrite; } - if (evWrite.Record.HasTxId() && evWrite.Record.GetTxMode() == NKikimrDataEvents::TEvWrite::MODE_PREPARE) { - return EOperationBehaviour::InTxWrite; - } AFL_WARN(NKikimrServices::TX_COLUMNSHARD)("proto", evWrite.Record.DebugString())("event", "undefined behaviour"); return TConclusionStatus::Fail("undefined request for detect tx type"); } diff --git a/ydb/core/tx/columnshard/operations/write.h b/ydb/core/tx/columnshard/operations/write.h index 5251402347c0..f806a313ec49 100644 --- a/ydb/core/tx/columnshard/operations/write.h +++ b/ydb/core/tx/columnshard/operations/write.h @@ -37,7 +37,6 @@ enum class EOperationStatus : ui32 { enum class EOperationBehaviour : ui32 { Undefined = 1, - InTxWrite = 2, WriteWithLock = 3, CommitWriteLock = 4, AbortWriteLock = 5, diff --git a/ydb/core/tx/columnshard/test_helper/columnshard_ut_common.cpp b/ydb/core/tx/columnshard/test_helper/columnshard_ut_common.cpp index 31de6ffef8a5..3ebdb21fb67e 100644 --- a/ydb/core/tx/columnshard/test_helper/columnshard_ut_common.cpp +++ b/ydb/core/tx/columnshard/test_helper/columnshard_ut_common.cpp @@ -63,7 +63,7 @@ bool ProposeSchemaTx(TTestBasicRuntime& runtime, TActorId& sender, const TString return (res.GetStatus() == NKikimrTxColumnShard::PREPARED); } -void PlanSchemaTx(TTestBasicRuntime& runtime, TActorId& sender, NOlap::TSnapshot snap) { +void PlanSchemaTx(TTestBasicRuntime& runtime, const TActorId& sender, NOlap::TSnapshot snap) { auto plan = std::make_unique(snap.GetPlanStep(), 0, TTestTxConfig::TxTablet0); auto tx = plan->Record.AddTransactions(); tx->SetTxId(snap.GetTxId()); @@ -78,7 +78,7 @@ void PlanSchemaTx(TTestBasicRuntime& runtime, TActorId& sender, NOlap::TSnapshot UNIT_ASSERT_EQUAL(res.GetStatus(), NKikimrTxColumnShard::SUCCESS); } -void PlanWriteTx(TTestBasicRuntime& runtime, TActorId& sender, NOlap::TSnapshot snap, bool waitResult) { +void PlanWriteTx(TTestBasicRuntime& runtime, const TActorId& sender, NOlap::TSnapshot snap, bool waitResult) { auto plan = std::make_unique(snap.GetPlanStep(), 0, TTestTxConfig::TxTablet0); auto tx = plan->Record.AddTransactions(); tx->SetTxId(snap.GetTxId()); @@ -229,7 +229,7 @@ void PlanCommit(TTestBasicRuntime& runtime, TActorId& sender, ui64 planStep, con PlanCommit(runtime, sender, TTestTxConfig::TxTablet0, planStep, txIds); } -void Wakeup(TTestBasicRuntime& runtime, TActorId& sender, const ui64 shardId) { +void Wakeup(TTestBasicRuntime& runtime, const TActorId& sender, const ui64 shardId) { auto wakeup = std::make_unique(true); ForwardToTablet(runtime, shardId, sender, wakeup.release()); } diff --git a/ydb/core/tx/columnshard/test_helper/columnshard_ut_common.h b/ydb/core/tx/columnshard/test_helper/columnshard_ut_common.h index 7594be5da952..8a8369252829 100644 --- a/ydb/core/tx/columnshard/test_helper/columnshard_ut_common.h +++ b/ydb/core/tx/columnshard/test_helper/columnshard_ut_common.h @@ -402,9 +402,9 @@ struct TTestSchema { bool ProposeSchemaTx(TTestBasicRuntime& runtime, TActorId& sender, const TString& txBody, NOlap::TSnapshot snap); void ProvideTieringSnapshot(TTestBasicRuntime& runtime, const TActorId& sender, NMetadata::NFetcher::ISnapshot::TPtr snapshot); -void PlanSchemaTx(TTestBasicRuntime& runtime, TActorId& sender, NOlap::TSnapshot snap); +void PlanSchemaTx(TTestBasicRuntime& runtime, const TActorId& sender, NOlap::TSnapshot snap); -void PlanWriteTx(TTestBasicRuntime& runtime, TActorId& sender, NOlap::TSnapshot snap, bool waitResult = true); +void PlanWriteTx(TTestBasicRuntime& runtime, const TActorId& sender, NOlap::TSnapshot snap, bool waitResult = true); bool WriteData(TTestBasicRuntime& runtime, TActorId& sender, const ui64 shardId, const ui64 writeId, const ui64 tableId, const TString& data, const std::vector& ydbSchema, std::vector* writeIds, @@ -435,7 +435,7 @@ inline void PlanCommit(TTestBasicRuntime& runtime, TActorId& sender, ui64 planSt PlanCommit(runtime, sender, planStep, ids); } -void Wakeup(TTestBasicRuntime& runtime, TActorId& sender, const ui64 shardId); +void Wakeup(TTestBasicRuntime& runtime, const TActorId& sender, const ui64 shardId); struct TTestBlobOptions { THashSet NullColumns; diff --git a/ydb/core/tx/columnshard/test_helper/shard_reader.cpp b/ydb/core/tx/columnshard/test_helper/shard_reader.cpp new file mode 100644 index 000000000000..6b3ce1a5a1b4 --- /dev/null +++ b/ydb/core/tx/columnshard/test_helper/shard_reader.cpp @@ -0,0 +1,101 @@ +#include "shard_reader.h" + +namespace NKikimr::NTxUT { + +std::unique_ptr TShardReader::BuildStartEvent() const { + auto ev = std::make_unique(); + ev->Record.SetLocalPathId(PathId); + ev->Record.MutableSnapshot()->SetStep(Snapshot.GetPlanStep()); + ev->Record.MutableSnapshot()->SetTxId(Snapshot.GetTxId()); + + ev->Record.SetStatsMode(NYql::NDqProto::DQ_STATS_MODE_FULL); + ev->Record.SetTxId(Snapshot.GetTxId()); + + ev->Record.SetReverse(Reverse); + ev->Record.SetItemsLimit(Limit); + + ev->Record.SetDataFormat(NKikimrDataEvents::FORMAT_ARROW); + + auto protoRanges = ev->Record.MutableRanges(); + protoRanges->Reserve(Ranges.size()); + for (auto& range : Ranges) { + auto newRange = protoRanges->Add(); + range.Serialize(*newRange); + } + + if (ProgramProto) { + NKikimrSSA::TOlapProgram olapProgram; + { + TString programBytes; + TStringOutput stream(programBytes); + ProgramProto->SerializeToArcadiaStream(&stream); + olapProgram.SetProgram(programBytes); + } + { + TString programBytes; + TStringOutput stream(programBytes); + olapProgram.SerializeToArcadiaStream(&stream); + ev->Record.SetOlapProgram(programBytes); + } + ev->Record.SetOlapProgramType(NKikimrSchemeOp::EOlapProgramType::OLAP_PROGRAM_SSA_PROGRAM_WITH_PARAMETERS); + } else if (SerializedProgram) { + ev->Record.SetOlapProgram(*SerializedProgram); + ev->Record.SetOlapProgramType(NKikimrSchemeOp::EOlapProgramType::OLAP_PROGRAM_SSA_PROGRAM_WITH_PARAMETERS); + } + + return ev; +} + +NKikimr::NTxUT::TShardReader& TShardReader::SetReplyColumns(const std::vector& replyColumns) { + AFL_VERIFY(!SerializedProgram); + if (!ProgramProto) { + ProgramProto = NKikimrSSA::TProgram(); + } + for (auto&& command : *ProgramProto->MutableCommand()) { + if (command.HasProjection()) { + NKikimrSSA::TProgram::TProjection proj; + for (auto&& i : replyColumns) { + proj.AddColumns()->SetName(i); + } + *command.MutableProjection() = proj; + return *this; + } + } + { + auto* command = ProgramProto->AddCommand(); + NKikimrSSA::TProgram::TProjection proj; + for (auto&& i : replyColumns) { + proj.AddColumns()->SetName(i); + } + *command->MutableProjection() = proj; + } + return *this; +} + +NKikimr::NTxUT::TShardReader& TShardReader::SetReplyColumnIds(const std::vector& replyColumnIds) { + AFL_VERIFY(!SerializedProgram); + if (!ProgramProto) { + ProgramProto = NKikimrSSA::TProgram(); + } + for (auto&& command : *ProgramProto->MutableCommand()) { + if (command.HasProjection()) { + NKikimrSSA::TProgram::TProjection proj; + for (auto&& i : replyColumnIds) { + proj.AddColumns()->SetId(i); + } + *command.MutableProjection() = proj; + return *this; + } + } + { + auto* command = ProgramProto->AddCommand(); + NKikimrSSA::TProgram::TProjection proj; + for (auto&& i : replyColumnIds) { + proj.AddColumns()->SetId(i); + } + *command->MutableProjection() = proj; + } + return *this; +} + +} diff --git a/ydb/core/tx/columnshard/test_helper/shard_reader.h b/ydb/core/tx/columnshard/test_helper/shard_reader.h index 2beaa5a782d9..eb9f041062a6 100644 --- a/ydb/core/tx/columnshard/test_helper/shard_reader.h +++ b/ydb/core/tx/columnshard/test_helper/shard_reader.h @@ -28,53 +28,7 @@ class TShardReader { std::vector ReplyColumns; std::vector Ranges; - std::unique_ptr BuildStartEvent() const { - auto ev = std::make_unique(); - ev->Record.SetLocalPathId(PathId); - ev->Record.MutableSnapshot()->SetStep(Snapshot.GetPlanStep()); - ev->Record.MutableSnapshot()->SetTxId(Snapshot.GetTxId()); - - ev->Record.SetStatsMode(NYql::NDqProto::DQ_STATS_MODE_FULL); - ev->Record.SetTxId(Snapshot.GetTxId()); - - ev->Record.SetReverse(Reverse); - ev->Record.SetItemsLimit(Limit); - - ev->Record.SetDataFormat(NKikimrDataEvents::FORMAT_ARROW); - - auto protoRanges = ev->Record.MutableRanges(); - protoRanges->Reserve(Ranges.size()); - for (auto& range : Ranges) { - auto newRange = protoRanges->Add(); - range.Serialize(*newRange); - } - - if (ProgramProto) { - NKikimrSSA::TOlapProgram olapProgram; - { - TString programBytes; - TStringOutput stream(programBytes); - ProgramProto->SerializeToArcadiaStream(&stream); - olapProgram.SetProgram(programBytes); - } - { - TString programBytes; - TStringOutput stream(programBytes); - olapProgram.SerializeToArcadiaStream(&stream); - ev->Record.SetOlapProgram(programBytes); - } - ev->Record.SetOlapProgramType( - NKikimrSchemeOp::EOlapProgramType::OLAP_PROGRAM_SSA_PROGRAM_WITH_PARAMETERS - ); - } else if (SerializedProgram) { - ev->Record.SetOlapProgram(*SerializedProgram); - ev->Record.SetOlapProgramType( - NKikimrSchemeOp::EOlapProgramType::OLAP_PROGRAM_SSA_PROGRAM_WITH_PARAMETERS - ); - } - - return ev; - } + std::unique_ptr BuildStartEvent() const; std::vector> ResultBatches; YDB_READONLY(ui32, IterationsCount, 0); @@ -100,57 +54,9 @@ class TShardReader { return r ? r->num_rows() : 0; } - TShardReader& SetReplyColumns(const std::vector& replyColumns) { - AFL_VERIFY(!SerializedProgram); - if (!ProgramProto) { - ProgramProto = NKikimrSSA::TProgram(); - } - for (auto&& command : *ProgramProto->MutableCommand()) { - if (command.HasProjection()) { - NKikimrSSA::TProgram::TProjection proj; - for (auto&& i : replyColumns) { - proj.AddColumns()->SetName(i); - } - *command.MutableProjection() = proj; - return *this; - } - } - { - auto* command = ProgramProto->AddCommand(); - NKikimrSSA::TProgram::TProjection proj; - for (auto&& i : replyColumns) { - proj.AddColumns()->SetName(i); - } - *command->MutableProjection() = proj; - } - return *this; - } + TShardReader& SetReplyColumns(const std::vector& replyColumns); - TShardReader& SetReplyColumnIds(const std::vector& replyColumnIds) { - AFL_VERIFY(!SerializedProgram); - if (!ProgramProto) { - ProgramProto = NKikimrSSA::TProgram(); - } - for (auto&& command : *ProgramProto->MutableCommand()) { - if (command.HasProjection()) { - NKikimrSSA::TProgram::TProjection proj; - for (auto&& i : replyColumnIds) { - proj.AddColumns()->SetId(i); - } - *command.MutableProjection() = proj; - return *this; - } - } - { - auto* command = ProgramProto->AddCommand(); - NKikimrSSA::TProgram::TProjection proj; - for (auto&& i : replyColumnIds) { - proj.AddColumns()->SetId(i); - } - *command->MutableProjection() = proj; - } - return *this; - } + TShardReader& SetReplyColumnIds(const std::vector& replyColumnIds); TShardReader& SetProgram(const NKikimrSSA::TProgram& p) { AFL_VERIFY(!ProgramProto); diff --git a/ydb/core/tx/columnshard/test_helper/shard_writer.cpp b/ydb/core/tx/columnshard/test_helper/shard_writer.cpp new file mode 100644 index 000000000000..92e262d2f776 --- /dev/null +++ b/ydb/core/tx/columnshard/test_helper/shard_writer.cpp @@ -0,0 +1,63 @@ +#include "shard_writer.h" + +#include +#include +#include +#include +#include + +namespace NKikimr::NTxUT { + +NKikimrDataEvents::TEvWriteResult::EStatus TShardWriter::StartCommit(const ui64 txId) { + auto evCommit = std::make_unique(txId, NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE); + evCommit->Record.MutableLocks()->SetOp(NKikimrDataEvents::TKqpLocks::Commit); + auto* lock = evCommit->Record.MutableLocks()->AddLocks(); + lock->SetLockId(LockId); + ForwardToTablet(Runtime, TTestTxConfig::TxTablet0, Sender, evCommit.release()); + + TAutoPtr handle; + auto event = Runtime.GrabEdgeEvent(handle); + AFL_VERIFY(event); + + return event->Record.GetStatus(); +} + +NKikimrDataEvents::TEvWriteResult::EStatus TShardWriter::Abort(const ui64 txId) { + auto evCommit = std::make_unique(txId, NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE); + evCommit->Record.MutableLocks()->SetOp(NKikimrDataEvents::TKqpLocks::Rollback); + auto* lock = evCommit->Record.MutableLocks()->AddLocks(); + lock->SetLockId(LockId); + ForwardToTablet(Runtime, TTestTxConfig::TxTablet0, Sender, evCommit.release()); + + TAutoPtr handle; + auto event = Runtime.GrabEdgeEvent(handle); + AFL_VERIFY(event); + + return event->Record.GetStatus(); +} + +NKikimrDataEvents::TEvWriteResult::EStatus TShardWriter::Write( + const std::shared_ptr& batch, const std::vector& columnIds, const ui64 txId) { + TString blobData = NArrow::SerializeBatchNoCompression(batch); +// AFL_VERIFY(blobData.size() < NColumnShard::TLimits::GetMaxBlobSize()); + + auto evWrite = std::make_unique(NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE); + evWrite->SetTxId(txId); + evWrite->SetLockId(LockId, LockNodeId); + const ui64 payloadIndex = NEvWrite::TPayloadWriter(*evWrite).AddDataToPayload(std::move(blobData)); + evWrite->AddOperation(NKikimrDataEvents::TEvWrite::TOperation::OPERATION_REPLACE, { OwnerId, PathId, SchemaVersion }, columnIds, + payloadIndex, NKikimrDataEvents::FORMAT_ARROW); + + ForwardToTablet(Runtime, TabletId, Sender, evWrite.release()); + + TAutoPtr handle; + auto event = Runtime.GrabEdgeEvent(handle); + AFL_VERIFY(event); + + AFL_VERIFY(event->Record.GetOrigin() == TabletId); + AFL_VERIFY(event->Record.GetTxId() == LockId); + + return event->Record.GetStatus(); +} + +} // namespace NKikimr::NTxUT diff --git a/ydb/core/tx/columnshard/test_helper/shard_writer.h b/ydb/core/tx/columnshard/test_helper/shard_writer.h new file mode 100644 index 000000000000..b43e9749a69b --- /dev/null +++ b/ydb/core/tx/columnshard/test_helper/shard_writer.h @@ -0,0 +1,43 @@ +#pragma once +#include +#include + +#include + +#include + +namespace NKikimr::NTxUT { + +class TShardWriter { +private: + TTestBasicRuntime& Runtime; + const ui64 TabletId; + const ui64 PathId; + const ui64 LockId; + YDB_ACCESSOR(ui64, SchemaVersion, 1); + YDB_ACCESSOR(ui64, OwnerId, 0); + YDB_ACCESSOR(ui64, LockNodeId, 1); + const TActorId Sender; + +public: + TShardWriter(TTestBasicRuntime& runtime, const ui64 tabletId, const ui64 pathId, const ui64 lockId) + : Runtime(runtime) + , TabletId(tabletId) + , PathId(pathId) + , LockId(lockId) + , Sender(Runtime.AllocateEdgeActor()) + { + } + + const TActorId& GetSender() const { + return Sender; + } + + [[nodiscard]] NKikimrDataEvents::TEvWriteResult::EStatus StartCommit(const ui64 txId); + [[nodiscard]] NKikimrDataEvents::TEvWriteResult::EStatus Abort(const ui64 txId); + + [[nodiscard]] NKikimrDataEvents::TEvWriteResult::EStatus Write( + const std::shared_ptr& batch, const std::vector& columnIds, const ui64 txId); +}; + +} // namespace NKikimr::NTxUT diff --git a/ydb/core/tx/columnshard/test_helper/ya.make b/ydb/core/tx/columnshard/test_helper/ya.make index cab4937293dd..d4b96709720b 100644 --- a/ydb/core/tx/columnshard/test_helper/ya.make +++ b/ydb/core/tx/columnshard/test_helper/ya.make @@ -14,6 +14,8 @@ SRCS( helper.cpp controllers.cpp columnshard_ut_common.cpp + shard_reader.cpp + shard_writer.cpp ) IF (OS_WINDOWS) diff --git a/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp b/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp index ad5ec1f688fd..a0716bd0925b 100644 --- a/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp +++ b/ydb/core/tx/columnshard/ut_rw/ut_columnshard_read_write.cpp @@ -1,25 +1,28 @@ -#include -#include #include -#include -#include -#include -#include +#include #include -#include -#include #include -#include +#include +#include +#include #include #include -#include +#include +#include #include #include +#include + #include -#include #include #include +#include +#include + +#include +#include #include +#include namespace NKikimr { @@ -27,8 +30,7 @@ using namespace NColumnShard; using namespace Tests; using namespace NTxUT; -namespace -{ +namespace { namespace NTypeIds = NScheme::NTypeIds; using TTypeId = NScheme::TTypeId; @@ -37,8 +39,8 @@ using TTypeInfo = NScheme::TTypeInfo; using TDefaultTestsController = NKikimr::NYDBTest::NColumnShard::TController; template -bool DataHas(const std::vector>& batches, std::pair range, - bool requireUniq = false, const std::string& columnName = "timestamp") { +bool DataHas(const std::vector>& batches, std::pair range, bool requireUniq = false, + const std::string& columnName = "timestamp") { static constexpr const bool isStrKey = std::is_same_v; THashMap keys; @@ -94,9 +96,8 @@ bool DataHas(const std::vector>& batches, st } template -bool DataHas(const std::vector& blobs, const TString& srtSchema, std::pair range, - bool requireUniq = false, const std::string& columnName = "timestamp") { - +bool DataHas(const std::vector& blobs, const TString& srtSchema, std::pair range, bool requireUniq = false, + const std::string& columnName = "timestamp") { auto schema = NArrow::DeserializeSchema(srtSchema); std::vector> batches; for (auto& blob : blobs) { @@ -350,7 +351,7 @@ void TestWrite(const TestTableDescription& table) { const auto& ydbSchema = table.Schema; - bool ok = WriteData(runtime, sender, writeId++, tableId, MakeTestBlob({0, 100}, ydbSchema), ydbSchema); + bool ok = WriteData(runtime, sender, writeId++, tableId, MakeTestBlob({ 0, 100 }, ydbSchema), ydbSchema); UNIT_ASSERT(ok); auto schema = ydbSchema; @@ -364,13 +365,13 @@ void TestWrite(const TestTableDescription& table) { TTestBlobOptions optsNulls; optsNulls.NullColumns.emplace("timestamp"); - ok = WriteData(runtime, sender, writeId++, tableId, MakeTestBlob({0, 100}, ydbSchema, optsNulls), ydbSchema); + ok = WriteData(runtime, sender, writeId++, tableId, MakeTestBlob({ 0, 100 }, ydbSchema, optsNulls), ydbSchema); UNIT_ASSERT(!ok); // missing columns schema = NArrow::NTest::TTestColumn::CropSchema(schema, 4); - ok = WriteData(runtime, sender, writeId++, tableId, MakeTestBlob({0, 100}, schema), schema); + ok = WriteData(runtime, sender, writeId++, tableId, MakeTestBlob({ 0, 100 }, schema), schema); UNIT_ASSERT(ok); // wrong first key column type (with supported layout: Int64 vs Timestamp) @@ -378,8 +379,7 @@ void TestWrite(const TestTableDescription& table) { schema = ydbSchema; schema[0].SetType(TTypeInfo(NTypeIds::Int64)); - ok = WriteData(runtime, sender, writeId++, tableId, MakeTestBlob({0, 100}, schema), - schema); + ok = WriteData(runtime, sender, writeId++, tableId, MakeTestBlob({ 0, 100 }, schema), schema); UNIT_ASSERT(!ok); // wrong type (no additional schema - fails in case of wrong layout) @@ -387,7 +387,7 @@ void TestWrite(const TestTableDescription& table) { for (size_t i = 0; i < ydbSchema.size(); ++i) { schema = ydbSchema; schema[i].SetType(TTypeInfo(NTypeIds::Int8)); - ok = WriteData(runtime, sender, writeId++, tableId, MakeTestBlob({0, 100}, schema), schema); + ok = WriteData(runtime, sender, writeId++, tableId, MakeTestBlob({ 0, 100 }, schema), schema); UNIT_ASSERT(!ok); } @@ -396,14 +396,14 @@ void TestWrite(const TestTableDescription& table) { for (size_t i = 0; i < ydbSchema.size(); ++i) { schema = ydbSchema; schema[i].SetType(TTypeInfo(NTypeIds::Int64)); - ok = WriteData(runtime, sender, writeId++, tableId, MakeTestBlob({0, 100}, schema), schema); + ok = WriteData(runtime, sender, writeId++, tableId, MakeTestBlob({ 0, 100 }, schema), schema); UNIT_ASSERT(ok == (ydbSchema[i].GetType() == TTypeInfo(NTypeIds::Int64))); } schema = ydbSchema; schema[1].SetType(TTypeInfo(NTypeIds::Utf8)); schema[5].SetType(TTypeInfo(NTypeIds::Int32)); - ok = WriteData(runtime, sender, writeId++, tableId, MakeTestBlob({0, 100}, schema), schema); + ok = WriteData(runtime, sender, writeId++, tableId, MakeTestBlob({ 0, 100 }, schema), schema); UNIT_ASSERT(!ok); // reordered columns @@ -415,12 +415,12 @@ void TestWrite(const TestTableDescription& table) { schema.push_back(NArrow::NTest::TTestColumn(name, typeInfo)); } - ok = WriteData(runtime, sender, writeId++, tableId, MakeTestBlob({0, 100}, schema), schema); + ok = WriteData(runtime, sender, writeId++, tableId, MakeTestBlob({ 0, 100 }, schema), schema); UNIT_ASSERT(ok); // too much data - TString bigData = MakeTestBlob({0, 150 * 1000}, ydbSchema); + TString bigData = MakeTestBlob({ 0, 150 * 1000 }, ydbSchema); UNIT_ASSERT(bigData.size() > NColumnShard::TLimits::GetMaxBlobSize()); UNIT_ASSERT(bigData.size() < NColumnShard::TLimits::GetMaxBlobSize() + 2 * 1024 * 1024); ok = WriteData(runtime, sender, writeId++, tableId, bigData, ydbSchema); @@ -445,7 +445,7 @@ void TestWriteOverload(const TestTableDescription& table) { SetupSchema(runtime, sender, tableId, table); - TString testBlob = MakeTestBlob({0, 100 * 1000}, table.Schema); + TString testBlob = MakeTestBlob({ 0, 100 * 1000 }, table.Schema); UNIT_ASSERT(testBlob.size() > NOlap::TCompactionLimits::MAX_BLOB_SIZE / 2); UNIT_ASSERT(testBlob.size() < NOlap::TCompactionLimits::MAX_BLOB_SIZE); @@ -489,7 +489,7 @@ void TestWriteOverload(const TestTableDescription& table) { UNIT_ASSERT_VALUES_EQUAL(WaitWriteResult(runtime, TTestTxConfig::TxTablet0), (ui32)NKikimrTxColumnShard::EResultStatus::SUCCESS); } - UNIT_ASSERT(WriteData(runtime, sender, ++writeId, tableId, testBlob, table.Schema)); // OK after overload + UNIT_ASSERT(WriteData(runtime, sender, ++writeId, tableId, testBlob, table.Schema)); // OK after overload } // TODO: Improve test. It does not catch KIKIMR-14890 @@ -514,7 +514,7 @@ void TestWriteReadDup(const TestTableDescription& table = {}) { SetupSchema(runtime, sender, tableId); constexpr ui32 numRows = 10; - std::pair portion = {10, 10 + numRows}; + std::pair portion = { 10, 10 + numRows }; auto testData = MakeTestBlob(portion, ydbSchema); TAutoPtr handle; @@ -533,11 +533,11 @@ void TestWriteReadDup(const TestTableDescription& table = {}) { // read if (planStep != initPlanStep) { TShardReader reader(runtime, TTestTxConfig::TxTablet0, tableId, NOlap::TSnapshot(planStep - 1, Max())); - reader.SetReplyColumns({"timestamp"}); + reader.SetReplyColumns({ "timestamp" }); auto rb = reader.ReadAll(); UNIT_ASSERT(reader.IsCorrectlyFinished()); UNIT_ASSERT(CheckOrdered(rb)); - UNIT_ASSERT(DataHas({rb}, portion, true)); + UNIT_ASSERT(DataHas({ rb }, portion, true)); } } } @@ -561,7 +561,7 @@ void TestWriteReadLongTxDup() { SetupSchema(runtime, sender, tableId); constexpr ui32 numRows = 10; - std::pair portion = {10, 10 + numRows}; + std::pair portion = { 10, 10 + numRows }; NLongTxService::TLongTxId longTxId; UNIT_ASSERT(longTxId.ParseString("ydb://long-tx/01ezvvxjdk2hd4vdgjs68knvp8?node_id=1")); @@ -573,7 +573,7 @@ void TestWriteReadLongTxDup() { // Only the first blob with dedup pair {longTx, dedupId} should be inserted // Others should return OK (write retries emulation) for (ui32 i = 0; i < 4; ++i) { - auto data = MakeTestBlob({portion.first + i, portion.second + i}, ydbSchema); + auto data = MakeTestBlob({ portion.first + i, portion.second + i }, ydbSchema); UNIT_ASSERT(data.size() < NColumnShard::TLimits::MIN_BYTES_TO_INSERT); auto writeIdOpt = WriteData(runtime, sender, longTxId, tableId, 1, data, ydbSchema); @@ -584,8 +584,8 @@ void TestWriteReadLongTxDup() { UNIT_ASSERT_EQUAL(*writeIdOpt, *writeId); } - ProposeCommit(runtime, sender, ++txId, {*writeId}); - TSet txIds = {txId}; + ProposeCommit(runtime, sender, ++txId, { *writeId }); + TSet txIds = { txId }; PlanCommit(runtime, sender, planStep, txIds); // read @@ -600,8 +600,8 @@ void TestWriteReadLongTxDup() { Y_UNUSED(NArrow::TColumnOperator().VerifyIfAbsent().Extract(rb, TTestSchema::ExtractNames(ydbSchema))); UNIT_ASSERT((ui32)rb->num_columns() == TTestSchema::ExtractNames(ydbSchema).size()); UNIT_ASSERT(CheckOrdered(rb)); - UNIT_ASSERT(DataHas({rb}, portion, true)); - UNIT_ASSERT(DataHasOnly({rb}, portion)); + UNIT_ASSERT(DataHas({ rb }, portion, true)); + UNIT_ASSERT(DataHasOnly({ rb }, portion)); } } @@ -622,8 +622,8 @@ void TestWriteRead(bool reboots, const TestTableDescription& table = {}, TString options.FinalEvents.push_back(TDispatchOptions::TFinalEventCondition(TEvTablet::EvBoot)); runtime.DispatchEvents(options); - auto write = [&](TTestBasicRuntime& runtime, TActorId& sender, ui64 writeId, ui64 tableId, - const TString& data, const std::vector& ydbSchema, std::vector& intWriteIds) { + auto write = [&](TTestBasicRuntime& runtime, TActorId& sender, ui64 writeId, ui64 tableId, const TString& data, + const std::vector& ydbSchema, std::vector& intWriteIds) { bool ok = WriteData(runtime, sender, writeId, tableId, data, ydbSchema, true, &intWriteIds); if (reboots) { RebootTablet(runtime, TTestTxConfig::TxTablet0, sender); @@ -631,8 +631,7 @@ void TestWriteRead(bool reboots, const TestTableDescription& table = {}, TString return ok; }; - auto proposeCommit = [&](TTestBasicRuntime& runtime, TActorId& sender, ui64 txId, - const std::vector& writeIds) { + auto proposeCommit = [&](TTestBasicRuntime& runtime, TActorId& sender, ui64 txId, const std::vector& writeIds) { ProposeCommit(runtime, sender, txId, writeIds); if (reboots) { RebootTablet(runtime, TTestTxConfig::TxTablet0, sender); @@ -660,12 +659,8 @@ void TestWriteRead(bool reboots, const TestTableDescription& table = {}, TString // -----xx.. // xx---- // -xxxxx - std::vector> portion = { - {200, 300}, - {250, 250 + 80 * 1000}, // committed -> index - {0, 100}, - {50, 300} - }; + std::vector> portion = { { 200, 300 }, { 250, 250 + 80 * 1000 }, // committed -> index + { 0, 100 }, { 50, 300 } }; // write 1: ins:1, cmt:0, idx:0 @@ -678,7 +673,7 @@ void TestWriteRead(bool reboots, const TestTableDescription& table = {}, TString NActors::TLogContextGuard guard = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("TEST_STEP", 1); TShardReader reader(runtime, TTestTxConfig::TxTablet0, tableId, NOlap::TSnapshot(0, 1)); - reader.SetReplyColumns({"resource_type"}); + reader.SetReplyColumns({ "resource_type" }); auto rb = reader.ReadAll(); UNIT_ASSERT(reader.IsCorrectlyFinished()); UNIT_ASSERT_EQUAL(rb, nullptr); @@ -695,7 +690,7 @@ void TestWriteRead(bool reboots, const TestTableDescription& table = {}, TString NActors::TLogContextGuard guard = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("TEST_STEP", 2); TShardReader reader(runtime, TTestTxConfig::TxTablet0, tableId, NOlap::TSnapshot(0, 1)); - reader.SetReplyColumns({"resource_type"}); + reader.SetReplyColumns({ "resource_type" }); auto rb = reader.ReadAll(); UNIT_ASSERT(reader.IsCorrectlyFinished()); UNIT_ASSERT_EQUAL(rb, nullptr); @@ -713,14 +708,14 @@ void TestWriteRead(bool reboots, const TestTableDescription& table = {}, TString UNIT_ASSERT(rb->num_rows()); UNIT_ASSERT(reader.IsCorrectlyFinished()); UNIT_ASSERT(CheckOrdered(rb)); - UNIT_ASSERT(DataHas({rb}, portion[0])); + UNIT_ASSERT(DataHas({ rb }, portion[0])); } // read 4 (column by id) { NActors::TLogContextGuard guard = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("TEST_STEP", 4); TShardReader reader(runtime, TTestTxConfig::TxTablet0, tableId, NOlap::TSnapshot(planStep, txId)); - reader.SetReplyColumnIds({1}); + reader.SetReplyColumnIds({ 1 }); auto rb = reader.ReadAll(); UNIT_ASSERT(rb); Y_UNUSED(NArrow::TColumnOperator().VerifyIfAbsent().Extract(rb, std::vector({ "timestamp" }))); @@ -728,14 +723,14 @@ void TestWriteRead(bool reboots, const TestTableDescription& table = {}, TString UNIT_ASSERT(rb->num_rows()); UNIT_ASSERT(reader.IsCorrectlyFinished()); UNIT_ASSERT(CheckOrdered(rb)); - UNIT_ASSERT(DataHas({rb}, portion[0])); + UNIT_ASSERT(DataHas({ rb }, portion[0])); } // read 5 (2 columns by name) { NActors::TLogContextGuard guard = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("TEST_STEP", 5); TShardReader reader(runtime, TTestTxConfig::TxTablet0, tableId, NOlap::TSnapshot(planStep, txId)); - reader.SetReplyColumns({"timestamp", "message"}); + reader.SetReplyColumns({ "timestamp", "message" }); auto rb = reader.ReadAll(); UNIT_ASSERT(rb); Y_UNUSED(NArrow::TColumnOperator().VerifyIfAbsent().Extract(rb, std::vector({ "timestamp", "message" }))); @@ -743,7 +738,7 @@ void TestWriteRead(bool reboots, const TestTableDescription& table = {}, TString UNIT_ASSERT(rb->num_rows()); UNIT_ASSERT(reader.IsCorrectlyFinished()); UNIT_ASSERT(CheckOrdered(rb)); - UNIT_ASSERT(DataHas({rb}, portion[0])); + UNIT_ASSERT(DataHas({ rb }, portion[0])); } // write 2 (big portion of data): ins:1, cmt:1, idx:0 @@ -773,7 +768,7 @@ void TestWriteRead(bool reboots, const TestTableDescription& table = {}, TString { NActors::TLogContextGuard guard = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("TEST_STEP", 6); TShardReader reader(runtime, TTestTxConfig::TxTablet0, tableId, NOlap::TSnapshot(0, 1)); - reader.SetReplyColumns({"timestamp", "message"}); + reader.SetReplyColumns({ "timestamp", "message" }); auto rb = reader.ReadAll(); UNIT_ASSERT(!rb); UNIT_ASSERT(reader.IsCorrectlyFinished()); @@ -791,9 +786,9 @@ void TestWriteRead(bool reboots, const TestTableDescription& table = {}, TString UNIT_ASSERT((ui32)rb->num_columns() == TTestSchema::ExtractNames(ydbSchema).size()); UNIT_ASSERT(rb->num_rows()); UNIT_ASSERT(CheckOrdered(rb)); - UNIT_ASSERT(DataHas({rb}, portion[0])); - UNIT_ASSERT(!DataHas({rb}, portion[1])); - UNIT_ASSERT(!DataHas({rb}, portion[2])); + UNIT_ASSERT(DataHas({ rb }, portion[0])); + UNIT_ASSERT(!DataHas({ rb }, portion[1])); + UNIT_ASSERT(!DataHas({ rb }, portion[2])); } // read 8, planstep 22 (full index) @@ -808,9 +803,9 @@ void TestWriteRead(bool reboots, const TestTableDescription& table = {}, TString UNIT_ASSERT((ui32)rb->num_columns() == TTestSchema::ExtractNames(ydbSchema).size()); UNIT_ASSERT(rb->num_rows()); UNIT_ASSERT(CheckOrdered(rb)); - UNIT_ASSERT(DataHas({rb}, portion[0])); - UNIT_ASSERT(DataHas({rb}, portion[1])); - UNIT_ASSERT(!DataHas({rb}, portion[2])); + UNIT_ASSERT(DataHas({ rb }, portion[0])); + UNIT_ASSERT(DataHas({ rb }, portion[1])); + UNIT_ASSERT(!DataHas({ rb }, portion[2])); } // commit 3: ins:0, cmt:1, idx:1 @@ -838,10 +833,10 @@ void TestWriteRead(bool reboots, const TestTableDescription& table = {}, TString UNIT_ASSERT((ui32)rb->num_columns() == TTestSchema::ExtractNames(ydbSchema).size()); UNIT_ASSERT(rb->num_rows()); UNIT_ASSERT(CheckOrdered(rb)); - UNIT_ASSERT(DataHas({rb}, portion[0])); - UNIT_ASSERT(DataHas({rb}, portion[1])); - UNIT_ASSERT(DataHas({rb}, portion[2])); - UNIT_ASSERT(!DataHas({rb}, portion[3])); + UNIT_ASSERT(DataHas({ rb }, portion[0])); + UNIT_ASSERT(DataHas({ rb }, portion[1])); + UNIT_ASSERT(DataHas({ rb }, portion[2])); + UNIT_ASSERT(!DataHas({ rb }, portion[3])); } // commit 4: ins:0, cmt:2, idx:1 (with duplicates in PK) @@ -863,11 +858,11 @@ void TestWriteRead(bool reboots, const TestTableDescription& table = {}, TString UNIT_ASSERT((ui32)rb->num_columns() == TTestSchema::ExtractNames(ydbSchema).size()); UNIT_ASSERT(rb->num_rows()); UNIT_ASSERT(CheckOrdered(rb)); - UNIT_ASSERT(DataHas({rb}, portion[0])); - UNIT_ASSERT(DataHas({rb}, portion[1])); - UNIT_ASSERT(DataHas({rb}, portion[2])); - UNIT_ASSERT(DataHas({rb}, portion[3])); - UNIT_ASSERT(DataHas({rb}, {0, 500}, true)); + UNIT_ASSERT(DataHas({ rb }, portion[0])); + UNIT_ASSERT(DataHas({ rb }, portion[1])); + UNIT_ASSERT(DataHas({ rb }, portion[2])); + UNIT_ASSERT(DataHas({ rb }, portion[3])); + UNIT_ASSERT(DataHas({ rb }, { 0, 500 }, true)); const ui64 compactedBytes = reader.GetReadStat("compacted_bytes"); const ui64 insertedBytes = reader.GetReadStat("inserted_bytes"); @@ -896,13 +891,12 @@ void TestWriteRead(bool reboots, const TestTableDescription& table = {}, TString } } - // read 11 (range predicate: closed interval) { NActors::TLogContextGuard guard = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("TEST_STEP", 11); TShardReader reader(runtime, TTestTxConfig::TxTablet0, tableId, NOlap::TSnapshot(24, txId)); reader.SetReplyColumns(TTestSchema::ExtractNames(ydbSchema)); - reader.AddRange(MakeTestRange({10, 42}, true, true, testYdbPk)); + reader.AddRange(MakeTestRange({ 10, 42 }, true, true, testYdbPk)); auto rb = reader.ReadAll(); UNIT_ASSERT(rb); UNIT_ASSERT(reader.IsCorrectlyFinished()); @@ -910,8 +904,8 @@ void TestWriteRead(bool reboots, const TestTableDescription& table = {}, TString UNIT_ASSERT((ui32)rb->num_columns() == TTestSchema::ExtractNames(ydbSchema).size()); UNIT_ASSERT(rb->num_rows()); UNIT_ASSERT(CheckOrdered(rb)); - UNIT_ASSERT(DataHas({rb}, {10, 42 + 1})); - UNIT_ASSERT(DataHasOnly({rb}, {10, 42 + 1})); + UNIT_ASSERT(DataHas({ rb }, { 10, 42 + 1 })); + UNIT_ASSERT(DataHasOnly({ rb }, { 10, 42 + 1 })); } // read 12 (range predicate: open interval) @@ -919,7 +913,7 @@ void TestWriteRead(bool reboots, const TestTableDescription& table = {}, TString NActors::TLogContextGuard guard = NActors::TLogContextBuilder::Build(NKikimrServices::TX_COLUMNSHARD)("TEST_STEP", 11); TShardReader reader(runtime, TTestTxConfig::TxTablet0, tableId, NOlap::TSnapshot(24, txId)); reader.SetReplyColumns(TTestSchema::ExtractNames(ydbSchema)); - reader.AddRange(MakeTestRange({10, 42}, false, false, testYdbPk)); + reader.AddRange(MakeTestRange({ 10, 42 }, false, false, testYdbPk)); auto rb = reader.ReadAll(); UNIT_ASSERT(rb); UNIT_ASSERT(reader.IsCorrectlyFinished()); @@ -927,8 +921,8 @@ void TestWriteRead(bool reboots, const TestTableDescription& table = {}, TString UNIT_ASSERT((ui32)rb->num_columns() == TTestSchema::ExtractNames(ydbSchema).size()); UNIT_ASSERT(rb->num_rows()); UNIT_ASSERT(CheckOrdered(rb)); - UNIT_ASSERT(DataHas({rb}, {10 + 1, 41 + 1})); - UNIT_ASSERT(DataHasOnly({rb}, {10 + 1, 41 + 1})); + UNIT_ASSERT(DataHas({ rb }, { 10 + 1, 41 + 1 })); + UNIT_ASSERT(DataHasOnly({ rb }, { 10 + 1, 41 + 1 })); } } @@ -944,8 +938,8 @@ void TestCompactionInGranuleImpl(bool reboots, const TestTableDescription& table options.FinalEvents.push_back(TDispatchOptions::TFinalEventCondition(TEvTablet::EvBoot)); runtime.DispatchEvents(options); - auto write = [&](TTestBasicRuntime& runtime, TActorId& sender, ui64 writeId, ui64 tableId, - const TString& data, const std::vector& ydbSchema, std::vector& writeIds) { + auto write = [&](TTestBasicRuntime& runtime, TActorId& sender, ui64 writeId, ui64 tableId, const TString& data, + const std::vector& ydbSchema, std::vector& writeIds) { bool ok = WriteData(runtime, sender, writeId, tableId, data, ydbSchema, true, &writeIds); if (reboots) { RebootTablet(runtime, TTestTxConfig::TxTablet0, sender); @@ -953,8 +947,7 @@ void TestCompactionInGranuleImpl(bool reboots, const TestTableDescription& table return ok; }; - auto proposeCommit = [&](TTestBasicRuntime& runtime, TActorId& sender, ui64 txId, - const std::vector& writeIds) { + auto proposeCommit = [&](TTestBasicRuntime& runtime, TActorId& sender, ui64 txId, const std::vector& writeIds) { ProposeCommit(runtime, sender, txId, writeIds); if (reboots) { RebootTablet(runtime, TTestTxConfig::TxTablet0, sender); @@ -983,14 +976,14 @@ void TestCompactionInGranuleImpl(bool reboots, const TestTableDescription& table // Write same keys: merge on compaction static const ui32 triggerPortionSize = 75 * 1000; - std::pair triggerPortion = {0, triggerPortionSize}; + std::pair triggerPortion = { 0, triggerPortionSize }; TString triggerData = MakeTestBlob(triggerPortion, ydbSchema); UNIT_ASSERT(triggerData.size() > NColumnShard::TLimits::MIN_BYTES_TO_INSERT); UNIT_ASSERT(triggerData.size() < NColumnShard::TLimits::GetMaxBlobSize()); static const ui32 portionSize = 1; - ui32 numWrites = NColumnShard::TLimits::MIN_SMALL_BLOBS_TO_INSERT; // trigger InsertTable -> Index + ui32 numWrites = NColumnShard::TLimits::MIN_SMALL_BLOBS_TO_INSERT; // trigger InsertTable -> Index // inserts triggered by count ui32 pos = triggerPortionSize; @@ -998,7 +991,7 @@ void TestCompactionInGranuleImpl(bool reboots, const TestTableDescription& table std::vector ids; ids.reserve(numWrites); for (ui32 w = 0; w < numWrites; ++w, ++writeId, pos += portionSize) { - std::pair portion = {pos, pos + portionSize}; + std::pair portion = { pos, pos + portionSize }; TString data = MakeTestBlob(portion, ydbSchema); UNIT_ASSERT(WriteData(runtime, sender, writeId, tableId, data, ydbSchema, true, &ids)); @@ -1011,7 +1004,7 @@ void TestCompactionInGranuleImpl(bool reboots, const TestTableDescription& table proposeCommit(runtime, sender, txId, ids); planCommit(runtime, sender, planStep, txId); } - std::pair smallWrites = {triggerPortionSize, pos}; + std::pair smallWrites = { triggerPortionSize, pos }; // inserts triggered by size NOlap::TCompactionLimits engineLimits; @@ -1031,17 +1024,17 @@ void TestCompactionInGranuleImpl(bool reboots, const TestTableDescription& table for (ui32 i = 0; i < 2; ++i) { TShardReader reader(runtime, TTestTxConfig::TxTablet0, tableId, NOlap::TSnapshot(planStep, txId)); - reader.SetReplyColumns({"timestamp", "message"}); + reader.SetReplyColumns({ "timestamp", "message" }); auto rb = reader.ReadAll(); UNIT_ASSERT(rb); UNIT_ASSERT(reader.IsCorrectlyFinished()); if (ydbPk[0].GetType() == TTypeInfo(NTypeIds::String) || ydbPk[0].GetType() == TTypeInfo(NTypeIds::Utf8)) { - UNIT_ASSERT(DataHas({rb}, triggerPortion, true)); - UNIT_ASSERT(DataHas({rb}, smallWrites, true)); + UNIT_ASSERT(DataHas({ rb }, triggerPortion, true)); + UNIT_ASSERT(DataHas({ rb }, smallWrites, true)); } else { - UNIT_ASSERT(DataHas({rb}, triggerPortion, true)); - UNIT_ASSERT(DataHas({rb}, smallWrites, true)); + UNIT_ASSERT(DataHas({ rb }, triggerPortion, true)); + UNIT_ASSERT(DataHas({ rb }, smallWrites, true)); } RebootTablet(runtime, TTestTxConfig::TxTablet0, sender); } @@ -1054,7 +1047,7 @@ using TAggAssignment = NKikimrSSA::TProgram::TAggregateAssignment; static NKikimrSSA::TProgram MakeSelect(TAssignment::EFunction compareId = TAssignment::FUNC_CMP_EQUAL) { NKikimrSSA::TProgram ssa; - std::vector columnIds = {1, 9, 5}; + std::vector columnIds = { 1, 9, 5 }; ui32 tmpColumnId = 100; auto* line1 = ssa.AddCommand(); @@ -1078,7 +1071,7 @@ static NKikimrSSA::TProgram MakeSelect(TAssignment::EFunction compareId = TAssig static NKikimrSSA::TProgram MakeSelectLike(TAssignment::EFunction likeId, const TString& pattern) { NKikimrSSA::TProgram ssa; - std::vector columnIds = {6}; // message + std::vector columnIds = { 6 }; // message auto* line1 = ssa.AddCommand(); auto* l1_assign = line1->MutableAssign(); @@ -1102,9 +1095,7 @@ static NKikimrSSA::TProgram MakeSelectLike(TAssignment::EFunction likeId, const } // SELECT min(x), max(x), some(x), count(x) FROM t [GROUP BY key[0], key[1], ...] -NKikimrSSA::TProgram MakeSelectAggregates(ui32 columnId, const std::vector& keys = {}, - bool addProjection = true) -{ +NKikimrSSA::TProgram MakeSelectAggregates(ui32 columnId, const std::vector& keys = {}, bool addProjection = true) { NKikimrSSA::TProgram ssa; auto* line1 = ssa.AddCommand(); @@ -1150,10 +1141,8 @@ NKikimrSSA::TProgram MakeSelectAggregates(ui32 columnId, const std::vector } // SELECT min(x), max(x), some(x), count(x) FROM t WHERE y = 1 [GROUP BY key[0], key[1], ...] -NKikimrSSA::TProgram MakeSelectAggregatesWithFilter(ui32 columnId, ui32 filterColumnId, - const std::vector& keys = {}, - bool addProjection = true) -{ +NKikimrSSA::TProgram MakeSelectAggregatesWithFilter( + ui32 columnId, ui32 filterColumnId, const std::vector& keys = {}, bool addProjection = true) { NKikimrSSA::TProgram ssa; auto* line1 = ssa.AddCommand(); @@ -1218,8 +1207,7 @@ NKikimrSSA::TProgram MakeSelectAggregatesWithFilter(ui32 columnId, ui32 filterCo return ssa; } -void TestReadWithProgram(const TestTableDescription& table = {}) -{ +void TestReadWithProgram(const TestTableDescription& table = {}) { TTestBasicRuntime runtime; TTester::Setup(runtime); auto csDefaultControllerGuard = NKikimr::NYDBTest::TControllers::RegisterCSControllerGuard(); @@ -1238,9 +1226,9 @@ void TestReadWithProgram(const TestTableDescription& table = {}) SetupSchema(runtime, sender, tableId, table); - { // write some data + { // write some data std::vector writeIds; - bool ok = WriteData(runtime, sender, writeId, tableId, MakeTestBlob({0, 100}, table.Schema), table.Schema, true, &writeIds); + bool ok = WriteData(runtime, sender, writeId, tableId, MakeTestBlob({ 0, 100 }, table.Schema), table.Schema, true, &writeIds); UNIT_ASSERT(ok); ProposeCommit(runtime, sender, txId, writeIds); @@ -1290,7 +1278,7 @@ void TestReadWithProgram(const TestTableDescription& table = {}) UNIT_ASSERT(rb->num_rows()); Y_UNUSED(NArrow::TColumnOperator().VerifyIfAbsent().Extract(rb, std::vector({ "level", "timestamp" }))); UNIT_ASSERT(rb->num_columns() == 2); - UNIT_ASSERT(DataHas({rb}, {0, 100}, true)); + UNIT_ASSERT(DataHas({ rb }, { 0, 100 }, true)); break; case 2: UNIT_ASSERT(!rb || !rb->num_rows()); @@ -1309,8 +1297,7 @@ void TestReadWithProgramLike(const TestTableDescription& table = {}) { auto csDefaultControllerGuard = NKikimr::NYDBTest::TControllers::RegisterCSControllerGuard(); TActorId sender = runtime.AllocateEdgeActor(); - CreateTestBootstrapper(runtime, - CreateTestTabletInfo(TTestTxConfig::TxTablet0, TTabletTypes::ColumnShard), &CreateColumnShard); + CreateTestBootstrapper(runtime, CreateTestTabletInfo(TTestTxConfig::TxTablet0, TTabletTypes::ColumnShard), &CreateColumnShard); TDispatchOptions options; options.FinalEvents.push_back(TDispatchOptions::TFinalEventCondition(TEvTablet::EvBoot)); @@ -1323,9 +1310,9 @@ void TestReadWithProgramLike(const TestTableDescription& table = {}) { SetupSchema(runtime, sender, tableId, table); - { // write some data + { // write some data std::vector writeIds; - bool ok = WriteData(runtime, sender, writeId, tableId, MakeTestBlob({0, 100}, table.Schema), table.Schema, true, &writeIds); + bool ok = WriteData(runtime, sender, writeId, tableId, MakeTestBlob({ 0, 100 }, table.Schema), table.Schema, true, &writeIds); UNIT_ASSERT(ok); ProposeCommit(runtime, sender, txId, writeIds); @@ -1333,14 +1320,10 @@ void TestReadWithProgramLike(const TestTableDescription& table = {}) { } TString pattern = "1"; - std::vector ssas = { - MakeSelectLike(TAssignment::FUNC_STR_MATCH, pattern), - MakeSelectLike(TAssignment::FUNC_STR_MATCH_IGNORE_CASE, pattern), - MakeSelectLike(TAssignment::FUNC_STR_STARTS_WITH, pattern), - MakeSelectLike(TAssignment::FUNC_STR_STARTS_WITH_IGNORE_CASE, pattern), - MakeSelectLike(TAssignment::FUNC_STR_ENDS_WITH, pattern), - MakeSelectLike(TAssignment::FUNC_STR_ENDS_WITH_IGNORE_CASE, pattern) - }; + std::vector ssas = { MakeSelectLike(TAssignment::FUNC_STR_MATCH, pattern), + MakeSelectLike(TAssignment::FUNC_STR_MATCH_IGNORE_CASE, pattern), MakeSelectLike(TAssignment::FUNC_STR_STARTS_WITH, pattern), + MakeSelectLike(TAssignment::FUNC_STR_STARTS_WITH_IGNORE_CASE, pattern), MakeSelectLike(TAssignment::FUNC_STR_ENDS_WITH, pattern), + MakeSelectLike(TAssignment::FUNC_STR_ENDS_WITH_IGNORE_CASE, pattern) }; ui32 i = 0; for (auto& ssa : ssas) { @@ -1355,15 +1338,15 @@ void TestReadWithProgramLike(const TestTableDescription& table = {}) { switch (i) { case 0: case 1: - UNIT_ASSERT(CheckColumns(rb, {"message"}, 19)); + UNIT_ASSERT(CheckColumns(rb, { "message" }, 19)); break; case 2: case 3: - UNIT_ASSERT(CheckColumns(rb, {"message"}, 11)); + UNIT_ASSERT(CheckColumns(rb, { "message" }, 11)); break; case 4: case 5: - UNIT_ASSERT(CheckColumns(rb, {"message"}, 10)); + UNIT_ASSERT(CheckColumns(rb, { "message" }, 10)); break; default: break; @@ -1391,9 +1374,9 @@ void TestSomePrograms(const TestTableDescription& table) { SetupSchema(runtime, sender, tableId, table); - { // write some data + { // write some data std::vector writeIds; - bool ok = WriteData(runtime, sender, writeId, tableId, MakeTestBlob({0, 100}, table.Schema), table.Schema, true, &writeIds); + bool ok = WriteData(runtime, sender, writeId, tableId, MakeTestBlob({ 0, 100 }, table.Schema), table.Schema, true, &writeIds); UNIT_ASSERT(ok); ProposeCommit(runtime, sender, txId, writeIds); @@ -1426,15 +1409,14 @@ void TestSomePrograms(const TestTableDescription& table) { struct TReadAggregateResult { ui32 NumRows = 1; - std::vector MinValues = {0}; - std::vector MaxValues = {99}; - std::vector Counts = {100}; + std::vector MinValues = { 0 }; + std::vector MaxValues = { 99 }; + std::vector Counts = { 100 }; }; -void TestReadAggregate(const std::vector& ydbSchema, const TString& testDataBlob, - bool addProjection, const std::vector& aggKeys = {}, - const TReadAggregateResult& expectedResult = {}, - const TReadAggregateResult& expectedFiltered = {1, {1}, {1}, {1}}) { +void TestReadAggregate(const std::vector& ydbSchema, const TString& testDataBlob, bool addProjection, + const std::vector& aggKeys = {}, const TReadAggregateResult& expectedResult = {}, + const TReadAggregateResult& expectedFiltered = { 1, { 1 }, { 1 }, { 1 } }) { TTestBasicRuntime runtime; TTester::Setup(runtime); auto csDefaultControllerGuard = NKikimr::NYDBTest::TControllers::RegisterCSControllerGuard(); @@ -1452,10 +1434,10 @@ void TestReadAggregate(const std::vector& ydbSchema, ui64 txId = 100; auto pk = NArrow::NTest::TTestColumn::CropSchema(ydbSchema, 4); - TestTableDescription table{.Schema = ydbSchema, .Pk = pk}; + TestTableDescription table{ .Schema = ydbSchema, .Pk = pk }; SetupSchema(runtime, sender, tableId, table); - { // write some data + { // write some data std::vector writeIds; bool ok = WriteData(runtime, sender, writeId, tableId, testDataBlob, table.Schema, true, &writeIds); UNIT_ASSERT(ok); @@ -1469,11 +1451,9 @@ void TestReadAggregate(const std::vector& ydbSchema, std::vector programs; THashSet isFiltered; THashSet checkResult; - THashSet intTypes = { - NTypeIds::Int8, NTypeIds::Int16, NTypeIds::Int32, NTypeIds::Int64, - NTypeIds::Uint8, NTypeIds::Uint16, NTypeIds::Uint32, NTypeIds::Uint64, - NTypeIds::Timestamp, NTypeIds::Date32, NTypeIds::Datetime64, NTypeIds::Timestamp64, NTypeIds::Interval64 - }; + THashSet intTypes = { NTypeIds::Int8, NTypeIds::Int16, NTypeIds::Int32, NTypeIds::Int64, NTypeIds::Uint8, NTypeIds::Uint16, + NTypeIds::Uint32, NTypeIds::Uint64, NTypeIds::Timestamp, NTypeIds::Date32, NTypeIds::Datetime64, NTypeIds::Timestamp64, + NTypeIds::Interval64 }; THashSet strTypes = { NTypeIds::Utf8, NTypeIds::String //NTypeIds::Yson, NTypeIds::Json, NTypeIds::JsonDocument @@ -1481,8 +1461,7 @@ void TestReadAggregate(const std::vector& ydbSchema, ui32 prog = 0; for (ui32 i = 0; i < ydbSchema.size(); ++i, ++prog) { - if (intTypes.contains(ydbSchema[i].GetType().GetTypeId()) || - strTypes.contains(ydbSchema[i].GetType().GetTypeId())) { + if (intTypes.contains(ydbSchema[i].GetType().GetTypeId()) || strTypes.contains(ydbSchema[i].GetType().GetTypeId())) { checkResult.insert(prog); } @@ -1498,8 +1477,7 @@ void TestReadAggregate(const std::vector& ydbSchema, for (ui32 i = 0; i < ydbSchema.size(); ++i, ++prog) { isFiltered.insert(prog); - if (intTypes.contains(ydbSchema[i].GetType().GetTypeId()) || - strTypes.contains(ydbSchema[i].GetType().GetTypeId())) { + if (intTypes.contains(ydbSchema[i].GetType().GetTypeId()) || strTypes.contains(ydbSchema[i].GetType().GetTypeId())) { checkResult.insert(prog); } @@ -1513,8 +1491,8 @@ void TestReadAggregate(const std::vector& ydbSchema, UNIT_ASSERT(program.SerializeToString(&programs.back())); } - std::vector namedColumns = {"res_min", "res_max", "res_some", "res_count"}; - std::vector unnamedColumns = {"100", "101", "102", "103"}; + std::vector namedColumns = { "res_min", "res_max", "res_some", "res_count" }; + std::vector unnamedColumns = { "100", "101", "102", "103" }; if (!addProjection) { for (auto& key : aggKeys) { namedColumns.push_back(ydbSchema[key].GetName()); @@ -1534,7 +1512,7 @@ void TestReadAggregate(const std::vector& ydbSchema, if (checkResult.contains(prog)) { if (isFiltered.contains(prog)) { UNIT_ASSERT(CheckColumns(batch, namedColumns, expectedFiltered.NumRows)); - if (aggKeys.empty()) { // TODO: ORDER BY for compare + if (aggKeys.empty()) { // TODO: ORDER BY for compare UNIT_ASSERT(CheckIntValues(batch->GetColumnByName("res_min"), expectedFiltered.MinValues)); UNIT_ASSERT(CheckIntValues(batch->GetColumnByName("res_max"), expectedFiltered.MaxValues)); UNIT_ASSERT(CheckIntValues(batch->GetColumnByName("res_some"), expectedFiltered.MinValues)); @@ -1542,7 +1520,7 @@ void TestReadAggregate(const std::vector& ydbSchema, UNIT_ASSERT(CheckIntValues(batch->GetColumnByName("res_count"), expectedFiltered.Counts)); } else { UNIT_ASSERT(CheckColumns(batch, unnamedColumns, expectedResult.NumRows)); - if (aggKeys.empty()) { // TODO: ORDER BY for compare + if (aggKeys.empty()) { // TODO: ORDER BY for compare UNIT_ASSERT(CheckIntValues(batch->GetColumnByName("100"), expectedResult.MinValues)); UNIT_ASSERT(CheckIntValues(batch->GetColumnByName("101"), expectedResult.MaxValues)); UNIT_ASSERT(CheckIntValues(batch->GetColumnByName("102"), expectedResult.MinValues)); @@ -1555,10 +1533,9 @@ void TestReadAggregate(const std::vector& ydbSchema, } } -} +} // namespace Y_UNIT_TEST_SUITE(EvWrite) { - Y_UNIT_TEST(WriteInTransaction) { using namespace NArrow; @@ -1566,48 +1543,37 @@ Y_UNIT_TEST_SUITE(EvWrite) { TTester::Setup(runtime); auto csDefaultControllerGuard = NKikimr::NYDBTest::TControllers::RegisterCSControllerGuard(); - const ui64 ownerId = 0; const ui64 tableId = 1; - const ui64 schemaVersion = 1; - const std::vector schema = { - NArrow::NTest::TTestColumn("key", TTypeInfo(NTypeIds::Uint64)), - NArrow::NTest::TTestColumn("field", TTypeInfo(NTypeIds::Utf8)) - }; - const std::vector columnsIds = {1, 2}; + const std::vector schema = { NArrow::NTest::TTestColumn("key", TTypeInfo(NTypeIds::Uint64)), + NArrow::NTest::TTestColumn("field", TTypeInfo(NTypeIds::Utf8)) }; + const std::vector columnsIds = { 1, 2 }; PrepareTablet(runtime, tableId, schema); const ui64 txId = 111; - NConstruction::IArrayBuilder::TPtr keyColumn = std::make_shared>>("key"); + NConstruction::IArrayBuilder::TPtr keyColumn = + std::make_shared>>("key"); NConstruction::IArrayBuilder::TPtr column = std::make_shared>( "field", NConstruction::TStringPoolFiller(8, 100)); auto batch = NConstruction::TRecordBatchConstructor({ keyColumn, column }).BuildBatch(2048); - TString blobData = NArrow::SerializeBatchNoCompression(batch); - UNIT_ASSERT(blobData.size() < TLimits::GetMaxBlobSize()); - - auto evWrite = std::make_unique(NKikimrDataEvents::TEvWrite::MODE_PREPARE); - evWrite->SetTxId(txId); - ui64 payloadIndex = NEvWrite::TPayloadWriter(*evWrite).AddDataToPayload(std::move(blobData)); - evWrite->AddOperation(NKikimrDataEvents::TEvWrite::TOperation::OPERATION_REPLACE, {ownerId, tableId, schemaVersion}, columnsIds, payloadIndex, NKikimrDataEvents::FORMAT_ARROW); - TActorId sender = runtime.AllocateEdgeActor(); - ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, evWrite.release()); + NTxUT::TShardWriter writer(runtime, TTestTxConfig::TxTablet0, tableId, 222); + AFL_VERIFY(writer.Write(batch, {1, 2}, txId) == NKikimrDataEvents::TEvWriteResult::STATUS_COMPLETED); + AFL_VERIFY(writer.StartCommit(txId) == NKikimrDataEvents::TEvWriteResult::STATUS_PREPARED); { - TAutoPtr handle; - auto event = runtime.GrabEdgeEvent(handle); - UNIT_ASSERT(event); - UNIT_ASSERT_VALUES_EQUAL(event->Record.GetOrigin(), TTestTxConfig::TxTablet0); - UNIT_ASSERT_VALUES_EQUAL(event->Record.GetTxId(), txId); - UNIT_ASSERT_VALUES_EQUAL((ui64)event->Record.GetStatus(), (ui64)NKikimrDataEvents::TEvWriteResult::STATUS_PREPARED); + NTxUT::TShardWriter writer(runtime, TTestTxConfig::TxTablet0, tableId, 444); + AFL_VERIFY(writer.StartCommit(444) == NKikimrDataEvents::TEvWriteResult::STATUS_BAD_REQUEST); + } + { auto readResult = ReadAllAsBatch(runtime, tableId, NOlap::TSnapshot(10, txId), schema); UNIT_ASSERT_VALUES_EQUAL(readResult->num_rows(), 0); - PlanWriteTx(runtime, sender, NOlap::TSnapshot(11, txId)); + PlanWriteTx(runtime, writer.GetSender(), NOlap::TSnapshot(11, txId)); } - auto readResult = ReadAllAsBatch(runtime, tableId, NOlap::TSnapshot(11, txId), schema); + auto readResult = ReadAllAsBatch(runtime, tableId, NOlap::TSnapshot::MaxForPlanStep(11), schema); UNIT_ASSERT_VALUES_EQUAL(readResult->num_rows(), 2048); } @@ -1618,46 +1584,26 @@ Y_UNIT_TEST_SUITE(EvWrite) { TTester::Setup(runtime); auto csDefaultControllerGuard = NKikimr::NYDBTest::TControllers::RegisterCSControllerGuard(); - - const ui64 ownerId = 0; const ui64 tableId = 1; - const ui64 schemaVersion = 1; - const std::vector schema = { - NArrow::NTest::TTestColumn("key", TTypeInfo(NTypeIds::Uint64)), - NArrow::NTest::TTestColumn("field", TTypeInfo(NTypeIds::Utf8)) - }; - const std::vector columnsIds = {1, 2}; + const std::vector schema = { NArrow::NTest::TTestColumn("key", TTypeInfo(NTypeIds::Uint64)), + NArrow::NTest::TTestColumn("field", TTypeInfo(NTypeIds::Utf8)) }; + const std::vector columnsIds = { 1, 2 }; PrepareTablet(runtime, tableId, schema); const ui64 txId = 111; - NConstruction::IArrayBuilder::TPtr keyColumn = std::make_shared>>("key"); + NConstruction::IArrayBuilder::TPtr keyColumn = + std::make_shared>>("key"); NConstruction::IArrayBuilder::TPtr column = std::make_shared>( "field", NConstruction::TStringPoolFiller(8, 100)); auto batch = NConstruction::TRecordBatchConstructor({ keyColumn, column }).BuildBatch(2048); - TString blobData = NArrow::SerializeBatchNoCompression(batch); - UNIT_ASSERT(blobData.size() < TLimits::GetMaxBlobSize()); + NTxUT::TShardWriter writer(runtime, TTestTxConfig::TxTablet0, tableId, 222); + AFL_VERIFY(writer.Write(batch, {1, 2}, txId) == NKikimrDataEvents::TEvWriteResult::STATUS_COMPLETED); + AFL_VERIFY(writer.Abort(txId) == NKikimrDataEvents::TEvWriteResult::STATUS_COMPLETED); - auto evWrite = std::make_unique(NKikimrDataEvents::TEvWrite::MODE_PREPARE); - evWrite->SetTxId(txId); - ui64 payloadIndex = NEvWrite::TPayloadWriter(*evWrite).AddDataToPayload(std::move(blobData)); - evWrite->AddOperation(NKikimrDataEvents::TEvWrite::TOperation::OPERATION_REPLACE, {ownerId, tableId, schemaVersion}, columnsIds, payloadIndex, NKikimrDataEvents::FORMAT_ARROW); + PlanWriteTx(runtime, writer.GetSender(), NOlap::TSnapshot(10, txId + 1), false); - TActorId sender = runtime.AllocateEdgeActor(); - ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, evWrite.release()); - - ui64 outdatedStep = 11; - { - TAutoPtr handle; - auto event = runtime.GrabEdgeEvent(handle); - UNIT_ASSERT(event); - UNIT_ASSERT_VALUES_EQUAL((ui64)event->Record.GetStatus(), (ui64)NKikimrDataEvents::TEvWriteResult::STATUS_PREPARED); - - outdatedStep = event->Record.GetMaxStep() + 1; - PlanWriteTx(runtime, sender, NOlap::TSnapshot(outdatedStep, txId + 1), false); - } - - auto readResult = ReadAllAsBatch(runtime, tableId, NOlap::TSnapshot(outdatedStep, txId), schema); + auto readResult = ReadAllAsBatch(runtime, tableId, NOlap::TSnapshot::MaxForPlanStep(10), schema); UNIT_ASSERT_VALUES_EQUAL(readResult->num_rows(), 0); } @@ -1668,19 +1614,14 @@ Y_UNIT_TEST_SUITE(EvWrite) { TTester::Setup(runtime); auto csDefaultControllerGuard = NKikimr::NYDBTest::TControllers::RegisterCSControllerGuard(); - - const ui64 ownerId = 0; const ui64 tableId = 1; - const ui64 schemaVersion = 1; - const std::vector schema = { - NArrow::NTest::TTestColumn("key", TTypeInfo(NTypeIds::Uint64)), - NArrow::NTest::TTestColumn("field", TTypeInfo(NTypeIds::Utf8)) - }; - const std::vector columnsIds = {1, 2}; + const std::vector schema = { NArrow::NTest::TTestColumn("key", TTypeInfo(NTypeIds::Uint64)), + NArrow::NTest::TTestColumn("field", TTypeInfo(NTypeIds::Utf8)) }; PrepareTablet(runtime, tableId, schema); const ui64 txId = 111; - NConstruction::IArrayBuilder::TPtr keyColumn = std::make_shared>>("key"); + NConstruction::IArrayBuilder::TPtr keyColumn = + std::make_shared>>("key"); NConstruction::IArrayBuilder::TPtr column = std::make_shared>( "field", NConstruction::TStringPoolFiller(8, TLimits::GetMaxBlobSize() / 1024)); @@ -1688,23 +1629,13 @@ Y_UNIT_TEST_SUITE(EvWrite) { TString blobData = NArrow::SerializeBatchNoCompression(batch); UNIT_ASSERT(blobData.size() > TLimits::GetMaxBlobSize()); - auto evWrite = std::make_unique(NKikimrDataEvents::TEvWrite::MODE_PREPARE); - evWrite->SetTxId(txId); - ui64 payloadIndex = NEvWrite::TPayloadWriter(*evWrite).AddDataToPayload(std::move(blobData)); - evWrite->AddOperation(NKikimrDataEvents::TEvWrite::TOperation::OPERATION_REPLACE, {ownerId, tableId, schemaVersion}, columnsIds, payloadIndex, NKikimrDataEvents::FORMAT_ARROW); - - TActorId sender = runtime.AllocateEdgeActor(); - ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, evWrite.release()); + NTxUT::TShardWriter writer(runtime, TTestTxConfig::TxTablet0, tableId, 222); + AFL_VERIFY(writer.Write(batch, {1, 2}, txId) == NKikimrDataEvents::TEvWriteResult::STATUS_COMPLETED); + AFL_VERIFY(writer.StartCommit(txId) == NKikimrDataEvents::TEvWriteResult::STATUS_PREPARED); - { - TAutoPtr handle; - auto event = runtime.GrabEdgeEvent(handle); - UNIT_ASSERT(event); - UNIT_ASSERT_VALUES_EQUAL((ui64)event->Record.GetStatus(), (ui64)NKikimrDataEvents::TEvWriteResult::STATUS_PREPARED); - PlanWriteTx(runtime, sender, NOlap::TSnapshot(11, txId)); - } + PlanWriteTx(runtime, writer.GetSender(), NOlap::TSnapshot(11, txId)); - auto readResult = ReadAllAsBatch(runtime, tableId, NOlap::TSnapshot(11, txId), schema); + auto readResult = ReadAllAsBatch(runtime, tableId, NOlap::TSnapshot::MaxForPlanStep(11), schema); UNIT_ASSERT_VALUES_EQUAL(readResult->num_rows(), 2048); } @@ -1715,93 +1646,44 @@ Y_UNIT_TEST_SUITE(EvWrite) { TTester::Setup(runtime); auto csDefaultControllerGuard = NKikimr::NYDBTest::TControllers::RegisterCSControllerGuard(); - - const ui64 ownerId = 0; const ui64 tableId = 1; - const ui64 schemaVersion = 1; - const std::vector schema = { - NArrow::NTest::TTestColumn("key", TTypeInfo(NTypeIds::Uint64) ), - NArrow::NTest::TTestColumn("field", TTypeInfo(NTypeIds::Utf8) ) - }; - const std::vector columnsIds = {1, 2}; + const std::vector schema = { NArrow::NTest::TTestColumn("key", TTypeInfo(NTypeIds::Uint64)), + NArrow::NTest::TTestColumn("field", TTypeInfo(NTypeIds::Utf8)) }; + const std::vector columnIds = { 1, 2 }; PrepareTablet(runtime, tableId, schema); const ui64 txId = 111; - const ui64 lockId = 110; + NTxUT::TShardWriter writer(runtime, TTestTxConfig::TxTablet0, tableId, 222); { - NConstruction::IArrayBuilder::TPtr keyColumn = std::make_shared>>("key"); - NConstruction::IArrayBuilder::TPtr column = std::make_shared>("field", NConstruction::TStringPoolFiller(8, 100)); + NConstruction::IArrayBuilder::TPtr keyColumn = + std::make_shared>>("key"); + NConstruction::IArrayBuilder::TPtr column = + std::make_shared>( + "field", NConstruction::TStringPoolFiller(8, 100)); auto batch = NConstruction::TRecordBatchConstructor({ keyColumn, column }).BuildBatch(2048); - TString blobData = NArrow::SerializeBatchNoCompression(batch); - UNIT_ASSERT(blobData.size() < TLimits::GetMaxBlobSize()); - auto evWrite = std::make_unique(NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE); - evWrite->SetLockId(lockId, 1); - - ui64 payloadIndex = NEvWrite::TPayloadWriter(*evWrite).AddDataToPayload(std::move(blobData)); - evWrite->AddOperation(NKikimrDataEvents::TEvWrite::TOperation::OPERATION_REPLACE, {ownerId, tableId, schemaVersion}, columnsIds, payloadIndex, NKikimrDataEvents::FORMAT_ARROW); - - TActorId sender = runtime.AllocateEdgeActor(); - ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, evWrite.release()); - + AFL_VERIFY(writer.Write(batch, columnIds, txId) == NKikimrDataEvents::TEvWriteResult::STATUS_COMPLETED); { - TAutoPtr handle; - auto event = runtime.GrabEdgeEvent(handle); - UNIT_ASSERT(event); - UNIT_ASSERT_VALUES_EQUAL(event->Record.GetOrigin(), TTestTxConfig::TxTablet0); - UNIT_ASSERT_VALUES_EQUAL(event->Record.GetTxId(), lockId); - UNIT_ASSERT_VALUES_EQUAL((ui64)event->Record.GetStatus(), (ui64)NKikimrDataEvents::TEvWriteResult::STATUS_COMPLETED); - - auto readResult = ReadAllAsBatch(runtime, tableId, NOlap::TSnapshot(10, lockId), schema); + auto readResult = ReadAllAsBatch(runtime, tableId, NOlap::TSnapshot(10, txId), schema); UNIT_ASSERT_VALUES_EQUAL(readResult->num_rows(), 0); } } { - NConstruction::IArrayBuilder::TPtr keyColumn = std::make_shared>>("key", 2049); - NConstruction::IArrayBuilder::TPtr column = std::make_shared>("field", NConstruction::TStringPoolFiller(8, 100)); + NConstruction::IArrayBuilder::TPtr keyColumn = + std::make_shared>>("key", 2049); + NConstruction::IArrayBuilder::TPtr column = + std::make_shared>( + "field", NConstruction::TStringPoolFiller(8, 100)); auto batch = NConstruction::TRecordBatchConstructor({ keyColumn, column }).BuildBatch(2048); - TString blobData = NArrow::SerializeBatchNoCompression(batch); - UNIT_ASSERT(blobData.size() < TLimits::GetMaxBlobSize()); - auto evWrite = std::make_unique(NKikimrDataEvents::TEvWrite::MODE_IMMEDIATE); - evWrite->SetLockId(lockId, 1); - - ui64 payloadIndex = NEvWrite::TPayloadWriter(*evWrite).AddDataToPayload(std::move(blobData)); - evWrite->AddOperation(NKikimrDataEvents::TEvWrite::TOperation::OPERATION_REPLACE, {ownerId, tableId, schemaVersion}, columnsIds, payloadIndex, NKikimrDataEvents::FORMAT_ARROW); - - TActorId sender = runtime.AllocateEdgeActor(); - ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, evWrite.release()); + AFL_VERIFY(writer.Write(batch, columnIds, txId) == NKikimrDataEvents::TEvWriteResult::STATUS_COMPLETED); { - TAutoPtr handle; - auto event = runtime.GrabEdgeEvent(handle); - UNIT_ASSERT(event); - UNIT_ASSERT_VALUES_EQUAL(event->Record.GetOrigin(), TTestTxConfig::TxTablet0); - UNIT_ASSERT_VALUES_EQUAL(event->Record.GetTxId(), lockId); - UNIT_ASSERT_VALUES_EQUAL((ui64)event->Record.GetStatus(), (ui64)NKikimrDataEvents::TEvWriteResult::STATUS_COMPLETED); - auto readResult = ReadAllAsBatch(runtime, tableId, NOlap::TSnapshot(10, txId), schema); UNIT_ASSERT_VALUES_EQUAL(readResult->num_rows(), 0); } } { - auto evWrite = std::make_unique(NKikimrDataEvents::TEvWrite::MODE_PREPARE); - evWrite->SetTxId(txId); - evWrite->Record.MutableLocks()->SetOp(NKikimrDataEvents::TKqpLocks::Commit); - auto* lock = evWrite->Record.MutableLocks()->AddLocks(); - lock->SetLockId(lockId); - - TActorId sender = runtime.AllocateEdgeActor(); - ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, evWrite.release()); - - { - TAutoPtr handle; - auto event = runtime.GrabEdgeEvent(handle); - UNIT_ASSERT(event); - UNIT_ASSERT_VALUES_EQUAL(event->Record.GetOrigin(), TTestTxConfig::TxTablet0); - UNIT_ASSERT_VALUES_EQUAL((ui64)event->Record.GetStatus(), (ui64)NKikimrDataEvents::TEvWriteResult::STATUS_PREPARED); - UNIT_ASSERT_VALUES_EQUAL(event->Record.GetTxId(), txId); - } - - PlanWriteTx(runtime, sender, NOlap::TSnapshot(11, txId)); + AFL_VERIFY(writer.StartCommit(txId) == NKikimrDataEvents::TEvWriteResult::STATUS_PREPARED); + PlanWriteTx(runtime, writer.GetSender(), NOlap::TSnapshot(11, txId)); } auto readResult = ReadAllAsBatch(runtime, tableId, NOlap::TSnapshot(11, txId), schema); @@ -1881,7 +1763,8 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) { { TSet txIds; std::vector writeIds; - UNIT_ASSERT(WriteData(runtime, sender, ++writeId, tableId, testData, ydbSchema, true, &writeIds, NEvWrite::EModificationType::Update)); + UNIT_ASSERT( + WriteData(runtime, sender, ++writeId, tableId, testData, ydbSchema, true, &writeIds, NEvWrite::EModificationType::Update)); ProposeCommit(runtime, sender, ++txId, writeIds); txIds.insert(txId); PlanCommit(runtime, sender, planStep, txIds); @@ -1896,7 +1779,8 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) { { TSet txIds; std::vector writeIds; - UNIT_ASSERT(WriteData(runtime, sender, ++writeId, tableId, testData, ydbSchema, true, &writeIds, NEvWrite::EModificationType::Insert)); + UNIT_ASSERT( + WriteData(runtime, sender, ++writeId, tableId, testData, ydbSchema, true, &writeIds, NEvWrite::EModificationType::Insert)); ProposeCommit(runtime, sender, ++txId, writeIds); txIds.insert(txId); PlanCommit(runtime, sender, planStep, txIds); @@ -1912,7 +1796,8 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) { { TSet txIds; std::vector writeIds; - UNIT_ASSERT(WriteData(runtime, sender, ++writeId, tableId, testData, ydbSchema, true, &writeIds, NEvWrite::EModificationType::Upsert)); + UNIT_ASSERT( + WriteData(runtime, sender, ++writeId, tableId, testData, ydbSchema, true, &writeIds, NEvWrite::EModificationType::Upsert)); ProposeCommit(runtime, sender, ++txId, writeIds); txIds.insert(txId); PlanCommit(runtime, sender, planStep, txIds); @@ -1928,7 +1813,8 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) { { TSet txIds; std::vector writeIds; - UNIT_ASSERT(WriteData(runtime, sender, ++writeId, tableId, testData, ydbSchema, true, &writeIds, NEvWrite::EModificationType::Update)); + UNIT_ASSERT( + WriteData(runtime, sender, ++writeId, tableId, testData, ydbSchema, true, &writeIds, NEvWrite::EModificationType::Update)); ProposeCommit(runtime, sender, ++txId, writeIds); txIds.insert(txId); PlanCommit(runtime, sender, planStep, txIds); @@ -1944,12 +1830,14 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) { { TSet txIds; std::vector writeIds; - UNIT_ASSERT(!WriteData(runtime, sender, ++writeId, tableId, testData, ydbSchema, true, &writeIds, NEvWrite::EModificationType::Insert)); + UNIT_ASSERT( + !WriteData(runtime, sender, ++writeId, tableId, testData, ydbSchema, true, &writeIds, NEvWrite::EModificationType::Insert)); } { TSet txIds; std::vector writeIds; - UNIT_ASSERT(WriteData(runtime, sender, ++writeId, tableId, testData, ydbSchema, true, &writeIds, NEvWrite::EModificationType::Delete)); + UNIT_ASSERT( + WriteData(runtime, sender, ++writeId, tableId, testData, ydbSchema, true, &writeIds, NEvWrite::EModificationType::Delete)); ProposeCommit(runtime, sender, ++txId, writeIds); txIds.insert(txId); PlanCommit(runtime, sender, planStep, txIds); @@ -2011,7 +1899,7 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) { schema[0].SetType(TTypeInfo(typeId)); pk[0].SetType(TTypeInfo(typeId)); - TestTableDescription table{.Schema = schema, .Pk = pk}; + TestTableDescription table{ .Schema = schema, .Pk = pk }; TestCompactionInGranuleImpl(reboot, table); } @@ -2047,7 +1935,6 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) { TestCompactionInGranule(false, NTypeIds::Datetime); } - Y_UNIT_TEST(CompactionInGranule_PKString_Reboot) { TestCompactionInGranule(true, NTypeIds::String); } @@ -2090,16 +1977,10 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) { Y_UNIT_TEST(ReadSomePrograms) { TestTableDescription table; - table.Schema = { - NArrow::NTest::TTestColumn("timestamp", TTypeInfo(NTypeIds::Timestamp) ), - NArrow::NTest::TTestColumn("resource_id", TTypeInfo(NTypeIds::Utf8) ), - NArrow::NTest::TTestColumn("uid", TTypeInfo(NTypeIds::Utf8) ), - NArrow::NTest::TTestColumn("level", TTypeInfo(NTypeIds::Int32) ), - NArrow::NTest::TTestColumn("message", TTypeInfo(NTypeIds::Utf8) ) - }; - table.Pk = { - NArrow::NTest::TTestColumn("timestamp", TTypeInfo(NTypeIds::Timestamp) ) - }; + table.Schema = { NArrow::NTest::TTestColumn("timestamp", TTypeInfo(NTypeIds::Timestamp)), + NArrow::NTest::TTestColumn("resource_id", TTypeInfo(NTypeIds::Utf8)), NArrow::NTest::TTestColumn("uid", TTypeInfo(NTypeIds::Utf8)), + NArrow::NTest::TTestColumn("level", TTypeInfo(NTypeIds::Int32)), NArrow::NTest::TTestColumn("message", TTypeInfo(NTypeIds::Utf8)) }; + table.Pk = { NArrow::NTest::TTestColumn("timestamp", TTypeInfo(NTypeIds::Timestamp)) }; TestSomePrograms(table); } @@ -2122,14 +2003,12 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) { counts.push_back(1); } - THashSet sameValTypes = { - NTypeIds::Yson, NTypeIds::Json, NTypeIds::JsonDocument - }; + THashSet sameValTypes = { NTypeIds::Yson, NTypeIds::Json, NTypeIds::JsonDocument }; // TODO: query needs normalization to compare with expected TReadAggregateResult resDefault = { 100, {}, {}, counts }; - TReadAggregateResult resFiltered = { 1, {}, {}, {1} }; - TReadAggregateResult resGrouped = { 1, {}, {}, {100} }; + TReadAggregateResult resFiltered = { 1, {}, {}, { 1 } }; + TReadAggregateResult resGrouped = { 1, {}, {}, { 100 } }; for (ui32 key = 0; key < schema.size(); ++key) { Cerr << "-- group by key: " << key << "\n"; @@ -2143,8 +2022,7 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) { } for (ui32 key = 0; key < schema.size() - 1; ++key) { Cerr << "-- group by key: " << key << ", " << key + 1 << "\n"; - if (sameValTypes.contains(schema[key].GetType().GetTypeId()) && - sameValTypes.contains(schema[key + 1].GetType().GetTypeId())) { + if (sameValTypes.contains(schema[key].GetType().GetTypeId()) && sameValTypes.contains(schema[key + 1].GetType().GetTypeId())) { TestReadAggregate(schema, testBlob, (key % 2), { key, key + 1 }, resGrouped, resFiltered); } else { TestReadAggregate(schema, testBlob, (key % 2), { key, key + 1 }, resDefault, resFiltered); @@ -2152,8 +2030,7 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) { } for (ui32 key = 0; key < schema.size() - 2; ++key) { Cerr << "-- group by key: " << key << ", " << key + 1 << ", " << key + 2 << "\n"; - if (sameValTypes.contains(schema[key].GetType().GetTypeId()) && - sameValTypes.contains(schema[key + 1].GetType().GetTypeId()) && + if (sameValTypes.contains(schema[key].GetType().GetTypeId()) && sameValTypes.contains(schema[key + 1].GetType().GetTypeId()) && sameValTypes.contains(schema[key + 1].GetType().GetTypeId())) { TestReadAggregate(schema, testBlob, (key % 2), { key, key + 1, key + 2 }, resGrouped, resFiltered); } else { @@ -2170,12 +2047,13 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) { const std::vector YdbPk; public: - TTabletReadPredicateTest(TTestBasicRuntime& runtime, const ui64 planStep, const ui64 txId, const std::vector& ydbPk) + TTabletReadPredicateTest( + TTestBasicRuntime& runtime, const ui64 planStep, const ui64 txId, const std::vector& ydbPk) : Runtime(runtime) , PlanStep(planStep) , TxId(txId) - , YdbPk(ydbPk) - {} + , YdbPk(ydbPk) { + } class TBorder { private: @@ -2185,14 +2063,15 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) { public: TBorder(const std::vector& values, const bool include = false) : Border(values) - , Include(include) - {} + , Include(include) { + } - bool GetInclude() const noexcept { return Include; } + bool GetInclude() const noexcept { + return Include; + } - std::vector GetCellVec(const std::vector& pk, - std::vector& mem, bool trailingNulls = false) const - { + std::vector GetCellVec( + const std::vector& pk, std::vector& mem, bool trailingNulls = false) const { UNIT_ASSERT(Border.size() <= pk.size()); std::vector cells; size_t i = 0; @@ -2213,16 +2092,25 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) { TTestCaseOptions() = default; - TTestCaseOptions& SetFrom(const TBorder& border) { From = border; return *this; } - TTestCaseOptions& SetTo(const TBorder& border) { To = border; return *this; } - TTestCaseOptions& SetExpectedCount(ui32 count) { ExpectedCount = count; return *this; } + TTestCaseOptions& SetFrom(const TBorder& border) { + From = border; + return *this; + } + TTestCaseOptions& SetTo(const TBorder& border) { + To = border; + return *this; + } + TTestCaseOptions& SetExpectedCount(ui32 count) { + ExpectedCount = count; + return *this; + } TSerializedTableRange MakeRange(const std::vector& pk) const { std::vector mem; auto cellsFrom = From ? From->GetCellVec(pk, mem, false) : std::vector(); auto cellsTo = To ? To->GetCellVec(pk, mem) : std::vector(); return TSerializedTableRange(TConstArrayRef(cellsFrom), (From ? From->GetInclude() : false), - TConstArrayRef(cellsTo), (To ? To->GetInclude(): false)); + TConstArrayRef(cellsTo), (To ? To->GetInclude() : false)); } }; @@ -2233,17 +2121,17 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) { void Execute() { const ui64 tableId = 1; - std::set useFields = {"timestamp", "message"}; - { // read with predicate (FROM) + std::set useFields = { "timestamp", "message" }; + { // read with predicate (FROM) TShardReader reader(Owner.Runtime, TTestTxConfig::TxTablet0, tableId, NOlap::TSnapshot(Owner.PlanStep, Owner.TxId)); - reader.SetReplyColumns({"timestamp", "message"}); + reader.SetReplyColumns({ "timestamp", "message" }); reader.AddRange(MakeRange(Owner.YdbPk)); auto rb = reader.ReadAll(); UNIT_ASSERT(reader.IsCorrectlyFinished()); if (ExpectedCount) { if (*ExpectedCount) { UNIT_ASSERT(CheckOrdered(rb)); - UNIT_ASSERT(CheckColumns(rb, {"timestamp", "message"}, ExpectedCount)); + UNIT_ASSERT(CheckColumns(rb, { "timestamp", "message" }, ExpectedCount)); } else { UNIT_ASSERT(!rb || !rb->num_rows()); } @@ -2255,8 +2143,7 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) { TTestCase(TTabletReadPredicateTest& owner, const TString& testCaseName, const TTestCaseOptions& opts = {}) : TTestCaseOptions(opts) , Owner(owner) - , TestCaseName(testCaseName) - { + , TestCaseName(testCaseName) { Cerr << "TEST CASE " << TestCaseName << " START..." << Endl; } @@ -2331,18 +2218,17 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) { for (ui32 i = 0; i < 2; ++i) { { TShardReader reader(runtime, TTestTxConfig::TxTablet0, tableId, NOlap::TSnapshot(planStep, txId)); - reader.SetReplyColumns({"timestamp", "message"}); + reader.SetReplyColumns({ "timestamp", "message" }); auto rb = reader.ReadAll(); UNIT_ASSERT(reader.IsCorrectlyFinished()); UNIT_ASSERT(CheckOrdered(rb)); if (testBlobOptions.SameValueColumns.contains("timestamp")) { UNIT_ASSERT(!testBlobOptions.SameValueColumns.contains("message")); - UNIT_ASSERT(DataHas({rb}, {0, fullNumRows}, true, "message")); + UNIT_ASSERT(DataHas({ rb }, { 0, fullNumRows }, true, "message")); } else { - UNIT_ASSERT(isStrPk0 - ? DataHas({rb}, {0, fullNumRows}, true, "timestamp") - : DataHas({rb}, {0, fullNumRows}, true, "timestamp")); + UNIT_ASSERT(isStrPk0 ? DataHas({ rb }, { 0, fullNumRows }, true, "timestamp") + : DataHas({ rb }, { 0, fullNumRows }, true, "timestamp")); } } std::vector val0 = { 0 }; @@ -2351,9 +2237,9 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) { std::vector val9999 = { 99999 }; std::vector val1M = { 1000000000 }; std::vector val1M_1 = { 1000000001 }; - std::vector valNumRows = {fullNumRows}; - std::vector valNumRows_1 = {fullNumRows - 1 }; - std::vector valNumRows_2 = {fullNumRows - 2 }; + std::vector valNumRows = { fullNumRows }; + std::vector valNumRows_1 = { fullNumRows - 1 }; + std::vector valNumRows_2 = { fullNumRows - 2 }; { UNIT_ASSERT(table.Pk.size() >= 2); @@ -2365,7 +2251,7 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) { val9999 = { sameValue, 99999 }; val1M = { sameValue, 1000000000 }; val1M_1 = { sameValue, 1000000001 }; - valNumRows = { sameValue, fullNumRows}; + valNumRows = { sameValue, fullNumRows }; valNumRows_1 = { sameValue, fullNumRows - 1 }; valNumRows_2 = { sameValue, fullNumRows - 2 }; } @@ -2382,8 +2268,8 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) { testAgent.Test("[0:1)").SetFrom(TBorder(val0, true)).SetTo(TBorder(val1, false)).SetExpectedCount(1); testAgent.Test("(0:1)").SetFrom(TBorder(val0, false)).SetTo(TBorder(val1, false)).SetExpectedCount(0); testAgent.Test("outscope1").SetFrom(TBorder(val1M, true)).SetTo(TBorder(val1M_1, true)).SetExpectedCount(0); -// VERIFIED AS INCORRECT INTERVAL (its good) -// testAgent.Test("[0-0)").SetFrom(TTabletReadPredicateTest::TBorder(0, true)).SetTo(TBorder(0, false)).SetExpectedCount(0); + // VERIFIED AS INCORRECT INTERVAL (its good) + // testAgent.Test("[0-0)").SetFrom(TTabletReadPredicateTest::TBorder(0, true)).SetTo(TBorder(0, false)).SetExpectedCount(0); if (isStrPk0) { testAgent.Test("(99990:").SetFrom(TBorder(val9990, false)).SetExpectedCount(109); @@ -2401,8 +2287,8 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) { } const TInstant start = TInstant::Now(); bool success = false; - while (!success && TInstant::Now() - start < TDuration::Seconds(30)) { // Get index stats - ScanIndexStats(runtime, sender, {tableId, 42}, NOlap::TSnapshot(planStep, txId), 0); + while (!success && TInstant::Now() - start < TDuration::Seconds(30)) { // Get index stats + ScanIndexStats(runtime, sender, { tableId, 42 }, NOlap::TSnapshot(planStep, txId), 0); auto scanInited = runtime.GrabEdgeEvent(handle); auto& msg = scanInited->Record; auto scanActorId = ActorIdFromProto(msg.GetScanActorId()); @@ -2440,11 +2326,12 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) { if (!activity) { continue; } - Cerr << "[" << __LINE__ << "] " << activity << " " << table.Pk[0].GetType().GetTypeId() << " " - << pathId << " " << kindStr << " " << numRows << " " << numBytes << " " << numRawBytes << "\n"; + Cerr << "[" << __LINE__ << "] " << activity << " " << table.Pk[0].GetType().GetTypeId() << " " << pathId << " " << kindStr + << " " << numRows << " " << numBytes << " " << numRawBytes << "\n"; if (pathId == tableId) { - if (kindStr == ::ToString(NOlap::NPortion::EProduced::COMPACTED) || kindStr == ::ToString(NOlap::NPortion::EProduced::SPLIT_COMPACTED) || numBytes > (4LLU << 20)) { + if (kindStr == ::ToString(NOlap::NPortion::EProduced::COMPACTED) || + kindStr == ::ToString(NOlap::NPortion::EProduced::SPLIT_COMPACTED) || numBytes > (4LLU << 20)) { sumCompactedBytes += numBytes; sumCompactedRows += numRows; //UNIT_ASSERT(numRawBytes > numBytes); @@ -2483,7 +2370,7 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) { pk[0].SetType(TTypeInfo(typeId)); schema[1].SetType(TTypeInfo(typeId)); pk[1].SetType(TTypeInfo(typeId)); - TestTableDescription table{.Schema = schema, .Pk = pk}; + TestTableDescription table{ .Schema = schema, .Pk = pk }; TestCompactionSplitGranuleImpl(table, opts); } @@ -2542,7 +2429,7 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) { // Write some test data to advance the time { - std::pair triggerPortion = {1, 1000}; + std::pair triggerPortion = { 1, 1000 }; TString triggerData = MakeTestBlob(triggerPortion, ydbSchema); std::vector writeIds; @@ -2581,11 +2468,10 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) { // Try to read snapshot that is too old { TShardReader reader(runtime, TTestTxConfig::TxTablet0, tableId, NOlap::TSnapshot(planStep - staleness.MilliSeconds(), Max())); - reader.SetReplyColumns({"timestamp", "message"}); + reader.SetReplyColumns({ "timestamp", "message" }); reader.ReadAll(); UNIT_ASSERT(reader.IsError()); } - } void TestCompactionGC() { @@ -2648,7 +2534,7 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) { ++compactionsHappened; TStringBuilder sb; sb << "Compaction old portions:"; - ui64 srcPathId{0}; + ui64 srcPathId{ 0 }; for (const auto& portionInfo : compact->SwitchedPortions) { const ui64 pathId = portionInfo.GetPathId(); UNIT_ASSERT(!srcPathId || srcPathId == pathId); @@ -2673,7 +2559,7 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) { } } else if (auto* msg = TryGetPrivateEvent(ev)) { { - const std::vector prefixes = {"Delay Delete Blob "}; + const std::vector prefixes = { "Delay Delete Blob " }; for (TString prefix : prefixes) { size_t pos = msg->Line.find(prefix); if (pos != TString::npos) { @@ -2716,7 +2602,7 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) { // Write different keys: grow on compaction static const ui32 triggerPortionSize = 75 * 1000; - std::pair triggerPortion = {0, triggerPortionSize}; + std::pair triggerPortion = { 0, triggerPortionSize }; TString triggerData = MakeTestBlob(triggerPortion, ydbSchema); UNIT_ASSERT(triggerData.size() > NColumnShard::TLimits::MIN_BYTES_TO_INSERT); UNIT_ASSERT(triggerData.size() < NColumnShard::TLimits::GetMaxBlobSize()); @@ -2736,7 +2622,7 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) { // Do a small write that is not indexed so that we will get a committed blob in read request { - TString smallData = MakeTestBlob({0, 2}, ydbSchema); + TString smallData = MakeTestBlob({ 0, 2 }, ydbSchema); UNIT_ASSERT(smallData.size() < 100 * 1024); std::vector writeIds; UNIT_ASSERT(WriteData(runtime, sender, writeId, tableId, smallData, ydbSchema, true, &writeIds)); @@ -2751,7 +2637,7 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) { --planStep; --txId; Cerr << compactionsHappened << Endl; -// UNIT_ASSERT_GE(compactionsHappened, 3); // we catch it three times per action + // UNIT_ASSERT_GE(compactionsHappened, 3); // we catch it three times per action ui64 previousCompactionsHappened = compactionsHappened; ui64 previousCleanupsHappened = cleanupsHappened; @@ -2760,7 +2646,7 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) { // This request is expected to read at least 1 committed blob and several index portions // These committed blob and portions must not be deleted by the BlobManager until the read request finishes TShardReader reader(runtime, TTestTxConfig::TxTablet0, tableId, NOlap::TSnapshot(planStep - 1, Max())); - reader.SetReplyColumns({"timestamp", "message"}); + reader.SetReplyColumns({ "timestamp", "message" }); auto rb = reader.ReadAll(); UNIT_ASSERT(reader.IsCorrectlyFinished()); UNIT_ASSERT(CheckOrdered(rb)); @@ -2868,4 +2754,4 @@ Y_UNIT_TEST_SUITE(TColumnShardTestReadWrite) { } } -} +} // namespace NKikimr diff --git a/ydb/core/tx/columnshard/ut_rw/ut_normalizer.cpp b/ydb/core/tx/columnshard/ut_rw/ut_normalizer.cpp index 734047952707..1f2312b1bfd6 100644 --- a/ydb/core/tx/columnshard/ut_rw/ut_normalizer.cpp +++ b/ydb/core/tx/columnshard/ut_rw/ut_normalizer.cpp @@ -1,16 +1,14 @@ #include -#include - +#include #include #include -#include - #include +#include +#include -#include #include #include - +#include namespace NKikimr { @@ -34,17 +32,17 @@ struct TPortionRecord { ui32 Size = 0; }; - class TNormalizerChecker { public: - virtual ~TNormalizerChecker() {} + virtual ~TNormalizerChecker() { + } virtual ui64 RecordsCountAfterReboot(const ui64 initialRecodsCount) const { return initialRecodsCount; } }; -class TPathIdCleaner : public NYDBTest::ILocalDBModifier { +class TPathIdCleaner: public NYDBTest::ILocalDBModifier { public: virtual void Apply(NTabletFlatExecutor::TTransactionContext& txc) const override { using namespace NColumnShard; @@ -83,33 +81,26 @@ class TPathIdCleaner : public NYDBTest::ILocalDBModifier { UNIT_ASSERT(pathId.has_value()); - for (auto&& [ portionId, key ] : portion2Key) { - db.Table().Key(key.Index, key.Granule, key.ColumnIdx, - key.PlanStep, key.TxId, key.Portion, key.Chunk).Delete(); - - db.Table().Key(key.Index, 1, key.ColumnIdx, - key.PlanStep, key.TxId, key.Portion, key.Chunk).Update( - NIceDb::TUpdate(key.XPlanStep), - NIceDb::TUpdate(key.XTxId), - NIceDb::TUpdate(key.Blob), - NIceDb::TUpdate(key.Metadata), - NIceDb::TUpdate(key.Offset), - NIceDb::TUpdate(key.Size), - - NIceDb::TNull() - ); + for (auto&& [portionId, key] : portion2Key) { + db.Table().Key(key.Index, key.Granule, key.ColumnIdx, key.PlanStep, key.TxId, key.Portion, key.Chunk).Delete(); + + db.Table() + .Key(key.Index, 1, key.ColumnIdx, key.PlanStep, key.TxId, key.Portion, key.Chunk) + .Update(NIceDb::TUpdate(key.XPlanStep), NIceDb::TUpdate(key.XTxId), + NIceDb::TUpdate(key.Blob), NIceDb::TUpdate(key.Metadata), + NIceDb::TUpdate(key.Offset), NIceDb::TUpdate(key.Size), + + NIceDb::TNull()); } - db.Table().Key(0, *pathId, "1").Update( - NIceDb::TUpdate(1), - NIceDb::TUpdate(1), - NIceDb::TUpdate(1), - NIceDb::TUpdate("") - ); + db.Table() + .Key(0, *pathId, "1") + .Update(NIceDb::TUpdate(1), NIceDb::TUpdate(1), + NIceDb::TUpdate(1), NIceDb::TUpdate("")); } }; -class TColumnChunksCleaner : public NYDBTest::ILocalDBModifier { +class TColumnChunksCleaner: public NYDBTest::ILocalDBModifier { public: virtual void Apply(NTabletFlatExecutor::TTransactionContext& txc) const override { using namespace NColumnShard; @@ -148,21 +139,20 @@ class TColumnChunksCleaner : public NYDBTest::ILocalDBModifier { UNIT_ASSERT(pathId.has_value()); - for (auto&& key: portion2Key) { + for (auto&& key : portion2Key) { NKikimrTxColumnShard::TIndexColumnMeta metaProto; UNIT_ASSERT(metaProto.ParseFromArray(key.Metadata.data(), key.Metadata.size())); metaProto.ClearNumRows(); metaProto.ClearRawBytes(); - db.Table().Key(key.Index, key.Granule, key.ColumnIdx, - key.PlanStep, key.TxId, key.Portion, key.Chunk).Update( - NIceDb::TUpdate(metaProto.SerializeAsString()) - ); + db.Table() + .Key(key.Index, key.Granule, key.ColumnIdx, key.PlanStep, key.TxId, key.Portion, key.Chunk) + .Update(NIceDb::TUpdate(metaProto.SerializeAsString())); } } }; -class TPortionsCleaner : public NYDBTest::ILocalDBModifier { +class TPortionsCleaner: public NYDBTest::ILocalDBModifier { public: virtual void Apply(NTabletFlatExecutor::TTransactionContext& txc) const override { using namespace NColumnShard; @@ -174,20 +164,20 @@ class TPortionsCleaner : public NYDBTest::ILocalDBModifier { UNIT_ASSERT(rowset.IsReady()); while (!rowset.EndOfSet()) { - NOlap::TPortionAddress addr(rowset.GetValue(), rowset.GetValue()); + NOlap::TPortionAddress addr( + rowset.GetValue(), rowset.GetValue()); portions.emplace_back(addr); UNIT_ASSERT(rowset.Next()); } } - for (auto&& key: portions) { + for (auto&& key : portions) { db.Table().Key(key.GetPathId(), key.GetPortionId()).Delete(); } } }; - -class TEmptyPortionsCleaner : public NYDBTest::ILocalDBModifier { +class TEmptyPortionsCleaner: public NYDBTest::ILocalDBModifier { public: virtual void Apply(NTabletFlatExecutor::TTransactionContext& txc) const override { using namespace NColumnShard; @@ -200,8 +190,7 @@ class TEmptyPortionsCleaner : public NYDBTest::ILocalDBModifier { } }; - -class TTablesCleaner : public NYDBTest::ILocalDBModifier { +class TTablesCleaner: public NYDBTest::ILocalDBModifier { public: virtual void Apply(NTabletFlatExecutor::TTransactionContext& txc) const override { using namespace NColumnShard; @@ -219,7 +208,7 @@ class TTablesCleaner : public NYDBTest::ILocalDBModifier { } } - for (auto&& key: tables) { + for (auto&& key : tables) { db.Table().Key(key).Delete(); } @@ -244,10 +233,9 @@ class TTablesCleaner : public NYDBTest::ILocalDBModifier { } } - for (auto&& key: versions) { + for (auto&& key : versions) { db.Table().Key(key.PathId, key.Step, key.TxId).Delete(); } - } }; @@ -255,6 +243,7 @@ template class TPrepareLocalDBController: public NKikimr::NYDBTest::NColumnShard::TController { private: using TBase = NKikimr::NYDBTest::ICSController; + public: NYDBTest::ILocalDBModifier::TPtr BuildLocalBaseModifier() const override { return std::make_shared(); @@ -262,7 +251,6 @@ class TPrepareLocalDBController: public NKikimr::NYDBTest::NColumnShard::TContro }; Y_UNIT_TEST_SUITE(Normalizers) { - template void TestNormalizerImpl(const TNormalizerChecker& checker = TNormalizerChecker()) { using namespace NArrow; @@ -271,52 +259,36 @@ Y_UNIT_TEST_SUITE(Normalizers) { TTestBasicRuntime runtime; TTester::Setup(runtime); - const ui64 ownerId = 0; const ui64 tableId = 1; - const ui64 schemaVersion = 1; - const std::vector schema = { - NArrow::NTest::TTestColumn("key1", TTypeInfo(NTypeIds::Uint64)), - NArrow::NTest::TTestColumn("key2", TTypeInfo(NTypeIds::Uint64)), - NArrow::NTest::TTestColumn("field", TTypeInfo(NTypeIds::Utf8) ) - }; - const std::vector columnsIds = { 1, 2, 3}; + const std::vector schema = { NArrow::NTest::TTestColumn("key1", TTypeInfo(NTypeIds::Uint64)), + NArrow::NTest::TTestColumn("key2", TTypeInfo(NTypeIds::Uint64)), NArrow::NTest::TTestColumn("field", TTypeInfo(NTypeIds::Utf8)) }; + const std::vector columnsIds = { 1, 2, 3 }; PrepareTablet(runtime, tableId, schema, 2); const ui64 txId = 111; - NConstruction::IArrayBuilder::TPtr key1Column = std::make_shared>>("key1"); - NConstruction::IArrayBuilder::TPtr key2Column = std::make_shared>>("key2"); + NConstruction::IArrayBuilder::TPtr key1Column = + std::make_shared>>("key1"); + NConstruction::IArrayBuilder::TPtr key2Column = + std::make_shared>>("key2"); NConstruction::IArrayBuilder::TPtr column = std::make_shared>( "field", NConstruction::TStringPoolFiller(8, 100)); auto batch = NConstruction::TRecordBatchConstructor({ key1Column, key2Column, column }).BuildBatch(20048); - TString blobData = NArrow::SerializeBatchNoCompression(batch); - - auto evWrite = std::make_unique(NKikimrDataEvents::TEvWrite::MODE_PREPARE); - evWrite->SetTxId(txId); - ui64 payloadIndex = NEvWrite::TPayloadWriter(*evWrite).AddDataToPayload(std::move(blobData)); - evWrite->AddOperation(NKikimrDataEvents::TEvWrite::TOperation::OPERATION_REPLACE, {ownerId, tableId, schemaVersion}, columnsIds, payloadIndex, NKikimrDataEvents::FORMAT_ARROW); - - TActorId sender = runtime.AllocateEdgeActor(); - ForwardToTablet(runtime, TTestTxConfig::TxTablet0, sender, evWrite.release()); - { - TAutoPtr handle; - auto event = runtime.GrabEdgeEvent(handle); - UNIT_ASSERT(event); - UNIT_ASSERT_VALUES_EQUAL((ui64)event->Record.GetStatus(), (ui64)NKikimrDataEvents::TEvWriteResult::STATUS_PREPARED); - - PlanWriteTx(runtime, sender, NOlap::TSnapshot(11, txId)); - } + NTxUT::TShardWriter writer(runtime, TTestTxConfig::TxTablet0, tableId, 222); + AFL_VERIFY(writer.Write(batch, {1, 2, 3}, txId) == NKikimrDataEvents::TEvWriteResult::STATUS_COMPLETED); + AFL_VERIFY(writer.StartCommit(txId) == NKikimrDataEvents::TEvWriteResult::STATUS_PREPARED); + PlanWriteTx(runtime, writer.GetSender(), NOlap::TSnapshot(11, txId)); { auto readResult = ReadAllAsBatch(runtime, tableId, NOlap::TSnapshot(11, txId), schema); UNIT_ASSERT_VALUES_EQUAL(readResult->num_rows(), 20048); while (!csControllerGuard->GetInsertFinishedCounter().Val()) { Cerr << csControllerGuard->GetInsertStartedCounter().Val() << Endl; - Wakeup(runtime, sender, TTestTxConfig::TxTablet0); + Wakeup(runtime, writer.GetSender(), TTestTxConfig::TxTablet0); runtime.SimulateSleep(TDuration::Seconds(1)); } } - RebootTablet(runtime, TTestTxConfig::TxTablet0, sender); + RebootTablet(runtime, TTestTxConfig::TxTablet0, writer.GetSender()); { auto readResult = ReadAllAsBatch(runtime, tableId, NOlap::TSnapshot(11, txId), schema); @@ -341,7 +313,7 @@ Y_UNIT_TEST_SUITE(Normalizers) { } Y_UNIT_TEST(EmptyTablesNormalizer) { - class TLocalNormalizerChecker : public TNormalizerChecker { + class TLocalNormalizerChecker: public TNormalizerChecker { public: ui64 RecordsCountAfterReboot(const ui64) const override { return 0; @@ -352,4 +324,4 @@ Y_UNIT_TEST_SUITE(Normalizers) { } } -} // namespace NKikimr +} // namespace NKikimr