Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

YQ-3988 Add stop session reason #12839

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions ydb/core/fq/libs/row_dispatcher/protos/events.proto
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,15 @@ message TEvNewDataArrived {
}

// Request to the row dispatcher to stop the session identified by (Source, PartitionId).
message TEvStopSession {
// Why the sender asks to stop the session.
enum EStopReason {
// Reason was not filled in by the sender.
STOP_REASON_UNSPECIFIED = 0;
// Ordinary stop of a session, set by the read actor when it stops its own sessions.
COMMON = 1;
// Set by the read actor when it answers an event it cannot match to a live session
// (unknown partition or mismatched generation); the row dispatcher deletes the
// consumer on this reason without running the sequence-number check.
WRONG_SESSION = 2;
}

// Topic source settings of the session being stopped.
NYql.NPq.NProto.TDqPqTopicSource Source = 1;
// Partition of the topic the session reads.
uint32 PartitionId = 2;
// Stop reason, see EStopReason.
EStopReason Reason = 3;
// Transport metadata (carries the message SeqNo used by the receiver's events queue).
optional NYql.NDqProto.TMessageTransportMeta TransportMeta = 100;
}

Expand Down
13 changes: 12 additions & 1 deletion ydb/core/fq/libs/row_dispatcher/row_dispatcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -869,7 +869,7 @@ bool TRowDispatcher::CheckSession(TAtomicSharedPtr<ConsumerInfo>& consumer, cons
}
if (!consumer->EventsQueue.OnEventReceived(ev)) {
const NYql::NDqProto::TMessageTransportMeta& meta = ev->Get()->Record.GetTransportMeta();
LOG_ROW_DISPATCHER_WARN("Wrong seq num ignore message (" << typeid(TEventPtr).name() << ") seqNo " << meta.GetSeqNo() << " from " << ev->Sender.ToString() << ", query id " << consumer->QueryId);
LOG_ROW_DISPATCHER_WARN("Wrong seq num, ignore message (" << typeid(TEventPtr).name() << ") seqNo " << meta.GetSeqNo() << " from " << ev->Sender.ToString() << ", query id " << consumer->QueryId);
return false;
}
return true;
Expand All @@ -885,6 +885,17 @@ void TRowDispatcher::Handle(NFq::TEvRowDispatcher::TEvStopSession::TPtr& ev) {
LWPROBE(StopSession, ev->Sender.ToString(), ev->Get()->Record.GetPartitionId(), it->second->QueryId, ev->Get()->Record.ByteSizeLong());
LOG_ROW_DISPATCHER_DEBUG("Received TEvStopSession, topicPath " << ev->Get()->Record.GetSource().GetTopicPath() <<
" partitionId " << ev->Get()->Record.GetPartitionId() << " query id " << it->second->QueryId);
const auto& consumer = it->second;
if (ev->Cookie != consumer->Generation) {
LOG_ROW_DISPATCHER_WARN("Wrong message generation, ignore TEvStopSession, sender " << ev->Sender << " cookie " << ev->Cookie << ", session generation " << consumer->Generation << ", query id " << consumer->QueryId);
return;
}
if (ev->Get()->Record.GetReason() == NFq::NRowDispatcherProto::TEvStopSession::WRONG_SESSION) {
LOG_ROW_DISPATCHER_WARN("Received TEvStopSession with WRONG_SESSION (sender " << ev->Sender << " cookie " << ev->Cookie << ", session generation " << consumer->Generation << ", query id " << consumer->QueryId << "), delete consumer");
DeleteConsumer(key);
return;
}

if (!CheckSession(it->second, ev)) {
return;
}
Expand Down
6 changes: 3 additions & 3 deletions ydb/core/fq/libs/row_dispatcher/topic_session.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,9 +106,9 @@ class TTopicSession : public TActorBootstrapped<TTopicSession> {
}
Y_UNUSED(TDuration::TryParse(Settings.GetSource().GetReconnectPeriod(), ReconnectPeriod));
auto queryGroup = Counters->GetSubgroup("query_id", ev->Get()->Record.GetQueryId());
auto topicGroup = queryGroup->GetSubgroup("read_group", SanitizeLabel(readGroup));
FilteredDataRate = topicGroup->GetCounter("FilteredDataRate", true);
RestartSessionByOffsetsByQuery = counters->GetCounter("RestartSessionByOffsetsByQuery", true);
auto readSubGroup = queryGroup->GetSubgroup("read_group", SanitizeLabel(readGroup));
FilteredDataRate = readSubGroup->GetCounter("FilteredDataRate", true);
RestartSessionByOffsetsByQuery = readSubGroup->GetCounter("RestartSessionByOffsetsByQuery", true);
}

~TClientsInfo() {
Expand Down
23 changes: 21 additions & 2 deletions ydb/core/fq/libs/row_dispatcher/ut/row_dispatcher_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ class TFixture : public NUnitTest::TBaseFixture {

public:
TFixture()
: Runtime(1) {}
: Runtime(2) {}

void SetUp(NUnitTest::TTestContext&) override {
TAutoPtr<TAppPrepare> app = new TAppPrepare();
Expand All @@ -78,6 +78,7 @@ class TFixture : public NUnitTest::TBaseFixture {
EdgeActor = Runtime.AllocateEdgeActor();
ReadActorId1 = Runtime.AllocateEdgeActor();
ReadActorId2 = Runtime.AllocateEdgeActor();
ReadActorId3 = Runtime.AllocateEdgeActor(1);
TestActorFactory = MakeIntrusive<TTestActorFactory>(Runtime);

NYql::TPqGatewayServices pqServices(
Expand Down Expand Up @@ -133,13 +134,17 @@ class TFixture : public NUnitTest::TBaseFixture {
Nothing(), // readOffset,
0, // StartingMessageTimestamp;
"QueryId");
event->Record.MutableTransportMeta()->SetSeqNo(1);
Runtime.Send(new IEventHandle(RowDispatcher, readActorId, event, 0, generation));
}

void MockStopSession(const NYql::NPq::NProto::TDqPqTopicSource& source, ui64 partitionId, TActorId readActorId) {
void MockStopSession(const NYql::NPq::NProto::TDqPqTopicSource& source, ui64 partitionId, TActorId readActorId,
NFq::NRowDispatcherProto::TEvStopSession::EStopReason reason = NFq::NRowDispatcherProto::TEvStopSession::COMMON) {
auto event = std::make_unique<NFq::TEvRowDispatcher::TEvStopSession>();
event->Record.MutableSource()->CopyFrom(source);
event->Record.SetPartitionId(partitionId);
event->Record.SetReason(reason);
event->Record.MutableTransportMeta()->SetSeqNo(1);
Runtime.Send(new IEventHandle(RowDispatcher, readActorId, event.release(), 0, 1));
}

Expand Down Expand Up @@ -239,6 +244,7 @@ class TFixture : public NUnitTest::TBaseFixture {
NActors::TActorId EdgeActor;
NActors::TActorId ReadActorId1;
NActors::TActorId ReadActorId2;
NActors::TActorId ReadActorId3;
TIntrusivePtr<TTestActorFactory> TestActorFactory;

NYql::NPq::NProto::TDqPqTopicSource Source1 = BuildPqTopicSourceSettings("Endpoint1", "Database1", "topic", "connection_id1");
Expand Down Expand Up @@ -421,6 +427,19 @@ Y_UNIT_TEST_SUITE(RowDispatcherTests) {
MockStopSession(Source1Connection2, PartitionId0, ReadActorId2);
ExpectStopSession(session2, PartitionId0);
}

// TODO: the test below is disabled because GrabEdgeEvent() does not work here.
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

А точно этот комментарий нужен?

// Y_UNIT_TEST_F(StopSessionWithWrongSessionFlag, TFixture) {
// MockAddSession(Source1, PartitionId0, ReadActorId3);
// auto topicSessionId = ExpectRegisterTopicSession();
// ExpectStartSessionAck(ReadActorId3);
// ExpectStartSession(topicSessionId);

// ProcessData(ReadActorId3, PartitionId0, topicSessionId);

// MockStopSession(Source1, PartitionId0, ReadActorId3, NFq::NRowDispatcherProto::TEvStopSession::WRONG_SESSION);
// ExpectStopSession(topicSessionId, PartitionId0);
// }
}

}
Expand Down
27 changes: 16 additions & 11 deletions ydb/library/yql/providers/pq/async_io/dq_pq_rd_read_actor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -284,7 +284,7 @@ class TDqPqRdReadActor : public NActors::TActor<TDqPqRdReadActor>, public NYql::
void TrySendGetNextBatch(SessionInfo& sessionInfo);
template <class TEventPtr>
bool CheckSession(SessionInfo& session, const TEventPtr& ev, ui64 partitionId);
void SendStopSession(const NActors::TActorId& recipient, ui64 partitionId, ui64 cookie);
void SendWrongSession(const NActors::TActorId& recipient, ui64 partitionId, ui64 cookie);
void NotifyCA();
};

Expand Down Expand Up @@ -425,6 +425,7 @@ void TDqPqRdReadActor::StopSessions() {
auto event = std::make_unique<NFq::TEvRowDispatcher::TEvStopSession>();
*event->Record.MutableSource() = SourceParams;
event->Record.SetPartitionId(partitionId);
event->Record.SetReason(NFq::NRowDispatcherProto::TEvStopSession::COMMON);
SRC_LOG_I("Send StopSession to " << sessionInfo.RowDispatcherActorId);
sessionInfo.EventsQueue.Send(event.release(), sessionInfo.Generation);
}
Expand Down Expand Up @@ -501,7 +502,7 @@ void TDqPqRdReadActor::Handle(NFq::TEvRowDispatcher::TEvStartSessionAck::TPtr& e
SRC_LOG_W("Ignore TEvStartSessionAck from " << ev->Sender << ", seqNo " << meta.GetSeqNo()
<< ", ConfirmedSeqNo " << meta.GetConfirmedSeqNo() << ", PartitionId " << partitionId << ", cookie " << ev->Cookie);
YQL_ENSURE(State != EState::STARTED);
SendStopSession(ev->Sender, partitionId, ev->Cookie);
SendWrongSession(ev->Sender, partitionId, ev->Cookie);
return;
}
auto& sessionInfo = sessionIt->second;
Expand All @@ -522,7 +523,7 @@ void TDqPqRdReadActor::Handle(NFq::TEvRowDispatcher::TEvSessionError::TPtr& ev)
SRC_LOG_W("Ignore TEvSessionError from " << ev->Sender << ", seqNo " << meta.GetSeqNo()
<< ", ConfirmedSeqNo " << meta.GetConfirmedSeqNo() << ", PartitionId " << partitionId << ", cookie " << ev->Cookie);
YQL_ENSURE(State != EState::STARTED);
SendStopSession(ev->Sender, partitionId, ev->Cookie);
SendWrongSession(ev->Sender, partitionId, ev->Cookie);
return;
}

Expand All @@ -547,7 +548,7 @@ void TDqPqRdReadActor::Handle(NFq::TEvRowDispatcher::TEvStatistics::TPtr& ev) {
SRC_LOG_W("Ignore TEvStatistics from " << ev->Sender << ", seqNo " << meta.GetSeqNo()
<< ", ConfirmedSeqNo " << meta.GetConfirmedSeqNo() << ", PartitionId " << partitionId);
YQL_ENSURE(State != EState::STARTED);
SendStopSession(ev->Sender, partitionId, ev->Cookie);
SendWrongSession(ev->Sender, partitionId, ev->Cookie);
return;
}
auto& sessionInfo = sessionIt->second;
Expand Down Expand Up @@ -580,7 +581,7 @@ void TDqPqRdReadActor::Handle(NFq::TEvRowDispatcher::TEvNewDataArrived::TPtr& ev
SRC_LOG_W("Ignore TEvNewDataArrived from " << ev->Sender << ", seqNo " << meta.GetSeqNo()
<< ", ConfirmedSeqNo " << meta.GetConfirmedSeqNo() << ", PartitionId " << partitionId);
YQL_ENSURE(State != EState::STARTED);
SendStopSession(ev->Sender, partitionId, ev->Cookie);
SendWrongSession(ev->Sender, partitionId, ev->Cookie);
return;
}

Expand Down Expand Up @@ -633,7 +634,7 @@ void TDqPqRdReadActor::Handle(const NFq::TEvRowDispatcher::TEvHeartbeat::TPtr& e
if (sessionIt == Sessions.end()) {
SRC_LOG_W("Ignore TEvHeartbeat from " << ev->Sender << ", seqNo " << meta.GetSeqNo()
<< ", ConfirmedSeqNo " << meta.GetConfirmedSeqNo() << ", PartitionId " << partitionId << ", cookie " << ev->Cookie);
SendStopSession(ev->Sender, partitionId, ev->Cookie);
SendWrongSession(ev->Sender, partitionId, ev->Cookie);
return;
}
CheckSession(sessionIt->second, ev, partitionId);
Expand Down Expand Up @@ -723,9 +724,12 @@ void TDqPqRdReadActor::HandleDisconnected(TEvInterconnect::TEvNodeDisconnected::
}

void TDqPqRdReadActor::Handle(NActors::TEvents::TEvUndelivered::TPtr& ev) {
SRC_LOG_D("TEvUndelivered, " << ev->Get()->ToString() << " from " << ev->Sender.ToString());
SRC_LOG_D("TEvUndelivered, " << ev->Get()->ToString() << " from " << ev->Sender.ToString() << " generation " << ev->Cookie);
Counters.Undelivered++;
for (auto& [partitionId, sessionInfo] : Sessions) {
if (ev->Cookie != sessionInfo.Generation) {
continue;
}
if (sessionInfo.EventsQueue.HandleUndelivered(ev) == NYql::NDq::TRetryEventsQueue::ESessionState::SessionClosed) {
ReInit(TStringBuilder() << "Session closed, partition id " << sessionInfo.PartitionId);
break;
Expand All @@ -747,7 +751,7 @@ void TDqPqRdReadActor::Handle(NFq::TEvRowDispatcher::TEvMessageBatch::TPtr& ev)
SRC_LOG_W("Ignore TEvMessageBatch from " << ev->Sender << ", seqNo " << meta.GetSeqNo()
<< ", ConfirmedSeqNo " << meta.GetConfirmedSeqNo() << ", PartitionId " << partitionId << ", cookie " << ev->Cookie);
YQL_ENSURE(State != EState::STARTED);
SendStopSession(ev->Sender, partitionId, ev->Cookie);
SendWrongSession(ev->Sender, partitionId, ev->Cookie);
return;
}

Expand Down Expand Up @@ -823,7 +827,7 @@ void TDqPqRdReadActor::PrintInternalState() {

TString TDqPqRdReadActor::GetInternalState() {
TStringStream str;
str << "State: used buffer size " << ReadyBufferSizeBytes << " ready buffer event size " << ReadyBuffer.size() << " state " << static_cast<ui64>(State) << " InFlyAsyncInputData " << InFlyAsyncInputData << "\n";
str << LogPrefix << "State: used buffer size " << ReadyBufferSizeBytes << " ready buffer event size " << ReadyBuffer.size() << " state " << static_cast<ui64>(State) << " InFlyAsyncInputData " << InFlyAsyncInputData << "\n";
str << "Counters: GetAsyncInputData " << Counters.GetAsyncInputData << " CoordinatorChanged " << Counters.CoordinatorChanged << " CoordinatorResult " << Counters.CoordinatorResult
<< " MessageBatch " << Counters.MessageBatch << " StartSessionAck " << Counters.StartSessionAck << " NewDataArrived " << Counters.NewDataArrived
<< " SessionError " << Counters.SessionError << " Statistics " << Counters.Statistics << " NodeDisconnected " << Counters.NodeDisconnected
Expand Down Expand Up @@ -867,7 +871,7 @@ template <class TEventPtr>
bool TDqPqRdReadActor::CheckSession(SessionInfo& session, const TEventPtr& ev, ui64 partitionId) {
if (ev->Cookie != session.Generation) {
SRC_LOG_W("Wrong message generation (" << typeid(TEventPtr).name() << "), sender " << ev->Sender << " cookie " << ev->Cookie << ", session generation " << session.Generation << ", send TEvStopSession");
SendStopSession(ev->Sender, partitionId, ev->Cookie);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Не до конца понимаю, почему нельзя было послать новый generation (тот, который сейчас в dq_pq_rd_read_actor) и по нему в row_dispatcher все старые поудалять?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Предположим, что клиент в RD не удалился (сообщение не дошло и т.п.).
Далее возможны 2 случая, когда read_actor переподключается к RD (предположим, одна партиция):

  1. read_actor идет к тому же RD. RD на StartSession видит бОльший generation и пересоздает клиента,
  2. read_actor идет к другому RD (координатор сменился и раздал другое распределение). Теперь read_actor о старом RD ничего не знает. В старом RD сессия жива, и RD периодически посылает heartbeat. read_actor (на heartbeat) отвечает StopSession, при этом проставляя невалидный SeqNo (он и не знает, какой нужен, т.к. сессия удалена).

Эту проблему можно решить по-другому, например, на StopSession не проверять SeqNo.

SendWrongSession(ev->Sender, partitionId, ev->Cookie);
return false;
}
if (!session.EventsQueue.OnEventReceived(ev)) {
Expand All @@ -878,10 +882,11 @@ bool TDqPqRdReadActor::CheckSession(SessionInfo& session, const TEventPtr& ev, u
return true;
}

void TDqPqRdReadActor::SendStopSession(const NActors::TActorId& recipient, ui64 partitionId, ui64 cookie) {
void TDqPqRdReadActor::SendWrongSession(const NActors::TActorId& recipient, ui64 partitionId, ui64 cookie) {
auto event = std::make_unique<NFq::TEvRowDispatcher::TEvStopSession>();
*event->Record.MutableSource() = SourceParams;
event->Record.SetPartitionId(partitionId);
event->Record.SetReason(NFq::NRowDispatcherProto::TEvStopSession::WRONG_SESSION);
Send(recipient, event.release(), 0, cookie);
}

Expand Down
17 changes: 12 additions & 5 deletions ydb/tests/fq/pq_async_io/ut/dq_pq_rd_read_actor_ut.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -221,10 +221,10 @@ struct TFixture : public TPqIoTestFixture {
});
}

void MockUndelivered() {
void MockUndelivered(ui64 generation = 0, NActors::TEvents::TEvUndelivered::EReason reason = NActors::TEvents::TEvUndelivered::ReasonActorUnknown) {
CaSetup->Execute([&](TFakeActor& actor) {
auto event = new NActors::TEvents::TEvUndelivered(0, NActors::TEvents::TEvUndelivered::ReasonActorUnknown);
CaSetup->Runtime->Send(new NActors::IEventHandle(*actor.DqAsyncInputActorId, RowDispatcher1, event));
auto event = new NActors::TEvents::TEvUndelivered(0, reason);
CaSetup->Runtime->Send(new NActors::IEventHandle(*actor.DqAsyncInputActorId, RowDispatcher1, event, 0, generation));
});
}

Expand Down Expand Up @@ -272,6 +272,13 @@ Y_UNIT_TEST_SUITE(TDqPqRdReadActorTests) {
ProcessSomeMessages(0, {Message1, Message2}, RowDispatcher1);
}

// Verifies that a TEvUndelivered whose cookie (generation) does not match the active
// session is ignored: the read actor keeps receiving data from the same row dispatcher.
Y_UNIT_TEST_F(IgnoreUndeliveredWithWrongGeneration, TFixture) {
    StartSession(Source1);
    ProcessSomeMessages(0, {Message1, Message2}, RowDispatcher1);

    // MockUndelivered takes (generation, reason). Pass both explicitly: handing it only
    // the EReason enumerator would bind that value to `generation` via implicit
    // enum-to-integer conversion and leave the reason at its default.
    // NOTE(review): assumes the live session generation is not 999 (tests that expect a
    // reconnect pass generation 1) — confirm against StartSession.
    constexpr ui64 staleGeneration = 999;
    MockUndelivered(staleGeneration, NActors::TEvents::TEvUndelivered::Disconnected);

    // The session must still be alive and continue from the next offset.
    ProcessSomeMessages(2, {Message3}, RowDispatcher1);
}

Y_UNIT_TEST_F(SessionError, TFixture) {
StartSession(Source1);

Expand Down Expand Up @@ -414,7 +421,7 @@ Y_UNIT_TEST_SUITE(TDqPqRdReadActorTests) {
ProcessSomeMessages(0, {Message1, Message2}, RowDispatcher1);
MockDisconnected();
MockConnected();
MockUndelivered();
MockUndelivered(1);

auto req = ExpectCoordinatorRequest(Coordinator1Id);
MockCoordinatorResult(RowDispatcher1, req->Cookie);
Expand Down Expand Up @@ -451,7 +458,7 @@ Y_UNIT_TEST_SUITE(TDqPqRdReadActorTests) {
MockCoordinatorChanged(Coordinator2Id);
auto req = ExpectCoordinatorRequest(Coordinator2Id);

MockUndelivered();
MockUndelivered(1);

MockCoordinatorResult(RowDispatcher1, req->Cookie);
MockCoordinatorResult(RowDispatcher1, req->Cookie);
Expand Down
Loading