From 1ed087d3651f73f92e45583e6505d4f788527a36 Mon Sep 17 00:00:00 2001 From: "Samuel K. Gutierrez" Date: Tue, 4 Feb 2025 15:09:30 -0700 Subject: [PATCH] Simplify RMI server architecture. * Reduce the number of threads used in the RMI server architecture. They seem unnecessary and confusing to me. Signed-off-by: Samuel K. Gutierrez --- include/quo-vadis-pthread.h | 18 ++--- src/quo-vadisd.cc | 5 +- src/qvi-rmi.cc | 105 +++++-------------------- src/qvi-rmi.h | 8 +- tests/internal/CMakeLists.txt | 29 +------ tests/internal/test-rmi-server.cc | 123 ------------------------------ tests/internal/test-rmi.cc | 31 ++++---- 7 files changed, 51 insertions(+), 268 deletions(-) delete mode 100644 tests/internal/test-rmi-server.cc diff --git a/include/quo-vadis-pthread.h b/include/quo-vadis-pthread.h index a49a897..6b64fe0 100644 --- a/include/quo-vadis-pthread.h +++ b/include/quo-vadis-pthread.h @@ -39,15 +39,15 @@ extern "C" { // - explicit: threads are placed according to a list of OS proc IDs (required) // TODO(skg) Do we need all of these synonyms? typedef enum { - QV_POLICY_PACKED = 1, - QV_POLICY_COMPACT = 1, - QV_POLICY_CLOSE = 1, - QV_POLICY_SPREAD = 2, - QV_POLICY_DISTRIBUTE = 3, - QV_POLICY_ALTERNATE = 3, - QV_POLICY_CORESFIRST = 3, - QV_POLICY_SCATTER = 4, - QV_POLICY_CHOOSE = 5 + QV_POLICY_PACKED = 1, + QV_POLICY_COMPACT = 1, + QV_POLICY_CLOSE = 1, + QV_POLICY_SPREAD = 2, + QV_POLICY_DISTRIBUTE = 3, + QV_POLICY_ALTERNATE = 3, + QV_POLICY_CORESFIRST = 3, + QV_POLICY_SCATTER = 4, + QV_POLICY_CHOOSE = 5 } qv_pthread_placement_t; /** diff --git a/src/quo-vadisd.cc b/src/quo-vadisd.cc index 43bda19..32e11cd 100644 --- a/src/quo-vadisd.cc +++ b/src/quo-vadisd.cc @@ -1,6 +1,6 @@ /* -*- Mode: C++; c-basic-offset:4; indent-tabs-mode:nil -*- */ /* - * Copyright (c) 2020-2024 Triad National Security, LLC + * Copyright (c) 2020-2025 Triad National Security, LLC * All rights reserved. * * Copyright (c) 2020-2021 Lawrence Livermore National Security, LLC @@ -132,8 +132,7 @@ rmi_start( cstr_t ers = nullptr; - static const bool blocks = true; - int rc = qvi_rmi_server_start(ctx.rmi, blocks); + int rc = qvi_rmi_server_start(ctx.rmi); if (rc != QV_SUCCESS) { ers = "qvi_rmi_server_start() failed"; } diff --git a/src/qvi-rmi.cc b/src/qvi-rmi.cc index 8b668d2..de3eb89 100644 --- a/src/qvi-rmi.cc +++ b/src/qvi-rmi.cc @@ -53,8 +53,6 @@ do { \ qvi_log_warn("{} with errno={} ({})", ers, (erno), qvi_strerr((erno))); \ } while (0) -static constexpr cstr_t ZINPROC_ADDR = "inproc://qvi-rmi-workers"; - typedef enum qvi_rpc_funid_e { FID_INVALID = 0, FID_SERVER_SHUTDOWN, @@ -85,11 +83,6 @@ typedef int (*qvi_rpc_fun_ptr_t)( qvi_bbuff ** ); -static void -send_server_shutdown_msg( - qvi_rmi_server_t *server -); - static void zsocket_close( void **sock @@ -129,10 +122,6 @@ struct qvi_rmi_server_s { void *zctx = nullptr; /** Loopback socket for managerial messages. */ void *zlo = nullptr; - /** The worker thread. */ - pthread_t worker_thread; - /** Flag indicating if main thread blocks for workers to complete. */ - bool blocks = false; /** Constructor. */ qvi_rmi_server_s(void) { @@ -145,14 +134,10 @@ struct qvi_rmi_server_s { /** Destructor. */ ~qvi_rmi_server_s(void) { - send_server_shutdown_msg(this); zsocket_close(&zlo); zctx_destroy(&zctx); unlink(config.hwtopo_path.c_str()); qvi_delete(&hwpool); - if (!blocks) { - pthread_join(worker_thread, nullptr); - } } }; @@ -772,16 +757,15 @@ server_rpc_dispatch( return (shutdown ? QV_SUCCESS_SHUTDOWN : rc); } -static void * +static int server_go( - void *data + qvi_rmi_server_t *server ) { - qvi_rmi_server_t *const server = (qvi_rmi_server_t *)data; - void *zworksock = zsocket_create_and_connect( - server->zctx, ZMQ_REP, ZINPROC_ADDR + void *zworksock = zsocket_create_and_bind( + server->zctx, ZMQ_REP, server->config.url.c_str() ); - if (qvi_unlikely(!zworksock)) return nullptr; + if (qvi_unlikely(!zworksock)) return QV_ERR_SYS; int rc, bsent; volatile int bsentt = 0; @@ -807,15 +791,7 @@ server_go( if (qvi_unlikely(rc != QV_SUCCESS && rc != QV_SUCCESS_SHUTDOWN)) { qvi_log_error("RX/TX loop exited with rc={} ({})", rc, qv_strerr(rc)); } - return nullptr; -} - -static void -send_server_shutdown_msg( - qvi_rmi_server_t *server -) { - (void)rpc_req(server->zlo, FID_SERVER_SHUTDOWN, QVI_BBUFF_RMI_ZERO_MSG); - (void)rpc_rep(server->zlo, QVI_BBUFF_RMI_ZERO_MSG); + return QV_SUCCESS; } int @@ -840,49 +816,9 @@ server_populate_base_hwpool( return server->hwpool->initialize(hwloc, cpuset); } -static void * -server_start_threads( - void *data -) { - qvi_rmi_server_t *server = (qvi_rmi_server_t *)data; - - void *clients = zsocket_create_and_bind( - server->zctx, ZMQ_ROUTER, server->config.url.c_str() - ); - if (qvi_unlikely(!clients)) { - cstr_t ers = "zsocket_create_and_bind() failed"; - qvi_log_error("{}", ers); - return nullptr; - } - - void *workers = zsocket_create_and_bind( - server->zctx, ZMQ_DEALER, ZINPROC_ADDR - ); - if (qvi_unlikely(!workers)) { - cstr_t ers = "zsocket_create_and_bind() failed"; - qvi_log_error("{}", ers); - return nullptr; - } - - pthread_t worker; - int rc = pthread_create(&worker, nullptr, server_go, server); - if (qvi_unlikely(rc != 0)) { - cstr_t ers = "pthread_create() failed"; - qvi_log_error("{} with rc={} ({})", ers, rc, qvi_strerr(rc)); - } - // The zmq_proxy() function always returns -1 and errno set to ETERM. - zmq_proxy(clients, workers, nullptr); - pthread_join(worker, nullptr); - zsocket_close(&workers); - - zsocket_close(&clients); - return nullptr; -} - int qvi_rmi_server_start( - qvi_rmi_server_t *server, - bool block + qvi_rmi_server_t *server ) { // First populate the base hardware resource pool. int qvrc = server_populate_base_hwpool(server); @@ -891,23 +827,9 @@ qvi_rmi_server_start( server->zlo = zsocket_create_and_connect( server->zctx, ZMQ_REQ, server->config.url.c_str() ); - if (qvi_unlikely(!server->zlo)) return QV_ERR_RPC; - - const int rc = pthread_create( - &server->worker_thread, nullptr, - server_start_threads, server - ); - if (qvi_unlikely(rc != 0)) { - cstr_t ers = "pthread_create() failed"; - qvi_log_error("{} with rc={} ({})", ers, rc, qvi_strerr(rc)); - qvrc = QV_ERR_SYS; - } + if (qvi_unlikely(!server->zlo)) return QV_ERR_SYS; - if (block && qvrc == QV_SUCCESS) { - server->blocks = true; - pthread_join(server->worker_thread, nullptr); - } - return qvrc; + return server_go(server); } static int @@ -1096,6 +1018,15 @@ qvi_rmi_get_cpuset_for_nobjs( ); } +int +qvi_rmi_send_shutdown_message( + qvi_rmi_client_t *client +) { + return rpc_req( + client->zsock, FID_SERVER_SHUTDOWN + ); +} + /* * vim: ft=cpp ts=4 sts=4 sw=4 expandtab */ diff --git a/src/qvi-rmi.h b/src/qvi-rmi.h index 0740aea..eb71860 100644 --- a/src/qvi-rmi.h +++ b/src/qvi-rmi.h @@ -81,8 +81,7 @@ qvi_rmi_server_config( */ int qvi_rmi_server_start( - qvi_rmi_server_t *server, - bool block + qvi_rmi_server_t *server ); /** @@ -183,6 +182,11 @@ qvi_rmi_get_cpuset_for_nobjs( hwloc_cpuset_t *result ); +int +qvi_rmi_send_shutdown_message( + qvi_rmi_client_t *client +); + #ifdef __cplusplus } #endif diff --git a/tests/internal/CMakeLists.txt b/tests/internal/CMakeLists.txt index 737eb30..9abf1de 100644 --- a/tests/internal/CMakeLists.txt +++ b/tests/internal/CMakeLists.txt @@ -1,6 +1,6 @@ # -*- Mode: C++; c-basic-offset:4; indent-tabs-mode:nil -*- # -# Copyright (c) 2020-2024 Triad National Security, LLC +# Copyright (c) 2020-2025 Triad National Security, LLC # All rights reserved. # # Copyright (c) 2020-2021 Lawrence Livermore National Security, LLC @@ -31,26 +31,6 @@ add_test( test-hwloc ) -################################################################################ -################################################################################ -add_executable( - test-rmi-server - test-rmi-server.cc -) - -target_link_libraries( - test-rmi-server - quo-vadis -) - -add_test( - NAME - rmi-server - COMMAND - bash -c "export URL=\"tcp://127.0.0.1:55995\" && \ - ${CMAKE_CURRENT_BINARY_DIR}/test-rmi-server $URL" -) - ################################################################################ ################################################################################ add_executable( @@ -68,12 +48,10 @@ add_test( rmi COMMAND bash -c "export URL=\"tcp://127.0.0.1:55995\" && \ - ( ${CMAKE_SOURCE_DIR}/tests/exec-timeout.sh \ - \"${CMAKE_CURRENT_BINARY_DIR}/test-rmi $URL -s\" 5 & ) && \ - ( ${CMAKE_CURRENT_BINARY_DIR}/test-rmi $URL -c & ) && \ + ( ${CMAKE_CURRENT_BINARY_DIR}/test-rmi $URL -s & ) && \ ( ${CMAKE_CURRENT_BINARY_DIR}/test-rmi $URL -c & ) && \ ( ${CMAKE_CURRENT_BINARY_DIR}/test-rmi $URL -c & ) && \ - ${CMAKE_CURRENT_BINARY_DIR}/test-rmi $URL -c" + ${CMAKE_CURRENT_BINARY_DIR}/test-rmi $URL -cc" ) # Use the C linker to test for C/C++ linkage problems. @@ -87,7 +65,6 @@ set_target_properties( # Set core test properties. set_tests_properties( hwloc - rmi-server rmi PROPERTIES TIMEOUT 60 diff --git a/tests/internal/test-rmi-server.cc b/tests/internal/test-rmi-server.cc deleted file mode 100644 index 40c9074..0000000 --- a/tests/internal/test-rmi-server.cc +++ /dev/null @@ -1,123 +0,0 @@ -/* -*- Mode: C++; c-basic-offset:4; indent-tabs-mode:nil -*- */ -/* - * Copyright (c) 2020-2023 Triad National Security, LLC - * All rights reserved. - * - * Copyright (c) 2020-2021 Lawrence Livermore National Security, LLC - * All rights reserved. - * - * This file is part of the quo-vadis project. See the LICENSE file at the - * top-level directory of this distribution. - */ - -/** - * @file test-rmi.cc - */ - - -#include "quo-vadis.h" -#include "qvi-utils.h" -#include "qvi-hwloc.h" -#include "qvi-rmi.h" -#include "common-test-utils.h" // IWYU pragma: keep - -static int -server( - const char *url -) { - printf("# [%d] Starting Server (%s)\n", getpid(), url); - - char const *ers = NULL; - const char *basedir = qvi_tmpdir(); - char *path = nullptr; - double start = qvi_time(), end; - qvi_rmi_config_s config; - - qvi_rmi_server_t *server = NULL; - int rc = qvi_rmi_server_new(&server); - if (rc != QV_SUCCESS) { - ers = "qvi_rmi_server_new() failed"; - goto out; - } - - qvi_hwloc_t *hwloc; - rc = qvi_hwloc_new(&hwloc); - if (rc != QV_SUCCESS) { - ers = "qvi_hwloc_new() failed"; - goto out; - } - - rc = qvi_hwloc_topology_init(hwloc, NULL); - if (rc != QV_SUCCESS) { - ers = "qvi_hwloc_topology_init() failed"; - goto out; - } - - rc = qvi_hwloc_topology_load(hwloc); - if (rc != QV_SUCCESS) { - ers = "qvi_hwloc_topology_load() failed"; - goto out; - } - - config.url = std::string(url); - config.hwloc = hwloc; - - rc = qvi_hwloc_topology_export( - hwloc, basedir, &path - ); - if (rc != QV_SUCCESS) { - ers = "qvi_hwloc_topology_export() failed"; - goto out; - } - - config.hwtopo_path = std::string(path); - free(path); - - rc = qvi_rmi_server_config(server, &config); - if (rc != QV_SUCCESS) { - ers = "qvi_rmi_server_config() failed"; - goto out; - } - - rc = qvi_rmi_server_start(server, false); - if (rc != QV_SUCCESS) { - ers = "qvi_rmi_server_start() failed"; - goto out; - } - end = qvi_time(); - printf("# [%d] Server Start Time %lf seconds\n", getpid(), end - start); -out: - sleep(4); - qvi_rmi_server_delete(&server); - qvi_hwloc_delete(&hwloc); - if (ers) { - fprintf(stderr, "\n%s (rc=%d, %s)\n", ers, rc, qv_strerr(rc)); - return EXIT_FAILURE; - } - return EXIT_SUCCESS; -} - -static void -usage(const char *appn) -{ - fprintf(stderr, "Usage: %s URL\n", appn); -} - -int -main( - int argc, - char **argv -) { - setbuf(stdout, NULL); - - if (argc != 2) { - usage(argv[0]); - return EXIT_FAILURE; - } - - return server(argv[1]); -} - -/* - * vim: ft=cpp ts=4 sts=4 sw=4 expandtab - */ diff --git a/tests/internal/test-rmi.cc b/tests/internal/test-rmi.cc index af08649..952c195 100644 --- a/tests/internal/test-rmi.cc +++ b/tests/internal/test-rmi.cc @@ -1,20 +1,9 @@ /* -*- Mode: C++; c-basic-offset:4; indent-tabs-mode:nil -*- */ -/* - * Copyright (c) 2020-2024 Triad National Security, LLC - * All rights reserved. - * - * Copyright (c) 2020-2021 Lawrence Livermore National Security, LLC - * All rights reserved. - * - * This file is part of the quo-vadis project. See the LICENSE file at the - * top-level directory of this distribution. - */ /** * @file test-rmi.cc */ - #include "quo-vadis.h" #include "qvi-utils.h" #include "qvi-hwloc.h" @@ -29,7 +18,6 @@ server( char const *ers = NULL; const char *basedir = qvi_tmpdir(); char *path = nullptr; - double start = qvi_time(), end; qvi_rmi_config_s config; qvi_rmi_server_t *server = NULL; @@ -77,15 +65,13 @@ server( goto out; } - rc = qvi_rmi_server_start(server, false); + rc = qvi_rmi_server_start(server); if (rc != QV_SUCCESS) { ers = "qvi_rmi_server_start() failed"; goto out; } - end = qvi_time(); - printf("# [%d] Server Start Time %lf seconds\n", getpid(), end - start); + printf("# [%d] Server Started\n", getpid()); out: - sleep(4); qvi_rmi_server_delete(&server); qvi_hwloc_delete(&hwloc); if (ers) { @@ -97,7 +83,8 @@ server( static int client( - const char *url + const char *url, + bool send_shutdown_msg ) { printf("# [%d] Starting Client (%s)\n", getpid(), url); @@ -123,11 +110,16 @@ client( ers = "qvi_rmi_cpubind() failed"; goto out; } + char *res; qvi_hwloc_bitmap_asprintf(bitmap, &res); printf("# [%d] cpubind = %s\n", who, res); hwloc_bitmap_free(bitmap); free(res); + + if (send_shutdown_msg) { + rc = qvi_rmi_send_shutdown_message(client); + } out: qvi_rmi_client_delete(&client); if (ers) { @@ -160,7 +152,10 @@ main( rc = server(argv[1]); } else if (strcmp(argv[2], "-c") == 0) { - rc = client(argv[1]); + rc = client(argv[1], false); + } + else if (strcmp(argv[2], "-cc") == 0) { + rc = client(argv[1], true); } else { usage(argv[0]);