Fix cursor thread leak from closing unconsumed iterators #917

Merged 4 commits on Sep 15, 2021
4 changes: 2 additions & 2 deletions production/db/core/inc/db_client.hpp
@@ -88,7 +88,7 @@ class client_t
// this generator to build a range or iterator object.
template <typename T_element_type>
static std::function<std::optional<T_element_type>()>
- get_stream_generator_for_socket(int stream_socket);
+ get_stream_generator_for_socket(std::shared_ptr<int> stream_socket_ptr);

private:
// These fields have transaction lifetime.
@@ -143,7 +143,7 @@ class client_t

static int get_session_socket(const std::string& socket_name);

- static int get_id_cursor_socket_for_type(common::gaia_type_t type);
+ static std::shared_ptr<int> get_id_cursor_socket_for_type(common::gaia_type_t type);

static std::function<std::optional<int>()>
get_fd_stream_generator_for_socket(int stream_socket);
17 changes: 7 additions & 10 deletions production/db/core/inc/db_client.inc
@@ -99,10 +99,10 @@ inline void client_t::txn_log(
// This generator wraps a socket which reads a stream of values of `T_element_type` from the server.
template <typename T_element_type>
std::function<std::optional<T_element_type>()>
- client_t::get_stream_generator_for_socket(int stream_socket)
+ client_t::get_stream_generator_for_socket(std::shared_ptr<int> stream_socket_ptr)
{
// Verify that the socket is the correct type for the semantics we assume.
- common::check_socket_type(stream_socket, SOCK_SEQPACKET);
+ common::check_socket_type(*stream_socket_ptr, SOCK_SEQPACKET);

// Currently, we associate a cursor with a snapshot view, i.e., a transaction.
verify_txn_active();
@@ -112,9 +112,9 @@ client_t::get_stream_generator_for_socket(int stream_socket)
std::vector<T_element_type> batch_buffer;

// The definition of the generator we return.
- return [stream_socket, owning_txn_id, batch_buffer]() mutable -> std::optional<T_element_type> {
+ return [stream_socket_ptr, owning_txn_id, batch_buffer]() mutable -> std::optional<T_element_type> {
// We shouldn't be called again after we received EOF from the server.
- ASSERT_INVARIANT(stream_socket != -1, c_message_stream_socket_is_invalid);
+ ASSERT_INVARIANT(*stream_socket_ptr != -1, c_message_stream_socket_is_invalid);

// The cursor should only be called from within the scope of its owning transaction.
ASSERT_INVARIANT(s_txn_id == owning_txn_id, "Cursor was not called from the scope of its own transaction!");
@@ -128,18 +128,15 @@ client_t::get_stream_generator_for_socket(int stream_socket)
// of an extra system call per batch.
// We set MSG_PEEK to avoid reading the datagram into our buffer,
// and we set MSG_TRUNC to return the actual buffer size needed.
- ssize_t datagram_size = ::recv(stream_socket, nullptr, 0, MSG_PEEK | MSG_TRUNC);
+ ssize_t datagram_size = ::recv(*stream_socket_ptr, nullptr, 0, MSG_PEEK | MSG_TRUNC);
if (datagram_size == -1)
{
common::throw_system_error("recv(MSG_PEEK) failed!");
}

if (datagram_size == 0)
{
- // We received EOF from the server, so close
- // client socket and stop iteration.
- common::close_fd(stream_socket);
- // Tell the caller to stop iteration.
+ // We received EOF from the server, so tell the caller to stop iteration.
return std::nullopt;
}

@@ -153,7 +150,7 @@ client_t::get_stream_generator_for_socket(int stream_socket)
// Get the actual data.
// This is a nonblocking read, because the previous blocking
// read will not return until data is available.
- ssize_t bytes_read = ::recv(stream_socket, batch_buffer.data(), batch_buffer.size(), MSG_DONTWAIT);
+ ssize_t bytes_read = ::recv(*stream_socket_ptr, batch_buffer.data(), batch_buffer.size(), MSG_DONTWAIT);
if (bytes_read == -1)
{
// Per above, we should never have to block here.
25 changes: 16 additions & 9 deletions production/db/core/src/db_client.cpp
@@ -41,12 +41,13 @@ using namespace gaia::db::memory_manager;
using namespace flatbuffers;
using namespace scope_guard;

- int client_t::get_id_cursor_socket_for_type(gaia_type_t type)
+ std::shared_ptr<int> client_t::get_id_cursor_socket_for_type(gaia_type_t type)
{
// Build the cursor socket request.
FlatBufferBuilder builder;
auto table_scan_info = Createtable_scan_info_t(builder, type);
- auto client_request = Createclient_request_t(builder, session_event_t::REQUEST_STREAM, request_data_t::table_scan, table_scan_info.Union());
+ auto client_request = Createclient_request_t(
+ builder, session_event_t::REQUEST_STREAM, request_data_t::table_scan, table_scan_info.Union());
auto message = Createmessage_t(builder, any_message_t::request, client_request.Union());
builder.Finish(message);

@@ -64,8 +65,18 @@ int client_t::get_id_cursor_socket_for_type(gaia_type_t type)
// Check that our stream socket is blocking (because we need to perform blocking reads).
ASSERT_INVARIANT(!is_non_blocking(stream_socket), "Stream socket is not set to blocking!");

+ // We use shared_ptr with a custom deleter to guarantee that the socket is
+ // closed when its owning object is destroyed. We could possibly achieve the
+ // same effect with an RAII wrapper, but it would need to have copy rather
+ // than move semantics, since the socket is captured by a lambda that must
+ // be copyable (since it is coerced to std::function).
+ std::shared_ptr<int> stream_socket_ptr(new int{stream_socket}, [](int* fd_ptr) { close_fd(*fd_ptr); delete fd_ptr; });
Contributor:

Late suggestion: why not move this code above and get rid of the scope guard completely?

Contributor Author:

See below for why. The asserts are not why we need the scope_guard (they should be expected to terminate the program, so the socket would be closed in any case).
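
The code comment in this hunk says the socket owner needs copy semantics because the capturing lambda is coerced to `std::function`, which requires a copy-constructible target. A minimal sketch of that constraint follows; every name in it is hypothetical and none of it comes from the Gaia codebase.

```cpp
#include <functional>
#include <memory>
#include <type_traits>
#include <utility>

// Closure that captures a shared_ptr by value: the closure is copyable.
inline auto make_shared_capture()
{
    auto value_ptr = std::make_shared<int>(7);
    return [value_ptr]() { return *value_ptr; };
}

// Closure that captures a unique_ptr by move: the closure is move-only.
inline auto make_unique_capture()
{
    auto value_ptr = std::make_unique<int>(7);
    return [value_ptr = std::move(value_ptr)]() { return *value_ptr; };
}

// Only the first closure type satisfies the copy-constructible requirement
// that std::function places on its target.
static_assert(std::is_copy_constructible_v<decltype(make_shared_capture())>);
static_assert(!std::is_copy_constructible_v<decltype(make_unique_capture())>);

// Copying a std::function copies its closure, so every copy co-owns the int.
inline long owners_after_copy()
{
    auto value_ptr = std::make_shared<int>(7);
    std::function<int()> first = [value_ptr]() { return *value_ptr; };
    std::function<int()> second = first;
    // Counts value_ptr itself plus one capture held by each std::function.
    return value_ptr.use_count();
}
```

With a `shared_ptr` capture, the resource is released only when the last copy of the generator goes away, which is the behavior the custom deleter in this PR relies on.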


+ // Both our explicit new() and the shared_ptr constructor dynamically allocate
+ // memory, so we might need to clean up the socket if either fails.
cleanup_stream_socket.dismiss();
- return stream_socket;
+
+ return stream_socket_ptr;
}

std::function<std::optional<gaia_id_t>()>
@@ -124,13 +135,9 @@ client_t::augment_id_generator_for_type(gaia_type_t type, std::function<std::opt
std::shared_ptr<gaia::common::iterators::generator_t<gaia_id_t>>
client_t::get_id_generator_for_type(gaia_type_t type)
{
- int stream_socket = get_id_cursor_socket_for_type(type);
- auto cleanup_stream_socket = make_scope_guard([&]() {
- close_fd(stream_socket);
- });
+ std::shared_ptr<int> stream_socket_ptr = get_id_cursor_socket_for_type(type);

- auto id_generator = get_stream_generator_for_socket<gaia_id_t>(stream_socket);
- cleanup_stream_socket.dismiss();
+ auto id_generator = get_stream_generator_for_socket<gaia_id_t>(stream_socket_ptr);

// We need to augment the server-based id generator with a local generator
// that will also return the elements that have been added by the client
2 changes: 1 addition & 1 deletion production/db/inc/query_processor/scan_generators.hpp
@@ -32,7 +32,7 @@ class scan_generator_t
std::shared_ptr<query_processor::scan::index_predicate_t> predicate);

private:
- static int get_record_cursor_socket_for_index(
+ static std::shared_ptr<int> get_record_cursor_socket_for_index(
common::gaia_id_t index_id,
gaia_txn_id_t txn_id,
std::shared_ptr<query_processor::scan::index_predicate_t> predicate);
22 changes: 14 additions & 8 deletions production/db/query_processor/src/scan_generators.cpp
@@ -31,18 +31,14 @@ scan_generator_t::get_record_generator_for_index(
gaia_txn_id_t txn_id,
std::shared_ptr<query_processor::scan::index_predicate_t> predicate)
{
- int stream_socket = get_record_cursor_socket_for_index(index_id, txn_id, predicate);
- auto cleanup_stream_socket = make_scope_guard([&]() {
- close_fd(stream_socket);
- });
+ std::shared_ptr<int> stream_socket_ptr = get_record_cursor_socket_for_index(index_id, txn_id, predicate);

- auto record_generator = client_t::get_stream_generator_for_socket<index::index_record_t>(stream_socket);
- cleanup_stream_socket.dismiss();
+ auto record_generator = client_t::get_stream_generator_for_socket<index::index_record_t>(stream_socket_ptr);

return std::make_shared<gaia::common::iterators::generator_t<index::index_record_t>>(record_generator);
}

- int scan_generator_t::get_record_cursor_socket_for_index(
+ std::shared_ptr<int> scan_generator_t::get_record_cursor_socket_for_index(
common::gaia_id_t index_id,
gaia_txn_id_t txn_id,
std::shared_ptr<query_processor::scan::index_predicate_t> predicate)
@@ -89,8 +85,18 @@ int scan_generator_t::get_record_cursor_socket_for_index(
// Check that our stream socket is blocking (because we need to perform blocking reads).
ASSERT_INVARIANT(!is_non_blocking(stream_socket), "Stream socket is not set to blocking!");

+ // We use shared_ptr with a custom deleter to guarantee that the socket is
+ // closed when its owning object is destroyed. We could possibly achieve the
+ // same effect with an RAII wrapper, but it would need to have copy rather
+ // than move semantics, since the socket is captured by a lambda that must
+ // be copyable (since it is coerced to std::function).
+ std::shared_ptr<int> stream_socket_ptr(new int{stream_socket}, [](int* fd_ptr) { close_fd(*fd_ptr); delete fd_ptr; });
Contributor:

Here too we could probably move this code and eliminate the scope_guard. The asserts can happen after the wrapping into the shared_ptr.

Contributor Author:

I think we need the scope_guard regardless, because dynamic allocation can always throw, and we need to close the socket in that case.

Contributor Author:

(There are actually 2 dynamic allocations happening here: one explicit, which is our operator new call, and the other implicit, which is shared_ptr allocating its shared refcount structure on the heap. make_shared() combines these into a single allocation, but we can't use it because it doesn't support custom deleters.)
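
A minimal sketch of the two allocations described above, using plain POSIX `close` instead of the project's `close_fd`. The `guard_t` class and the `wrap_fd` and `fd_is_open` helpers are hypothetical stand-ins, not the project's `scope_guard` or any Gaia API. One deliberate difference from the diff: the sketch dismisses the guard before constructing the `shared_ptr`, because the C++ standard specifies that the `shared_ptr(p, d)` constructor calls `d(p)` if it throws, so the deleter already covers a failed control-block allocation. The PR dismisses the guard after construction.

```cpp
#include <fcntl.h>
#include <unistd.h>

#include <cassert>
#include <memory>
#include <utility>

// Hypothetical minimal scope guard: runs the cleanup unless dismissed.
template <typename T_callable>
class guard_t
{
public:
    explicit guard_t(T_callable cleanup) : m_cleanup(std::move(cleanup)) {}
    guard_t(const guard_t&) = delete;
    guard_t& operator=(const guard_t&) = delete;
    ~guard_t() { if (m_armed) { m_cleanup(); } }
    void dismiss() { m_armed = false; }

private:
    T_callable m_cleanup;
    bool m_armed{true};
};

// Hypothetical helper: transfers ownership of an open descriptor to a shared_ptr.
std::shared_ptr<int> wrap_fd(int fd)
{
    assert(fd >= 0);

    // Nothing owns fd yet, so an exception here must not leak it.
    guard_t guard([fd]() { ::close(fd); });

    // Allocation 1 (explicit): the heap cell that stores the descriptor.
    int* fd_ptr = new int{fd};

    // Allocation 2 (implicit): the shared_ptr control block. If the constructor
    // throws, the deleter is invoked on fd_ptr, so the guard is dismissed first
    // to avoid closing the descriptor twice.
    guard.dismiss();
    return std::shared_ptr<int>(fd_ptr, [](int* p) { ::close(*p); delete p; });
}

// Reports whether fd still refers to an open descriptor.
bool fd_is_open(int fd)
{
    return ::fcntl(fd, F_GETFD) != -1;
}
```

The descriptor stays open while any copy of the returned pointer is alive and is closed once the last copy is destroyed.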

Contributor:

But then why remove the scope guard in the other situations? You could have kept it for the same reason.

Anyway, I don't think we should worry about cleanup in the case that new() fails - in that case the server would stop execution anyway.

Contributor Author (senderista, Sep 15, 2021):

You're right that in general we don't try to recover from exceptions on the server (unless they're from a misbehaving or crashed client), but the iterator code is all client-side at least conceptually (i.e., it consumes a "cursor socket" sent over the session socket by the server). We don't control the exception handling policy in a client application (e.g., the client might try to recover from a std::bad_alloc exception thrown by the stream_socket_ptr allocation by freeing some memory that they own). That's why exception safety is important to get right on the client, while it can be treated as a mostly theoretical concern on the server (but I still treat non-exception-safety as a bug there as well).

Contributor Author (senderista, Sep 15, 2021):

> But then why remove the scope guard in the other situations? You could have kept it for the same reason.

Not totally sure which "other situations" you're referring to, but the reason I removed scope_guard in the consumers of stream_socket_ptr is that the shared_ptr destructor now closes the socket if an exception is thrown in one of the consumers, so exception safety no longer requires a scope_guard there. The key difference is that the shared_ptr has already been successfully constructed at that point, so it now owns the socket and its destructor is responsible for closing the socket.
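
The ownership transfer described above can be sketched with a pipe standing in for the cursor socket. The helpers `own_fd`, `make_byte_generator`, and `fd_is_open` are hypothetical names for illustration, and the sketch omits the allocation-failure handling discussed earlier in this thread. The point it tries to show is that a generator destroyed before reaching EOF still closes its descriptor, because the captured `shared_ptr` is the owner.

```cpp
#include <fcntl.h>
#include <unistd.h>

#include <functional>
#include <memory>
#include <optional>

// Hypothetical helper: the deleter closes the descriptor when the last owner dies.
// Allocation-failure handling is omitted here for brevity.
std::shared_ptr<int> own_fd(int fd)
{
    return std::shared_ptr<int>(new int{fd}, [](int* p) { ::close(*p); delete p; });
}

// Hypothetical stand-in for the stream generator: yields one byte per call.
std::function<std::optional<char>()> make_byte_generator(std::shared_ptr<int> fd_ptr)
{
    return [fd_ptr]() -> std::optional<char> {
        char byte = 0;
        ssize_t bytes_read = ::read(*fd_ptr, &byte, 1);
        if (bytes_read <= 0)
        {
            // EOF (or, in this simplified sketch, an error) ends iteration.
            // The descriptor is not closed here; the shared_ptr owns it.
            return std::nullopt;
        }
        return byte;
    };
}

// Reports whether fd still refers to an open descriptor.
bool fd_is_open(int fd)
{
    return ::fcntl(fd, F_GETFD) != -1;
}
```

If a caller reads one element and then drops the generator, the descriptor is closed when the generator is destroyed, with no scope guard needed in the consumer.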


+ // Both our explicit new() and the shared_ptr constructor dynamically allocate
+ // memory, so we might need to clean up the socket if either fails.
cleanup_stream_socket.dismiss();
- return stream_socket;
+
+ return stream_socket_ptr;
}
} // namespace scan
} // namespace query_processor