Skip to content

Commit

Permalink
Implement systemd-style readiness notification via NOTIFY_SOCKET
Browse files Browse the repository at this point in the history
The service may specify this with ready-notification=socket:path.
This will result in dinit creating an abstract datagram socket
and perform reads on it. As soon as the READY=1 datagram arrives,
the readiness notification is signaled.
  • Loading branch information
q66 committed Oct 22, 2024
1 parent e7ad5b1 commit 84ca292
Show file tree
Hide file tree
Showing 7 changed files with 169 additions and 51 deletions.
113 changes: 73 additions & 40 deletions src/baseproc-service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@ void base_process_service::do_smooth_recovery() noexcept

bool base_process_service::bring_up() noexcept
{
if (!open_socket()) {
if (!open_socket() || !open_ready_socket()) {
becoming_inactive();
return false;
}

Expand Down Expand Up @@ -175,7 +176,7 @@ bool base_process_service::start_ps_process(const std::vector<const char *> &cmd
}
}

if (have_notify) {
if (have_notify && ready_socket_fd < 0) {
// Create a notification pipe:
if (bp_sys::pipe2(notify_pipe, 0) != 0) {
log(loglevel_t::ERROR, get_name(), ": can't create notification pipe: ", strerror(errno));
Expand All @@ -185,10 +186,12 @@ bool base_process_service::start_ps_process(const std::vector<const char *> &cmd
// Set the read side as close-on-exec:
int fdflags = bp_sys::fcntl(notify_pipe[0], F_GETFD);
bp_sys::fcntl(notify_pipe[0], F_SETFD, fdflags | FD_CLOEXEC);
}

if (have_notify) {
// add, but don't yet enable, readiness watcher:
try {
rwatcher->add_watch(event_loop, notify_pipe[0], dasynq::IN_EVENTS, false);
rwatcher->add_watch(event_loop, ready_socket_fd >= 0 ? ready_socket_fd : notify_pipe[0], dasynq::IN_EVENTS, false);
ready_watcher_registered = true;
}
catch (std::exception &exc) {
Expand Down Expand Up @@ -251,7 +254,7 @@ bool base_process_service::start_ps_process(const std::vector<const char *> &cmd
run_params.unmask_sigint = onstart_flags.unmask_intr;
run_params.csfd = control_socket[1];
run_params.socket_fd = socket_fd;
run_params.notify_fd = notify_pipe[1];
run_params.notify_fd = ready_socket_fd >= 0 ? ready_socket_fd : notify_pipe[1];
run_params.force_notify_fd = force_notification_fd;
run_params.notify_var = notification_var.c_str();
run_params.env_file = env_file.c_str();
Expand Down Expand Up @@ -469,86 +472,116 @@ void base_process_service::becoming_inactive() noexcept
close(socket_fd);
socket_fd = -1;
}
}

bool base_process_service::open_socket() noexcept
{
if (socket_path.empty() || socket_fd != -1) {
// No socket, or already open
return true;
if (ready_socket_fd != -1) {
close(ready_socket_fd);
ready_socket_fd = -1;
}
free(ready_socket_name);
ready_socket_name = nullptr;
}

const char * saddrname = socket_path.c_str();

static int open_sock(const char *path, const std::string &svcname, int type,
uid_t uid, gid_t gid, int perms, struct sockaddr_un *&name) noexcept {
// Check the specified socket path
struct stat stat_buf;
if (stat(saddrname, &stat_buf) == 0) {
if (stat(path, &stat_buf) == 0) {
if ((stat_buf.st_mode & S_IFSOCK) == 0) {
// Not a socket
log(loglevel_t::ERROR, get_name(), ": activation socket file exists (and is not a socket)");
return false;
log(loglevel_t::ERROR, svcname, ": socket file exists (and is not a socket)");
return -1;
}
}
else if (errno != ENOENT) {
// Other error
log(loglevel_t::ERROR, get_name(), ": error checking activation socket: ", strerror(errno));
return false;
log(loglevel_t::ERROR, svcname, ": error checking socket: ", strerror(errno));
return -1;
}

// Remove stale socket file (if it exists).
// We won't test the return from unlink - if it fails other than due to ENOENT, we should get an
// error when we try to create the socket anyway.
unlink(saddrname);
unlink(path);

uint sockaddr_size = offsetof(struct sockaddr_un, sun_path) + socket_path.length() + 1;
struct sockaddr_un * name = static_cast<sockaddr_un *>(malloc(sockaddr_size));
uint sockaddr_size = offsetof(struct sockaddr_un, sun_path) + strlen(path) + 1;
name = static_cast<sockaddr_un *>(malloc(sockaddr_size));
if (name == nullptr) {
log(loglevel_t::ERROR, get_name(), ": opening activation socket: out of memory");
return false;
log(loglevel_t::ERROR, svcname, ": opening socket: out of memory");
return -1;
}

name->sun_family = AF_UNIX;
strcpy(name->sun_path, saddrname);
strcpy(name->sun_path, path);

int sockfd = dinit_socket(AF_UNIX, SOCK_STREAM, 0, SOCK_NONBLOCK | SOCK_CLOEXEC);
int sockfd = dinit_socket(AF_UNIX, type, 0, SOCK_NONBLOCK | SOCK_CLOEXEC);
if (sockfd == -1) {
log(loglevel_t::ERROR, get_name(), ": error creating activation socket: ", strerror(errno));
log(loglevel_t::ERROR, svcname, ": error creating socket: ", strerror(errno));
free(name);
return false;
return -1;
}

if (bind(sockfd, (struct sockaddr *) name, sockaddr_size) == -1) {
log(loglevel_t::ERROR, get_name(), ": error binding activation socket: ", strerror(errno));
log(loglevel_t::ERROR, svcname, ": error binding socket: ", strerror(errno));
close(sockfd);
free(name);
return false;
return -1;
}

free(name);

// POSIX (1003.1, 2013) says that fchown and fchmod don't necessarily work on sockets. We have to
// use chown and chmod instead.
if (chown(saddrname, socket_uid, socket_gid)) {
log(loglevel_t::ERROR, get_name(), ": error setting activation socket owner/group: ",
if (chown(path, uid, gid)) {
log(loglevel_t::ERROR, svcname, ": error setting socket owner/group: ",
strerror(errno));
close(sockfd);
return false;
return -1;
}

if (chmod(saddrname, socket_perms) == -1) {
log(loglevel_t::ERROR, get_name(), ": Error setting activation socket permissions: ",
if (chmod(path, perms) == -1) {
log(loglevel_t::ERROR, svcname, ": Error setting socket permissions: ",
strerror(errno));
close(sockfd);
return false;
return -1;
}

if (listen(sockfd, 128) == -1) { // 128 "seems reasonable".
log(loglevel_t::ERROR, ": error listening on activation socket: ", strerror(errno));
if (type != SOCK_DGRAM && listen(sockfd, 128) == -1) { // 128 "seems reasonable".
log(loglevel_t::ERROR, ": error listening on socket: ", strerror(errno));
close(sockfd);
return -1;
}

return sockfd;
}

bool base_process_service::open_socket() noexcept
{
if (socket_path.empty() || socket_fd != -1) {
// No socket, or already open
return true;
}

struct sockaddr_un *name = nullptr;
socket_fd = open_sock(socket_path.c_str(), get_name(), SOCK_STREAM, socket_uid,
socket_gid, socket_perms, name);
free(name);

return socket_fd >= 0;
}

bool base_process_service::open_ready_socket() noexcept
{
if (ready_socket_path.empty() || ready_socket_fd != -1) {
// No socket, or already open
return true;
}

ready_socket_fd = open_sock(ready_socket_path.c_str(), get_name(), SOCK_DGRAM,
ready_socket_uid, ready_socket_gid, ready_socket_perms, ready_socket_name);

if (ready_socket_fd < 0) {
free(ready_socket_name);
ready_socket_name = nullptr;
return false;
}

socket_fd = sockfd;
return true;
}

Expand Down
40 changes: 38 additions & 2 deletions src/includes/load-service.h
Original file line number Diff line number Diff line change
Expand Up @@ -225,7 +225,7 @@ enum class setting_id_t {
LOGFILE_GID, LOG_TYPE, LOG_BUFFER_SIZE, CONSUMER_OF, RESTART, SMOOTH_RECOVERY, OPTIONS,
LOAD_OPTIONS, TERM_SIGNAL, TERMSIGNAL /* deprecated */, RESTART_LIMIT_INTERVAL, RESTART_DELAY,
RESTART_LIMIT_COUNT, STOP_TIMEOUT, START_TIMEOUT, RUN_AS, CHAIN_TO, READY_NOTIFICATION,
INITTAB_ID, INITTAB_LINE,
READY_SOCKET_PERMISSIONS, READY_SOCKET_UID, READY_SOCKET_GID, INITTAB_ID, INITTAB_LINE,
// Prefixed with SETTING_ to avoid name collision with system macros:
SETTING_RLIMIT_NOFILE, SETTING_RLIMIT_CORE, SETTING_RLIMIT_DATA, SETTING_RLIMIT_ADDRSPACE,
// Possibly unsupported depending on platform/build options:
Expand Down Expand Up @@ -1282,12 +1282,17 @@ class service_settings_wrapper
auto_restart_mode auto_restart = auto_restart_mode::DEFAULT_AUTO_RESTART;
bool smooth_recovery = false;
string socket_path;
string ready_socket_path;
int socket_perms = 0666;
// Note: Posix allows that uid_t and gid_t may be unsigned types, but eg chown uses -1 as an
// invalid value, so it's safe to assume that we can do the same:
uid_t socket_uid = -1;
gid_t socket_uid_gid = -1; // primary group of socket user if known
gid_t socket_gid = -1;
int ready_socket_perms = 0600;
uid_t ready_socket_uid = -1;
gid_t ready_socket_uid_gid = -1;
gid_t ready_socket_gid = -1;
// Restart limit interval / count; default is 10 seconds, 3 restarts:
timespec restart_interval = { .tv_sec = 10, .tv_nsec = 0 };
int max_restarts = 3;
Expand Down Expand Up @@ -1362,6 +1367,9 @@ class service_settings_wrapper
if (!socket_path.empty()) {
report_lint("'socket-listen' specified, but ignored for the specified (or default) service type'.");
}
if (!ready_socket_path.empty()) {
report_lint("'ready-notification' specified, but ignored for the specified (or default) service type'.");
}
#if USE_UTMPX
if (inittab_id[0] != 0 || inittab_line[0] != 0) {
report_lint("'inittab_line' or 'inittab_id' specified, but ignored for the specified (or default) service type.");
Expand Down Expand Up @@ -1395,7 +1403,7 @@ class service_settings_wrapper
report_error("process ID file ('pid-file') not specified for bgprocess service.");
}

if (readiness_fd != -1 || !readiness_var.empty()) {
if (readiness_fd != -1 || !ready_socket_path.empty() || !readiness_var.empty()) {
report_error("readiness notification ('ready-notification') is not supported "
"for bgprocess services.");
}
Expand All @@ -1421,6 +1429,7 @@ class service_settings_wrapper
};

do_resolve("socket-listen", socket_path);
do_resolve("ready-notification", ready_socket_path);
do_resolve("logfile", logfile);
do_resolve("working-dir", working_dir);
do_resolve("pid-file", pid_file);
Expand All @@ -1429,6 +1438,7 @@ class service_settings_wrapper
// If socket_gid hasn't been explicitly set, but the socket_uid was specified as a name (and
// we therefore recovered the primary group), use the primary group of the specified user.
if (socket_gid == (gid_t)-1) socket_gid = socket_uid_gid;
if (ready_socket_gid == (gid_t)-1) ready_socket_gid = ready_socket_uid_gid;
// Also for logfile_uid/gid, we reset logfile ownership to dinit process uid/gid if uid/gid
// wasn't specified by service
if (logfile_uid == (uid_t) -1) logfile_uid = getuid();
Expand Down Expand Up @@ -1872,12 +1882,38 @@ void process_service_line(settings_wrapper &settings, const char *name, const ch
"ready-notification", input_pos);
}
}
else if (starts_with(notify_setting, "socket:")) {
settings.ready_socket_path = notify_setting.substr(7 /* len 'socket:' */);
if (settings.ready_socket_path.empty()) {
throw service_description_exc(name, "invalid readiness socket path",
"ready-notification", input_pos);
}
}
else {
throw service_description_exc(name, "unrecognised setting: " + notify_setting,
"ready-notification", input_pos);
}
break;
}
case setting_id_t::READY_SOCKET_PERMISSIONS:
{
string sock_perm_str = read_setting_value(input_pos, i, end, nullptr);
settings.ready_socket_perms = parse_perms(input_pos, sock_perm_str, name, "ready-socket-permissions");
break;
}
case setting_id_t::READY_SOCKET_UID:
{
string sock_uid_s = read_setting_value(input_pos, i, end, nullptr);
settings.ready_socket_uid = parse_uid_param(input_pos, sock_uid_s, name, "ready-socket-uid",
&settings.ready_socket_uid_gid);
break;
}
case setting_id_t::READY_SOCKET_GID:
{
string sock_gid_s = read_setting_value(input_pos, i, end, nullptr);
settings.ready_socket_gid = parse_gid_param(input_pos, sock_gid_s, "ready-socket-gid", name);
break;
}
case setting_id_t::INITTAB_ID:
{
string inittab_setting = read_setting_value(input_pos, i, end, nullptr);
Expand Down
24 changes: 24 additions & 0 deletions src/includes/proc-service.h
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,13 @@ class base_process_service : public service_record
int log_output_fd = -1; // If logging via buffer/pipe, write end of the pipe
int log_input_fd = -1; // If logging via buffer/pipe, read end of the pipe

string ready_socket_path; // path to the socket for ready-notification
int ready_socket_perms = 0; // socket permissions ("mode")
uid_t ready_socket_uid = -1; // socket user id or -1
gid_t ready_socket_gid = -1; // socket group id or -1
int ready_socket_fd = -1; // For socket ready notification, this is the file descriptor for the socket.
struct sockaddr_un *ready_socket_name = nullptr; // Since ready socket is UDP, we need this for recvfrom().

// Only one of waiting_restart_timer and waiting_stopstart_timer should be set at any time.
// They indicate that the process timer is armed (and why).
bool waiting_restart_timer : 1;
Expand Down Expand Up @@ -298,6 +305,9 @@ class base_process_service : public service_record
// Open the activation socket, return false on failure
bool open_socket() noexcept;

// Open the readiness socket, return false on failure
bool open_ready_socket() noexcept;

// Get the readiness notification watcher for this service, if it has one; may return nullptr.
virtual ready_notify_watcher *get_ready_watcher() noexcept
{
Expand Down Expand Up @@ -518,6 +528,20 @@ class base_process_service : public service_record
notification_var = std::move(varname);
}

void set_ready_socket_details(string &&socket_path, int socket_perms, uid_t socket_uid, uid_t socket_gid)
noexcept
{
ready_socket_path = std::move(socket_path);
ready_socket_perms = socket_perms;
ready_socket_uid = socket_uid;
ready_socket_gid = socket_gid;
if (ready_socket_path.length() > 0) {
// we want to expose this in the environment
// and this also simplifies the logic elsewhere
set_notification_var("NOTIFY_SOCKET=" + ready_socket_path);
}
}

// The restart/stop timer expired.
void timer_expired() noexcept;

Expand Down
2 changes: 2 additions & 0 deletions src/load-service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -742,6 +742,8 @@ service_record * dirload_service_set::load_reload_service(const char *fullname,
rvalps->set_run_as_uid_gid(settings.run_as_uid, settings.run_as_gid);
rvalps->set_notification_fd(settings.readiness_fd);
rvalps->set_notification_var(std::move(settings.readiness_var));
rvalps->set_ready_socket_details(std::move(settings.ready_socket_path),
settings.ready_socket_perms, settings.ready_socket_uid, settings.ready_socket_gid);
rvalps->set_logfile_details(std::move(settings.logfile), settings.logfile_perms,
settings.logfile_uid, settings.logfile_gid);
rvalps->set_log_buf_max(settings.max_log_buffer_sz);
Expand Down
22 changes: 19 additions & 3 deletions src/proc-service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -175,8 +175,20 @@ rearm ready_notify_watcher::fd_event(eventloop_t &, int fd, int flags) noexcept
{
char buf[128];
if (service->get_state() == service_state_t::STARTING) {
// can we actually read anything from the notification pipe?
int r = bp_sys::read(fd, buf, sizeof(buf));
// can we actually read anything from the notification pipe/socket?
ssize_t r;
if (fd != service->ready_socket_fd) {
r = bp_sys::read(fd, buf, sizeof(buf));
}
else {
socklen_t alen = service->ready_socket_path.length() + sizeof(sa_family_t);
r = recvfrom(fd, buf, sizeof(buf), 0, (struct sockaddr *)service->ready_socket_name, &alen);
if (r > 0 && (r != strlen("READY=1") || memcmp(buf, "READY=1", strlen("READY=1")))) {
/* ignore datagram */
errno = EAGAIN;
r = -1;
}
}
if (r > 0) {
if (service->waiting_stopstart_timer) {
service->process_timer.stop_timer(event_loop);
Expand All @@ -195,7 +207,7 @@ rearm ready_notify_watcher::fd_event(eventloop_t &, int fd, int flags) noexcept
}
service->services->process_queues();
}
else {
else if (fd != service->ready_socket_fd) {
// Just keep consuming data from the pipe:
int r = bp_sys::read(fd, buf, sizeof(buf));
if (r == 0) {
Expand All @@ -204,6 +216,10 @@ rearm ready_notify_watcher::fd_event(eventloop_t &, int fd, int flags) noexcept
service->notification_fd = -1;
return rearm::DISARM;
}
} else {
// Just consume the datagram
socklen_t alen = service->ready_socket_path.length() + sizeof(sa_family_t);
recvfrom(fd, buf, sizeof(buf), 0, (struct sockaddr *)service->ready_socket_name, &alen);
}

return rearm::REARM;
Expand Down
Loading

0 comments on commit 84ca292

Please sign in to comment.