From daccbe2baedd912120539c51ce155c33274130ca Mon Sep 17 00:00:00 2001 From: ivanmorozov333 Date: Thu, 10 Oct 2024 12:14:11 +0300 Subject: [PATCH] timeout for shard data writing --- ydb/core/tx/data_events/shard_writer.cpp | 23 +++++++++--- ydb/core/tx/data_events/shard_writer.h | 45 +++++++++++++++++------- ydb/core/tx/tx_proxy/rpc_long_tx.cpp | 19 +++++----- 3 files changed, 61 insertions(+), 26 deletions(-) diff --git a/ydb/core/tx/data_events/shard_writer.cpp b/ydb/core/tx/data_events/shard_writer.cpp index 7d7f69ad067e..d4d38101e6be 100644 --- a/ydb/core/tx/data_events/shard_writer.cpp +++ b/ydb/core/tx/data_events/shard_writer.cpp @@ -42,7 +42,9 @@ namespace NKikimr::NEvWrite { } TShardWriter::TShardWriter(const ui64 shardId, const ui64 tableId, const ui64 schemaVersion, const TString& dedupId, const IShardInfo::TPtr& data, - const NWilson::TProfileSpan& parentSpan, TWritersController::TPtr externalController, const ui32 writePartIdx, const EModificationType mType, const bool immediateWrite) + const NWilson::TProfileSpan& parentSpan, TWritersController::TPtr externalController, const ui32 writePartIdx, const EModificationType mType, const bool immediateWrite, + const std::optional timeout + ) : ShardId(shardId) , WritePartIdx(writePartIdx) , TableId(tableId) @@ -54,6 +56,7 @@ namespace NKikimr::NEvWrite { , ActorSpan(parentSpan.BuildChildrenSpan("ShardWriter")) , ModificationType(mType) , ImmediateWrite(immediateWrite) + , Timeout(timeout) { } @@ -71,6 +74,9 @@ namespace NKikimr::NEvWrite { void TShardWriter::Bootstrap() { SendWriteRequest(); + if (Timeout) { + Schedule(*Timeout, new TEvents::TEvWakeup(1)); + } Become(&TShardWriter::StateMain); } @@ -138,8 +144,17 @@ namespace NKikimr::NEvWrite { } } - void TShardWriter::HandleTimeout(const TActorContext& /*ctx*/) { - RetryWriteRequest(false); + void TShardWriter::Handle(NActors::TEvents::TEvWakeup::TPtr& ev) { + if (ev->Get()->Tag) { + auto gPassAway = PassAwayGuard(); + ExternalController->OnFail(Ydb::StatusIds::TIMEOUT, TStringBuilder() + << "Cannot write data (TIMEOUT) into shard " << ShardId << " in longTx " + << ExternalController->GetLongTxId().ToString()); + ExternalController->GetCounters()->OnGlobalTimeout(); + } else { + ExternalController->GetCounters()->OnRetryTimeout(); + RetryWriteRequest(false); + } } bool TShardWriter::RetryWriteRequest(const bool delayed) { @@ -147,7 +162,7 @@ namespace NKikimr::NEvWrite { return false; } if (delayed) { - Schedule(OverloadTimeout(), new TEvents::TEvWakeup()); + Schedule(OverloadTimeout(), new TEvents::TEvWakeup(0)); } else { ++NumRetries; SendWriteRequest(); diff --git a/ydb/core/tx/data_events/shard_writer.h b/ydb/core/tx/data_events/shard_writer.h index 323bffa5056f..57c64c786f9b 100644 --- a/ydb/core/tx/data_events/shard_writer.h +++ b/ydb/core/tx/data_events/shard_writer.h @@ -1,16 +1,17 @@ #pragma once -#include "common/modification_type.h" #include "events.h" #include "shards_splitter.h" -#include +#include "common/modification_type.h" + #include +#include #include + +#include #include #include -#include - namespace NKikimr::NEvWrite { @@ -19,6 +20,7 @@ class TWriteIdForShard { YDB_READONLY(ui64, ShardId, 0); YDB_READONLY(ui64, WriteId, 0); YDB_READONLY(ui64, WritePartId, 0); + public: TWriteIdForShard() = default; TWriteIdForShard(const ui64 shardId, const ui64 writeId, const ui32 writePartId) @@ -40,6 +42,9 @@ class TCSUploadCounters: public NColumnShard::TCommonCountersOwner { NMonitoring::TDynamicCounters::TCounterPtr RowsCount; NMonitoring::TDynamicCounters::TCounterPtr BytesCount; NMonitoring::TDynamicCounters::TCounterPtr FailsCount; + NMonitoring::TDynamicCounters::TCounterPtr GlobalTimeoutCount; + NMonitoring::TDynamicCounters::TCounterPtr RetryTimeoutCount; + public: TCSUploadCounters() : TBase("CSUpload") @@ -51,8 +56,18 @@ class TCSUploadCounters: public NColumnShard::TCommonCountersOwner { , RowsDistribution(TBase::GetHistogram("Requests/Rows", NMonitoring::ExponentialHistogram(15, 2, 16))) , RowsCount(TBase::GetDeriviative("Rows")) , BytesCount(TBase::GetDeriviative("Bytes")) - , FailsCount(TBase::GetDeriviative("Fails")) { + , FailsCount(TBase::GetDeriviative("Fails")) + , GlobalTimeoutCount(TBase::GetDeriviative("GlobalTimeouts")) + , RetryTimeoutCount(TBase::GetDeriviative("RetryTimeouts")) + { + } + + void OnGlobalTimeout() const { + GlobalTimeoutCount->Inc(); + } + void OnRetryTimeout() const { + RetryTimeoutCount->Inc(); } void OnRequest(const ui64 rows, const ui64 bytes) const { @@ -110,6 +125,7 @@ class TWritersController { LongTxActorId.Send(NLongTxService::MakeLongTxServiceID(LongTxActorId.NodeId()), req.Release()); } } + public: using TPtr = std::shared_ptr; @@ -131,10 +147,10 @@ class TWritersController { , Issues(issues) { } }; - }; - TWritersController(const ui32 writesCount, const NActors::TActorIdentity& longTxActorId, const NLongTxService::TLongTxId& longTxId, const bool immediateWrite); + TWritersController(const ui32 writesCount, const NActors::TActorIdentity& longTxActorId, const NLongTxService::TLongTxId& longTxId, + const bool immediateWrite); void OnSuccess(const ui64 shardId, const ui64 writeId, const ui32 writePartId); void OnFail(const Ydb::StatusIds::StatusCode code, const TString& message); }; @@ -158,40 +174,43 @@ class TShardWriter: public NActors::TActorBootstrapped { NWilson::TProfileSpan ActorSpan; EModificationType ModificationType; const bool ImmediateWrite = false; + const std::optional Timeout; void SendWriteRequest(); static TDuration OverloadTimeout() { return TDuration::MilliSeconds(OverloadedDelayMs); } void SendToTablet(THolder event) { - Send(LeaderPipeCache, new TEvPipeCache::TEvForward(event.Release(), ShardId, true), - IEventHandle::FlagTrackDelivery, 0, ActorSpan.GetTraceId()); + Send(LeaderPipeCache, new TEvPipeCache::TEvForward(event.Release(), ShardId, true), IEventHandle::FlagTrackDelivery, 0, + ActorSpan.GetTraceId()); } virtual void PassAway() override { Send(LeaderPipeCache, new TEvPipeCache::TEvUnlink(0)); TBase::PassAway(); } + public: TShardWriter(const ui64 shardId, const ui64 tableId, const ui64 schemaVersion, const TString& dedupId, const IShardInfo::TPtr& data, const NWilson::TProfileSpan& parentSpan, TWritersController::TPtr externalController, const ui32 writePartIdx, - const EModificationType mType, const bool immediateWrite); + const EModificationType mType, const bool immediateWrite, const std::optional timeout = std::nullopt); STFUNC(StateMain) { switch (ev->GetTypeRewrite()) { hFunc(TEvColumnShard::TEvWriteResult, Handle); hFunc(TEvPipeCache::TEvDeliveryProblem, Handle); hFunc(NEvents::TDataEvents::TEvWriteResult, Handle); - CFunc(TEvents::TSystem::Wakeup, HandleTimeout); + hFunc(NActors::TEvents::TEvWakeup, Handle); } } void Bootstrap(); + void Handle(NActors::TEvents::TEvWakeup::TPtr& ev); void Handle(TEvColumnShard::TEvWriteResult::TPtr& ev); void Handle(TEvPipeCache::TEvDeliveryProblem::TPtr& ev); void Handle(NEvents::TDataEvents::TEvWriteResult::TPtr& ev); - void HandleTimeout(const TActorContext& ctx); + private: bool RetryWriteRequest(const bool delayed = true); }; -} +} // namespace NKikimr::NEvWrite diff --git a/ydb/core/tx/tx_proxy/rpc_long_tx.cpp b/ydb/core/tx/tx_proxy/rpc_long_tx.cpp index 11650948fe93..aaefb6e56459 100644 --- a/ydb/core/tx/tx_proxy/rpc_long_tx.cpp +++ b/ydb/core/tx/tx_proxy/rpc_long_tx.cpp @@ -2,6 +2,7 @@ #include #include +#include #include #include #include @@ -21,7 +22,8 @@ using namespace NLongTxService; // Common logic of LongTx Write that takes care of splitting the data according to the sharding scheme, // sending it to shards and collecting their responses template -class TLongTxWriteBase: public TActorBootstrapped { +class TLongTxWriteBase: public TActorBootstrapped, + NColumnShard::TMonitoringObjectsCounter> { using TBase = TActorBootstrapped; static inline TAtomicCounter MemoryInFlight = 0; @@ -37,8 +39,7 @@ class TLongTxWriteBase: public TActorBootstrapped { , Path(path) , DedupId(dedupId) , LongTxId(longTxId) - , ActorSpan(0, NWilson::TTraceId::NewTraceId(0, Max()), "TLongTxWriteBase") - { + , ActorSpan(0, NWilson::TTraceId::NewTraceId(0, Max()), "TLongTxWriteBase") { if (token) { UserToken.emplace(token); } @@ -95,7 +96,8 @@ class TLongTxWriteBase: public TActorBootstrapped { accessor.reset(); const auto& splittedData = shardsSplitter->GetSplitData(); - InternalController = std::make_shared(splittedData.GetShardRequestsCount(), this->SelfId(), LongTxId, NoTxWrite); + InternalController = + std::make_shared(splittedData.GetShardRequestsCount(), this->SelfId(), LongTxId, NoTxWrite); ui32 sumBytes = 0; ui32 rowsCount = 0; ui32 writeIdx = 0; @@ -104,9 +106,9 @@ class TLongTxWriteBase: public TActorBootstrapped { InternalController->GetCounters()->OnRequest(shardInfo->GetRowsCount(), shardInfo->GetBytes()); sumBytes += shardInfo->GetBytes(); rowsCount += shardInfo->GetRowsCount(); - this->Register(new NEvWrite::TShardWriter(shard, shardsSplitter->GetTableId(), shardsSplitter->GetSchemaVersion(), DedupId, shardInfo, - ActorSpan, InternalController, - ++writeIdx, NEvWrite::EModificationType::Replace, NoTxWrite)); + this->Register( + new NEvWrite::TShardWriter(shard, shardsSplitter->GetTableId(), shardsSplitter->GetSchemaVersion(), DedupId, shardInfo, + ActorSpan, InternalController, ++writeIdx, NEvWrite::EModificationType::Replace, NoTxWrite, TDuration::Seconds(20))); } } pSpan.Attribute("affected_shards_count", (long)splittedData.GetShardsInfo().size()); @@ -235,8 +237,7 @@ class TLongTxWriteInternal: public TLongTxWriteBase { , ReplyTo(replyTo) , NavigateResult(navigateResult) , Batch(batch) - , Issues(issues) - { + , Issues(issues) { Y_ABORT_UNLESS(Issues); DataAccessor = std::make_unique(Batch); }