diff --git a/src/qvi-bbuff-rmi.h b/src/qvi-bbuff-rmi.h index e27d682..f044dc0 100644 --- a/src/qvi-bbuff-rmi.h +++ b/src/qvi-bbuff-rmi.h @@ -19,7 +19,6 @@ * c = qvi_hwloc_bitmap_s * d = qv_scope_create_hints_t * h = qvi_hwpool_s * - * h = qvi_line_hwpool_t * * i = int * s = char * * s = std::string @@ -552,7 +551,7 @@ qvi_bbuff_rmi_pack_item_impl( qvi_bbuff_t *buff, const qvi_hwpool_s *data ) { - return data->pack(buff); + return data->packto(buff); } /** @@ -672,7 +671,7 @@ qvi_bbuff_rmi_unpack_item( size_t *bytes_written ) { const int nw = asprintf(s, "%s", buffpos); - if (nw == -1) { + if (qvi_unlikely(nw == -1)) { *s = nullptr; *bytes_written = 0; return QV_ERR_OOR; @@ -762,7 +761,8 @@ qvi_bbuff_rmi_unpack_item( void **dbuff = data.first; *dbuff = calloc(*dsize, sizeof(byte_t)); - if (!*dbuff) { + if (qvi_unlikely(!*dbuff)) { + *bytes_written = 0; return QV_ERR_OOR; } memmove(*dbuff, buffpos, *dsize); @@ -781,17 +781,17 @@ qvi_bbuff_rmi_unpack_item( size_t *bytes_written ) { int rc = qvi_hwloc_bitmap_calloc(cpuset); - if (rc != QV_SUCCESS) return rc; + if (qvi_unlikely(rc != QV_SUCCESS)) return rc; - char *cpusets = (char *)buffpos; + char *const cpusets = (char *)buffpos; // Protect against empty data. if (strcmp(QV_BUFF_RMI_NULL_CPUSET, cpusets) != 0) { rc = qvi_hwloc_bitmap_sscanf(*cpuset, cpusets); - if (rc != QV_SUCCESS) goto out; + if (qvi_unlikely(rc != QV_SUCCESS)) goto out; } *bytes_written = strlen(cpusets) + 1; out: - if (rc != QV_SUCCESS) { + if (qvi_unlikely(rc != QV_SUCCESS)) { qvi_hwloc_bitmap_delete(cpuset); } return rc; @@ -810,8 +810,11 @@ qvi_bbuff_rmi_unpack_item( int rc = qvi_bbuff_rmi_unpack_item( &raw_cpuset, buffpos, bytes_written ); - if (rc != QV_SUCCESS) return rc; - + if (qvi_unlikely(rc != QV_SUCCESS)) { + *bytes_written = 0; + return rc; + } + // rc = bitmap.set(raw_cpuset); qvi_hwloc_bitmap_delete(&raw_cpuset); return rc; @@ -844,17 +847,17 @@ qvi_bbuff_rmi_unpack_item( int rc = qvi_bbuff_rmi_unpack_item( &cpu.hints, buffpos, &bw ); - if (rc != QV_SUCCESS) goto out; + if (qvi_unlikely(rc != QV_SUCCESS)) goto out; total_bw += bw; buffpos += bw; // Unpack bitmap. rc = qvi_bbuff_rmi_unpack_item( cpu.cpuset, buffpos, &bw ); - if (rc != QV_SUCCESS) goto out; + if (qvi_unlikely(rc != QV_SUCCESS)) goto out; total_bw += bw; out: - if (rc != QV_SUCCESS) { + if (qvi_unlikely(rc != QV_SUCCESS)) { total_bw = 0; } *bytes_written = total_bw; diff --git a/src/qvi-hwpool.cc b/src/qvi-hwpool.cc index 1a75a46..b98d3ac 100644 --- a/src/qvi-hwpool.cc +++ b/src/qvi-hwpool.cc @@ -256,7 +256,7 @@ qvi_hwpool_s::release_devices(void) } int -qvi_hwpool_s::pack( +qvi_hwpool_s::packto( qvi_bbuff_t *buff ) const { // Pack the CPU. diff --git a/src/qvi-hwpool.h b/src/qvi-hwpool.h index dd83e77..e026bdb 100644 --- a/src/qvi-hwpool.h +++ b/src/qvi-hwpool.h @@ -155,10 +155,10 @@ struct qvi_hwpool_s { int release_devices(void); /** - * Packs the instance into a bbuff. + * Packs the instance into the provided buffer. */ int - pack( + packto( qvi_bbuff_t *buff ) const; /** diff --git a/src/qvi-mpi.cc b/src/qvi-mpi.cc index b20ff5d..a7a2790 100644 --- a/src/qvi-mpi.cc +++ b/src/qvi-mpi.cc @@ -39,15 +39,17 @@ struct qvi_mpi_comm_s { int rc; if (dup) { rc = MPI_Comm_dup(comm, &mpi_comm); - if (rc != MPI_SUCCESS) throw qvi_runtime_error(); + if (qvi_unlikely(rc != MPI_SUCCESS)) { + throw qvi_runtime_error(); + } } else { mpi_comm = comm; } rc = MPI_Comm_size(mpi_comm, &size); - if (rc != MPI_SUCCESS) throw qvi_runtime_error(); + if (qvi_unlikely(rc != MPI_SUCCESS)) throw qvi_runtime_error(); rc = MPI_Comm_rank(mpi_comm, &rank); - if (rc != MPI_SUCCESS) throw qvi_runtime_error(); + if (qvi_unlikely(rc != MPI_SUCCESS)) throw qvi_runtime_error(); } /** Destructor. */ ~qvi_mpi_comm_s(void) = default; @@ -97,7 +99,7 @@ struct qvi_mpi_s { // Marker used to differentiate between intrinsic and automatic IDs. if (given_id == QVI_MPI_GROUP_NULL) { const int rc = qvi_group_t::next_id(&gid); - if (rc != QV_SUCCESS) return rc; + if (qvi_unlikely(rc != QV_SUCCESS)) return rc; } group_tab.insert({gid, group}); return QV_SUCCESS; @@ -130,7 +132,7 @@ mpi_comm_to_new_node_comm( comm, MPI_COMM_TYPE_SHARED, 0, MPI_INFO_NULL, node_comm ); - if (rc != MPI_SUCCESS) { + if (qvi_unlikely(rc != MPI_SUCCESS)) { qvi_log_error("MPI_Comm_split_type(MPI_COMM_TYPE_SHARED) failed"); *node_comm = MPI_COMM_NULL; return QV_ERR_MPI; @@ -156,7 +158,7 @@ qvi_mpi_group_comm_dup( MPI_Comm *comm ) { const int rc = MPI_Comm_dup(group->qvcomm.mpi_comm, comm); - if (rc != MPI_SUCCESS) return QV_ERR_MPI; + if (qvi_unlikely(rc != MPI_SUCCESS)) return QV_ERR_MPI; return QV_SUCCESS; } @@ -171,7 +173,7 @@ create_intrinsic_comms( // Create node communicator. MPI_Comm node_comm = MPI_COMM_NULL; const int rc = mpi_comm_to_new_node_comm(comm, &node_comm); - if (rc != QV_SUCCESS) return rc; + if (qvi_unlikely(rc != QV_SUCCESS)) return rc; // MPI_COMM_SELF duplicate. mpi->self_comm = qvi_mpi_comm_s(MPI_COMM_SELF, true); // Node communicator, no duplicate necessary here: created above. @@ -339,12 +341,12 @@ sleepy_node_barrier( ) { MPI_Request request; int rc = MPI_Ibarrier(node_comm, &request); - if (rc != MPI_SUCCESS) return QV_ERR_MPI; + if (qvi_unlikely(rc != MPI_SUCCESS)) return QV_ERR_MPI; int done = 0; do { rc = MPI_Test(&request, &done, MPI_STATUS_IGNORE); - if (rc != MPI_SUCCESS) return QV_ERR_MPI; + if (qvi_unlikely(rc != MPI_SUCCESS)) return QV_ERR_MPI; usleep(50000); } while (!done); @@ -411,9 +413,7 @@ qvi_mpi_group_gather_bbuffs( } // Root creates new buffers from data gathered from each participant. if (group_id == root) { - // Zero initialize array of pointers to nullptr. - bbuffs = new qvi_bbuff_t *[group_size](); - // TODO(skg) Use dup. + bbuffs = new qvi_bbuff_t *[group_size]; byte_t *bytepos = allbytes.data(); for (int i = 0; i < group_size; ++i) { rc = qvi_bbuff_new(&bbuffs[i]); diff --git a/src/qvi-rmi.cc b/src/qvi-rmi.cc index 81e3cf7..f55ac10 100644 --- a/src/qvi-rmi.cc +++ b/src/qvi-rmi.cc @@ -35,6 +35,24 @@ #include "zmq.h" +/** + * Prints ZMQ error information. Defined as a macro so + * that the line numbers correspond to the error site. + */ +#define qvi_zerr_msg(ers, erno) \ +do { \ + qvi_log_error("{} with errno={} ({})", ers, (erno), qvi_strerr((erno))); \ +} while (0) + +/** + * Prints ZMQ warnings. Defined as a macro so that + * the line numbers correspond to the warning site. + */ +#define qvi_zwrn_msg(ers, erno) \ +do { \ + qvi_log_warn("{} with errno={} ({})", ers, (erno), qvi_strerr((erno))); \ +} while (0) + static constexpr cstr_t ZINPROC_ADDR = "inproc://qvi-rmi-workers"; typedef enum qvi_rpc_funid_e { @@ -50,28 +68,40 @@ typedef enum qvi_rpc_funid_e { FID_SCOPE_GET_INTRINSIC_HWPOOL } qvi_rpc_funid_t; +typedef struct qvi_msg_header_s { + qvi_rpc_funid_t fid = FID_INVALID; + char picture[8] = {'\0'}; +} qvi_msg_header_t; + +/** + * @note: The return value is used for operation status (e.g., was the internal + * machinery successful?). The underlying target's return code is packed into + * the message buffer and is meant for client-side consumption. + */ +typedef int (*qvi_rpc_fun_ptr_t)( + qvi_rmi_server_t *, + qvi_msg_header_t *, + void *, + qvi_bbuff_t ** +); + static void send_server_shutdown_msg( qvi_rmi_server_t *server ); -static void -qvi_zwrn_msg( - cstr_t ers, - int err_no -) { - const int erno = err_no; - qvi_log_warn("{} with errno={} ({})", (ers), erno, qvi_strerr(erno)); -} - static void zsocket_close( void **sock ) { + if (!sock) return; void *isock = *sock; - if (!isock) return; - const int rc = zmq_close(isock); - if (rc != 0) qvi_zwrn_msg("zmq_close() failed", errno); + if (qvi_likely(isock)) { + const int rc = zmq_close(isock); + if (qvi_unlikely(rc != 0)) { + qvi_zwrn_msg("zmq_close() failed", errno); + } + } *sock = nullptr; } @@ -79,23 +109,27 @@ static void zctx_destroy( void **ctx ) { + if (!ctx) return; void *ictx = *ctx; - if (!ictx) return; - const int rc = zmq_ctx_destroy(ictx); - if (rc != 0) qvi_zwrn_msg("zmq_ctx_destroy() failed", errno); + if (qvi_likely(ictx)) { + const int rc = zmq_ctx_destroy(ictx); + if (qvi_unlikely(rc != 0)) { + qvi_zwrn_msg("zmq_ctx_destroy() failed", errno); + } + } *ctx = nullptr; } struct qvi_rmi_server_s { - /** Server configuration */ + /** Server configuration. */ qvi_rmi_config_s config; /** The base resource pool maintained by the server. */ qvi_hwpool_s *hwpool = nullptr; - /** ZMQ context */ + /** ZMQ context. */ void *zctx = nullptr; - /** Loopback socket for managerial messages */ + /** Loopback socket for managerial messages. */ void *zlo = nullptr; - /** The worker thread */ + /** The worker thread. */ pthread_t worker_thread; /** Flag indicating if main thread blocks for workers to complete. */ bool blocks = false; @@ -103,10 +137,10 @@ struct qvi_rmi_server_s { qvi_rmi_server_s(void) { zctx = zmq_ctx_new(); - if (!zctx) throw qvi_runtime_error(); + if (qvi_unlikely(!zctx)) throw qvi_runtime_error(); const int rc = qvi_new(&hwpool); - if (rc != QV_SUCCESS) throw qvi_runtime_error(); + if (qvi_unlikely(rc != QV_SUCCESS)) throw qvi_runtime_error(); } /** Destructor. */ ~qvi_rmi_server_s(void) @@ -133,7 +167,7 @@ struct qvi_rmi_client_s { qvi_rmi_client_s(void) { // Remember clients own the hwloc data, unlike the server. - int rc = qvi_hwloc_new(&config.hwloc); + const int rc = qvi_hwloc_new(&config.hwloc); if (qvi_unlikely(rc != QV_SUCCESS)) throw qvi_runtime_error(); // Create a new ZMQ context. zctx = zmq_ctx_new(); @@ -148,34 +182,39 @@ struct qvi_rmi_client_s { } }; -typedef struct qvi_msg_header_s { - qvi_rpc_funid_t fid = FID_INVALID; -#if QVI_DEBUG_MODE == 1 - char picture[8] = {'\0'}; -#else - char picture[1] = {'\0'}; -#endif -} qvi_msg_header_t; +int +qvi_rmi_server_new( + qvi_rmi_server_t **server +) { + return qvi_new(server); +} -/** - * @note: The return value is used for operation status (e.g., was the internal - * machinery successful?). The underlying target's return code is packed into - * the message buffer and is meant for client-side consumption. - */ -typedef int (*qvi_rpc_fun_ptr_t)( - qvi_rmi_server_t *, - qvi_msg_header_t *, - void *, - qvi_bbuff_t ** -); +void +qvi_rmi_server_delete( + qvi_rmi_server_t **server +) { + qvi_delete(server); +} -static void -qvi_zerr_msg( - cstr_t ers, - int err_no +int +qvi_rmi_client_new( + qvi_rmi_client_t **client +) { + return qvi_new(client); +} + +void +qvi_rmi_client_delete( + qvi_rmi_client_t **client +) { + qvi_delete(client); +} + +qvi_hwloc_t * +qvi_rmi_client_hwloc( + qvi_rmi_client_t *client ) { - const int erno = err_no; - qvi_log_error("{} with errno={} ({})", ers, erno, qvi_strerr(erno)); + return client->config.hwloc; } static void * @@ -185,10 +224,12 @@ zsocket_create_and_connect( const char *addr ) { void *zsock = zmq_socket(zctx, sock_type); - if (!zsock) qvi_zerr_msg("zmq_socket() failed", errno); - + if (qvi_unlikely(!zsock)) { + qvi_zerr_msg("zmq_socket() failed", errno); + return nullptr; + } const int rc = zmq_connect(zsock, addr); - if (rc != 0) { + if (qvi_unlikely(rc != 0)) { qvi_zerr_msg("zmq_connect() failed", errno); zsocket_close(&zsock); return nullptr; @@ -203,10 +244,12 @@ zsocket_create_and_bind( const char *addr ) { void *zsock = zmq_socket(zctx, sock_type); - if (!zsock) qvi_zerr_msg("zmq_socket() failed", errno); - + if (qvi_unlikely(!zsock)) { + qvi_zerr_msg("zmq_socket() failed", errno); + return nullptr; + } const int rc = zmq_bind(zsock, addr); - if (rc != 0) { + if (qvi_unlikely(rc != 0)) { qvi_zerr_msg("zmq_bind() failed", errno); zsocket_close(&zsock); return nullptr; @@ -234,18 +277,14 @@ buffer_append_header( ) { qvi_msg_header_t hdr; hdr.fid = fid; -#if QVI_DEBUG_MODE == 1 const int bcap = sizeof(hdr.picture); const int nw = snprintf(hdr.picture, bcap, "%s", picture); - if (nw >= bcap) { + if (qvi_unlikely(nw >= bcap)) { qvi_log_error( "Debug picture buffer too small. Please submit a bug report." ); return QV_ERR_INTERNAL; } -#else - QVI_UNUSED(picture); -#endif return buff->append(&hdr, sizeof(hdr)); } @@ -276,15 +315,12 @@ zmsg_init_from_bbuff( ) { const size_t buffer_size = bbuff->size(); const int zrc = zmq_msg_init_data( - zmsg, - bbuff->data(), - buffer_size, - msg_free_byte_buffer_cb, - bbuff + zmsg, bbuff->data(), buffer_size, + msg_free_byte_buffer_cb, bbuff ); - if (zrc != 0) { + if (qvi_unlikely(zrc != 0)) { qvi_zerr_msg("zmq_msg_init_data() failed", errno); - return QV_ERR_MSG; + return QV_ERR_RPC; } return QV_SUCCESS; } @@ -296,13 +332,12 @@ zmsg_send( int *bsent ) { int qvrc = QV_SUCCESS; - *bsent = zmq_msg_send(msg, zsock, 0); - if (*bsent == -1) { + if (qvi_unlikely(*bsent == -1)) { qvi_zerr_msg("zmq_msg_send() failed", errno); - qvrc = QV_ERR_MSG; + qvrc = QV_ERR_RPC; } - if (qvrc != QV_SUCCESS) zmq_msg_close(msg); + if (qvi_unlikely(qvrc != QV_SUCCESS)) zmq_msg_close(msg); return qvrc; } @@ -311,23 +346,21 @@ zmsg_recv( void *zsock, zmq_msg_t *mrx ) { - int rc = 0, qvrc = QV_SUCCESS; - - rc = zmq_msg_init(mrx); - if (rc != 0) { + int qvrc = QV_SUCCESS; + int rc = zmq_msg_init(mrx); + if (qvi_unlikely(rc != 0)) { qvi_zerr_msg("zmq_msg_init() failed", errno); - qvrc = QV_ERR_MSG; + qvrc = QV_ERR_RPC; goto out; } // Block until a message is available to be received from socket. rc = zmq_msg_recv(mrx, zsock, 0); - if (rc == -1) { + if (qvi_unlikely(rc == -1)) { qvi_zerr_msg("zmq_msg_recv() failed", errno); - qvrc = QV_ERR_MSG; - goto out; + qvrc = QV_ERR_RPC; } out: - if (qvrc != QV_SUCCESS) zmq_msg_close(mrx); + if (qvi_unlikely(qvrc != QV_SUCCESS)) zmq_msg_close(mrx); return qvrc; } @@ -336,22 +369,22 @@ static int rpc_pack( qvi_bbuff_t **buff, qvi_rpc_funid_t fid, - Types&&... args + Types &&...args ) { std::string picture; qvi_bbuff_t *ibuff = nullptr; int rc = qvi_bbuff_new(&ibuff); - if (rc != QV_SUCCESS) goto out; + if (qvi_unlikely(rc != QV_SUCCESS)) goto out; // Get the picture based on the types passed. qvi_bbuff_rmi_get_picture(picture, std::forward(args)...); // Fill and add header. rc = buffer_append_header(ibuff, fid, picture.c_str()); - if (rc != QV_SUCCESS) goto out; + if (qvi_unlikely(rc != QV_SUCCESS)) goto out; rc = qvi_bbuff_rmi_pack(ibuff, std::forward(args)...); out: - if (rc != QV_SUCCESS) { + if (qvi_unlikely(rc != QV_SUCCESS)) { qvi_bbuff_delete(&ibuff); } *buff = ibuff; @@ -362,16 +395,15 @@ template static int rpc_unpack( void *data, - Types&&... args + Types &&...args ) { qvi_msg_header_t hdr; const size_t trim = unpack_msg_header(data, &hdr); -#if QVI_DEBUG_MODE == 1 - std::string picture; // Get the picture based on the types passed. + std::string picture; qvi_bbuff_rmi_get_picture(picture, std::forward(args)...); // Verify it matches the arguments provided. - if (strcmp(picture.c_str(), hdr.picture) != 0) { + if (qvi_unlikely(strcmp(picture.c_str(), hdr.picture) != 0)) { qvi_log_error( "RPC pack/unpack type mismatch: " "expected \"{}\", but detected \"{}\" " @@ -380,9 +412,8 @@ rpc_unpack( picture.c_str(), hdr.fid ); - return QV_ERR_INTERNAL; + return QV_ERR_RPC; } -#endif void *body = data_trim(data, trim); return qvi_bbuff_rmi_unpack(body, std::forward(args)...); } @@ -392,47 +423,45 @@ static inline int rpc_req( void *zsock, qvi_rpc_funid_t fid, - Types&&... args + Types &&...args ) { - int qvrc = QV_SUCCESS; int buffer_size = 0; qvi_bbuff_t *buff = nullptr; int rc = rpc_pack(&buff, fid, std::forward(args)...); - if (rc != QV_SUCCESS) { + if (qvi_unlikely(rc != QV_SUCCESS)) { qvi_bbuff_delete(&buff); return rc; } zmq_msg_t msg; rc = zmsg_init_from_bbuff(buff, &msg); - if (rc != QV_SUCCESS) goto out; + if (qvi_unlikely(rc != QV_SUCCESS)) goto out; // Cache buffer size here because our call to qvi_bbuff_size() after // zmsg_send() may be invalid because msg_free_byte_buffer_cb() may have // already been called. buffer_size = (int)buff->size(); int nbytes_sent; - qvrc = zmsg_send(zsock, &msg, &nbytes_sent); - if (nbytes_sent != buffer_size) { + rc = zmsg_send(zsock, &msg, &nbytes_sent); + if (qvi_unlikely(nbytes_sent != buffer_size)) { qvi_zerr_msg("zmq_msg_send() truncated", errno); - qvrc = QV_ERR_MSG; - goto out; + rc = QV_ERR_RPC; } out: - if (qvrc != QV_SUCCESS) zmq_msg_close(&msg); + if (qvi_unlikely(rc != QV_SUCCESS)) zmq_msg_close(&msg); // Else freeing of buffer and message resources is done for us. - return qvrc; + return rc; } template static inline int rpc_rep( void *zsock, - Types&&... args + Types &&...args ) { zmq_msg_t msg; int rc = zmsg_recv(zsock, &msg); - if (rc != QV_SUCCESS) goto out; + if (qvi_unlikely(rc != QV_SUCCESS)) goto out; rc = rpc_unpack(zmq_msg_data(&msg), std::forward(args)...); out: zmq_msg_close(&msg); @@ -475,11 +504,10 @@ rpc_ssi_hello( // TODO(skg) This will go into some registry somewhere. pid_t whoisit; const int rc = qvi_bbuff_rmi_unpack(input, &whoisit); - if (rc != QV_SUCCESS) return rc; + if (qvi_unlikely(rc != QV_SUCCESS)) return rc; // Pack relevant configuration information. return rpc_pack( - output, - hdr->fid, + output, hdr->fid, server->config.url, server->config.hwtopo_path ); @@ -504,13 +532,12 @@ rpc_ssi_cpubind( ) { pid_t who; int qvrc = qvi_bbuff_rmi_unpack(input, &who); - if (qvrc != QV_SUCCESS) return qvrc; + if (qvi_unlikely(qvrc != QV_SUCCESS)) return qvrc; hwloc_cpuset_t bitmap = nullptr; const int rpcrc = qvi_hwloc_task_get_cpubind( server->config.hwloc, who, &bitmap ); - qvrc = rpc_pack(output, hdr->fid, rpcrc, bitmap); qvi_hwloc_bitmap_delete(&bitmap); @@ -527,7 +554,7 @@ rpc_ssi_set_cpubind( pid_t who; hwloc_cpuset_t cpuset = nullptr; const int qvrc = qvi_bbuff_rmi_unpack(input, &who, &cpuset); - if (qvrc != QV_SUCCESS) return qvrc; + if (qvi_unlikely(qvrc != QV_SUCCESS)) return qvrc; const int rpcrc = qvi_hwloc_task_set_cpubind_from_cpuset( server->config.hwloc, who, cpuset @@ -718,7 +745,7 @@ server_rpc_dispatch( const size_t trim = unpack_msg_header(data, &hdr); void *body = data_trim(data, trim); - auto fidfunp = rpc_dispatch_table.find(hdr.fid); + const auto fidfunp = rpc_dispatch_table.find(hdr.fid); if (qvi_unlikely(fidfunp == rpc_dispatch_table.end())) { qvi_log_error("Unknown function ID ({}) in RPC. Aborting.", hdr.fid); rc = QV_ERR_RPC; @@ -782,13 +809,6 @@ server_go( return nullptr; } -int -qvi_rmi_server_new( - qvi_rmi_server_t **server -) { - return qvi_new(server); -} - static void send_server_shutdown_msg( qvi_rmi_server_t *server @@ -797,13 +817,6 @@ send_server_shutdown_msg( (void)rpc_rep(server->zlo, QVI_BBUFF_RMI_ZERO_MSG); } -void -qvi_rmi_server_delete( - qvi_rmi_server_t **server -) { - qvi_delete(server); -} - int qvi_rmi_server_config( qvi_rmi_server_t *server, @@ -835,7 +848,7 @@ server_start_threads( void *clients = zsocket_create_and_bind( server->zctx, ZMQ_ROUTER, server->config.url.c_str() ); - if (!clients) { + if (qvi_unlikely(!clients)) { cstr_t ers = "zsocket_create_and_bind() failed"; qvi_log_error("{}", ers); return nullptr; @@ -844,7 +857,7 @@ server_start_threads( void *workers = zsocket_create_and_bind( server->zctx, ZMQ_DEALER, ZINPROC_ADDR ); - if (!workers) { + if (qvi_unlikely(!workers)) { cstr_t ers = "zsocket_create_and_bind() failed"; qvi_log_error("{}", ers); return nullptr; @@ -852,7 +865,7 @@ server_start_threads( pthread_t worker; int rc = pthread_create(&worker, nullptr, server_go, server); - if (rc != 0) { + if (qvi_unlikely(rc != 0)) { cstr_t ers = "pthread_create() failed"; qvi_log_error("{} with rc={} ({})", ers, rc, qvi_strerr(rc)); } @@ -877,7 +890,7 @@ qvi_rmi_server_start( server->zlo = zsocket_create_and_connect( server->zctx, ZMQ_REQ, server->config.url.c_str() ); - if (!server->zlo) return QV_ERR_MSG; + if (!server->zlo) return QV_ERR_RPC; int rc = pthread_create( &server->worker_thread, nullptr, @@ -893,24 +906,9 @@ qvi_rmi_server_start( server->blocks = true; pthread_join(server->worker_thread, nullptr); } - return qvrc; } -int -qvi_rmi_client_new( - qvi_rmi_client_t **client -) { - return qvi_new(client); -} - -void -qvi_rmi_client_delete( - qvi_rmi_client_t **client -) { - qvi_delete(client); -} - static int hello_handshake( qvi_rmi_client_t *client @@ -918,7 +916,7 @@ hello_handshake( const int rc = rpc_req( client->zsock, FID_HELLO, getpid() ); - if (rc != QV_SUCCESS) return rc; + if (qvi_unlikely(rc != QV_SUCCESS)) return rc; return rpc_rep( client->zsock, @@ -935,7 +933,7 @@ qvi_rmi_client_connect( client->zsock = zsocket_create_and_connect( client->zctx, ZMQ_REQ, url.c_str() ); - if (qvi_unlikely(!client->zsock)) return QV_ERR_MSG; + if (qvi_unlikely(!client->zsock)) return QV_ERR_RPC; int rc = hello_handshake(client); if (qvi_unlikely(rc != QV_SUCCESS)) return rc; @@ -949,13 +947,6 @@ qvi_rmi_client_connect( return qvi_hwloc_topology_load(client->config.hwloc); } -qvi_hwloc_t * -qvi_rmi_client_hwloc( - qvi_rmi_client_t *client -) { - return client->config.hwloc; -} - //////////////////////////////////////////////////////////////////////////////// // Client-Side (Public) RPC Stub Definitions //////////////////////////////////////////////////////////////////////////////// @@ -968,7 +959,7 @@ qvi_rmi_cpubind( int qvrc = rpc_req(client->zsock, FID_CPUBIND, who); if (qvi_unlikely(qvrc != QV_SUCCESS)) return qvrc; // Should be set by rpc_rep, so assume an error. - int rpcrc = QV_ERR_MSG; + int rpcrc = QV_ERR_RPC; qvrc = rpc_rep(client->zsock, &rpcrc, cpuset); if (qvi_unlikely(qvrc != QV_SUCCESS)) { qvi_hwloc_bitmap_delete(cpuset); @@ -986,7 +977,7 @@ qvi_rmi_set_cpubind( int qvrc = rpc_req(client->zsock, FID_SET_CPUBIND, who, cpuset); if (qvi_unlikely(qvrc != QV_SUCCESS)) return qvrc; // Should be set by rpc_rep, so assume an error. - int rpcrc = QV_ERR_MSG; + int rpcrc = QV_ERR_RPC; qvrc = rpc_rep(client->zsock, &rpcrc); if (qvi_unlikely(qvrc != QV_SUCCESS)) return qvrc; return rpcrc; @@ -1010,7 +1001,7 @@ qvi_rmi_get_intrinsic_hwpool( if (qvrc != QV_SUCCESS) return qvrc; // Should be set by rpc_rep, so assume an error. - int rpcrc = QV_ERR_MSG; + int rpcrc = QV_ERR_RPC; qvrc = rpc_rep( client->zsock, &rpcrc, hwpool ); @@ -1032,7 +1023,7 @@ qvi_rmi_obj_type_depth( if (qvrc != QV_SUCCESS) return qvrc; // Should be set by rpc_rep, so assume an error. - int rpcrc = QV_ERR_MSG; + int rpcrc = QV_ERR_RPC; qvrc = rpc_rep(client->zsock, &rpcrc, depth); if (qvrc != QV_SUCCESS) return qvrc; @@ -1053,9 +1044,8 @@ qvi_rmi_get_nobjs_in_cpuset( cpuset ); if (qvrc != QV_SUCCESS) return qvrc; - // Should be set by rpc_rep, so assume an error. - int rpcrc = QV_ERR_MSG; + int rpcrc = QV_ERR_RPC; qvrc = rpc_rep(client->zsock, &rpcrc, nobjs); if (qvrc != QV_SUCCESS) return qvrc; @@ -1082,7 +1072,7 @@ qvi_rmi_get_device_in_cpuset( if (qvrc != QV_SUCCESS) return qvrc; // Should be set by rpc_rep, so assume an error. - int rpcrc = QV_ERR_MSG; + int rpcrc = QV_ERR_RPC; qvrc = rpc_rep(client->zsock, &rpcrc, dev_id); if (qvrc != QV_SUCCESS) return qvrc; diff --git a/src/qvi-scope.cc b/src/qvi-scope.cc index 59524df..6bfb6e4 100644 --- a/src/qvi-scope.cc +++ b/src/qvi-scope.cc @@ -319,7 +319,7 @@ gather_hwpools( const uint_t group_size = group->size(); // Pack the hardware pool into a buffer. qvi_bbuff_t txbuff; - int rc = txpool->pack(&txbuff); + int rc = txpool->packto(&txbuff); if (qvi_unlikely(rc != QV_SUCCESS)) return rc; // Gather the values to the root. bool shared = false; @@ -416,7 +416,7 @@ scatter_hwpools( rc = qvi_bbuff_new(&txbuffs[i]); if (rc != QV_SUCCESS) break; - rc = pools[i]->pack(txbuffs[i]); + rc = pools[i]->packto(txbuffs[i]); if (rc != QV_SUCCESS) break; } if (rc != QV_SUCCESS) goto out; diff --git a/src/qvi-task.cc b/src/qvi-task.cc index 3fb5f63..7b6b2fc 100644 --- a/src/qvi-task.cc +++ b/src/qvi-task.cc @@ -113,9 +113,9 @@ qvi_task_s::bind_pop(void) int qvi_task_s::bind_top( - hwloc_cpuset_t *dest + hwloc_cpuset_t *result ) { - return qvi_hwloc_bitmap_dup(m_stack.top().cdata(), dest); + return qvi_hwloc_bitmap_dup(m_stack.top().cdata(), result); } /* diff --git a/src/qvi-task.h b/src/qvi-task.h index 5096a1b..a52e016 100644 --- a/src/qvi-task.h +++ b/src/qvi-task.h @@ -67,7 +67,7 @@ struct qvi_task_s { /** Returns the task's current cpuset. */ int bind_top( - hwloc_cpuset_t *dest + hwloc_cpuset_t *result ); };