[GAIAPLAT-2163] Ensure that DDL sessions cannot happen concurrently with other DDL or regular sessions #1524
Conversation
I'm possibly being dense but I don't understand the concurrency approach here. See comments for details (tl;dr why isn't a reader/writer lock all you need?)...
production/db/core/src/db_client.cpp
Outdated
if (event == session_event_t::SESSION_ERROR)
{
    gaia::db::end_session();
I don't think it's a good pattern to invoke a public API that was not designed to be called from internal code. You're assuming that end_session() just cleans up some internal state and nothing more, which is not a robust assumption. The right approach would be to just throw the exception with no explicit local cleanup, and have a scope_guard automatically invoke an internal cleanup method on unsuccessful exit (which could also be invoked by end_session()).
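To illustrate the suggested pattern, here is a minimal, self-contained sketch (using a hand-rolled guard and a hypothetical cleanup_session_state(), not the project's actual scope_guard API or client code): the cleanup runs automatically when the scope is exited via an exception, and is dismissed on the success path.

```cpp
#include <stdexcept>
#include <utility>

// Minimal RAII guard: runs the given cleanup unless dismissed.
template <typename T_fn>
class cleanup_guard_t
{
public:
    explicit cleanup_guard_t(T_fn fn)
        : m_fn(std::move(fn)), m_active(true)
    {
    }

    ~cleanup_guard_t()
    {
        if (m_active)
        {
            m_fn();
        }
    }

    void dismiss()
    {
        m_active = false;
    }

private:
    T_fn m_fn;
    bool m_active;
};

// Hypothetical internal cleanup; in the real code this would be the internal
// routine that the public end_session() also calls.
void cleanup_session_state()
{
    // Release session resources, reset thread-local state, etc.
}

void begin_session_sketch(bool server_reported_error)
{
    // Cleanup is armed for the whole function body.
    cleanup_guard_t cleanup([] { cleanup_session_state(); });

    if (server_reported_error)
    {
        // Just throw; the guard's destructor performs the cleanup,
        // so there is no need to call the public end_session() here.
        throw std::runtime_error("session error reported by server");
    }

    // Success path: the session stays open, so skip the cleanup.
    cleanup.dismiss();
}
```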
Or we should have a proper internal end_session that gets called by the public API as well.
Or we should have a proper internal end_session that gets called by the public API as well.
That's what I was getting at.
Or we should have a proper internal end_session that gets called by the public API as well.
We already have that: client_t::end_session().
But yeah, using RAII to clean up resources is probably the way to go. It seems that most resources are already inside a scope_guard; should I just throw the exception then?
production/db/core/src/db_server.cpp
Outdated
// }
s_open_sessions_count++;

auto clean_open_session = scope_guard::make_scope_guard([] {
Not a good name IMO. How about just decrement_open_session_count?
Done.
production/db/inc/core/db_server.hpp
Outdated
// This counter gives a more accurate view of the open sessions than
// s_session_threads.size(). See reap_exited_threads().
static inline std::atomic<uint32_t> s_open_sessions_count{0};
static inline std::shared_mutex m_start_session_mutex;
Why do you need both a shared mutex and a refcount? Normally a shared mutex is implemented using a refcount! Why can't you just have a DDL session acquire the mutex in exclusive mode and a data session acquire the mutex in shared mode? Isn't the usual reader-writer semantics (readers block writers, writers block both readers and writers) exactly what you want here? (If you don't want to block on acquiring the mutex, then just use try_lock().)
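For reference, a minimal sketch of what that would look like (the names below are illustrative, not the actual server_t members): a DDL session takes the mutex exclusively, a regular session takes it in shared mode, and the try_lock variants give fail-fast behavior instead of blocking.

```cpp
#include <shared_mutex>

// Illustrative only; not the actual server declaration.
class session_gate_t
{
public:
    // DDL sessions must exclude all other sessions (DDL or regular).
    bool try_begin_ddl_session()
    {
        // Fails immediately if any session currently holds the lock.
        return m_lock.try_lock();
    }

    void end_ddl_session()
    {
        m_lock.unlock();
    }

    // Regular sessions only need to exclude DDL sessions, so shared
    // (reader) access is enough; they don't block each other.
    bool try_begin_regular_session()
    {
        return m_lock.try_lock_shared();
    }

    void end_regular_session()
    {
        m_lock.unlock_shared();
    }

private:
    std::shared_mutex m_lock;
};
```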
I asked, at a certain point, whether instead of erroring out we could wait for the DDL sessions to complete before starting other sessions, and vice versa. I didn't get an answer. This is a beginning in that direction.
The reason I used a lock was that I didn't want to use double-check logic (we need to check both the number of open sessions and whether a DDL session is active); it is just simpler.
production/db/core/src/db_server.cpp
Outdated
bool has_succeeded = s_is_ddl_session_active.compare_exchange_strong(expected_value, true);
if (!has_succeeded || s_open_sessions_count > 1)
{
    std::cerr << "Impossible to start a DDL session while other sessions are active." << std::endl;
"Cannot start..."
Done.
production/db/core/src/db_server.cpp
Outdated
if (s_is_ddl_session_active)
{
    std::cerr << "Impossible to start a session while a DDL session is active." << std::endl;
"Cannot start..."
Done.
production/db/core/src/db_server.cpp
Outdated
if (s_session_type == session_type_t::ddl)
{
    std::unique_lock lock(m_start_session_mutex);
What datum is this lock protecting? Why do you want to block instead of immediately returning an error?
@simone-gaia is trying to protect the reading of his counters and flags from concurrent updates. So he's basically starting a critical section here. But I think this can be avoided, as I did in my previous logic, with a careful ordering of the checks and double-checking.
@LaurentiuCristofor exactly. Though, I wanted to avoid the double-check logic since it's a little trickier. But I will go ahead and do it.
production/db/core/src/db_server.cpp
Outdated
@@ -416,6 +422,8 @@ void server_t::handle_client_shutdown(
    bool client_disconnected = true;
    txn_rollback(client_disconnected);
}

s_open_sessions_count--;
For exception safety, this decrement should be in a scope_guard (even though this is mostly a theoretical concern on the server).
@simone-gaia : Make sure to run debug tests as well, to see if you're encountering the timing failures that I was also hitting when a session was opened by the client before the server could handle the previous session closure.
@LaurentiuCristofor I did encounter the timing issue; that is why I added the counter. I remembered you mentioning this problem (I was using s_session_threads.size() before this). What tests exactly are you referring to?
@senderista I did that in a first attempt, but then I noticed that any exception thrown here will crash the server, so I thought it doesn't really matter... Maybe we can protect ourselves against future non-server-crashing txns though...
@LaurentiuCristofor I think I found the tests you are referring to... should we just send a message as we do for begin_session(), and wait for its completion?
production/db/inc/core/db_server.hpp
Outdated
static inline std::atomic<bool> s_can_ddl_sessions_still_be_started{true};

// This counter gives a more accurate view of the open sessions than
// s_session_threads.size(). See reap_exited_threads().
I think you meant to reference can_start_session(), not reap_exited_threads()?
I meant reap_exited_threads(), which explains why s_session_threads.size() is not a reliable count.
production/db/core/src/db_server.cpp
Outdated
@@ -1069,6 +1077,25 @@ bool server_t::authenticate_client_socket(int socket)
    return true;
}

bool server_t::can_start_session(int socket)
Let's call the arg socket_fd just to clarify its type (since fds don't have an explicit type).
Done.
@@ -1229,12 +1256,8 @@ void server_t::client_dispatch_handler(const std::string& socket_name)
    throw_system_error("accept() failed!");
}

if (s_session_threads.size() >= c_session_limit
    || !authenticate_client_socket(session_socket))
if (!can_start_session(session_socket))
This still doesn't distinguish auth failures and "session limit exceeded", of course. Unfortunately it's not easy to solve because propagating an explicit error back to the client requires establishing a session, which is what we can't do when resource limits have been exceeded!
Yes, but at least you can read the server log and make sense of what happened. The reason for this refactoring is that the DDL logic was partially implemented here, and I didn't want to spread the DDL logic across multiple places, so I moved it into the session thread but left this partial refactoring in place.
Actually, it might be better to move the comment from the can_start_session() definition to here.
I also think that the synchronization logic can be simplified; right now it uses more elements than are needed.
I'm also not clear about the logging change - if it's not related to DDL sessions, perhaps it should be pushed separately.
I have addressed the most basic comments. Will need more time to think about the more complex ones.
Could you explain in detail why these counters are needed at all if you have a read-write lock ("regular" sessions are "readers", DDL sessions are "writers")? Why can't you do everything you need to do here with a single...
production/db/core/src/db_client.cpp
Outdated
if (event == session_event_t::SESSION_ERROR)
{
    throw session_failure_internal();
}
This is no longer necessary, but we could keep it. Happy to remove it though, or move it to a new PR. @senderista @LaurentiuCristofor LMK what you think.
If there's no code on the server sending this, then we should just remove it.
OK
Maybe this will help clarify what I'm getting at: a reader-writer lock is isomorphic to (and is typically implemented as) a boolean flag to track exclusive access, plus an integer counter to track shared access. (See e.g. our...)
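To make that isomorphism concrete, here is a minimal (non-fair) reader-writer lock built from exactly those two pieces of state: a boolean "writer active" flag plus a reader count, both protected by a mutex and condition variable. This is only an illustration, not the lock referenced above.

```cpp
#include <condition_variable>
#include <mutex>

class simple_rw_lock_t
{
public:
    void lock_shared()
    {
        std::unique_lock lock(m_mutex);
        // Readers are admitted as long as no writer holds the lock.
        m_cv.wait(lock, [this] { return !m_writer_active; });
        ++m_reader_count;
    }

    void unlock_shared()
    {
        std::unique_lock lock(m_mutex);
        if (--m_reader_count == 0)
        {
            m_cv.notify_all();
        }
    }

    void lock()
    {
        std::unique_lock lock(m_mutex);
        // A writer needs both no active writer and no active readers.
        m_cv.wait(lock, [this] { return !m_writer_active && m_reader_count == 0; });
        m_writer_active = true;
    }

    void unlock()
    {
        std::unique_lock lock(m_mutex);
        m_writer_active = false;
        m_cv.notify_all();
    }

private:
    std::mutex m_mutex;
    std::condition_variable m_cv;
    bool m_writer_active{false};
    int m_reader_count{0};
};
```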
@@ -24,7 +24,7 @@ enum session_event_t: uint8 {
    REQUEST_STREAM,
    DECIDE_TXN_ROLLBACK_FOR_ERROR,
    CONNECT_DDL,
    CONNECT_PING,
    CONNECT_PING
Why this change? Usually the comma is kept after the last value, to avoid having to edit that line when adding one more.
There used to be another value here (SESSION_ERROR), which has been removed. Also, I don't really like the trailing comma, but if it is a convention I'll follow it.
Logic looks good now. Just a small note about an edit that seems unnecessary - the removal of the comma after the last enum value.
production/db/core/src/db_server.cpp
Outdated
{
    if (s_session_threads.size() >= c_session_limit)
    {
        // The connecting client will get ECONNRESET on their first
Both branches have the same effect, so maybe move this comment above the function: "If this function returns false, then the connecting client will get ECONNRESET on their first read from socket_fd."
Following the next comment (leave this comment here and remove it from can_start_session())
{
    // The connecting client will get ECONNRESET on their first
I would leave this comment here and remove it from can_start_session() (because what the client observes depends on how the function's return value is used, not on the function itself, which has no side effects).
Done.
@@ -24,7 +24,7 @@ enum session_event_t: uint8 {
    REQUEST_STREAM,
    DECIDE_TXN_ROLLBACK_FOR_ERROR,
    CONNECT_DDL,
    CONNECT_PING,
    CONNECT_PING
I would leave the comma in place; this is a common convention for enums that I think we use elsewhere.
Done
std::thread regular_session([&num_active_sessions] {
    gaia::db::begin_session();
    ASSERT_EQ(num_active_sessions, 0);
This might be OK for a test if we don't observe flakiness, but it's technically racy: the ddl_session thread could just be nondeterministically delayed at this point rather than blocked, so you could possibly observe num_active_sessions == 0 even if begin_ddl_session() didn't properly block. But this test may not be worth proper synchronization.
But this test may not be worth proper synchronization.
That's what I thought. In any case, flakiness here should not lead to failure. In the following tests, where flakiness could indeed lead to failure, I used a proper blocking mechanism.
gaia::db::begin_ddl_session();
num_active_sessions++;
std::this_thread::sleep_for(std::chrono::milliseconds(c_session_sleep_millis));
ASSERT_EQ(num_active_sessions, 1);
What is the purpose of this assert? It's only testing that the previous increment succeeded, but that can't fail.
Ah right, a remnant of a copy-paste from the previous test.
// NOLINTNEXTLINE(performance-inefficient-vector-operation)
threads.emplace_back([&] {
    gaia::db::begin_ping_session();
    num_ping_sessions--;
You can't modify num_ping_sessions without acquiring the mutex that protects it. (You can signal the CV after releasing the mutex, but I think it's clearer to signal it before.)
num_ping_sessions is atomic; isn't that sufficient?
No, because it could lead to a lost wakeup. If you're not waiting when the CV is signaled, then you won't get the signal. If another thread can modify the protected data without acquiring the mutex, then it could modify the data and signal the CV while you're checking the predicate but before you've gone back into a wait. In that case (if that's the last signal sent) you might never wake up. Forcing any mutating thread to acquire the mutex ensures this can't happen.
I think a lost wake-up is almost impossible here. We spin up 5 threads, each of which signals the CV, but actually only the last signal is going to be relevant, because it is the one for which num_ping_sessions == 0.
What the lock is protecting is not the data; it just ensures that the DDL session is running for all the time that the PING sessions are running too (using timeouts here would indeed be too flaky).
Am I missing something?
Yes, I think so. Imagine that the next-to-last thread decrements the counter and signals the CV. The blocking thread is waiting on the CV when it's signaled, so it wakes up, atomically acquiring the mutex that it atomically released when it went into the wait. Just after the blocking thread checks the predicate and reads 1, but before it goes back into a wait, the last thread decrements the counter to 0 and signals the CV. But the blocking thread isn't in a wait, so it misses this signal (signals aren't queued). Then the blocking thread goes back into its wait, but there are no threads left to signal, so it waits forever.

But if the signaling thread must acquire the mutex before modifying the predicate, then this can't happen, because the blocking thread is guaranteed to be waiting when the predicate is modified. The only thing that can wake up the blocking thread at this point is the signal, and it acquires the mutex atomically on wakeup, so we know that when the blocking thread wakes up, it will see all modifications to the predicate that happened before it acquired the mutex (other threads might have acquired the mutex, modified the predicate, and released the mutex before the blocking thread acquires the mutex, so the predicate observed by the blocking thread on wakeup isn't necessarily the same as the predicate observed by the signaling thread).
(Note that everything still works out if you signal the CV after releasing the mutex, but I at least find this to be much harder to reason about than the convention where you only signal the CV while holding the mutex.)
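A minimal, self-contained sketch of the pattern being recommended (the names mirror the test only for illustration): the predicate is modified only while holding the mutex, and the waiter re-checks it under that same mutex, so the final signal can never be lost.

```cpp
#include <condition_variable>
#include <mutex>
#include <thread>
#include <vector>

int main()
{
    std::mutex mutex;
    std::condition_variable cv;
    int num_ping_sessions = 5;

    std::vector<std::thread> threads;
    for (int i = 0; i < 5; ++i)
    {
        threads.emplace_back([&] {
            // ... per-session work would happen here ...
            {
                // Modify the predicate only while holding the mutex.
                std::lock_guard lock(mutex);
                --num_ping_sessions;
                // Signal while holding the mutex (signaling after release
                // also works, but is harder to reason about).
                cv.notify_one();
            }
        });
    }

    {
        // The waiter re-checks the predicate atomically with the wait,
        // so a decrement-plus-signal cannot slip in between the check
        // and going back to sleep.
        std::unique_lock lock(mutex);
        cv.wait(lock, [&] { return num_ping_sessions == 0; });
    }

    for (auto& thread : threads)
    {
        thread.join();
    }
    return 0;
}
```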
gaia::db::begin_session();
num_active_sessions++;
std::this_thread::sleep_for(std::chrono::milliseconds(c_session_sleep_millis));
ASSERT_EQ(num_active_sessions, 1);
See comments below.
std::thread ddl_session([&num_active_sessions] {
    gaia::db::begin_ddl_session();
    ASSERT_EQ(num_active_sessions, 0);
See comments below.
Which one?
- If the one about the ASSERT: this is on purpose; we want to ensure that the regular_session has finished by the time we reach this point.
- If the one about locking: this is an atomic variable.
});
}

for (auto& thread : threads)
Should you have a timeout to ensure this test doesn't hang forever in case one of the threads blocks forever due to a bug, or are we just expecting the test framework to time out?
Should you have a timeout to ensure this test doesn't hang forever in case one of the threads blocks forever due to a bug, or are we just expecting the test framework to time out?
TeamCity used to have a time limit for tests; I don't know whether it still does. Anyway, we use this pattern pretty often; if we see tests hanging, we can add a timeout.
Logic looks good, just a few nits about tests.
db35d8a to 84047b1
- Added a session_error exception to notify the client of a generic session error.
- begin_session() can now throw a session_error exception if a DDL session is started concurrently with another session.
- Added gaia_log::is_initialized() to check whether the logger is initialized.