Skip to content

Commit

Permalink
[core] Use spdlog fd sink within pipe logger (#50173)
Browse files Browse the repository at this point in the history
Signed-off-by: dentiny <[email protected]>
  • Loading branch information
dentiny authored Feb 3, 2025
1 parent e272a53 commit dbf00a7
Show file tree
Hide file tree
Showing 4 changed files with 113 additions and 188 deletions.
1 change: 1 addition & 0 deletions src/ray/util/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -255,6 +255,7 @@ ray_cc_library(
srcs = ["pipe_logger.cc"],
deps = [
":compat",
":spdlog_fd_sink",
":stream_redirection_options",
":thread_utils",
":util",
Expand Down
231 changes: 81 additions & 150 deletions src/ray/util/pipe_logger.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include <thread>

#include "absl/strings/str_split.h"
#include "ray/util/spdlog_fd_sink.h"
#include "ray/util/thread_utils.h"
#include "spdlog/sinks/basic_file_sink.h"
#include "spdlog/sinks/rotating_file_sink.h"
Expand All @@ -39,24 +40,13 @@ struct StreamDumper {
std::deque<std::string> content ABSL_GUARDED_BY(mu);
};

// Used to write to dup-ed stdout and stderr; use shared pointer to make it copy
// constructible.
struct StdOstream {
std::shared_ptr<boost::iostreams::stream<boost::iostreams::file_descriptor_sink>>
stdout_ostream;
std::shared_ptr<boost::iostreams::stream<boost::iostreams::file_descriptor_sink>>
stderr_ostream;
};

// Start two threads:
// 1. A reader thread which continuously reads from [pipe_instream] until close;
// 2. A dumper thread which writes content to sink via [write_func].
template <typename WriteFunc, typename FlushFunc>
void StartStreamDump(
std::shared_ptr<boost::iostreams::stream<boost::iostreams::file_descriptor_source>>
pipe_instream,
WriteFunc write_func,
FlushFunc flush_func,
std::shared_ptr<spdlog::logger> logger,
std::function<void()> on_close_completion) {
auto stream_dumper = std::make_shared<StreamDumper>();

Expand Down Expand Up @@ -87,8 +77,7 @@ void StartStreamDump(
}).detach();

std::thread([stream_dumper = stream_dumper,
write_func = std::move(write_func),
flush_func = std::move(flush_func),
logger = std::move(logger),
on_close_completion = std::move(on_close_completion)]() {
SetThreadName("PipeDumpThd");

Expand All @@ -108,22 +97,25 @@ void StartStreamDump(
curline = std::move(stream_dumper->content.front());
stream_dumper->content.pop_front();
} else if (stream_dumper->stopped) {
flush_func();
logger->flush();
on_close_completion();
return;
}
}

// Perform IO operation out of critical section.
write_func(std::move(curline));
logger->log(spdlog::level::info, std::move(curline));
}
}).detach();
}

// Create a spdlog logger with all sinks specified by the given option.
std::shared_ptr<spdlog::logger> CreateLogger(
const StreamRedirectionOption &stream_redirect_opt) {
std::vector<spdlog::sink_ptr> logging_sinks;
// TODO(hjiang): Could optimize to reduce heap allocation.
std::vector<spdlog::sink_ptr> sinks;

// Setup file sink.
spdlog::sink_ptr file_sink = nullptr;
if (stream_redirect_opt.rotation_max_size != std::numeric_limits<size_t>::max()) {
file_sink = std::make_shared<spdlog::sinks::rotating_file_sink_mt>(
Expand All @@ -135,11 +127,61 @@ std::shared_ptr<spdlog::logger> CreateLogger(
stream_redirect_opt.file_path);
}
file_sink->set_level(spdlog::level::info);
sinks.emplace_back(std::move(file_sink));

// Setup fd sink for stdout and stderr.
#if defined(__APPLE__) || defined(__linux__)
if (stream_redirect_opt.tee_to_stdout) {
int duped_stdout_fd = dup(STDOUT_FILENO);
RAY_CHECK_NE(duped_stdout_fd, -1) << "Fails to duplicate stdout: " << strerror(errno);
auto stdout_sink = std::make_shared<non_owned_fd_sink_st>(duped_stdout_fd);
sinks.emplace_back(std::move(stdout_sink));
}
if (stream_redirect_opt.tee_to_stderr) {
int duped_stderr_fd = dup(STDERR_FILENO);
RAY_CHECK_NE(duped_stderr_fd, -1) << "Fails to duplicate stderr: " << strerror(errno);
auto stderr_sink = std::make_shared<non_owned_fd_sink_st>(duped_stderr_fd);
sinks.emplace_back(std::move(stderr_sink));
}

#elif defined(_WIN32)
if (stream_redirect_opt.tee_to_stdout) {
HANDLE duped_stdout_handle;
BOOL result = DuplicateHandle(GetCurrentProcess(),
GetStdHandle(STD_OUTPUT_HANDLE),
GetCurrentProcess(),
&duped_stdout_handle,
0,
FALSE,
DUPLICATE_SAME_ACCESS);
RAY_CHECK(result) << "Fails to duplicate stdout handle";
auto stdout_sink = std::make_shared<non_owned_fd_sink_st>(duped_stdout_handle);
sinks.emplace_back(std::move(stdout_sink));
}
if (stream_redirect_opt.tee_to_stderr) {
HANDLE duped_stderr_handle;
BOOL result = DuplicateHandle(GetCurrentProcess(),
GetStdHandle(STD_ERROR_HANDLE),
GetCurrentProcess(),
&duped_stderr_handle,
0,
FALSE,
DUPLICATE_SAME_ACCESS);
RAY_CHECK(result) << "Fails to duplicate stderr handle";
auto stderr_sink = std::make_shared<non_owned_fd_sink_st>(duped_stderr_handle);
sinks.emplace_back(std::move(stderr_sink));
}
#endif

auto logger = std::make_shared<spdlog::logger>(
/*name=*/absl::StrFormat("pipe-logger-%s", stream_redirect_opt.file_path),
std::move(file_sink));
std::make_move_iterator(sinks.begin()),
std::make_move_iterator(sinks.end()));
logger->set_level(spdlog::level::info);
logger->set_pattern("%v"); // Only message string is logged.
// Only the message itself is logged; no extra trailing newline is appended.
auto formatter = std::make_unique<spdlog::pattern_formatter>(
"%v", spdlog::pattern_time_type::local, std::string(""));
logger->set_formatter(std::move(formatter));
return logger;
}

Expand All @@ -154,27 +196,22 @@ bool ShouldUsePipeStream(const StreamRedirectionOption &stream_redirect_opt) {
}

RedirectionFileHandle OpenFileForRedirection(const std::string &file_path) {
boost::iostreams::file_descriptor_sink sink{file_path, std::ios_base::out};
auto handle = sink.handle();
boost::iostreams::file_descriptor_sink fd_sink{file_path, std::ios_base::out};
auto handle = fd_sink.handle();
auto ostream =
std::make_shared<boost::iostreams::stream<boost::iostreams::file_descriptor_sink>>(
std::move(sink));
auto flush_fn = [ostream, handle]() {
// Flush stream internal buffer to fd.
ostream->flush();
// Flush file handle.
#if defined(__APPLE__) || defined(__linux__)
RAY_CHECK_EQ(fdatasync(handle), 0);
#elif defined(_WIN32)
RAY_CHECK(FlushFileBuffers(handle));
#endif
};
auto close_fn = [flush_fn, ostream]() {
flush_fn();
ostream->close();
};
return RedirectionFileHandle{
handle, std::move(ostream), std::move(flush_fn), std::move(close_fn)};
std::move(fd_sink));

// In this case, we don't write to the file via logger, so no need to set formatter.
// spdlog is used here merely to reuse the same [RedirectionFileHandle] interface.
auto logger_sink = std::make_shared<non_owned_fd_sink_st>(handle);
auto logger = std::make_shared<spdlog::logger>(
/*name=*/absl::StrFormat("pipe-logger-%s", file_path), std::move(logger_sink));

// The file handle's lifecycle is bound to [ostream], and thus to [close_fn].
auto close_fn = [ostream = std::move(ostream)]() { ostream->close(); };

return RedirectionFileHandle{handle, std::move(logger), std::move(close_fn)};
}
} // namespace

Expand All @@ -195,87 +232,22 @@ RedirectionFileHandle CreateRedirectionFileHandle(
// Invoked after flush and close finished.
auto on_close_completion = [promise = promise]() { promise->set_value(); };

StdOstream std_ostream{};

#if defined(__APPLE__) || defined(__linux__)
if (stream_redirect_opt.tee_to_stdout) {
int duped_stdout_fd = dup(STDOUT_FILENO);
RAY_CHECK_NE(duped_stdout_fd, -1) << "Fails to duplicate stdout: " << strerror(errno);

boost::iostreams::file_descriptor_sink sink{
duped_stdout_fd, /*file_descriptor_flags=*/boost::iostreams::close_handle};
std_ostream.stdout_ostream = std::make_shared<
boost::iostreams::stream<boost::iostreams::file_descriptor_sink>>(
std::move(sink));
}
if (stream_redirect_opt.tee_to_stderr) {
int duped_stderr_fd = dup(STDERR_FILENO);
RAY_CHECK_NE(duped_stderr_fd, -1) << "Fails to duplicate stderr: " << strerror(errno);

boost::iostreams::file_descriptor_sink sink{
duped_stderr_fd, /*file_descriptor_flags=*/boost::iostreams::close_handle};
std_ostream.stderr_ostream = std::make_shared<
boost::iostreams::stream<boost::iostreams::file_descriptor_sink>>(
std::move(sink));
}

int pipefd[2] = {0};
RAY_CHECK_EQ(pipe(pipefd), 0);
int read_handle = pipefd[0];
int write_handle = pipefd[1];
boost::iostreams::file_descriptor_source pipe_read_source{
read_handle, /*file_descriptor_flags=*/boost::iostreams::close_handle};
boost::iostreams::file_descriptor_sink pipe_write_sink{
write_handle, /*file_descriptor_flags=*/boost::iostreams::close_handle};

#elif defined(_WIN32)
if (stream_redirect_opt.tee_to_stdout) {
HANDLE duped_stdout_handle;
BOOL result = DuplicateHandle(GetCurrentProcess(),
GetStdHandle(STD_OUTPUT_HANDLE),
GetCurrentProcess(),
&duped_stdout_handle,
0,
FALSE,
DUPLICATE_SAME_ACCESS);
RAY_CHECK(result) << "Fails to duplicate stdout handle";

boost::iostreams::file_descriptor_sink sink{
duped_stdout_handle, /*file_descriptor_flags=*/boost::iostreams::close_handle};
std_ostream.stdout_ostream = std::make_shared<
boost::iostreams::stream<boost::iostreams::file_descriptor_sink>>(
std::move(sink));
}
if (stream_redirect_opt.tee_to_stderr) {
HANDLE duped_stderr_handle;
BOOL result = DuplicateHandle(GetCurrentProcess(),
GetStdHandle(STD_ERROR_HANDLE),
GetCurrentProcess(),
&duped_stderr_handle,
0,
FALSE,
DUPLICATE_SAME_ACCESS);
RAY_CHECK(result) << "Fails to duplicate stderr handle";

boost::iostreams::file_descriptor_sink sink{
duped_stderr_handle, /*file_descriptor_flags=*/boost::iostreams::close_handle};
std_ostream.stderr_ostream = std::make_shared<
boost::iostreams::stream<boost::iostreams::file_descriptor_sink>>(
std::move(sink));
}

HANDLE read_handle = nullptr;
HANDLE write_handle = nullptr;
SECURITY_ATTRIBUTES sa = {sizeof(SECURITY_ATTRIBUTES), nullptr, TRUE};
RAY_CHECK(CreatePipe(&read_handle, &write_handle, &sa, 0)) << "Fails to create pipe";
#endif

boost::iostreams::file_descriptor_source pipe_read_source{
read_handle,
/*file_descriptor_flags=*/boost::iostreams::close_handle};
read_handle, /*file_descriptor_flags=*/boost::iostreams::close_handle};
boost::iostreams::file_descriptor_sink pipe_write_sink{
write_handle,
/*file_descriptor_flags=*/boost::iostreams::close_handle};

#endif
write_handle, /*file_descriptor_flags=*/boost::iostreams::close_handle};

auto pipe_instream = std::make_shared<
boost::iostreams::stream<boost::iostreams::file_descriptor_source>>(
Expand All @@ -291,51 +263,10 @@ RedirectionFileHandle CreateRedirectionFileHandle(
};

auto logger = CreateLogger(stream_redirect_opt);

// [content] is exactly what the application writes to the pipe, including the
// trailing newline, if any.
auto write_fn = [logger,
stream_redirect_opt = stream_redirect_opt,
std_ostream = std_ostream](std::string content) {
if (stream_redirect_opt.tee_to_stdout) {
std_ostream.stdout_ostream->write(content.data(), content.length());
RAY_CHECK(std_ostream.stdout_ostream->good());
}
if (stream_redirect_opt.tee_to_stderr) {
std_ostream.stderr_ostream->write(content.data(), content.length());
RAY_CHECK(std_ostream.stderr_ostream->good());
}
if (logger != nullptr) {
// spdlog appends a newline to every message, so there is no need to keep the
// application-passed one.
if (!content.empty() && content.back() == '\n') {
content.pop_back();
}
logger->log(spdlog::level::info, content);
}
};
auto flush_fn =
[logger, stream_redirect_opt = stream_redirect_opt, std_ostream = std_ostream]() {
if (logger != nullptr) {
logger->flush();
}
if (stream_redirect_opt.tee_to_stdout) {
std_ostream.stdout_ostream->flush();
RAY_CHECK(std_ostream.stdout_ostream->good());
}
if (stream_redirect_opt.tee_to_stderr) {
std_ostream.stderr_ostream->flush();
RAY_CHECK(std_ostream.stderr_ostream->good());
}
};

StartStreamDump(std::move(pipe_instream),
std::move(write_fn),
flush_fn,
std::move(on_close_completion));
StartStreamDump(std::move(pipe_instream), logger, std::move(on_close_completion));

RedirectionFileHandle redirection_file_handle{
write_handle, std::move(pipe_ostream), std::move(flush_fn), std::move(close_fn)};
write_handle, logger, std::move(close_fn)};

return redirection_file_handle;
}
Expand Down
Loading

0 comments on commit dbf00a7

Please sign in to comment.