Skip to content

Commit

Permalink
YQ-3742 Shared reading: remove assert (ydb-platform#10322)
Browse files Browse the repository at this point in the history
  • Loading branch information
kardymonds committed Oct 16, 2024
1 parent e9d53b1 commit 202cb01
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 25 deletions.
76 changes: 52 additions & 24 deletions ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
#include <ydb/public/sdk/cpp/client/ydb_topic/topic.h>
#include <ydb/public/sdk/cpp/client/ydb_types/credentials/credentials.h>

#include <ydb/library/actors/core/actor_bootstrapped.h>
#include <ydb/library/actors/core/actor.h>
#include <ydb/library/actors/core/event_local.h>
#include <ydb/library/actors/core/events.h>
#include <ydb/library/actors/core/hfunc.h>
Expand Down Expand Up @@ -94,16 +94,19 @@ struct TEvPrivate {
// Event ids private to this actor, numbered from the start of the ES_PRIVATE event space.
enum EEv : ui32 {
EvBegin = EventSpaceBegin(NActors::TEvents::ES_PRIVATE),
EvPrintState = EvBegin + 20,
EvProcessState = EvBegin + 21,
EvEnd
};
// Compile-time guard: the last id must stay inside the ES_PRIVATE event space.
static_assert(EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE), "expect EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE)");
// Payload-free local event bound to EvPrintState.
struct TEvPrintState : public NActors::TEventLocal<TEvPrintState, EvPrintState> {};
// Payload-free local event bound to EvProcessState; the actor schedules it to itself
// every ProcessStatePeriodSec seconds and its handler calls ProcessState().
struct TEvProcessState : public NActors::TEventLocal<TEvProcessState, EvProcessState> {};
};

ui64 PrintStatePeriodSec = 60;

class TDqPqRdReadActor : public NActors::TActor<TDqPqRdReadActor>, public NYql::NDq::NInternal::TDqPqReadActorBase {
public:

const ui64 PrintStatePeriodSec = 60;
const ui64 ProcessStatePeriodSec = 2;

using TDebugOffsets = TMaybe<std::pair<ui64, ui64>>;

struct TReadyBatch {
Expand Down Expand Up @@ -136,6 +139,7 @@ class TDqPqRdReadActor : public NActors::TActor<TDqPqRdReadActor>, public NYql::
ui64 CoordinatorRequestCookie = 0;
TRowDispatcherReadActorMetrics Metrics;
bool SchedulePrintStatePeriod = false;
bool ProcessStateScheduled = false;

struct SessionInfo {
enum class ESessionStatus {
Expand Down Expand Up @@ -193,6 +197,7 @@ class TDqPqRdReadActor : public NActors::TActor<TDqPqRdReadActor>, public NYql::
void Handle(NActors::TEvents::TEvPong::TPtr& ev);
void Handle(const NActors::TEvents::TEvPing::TPtr&);
void Handle(TEvPrivate::TEvPrintState::TPtr&);
void Handle(TEvPrivate::TEvProcessState::TPtr&);

STRICT_STFUNC(StateFunc, {
hFunc(NFq::TEvRowDispatcher::TEvCoordinatorChanged, Handle);
Expand All @@ -212,6 +217,7 @@ class TDqPqRdReadActor : public NActors::TActor<TDqPqRdReadActor>, public NYql::
hFunc(NYql::NDq::TEvRetryQueuePrivate::TEvSessionClosed, Handle);
hFunc(NActors::TEvents::TEvPing, Handle);
hFunc(TEvPrivate::TEvPrintState, Handle);
hFunc(TEvPrivate::TEvProcessState, Handle);
})

static constexpr char ActorName[] = "DQ_PQ_READ_ACTOR";
Expand All @@ -224,7 +230,7 @@ class TDqPqRdReadActor : public NActors::TActor<TDqPqRdReadActor>, public NYql::
void ProcessState();
void Stop(const TString& message);
void StopSessions();
void ReInit();
void ReInit(const TString& reason);
void PrintInternalState();
};

Expand Down Expand Up @@ -262,6 +268,10 @@ void TDqPqRdReadActor::ProcessState() {
if (!ReadyBuffer.empty()) {
return;
}
if (!ProcessStateScheduled) {
ProcessStateScheduled = true;
Schedule(TDuration::Seconds(ProcessStatePeriodSec), new TEvPrivate::TEvProcessState());
}
if (!CoordinatorActorId) {
SRC_LOG_D("Send TEvCoordinatorChangesSubscribe to local row dispatcher, self id " << SelfId());
Send(LocalRowDispatcherActorId, new NFq::TEvRowDispatcher::TEvCoordinatorChangesSubscribe());
Expand Down Expand Up @@ -401,7 +411,12 @@ void TDqPqRdReadActor::Handle(NFq::TEvRowDispatcher::TEvStartSessionAck::TPtr& e

ui64 partitionId = ev->Get()->Record.GetConsumer().GetPartitionId();
auto sessionIt = Sessions.find(partitionId);
YQL_ENSURE(sessionIt != Sessions.end(), "Unknown partition id");
if (sessionIt == Sessions.end()) {
SRC_LOG_W("Ignore TEvStartSessionAck from " << ev->Sender << ", seqNo " << meta.GetSeqNo()
<< ", ConfirmedSeqNo " << meta.GetConfirmedSeqNo() << ", PartitionId " << partitionId);
YQL_ENSURE(State != EState::STARTED);
return;
}
auto& sessionInfo = sessionIt->second;
if (!sessionInfo.EventsQueue.OnEventReceived(ev)) {
SRC_LOG_W("Wrong seq num ignore message, seqNo " << meta.GetSeqNo());
Expand All @@ -415,7 +430,12 @@ void TDqPqRdReadActor::Handle(NFq::TEvRowDispatcher::TEvSessionError::TPtr& ev)

ui64 partitionId = ev->Get()->Record.GetPartitionId();
auto sessionIt = Sessions.find(partitionId);
YQL_ENSURE(sessionIt != Sessions.end(), "Unknown partition id");
if (sessionIt == Sessions.end()) {
SRC_LOG_W("Ignore TEvSessionError from " << ev->Sender << ", seqNo " << meta.GetSeqNo()
<< ", ConfirmedSeqNo " << meta.GetConfirmedSeqNo() << ", PartitionId " << partitionId);
YQL_ENSURE(State != EState::STARTED);
return;
}

auto& sessionInfo = sessionIt->second;
if (!sessionInfo.EventsQueue.OnEventReceived(ev)) {
Expand All @@ -431,7 +451,12 @@ void TDqPqRdReadActor::Handle(NFq::TEvRowDispatcher::TEvStatus::TPtr& ev) {

ui64 partitionId = ev->Get()->Record.GetPartitionId();
auto sessionIt = Sessions.find(partitionId);
YQL_ENSURE(sessionIt != Sessions.end(), "Unknown partition id");
if (sessionIt == Sessions.end()) {
SRC_LOG_W("Ignore TEvStatus from " << ev->Sender << ", seqNo " << meta.GetSeqNo()
<< ", ConfirmedSeqNo " << meta.GetConfirmedSeqNo() << ", PartitionId " << partitionId);
YQL_ENSURE(State != EState::STARTED);
return;
}
auto& sessionInfo = sessionIt->second;

if (!sessionInfo.EventsQueue.OnEventReceived(ev)) {
Expand All @@ -452,7 +477,9 @@ void TDqPqRdReadActor::Handle(NFq::TEvRowDispatcher::TEvNewDataArrived::TPtr& ev
ui64 partitionId = ev->Get()->Record.GetPartitionId();
auto sessionIt = Sessions.find(partitionId);
if (sessionIt == Sessions.end()) {
Stop("Internal error: unknown partition id " + ToString(partitionId));
SRC_LOG_W("Ignore TEvNewDataArrived from " << ev->Sender << ", seqNo " << meta.GetSeqNo()
<< ", ConfirmedSeqNo " << meta.GetConfirmedSeqNo() << ", PartitionId " << partitionId);
YQL_ENSURE(State != EState::STARTED);
return;
}

Expand Down Expand Up @@ -512,20 +539,18 @@ void TDqPqRdReadActor::Handle(NFq::TEvRowDispatcher::TEvCoordinatorChanged::TPtr
}

CoordinatorActorId = ev->Get()->CoordinatorActorId;
SRC_LOG_I("Coordinator is changed, reinit all sessions");
ReInit();
ReInit("Coordinator is changed");
ProcessState();
}

// Resets the reader: calls StopSessions(), clears Sessions and sets State back to EState::INIT.
// If ReadyBuffer is not empty, TEvNewAsyncInputDataArrived(InputIndex) is sent to ComputeActorId.
// NOTE(review): this span is a rendered diff — it appears to show both the pre-commit rows
// (`ReInit()`, the plain "ReInit state" log, the trailing ProcessState() call) and the
// post-commit rows (`ReInit(const TString& reason)`, the log that includes the reason);
// confirm against the real file before relying on either form.
void TDqPqRdReadActor::ReInit() {
SRC_LOG_I("ReInit state");
void TDqPqRdReadActor::ReInit(const TString& reason) {
SRC_LOG_I("ReInit state, reason " << reason);
StopSessions();
Sessions.clear();
State = EState::INIT;
if (!ReadyBuffer.empty()) {
Send(ComputeActorId, new TEvNewAsyncInputDataArrived(InputIndex));
}
ProcessState();
}

void TDqPqRdReadActor::Stop(const TString& message) {
Expand Down Expand Up @@ -582,24 +607,23 @@ void TDqPqRdReadActor::Handle(NActors::TEvents::TEvUndelivered::TPtr& ev) {
}

if (CoordinatorActorId && *CoordinatorActorId == ev->Sender) {
SRC_LOG_D("TEvUndelivered to coordinator, reinit");
ReInit();
ReInit("TEvUndelivered to coordinator");
}
}

void TDqPqRdReadActor::Handle(NFq::TEvRowDispatcher::TEvMessageBatch::TPtr& ev) {
const NYql::NDqProto::TMessageTransportMeta& meta = ev->Get()->Record.GetTransportMeta();
SRC_LOG_T("TEvMessageBatch from " << ev->Sender << ", seqNo " << meta.GetSeqNo() << ", ConfirmedSeqNo " << meta.GetConfirmedSeqNo());
ui64 partitionId = ev->Get()->Record.GetPartitionId();
YQL_ENSURE(Sessions.count(partitionId), "Unknown partition id " << partitionId);
auto it = Sessions.find(partitionId);
if (it == Sessions.end()) {
Stop("Wrong session data");
return;
auto sessionIt = Sessions.find(partitionId);
if (sessionIt == Sessions.end()) {
// A batch for a partition we no longer track (e.g. the sessions were dropped by ReInit):
// log it and ignore it, exactly like the other row-dispatcher event handlers do.
SRC_LOG_W("Ignore TEvMessageBatch from " << ev->Sender << ", seqNo " << meta.GetSeqNo()
<< ", ConfirmedSeqNo " << meta.GetConfirmedSeqNo() << ", PartitionId " << partitionId);
YQL_ENSURE(State != EState::STARTED);
// Must bail out here: sessionIt equals Sessions.end() and the code below dereferences it.
return;
}

Metrics.InFlyGetNextBatch->Set(0);
auto& sessionInfo = it->second;
auto& sessionInfo = sessionIt->second;
if (!sessionInfo.EventsQueue.OnEventReceived(ev)) {
SRC_LOG_W("Wrong seq num ignore message, seqNo " << meta.GetSeqNo());
return;
Expand Down Expand Up @@ -631,8 +655,7 @@ std::pair<NUdf::TUnboxedValuePod, i64> TDqPqRdReadActor::CreateItem(const TStrin
}

// Handles TEvRetryQueuePrivate::TEvSessionClosed by re-initialising the actor through ReInit,
// reporting the closed event queue id.
// NOTE(review): rendered diff — both the older "log, then ReInit()" rows and the newer
// single ReInit(<reason>) row appear to be shown here; confirm against the real file.
void TDqPqRdReadActor::Handle(const NYql::NDq::TEvRetryQueuePrivate::TEvSessionClosed::TPtr& ev) {
SRC_LOG_D("Session closed, event queue id " << ev->Get()->EventQueueId);
ReInit();
ReInit(TStringBuilder() << "Session closed, event queue id " << ev->Get()->EventQueueId);
}

void TDqPqRdReadActor::Handle(NActors::TEvents::TEvPong::TPtr& ev) {
Expand All @@ -654,6 +677,11 @@ void TDqPqRdReadActor::PrintInternalState() {
SRC_LOG_D(str.Str());
}

// Periodic tick: re-arms the TEvProcessState timer for another ProcessStatePeriodSec seconds,
// then runs ProcessState(). The timer is re-armed first so the order of effects matches the
// schedule-then-process sequence this handler has always used.
void TDqPqRdReadActor::Handle(TEvPrivate::TEvProcessState::TPtr&) {
    const auto nextTickDelay = TDuration::Seconds(ProcessStatePeriodSec);
    Schedule(nextTickDelay, new TEvPrivate::TEvProcessState());
    ProcessState();
}

std::pair<IDqComputeActorAsyncInput*, NActors::IActor*> CreateDqPqRdReadActor(
NPq::NProto::TDqPqTopicSource&& settings,
ui64 inputIndex,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ class TPqDqIntegration: public TDqIntegrationBase {
}
}

sharedReading = sharedReading && (format == "json_each_row" || (format == "raw"));
sharedReading = sharedReading && (format == "json_each_row" || format == "raw");
TString predicateSql = NYql::FormatWhere(predicateProto);
if (sharedReading) {
if (format == "json_each_row") {
Expand Down
6 changes: 6 additions & 0 deletions ydb/tests/fq/pq_async_io/ut/dq_pq_rd_read_actor_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -355,5 +355,11 @@ Y_UNIT_TEST_SUITE(TDqPqRdReadActorTests) {

ProcessSomeJsons(2, {Json3}, RowDispatcher1);
}

// Regression scenario: start a session, switch the coordinator, then deliver a session error.
// NOTE(review): presumably the coordinator change drops the sessions, so the following
// session error must be ignored instead of tripping an assertion — confirm against the
// fixture helpers, which are not visible here.
Y_UNIT_TEST_F(IgnoreMessageIfNoSessions, TFixture) {
StartSession();
MockCoordinatorChanged(Coordinator2Id);
MockSessionError();
}
}
} // NYql::NDq

0 comments on commit 202cb01

Please sign in to comment.