From 202cb01a6c38df8a4d362f5c6033ce8c27ab35d8 Mon Sep 17 00:00:00 2001 From: Dmitry Kardymon Date: Mon, 14 Oct 2024 08:40:13 +0300 Subject: [PATCH] YQ-3742 Shared reading: remove assert (#10322) --- .../pq/async_io/dq_pq_rd_read_actor.cpp | 76 +++++++++++++------ .../pq/provider/yql_pq_dq_integration.cpp | 2 +- .../pq_async_io/ut/dq_pq_rd_read_actor_ut.cpp | 6 ++ 3 files changed, 59 insertions(+), 25 deletions(-) diff --git a/ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp b/ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp index 37890476d5a6..18c3a48028a3 100644 --- a/ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp +++ b/ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp @@ -24,7 +24,7 @@ #include #include -#include +#include #include #include #include @@ -94,16 +94,19 @@ struct TEvPrivate { enum EEv : ui32 { EvBegin = EventSpaceBegin(NActors::TEvents::ES_PRIVATE), EvPrintState = EvBegin + 20, + EvProcessState = EvBegin + 21, EvEnd }; static_assert(EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE), "expect EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE)"); struct TEvPrintState : public NActors::TEventLocal {}; + struct TEvProcessState : public NActors::TEventLocal {}; }; -ui64 PrintStatePeriodSec = 60; - class TDqPqRdReadActor : public NActors::TActor, public NYql::NDq::NInternal::TDqPqReadActorBase { -public: + + const ui64 PrintStatePeriodSec = 60; + const ui64 ProcessStatePeriodSec = 2; + using TDebugOffsets = TMaybe>; struct TReadyBatch { @@ -136,6 +139,7 @@ class TDqPqRdReadActor : public NActors::TActor, public NYql:: ui64 CoordinatorRequestCookie = 0; TRowDispatcherReadActorMetrics Metrics; bool SchedulePrintStatePeriod = false; + bool ProcessStateScheduled = false; struct SessionInfo { enum class ESessionStatus { @@ -193,6 +197,7 @@ class TDqPqRdReadActor : public NActors::TActor, public NYql:: void Handle(NActors::TEvents::TEvPong::TPtr& ev); void Handle(const NActors::TEvents::TEvPing::TPtr&); void 
Handle(TEvPrivate::TEvPrintState::TPtr&); + void Handle(TEvPrivate::TEvProcessState::TPtr&); STRICT_STFUNC(StateFunc, { hFunc(NFq::TEvRowDispatcher::TEvCoordinatorChanged, Handle); @@ -212,6 +217,7 @@ class TDqPqRdReadActor : public NActors::TActor, public NYql:: hFunc(NYql::NDq::TEvRetryQueuePrivate::TEvSessionClosed, Handle); hFunc(NActors::TEvents::TEvPing, Handle); hFunc(TEvPrivate::TEvPrintState, Handle); + hFunc(TEvPrivate::TEvProcessState, Handle); }) static constexpr char ActorName[] = "DQ_PQ_READ_ACTOR"; @@ -224,7 +230,7 @@ class TDqPqRdReadActor : public NActors::TActor, public NYql:: void ProcessState(); void Stop(const TString& message); void StopSessions(); - void ReInit(); + void ReInit(const TString& reason); void PrintInternalState(); }; @@ -262,6 +268,10 @@ void TDqPqRdReadActor::ProcessState() { if (!ReadyBuffer.empty()) { return; } + if (!ProcessStateScheduled) { + ProcessStateScheduled = true; + Schedule(TDuration::Seconds(ProcessStatePeriodSec), new TEvPrivate::TEvProcessState()); + } if (!CoordinatorActorId) { SRC_LOG_D("Send TEvCoordinatorChangesSubscribe to local row dispatcher, self id " << SelfId()); Send(LocalRowDispatcherActorId, new NFq::TEvRowDispatcher::TEvCoordinatorChangesSubscribe()); @@ -401,7 +411,12 @@ void TDqPqRdReadActor::Handle(NFq::TEvRowDispatcher::TEvStartSessionAck::TPtr& e ui64 partitionId = ev->Get()->Record.GetConsumer().GetPartitionId(); auto sessionIt = Sessions.find(partitionId); - YQL_ENSURE(sessionIt != Sessions.end(), "Unknown partition id"); + if (sessionIt == Sessions.end()) { + SRC_LOG_W("Ignore TEvStartSessionAck from " << ev->Sender << ", seqNo " << meta.GetSeqNo() + << ", ConfirmedSeqNo " << meta.GetConfirmedSeqNo() << ", PartitionId " << partitionId); + YQL_ENSURE(State != EState::STARTED); + return; + } auto& sessionInfo = sessionIt->second; if (!sessionInfo.EventsQueue.OnEventReceived(ev)) { SRC_LOG_W("Wrong seq num ignore message, seqNo " << meta.GetSeqNo()); @@ -415,7 +430,12 @@ void 
TDqPqRdReadActor::Handle(NFq::TEvRowDispatcher::TEvSessionError::TPtr& ev) ui64 partitionId = ev->Get()->Record.GetPartitionId(); auto sessionIt = Sessions.find(partitionId); - YQL_ENSURE(sessionIt != Sessions.end(), "Unknown partition id"); + if (sessionIt == Sessions.end()) { + SRC_LOG_W("Ignore TEvSessionError from " << ev->Sender << ", seqNo " << meta.GetSeqNo() + << ", ConfirmedSeqNo " << meta.GetConfirmedSeqNo() << ", PartitionId " << partitionId); + YQL_ENSURE(State != EState::STARTED); + return; + } auto& sessionInfo = sessionIt->second; if (!sessionInfo.EventsQueue.OnEventReceived(ev)) { @@ -431,7 +451,12 @@ void TDqPqRdReadActor::Handle(NFq::TEvRowDispatcher::TEvStatus::TPtr& ev) { ui64 partitionId = ev->Get()->Record.GetPartitionId(); auto sessionIt = Sessions.find(partitionId); - YQL_ENSURE(sessionIt != Sessions.end(), "Unknown partition id"); + if (sessionIt == Sessions.end()) { + SRC_LOG_W("Ignore TEvStatus from " << ev->Sender << ", seqNo " << meta.GetSeqNo() + << ", ConfirmedSeqNo " << meta.GetConfirmedSeqNo() << ", PartitionId " << partitionId); + YQL_ENSURE(State != EState::STARTED); + return; + } auto& sessionInfo = sessionIt->second; if (!sessionInfo.EventsQueue.OnEventReceived(ev)) { @@ -452,7 +477,9 @@ void TDqPqRdReadActor::Handle(NFq::TEvRowDispatcher::TEvNewDataArrived::TPtr& ev ui64 partitionId = ev->Get()->Record.GetPartitionId(); auto sessionIt = Sessions.find(partitionId); if (sessionIt == Sessions.end()) { - Stop("Internal error: unknown partition id " + ToString(partitionId)); + SRC_LOG_W("Ignore TEvNewDataArrived from " << ev->Sender << ", seqNo " << meta.GetSeqNo() + << ", ConfirmedSeqNo " << meta.GetConfirmedSeqNo() << ", PartitionId " << partitionId); + YQL_ENSURE(State != EState::STARTED); return; } @@ -512,20 +539,18 @@ void TDqPqRdReadActor::Handle(NFq::TEvRowDispatcher::TEvCoordinatorChanged::TPtr } CoordinatorActorId = ev->Get()->CoordinatorActorId; - SRC_LOG_I("Coordinator is changed, reinit all sessions"); - ReInit(); + 
ReInit("Coordinator is changed"); ProcessState(); } -void TDqPqRdReadActor::ReInit() { - SRC_LOG_I("ReInit state"); +void TDqPqRdReadActor::ReInit(const TString& reason) { + SRC_LOG_I("ReInit state, reason " << reason); StopSessions(); Sessions.clear(); State = EState::INIT; if (!ReadyBuffer.empty()) { Send(ComputeActorId, new TEvNewAsyncInputDataArrived(InputIndex)); } - ProcessState(); } void TDqPqRdReadActor::Stop(const TString& message) { @@ -582,8 +607,7 @@ void TDqPqRdReadActor::Handle(NActors::TEvents::TEvUndelivered::TPtr& ev) { } if (CoordinatorActorId && *CoordinatorActorId == ev->Sender) { - SRC_LOG_D("TEvUndelivered to coordinator, reinit"); - ReInit(); + ReInit("TEvUndelivered to coordinator"); } } @@ -591,15 +615,16 @@ void TDqPqRdReadActor::Handle(NFq::TEvRowDispatcher::TEvMessageBatch::TPtr& ev) const NYql::NDqProto::TMessageTransportMeta& meta = ev->Get()->Record.GetTransportMeta(); SRC_LOG_T("TEvMessageBatch from " << ev->Sender << ", seqNo " << meta.GetSeqNo() << ", ConfirmedSeqNo " << meta.GetConfirmedSeqNo()); ui64 partitionId = ev->Get()->Record.GetPartitionId(); - YQL_ENSURE(Sessions.count(partitionId), "Unknown partition id " << partitionId); - auto it = Sessions.find(partitionId); - if (it == Sessions.end()) { - Stop("Wrong session data"); - return; + auto sessionIt = Sessions.find(partitionId); + if (sessionIt == Sessions.end()) { + SRC_LOG_W("Ignore TEvMessageBatch from " << ev->Sender << ", seqNo " << meta.GetSeqNo() + << ", ConfirmedSeqNo " << meta.GetConfirmedSeqNo() << ", PartitionId " << partitionId); + YQL_ENSURE(State != EState::STARTED); + return; /* sessionIt is end() here and must not be dereferenced below */ } Metrics.InFlyGetNextBatch->Set(0); - auto& sessionInfo = it->second; + auto& sessionInfo = sessionIt->second; if (!sessionInfo.EventsQueue.OnEventReceived(ev)) { SRC_LOG_W("Wrong seq num ignore message, seqNo " << meta.GetSeqNo()); return; @@ -631,8 +655,7 @@ std::pair TDqPqRdReadActor::CreateItem(const TStrin } void TDqPqRdReadActor::Handle(const 
NYql::NDq::TEvRetryQueuePrivate::TEvSessionClosed::TPtr& ev) { - SRC_LOG_D("Session closed, event queue id " << ev->Get()->EventQueueId); - ReInit(); + ReInit(TStringBuilder() << "Session closed, event queue id " << ev->Get()->EventQueueId); } void TDqPqRdReadActor::Handle(NActors::TEvents::TEvPong::TPtr& ev) { @@ -654,6 +677,11 @@ void TDqPqRdReadActor::PrintInternalState() { SRC_LOG_D(str.Str()); } +void TDqPqRdReadActor::Handle(TEvPrivate::TEvProcessState::TPtr&) { + Schedule(TDuration::Seconds(ProcessStatePeriodSec), new TEvPrivate::TEvProcessState()); + ProcessState(); +} + std::pair CreateDqPqRdReadActor( NPq::NProto::TDqPqTopicSource&& settings, ui64 inputIndex, diff --git a/ydb/library/yql/providers/pq/provider/yql_pq_dq_integration.cpp b/ydb/library/yql/providers/pq/provider/yql_pq_dq_integration.cpp index b6720a9f6b23..3a305edaf59d 100644 --- a/ydb/library/yql/providers/pq/provider/yql_pq_dq_integration.cpp +++ b/ydb/library/yql/providers/pq/provider/yql_pq_dq_integration.cpp @@ -268,7 +268,7 @@ class TPqDqIntegration: public TDqIntegrationBase { } } - sharedReading = sharedReading && (format == "json_each_row" || (format == "raw")); + sharedReading = sharedReading && (format == "json_each_row" || format == "raw"); TString predicateSql = NYql::FormatWhere(predicateProto); if (sharedReading) { if (format == "json_each_row") { diff --git a/ydb/tests/fq/pq_async_io/ut/dq_pq_rd_read_actor_ut.cpp b/ydb/tests/fq/pq_async_io/ut/dq_pq_rd_read_actor_ut.cpp index 350c0bd5b40c..230aa2c8d20c 100644 --- a/ydb/tests/fq/pq_async_io/ut/dq_pq_rd_read_actor_ut.cpp +++ b/ydb/tests/fq/pq_async_io/ut/dq_pq_rd_read_actor_ut.cpp @@ -355,5 +355,11 @@ Y_UNIT_TEST_SUITE(TDqPqRdReadActorTests) { ProcessSomeJsons(2, {Json3}, RowDispatcher1); } + + Y_UNIT_TEST_F(IgnoreMessageIfNoSessions, TFixture) { + StartSession(); + MockCoordinatorChanged(Coordinator2Id); + MockSessionError(); + } } } // NYql::NDq