Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BUG] Fixed desynch between deleted socket and unsubscribing from epoll (misleading log flood) #3090

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion srtcore/api.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -833,6 +833,9 @@ int srt::CUDTUnited::newConnection(const SRTSOCKET listen,
ns->removeFromGroup(true);
}
#endif
// You won't be updating any EIDs anymore.
m_EPoll.wipe_usock(id, ns->core().m_sPollID);

m_Sockets.erase(id);
m_ClosedSockets[id] = ns;
}
Expand Down Expand Up @@ -1557,6 +1560,9 @@ int srt::CUDTUnited::groupConnect(CUDTGroup* pg, SRT_SOCKGROUPCONFIG* targets, i
targets[tii].errorcode = e.getErrorCode();
targets[tii].id = CUDT::INVALID_SOCK;

// You won't be updating any EIDs anymore.
m_EPoll.wipe_usock(ns->m_SocketID, ns->core().m_sPollID);

ScopedLock cl(m_GlobControlLock);
ns->removeFromGroup(false);
m_Sockets.erase(ns->m_SocketID);
Expand All @@ -1571,6 +1577,8 @@ int srt::CUDTUnited::groupConnect(CUDTGroup* pg, SRT_SOCKGROUPCONFIG* targets, i
targets[tii].id = CUDT::INVALID_SOCK;
ScopedLock cl(m_GlobControlLock);
ns->removeFromGroup(false);
// You won't be updating any EIDs anymore.
m_EPoll.wipe_usock(ns->m_SocketID, ns->core().m_sPollID);
m_Sockets.erase(ns->m_SocketID);
// Intercept to delete the socket on failure.
delete ns;
Expand Down Expand Up @@ -2063,6 +2071,9 @@ int srt::CUDTUnited::close(CUDTSocket* s)
}
#endif

// You won't be updating any EIDs anymore.
m_EPoll.wipe_usock(s->m_SocketID, s->core().m_sPollID);

m_Sockets.erase(s->m_SocketID);
m_ClosedSockets[s->m_SocketID] = s;
HLOGC(smlog.Debug, log << "@" << u << "U::close: Socket MOVED TO CLOSED for collecting later.");
Expand Down Expand Up @@ -2851,6 +2862,10 @@ void srt::CUDTUnited::removeSocket(const SRTSOCKET u)
CUDTSocket* as = si->second;

as->breakSocket_LOCKED();

// You won't be updating any EIDs anymore.
m_EPoll.wipe_usock(as->m_SocketID, as->core().m_sPollID);

m_ClosedSockets[q->first] = as;
m_Sockets.erase(q->first);
}
Expand All @@ -2870,7 +2885,8 @@ void srt::CUDTUnited::removeSocket(const SRTSOCKET u)
* remains forever causing epoll_wait to unblock continuously for inexistent
* sockets. Get rid of all events for this socket.
*/
m_EPoll.update_events(u, s->core().m_sPollID, SRT_EPOLL_IN | SRT_EPOLL_OUT | SRT_EPOLL_ERR, false);
// (just in case, this should be wiped out already)
m_EPoll.wipe_usock(u, s->core().m_sPollID);

// delete this one
m_ClosedSockets.erase(i);
Expand Down
37 changes: 1 addition & 36 deletions srtcore/core.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6282,42 +6282,7 @@ bool srt::CUDT::closeInternal() ATR_NOEXCEPT
* it would remove the socket from the EPoll after close.
*/

// Make a copy under a lock because other thread might access it
// at the same time.
enterCS(uglobal().m_EPoll.m_EPollLock);
set<int> epollid = m_sPollID;
leaveCS(uglobal().m_EPoll.m_EPollLock);

// trigger any pending IO events.
HLOGC(smlog.Debug, log << CONID() << "close: SETTING ERR readiness on E" << Printable(epollid));
uglobal().m_EPoll.update_events(m_SocketID, m_sPollID, SRT_EPOLL_ERR, true);
// then remove itself from all epoll monitoring
int no_events = 0;
for (set<int>::iterator i = epollid.begin(); i != epollid.end(); ++i)
{
HLOGC(smlog.Debug, log << CONID() << "close: CLEARING subscription on E" << (*i));
try
{
uglobal().m_EPoll.update_usock(*i, m_SocketID, &no_events);
}
catch (...)
{
// The goal of this loop is to remove all subscriptions in
// the epoll system to this socket. If it's unsubscribed already,
// that's even better.
}
HLOGC(smlog.Debug, log << CONID() << "close: removing E" << (*i) << " from back-subscribers");
}

// Not deleting elements from m_sPollID inside the loop because it invalidates
// the control iterator of the loop. Instead, all will be removed at once.

// IMPORTANT: there's theoretically little time between setting ERR readiness
// and unsubscribing, however if there's an application waiting on this event,
// it should be informed before this below instruction locks the epoll mutex.
enterCS(uglobal().m_EPoll.m_EPollLock);
m_sPollID.clear();
leaveCS(uglobal().m_EPoll.m_EPollLock);
uglobal().m_EPoll.wipe_usock(m_SocketID, m_sPollID);

// XXX What's this, could any of the above actions make it !m_bOpened?
if (!m_bOpened)
Expand Down
27 changes: 27 additions & 0 deletions srtcore/epoll.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -945,6 +945,33 @@ int srt::CEPoll::update_events(const SRTSOCKET& uid, std::set<int>& eids, const
return nupdated;
}

/// This is a simple function which removes the socket from epoll system.
/// The subscription list should be provided in the @a eids container and
/// the socket is removed from each of them, then this is cleared. This
/// should be the socket's private EID container that keeps EIDs that it
/// should update when an appropriate event comes.
///
/// @param uid Socket ID that has to be removed from the epoll system
/// @param eids EIDs that the given socket believes being subscribed in
void srt::CEPoll::wipe_usock(const SRTSOCKET uid, std::set<int>& eids)
{
ScopedLock pg (m_EPollLock);
for (set<int>::iterator i = eids.begin(); i != eids.end(); ++ i)
{
map<int, CEPollDesc>::iterator p = m_mPolls.find(*i);
if (p == m_mPolls.end())
{
HLOGC(eilog.Note, log << "epoll/wipe: E" << *i << " was deleted in the meantime");
continue;
}

CEPollDesc& ed = p->second;
ed.removeSubscription(uid);
}

eids.clear();
}

// Debug use only.
#if ENABLE_HEAVY_LOGGING
namespace srt
Expand Down
2 changes: 2 additions & 0 deletions srtcore/epoll.h
Original file line number Diff line number Diff line change
Expand Up @@ -489,6 +489,8 @@ friend class srt::CRendezvousQueue;

int update_events(const SRTSOCKET& uid, std::set<int>& eids, int events, bool enable);

void wipe_usock(const SRTSOCKET uid, std::set<int>& eids);

int setflags(const int eid, int32_t flags);

private:
Expand Down
22 changes: 18 additions & 4 deletions test/test_bonding.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include <array>
#include <future>
#include <thread>
#include <mutex>
#include <chrono>
#include <vector>
#include "gtest/gtest.h"
Expand Down Expand Up @@ -62,6 +63,8 @@ TEST(Bonding, SRTConnectGroup)

#define EXPECT_SRT_SUCCESS(callform) EXPECT_NE(callform, -1) << "SRT ERROR: " << srt_getlasterror_str()

static std::mutex g_listening_stopped;

void listening_thread(bool should_read)
{
const SRTSOCKET server_sock = srt_create_socket();
Expand Down Expand Up @@ -118,6 +121,9 @@ void listening_thread(bool should_read)
}
}

std::cout << "Listen: wait for green light from the caller...\n";
std::unique_lock<std::mutex> listen_lock (g_listening_stopped);

srt_close(acp);
srt_close(server_sock);

Expand All @@ -135,8 +141,8 @@ int g_nfailed = 0;
void ConnectCallback(void* , SRTSOCKET sock, int error, const sockaddr* /*peer*/, int token)
{
std::cout << "Connect callback. Socket: " << sock
<< ", error: " << error
<< ", token: " << token << '\n';
<< ", error: " << error << " (" << srt_strerror(error, 0)
<< "), token: " << token << '\n';

if (error == SRT_SUCCESS)
++g_nconnected;
Expand Down Expand Up @@ -171,6 +177,10 @@ TEST(Bonding, NonBlockingGroupConnect)
sockaddr_in safail = sa;
safail.sin_port = htons(4201); // port where we have no listener

// We need to keep the listener with the socket without closing it
// until we are done.
std::unique_lock<std::mutex> listen_lock (g_listening_stopped);

std::future<void> listen_promise = std::async(std::launch::async, std::bind(&listening_thread, false));

std::cout << "Connecting two sockets " << std::endl;
Expand Down Expand Up @@ -203,7 +213,7 @@ TEST(Bonding, NonBlockingGroupConnect)
write, &wlen,
5000, /* timeout */
0, 0, 0, 0);

std::cout << "Epoll result: " << epoll_res << '\n';
std::cout << "Epoll rlen: " << rlen << ", wlen: " << wlen << '\n';
for (int i = 0; i < rlen; ++i)
Expand All @@ -212,10 +222,14 @@ TEST(Bonding, NonBlockingGroupConnect)
}
for (int i = 0; i < wlen; ++i)
{
std::cout << "Epoll write[" << i << "]: " << write[i] << " (removed from epoll)\n";
SRT_SOCKSTATUS st = srt_getsockstate(write[i]);
std::cout << "Epoll write[" << i << "]: " << write[i]
<< " ST:" << srt_logging::SockStatusStr(st)
<< " (removing from epoll)\n";
EXPECT_EQ(srt_epoll_remove_usock(poll_id, write[i]), 0);
}
}
listen_lock.unlock(); // give green light to the listener so that it closes sockets.

listen_promise.wait();

Expand Down
Loading