[GAIAPLAT-2163] Ensure that DDL sessions cannot happen concurrently with other DDL or regular sessions #1524

Merged: 10 commits merged from rondelli-exclusive-ddl-sessions into master on May 20, 2022

Conversation

simone-gaia (Contributor):

  • Add session_error exception to notify the client of a generic session error.
  • begin_session() can now throw a session_error exception if a DDL session is started concurrently with another session.
  • Add gaia_log::is_initialized() to check whether the logger is initialized.

@senderista (Contributor) left a comment:

I'm possibly being dense but I don't understand the concurrency approach here. See comments for details (tl;dr why isn't a reader/writer lock all you need?)...


if (event == session_event_t::SESSION_ERROR)
{
gaia::db::end_session();
Contributor:

I don't think it's a good pattern to invoke a public API that was not designed to be called from internal code. You're assuming that end_session() just cleans up some internal state and nothing more, which is not a robust assumption. The right approach would be to just throw the exception with no explicit local cleanup, and have a scope_guard automatically invoke an internal cleanup method on unsuccessful exit (which could also be invoked by end_session()).
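A minimal sketch of that suggestion, assuming an internal cleanup helper (here called cleanup_session_state()) and a dismiss()-style disarm method on the guard; both names are illustrative, not existing APIs:

// In begin_session(): arm a guard that performs internal cleanup on unsuccessful exit.
auto cleanup = scope_guard::make_scope_guard([] {
    client_t::cleanup_session_state(); // hypothetical internal cleanup, also callable from end_session()
});

if (event == session_event_t::SESSION_ERROR)
{
    // No explicit local cleanup; the guard handles it as the exception unwinds.
    throw session_error();
}

// ... remainder of session setup ...
cleanup.dismiss(); // hypothetical disarm on the success path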

Contributor:

Or we should have a proper internal end_session that gets called by the public API as well.

Contributor:

Or we should have a proper internal end_session that gets called by the public API as well.

That's what I was getting at.

Contributor Author:

Or we should have a proper internal end_session that gets called by the public API as well.

We already have that client_t::end_session().

But yeah, probably using RAII to clean up resources is the way to go. It seems that most resources are already inside a scope_guard; should I just throw the exception then?

// }
s_open_sessions_count++;

auto clean_open_session = scope_guard::make_scope_guard([] {
Contributor:

Not a good name IMO. How about just decrement_open_session_count?

Contributor Author:

Done.

// This counter gives a more accurate view of the open sessions than
// s_session_threads.size(). See reap_exited_threads().
static inline std::atomic<uint32_t> s_open_sessions_count{0};
static inline std::shared_mutex m_start_session_mutex;
Contributor:

Why do you need both a shared mutex and a refcount? Normally a shared mutex is implemented using a refcount! Why can't you just have a DDL session acquire the mutex in exclusive mode and a data session acquire the mutex in shared mode? Isn't the usual reader-writer semantics (readers block writers, writers block both readers and writers) exactly what you want here? (If you don't want to block on acquiring the mutex, then just use try_lock().)
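A minimal sketch of the reader/writer approach being proposed, using std::shared_mutex with non-blocking acquisition (the function names and the static mutex are illustrative only):

#include <shared_mutex>

static std::shared_mutex s_session_mutex;

// DDL session: exclusive access; fails immediately if any session is open.
bool try_begin_ddl_session()
{
    return s_session_mutex.try_lock(); // release with unlock() when the session ends
}

// Regular session: shared access; fails only while a DDL session holds the lock.
bool try_begin_regular_session()
{
    return s_session_mutex.try_lock_shared(); // release with unlock_shared() when the session ends
}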

Contributor Author:

I asked, at a certain point, if instead of erroring out we could wait for the DDL sessions to complete before starting other sessions, and vice-versa. I didn't get an answer. This is a beginning in that direction.

Contributor Author:

The reason I used a lock was that I didn't want to use double-check logic (we need to check both the number of sessions and whether a DDL session is active); the lock is just simpler.

bool has_succeeded = s_is_ddl_session_active.compare_exchange_strong(expected_value, true);
if (!has_succeeded || s_open_sessions_count > 1)
{
std::cerr << "Impossible to start a DDL session while other sessions are active." << std::endl;
Contributor:

"Cannot start..."

Contributor Author:

Done.


if (s_is_ddl_session_active)
{
std::cerr << "Impossible to start a session while a DDL session is active." << std::endl;
Contributor:

"Cannot start..."

Contributor Author:

Done.


if (s_session_type == session_type_t::ddl)
{
std::unique_lock lock(m_start_session_mutex);
Contributor:

What datum is this lock protecting? Why do you want to block instead of immediately returning an error?

Contributor:

@simone-gaia is trying to protect the reading of his counters and flags from concurrent updates. So he's basically starting a critical section here. But I think this can be avoided, as I did in my previous logic, with a careful ordering of the checks and double-checking.
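A rough sketch of one possible check ordering for that idea, using the flag and counter from this PR with default (sequentially consistent) atomics; this is only an illustration of the double-checking approach, not the PR's actual code. The worst case is that two racing attempts both back off, which is safe:

// Regular session: announce the session first, then check for a DDL session.
bool try_start_regular_session()
{
    s_open_sessions_count++;
    if (s_is_ddl_session_active)
    {
        s_open_sessions_count--; // back off: a DDL session won the race
        return false;
    }
    return true;
}

// DDL session: claim the flag first, then check for other open sessions.
bool try_start_ddl_session()
{
    bool expected = false;
    if (!s_is_ddl_session_active.compare_exchange_strong(expected, true))
    {
        return false; // another DDL session is active
    }
    if (s_open_sessions_count > 0)
    {
        s_is_ddl_session_active = false; // back off: a regular session won the race
        return false;
    }
    s_open_sessions_count++;
    return true;
}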

Contributor Author:

@LaurentiuCristofor exactly. Though, I wanted to avoid the double-check logic since it's a little trickier. But I will go ahead and do it.

@@ -416,6 +422,8 @@ void server_t::handle_client_shutdown(
bool client_disconnected = true;
txn_rollback(client_disconnected);
}

s_open_sessions_count--;
Contributor:

For exception safety, this decrement should be in a scope_guard (even though this is mostly a theoretical concern on the server).
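A minimal sketch of what that would look like, assuming the guard is created where the session handler begins its work:

// Runs even if an exception propagates out of the session handler.
auto decrement_open_session_count = scope_guard::make_scope_guard([] {
    s_open_sessions_count--;
});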

Contributor:

@simone-gaia : Make sure to run debug tests as well, to see if you're encountering the timing failures that I was also hitting when a session was opened by the client before the server could handle the previous session closure.

Contributor Author:

@LaurentiuCristofor I did encounter the timing issue; that is why I have added the counter. I remembered you mentioning this problem (I was using s_session_threads.size() before this). What tests exactly are you referring to?

@senderista I did that in my first attempt, but then I noticed that any exception thrown here will crash the server, so I thought it doesn't really matter... Maybe we can protect ourselves against future non-server-crashing txns though...

Contributor Author:

@LaurentiuCristofor I think I found the tests you are referring to... should we just send a message as we do for begin_session(), and wait for its completion?

static inline std::atomic<bool> s_can_ddl_sessions_still_be_started{true};

// This counter gives a more accurate view of the open sessions than
// s_session_threads.size(). See reap_exited_threads().
Contributor:

I think you meant to reference can_start_session(), not reap_exited_threads()?

Contributor Author:

I meant reap_exited_threads() which explains why s_session_threads.size() is not a reliable count.

@@ -1069,6 +1077,25 @@ bool server_t::authenticate_client_socket(int socket)
return true;
}

bool server_t::can_start_session(int socket)
Contributor:

Let's call the arg socket_fd just to clarify its type (since fds don't have an explicit type).

Contributor Author:

Done.

@@ -1229,12 +1256,8 @@ void server_t::client_dispatch_handler(const std::string& socket_name)
throw_system_error("accept() failed!");
}

if (s_session_threads.size() >= c_session_limit
|| !authenticate_client_socket(session_socket))
if (!can_start_session(session_socket))
Contributor:

This still doesn't distinguish auth failures and "session limit exceeded", of course. Unfortunately it's not easy to solve because propagating an explicit error back to the client requires establishing a session, which is what we can't do when resource limits have been exceeded!

Contributor Author:

Yes, but at least you can read the server log and make sense of what happened. The reason for this refactoring is that the DDL logic was partially implemented here; I didn't want to spread the DDL logic across multiple places, so I moved it into the session thread but left this partial refactoring in place.

Contributor:

Actually, it might be better to move the comment from the can_start_session() definition to here.

@LaurentiuCristofor (Contributor) left a comment:

I also think that the synchronization logic can be simplified and right now it uses more elements than are needed.

I'm also not clear about the logging change - if it's not related to DDL sessions, perhaps it should be pushed separately.

@simone-gaia (Contributor Author) left a comment:

I have addressed the most basic comments. Will need more time to think about the more complex ones.

Resolved (outdated) review threads in production/db/core/src/db_client.cpp, production/db/core/src/db_server.cpp (two threads), and production/db/inc/core/db_server.hpp.
@senderista (Contributor):

Could you explain in detail why these counters are needed at all if you have a read-write lock ("regular" sessions are "readers", DDL sessions are "writers")? Why can't you do everything you need to do here with a single shared_mutex?

Comment on lines 223 to 226
if (event == session_event_t::SESSION_ERROR)
{
throw session_failure_internal();
}
Contributor Author:

This is no longer necessary, but we could keep it. Happy to remove it though, or move to a new PR. @senderista @LaurentiuCristofor LMK what you think.

Contributor:

If there's no code on the server sending this, then we should just remove it.

Contributor Author:

OK

@senderista (Contributor):

Could you explain in detail why these counters are needed at all if you have a read-write lock ("regular" sessions are "readers", DDL sessions are "writers")? Why can't you do everything you need to do here with a single shared_mutex?

Maybe this will help clarify what I'm getting at: a reader-writer lock is isomorphic to (and is typically implemented as) a boolean flag to track exclusive access, plus an integer counter to track shared access. (See e.g. our inline_shared_lock implementation.) So your flag tracking exclusive access (s_is_ddl_session_active) plus your counter tracking shared access (s_open_sessions_count) is exactly isomorphic to, and redundant with, your reader-writer lock (m_start_session_mutex) (except that those two variables don't fit into a single word, so they can't both be atomically updated without races).
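A minimal illustration of that isomorphism, packing the exclusive flag and the shared count into a single atomic word so that both can be updated together without races (a sketch only, not the inline_shared_lock implementation):

#include <atomic>
#include <cstdint>

// Bit 31: DDL (exclusive) session active. Bits 0-30: count of open regular (shared) sessions.
static std::atomic<uint32_t> s_session_lock_word{0};
constexpr uint32_t c_exclusive_bit = 1u << 31;

bool try_acquire_shared() // regular session
{
    uint32_t old_value = s_session_lock_word.load();
    do
    {
        if (old_value & c_exclusive_bit)
        {
            return false; // a DDL session holds exclusive access
        }
    } while (!s_session_lock_word.compare_exchange_weak(old_value, old_value + 1));
    return true;
}

bool try_acquire_exclusive() // DDL session
{
    uint32_t expected = 0; // no readers and no writer
    return s_session_lock_word.compare_exchange_strong(expected, c_exclusive_bit);
}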

@@ -24,7 +24,7 @@ enum session_event_t: uint8 {
REQUEST_STREAM,
DECIDE_TXN_ROLLBACK_FOR_ERROR,
CONNECT_DDL,
CONNECT_PING,
CONNECT_PING
Contributor:

Why this change? A trailing comma is usually kept on the last value, to avoid having to edit that line when adding another value.

Contributor Author:

There used to be another line here (SESSION_ERROR) which has been removed. Also, I don't really like it, but if it is a convention I'll follow it.

@LaurentiuCristofor (Contributor) left a comment:

Logic looks good now. Just a small note about an edit that seems unnecessary - the removal of the comma after the last enum value.

{
if (s_session_threads.size() >= c_session_limit)
{
// The connecting client will get ECONNRESET on their first
Contributor:

Both branches have the same effect, so maybe move this comment above the function: "If this function returns false, then the connecting client will get ECONNRESET on their first read from socket_fd."

Contributor Author:

Following the next comment (leaving this comment here and removing it from can_start_session()).

{
// The connecting client will get ECONNRESET on their first
Contributor:

I would leave this comment here and remove it from can_start_session() (because what the client observes depends on how the function's return value is used, not on the function itself, which has no side effects).

Contributor Author:

Done.

@@ -24,7 +24,7 @@ enum session_event_t: uint8 {
REQUEST_STREAM,
DECIDE_TXN_ROLLBACK_FOR_ERROR,
CONNECT_DDL,
CONNECT_PING,
CONNECT_PING
Contributor:

I would leave comma in place; this is a common convention for enums that I think we use elsewhere.

Contributor Author:

Done


std::thread regular_session([&num_active_sessions] {
gaia::db::begin_session();
ASSERT_EQ(num_active_sessions, 0);
@senderista (Contributor), May 18, 2022:

This might be OK for a test if we don't observe flakiness, but it's technically racy: the ddl_session thread could just be nondeterministically delayed at this point rather than blocked, so you could possibly observe num_active_sessions == 0 even if begin_ddl_session() didn't properly block. But this test may not be worth proper synchronization.

Contributor Author:

But this test may not be worth proper synchronization.

That's what I thought. In any case, flakiness here should not lead to failure. In the following tests, where flakiness could indeed lead to failure, I used a proper blocking mechanism.

gaia::db::begin_ddl_session();
num_active_sessions++;
std::this_thread::sleep_for(std::chrono::milliseconds(c_session_sleep_millis));
ASSERT_EQ(num_active_sessions, 1);
Contributor:

What is the purpose of this assert? It's only testing that the previous increment succeeded, but that can't fail.

Contributor Author:

Ah right, a remnant of a copy-paste from the previous test.

// NOLINTNEXTLINE(performance-inefficient-vector-operation)
threads.emplace_back([&] {
gaia::db::begin_ping_session();
num_ping_sessions--;
Contributor:

You can't modify num_ping_sessions without acquiring the mutex that protects it. (You can signal the CV after releasing the mutex, but I think it's clearer to signal it before.)

Contributor Author:

num_ping_sessions is atomic; isn't that sufficient?

Contributor:

No, because it could lead to a lost wakeup. If you're not waiting when the CV is signaled, then you won't get the signal. If another thread can modify the protected data without acquiring the mutex, then it could modify the data and signal the CV while you're checking the predicate but before you've gone back into a wait. In that case (if that's the last signal sent) you might never wake up. Forcing any mutating thread to acquire the mutex ensures this can't happen.

Contributor Author:

I think a lost wake-up is almost impossible here. We spin up 5 threads, each of which signals the CV, but actually only the last signal is going to be relevant, because it is the one for which num_ping_sessions == 0.

What the lock is protecting is not the data; it just ensures that the DDL session is running for the whole time that the PING sessions are running too (using timeouts here would indeed be too flaky).

Am I missing something?

@senderista (Contributor), May 18, 2022:

Yes, I think so. Imagine that the next-to-last thread decrements the counter and signals the CV. The blocking thread is waiting on the CV when it's signaled, so it wakes up, atomically acquiring the mutex that it atomically released when it went into the wait. Just after the blocking thread checks the predicate and reads 1, but before it goes back into a wait, the last thread decrements the counter to 0 and signals the CV. But the blocking thread isn't in a wait, so it misses this signal (signals aren't queued). Then the blocking thread goes back into its wait, but there are no threads left to signal, so it waits forever. But if the signaling thread must acquire the mutex before modifying the predicate, then this can't happen, because the blocking thread is guaranteed to be waiting when the predicate is modified. The only thing that can wake up the blocking thread at this point is the signal, and it acquires the mutex atomically on wakeup, so we know that when the blocking thread wakes up, it will see all modifications to the predicate that happened before it acquired the mutex (other threads might have acquired the mutex, modified the predicate, and released the mutex before the blocking thread acquires the mutex, so the predicate observed by the blocking thread on wakeup isn't necessarily the same as the predicate observed by the signaling thread).

@senderista (Contributor), May 18, 2022:

(Note that everything still works out if you signal the CV after releasing the mutex, but I at least find this to be much harder to reason about than the convention where you only signal the CV while holding the mutex.)
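A minimal sketch of the convention described above, with the predicate modified and the CV signaled while holding the mutex (the names are illustrative, not the test's actual variables):

#include <condition_variable>
#include <mutex>

static std::mutex s_mutex;
static std::condition_variable s_cv;
static int s_num_ping_sessions = 5; // plain int: only ever accessed under s_mutex

// Each ping-session thread, once its session is open:
void signal_ping_session_started()
{
    std::lock_guard lock(s_mutex);
    --s_num_ping_sessions; // modify the predicate only while holding the mutex
    s_cv.notify_one();     // signal while still holding the mutex
}

// The thread waiting for all ping sessions to open:
void wait_for_all_ping_sessions()
{
    std::unique_lock lock(s_mutex);
    s_cv.wait(lock, [] { return s_num_ping_sessions == 0; });
}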

gaia::db::begin_session();
num_active_sessions++;
std::this_thread::sleep_for(std::chrono::milliseconds(c_session_sleep_millis));
ASSERT_EQ(num_active_sessions, 1);
Contributor:

See comments below.


std::thread ddl_session([&num_active_sessions] {
gaia::db::begin_ddl_session();
ASSERT_EQ(num_active_sessions, 0);
Contributor:

See comments below.

Contributor Author:

Which one?

  • If the one about ASSERT, this is on purpose, we want to ensure that the regular_session has finished by the time we reach this point.
  • If the one about locking, this is an atomic variable.

});
}

for (auto& thread : threads)
Contributor:

Should you have a timeout to ensure this test doesn't hang forever in case one of the threads blocks forever due to a bug, or are we just expecting the test framework to time out?

Contributor Author:

Should you have a timeout to ensure this test doesn't hang forever in case one of the threads blocks forever due to a bug, or are we just expecting the test framework to time out?

TeamCity used to have a time limit for tests; I don't know whether it still does. Anyway, we use this pattern pretty often; if we see tests hanging, we can add a timeout.

@senderista (Contributor) left a comment:

Logic looks good, just a few nits about tests.

@LaurentiuCristofor LaurentiuCristofor force-pushed the rondelli-exclusive-ddl-sessions branch from db35d8a to 84047b1 Compare May 20, 2022 18:26
@LaurentiuCristofor LaurentiuCristofor merged commit 8315d09 into master May 20, 2022
@LaurentiuCristofor LaurentiuCristofor deleted the rondelli-exclusive-ddl-sessions branch May 20, 2022 19:00