-
Notifications
You must be signed in to change notification settings - Fork 600
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
YQ-3988 Add stop session reason #12839
base: main
Are you sure you want to change the base?
Changes from 3 commits
dc3dddc
d6c79fb
10030e3
59e88fc
94730fb
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -284,7 +284,7 @@ class TDqPqRdReadActor : public NActors::TActor<TDqPqRdReadActor>, public NYql:: | |
void TrySendGetNextBatch(SessionInfo& sessionInfo); | ||
template <class TEventPtr> | ||
bool CheckSession(SessionInfo& session, const TEventPtr& ev, ui64 partitionId); | ||
void SendStopSession(const NActors::TActorId& recipient, ui64 partitionId, ui64 cookie); | ||
void SendWrongSession(const NActors::TActorId& recipient, ui64 partitionId, ui64 cookie); | ||
void NotifyCA(); | ||
}; | ||
|
||
|
@@ -425,6 +425,7 @@ void TDqPqRdReadActor::StopSessions() { | |
auto event = std::make_unique<NFq::TEvRowDispatcher::TEvStopSession>(); | ||
*event->Record.MutableSource() = SourceParams; | ||
event->Record.SetPartitionId(partitionId); | ||
event->Record.SetReason(NFq::NRowDispatcherProto::TEvStopSession::COMMON); | ||
SRC_LOG_I("Send StopSession to " << sessionInfo.RowDispatcherActorId); | ||
sessionInfo.EventsQueue.Send(event.release(), sessionInfo.Generation); | ||
} | ||
|
@@ -501,7 +502,7 @@ void TDqPqRdReadActor::Handle(NFq::TEvRowDispatcher::TEvStartSessionAck::TPtr& e | |
SRC_LOG_W("Ignore TEvStartSessionAck from " << ev->Sender << ", seqNo " << meta.GetSeqNo() | ||
<< ", ConfirmedSeqNo " << meta.GetConfirmedSeqNo() << ", PartitionId " << partitionId << ", cookie " << ev->Cookie); | ||
YQL_ENSURE(State != EState::STARTED); | ||
SendStopSession(ev->Sender, partitionId, ev->Cookie); | ||
SendWrongSession(ev->Sender, partitionId, ev->Cookie); | ||
return; | ||
} | ||
auto& sessionInfo = sessionIt->second; | ||
|
@@ -522,7 +523,7 @@ void TDqPqRdReadActor::Handle(NFq::TEvRowDispatcher::TEvSessionError::TPtr& ev) | |
SRC_LOG_W("Ignore TEvSessionError from " << ev->Sender << ", seqNo " << meta.GetSeqNo() | ||
<< ", ConfirmedSeqNo " << meta.GetConfirmedSeqNo() << ", PartitionId " << partitionId << ", cookie " << ev->Cookie); | ||
YQL_ENSURE(State != EState::STARTED); | ||
SendStopSession(ev->Sender, partitionId, ev->Cookie); | ||
SendWrongSession(ev->Sender, partitionId, ev->Cookie); | ||
return; | ||
} | ||
|
||
|
@@ -547,7 +548,7 @@ void TDqPqRdReadActor::Handle(NFq::TEvRowDispatcher::TEvStatistics::TPtr& ev) { | |
SRC_LOG_W("Ignore TEvStatistics from " << ev->Sender << ", seqNo " << meta.GetSeqNo() | ||
<< ", ConfirmedSeqNo " << meta.GetConfirmedSeqNo() << ", PartitionId " << partitionId); | ||
YQL_ENSURE(State != EState::STARTED); | ||
SendStopSession(ev->Sender, partitionId, ev->Cookie); | ||
SendWrongSession(ev->Sender, partitionId, ev->Cookie); | ||
return; | ||
} | ||
auto& sessionInfo = sessionIt->second; | ||
|
@@ -580,7 +581,7 @@ void TDqPqRdReadActor::Handle(NFq::TEvRowDispatcher::TEvNewDataArrived::TPtr& ev | |
SRC_LOG_W("Ignore TEvNewDataArrived from " << ev->Sender << ", seqNo " << meta.GetSeqNo() | ||
<< ", ConfirmedSeqNo " << meta.GetConfirmedSeqNo() << ", PartitionId " << partitionId); | ||
YQL_ENSURE(State != EState::STARTED); | ||
SendStopSession(ev->Sender, partitionId, ev->Cookie); | ||
SendWrongSession(ev->Sender, partitionId, ev->Cookie); | ||
return; | ||
} | ||
|
||
|
@@ -633,7 +634,7 @@ void TDqPqRdReadActor::Handle(const NFq::TEvRowDispatcher::TEvHeartbeat::TPtr& e | |
if (sessionIt == Sessions.end()) { | ||
SRC_LOG_W("Ignore TEvHeartbeat from " << ev->Sender << ", seqNo " << meta.GetSeqNo() | ||
<< ", ConfirmedSeqNo " << meta.GetConfirmedSeqNo() << ", PartitionId " << partitionId << ", cookie " << ev->Cookie); | ||
SendStopSession(ev->Sender, partitionId, ev->Cookie); | ||
SendWrongSession(ev->Sender, partitionId, ev->Cookie); | ||
return; | ||
} | ||
CheckSession(sessionIt->second, ev, partitionId); | ||
|
@@ -723,9 +724,12 @@ void TDqPqRdReadActor::HandleDisconnected(TEvInterconnect::TEvNodeDisconnected:: | |
} | ||
|
||
void TDqPqRdReadActor::Handle(NActors::TEvents::TEvUndelivered::TPtr& ev) { | ||
SRC_LOG_D("TEvUndelivered, " << ev->Get()->ToString() << " from " << ev->Sender.ToString()); | ||
SRC_LOG_D("TEvUndelivered, " << ev->Get()->ToString() << " from " << ev->Sender.ToString() << " generation " << ev->Cookie); | ||
Counters.Undelivered++; | ||
for (auto& [partitionId, sessionInfo] : Sessions) { | ||
if (ev->Cookie != sessionInfo.Generation) { | ||
continue; | ||
} | ||
if (sessionInfo.EventsQueue.HandleUndelivered(ev) == NYql::NDq::TRetryEventsQueue::ESessionState::SessionClosed) { | ||
ReInit(TStringBuilder() << "Session closed, partition id " << sessionInfo.PartitionId); | ||
break; | ||
|
@@ -747,7 +751,7 @@ void TDqPqRdReadActor::Handle(NFq::TEvRowDispatcher::TEvMessageBatch::TPtr& ev) | |
SRC_LOG_W("Ignore TEvMessageBatch from " << ev->Sender << ", seqNo " << meta.GetSeqNo() | ||
<< ", ConfirmedSeqNo " << meta.GetConfirmedSeqNo() << ", PartitionId " << partitionId << ", cookie " << ev->Cookie); | ||
YQL_ENSURE(State != EState::STARTED); | ||
SendStopSession(ev->Sender, partitionId, ev->Cookie); | ||
SendWrongSession(ev->Sender, partitionId, ev->Cookie); | ||
return; | ||
} | ||
|
||
|
@@ -823,7 +827,7 @@ void TDqPqRdReadActor::PrintInternalState() { | |
|
||
TString TDqPqRdReadActor::GetInternalState() { | ||
TStringStream str; | ||
str << "State: used buffer size " << ReadyBufferSizeBytes << " ready buffer event size " << ReadyBuffer.size() << " state " << static_cast<ui64>(State) << " InFlyAsyncInputData " << InFlyAsyncInputData << "\n"; | ||
str << LogPrefix << "State: used buffer size " << ReadyBufferSizeBytes << " ready buffer event size " << ReadyBuffer.size() << " state " << static_cast<ui64>(State) << " InFlyAsyncInputData " << InFlyAsyncInputData << "\n"; | ||
str << "Counters: GetAsyncInputData " << Counters.GetAsyncInputData << " CoordinatorChanged " << Counters.CoordinatorChanged << " CoordinatorResult " << Counters.CoordinatorResult | ||
<< " MessageBatch " << Counters.MessageBatch << " StartSessionAck " << Counters.StartSessionAck << " NewDataArrived " << Counters.NewDataArrived | ||
<< " SessionError " << Counters.SessionError << " Statistics " << Counters.Statistics << " NodeDisconnected " << Counters.NodeDisconnected | ||
|
@@ -867,7 +871,7 @@ template <class TEventPtr> | |
bool TDqPqRdReadActor::CheckSession(SessionInfo& session, const TEventPtr& ev, ui64 partitionId) { | ||
if (ev->Cookie != session.Generation) { | ||
SRC_LOG_W("Wrong message generation (" << typeid(TEventPtr).name() << "), sender " << ev->Sender << " cookie " << ev->Cookie << ", session generation " << session.Generation << ", send TEvStopSession"); | ||
SendStopSession(ev->Sender, partitionId, ev->Cookie); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Не до конца понимаю почему нельзя было послать новый generation. Который сейчас в dq_pq_rd_read_actor и по нему в row_dispatcher все старые поудалять? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Предположим что клиент в RD не удалился (сообщение не дошло и т.п.).
Эту проблему можно решить по другому, например на StopSession не проверять SeqNo. |
||
SendWrongSession(ev->Sender, partitionId, ev->Cookie); | ||
return false; | ||
} | ||
if (!session.EventsQueue.OnEventReceived(ev)) { | ||
|
@@ -878,10 +882,11 @@ bool TDqPqRdReadActor::CheckSession(SessionInfo& session, const TEventPtr& ev, u | |
return true; | ||
} | ||
|
||
void TDqPqRdReadActor::SendStopSession(const NActors::TActorId& recipient, ui64 partitionId, ui64 cookie) { | ||
void TDqPqRdReadActor::SendWrongSession(const NActors::TActorId& recipient, ui64 partitionId, ui64 cookie) { | ||
auto event = std::make_unique<NFq::TEvRowDispatcher::TEvStopSession>(); | ||
*event->Record.MutableSource() = SourceParams; | ||
event->Record.SetPartitionId(partitionId); | ||
event->Record.SetReason(NFq::NRowDispatcherProto::TEvStopSession::WRONG_SESSION); | ||
Send(recipient, event.release(), 0, cookie); | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
А точно этот комментарий нужен?