Skip to content

Commit e71aa72

Browse files
authored
RUST-1358 Remove most type constraints on cursor values (mongodb#891)
1 parent 6902cc3 commit e71aa72

File tree

4 files changed

+24
-55
lines changed

4 files changed

+24
-55
lines changed

src/change_stream/mod.rs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ use crate::{
8585
#[derivative(Debug)]
8686
pub struct ChangeStream<T>
8787
where
88-
T: DeserializeOwned + Unpin + Send + Sync,
88+
T: DeserializeOwned,
8989
{
9090
/// The cursor to iterate over event instances.
9191
cursor: Cursor<T>,
@@ -103,7 +103,7 @@ where
103103

104104
impl<T> ChangeStream<T>
105105
where
106-
T: DeserializeOwned + Unpin + Send + Sync,
106+
T: DeserializeOwned,
107107
{
108108
pub(crate) fn new(cursor: Cursor<T>, args: WatchArgs, data: ChangeStreamData) -> Self {
109109
let pending_resume: Option<BoxFuture<'static, Result<ChangeStream<T>>>> = None;
@@ -126,7 +126,7 @@ where
126126
}
127127

128128
/// Update the type streamed values will be parsed as.
129-
pub fn with_type<D: DeserializeOwned + Unpin + Send + Sync>(self) -> ChangeStream<D> {
129+
pub fn with_type<D: DeserializeOwned>(self) -> ChangeStream<D> {
130130
ChangeStream {
131131
cursor: self.cursor.with_type(),
132132
args: self.args,
@@ -256,7 +256,7 @@ fn get_resume_token(
256256

257257
impl<T> CursorStream for ChangeStream<T>
258258
where
259-
T: DeserializeOwned + Unpin + Send + Sync,
259+
T: DeserializeOwned,
260260
{
261261
fn poll_next_in_batch(&mut self, cx: &mut Context<'_>) -> Poll<Result<BatchValue>> {
262262
loop {
@@ -316,7 +316,7 @@ where
316316

317317
impl<T> Stream for ChangeStream<T>
318318
where
319-
T: DeserializeOwned + Unpin + Send + Sync,
319+
T: DeserializeOwned,
320320
{
321321
type Item = Result<T>;
322322

src/cursor/common.rs

Lines changed: 4 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -1,15 +1,13 @@
11
use std::{
22
collections::VecDeque,
3-
marker::PhantomData,
43
pin::Pin,
54
task::{Context, Poll},
65
time::Duration,
76
};
87

98
use bson::{RawDocument, RawDocumentBuf};
109
use derivative::Derivative;
11-
use futures_core::{future::BoxFuture, Future, Stream};
12-
use serde::{de::DeserializeOwned, Deserialize};
10+
use futures_core::{future::BoxFuture, Future};
1311
#[cfg(test)]
1412
use tokio::sync::oneshot;
1513

@@ -29,7 +27,7 @@ use crate::{
2927
/// An internal cursor that can be used in a variety of contexts depending on its `GetMoreProvider`.
3028
#[derive(Derivative)]
3129
#[derivative(Debug)]
32-
pub(super) struct GenericCursor<P, T>
30+
pub(super) struct GenericCursor<P>
3331
where
3432
P: GetMoreProvider,
3533
{
@@ -40,10 +38,9 @@ where
4038
/// This is an `Option` to allow it to be "taken" when the cursor is no longer needed
4139
/// but may be resumed in the future for `SessionCursor`.
4240
state: Option<CursorState>,
43-
_phantom: PhantomData<T>,
4441
}
4542

46-
impl<P, T> GenericCursor<P, T>
43+
impl<P> GenericCursor<P>
4744
where
4845
P: GetMoreProvider,
4946
{
@@ -64,7 +61,6 @@ where
6461
post_batch_resume_token: None,
6562
pinned_connection,
6663
}),
67-
_phantom: Default::default(),
6864
}
6965
}
7066

@@ -78,7 +74,6 @@ where
7874
provider,
7975
client,
8076
info,
81-
_phantom: Default::default(),
8277
state: state.into(),
8378
}
8479
}
@@ -192,19 +187,6 @@ where
192187
pub(super) fn provider_mut(&mut self) -> &mut P {
193188
&mut self.provider
194189
}
195-
196-
pub(super) fn with_type<'a, D>(self) -> GenericCursor<P, D>
197-
where
198-
D: Deserialize<'a>,
199-
{
200-
GenericCursor {
201-
client: self.client,
202-
provider: self.provider,
203-
info: self.info,
204-
state: self.state,
205-
_phantom: Default::default(),
206-
}
207-
}
208190
}
209191

210192
pub(crate) trait CursorStream {
@@ -217,10 +199,9 @@ pub(crate) enum BatchValue {
217199
Exhausted,
218200
}
219201

220-
impl<P, T> CursorStream for GenericCursor<P, T>
202+
impl<P> CursorStream for GenericCursor<P>
221203
where
222204
P: GetMoreProvider,
223-
T: DeserializeOwned + Unpin,
224205
{
225206
fn poll_next_in_batch(&mut self, cx: &mut Context<'_>) -> Poll<Result<BatchValue>> {
226207
// If there is a get more in flight, check on its status.
@@ -300,18 +281,6 @@ where
300281
}
301282
}
302283

303-
impl<P, T> Stream for GenericCursor<P, T>
304-
where
305-
P: GetMoreProvider,
306-
T: DeserializeOwned + Unpin,
307-
{
308-
type Item = Result<T>;
309-
310-
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
311-
stream_poll_next(Pin::into_inner(self), cx)
312-
}
313-
}
314-
315284
/// A trait implemented by objects that can provide batches of documents to a cursor via the getMore
316285
/// command.
317286
pub(super) trait GetMoreProvider: Unpin {

src/cursor/mod.rs

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -104,11 +104,11 @@ pub struct Cursor<T> {
104104
client: Client,
105105
// `wrapped_cursor` is an `Option` so that it can be `None` for the `drop` impl for a cursor
106106
// that's had `with_type` called; in all other circumstances it will be `Some`.
107-
wrapped_cursor: Option<ImplicitSessionCursor<T>>,
107+
wrapped_cursor: Option<ImplicitSessionCursor>,
108108
drop_address: Option<ServerAddress>,
109109
#[cfg(test)]
110110
kill_watcher: Option<oneshot::Sender<()>>,
111-
_phantom: std::marker::PhantomData<T>,
111+
_phantom: std::marker::PhantomData<fn() -> T>,
112112
}
113113

114114
impl<T> Cursor<T> {
@@ -271,7 +271,7 @@ impl<T> Cursor<T> {
271271
{
272272
Cursor {
273273
client: self.client.clone(),
274-
wrapped_cursor: self.wrapped_cursor.take().map(|c| c.with_type()),
274+
wrapped_cursor: self.wrapped_cursor.take(),
275275
drop_address: self.drop_address.take(),
276276
#[cfg(test)]
277277
kill_watcher: self.kill_watcher.take(),
@@ -301,7 +301,7 @@ impl<T> Cursor<T> {
301301

302302
impl<T> CursorStream for Cursor<T>
303303
where
304-
T: DeserializeOwned + Unpin + Send + Sync,
304+
T: DeserializeOwned,
305305
{
306306
fn poll_next_in_batch(&mut self, cx: &mut Context<'_>) -> Poll<Result<BatchValue>> {
307307
self.wrapped_cursor.as_mut().unwrap().poll_next_in_batch(cx)
@@ -310,13 +310,13 @@ where
310310

311311
impl<T> Stream for Cursor<T>
312312
where
313-
T: DeserializeOwned + Unpin + Send + Sync,
313+
T: DeserializeOwned,
314314
{
315315
type Item = Result<T>;
316316

317317
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
318318
// This `unwrap` is safe because `wrapped_cursor` is always `Some` outside of `drop`.
319-
Pin::new(self.wrapped_cursor.as_mut().unwrap()).poll_next(cx)
319+
stream_poll_next(self.wrapped_cursor.as_mut().unwrap(), cx)
320320
}
321321
}
322322

@@ -344,7 +344,7 @@ impl<T> Drop for Cursor<T> {
344344

345345
/// A `GenericCursor` that optionally owns its own sessions.
346346
/// This is to be used by cursors associated with implicit sessions.
347-
type ImplicitSessionCursor<T> = GenericCursor<ImplicitSessionGetMoreProvider, T>;
347+
type ImplicitSessionCursor = GenericCursor<ImplicitSessionGetMoreProvider>;
348348

349349
struct ImplicitSessionGetMoreResult {
350350
get_more_result: Result<GetMoreResult>,

src/cursor/session.rs

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@ use super::{
2222
GetMoreProviderResult,
2323
PinnedConnection,
2424
},
25+
stream_poll_next,
2526
BatchValue,
2627
CursorStream,
2728
};
@@ -108,7 +109,7 @@ impl<T> SessionCursor<T> {
108109

109110
impl<T> SessionCursor<T>
110111
where
111-
T: DeserializeOwned + Unpin + Send + Sync,
112+
T: DeserializeOwned,
112113
{
113114
/// Retrieves a [`SessionCursorStream`] to iterate this cursor. The session provided must be the
114115
/// same session used to create the cursor.
@@ -379,8 +380,7 @@ impl<T> Drop for SessionCursor<T> {
379380

380381
/// A `GenericCursor` that borrows its session.
381382
/// This is to be used with cursors associated with explicit sessions borrowed from the user.
382-
type ExplicitSessionCursor<'session, T> =
383-
GenericCursor<ExplicitSessionGetMoreProvider<'session>, T>;
383+
type ExplicitSessionCursor<'session> = GenericCursor<ExplicitSessionGetMoreProvider<'session>>;
384384

385385
/// A type that implements [`Stream`](https://docs.rs/futures/latest/futures/stream/index.html) which can be used to
386386
/// stream the results of a [`SessionCursor`]. Returned from [`SessionCursor::stream`].
@@ -389,12 +389,12 @@ type ExplicitSessionCursor<'session, T> =
389389
/// any further streams created from [`SessionCursor::stream`] will pick up where this one left off.
390390
pub struct SessionCursorStream<'cursor, 'session, T = Document> {
391391
session_cursor: &'cursor mut SessionCursor<T>,
392-
generic_cursor: ExplicitSessionCursor<'session, T>,
392+
generic_cursor: ExplicitSessionCursor<'session>,
393393
}
394394

395395
impl<'cursor, 'session, T> SessionCursorStream<'cursor, 'session, T>
396396
where
397-
T: DeserializeOwned + Unpin + Send + Sync,
397+
T: DeserializeOwned,
398398
{
399399
pub(crate) fn post_batch_resume_token(&self) -> Option<&ResumeToken> {
400400
self.generic_cursor.post_batch_resume_token()
@@ -407,18 +407,18 @@ where
407407

408408
impl<'cursor, 'session, T> Stream for SessionCursorStream<'cursor, 'session, T>
409409
where
410-
T: DeserializeOwned + Unpin + Send + Sync,
410+
T: DeserializeOwned,
411411
{
412412
type Item = Result<T>;
413413

414414
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
415-
Pin::new(&mut self.generic_cursor).poll_next(cx)
415+
stream_poll_next(&mut self.generic_cursor, cx)
416416
}
417417
}
418418

419419
impl<'cursor, 'session, T> CursorStream for SessionCursorStream<'cursor, 'session, T>
420420
where
421-
T: DeserializeOwned + Unpin + Send + Sync,
421+
T: DeserializeOwned,
422422
{
423423
fn poll_next_in_batch(&mut self, cx: &mut Context<'_>) -> Poll<Result<BatchValue>> {
424424
self.generic_cursor.poll_next_in_batch(cx)

0 commit comments

Comments
 (0)