Skip to content

Commit

Permalink
cannot use spdlog because it has newliner
Browse files Browse the repository at this point in the history
Signed-off-by: dentiny <[email protected]>
  • Loading branch information
dentiny committed Feb 1, 2025
1 parent c6c7957 commit d6699e1
Show file tree
Hide file tree
Showing 5 changed files with 133 additions and 86 deletions.
1 change: 1 addition & 0 deletions src/ray/util/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -269,6 +269,7 @@ ray_cc_library(
srcs = ["pipe_logger.cc"],
deps = [
":compat",
":spdlog_fd_sink",
":stream_redirection_options",
":thread_utils",
":util",
Expand Down
145 changes: 74 additions & 71 deletions src/ray/util/pipe_logger.cc
Original file line number Diff line number Diff line change
Expand Up @@ -24,11 +24,11 @@
#include <thread>

#include "absl/strings/str_split.h"
#include "ray/util/spdlog_fd_sink.h"
#include "ray/util/thread_utils.h"
#include "spdlog/sinks/basic_file_sink.h"
#include "spdlog/sinks/rotating_file_sink.h"
#include "spdlog/sinks/stdout_color_sinks.h"
#include "ray/util/spdlog_fd_sink.h"

namespace ray {

Expand All @@ -43,7 +43,6 @@ struct StreamDumper {
// Start two threads:
// 1. A reader thread which continuously reads from [pipe_stream] until close;
// 2. A dumper thread which writes content to sink via [write_func].
template <typename WriteFunc, typename FlushFunc>
void StartStreamDump(
std::shared_ptr<boost::iostreams::stream<boost::iostreams::file_descriptor_source>>
pipe_instream,
Expand All @@ -61,11 +60,6 @@ void StartStreamDump(

// Exit at pipe read EOF.
while (std::getline(*pipe_instream, newline)) {
// Backfill newliner for current segment.
if (!pipe_instream->eof()) {
newline += '\n';
}

absl::MutexLock lock(&stream_dumper->mu);
stream_dumper->content.emplace_back(std::move(newline));
}
Expand Down Expand Up @@ -105,14 +99,17 @@ void StartStreamDump(
}

// Perform IO operation out of critical section.
write_func(std::move(curline));
logger->log(spdlog::level::info, std::move(curline));
}
}).detach();
}

// Create a spdlog logger with all sinks specified by the given option.
std::shared_ptr<spdlog::logger> CreateLogger(
const StreamRedirectionOption &stream_redirect_opt) {
std::vector<spdlog::sink_ptr> sinks;

// Setup file sink.
spdlog::sink_ptr file_sink = nullptr;
if (stream_redirect_opt.rotation_max_size != std::numeric_limits<size_t>::max()) {
file_sink = std::make_shared<spdlog::sinks::rotating_file_sink_mt>(
Expand All @@ -124,9 +121,56 @@ std::shared_ptr<spdlog::logger> CreateLogger(
stream_redirect_opt.file_path);
}
file_sink->set_level(spdlog::level::info);
sinks.emplace_back(std::move(file_sink));

// Setup fd sink for stdout and stderr.
#if defined(__APPLE__) || defined(__linux__)
if (stream_redirect_opt.tee_to_stdout) {
int duped_stdout_fd = dup(STDOUT_FILENO);
RAY_CHECK_NE(duped_stdout_fd, -1) << "Fails to duplicate stdout: " << strerror(errno);
auto stdout_sink = std::make_shared<non_owned_fd_sink_st>(duped_stdout_fd);
sinks.emplace_back(std::move(stdout_sink));
}
if (stream_redirect_opt.tee_to_stderr) {
int duped_stderr_fd = dup(STDERR_FILENO);
RAY_CHECK_NE(duped_stderr_fd, -1) << "Fails to duplicate stderr: " << strerror(errno);
auto stderr_sink = std::make_shared<non_owned_fd_sink_st>(duped_stderr_fd);
sinks.emplace_back(std::move(stderr_sink));
}

#elif defined(_WIN32)
if (stream_redirect_opt.tee_to_stdout) {
HANDLE duped_stdout_handle;
BOOL result = DuplicateHandle(GetCurrentProcess(),
GetStdHandle(STD_OUTPUT_HANDLE),
GetCurrentProcess(),
&duped_stdout_handle,
0,
FALSE,
DUPLICATE_SAME_ACCESS);
RAY_CHECK(result) << "Fails to duplicate stdout handle";
auto stderr_sink = std::make_shared<non_owned_fd_sink>(duped_stderr_fd);
sinks.emplace_back(std::move(stderr_sink));
}
if (stream_redirect_opt.tee_to_stderr) {
HANDLE duped_stderr_handle;
BOOL result = DuplicateHandle(GetCurrentProcess(),
GetStdHandle(STD_ERROR_HANDLE),
GetCurrentProcess(),
&duped_stderr_handle,
0,
FALSE,
DUPLICATE_SAME_ACCESS);
RAY_CHECK(result) << "Fails to duplicate stderr handle";
auto stderr_sink = std::make_shared<non_owned_fd_sink>(duped_stderr_fd);
sinks.emplace_back(std::move(stderr_sink));
}
#endif

auto logger = std::make_shared<spdlog::logger>(
/*name=*/absl::StrFormat("pipe-logger-%s", stream_redirect_opt.file_path),
std::move(file_sink));
std::make_move_iterator(sinks.begin()),
std::make_move_iterator(sinks.end()));
logger->set_level(spdlog::level::info);
logger->set_pattern("%v"); // Only message string is logged.
return logger;
Expand All @@ -143,12 +187,22 @@ bool ShouldUsePipeStream(const StreamRedirectionOption &stream_redirect_opt) {
}

RedirectionFileHandle OpenFileForRedirection(const std::string &file_path) {
boost::iostreams::file_descriptor_sink sink{file_path, std::ios_base::out};
auto handle = sink.handle();

boost::iostreams::file_descriptor_sink fd_sink{file_path, std::ios_base::out};
auto handle = fd_sink.handle();
auto ostream =
std::make_shared<boost::iostreams::stream<boost::iostreams::file_descriptor_sink>>(
std::move(fd_sink));

auto logger_sink = std::make_shared<non_owned_fd_sink_st>(handle);
auto logger = std::make_shared<spdlog::logger>(
/*name=*/absl::StrFormat("pipe-logger-%s", file_path), std::move(logger_sink));
logger->set_level(spdlog::level::info);
logger->set_pattern("%v"); // Only message string is logged.

return RedirectionFileHandle{
handle, std::move(ostream), std::move(flush_fn), std::move(close_fn)};
// Lifecycle for the file handle is bound at [ostream] thus [close_fn].
auto close_fn = [ostream = std::move(ostream)]() { ostream->close(); };

return RedirectionFileHandle{handle, std::move(logger), std::move(close_fn)};
}
} // namespace

Expand All @@ -169,71 +223,22 @@ RedirectionFileHandle CreateRedirectionFileHandle(
// Invoked after flush and close finished.
auto on_close_completion = [promise = promise]() { promise->set_value(); };

std::vector<spdlog::sink_ptr> sinks;

#if defined(__APPLE__) || defined(__linux__)
if (stream_redirect_opt.tee_to_stdout) {
int duped_stdout_fd = dup(STDOUT_FILENO);
RAY_CHECK_NE(duped_stdout_fd, -1) << "Fails to duplicate stdout: " << strerror(errno);
auto stdout_sink = std::make_shared<non_owned_fd_sink>(duped_stdout_fd);
sinks.emplace_back(std::move(stdout_sink));
}
if (stream_redirect_opt.tee_to_stderr) {
int duped_stderr_fd = dup(STDERR_FILENO);
RAY_CHECK_NE(duped_stderr_fd, -1) << "Fails to duplicate stderr: " << strerror(errno);
auto stderr_sink = std::make_shared<non_owned_fd_sink>(duped_stderr_fd);
sinks.emplace_back(std::move(stderr_sink));
}

int pipefd[2] = {0};
RAY_CHECK_EQ(pipe(pipefd), 0);
int read_handle = pipefd[0];
int write_handle = pipefd[1];
boost::iostreams::file_descriptor_source pipe_read_source{
read_handle, /*file_descriptor_flags=*/boost::iostreams::close_handle};
boost::iostreams::file_descriptor_sink pipe_write_sink{
write_handle, /*file_descriptor_flags=*/boost::iostreams::close_handle};

#elif defined(_WIN32)
if (stream_redirect_opt.tee_to_stdout) {
HANDLE duped_stdout_handle;
BOOL result = DuplicateHandle(GetCurrentProcess(),
GetStdHandle(STD_OUTPUT_HANDLE),
GetCurrentProcess(),
&duped_stdout_handle,
0,
FALSE,
DUPLICATE_SAME_ACCESS);
RAY_CHECK(result) << "Fails to duplicate stdout handle";
auto stderr_sink = std::make_shared<non_owned_fd_sink>(duped_stderr_fd);
sinks.emplace_back(std::move(stderr_sink));
}
if (stream_redirect_opt.tee_to_stderr) {
HANDLE duped_stderr_handle;
BOOL result = DuplicateHandle(GetCurrentProcess(),
GetStdHandle(STD_ERROR_HANDLE),
GetCurrentProcess(),
&duped_stderr_handle,
0,
FALSE,
DUPLICATE_SAME_ACCESS);
RAY_CHECK(result) << "Fails to duplicate stderr handle";
auto stderr_sink = std::make_shared<non_owned_fd_sink>(duped_stderr_fd);
sinks.emplace_back(std::move(stderr_sink));
}

HANDLE read_handle = nullptr;
HANDLE write_handle = nullptr;
SECURITY_ATTRIBUTES sa = {sizeof(SECURITY_ATTRIBUTES), nullptr, TRUE};
RAY_CHECK(CreatePipe(&read_handle, &write_handle, &sa, 0)) << "Fails to create pipe";
#endif

boost::iostreams::file_descriptor_source pipe_read_source{
read_handle,
/*file_descriptor_flags=*/boost::iostreams::close_handle};
read_handle, /*file_descriptor_flags=*/boost::iostreams::close_handle};
boost::iostreams::file_descriptor_sink pipe_write_sink{
write_handle,
/*file_descriptor_flags=*/boost::iostreams::close_handle};

#endif
write_handle, /*file_descriptor_flags=*/boost::iostreams::close_handle};

auto pipe_instream = std::make_shared<
boost::iostreams::stream<boost::iostreams::file_descriptor_source>>(
Expand All @@ -249,12 +254,10 @@ RedirectionFileHandle CreateRedirectionFileHandle(
};

auto logger = CreateLogger(stream_redirect_opt);
StartStreamDump(std::move(pipe_instream),
std::move(logger),
std::move(on_close_completion));
StartStreamDump(std::move(pipe_instream), logger, std::move(on_close_completion));

RedirectionFileHandle redirection_file_handle{
write_handle, std::move(pipe_ostream), std::move(flush_fn), std::move(close_fn)};
write_handle, logger, std::move(close_fn)};

return redirection_file_handle;
}
Expand Down
23 changes: 15 additions & 8 deletions src/ray/util/pipe_logger.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,9 @@ class RedirectionFileHandle {

// @param termination_synchronizer is used to block wait until destruction operation
// finishes.
RedirectionFileHandle(
MEMFD_TYPE_NON_UNIQUE write_handle,
std::shared_ptr<spdlog::logger> logger,
std::function<void()> close_fn)
RedirectionFileHandle(MEMFD_TYPE_NON_UNIQUE write_handle,
std::shared_ptr<spdlog::logger> logger,
std::function<void()> close_fn)
: write_handle_(write_handle),
logger_(std::move(logger)),
close_fn_(std::move(close_fn)) {}
Expand Down Expand Up @@ -96,14 +95,22 @@ class RedirectionFileHandle {
// TODO(hjiang): Current method only flushes whatever we send to logger, but not those
// in the pipe; a better approach is flush pipe, send FLUSH indicator and block wait
// until logger sync over.
void Flush() {
logger_->flush();
}
void Flush() { logger_->flush(); }

MEMFD_TYPE_NON_UNIQUE GetWriteHandle() const { return write_handle_; }

// Write the given data into redirection handle; currently only for testing usage.
void CompleteWrite(const char *data, size_t len) { pipe_ostream_->write(data, len); }
//
// TODO(hjiang): Use platform compatible API, see
// https://github.com/ray-project/ray/pull/50170
void CompleteWrite(const char *data, size_t len) {
#if defined(__APPLE__) || defined(__linux__)
(void)write(write_handle_, data, len);
#elif defined(_WIN32)
DWORD bytes_written;
(void)WriteFile(fd, data, (DWORD)len, &bytes_written, NULL);
#endif
}

private:
MEMFD_TYPE_NON_UNIQUE write_handle_;
Expand Down
36 changes: 36 additions & 0 deletions src/ray/util/spdlog_payload_formatter.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
// Copyright 2025 The Ray Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <spdlog/sinks/base_sink.h>

namespace ray {

// Default formatter for spdlog appends newliner to content, [spdlog_noop_formatter] appends payload with no modification.
class spdlog_noop_formatter : public spdlog::formatter {
public:
spdlog_noop_formatter() = default;
~spdlog_noop_formatter() override = default;

void format(const spdlog::details::log_msg &msg, spdlog::memory_buf_t &dest) override {
spdlog::details::fmt_helper::append_string_view(msg.payload, dest);
}

std::unique_ptr<formatter> clone() const override {
return std::make_unique<spdlog_noop_formatter>();
}
};

} // namespace ray
14 changes: 7 additions & 7 deletions src/ray/util/tests/pipe_logger_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ TEST(PipeLoggerCompatTest, CompatibilityTest) {
stream_redirection_handle.Close();

const std::string stdout_content = testing::internal::GetCapturedStdout();
EXPECT_EQ(stdout_content, kContent);
EXPECT_EQ(stdout_content, "hello\n");

// Pipe logger automatically adds a newliner at the end.
const auto actual_content = ReadEntireFile(test_file_path);
Expand All @@ -175,7 +175,7 @@ TEST(PipeLoggerCompatTest, CompatibilityTest) {
stream_redirection_handle.Close();

const std::string stdout_content = testing::internal::GetCapturedStdout();
EXPECT_EQ(stdout_content, kContent);
EXPECT_EQ(stdout_content, "hello\n");

const auto actual_content = ReadEntireFile(test_file_path);
RAY_ASSERT_OK(actual_content);
Expand All @@ -199,7 +199,7 @@ TEST(PipeLoggerCompatTest, CompatibilityTest) {
stream_redirection_handle.Close();

const std::string stdout_content = testing::internal::GetCapturedStdout();
EXPECT_EQ(stdout_content, kContent);
EXPECT_EQ(stdout_content, "hello\nworld\n");

// Pipe logger automatically adds a newliner at the end.
const auto actual_content = ReadEntireFile(test_file_path);
Expand All @@ -224,7 +224,7 @@ TEST(PipeLoggerCompatTest, CompatibilityTest) {
stream_redirection_handle.Close();

const std::string stdout_content = testing::internal::GetCapturedStdout();
EXPECT_EQ(stdout_content, kContent);
EXPECT_EQ(stdout_content, "hello\nworld\n");

const auto actual_content = ReadEntireFile(test_file_path);
RAY_EXPECT_OK(actual_content);
Expand All @@ -248,7 +248,7 @@ TEST(PipeLoggerCompatTest, CompatibilityTest) {
stream_redirection_handle.Close();

const std::string stdout_content = testing::internal::GetCapturedStdout();
EXPECT_EQ(stdout_content, kContent);
EXPECT_EQ(stdout_content, "helloworld\n\n\n");

const auto actual_content = ReadEntireFile(test_file_path);
RAY_EXPECT_OK(actual_content);
Expand All @@ -272,7 +272,7 @@ TEST(PipeLoggerCompatTest, CompatibilityTest) {
stream_redirection_handle.Close();

const std::string stdout_content = testing::internal::GetCapturedStdout();
EXPECT_EQ(stdout_content, kContent);
EXPECT_EQ(stdout_content, "hello\n\n\nworld\n");

// Pipe logger automatically adds a newliner at the end.
const auto actual_content = ReadEntireFile(test_file_path);
Expand All @@ -297,7 +297,7 @@ TEST(PipeLoggerCompatTest, CompatibilityTest) {
stream_redirection_handle.Close();

const std::string stdout_content = testing::internal::GetCapturedStdout();
EXPECT_EQ(stdout_content, kContent);
EXPECT_EQ(stdout_content, "hello\n\nworld\n\n");

// Pipe logger automatically adds a newliner at the end.
const auto actual_content = ReadEntireFile(test_file_path);
Expand Down

0 comments on commit d6699e1

Please sign in to comment.