Skip to content

Commit

Permalink
timeout for shard data writing (ydb-platform#10275)
Browse files Browse the repository at this point in the history
  • Loading branch information
ivanmorozov333 authored and zverevgeny committed Jan 5, 2025
1 parent a1ac501 commit 191bce6
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 26 deletions.
23 changes: 19 additions & 4 deletions ydb/core/tx/data_events/shard_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@ namespace NKikimr::NEvWrite {
}

TShardWriter::TShardWriter(const ui64 shardId, const ui64 tableId, const ui64 schemaVersion, const TString& dedupId, const IShardInfo::TPtr& data,
const NWilson::TProfileSpan& parentSpan, TWritersController::TPtr externalController, const ui32 writePartIdx, const EModificationType mType, const bool immediateWrite)
const NWilson::TProfileSpan& parentSpan, TWritersController::TPtr externalController, const ui32 writePartIdx, const EModificationType mType, const bool immediateWrite,
const std::optional<TDuration> timeout
)
: ShardId(shardId)
, WritePartIdx(writePartIdx)
, TableId(tableId)
Expand All @@ -54,6 +56,7 @@ namespace NKikimr::NEvWrite {
, ActorSpan(parentSpan.BuildChildrenSpan("ShardWriter"))
, ModificationType(mType)
, ImmediateWrite(immediateWrite)
, Timeout(timeout)
{
}

Expand All @@ -71,6 +74,9 @@ namespace NKikimr::NEvWrite {

void TShardWriter::Bootstrap() {
SendWriteRequest();
if (Timeout) {
Schedule(*Timeout, new TEvents::TEvWakeup(1));
}
Become(&TShardWriter::StateMain);
}

Expand Down Expand Up @@ -138,16 +144,25 @@ namespace NKikimr::NEvWrite {
}
}

void TShardWriter::HandleTimeout(const TActorContext& /*ctx*/) {
RetryWriteRequest(false);
// Handles the wakeups this actor schedules for itself. The event tag selects the meaning:
//   - zero tag:     retry wakeup; counts a retry timeout and re-sends the write request immediately;
//   - non-zero tag: overall timeout; reports TIMEOUT to the external controller and counts it.
void TShardWriter::Handle(NActors::TEvents::TEvWakeup::TPtr& ev) {
    const bool isGlobalTimeout = static_cast<bool>(ev->Get()->Tag);

    if (!isGlobalTimeout) {
        ExternalController->GetCounters()->OnRetryTimeout();
        // NOTE(review): the result of RetryWriteRequest is ignored here; when it returns false
        // (retry limit reached) nothing else happens in this handler - confirm that is intended.
        RetryWriteRequest(false);
        return;
    }

    // Presumably finishes the actor once the guard goes out of scope - confirm against PassAwayGuard().
    auto gPassAway = PassAwayGuard();
    const TString failMessage = TStringBuilder()
        << "Cannot write data (TIMEOUT) into shard " << ShardId << " in longTx "
        << ExternalController->GetLongTxId().ToString();
    ExternalController->OnFail(Ydb::StatusIds::TIMEOUT, failMessage);
    ExternalController->GetCounters()->OnGlobalTimeout();
}

bool TShardWriter::RetryWriteRequest(const bool delayed) {
if (NumRetries >= MaxRetriesPerShard) {
return false;
}
if (delayed) {
Schedule(OverloadTimeout(), new TEvents::TEvWakeup());
Schedule(OverloadTimeout(), new TEvents::TEvWakeup(0));
} else {
++NumRetries;
SendWriteRequest();
Expand Down
45 changes: 32 additions & 13 deletions ydb/core/tx/data_events/shard_writer.h
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
#pragma once

#include "common/modification_type.h"
#include "events.h"
#include "shards_splitter.h"

#include <ydb/library/accessor/accessor.h>
#include "common/modification_type.h"

#include <ydb/core/base/tablet_pipecache.h>
#include <ydb/core/tx/columnshard/counters/common/owner.h>
#include <ydb/core/tx/long_tx_service/public/events.h>

#include <ydb/library/accessor/accessor.h>
#include <ydb/library/actors/core/actor_bootstrapped.h>
#include <ydb/library/actors/wilson/wilson_profile_span.h>
#include <ydb/core/tx/columnshard/counters/common/owner.h>


namespace NKikimr::NEvWrite {

Expand All @@ -19,6 +20,7 @@ class TWriteIdForShard {
YDB_READONLY(ui64, ShardId, 0);
YDB_READONLY(ui64, WriteId, 0);
YDB_READONLY(ui64, WritePartId, 0);

public:
TWriteIdForShard() = default;
TWriteIdForShard(const ui64 shardId, const ui64 writeId, const ui32 writePartId)
Expand All @@ -40,6 +42,9 @@ class TCSUploadCounters: public NColumnShard::TCommonCountersOwner {
NMonitoring::TDynamicCounters::TCounterPtr RowsCount;
NMonitoring::TDynamicCounters::TCounterPtr BytesCount;
NMonitoring::TDynamicCounters::TCounterPtr FailsCount;
NMonitoring::TDynamicCounters::TCounterPtr GlobalTimeoutCount;
NMonitoring::TDynamicCounters::TCounterPtr RetryTimeoutCount;

public:
TCSUploadCounters()
: TBase("CSUpload")
Expand All @@ -51,8 +56,18 @@ class TCSUploadCounters: public NColumnShard::TCommonCountersOwner {
, RowsDistribution(TBase::GetHistogram("Requests/Rows", NMonitoring::ExponentialHistogram(15, 2, 16)))
, RowsCount(TBase::GetDeriviative("Rows"))
, BytesCount(TBase::GetDeriviative("Bytes"))
, FailsCount(TBase::GetDeriviative("Fails")) {
, FailsCount(TBase::GetDeriviative("Fails"))
, GlobalTimeoutCount(TBase::GetDeriviative("GlobalTimeouts"))
, RetryTimeoutCount(TBase::GetDeriviative("RetryTimeouts"))
{
}

// Registers one overall (global) write timeout by incrementing the "GlobalTimeouts" counter.
void OnGlobalTimeout() const {
GlobalTimeoutCount->Inc();
}

// Registers one retry wakeup by incrementing the "RetryTimeouts" counter.
void OnRetryTimeout() const {
RetryTimeoutCount->Inc();
}

void OnRequest(const ui64 rows, const ui64 bytes) const {
Expand Down Expand Up @@ -110,6 +125,7 @@ class TWritersController {
LongTxActorId.Send(NLongTxService::MakeLongTxServiceID(LongTxActorId.NodeId()), req.Release());
}
}

public:
using TPtr = std::shared_ptr<TWritersController>;

Expand All @@ -131,10 +147,10 @@ class TWritersController {
, Issues(issues) {
}
};

};

TWritersController(const ui32 writesCount, const NActors::TActorIdentity& longTxActorId, const NLongTxService::TLongTxId& longTxId, const bool immediateWrite);
TWritersController(const ui32 writesCount, const NActors::TActorIdentity& longTxActorId, const NLongTxService::TLongTxId& longTxId,
const bool immediateWrite);
void OnSuccess(const ui64 shardId, const ui64 writeId, const ui32 writePartId);
void OnFail(const Ydb::StatusIds::StatusCode code, const TString& message);
};
Expand All @@ -158,40 +174,43 @@ class TShardWriter: public NActors::TActorBootstrapped<TShardWriter> {
NWilson::TProfileSpan ActorSpan;
EModificationType ModificationType;
const bool ImmediateWrite = false;
const std::optional<TDuration> Timeout;

void SendWriteRequest();
// Returns the delay used when a retry is scheduled instead of being sent immediately:
// OverloadedDelayMs (declared outside the visible part of this class) interpreted as milliseconds.
static TDuration OverloadTimeout() {
return TDuration::MilliSeconds(OverloadedDelayMs);
}
void SendToTablet(THolder<IEventBase> event) {
Send(LeaderPipeCache, new TEvPipeCache::TEvForward(event.Release(), ShardId, true),
IEventHandle::FlagTrackDelivery, 0, ActorSpan.GetTraceId());
Send(LeaderPipeCache, new TEvPipeCache::TEvForward(event.Release(), ShardId, true), IEventHandle::FlagTrackDelivery, 0,
ActorSpan.GetTraceId());
}
// Actor shutdown hook: sends TEvUnlink to the leader pipe cache first and only then
// delegates to the base-class PassAway().
virtual void PassAway() override {
// NOTE(review): the meaning of the 0 argument is not visible here - presumably "unlink all tablets"; confirm.
Send(LeaderPipeCache, new TEvPipeCache::TEvUnlink(0));
TBase::PassAway();
}

public:
TShardWriter(const ui64 shardId, const ui64 tableId, const ui64 schemaVersion, const TString& dedupId, const IShardInfo::TPtr& data,
const NWilson::TProfileSpan& parentSpan, TWritersController::TPtr externalController, const ui32 writePartIdx,
const EModificationType mType, const bool immediateWrite);
const EModificationType mType, const bool immediateWrite, const std::optional<TDuration> timeout = std::nullopt);

STFUNC(StateMain) {
switch (ev->GetTypeRewrite()) {
hFunc(TEvColumnShard::TEvWriteResult, Handle);
hFunc(TEvPipeCache::TEvDeliveryProblem, Handle);
hFunc(NEvents::TDataEvents::TEvWriteResult, Handle);
CFunc(TEvents::TSystem::Wakeup, HandleTimeout);
hFunc(NActors::TEvents::TEvWakeup, Handle);
}
}

void Bootstrap();

void Handle(NActors::TEvents::TEvWakeup::TPtr& ev);
void Handle(TEvColumnShard::TEvWriteResult::TPtr& ev);
void Handle(TEvPipeCache::TEvDeliveryProblem::TPtr& ev);
void Handle(NEvents::TDataEvents::TEvWriteResult::TPtr& ev);
void HandleTimeout(const TActorContext& ctx);

private:
bool RetryWriteRequest(const bool delayed = true);
};
}
} // namespace NKikimr::NEvWrite
19 changes: 10 additions & 9 deletions ydb/core/tx/tx_proxy/rpc_long_tx.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include <ydb/core/formats/arrow/size_calcer.h>
#include <ydb/core/tx/columnshard/columnshard.h>
#include <ydb/core/tx/columnshard/counters/common/object_counter.h>
#include <ydb/core/tx/data_events/shard_writer.h>
#include <ydb/core/tx/long_tx_service/public/events.h>
#include <ydb/core/tx/schemeshard/schemeshard.h>
Expand All @@ -21,7 +22,8 @@ using namespace NLongTxService;
// Common logic of LongTx Write that takes care of splitting the data according to the sharding scheme,
// sending it to shards and collecting their responses
template <class TLongTxWriteImpl>
class TLongTxWriteBase: public TActorBootstrapped<TLongTxWriteImpl> {
class TLongTxWriteBase: public TActorBootstrapped<TLongTxWriteImpl>,
NColumnShard::TMonitoringObjectsCounter<TLongTxWriteBase<TLongTxWriteImpl>> {
using TBase = TActorBootstrapped<TLongTxWriteImpl>;
static inline TAtomicCounter MemoryInFlight = 0;

Expand All @@ -37,8 +39,7 @@ class TLongTxWriteBase: public TActorBootstrapped<TLongTxWriteImpl> {
, Path(path)
, DedupId(dedupId)
, LongTxId(longTxId)
, ActorSpan(0, NWilson::TTraceId::NewTraceId(0, Max<ui32>()), "TLongTxWriteBase")
{
, ActorSpan(0, NWilson::TTraceId::NewTraceId(0, Max<ui32>()), "TLongTxWriteBase") {
if (token) {
UserToken.emplace(token);
}
Expand Down Expand Up @@ -95,7 +96,8 @@ class TLongTxWriteBase: public TActorBootstrapped<TLongTxWriteImpl> {
accessor.reset();

const auto& splittedData = shardsSplitter->GetSplitData();
InternalController = std::make_shared<NEvWrite::TWritersController>(splittedData.GetShardRequestsCount(), this->SelfId(), LongTxId, NoTxWrite);
InternalController =
std::make_shared<NEvWrite::TWritersController>(splittedData.GetShardRequestsCount(), this->SelfId(), LongTxId, NoTxWrite);
ui32 sumBytes = 0;
ui32 rowsCount = 0;
ui32 writeIdx = 0;
Expand All @@ -104,9 +106,9 @@ class TLongTxWriteBase: public TActorBootstrapped<TLongTxWriteImpl> {
InternalController->GetCounters()->OnRequest(shardInfo->GetRowsCount(), shardInfo->GetBytes());
sumBytes += shardInfo->GetBytes();
rowsCount += shardInfo->GetRowsCount();
this->Register(new NEvWrite::TShardWriter(shard, shardsSplitter->GetTableId(), shardsSplitter->GetSchemaVersion(), DedupId, shardInfo,
ActorSpan, InternalController,
++writeIdx, NEvWrite::EModificationType::Replace, NoTxWrite));
this->Register(
new NEvWrite::TShardWriter(shard, shardsSplitter->GetTableId(), shardsSplitter->GetSchemaVersion(), DedupId, shardInfo,
ActorSpan, InternalController, ++writeIdx, NEvWrite::EModificationType::Replace, NoTxWrite, TDuration::Seconds(20)));
}
}
pSpan.Attribute("affected_shards_count", (long)splittedData.GetShardsInfo().size());
Expand Down Expand Up @@ -235,8 +237,7 @@ class TLongTxWriteInternal: public TLongTxWriteBase<TLongTxWriteInternal> {
, ReplyTo(replyTo)
, NavigateResult(navigateResult)
, Batch(batch)
, Issues(issues)
{
, Issues(issues) {
Y_ABORT_UNLESS(Issues);
DataAccessor = std::make_unique<TParsedBatchData>(Batch);
}
Expand Down

0 comments on commit 191bce6

Please sign in to comment.