Skip to content

Commit

Permalink
Simplify RMI server architecture.
Browse files Browse the repository at this point in the history
* Reduce the number of threads used in the RMI server architecture. They
  seem unnecessary and confusing to me.

Signed-off-by: Samuel K. Gutierrez <[email protected]>
  • Loading branch information
samuelkgutierrez committed Feb 4, 2025
1 parent dbfeaf4 commit 1ed087d
Show file tree
Hide file tree
Showing 7 changed files with 51 additions and 268 deletions.
18 changes: 9 additions & 9 deletions include/quo-vadis-pthread.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,15 +39,15 @@ extern "C" {
// - explicit: threads are placed according to a list of OS proc IDs (required)
// TODO(skg) Do we need all of these synonyms?
typedef enum {
QV_POLICY_PACKED = 1,
QV_POLICY_COMPACT = 1,
QV_POLICY_CLOSE = 1,
QV_POLICY_SPREAD = 2,
QV_POLICY_DISTRIBUTE = 3,
QV_POLICY_ALTERNATE = 3,
QV_POLICY_CORESFIRST = 3,
QV_POLICY_SCATTER = 4,
QV_POLICY_CHOOSE = 5
QV_POLICY_PACKED = 1,
QV_POLICY_COMPACT = 1,
QV_POLICY_CLOSE = 1,
QV_POLICY_SPREAD = 2,
QV_POLICY_DISTRIBUTE = 3,
QV_POLICY_ALTERNATE = 3,
QV_POLICY_CORESFIRST = 3,
QV_POLICY_SCATTER = 4,
QV_POLICY_CHOOSE = 5
} qv_pthread_placement_t;

/**
Expand Down
5 changes: 2 additions & 3 deletions src/quo-vadisd.cc
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/* -*- Mode: C++; c-basic-offset:4; indent-tabs-mode:nil -*- */
/*
* Copyright (c) 2020-2024 Triad National Security, LLC
* Copyright (c) 2020-2025 Triad National Security, LLC
* All rights reserved.
*
* Copyright (c) 2020-2021 Lawrence Livermore National Security, LLC
Expand Down Expand Up @@ -132,8 +132,7 @@ rmi_start(

cstr_t ers = nullptr;

static const bool blocks = true;
int rc = qvi_rmi_server_start(ctx.rmi, blocks);
int rc = qvi_rmi_server_start(ctx.rmi);
if (rc != QV_SUCCESS) {
ers = "qvi_rmi_server_start() failed";
}
Expand Down
105 changes: 18 additions & 87 deletions src/qvi-rmi.cc
Original file line number Diff line number Diff line change
Expand Up @@ -53,8 +53,6 @@ do { \
qvi_log_warn("{} with errno={} ({})", ers, (erno), qvi_strerr((erno))); \
} while (0)

static constexpr cstr_t ZINPROC_ADDR = "inproc://qvi-rmi-workers";

typedef enum qvi_rpc_funid_e {
FID_INVALID = 0,
FID_SERVER_SHUTDOWN,
Expand Down Expand Up @@ -85,11 +83,6 @@ typedef int (*qvi_rpc_fun_ptr_t)(
qvi_bbuff **
);

static void
send_server_shutdown_msg(
qvi_rmi_server_t *server
);

static void
zsocket_close(
void **sock
Expand Down Expand Up @@ -129,10 +122,6 @@ struct qvi_rmi_server_s {
void *zctx = nullptr;
/** Loopback socket for managerial messages. */
void *zlo = nullptr;
/** The worker thread. */
pthread_t worker_thread;
/** Flag indicating if main thread blocks for workers to complete. */
bool blocks = false;
/** Constructor. */
qvi_rmi_server_s(void)
{
Expand All @@ -145,14 +134,10 @@ struct qvi_rmi_server_s {
/** Destructor. */
~qvi_rmi_server_s(void)
{
send_server_shutdown_msg(this);
zsocket_close(&zlo);
zctx_destroy(&zctx);
unlink(config.hwtopo_path.c_str());
qvi_delete(&hwpool);
if (!blocks) {
pthread_join(worker_thread, nullptr);
}
}
};

Expand Down Expand Up @@ -772,16 +757,15 @@ server_rpc_dispatch(
return (shutdown ? QV_SUCCESS_SHUTDOWN : rc);
}

static void *
static int
server_go(
void *data
qvi_rmi_server_t *server
) {
qvi_rmi_server_t *const server = (qvi_rmi_server_t *)data;

void *zworksock = zsocket_create_and_connect(
server->zctx, ZMQ_REP, ZINPROC_ADDR
void *zworksock = zsocket_create_and_bind(
server->zctx, ZMQ_REP, server->config.url.c_str()
);
if (qvi_unlikely(!zworksock)) return nullptr;
if (qvi_unlikely(!zworksock)) return QV_ERR_SYS;

int rc, bsent;
volatile int bsentt = 0;
Expand All @@ -807,15 +791,7 @@ server_go(
if (qvi_unlikely(rc != QV_SUCCESS && rc != QV_SUCCESS_SHUTDOWN)) {
qvi_log_error("RX/TX loop exited with rc={} ({})", rc, qv_strerr(rc));
}
return nullptr;
}

static void
send_server_shutdown_msg(
qvi_rmi_server_t *server
) {
(void)rpc_req(server->zlo, FID_SERVER_SHUTDOWN, QVI_BBUFF_RMI_ZERO_MSG);
(void)rpc_rep(server->zlo, QVI_BBUFF_RMI_ZERO_MSG);
return QV_SUCCESS;
}

int
Expand All @@ -840,49 +816,9 @@ server_populate_base_hwpool(
return server->hwpool->initialize(hwloc, cpuset);
}

static void *
server_start_threads(
void *data
) {
qvi_rmi_server_t *server = (qvi_rmi_server_t *)data;

void *clients = zsocket_create_and_bind(
server->zctx, ZMQ_ROUTER, server->config.url.c_str()
);
if (qvi_unlikely(!clients)) {
cstr_t ers = "zsocket_create_and_bind() failed";
qvi_log_error("{}", ers);
return nullptr;
}

void *workers = zsocket_create_and_bind(
server->zctx, ZMQ_DEALER, ZINPROC_ADDR
);
if (qvi_unlikely(!workers)) {
cstr_t ers = "zsocket_create_and_bind() failed";
qvi_log_error("{}", ers);
return nullptr;
}

pthread_t worker;
int rc = pthread_create(&worker, nullptr, server_go, server);
if (qvi_unlikely(rc != 0)) {
cstr_t ers = "pthread_create() failed";
qvi_log_error("{} with rc={} ({})", ers, rc, qvi_strerr(rc));
}
// The zmq_proxy() function always returns -1 and errno set to ETERM.
zmq_proxy(clients, workers, nullptr);
pthread_join(worker, nullptr);
zsocket_close(&workers);

zsocket_close(&clients);
return nullptr;
}

int
qvi_rmi_server_start(
qvi_rmi_server_t *server,
bool block
qvi_rmi_server_t *server
) {
// First populate the base hardware resource pool.
int qvrc = server_populate_base_hwpool(server);
Expand All @@ -891,23 +827,9 @@ qvi_rmi_server_start(
server->zlo = zsocket_create_and_connect(
server->zctx, ZMQ_REQ, server->config.url.c_str()
);
if (qvi_unlikely(!server->zlo)) return QV_ERR_RPC;

const int rc = pthread_create(
&server->worker_thread, nullptr,
server_start_threads, server
);
if (qvi_unlikely(rc != 0)) {
cstr_t ers = "pthread_create() failed";
qvi_log_error("{} with rc={} ({})", ers, rc, qvi_strerr(rc));
qvrc = QV_ERR_SYS;
}
if (qvi_unlikely(!server->zlo)) return QV_ERR_SYS;

if (block && qvrc == QV_SUCCESS) {
server->blocks = true;
pthread_join(server->worker_thread, nullptr);
}
return qvrc;
return server_go(server);
}

static int
Expand Down Expand Up @@ -1096,6 +1018,15 @@ qvi_rmi_get_cpuset_for_nobjs(
);
}

/**
 * Asks the server to shut down by issuing a FID_SERVER_SHUTDOWN request
 * over the client's socket (client->zsock).
 *
 * Returns whatever status rpc_req() reports for the request.
 *
 * NOTE(review): only the request is issued here; no rpc_rep() call follows
 * and no argument payload is passed. Confirm whether a reply must be
 * received on this socket before it is used again or closed.
 */
int
qvi_rmi_send_shutdown_message(
qvi_rmi_client_t *client
) {
return rpc_req(
client->zsock, FID_SERVER_SHUTDOWN
);
}

/*
* vim: ft=cpp ts=4 sts=4 sw=4 expandtab
*/
8 changes: 6 additions & 2 deletions src/qvi-rmi.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,7 @@ qvi_rmi_server_config(
*/
int
qvi_rmi_server_start(
qvi_rmi_server_t *server,
bool block
qvi_rmi_server_t *server
);

/**
Expand Down Expand Up @@ -183,6 +182,11 @@ qvi_rmi_get_cpuset_for_nobjs(
hwloc_cpuset_t *result
);

/**
 * Sends a server shutdown request through the given client and returns the
 * status of that request.
 */
int
qvi_rmi_send_shutdown_message(
qvi_rmi_client_t *client
);

#ifdef __cplusplus
}
#endif
Expand Down
29 changes: 3 additions & 26 deletions tests/internal/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# -*- Mode: C++; c-basic-offset:4; indent-tabs-mode:nil -*-
#
# Copyright (c) 2020-2024 Triad National Security, LLC
# Copyright (c) 2020-2025 Triad National Security, LLC
# All rights reserved.
#
# Copyright (c) 2020-2021 Lawrence Livermore National Security, LLC
Expand Down Expand Up @@ -31,26 +31,6 @@ add_test(
test-hwloc
)

################################################################################
################################################################################
add_executable(
test-rmi-server
test-rmi-server.cc
)

target_link_libraries(
test-rmi-server
quo-vadis
)

add_test(
NAME
rmi-server
COMMAND
bash -c "export URL=\"tcp://127.0.0.1:55995\" && \
${CMAKE_CURRENT_BINARY_DIR}/test-rmi-server $URL"
)

################################################################################
################################################################################
add_executable(
Expand All @@ -68,12 +48,10 @@ add_test(
rmi
COMMAND
bash -c "export URL=\"tcp://127.0.0.1:55995\" && \
( ${CMAKE_SOURCE_DIR}/tests/exec-timeout.sh \
\"${CMAKE_CURRENT_BINARY_DIR}/test-rmi $URL -s\" 5 & ) && \
( ${CMAKE_CURRENT_BINARY_DIR}/test-rmi $URL -c & ) && \
( ${CMAKE_CURRENT_BINARY_DIR}/test-rmi $URL -s & ) && \
( ${CMAKE_CURRENT_BINARY_DIR}/test-rmi $URL -c & ) && \
( ${CMAKE_CURRENT_BINARY_DIR}/test-rmi $URL -c & ) && \
${CMAKE_CURRENT_BINARY_DIR}/test-rmi $URL -c"
${CMAKE_CURRENT_BINARY_DIR}/test-rmi $URL -cc"
)

# Use the C linker to test for C/C++ linkage problems.
Expand All @@ -87,7 +65,6 @@ set_target_properties(
# Set core test properties.
set_tests_properties(
hwloc
rmi-server
rmi
PROPERTIES
TIMEOUT 60
Expand Down
Loading

0 comments on commit 1ed087d

Please sign in to comment.