Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

timeout for shard data writing #10275

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 19 additions & 4 deletions ydb/core/tx/data_events/shard_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@ namespace NKikimr::NEvWrite {
}

TShardWriter::TShardWriter(const ui64 shardId, const ui64 tableId, const ui64 schemaVersion, const TString& dedupId, const IShardInfo::TPtr& data,
const NWilson::TProfileSpan& parentSpan, TWritersController::TPtr externalController, const ui32 writePartIdx, const EModificationType mType, const bool immediateWrite)
const NWilson::TProfileSpan& parentSpan, TWritersController::TPtr externalController, const ui32 writePartIdx, const EModificationType mType, const bool immediateWrite,
const std::optional<TDuration> timeout
)
: ShardId(shardId)
, WritePartIdx(writePartIdx)
, TableId(tableId)
Expand All @@ -54,6 +56,7 @@ namespace NKikimr::NEvWrite {
, ActorSpan(parentSpan.BuildChildrenSpan("ShardWriter"))
, ModificationType(mType)
, ImmediateWrite(immediateWrite)
, Timeout(timeout)
{
}

Expand All @@ -71,6 +74,9 @@ namespace NKikimr::NEvWrite {

void TShardWriter::Bootstrap() {
SendWriteRequest();
if (Timeout) {
Schedule(*Timeout, new TEvents::TEvWakeup(1));
}
Become(&TShardWriter::StateMain);
}

Expand Down Expand Up @@ -138,16 +144,25 @@ namespace NKikimr::NEvWrite {
}
}

void TShardWriter::HandleTimeout(const TActorContext& /*ctx*/) {
RetryWriteRequest(false);
void TShardWriter::Handle(NActors::TEvents::TEvWakeup::TPtr& ev) {
if (ev->Get()->Tag) {
auto gPassAway = PassAwayGuard();
ExternalController->OnFail(Ydb::StatusIds::TIMEOUT, TStringBuilder()
<< "Cannot write data (TIMEOUT) into shard " << ShardId << " in longTx "
<< ExternalController->GetLongTxId().ToString());
ExternalController->GetCounters()->OnGlobalTimeout();
} else {
ExternalController->GetCounters()->OnRetryTimeout();
RetryWriteRequest(false);
}
}

bool TShardWriter::RetryWriteRequest(const bool delayed) {
if (NumRetries >= MaxRetriesPerShard) {
return false;
}
if (delayed) {
Schedule(OverloadTimeout(), new TEvents::TEvWakeup());
Schedule(OverloadTimeout(), new TEvents::TEvWakeup(0));
} else {
++NumRetries;
SendWriteRequest();
Expand Down
45 changes: 32 additions & 13 deletions ydb/core/tx/data_events/shard_writer.h
Original file line number Diff line number Diff line change
@@ -1,16 +1,17 @@
#pragma once

#include "common/modification_type.h"
#include "events.h"
#include "shards_splitter.h"

#include <ydb/library/accessor/accessor.h>
#include "common/modification_type.h"

#include <ydb/core/base/tablet_pipecache.h>
#include <ydb/core/tx/columnshard/counters/common/owner.h>
#include <ydb/core/tx/long_tx_service/public/events.h>

#include <ydb/library/accessor/accessor.h>
#include <ydb/library/actors/core/actor_bootstrapped.h>
#include <ydb/library/actors/wilson/wilson_profile_span.h>
#include <ydb/core/tx/columnshard/counters/common/owner.h>


namespace NKikimr::NEvWrite {

Expand All @@ -19,6 +20,7 @@ class TWriteIdForShard {
YDB_READONLY(ui64, ShardId, 0);
YDB_READONLY(ui64, WriteId, 0);
YDB_READONLY(ui64, WritePartId, 0);

public:
TWriteIdForShard() = default;
TWriteIdForShard(const ui64 shardId, const ui64 writeId, const ui32 writePartId)
Expand All @@ -40,6 +42,9 @@ class TCSUploadCounters: public NColumnShard::TCommonCountersOwner {
NMonitoring::TDynamicCounters::TCounterPtr RowsCount;
NMonitoring::TDynamicCounters::TCounterPtr BytesCount;
NMonitoring::TDynamicCounters::TCounterPtr FailsCount;
NMonitoring::TDynamicCounters::TCounterPtr GlobalTimeoutCount;
NMonitoring::TDynamicCounters::TCounterPtr RetryTimeoutCount;

public:
TCSUploadCounters()
: TBase("CSUpload")
Expand All @@ -51,8 +56,18 @@ class TCSUploadCounters: public NColumnShard::TCommonCountersOwner {
, RowsDistribution(TBase::GetHistogram("Requests/Rows", NMonitoring::ExponentialHistogram(15, 2, 16)))
, RowsCount(TBase::GetDeriviative("Rows"))
, BytesCount(TBase::GetDeriviative("Bytes"))
, FailsCount(TBase::GetDeriviative("Fails")) {
, FailsCount(TBase::GetDeriviative("Fails"))
, GlobalTimeoutCount(TBase::GetDeriviative("GlobalTimeouts"))
, RetryTimeoutCount(TBase::GetDeriviative("RetryTimeouts"))
{
}

void OnGlobalTimeout() const {
GlobalTimeoutCount->Inc();
}

void OnRetryTimeout() const {
RetryTimeoutCount->Inc();
}

void OnRequest(const ui64 rows, const ui64 bytes) const {
Expand Down Expand Up @@ -110,6 +125,7 @@ class TWritersController {
LongTxActorId.Send(NLongTxService::MakeLongTxServiceID(LongTxActorId.NodeId()), req.Release());
}
}

public:
using TPtr = std::shared_ptr<TWritersController>;

Expand All @@ -131,10 +147,10 @@ class TWritersController {
, Issues(issues) {
}
};

};

TWritersController(const ui32 writesCount, const NActors::TActorIdentity& longTxActorId, const NLongTxService::TLongTxId& longTxId, const bool immediateWrite);
TWritersController(const ui32 writesCount, const NActors::TActorIdentity& longTxActorId, const NLongTxService::TLongTxId& longTxId,
const bool immediateWrite);
void OnSuccess(const ui64 shardId, const ui64 writeId, const ui32 writePartId);
void OnFail(const Ydb::StatusIds::StatusCode code, const TString& message);
};
Expand All @@ -158,40 +174,43 @@ class TShardWriter: public NActors::TActorBootstrapped<TShardWriter> {
NWilson::TProfileSpan ActorSpan;
EModificationType ModificationType;
const bool ImmediateWrite = false;
const std::optional<TDuration> Timeout;

void SendWriteRequest();
static TDuration OverloadTimeout() {
return TDuration::MilliSeconds(OverloadedDelayMs);
}
void SendToTablet(THolder<IEventBase> event) {
Send(LeaderPipeCache, new TEvPipeCache::TEvForward(event.Release(), ShardId, true),
IEventHandle::FlagTrackDelivery, 0, ActorSpan.GetTraceId());
Send(LeaderPipeCache, new TEvPipeCache::TEvForward(event.Release(), ShardId, true), IEventHandle::FlagTrackDelivery, 0,
ActorSpan.GetTraceId());
}
virtual void PassAway() override {
Send(LeaderPipeCache, new TEvPipeCache::TEvUnlink(0));
TBase::PassAway();
}

public:
TShardWriter(const ui64 shardId, const ui64 tableId, const ui64 schemaVersion, const TString& dedupId, const IShardInfo::TPtr& data,
const NWilson::TProfileSpan& parentSpan, TWritersController::TPtr externalController, const ui32 writePartIdx,
const EModificationType mType, const bool immediateWrite);
const EModificationType mType, const bool immediateWrite, const std::optional<TDuration> timeout = std::nullopt);

STFUNC(StateMain) {
switch (ev->GetTypeRewrite()) {
hFunc(TEvColumnShard::TEvWriteResult, Handle);
hFunc(TEvPipeCache::TEvDeliveryProblem, Handle);
hFunc(NEvents::TDataEvents::TEvWriteResult, Handle);
CFunc(TEvents::TSystem::Wakeup, HandleTimeout);
hFunc(NActors::TEvents::TEvWakeup, Handle);
}
}

void Bootstrap();

void Handle(NActors::TEvents::TEvWakeup::TPtr& ev);
void Handle(TEvColumnShard::TEvWriteResult::TPtr& ev);
void Handle(TEvPipeCache::TEvDeliveryProblem::TPtr& ev);
void Handle(NEvents::TDataEvents::TEvWriteResult::TPtr& ev);
void HandleTimeout(const TActorContext& ctx);

private:
bool RetryWriteRequest(const bool delayed = true);
};
}
} // namespace NKikimr::NEvWrite
19 changes: 10 additions & 9 deletions ydb/core/tx/tx_proxy/rpc_long_tx.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

#include <ydb/core/formats/arrow/size_calcer.h>
#include <ydb/core/tx/columnshard/columnshard.h>
#include <ydb/core/tx/columnshard/counters/common/object_counter.h>
#include <ydb/core/tx/data_events/shard_writer.h>
#include <ydb/core/tx/long_tx_service/public/events.h>
#include <ydb/core/tx/schemeshard/schemeshard.h>
Expand All @@ -21,7 +22,8 @@ using namespace NLongTxService;
// Common logic of LongTx Write that takes care of splitting the data according to the sharding scheme,
// sending it to shards and collecting their responses
template <class TLongTxWriteImpl>
class TLongTxWriteBase: public TActorBootstrapped<TLongTxWriteImpl> {
class TLongTxWriteBase: public TActorBootstrapped<TLongTxWriteImpl>,
NColumnShard::TMonitoringObjectsCounter<TLongTxWriteBase<TLongTxWriteImpl>> {
using TBase = TActorBootstrapped<TLongTxWriteImpl>;
static inline TAtomicCounter MemoryInFlight = 0;

Expand All @@ -37,8 +39,7 @@ class TLongTxWriteBase: public TActorBootstrapped<TLongTxWriteImpl> {
, Path(path)
, DedupId(dedupId)
, LongTxId(longTxId)
, ActorSpan(0, NWilson::TTraceId::NewTraceId(0, Max<ui32>()), "TLongTxWriteBase")
{
, ActorSpan(0, NWilson::TTraceId::NewTraceId(0, Max<ui32>()), "TLongTxWriteBase") {
if (token) {
UserToken.emplace(token);
}
Expand Down Expand Up @@ -95,7 +96,8 @@ class TLongTxWriteBase: public TActorBootstrapped<TLongTxWriteImpl> {
accessor.reset();

const auto& splittedData = shardsSplitter->GetSplitData();
InternalController = std::make_shared<NEvWrite::TWritersController>(splittedData.GetShardRequestsCount(), this->SelfId(), LongTxId, NoTxWrite);
InternalController =
std::make_shared<NEvWrite::TWritersController>(splittedData.GetShardRequestsCount(), this->SelfId(), LongTxId, NoTxWrite);
ui32 sumBytes = 0;
ui32 rowsCount = 0;
ui32 writeIdx = 0;
Expand All @@ -104,9 +106,9 @@ class TLongTxWriteBase: public TActorBootstrapped<TLongTxWriteImpl> {
InternalController->GetCounters()->OnRequest(shardInfo->GetRowsCount(), shardInfo->GetBytes());
sumBytes += shardInfo->GetBytes();
rowsCount += shardInfo->GetRowsCount();
this->Register(new NEvWrite::TShardWriter(shard, shardsSplitter->GetTableId(), shardsSplitter->GetSchemaVersion(), DedupId, shardInfo,
ActorSpan, InternalController,
++writeIdx, NEvWrite::EModificationType::Replace, NoTxWrite));
this->Register(
new NEvWrite::TShardWriter(shard, shardsSplitter->GetTableId(), shardsSplitter->GetSchemaVersion(), DedupId, shardInfo,
ActorSpan, InternalController, ++writeIdx, NEvWrite::EModificationType::Replace, NoTxWrite, TDuration::Seconds(20)));
}
}
pSpan.Attribute("affected_shards_count", (long)splittedData.GetShardsInfo().size());
Expand Down Expand Up @@ -235,8 +237,7 @@ class TLongTxWriteInternal: public TLongTxWriteBase<TLongTxWriteInternal> {
, ReplyTo(replyTo)
, NavigateResult(navigateResult)
, Batch(batch)
, Issues(issues)
{
, Issues(issues) {
Y_ABORT_UNLESS(Issues);
DataAccessor = std::make_unique<TParsedBatchData>(Batch);
}
Expand Down
Loading