Skip to content

Commit c242539

Browse files
authored
RUST-1588: Add RunCursorCommand (mongodb#912)
* RUST-1636: Add RunCursorCommand
1 parent af30b76 commit c242539

File tree

5 files changed

+191
-3
lines changed

5 files changed

+191
-3
lines changed

src/db.rs

Lines changed: 46 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,14 +20,23 @@ use crate::{
2020
cursor::Cursor,
2121
error::{Error, ErrorKind, Result},
2222
gridfs::{options::GridFsBucketOptions, GridFsBucket},
23-
operation::{Aggregate, AggregateTarget, Create, DropDatabase, ListCollections, RunCommand},
23+
operation::{
24+
Aggregate,
25+
AggregateTarget,
26+
Create,
27+
DropDatabase,
28+
ListCollections,
29+
RunCommand,
30+
RunCursorCommand,
31+
},
2432
options::{
2533
AggregateOptions,
2634
CollectionOptions,
2735
CreateCollectionOptions,
2836
DatabaseOptions,
2937
DropDatabaseOptions,
3038
ListCollectionsOptions,
39+
RunCursorCommandOptions,
3140
},
3241
results::CollectionSpecification,
3342
selection_criteria::SelectionCriteria,
@@ -469,6 +478,42 @@ impl Database {
469478
.await
470479
}
471480

481+
/// Runs a database-level command and returns a cursor to the response.
482+
pub async fn run_cursor_command(
483+
&self,
484+
command: Document,
485+
options: impl Into<Option<RunCursorCommandOptions>>,
486+
) -> Result<Cursor<Document>> {
487+
let options: Option<RunCursorCommandOptions> = options.into();
488+
let selection_criteria = options
489+
.as_ref()
490+
.and_then(|options| options.selection_criteria.clone());
491+
let rcc = RunCommand::new(self.name().to_string(), command, selection_criteria, None)?;
492+
let rc_command = RunCursorCommand::new(rcc, options)?;
493+
let client = self.client();
494+
client.execute_cursor_operation(rc_command).await
495+
}
496+
497+
/// Runs a database-level command and returns a cursor to the response.
498+
pub async fn run_cursor_command_with_session(
499+
&self,
500+
command: Document,
501+
options: impl Into<Option<RunCursorCommandOptions>>,
502+
session: &mut ClientSession,
503+
) -> Result<SessionCursor<Document>> {
504+
let mut options: Option<RunCursorCommandOptions> = options.into();
505+
resolve_selection_criteria_with_session!(self, options, Some(&mut *session))?;
506+
let selection_criteria = options
507+
.as_ref()
508+
.and_then(|options| options.selection_criteria.clone());
509+
let rcc = RunCommand::new(self.name().to_string(), command, selection_criteria, None)?;
510+
let rc_command = RunCursorCommand::new(rcc, options)?;
511+
let client = self.client();
512+
client
513+
.execute_session_cursor_operation(rc_command, session)
514+
.await
515+
}
516+
472517
/// Runs a database-level command using the provided `ClientSession`.
473518
///
474519
/// If the `ClientSession` provided is currently in a transaction, `command` must not specify a

src/db/options.rs

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use typed_builder::TypedBuilder;
88
use crate::{
99
bson::{Bson, Document},
1010
concern::{ReadConcern, WriteConcern},
11-
options::Collation,
11+
options::{Collation, CursorType},
1212
selection_criteria::SelectionCriteria,
1313
serde_util,
1414
};
@@ -312,3 +312,23 @@ pub struct ChangeStreamPreAndPostImages {
312312
/// If `true`, change streams will be able to include pre- and post-images.
313313
pub enabled: bool,
314314
}
315+
316+
/// Specifies the options to a
317+
/// [`Database::RunCursorCommand`](../struct.Database.html#method.run_cursor_command) operation.
318+
#[derive(Clone, Debug, Default, TypedBuilder)]
319+
#[builder(field_defaults(default, setter(into)))]
320+
#[non_exhaustive]
321+
pub struct RunCursorCommandOptions {
322+
/// The default read preference for operations.
323+
pub selection_criteria: Option<SelectionCriteria>,
324+
/// The type of cursor to return.
325+
pub cursor_type: Option<CursorType>,
326+
/// Number of documents to return per batch.
327+
pub batch_size: Option<u32>,
328+
/// Optional non-negative integer value. Use this value to configure the maxTimeMS option sent
329+
/// on subsequent getMore commands.
330+
pub max_time: Option<Duration>,
331+
/// Optional BSON value. Use this value to configure the comment option sent on subsequent
332+
/// getMore commands.
333+
pub comment: Option<Bson>,
334+
}

src/operation.rs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ mod list_databases;
1919
mod list_indexes;
2020
mod raw_output;
2121
mod run_command;
22+
mod run_cursor_command;
2223
mod update;
2324

2425
#[cfg(test)]
@@ -71,6 +72,7 @@ pub(crate) use list_indexes::ListIndexes;
7172
#[cfg(feature = "in-use-encryption-unstable")]
7273
pub(crate) use raw_output::RawOutput;
7374
pub(crate) use run_command::RunCommand;
75+
pub(crate) use run_cursor_command::RunCursorCommand;
7476
pub(crate) use update::Update;
7577

7678
const SERVER_4_2_0_WIRE_VERSION: i32 = 8;

src/operation/run_cursor_command.rs

Lines changed: 121 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,121 @@
1+
#[cfg(feature = "in-use-encryption-unstable")]
2+
use bson::doc;
3+
use bson::RawDocumentBuf;
4+
5+
use crate::{
6+
cmap::{conn::PinnedConnectionHandle, Command, RawCommandResponse, StreamDescription},
7+
concern::WriteConcern,
8+
cursor::CursorSpecification,
9+
error::{Error, Result},
10+
operation::{Operation, RunCommand},
11+
options::RunCursorCommandOptions,
12+
selection_criteria::SelectionCriteria,
13+
};
14+
15+
#[derive(Debug, Clone)]
16+
pub(crate) struct RunCursorCommand<'conn> {
17+
run_command: RunCommand<'conn>,
18+
options: Option<RunCursorCommandOptions>,
19+
}
20+
21+
impl<'conn> RunCursorCommand<'conn> {
22+
pub(crate) fn new(
23+
run_command: RunCommand<'conn>,
24+
options: Option<RunCursorCommandOptions>,
25+
) -> Result<Self> {
26+
Ok(Self {
27+
run_command,
28+
options,
29+
})
30+
}
31+
}
32+
33+
impl<'conn> Operation for RunCursorCommand<'conn> {
34+
type O = CursorSpecification;
35+
type Command = RawDocumentBuf;
36+
37+
const NAME: &'static str = "run_cursor_command";
38+
39+
fn build(&mut self, description: &StreamDescription) -> Result<Command<Self::Command>> {
40+
self.run_command.build(description)
41+
}
42+
43+
fn serialize_command(&mut self, cmd: Command<Self::Command>) -> Result<Vec<u8>> {
44+
self.run_command.serialize_command(cmd)
45+
}
46+
47+
fn extract_at_cluster_time(
48+
&self,
49+
response: &bson::RawDocument,
50+
) -> Result<Option<bson::Timestamp>> {
51+
self.run_command.extract_at_cluster_time(response)
52+
}
53+
54+
fn handle_error(&self, error: Error) -> Result<Self::O> {
55+
Err(error)
56+
}
57+
58+
fn selection_criteria(&self) -> Option<&SelectionCriteria> {
59+
self.run_command.selection_criteria()
60+
}
61+
62+
fn is_acknowledged(&self) -> bool {
63+
self.run_command.is_acknowledged()
64+
}
65+
66+
fn write_concern(&self) -> Option<&WriteConcern> {
67+
self.run_command.write_concern()
68+
}
69+
70+
fn supports_read_concern(&self, description: &StreamDescription) -> bool {
71+
self.run_command.supports_read_concern(description)
72+
}
73+
74+
fn supports_sessions(&self) -> bool {
75+
self.run_command.supports_sessions()
76+
}
77+
78+
fn retryability(&self) -> crate::operation::Retryability {
79+
self.run_command.retryability()
80+
}
81+
82+
fn update_for_retry(&mut self) {
83+
self.run_command.update_for_retry()
84+
}
85+
86+
fn pinned_connection(&self) -> Option<&PinnedConnectionHandle> {
87+
self.run_command.pinned_connection()
88+
}
89+
90+
fn name(&self) -> &str {
91+
self.run_command.name()
92+
}
93+
94+
fn handle_response(
95+
&self,
96+
response: RawCommandResponse,
97+
description: &StreamDescription,
98+
) -> Result<Self::O> {
99+
let doc = Operation::handle_response(&self.run_command, response, description)?;
100+
let cursor_info = bson::from_document(doc)?;
101+
let batch_size = match &self.options {
102+
Some(options) => options.batch_size.clone(),
103+
None => None,
104+
};
105+
let max_time = match &self.options {
106+
Some(options) => options.max_time.clone(),
107+
None => None,
108+
};
109+
let comment = match &self.options {
110+
Some(options) => options.comment.clone(),
111+
None => None,
112+
};
113+
Ok(CursorSpecification::new(
114+
cursor_info,
115+
description.server_address.clone(),
116+
batch_size,
117+
max_time,
118+
comment,
119+
))
120+
}
121+
}

src/results.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,8 @@ use crate::{
66
bson::{serde_helpers, Bson, Document},
77
change_stream::event::ResumeToken,
88
db::options::CreateCollectionOptions,
9-
Namespace,
109
serde_util,
10+
Namespace,
1111
};
1212

1313
use bson::{Binary, RawDocumentBuf};

0 commit comments

Comments
 (0)