Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[core] Use boost iostream in pipe logger for cross-platform #50044

Open
wants to merge 12 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/ray/util/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,7 @@ ray_cc_library(
":stream_redirection_options",
":thread_utils",
":util",
"@boost//:iostreams",
"@com_github_spdlog//:spdlog",
"@com_google_absl//absl/strings",
],
Expand Down
302 changes: 125 additions & 177 deletions src/ray/util/pipe_logger.cc
Original file line number Diff line number Diff line change
Expand Up @@ -33,119 +33,50 @@ namespace ray {

namespace {

// Default pipe log read buffer size.
constexpr size_t kDefaultPipeLogReadBufSize = 1024;

size_t GetPipeLogReadSizeOrDefault() {
// TODO(hjiang): Write a util function `GetEnvOrDefault`.
const char *var_value = std::getenv(kPipeLogReadBufSizeEnv.data());
if (var_value != nullptr) {
size_t read_buf_size = 0;
if (absl::SimpleAtoi(var_value, &read_buf_size) && read_buf_size > 0) {
return read_buf_size;
}
}
return kDefaultPipeLogReadBufSize;
}

struct StreamDumper {
absl::Mutex mu;
bool stopped ABSL_GUARDED_BY(mu) = false;
std::deque<std::string> content ABSL_GUARDED_BY(mu);
};

// File descriptors which indicates standard stream.
#if defined(__APPLE__) || defined(__linux__)
struct StdStreamFd {
int stdout_fd = STDOUT_FILENO;
int stderr_fd = STDERR_FILENO;
};
#elif defined(_WIN32)
// TODO(hjiang): not used for windows, implement later.
struct StdStreamFd {
int stdout_fd = -1;
int stderr_fd = -1;
// Used to write to dup-ed stdout and stderr; use shared pointer to make it copy
// constructible.
struct StreamSink {
dentiny marked this conversation as resolved.
Show resolved Hide resolved
std::shared_ptr<boost::iostreams::stream<boost::iostreams::file_descriptor_sink>>
stdout_sink;
dentiny marked this conversation as resolved.
Show resolved Hide resolved
std::shared_ptr<boost::iostreams::stream<boost::iostreams::file_descriptor_sink>>
stderr_sink;
dentiny marked this conversation as resolved.
Show resolved Hide resolved
};
#endif

// Read bytes from handle into [data], return number of bytes read.
// If read fails, throw exception.
#if defined(__APPLE__) || defined(__linux__)
size_t Read(int read_fd, char *data, size_t len) {
// TODO(hjiang): Notice frequent read could cause performance issue.
ssize_t bytes_read = read(read_fd, data, len);
// TODO(hjiang): Add macros which checks for syscalls.
RAY_CHECK(bytes_read != -1) << "Fails to read from pipe because " << strerror(errno);
return bytes_read;
}
#endif

template <typename ReadFunc, typename WriteFunc, typename FlushFunc>
void StartStreamDump(ReadFunc read_func,
WriteFunc write_func,
FlushFunc flush_func,
std::function<void()> close_read_handle,
std::function<void()> on_close_completion) {
template <typename WriteFunc, typename FlushFunc>
void StartStreamDump(
std::shared_ptr<boost::iostreams::stream<boost::iostreams::file_descriptor_source>>
pipe_instream,
WriteFunc write_func,
FlushFunc flush_func,
std::function<void()> on_close_completion) {
auto stream_dumper = std::make_shared<StreamDumper>();

// Create two threads, so there's no IO operation within critical section thus no
// blocking on write.
std::thread([read_func = std::move(read_func),
close_read_handle = std::move(close_read_handle),
std::thread([pipe_instream = std::move(pipe_instream),
stream_dumper = stream_dumper]() {
SetThreadName("PipeReaderThd");

const size_t buf_size = GetPipeLogReadSizeOrDefault();
// TODO(hjiang): Should resize without initialization.
std::string content(buf_size, '\0');
// Logging are written in lines, `last_line` records part of the strings left in
// last `read` syscall.
std::string last_line;

while (true) {
size_t bytes_read = read_func(content.data(), content.length());

// Bytes read of size 0 indicates write-side of pipe has been closed.
if (bytes_read == 0) {
{
absl::MutexLock lock(&stream_dumper->mu);
stream_dumper->stopped = true;
if (!last_line.empty()) {
stream_dumper->content.emplace_back(std::move(last_line));
}
}

// Place IO operation out of critical section.
close_read_handle();

return;
}

std::string_view cur_content{content.data(), bytes_read};
std::vector<std::string_view> newlines = absl::StrSplit(cur_content, '\n');

for (size_t idx = 0; idx < newlines.size() - 1; ++idx) {
std::string cur_new_line = std::move(last_line);
cur_new_line += newlines[idx];
last_line.clear();

// Backfill newliner for current segment.
cur_new_line += '\n';
{
absl::MutexLock lock(&stream_dumper->mu);
stream_dumper->content.emplace_back(std::move(cur_new_line));
}
std::string newline;
while (std::getline(*pipe_instream, newline)) {
// Backfill newliner for current segment.
if (!pipe_instream->eof()) {
newline += '\n';
}

// Special handle the last segment we've read.
//
// Nothing to do if we've read a complete newline.
if (content.back() == '\n') {
continue;
}
absl::MutexLock lock(&stream_dumper->mu);
stream_dumper->content.emplace_back(std::move(newline));
}

// Otherwise record the newline so we could reuse in the next read iteration.
last_line += newlines.back();
{
absl::MutexLock lock(&stream_dumper->mu);
stream_dumper->stopped = true;
}
}).detach();

Expand Down Expand Up @@ -216,57 +147,22 @@ bool ShouldUsePipeStream(const StreamRedirectionOption &stream_redirect_opt) {
stream_redirect_opt.tee_to_stderr;
}

#if defined(__APPLE__) || defined(__linux__)
RedirectionFileHandle OpenFileForRedirection(const std::string &file_path) {
int fd = open(file_path.data(), O_WRONLY | O_CREAT, 0644);
RAY_CHECK_NE(fd, -1) << "Fails to open file " << file_path << " with failure reason "
<< strerror(errno);

auto flush_fn = [fd]() {
RAY_CHECK_EQ(fsync(fd), 0) << "Fails to flush data to disk because "
<< strerror(errno);
};
auto close_fn = [fd]() {
RAY_CHECK_EQ(fsync(fd), 0) << "Fails to flush data to disk because "
<< strerror(errno);
RAY_CHECK_EQ(close(fd), 0) << "Fails to close redirection file because "
<< strerror(errno);
};

return RedirectionFileHandle{fd, std::move(flush_fn), std::move(close_fn)};
}
#elif defined(_WIN32)
#include <windows.h>
RedirectionFileHandle OpenFileForRedirection(const std::string &file_path) {
HANDLE file_handle = CreateFile(file_path.c_str(),
jjyao marked this conversation as resolved.
Show resolved Hide resolved
GENERIC_WRITE,
0, // No sharing
NULL, // Default security attributes
CREATE_ALWAYS, // Always create a new file
FILE_ATTRIBUTE_NORMAL, // Normal file attributes
NULL // No template file
);
RAY_CHECK(file_handle != INVALID_HANDLE_VALUE)
<< "Fails to open file " << file_path << " with error "
<< std::to_string(GetLastError());

auto flush_fn = [file_handle]() {
RAY_CHECK(FlushFileBuffers(file_handle))
<< "Failed to flush data to disk with error: " << std::to_string(GetLastError());
boost::iostreams::file_descriptor_sink sink{file_path, std::ios_base::out};
auto handle = sink.handle();
auto ostream =
std::make_shared<boost::iostreams::stream<boost::iostreams::file_descriptor_sink>>(
std::move(sink));
auto flush_fn = [ostream]() { ostream->flush(); };
dentiny marked this conversation as resolved.
Show resolved Hide resolved
auto close_fn = [ostream]() {
ostream->flush();
ostream->close();
};
auto close_fn = [file_handle]() {
RAY_CHECK(FlushFileBuffers(file_handle))
<< "Failed to flush data to disk with error: " << std::to_string(GetLastError());
RAY_CHECK(CloseHandle(file_handle))
<< "Failed to close file with error: " << std::to_string(GetLastError());
};
return RedirectionFileHandle{file_handle, std::move(flush_fn), std::move(close_fn)};
return RedirectionFileHandle{
handle, std::move(ostream), std::move(flush_fn), std::move(close_fn)};
}
#endif

} // namespace

#if defined(__APPLE__) || defined(__linux__)
RedirectionFileHandle CreateRedirectionFileHandle(
const StreamRedirectionOption &stream_redirect_opt) {
// Case-1: only redirection, but not rotation and tee involved.
Expand All @@ -284,30 +180,92 @@ RedirectionFileHandle CreateRedirectionFileHandle(
// Invoked after flush and close finished.
auto on_close_completion = [promise = promise]() { promise->set_value(); };

StdStreamFd std_stream_fd{};
StreamSink std_stream_sink{};

#if defined(__APPLE__) || defined(__linux__)
if (stream_redirect_opt.tee_to_stdout) {
std_stream_fd.stdout_fd = dup(STDOUT_FILENO);
RAY_CHECK_NE(std_stream_fd.stdout_fd, -1)
<< "Fails to duplicate stdout: " << strerror(errno);
int duped_stdout_fd = dup(STDOUT_FILENO);
RAY_CHECK_NE(duped_stdout_fd, -1) << "Fails to duplicate stdout: " << strerror(errno);

boost::iostreams::file_descriptor_sink sink{
duped_stdout_fd, /*file_descriptor_flags=*/boost::iostreams::close_handle};
std_stream_sink.stdout_sink = std::make_shared<
boost::iostreams::stream<boost::iostreams::file_descriptor_sink>>(
std::move(sink));
}
if (stream_redirect_opt.tee_to_stderr) {
std_stream_fd.stderr_fd = dup(STDERR_FILENO);
RAY_CHECK_NE(std_stream_fd.stderr_fd, -1)
<< "Fails to duplicate stderr: " << strerror(errno);
int duped_stderr_fd = dup(STDERR_FILENO);
RAY_CHECK_NE(duped_stderr_fd, -1) << "Fails to duplicate stderr: " << strerror(errno);

boost::iostreams::file_descriptor_sink sink{
duped_stderr_fd, /*file_descriptor_flags=*/boost::iostreams::close_handle};
std_stream_sink.stderr_sink = std::make_shared<
boost::iostreams::stream<boost::iostreams::file_descriptor_sink>>(
std::move(sink));
}

// TODO(hjiang): Use `boost::iostreams` to represent pipe write fd, which supports
// cross-platform and line-wise read.
int pipefd[2] = {0};
// TODO(hjiang): We shoud have our own syscall macro.
RAY_CHECK_EQ(pipe(pipefd), 0);
int read_fd = pipefd[0];
int write_fd = pipefd[1];
int read_handle = pipefd[0];
int write_handle = pipefd[1];
boost::iostreams::file_descriptor_source pipe_read_source{
read_handle, /*file_descriptor_flags=*/boost::iostreams::close_handle};
boost::iostreams::file_descriptor_sink pipe_write_sink{
write_handle, /*file_descriptor_flags=*/boost::iostreams::close_handle};

#elif defined(_WIN32)
if (stream_redirect_opt.tee_to_stdout) {
HANDLE duped_stderr_handle;
dentiny marked this conversation as resolved.
Show resolved Hide resolved
BOOL result = DuplicateHandle(GetCurrentProcess(),
GetStdHandle(STD_OUTPUT_HANDLE),
GetCurrentProcess(),
&duped_stderr_handle,
0,
FALSE,
DUPLICATE_SAME_ACCESS);
RAY_CHECK(result) << "Fails to duplicate stdout handle";

boost::iostreams::file_descriptor_sink sink{duped_stderr_handle, std::ios_base::out};
std_stream_sink.stdout_sink = std::make_shared<
boost::iostreams::stream<boost::iostreams::file_descriptor_sink>>(
std::move(sink));
}
if (stream_redirect_opt.tee_to_stderr) {
HANDLE duped_stderr_handle;
BOOL result = DuplicateHandle(GetCurrentProcess(),
GetStdHandle(STD_ERROR_HANDLE),
GetCurrentProcess(),
&duped_stderr_handle,
0,
FALSE,
DUPLICATE_SAME_ACCESS);
RAY_CHECK(result) << "Fails to duplicate stderr handle";

boost::iostreams::file_descriptor_sink sink{duped_stderr_handle, std::ios_base::out};
std_stream_sink.stderr_sink = std::make_shared<
boost::iostreams::stream<boost::iostreams::file_descriptor_sink>>(
std::move(sink));
}

auto read_func = [read_fd](char *data, size_t len) { return Read(read_fd, data, len); };
auto close_read_handle = [read_fd]() { RAY_CHECK_EQ(close(read_fd), 0); };
auto close_fn = [write_fd, promise]() {
RAY_CHECK_EQ(close(write_fd), 0);
HANDLE read_pipe = nullptr;
HANDLE write_pipe = nullptr;
SECURITY_ATTRIBUTES sa = {sizeof(SECURITY_ATTRIBUTES), nullptr, TRUE};
RAY_CHECK(CreatePipe(&read_pipe, &write_pipe, &sa, 0)) << "Fails to create pipe";
boost::iostreams::file_descriptor_source pipe_read_source{read_pipe, std::ios_base::in};
boost::iostreams::file_descriptor_sink pipe_write_sink{write_pipe, std::ios_base::out};

#endif

auto pipe_instream = std::make_shared<
boost::iostreams::stream<boost::iostreams::file_descriptor_source>>(
std::move(pipe_read_source));
auto pipe_ostream =
std::make_shared<boost::iostreams::stream<boost::iostreams::file_descriptor_sink>>(
std::move(pipe_write_sink));

auto close_fn = [pipe_ostream, promise]() mutable {
pipe_ostream->flush();
pipe_ostream->close();
// Block until destruction finishes.
promise->get_future().get();
};
Expand All @@ -318,14 +276,12 @@ RedirectionFileHandle CreateRedirectionFileHandle(
// newliner, if any.
auto write_fn = [logger,
stream_redirect_opt = stream_redirect_opt,
std_stream_fd = std_stream_fd](std::string content) {
std_stream_sink = std_stream_sink](std::string content) {
if (stream_redirect_opt.tee_to_stdout) {
RAY_CHECK_EQ(write(std_stream_fd.stdout_fd, content.data(), content.length()),
static_cast<ssize_t>(content.length()));
std_stream_sink.stdout_sink->write(content.data(), content.length());
}
if (stream_redirect_opt.tee_to_stderr) {
RAY_CHECK_EQ(write(std_stream_fd.stderr_fd, content.data(), content.length()),
static_cast<ssize_t>(content.length()));
std_stream_sink.stderr_sink->write(content.data(), content.length());
}
if (logger != nullptr) {
// spdlog adds newliner for every content, no need to maintan the application-passed
Expand All @@ -338,35 +294,27 @@ RedirectionFileHandle CreateRedirectionFileHandle(
};
auto flush_fn = [logger,
stream_redirect_opt = stream_redirect_opt,
std_stream_fd = std_stream_fd]() {
std_stream_sink = std_stream_sink]() {
jjyao marked this conversation as resolved.
Show resolved Hide resolved
if (logger != nullptr) {
logger->flush();
}
if (stream_redirect_opt.tee_to_stdout) {
fsync(std_stream_fd.stdout_fd);
std_stream_sink.stdout_sink->flush();
}
if (stream_redirect_opt.tee_to_stderr) {
std_stream_sink.stderr_sink->flush();
}
// No need to sync for stderr since it's unbuffered.
};

StartStreamDump(std::move(read_func),
StartStreamDump(std::move(pipe_instream),
std::move(write_fn),
flush_fn,
std::move(close_read_handle),
std::move(on_close_completion));

RedirectionFileHandle redirection_file_handle{
write_fd, std::move(flush_fn), std::move(close_fn)};
write_handle, std::move(pipe_ostream), std::move(flush_fn), std::move(close_fn)};

return redirection_file_handle;
}

#elif defined(_WIN32)
RedirectionFileHandle CreateRedirectionFileHandle(
const StreamRedirectionOption &stream_redirect_opt) {
// TODO(hjiang): For windows, we currently doesn't support redirection with rotation and
// tee to stdout/stderr.
return OpenFileForRedirection(stream_redirect_opt.file_path);
}
#endif

} // namespace ray
Loading
Loading