Skip to content

Commit 6f58b9d

Browse files
authored
CDRIVER-4617 Refactor libmongoc to use mcd-rpc (#1307)
* Add mcd_rpc_message ingress/egress counter functions * Add mcd_rpc_message compression functions * Add mcd_rpc_message_get_body * Add mcd_rpc_message_check_ok * Refactor mongoc_cluster_t::request_id from uint32_t -> int32_t * Refactor mock server to use mcd-rpc * Refactor server monitor thread to use mcd-rpc * Refactor mongoc-async-cmd to use mcd-rpc * Refactor mongoc_cluster_run_command_opquery to use mcd-rpc * Refactor mongoc_cluster_run_opmsg to use mcd-rpc * Refactor cursor functions to use mcd-rpc * CDRIVER-4625 Address distinct pointer type warnings on Windows * Clean up mock server functions
1 parent daf8cce commit 6f58b9d

24 files changed

+1865
-2060
lines changed

src/libmongoc/src/mongoc/mongoc-async-cmd-private.h

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,12 +21,12 @@
2121

2222
#include <bson/bson.h>
2323

24+
#include "mcd-rpc.h"
2425
#include "mongoc-client.h"
2526
#include "mongoc-async-private.h"
2627
#include "mongoc-array-private.h"
2728
#include "mongoc-buffer-private.h"
2829
#include "mongoc-cmd-private.h"
29-
#include "mongoc-rpc-private.h"
3030
#include "mongoc-stream.h"
3131

3232
BSON_BEGIN_DECLS
@@ -59,12 +59,11 @@ typedef struct _mongoc_async_cmd {
5959
int64_t timeout_msec;
6060
bson_t cmd;
6161
mongoc_buffer_t buffer;
62-
mongoc_array_t array;
6362
mongoc_iovec_t *iovec;
6463
size_t niovec;
6564
size_t bytes_written;
6665
size_t bytes_to_read;
67-
mongoc_rpc_t rpc;
66+
mcd_rpc_message *rpc;
6867
bson_t reply;
6968
bool reply_needs_cleanup;
7069
char *ns;
@@ -85,7 +84,7 @@ mongoc_async_cmd_new (mongoc_async_t *async,
8584
void *setup_ctx,
8685
const char *dbname,
8786
const bson_t *cmd,
88-
const mongoc_opcode_t cmd_opcode,
87+
const int32_t cmd_opcode,
8988
mongoc_async_cmd_cb_t cb,
9089
void *cb_data,
9190
int64_t timeout_msec);

src/libmongoc/src/mongoc/mongoc-async-cmd.c

Lines changed: 66 additions & 60 deletions
Original file line numberDiff line numberDiff line change
@@ -134,42 +134,47 @@ mongoc_async_cmd_run (mongoc_async_cmd_t *acmd)
134134
return false;
135135
}
136136

137-
void
138-
_mongoc_async_cmd_init_send (const mongoc_opcode_t cmd_opcode,
137+
static void
138+
_mongoc_async_cmd_init_send (const int32_t cmd_opcode,
139139
mongoc_async_cmd_t *acmd,
140140
const char *dbname)
141141
{
142-
acmd->rpc.header.msg_len = 0;
143-
acmd->rpc.header.request_id = ++acmd->async->request_id;
144-
acmd->rpc.header.response_to = 0;
142+
BSON_ASSERT (cmd_opcode == MONGOC_OP_CODE_QUERY ||
143+
cmd_opcode == MONGOC_OP_CODE_MSG);
145144

146-
if (MONGOC_OPCODE_QUERY == cmd_opcode) {
147-
acmd->ns = bson_strdup_printf ("%s.$cmd", dbname);
148-
acmd->rpc.header.opcode = MONGOC_OPCODE_QUERY;
149-
acmd->rpc.query.flags = MONGOC_QUERY_SECONDARY_OK;
150-
acmd->rpc.query.collection = acmd->ns;
151-
acmd->rpc.query.skip = 0;
152-
acmd->rpc.query.n_return = -1;
153-
acmd->rpc.query.query = bson_get_data (&acmd->cmd);
154-
acmd->rpc.query.fields = NULL;
155-
}
145+
int32_t message_length = 0;
156146

157-
if (MONGOC_OPCODE_MSG == cmd_opcode) {
158-
acmd->rpc.header.opcode = MONGOC_OPCODE_MSG;
147+
message_length += mcd_rpc_header_set_message_length (acmd->rpc, 0);
148+
message_length +=
149+
mcd_rpc_header_set_request_id (acmd->rpc, ++acmd->async->request_id);
150+
message_length += mcd_rpc_header_set_response_to (acmd->rpc, 0);
151+
message_length += mcd_rpc_header_set_op_code (acmd->rpc, cmd_opcode);
159152

160-
acmd->rpc.msg.msg_len = 0;
161-
acmd->rpc.msg.flags = 0;
162-
acmd->rpc.msg.n_sections = 1;
163-
acmd->rpc.msg.sections[0].payload_type = 0;
164-
acmd->rpc.msg.sections[0].payload.bson_document =
165-
bson_get_data (&acmd->cmd);
153+
if (cmd_opcode == MONGOC_OP_CODE_QUERY) {
154+
acmd->ns = bson_strdup_printf ("%s.$cmd", dbname);
155+
message_length += mcd_rpc_op_query_set_flags (
156+
acmd->rpc, MONGOC_OP_QUERY_FLAG_SECONDARY_OK);
157+
message_length +=
158+
mcd_rpc_op_query_set_full_collection_name (acmd->rpc, acmd->ns);
159+
message_length += mcd_rpc_op_query_set_number_to_skip (acmd->rpc, 0);
160+
message_length += mcd_rpc_op_query_set_number_to_return (acmd->rpc, -1);
161+
message_length +=
162+
mcd_rpc_op_query_set_query (acmd->rpc, bson_get_data (&acmd->cmd));
163+
} else {
164+
mcd_rpc_op_msg_set_sections_count (acmd->rpc, 1u);
165+
message_length +=
166+
mcd_rpc_op_msg_set_flag_bits (acmd->rpc, MONGOC_OP_MSG_FLAG_NONE);
167+
message_length += mcd_rpc_op_msg_section_set_kind (acmd->rpc, 0u, 0);
168+
message_length += mcd_rpc_op_msg_section_set_body (
169+
acmd->rpc, 0u, bson_get_data (&acmd->cmd));
166170
}
167171

172+
mcd_rpc_message_set_length (acmd->rpc, message_length);
173+
168174
/* This will always be hello, which are not allowed to be compressed */
169-
_mongoc_rpc_gather (&acmd->rpc, &acmd->array);
170-
acmd->iovec = (mongoc_iovec_t *) acmd->array.data;
171-
acmd->niovec = acmd->array.len;
172-
_mongoc_rpc_swab_to_le (&acmd->rpc);
175+
acmd->iovec = mcd_rpc_message_to_iovecs (acmd->rpc, &acmd->niovec);
176+
BSON_ASSERT (acmd->iovec);
177+
173178
acmd->bytes_written = 0;
174179
}
175180

@@ -198,17 +203,15 @@ mongoc_async_cmd_new (mongoc_async_t *async,
198203
void *setup_ctx,
199204
const char *dbname,
200205
const bson_t *cmd,
201-
const mongoc_opcode_t cmd_opcode, /* OP_QUERY or OP_MSG */
206+
const int32_t cmd_opcode, /* OP_QUERY or OP_MSG */
202207
mongoc_async_cmd_cb_t cb,
203208
void *cb_data,
204209
int64_t timeout_msec)
205210
{
206-
mongoc_async_cmd_t *acmd;
211+
BSON_ASSERT_PARAM (cmd);
212+
BSON_ASSERT_PARAM (dbname);
207213

208-
BSON_ASSERT (cmd);
209-
BSON_ASSERT (dbname);
210-
211-
acmd = BSON_ALIGNED_ALLOC0 (mongoc_async_cmd_t);
214+
mongoc_async_cmd_t *const acmd = BSON_ALIGNED_ALLOC0 (mongoc_async_cmd_t);
212215
acmd->async = async;
213216
acmd->dns_result = dns_result;
214217
acmd->timeout_msec = timeout_msec;
@@ -222,12 +225,13 @@ mongoc_async_cmd_new (mongoc_async_t *async,
222225
acmd->connect_started = bson_get_monotonic_time ();
223226
bson_copy_to (cmd, &acmd->cmd);
224227

225-
if (MONGOC_OPCODE_MSG == cmd_opcode) {
226-
/* If we're sending an OPCODE_MSG, we need to add the "db" field: */
228+
if (MONGOC_OP_CODE_MSG == cmd_opcode) {
229+
/* If we're sending an OP_MSG, we need to add the "db" field: */
227230
bson_append_utf8 (&acmd->cmd, "$db", 3, "admin", 5);
228231
}
229232

230-
_mongoc_array_init (&acmd->array, sizeof (mongoc_iovec_t));
233+
acmd->rpc = mcd_rpc_message_new ();
234+
acmd->iovec = NULL;
231235
_mongoc_buffer_init (&acmd->buffer, NULL, 0, NULL, NULL);
232236

233237
_mongoc_async_cmd_init_send (cmd_opcode, acmd, dbname);
@@ -255,8 +259,9 @@ mongoc_async_cmd_destroy (mongoc_async_cmd_t *acmd)
255259
bson_destroy (&acmd->reply);
256260
}
257261

258-
_mongoc_array_destroy (&acmd->array);
262+
bson_free (acmd->iovec);
259263
_mongoc_buffer_destroy (&acmd->buffer);
264+
mcd_rpc_message_destroy (acmd->rpc);
260265

261266
bson_free (acmd->ns);
262267
bson_free (acmd);
@@ -348,7 +353,7 @@ _mongoc_async_cmd_phase_send (mongoc_async_cmd_t *acmd)
348353
used_temp_iovec = true;
349354
}
350355

351-
_mongoc_rpc_op_egress_inc (&acmd->rpc);
356+
mcd_rpc_message_egress (acmd->rpc);
352357
bytes = mongoc_stream_writev (acmd->stream, iovec, niovec, 0);
353358

354359
if (used_temp_iovec) {
@@ -462,38 +467,39 @@ _mongoc_async_cmd_phase_recv_rpc (mongoc_async_cmd_t *acmd)
462467
acmd->bytes_to_read = (size_t) (acmd->bytes_to_read - bytes);
463468

464469
if (!acmd->bytes_to_read) {
465-
if (!_mongoc_rpc_scatter (
466-
&acmd->rpc, acmd->buffer.data, acmd->buffer.len)) {
470+
mcd_rpc_message_reset (acmd->rpc);
471+
if (!mcd_rpc_message_from_data_in_place (
472+
acmd->rpc, acmd->buffer.data, acmd->buffer.len, NULL)) {
467473
bson_set_error (&acmd->error,
468474
MONGOC_ERROR_PROTOCOL,
469475
MONGOC_ERROR_PROTOCOL_INVALID_REPLY,
470476
"Invalid reply from server.");
471477
return MONGOC_ASYNC_CMD_ERROR;
472478
}
473-
if (BSON_UINT32_FROM_LE (acmd->rpc.header.opcode) ==
474-
MONGOC_OPCODE_COMPRESSED) {
475-
uint8_t *buf = NULL;
476-
size_t len =
477-
BSON_UINT32_FROM_LE (acmd->rpc.compressed.uncompressed_size) +
478-
sizeof (mongoc_rpc_header_t);
479-
480-
buf = bson_malloc0 (len);
481-
if (!_mongoc_rpc_decompress (&acmd->rpc, buf, len)) {
482-
bson_free (buf);
483-
bson_set_error (&acmd->error,
484-
MONGOC_ERROR_PROTOCOL,
485-
MONGOC_ERROR_PROTOCOL_INVALID_REPLY,
486-
"Could not decompress server reply");
487-
return MONGOC_ASYNC_CMD_ERROR;
488-
}
479+
mcd_rpc_message_ingress (acmd->rpc);
489480

490-
_mongoc_buffer_destroy (&acmd->buffer);
491-
_mongoc_buffer_init (&acmd->buffer, buf, len, NULL, NULL);
481+
void *decompressed_data;
482+
size_t decompressed_data_len;
483+
484+
if (!mcd_rpc_message_decompress_if_necessary (
485+
acmd->rpc, &decompressed_data, &decompressed_data_len)) {
486+
bson_set_error (&acmd->error,
487+
MONGOC_ERROR_PROTOCOL,
488+
MONGOC_ERROR_PROTOCOL_INVALID_REPLY,
489+
"Could not decompress server reply");
490+
return MONGOC_ASYNC_CMD_ERROR;
492491
}
493492

494-
_mongoc_rpc_swab_from_le (&acmd->rpc);
493+
if (decompressed_data) {
494+
_mongoc_buffer_destroy (&acmd->buffer);
495+
_mongoc_buffer_init (&acmd->buffer,
496+
decompressed_data,
497+
decompressed_data_len,
498+
NULL,
499+
NULL);
500+
}
495501

496-
if (!_mongoc_rpc_get_first_document (&acmd->rpc, &acmd->reply)) {
502+
if (!mcd_rpc_message_get_body (acmd->rpc, &acmd->reply)) {
497503
bson_set_error (&acmd->error,
498504
MONGOC_ERROR_PROTOCOL,
499505
MONGOC_ERROR_PROTOCOL_INVALID_REPLY,

src/libmongoc/src/mongoc/mongoc-client-private.h

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,7 @@ _mongoc_client_create_stream (mongoc_client_t *client,
167167

168168
bool
169169
_mongoc_client_recv (mongoc_client_t *client,
170-
mongoc_rpc_t *rpc,
170+
mcd_rpc_message *rpc,
171171
mongoc_buffer_t *buffer,
172172
mongoc_server_stream_t *server_stream,
173173
bson_error_t *error);

src/libmongoc/src/mongoc/mongoc-client.c

Lines changed: 31 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -951,25 +951,9 @@ _mongoc_client_create_stream (mongoc_client_t *client,
951951
}
952952

953953

954-
/*
955-
*--------------------------------------------------------------------------
956-
*
957-
* _mongoc_client_recv --
958-
*
959-
* Receives a RPC from a remote MongoDB cluster node.
960-
*
961-
* Returns:
962-
* true if successful; otherwise false and @error is set.
963-
*
964-
* Side effects:
965-
* @error is set if return value is false.
966-
*
967-
*--------------------------------------------------------------------------
968-
*/
969-
970954
bool
971955
_mongoc_client_recv (mongoc_client_t *client,
972-
mongoc_rpc_t *rpc,
956+
mcd_rpc_message *rpc,
973957
mongoc_buffer_t *buffer,
974958
mongoc_server_stream_t *server_stream,
975959
bson_error_t *error)
@@ -978,6 +962,7 @@ _mongoc_client_recv (mongoc_client_t *client,
978962
BSON_ASSERT (rpc);
979963
BSON_ASSERT (buffer);
980964
BSON_ASSERT (server_stream);
965+
BSON_ASSERT_PARAM (error);
981966

982967
return mongoc_cluster_try_recv (
983968
&client->cluster, rpc, buffer, server_stream, error);
@@ -2484,36 +2469,44 @@ _mongoc_client_op_killcursors (mongoc_cluster_t *cluster,
24842469
const char *db,
24852470
const char *collection)
24862471
{
2487-
int64_t started;
2488-
mongoc_rpc_t rpc = {{0}};
2489-
bson_error_t error;
2490-
bool has_ns;
2491-
bool r;
2472+
BSON_ASSERT_PARAM (cluster);
2473+
BSON_ASSERT_PARAM (server_stream);
2474+
BSON_ASSERT (db || true);
2475+
BSON_ASSERT (collection || true);
2476+
2477+
const bool has_ns = db && collection;
2478+
const int64_t started = bson_get_monotonic_time ();
24922479

2493-
/* called by old mongoc_client_kill_cursor without db/collection? */
2494-
has_ns = (db && collection);
2495-
started = bson_get_monotonic_time ();
2480+
mcd_rpc_message *const rpc = mcd_rpc_message_new ();
24962481

2497-
++cluster->request_id;
2482+
{
2483+
int32_t message_length = 0;
24982484

2499-
rpc.header.msg_len = 0;
2500-
rpc.header.request_id = cluster->request_id;
2501-
rpc.header.response_to = 0;
2502-
rpc.header.opcode = MONGOC_OPCODE_KILL_CURSORS;
2503-
rpc.kill_cursors.zero = 0;
2504-
rpc.kill_cursors.cursors = &cursor_id;
2505-
rpc.kill_cursors.n_cursors = 1;
2485+
message_length += mcd_rpc_header_set_message_length (rpc, 0);
2486+
message_length +=
2487+
mcd_rpc_header_set_request_id (rpc, ++cluster->request_id);
2488+
message_length += mcd_rpc_header_set_response_to (rpc, 0);
2489+
message_length +=
2490+
mcd_rpc_header_set_op_code (rpc, MONGOC_OP_CODE_KILL_CURSORS);
2491+
2492+
message_length += sizeof (int32_t); // ZERO
2493+
message_length +=
2494+
mcd_rpc_op_kill_cursors_set_cursor_ids (rpc, &cursor_id, 1);
2495+
2496+
mcd_rpc_message_set_length (rpc, message_length);
2497+
}
25062498

25072499
if (has_ns) {
25082500
_mongoc_client_monitor_op_killcursors (
25092501
cluster, server_stream, cursor_id, operation_id, db, collection);
25102502
}
25112503

2512-
r = mongoc_cluster_legacy_rpc_sendv_to_server (
2513-
cluster, &rpc, server_stream, &error);
2504+
bson_error_t error;
2505+
const bool res = mongoc_cluster_legacy_rpc_sendv_to_server (
2506+
cluster, rpc, server_stream, &error);
25142507

25152508
if (has_ns) {
2516-
if (r) {
2509+
if (res) {
25172510
_mongoc_client_monitor_op_killcursors_succeeded (
25182511
cluster,
25192512
bson_get_monotonic_time () - started,
@@ -2529,6 +2522,8 @@ _mongoc_client_op_killcursors (mongoc_cluster_t *cluster,
25292522
operation_id);
25302523
}
25312524
}
2525+
2526+
mcd_rpc_message_destroy (rpc);
25322527
}
25332528

25342529

0 commit comments

Comments
 (0)