Skip to content

Commit 7b73eee

Browse files
committed
Make streams !Unpin
Hedging against the future if we end up using an intrusive list implementation
1 parent e5d2205 commit 7b73eee

File tree

4 files changed

+27
-7
lines changed

4 files changed

+27
-7
lines changed

tokio-postgres/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@ log = "0.4"
3838
parking_lot = "0.9"
3939
percent-encoding = "1.0"
4040
pin-utils = "=0.1.0-alpha.4"
41+
pin-project = "0.4"
4142
phf = "0.7.23"
4243
postgres-protocol = { version = "0.4.1", path = "../postgres-protocol" }
4344
postgres-types = { version = "0.1.0", path = "../postgres-types" }

tokio-postgres/src/lib.rs

+2
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,8 @@ pub use crate::row::{Row, SimpleQueryRow};
114114
#[cfg(feature = "runtime")]
115115
pub use crate::socket::Socket;
116116
pub use crate::statement::{Column, Statement};
117+
pub use crate::query::RowStream;
118+
pub use crate::simple_query::SimpleQueryStream;
117119
#[cfg(feature = "runtime")]
118120
use crate::tls::MakeTlsConnect;
119121
pub use crate::tls::NoTls;

tokio-postgres/src/query.rs

+12-3
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
use crate::client::{InnerClient, Responses};
2+
use pin_project::pin_project;
23
use crate::codec::FrontendMessage;
34
use crate::connection::RequestMessages;
45
use crate::types::{IsNull, ToSql};
@@ -9,6 +10,7 @@ use postgres_protocol::message::backend::Message;
910
use postgres_protocol::message::frontend;
1011
use std::pin::Pin;
1112
use std::task::{Context, Poll};
13+
use std::marker::PhantomPinned;
1214

1315
pub async fn query<'a, I>(
1416
client: &InnerClient,
@@ -24,6 +26,7 @@ where
2426
Ok(RowStream {
2527
statement,
2628
responses,
29+
_p: PhantomPinned,
2730
})
2831
}
2932

@@ -43,6 +46,7 @@ pub async fn query_portal(
4346
Ok(RowStream {
4447
statement: portal.statement().clone(),
4548
responses,
49+
_p: PhantomPinned,
4650
})
4751
}
4852

@@ -145,18 +149,23 @@ where
145149
}
146150
}
147151

152+
/// A stream of table rows.
153+
#[pin_project]
148154
pub struct RowStream {
149155
statement: Statement,
150156
responses: Responses,
157+
#[pin]
158+
_p: PhantomPinned,
151159
}
152160

153161
impl Stream for RowStream {
154162
type Item = Result<Row, Error>;
155163

156-
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
157-
match ready!(self.responses.poll_next(cx)?) {
164+
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
165+
let this = self.project();
166+
match ready!(this.responses.poll_next(cx)?) {
158167
Message::DataRow(body) => {
159-
Poll::Ready(Some(Ok(Row::new(self.statement.clone(), body)?)))
168+
Poll::Ready(Some(Ok(Row::new(this.statement.clone(), body)?)))
160169
}
161170
Message::EmptyQueryResponse
162171
| Message::CommandComplete(_)

tokio-postgres/src/simple_query.rs

+12-4
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ use postgres_protocol::message::frontend;
1010
use std::pin::Pin;
1111
use std::sync::Arc;
1212
use std::task::{Context, Poll};
13+
use std::marker::PhantomPinned;
14+
use pin_project::pin_project;
1315

1416
pub async fn simple_query(client: &InnerClient, query: &str) -> Result<SimpleQueryStream, Error> {
1517
let buf = encode(client, query)?;
@@ -18,6 +20,7 @@ pub async fn simple_query(client: &InnerClient, query: &str) -> Result<SimpleQue
1820
Ok(SimpleQueryStream {
1921
responses,
2022
columns: None,
23+
_p: PhantomPinned,
2124
})
2225
}
2326

@@ -44,17 +47,22 @@ fn encode(client: &InnerClient, query: &str) -> Result<Bytes, Error> {
4447
})
4548
}
4649

50+
/// A stream of simple query results.
51+
#[pin_project]
4752
pub struct SimpleQueryStream {
4853
responses: Responses,
4954
columns: Option<Arc<[String]>>,
55+
#[pin]
56+
_p: PhantomPinned,
5057
}
5158

5259
impl Stream for SimpleQueryStream {
5360
type Item = Result<SimpleQueryMessage, Error>;
5461

55-
fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
62+
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
63+
let this = self.project();
5664
loop {
57-
match ready!(self.responses.poll_next(cx)?) {
65+
match ready!(this.responses.poll_next(cx)?) {
5866
Message::CommandComplete(body) => {
5967
let rows = body
6068
.tag()
@@ -76,10 +84,10 @@ impl Stream for SimpleQueryStream {
7684
.collect::<Vec<_>>()
7785
.map_err(Error::parse)?
7886
.into();
79-
self.columns = Some(columns);
87+
*this.columns = Some(columns);
8088
}
8189
Message::DataRow(body) => {
82-
let row = match &self.columns {
90+
let row = match &this.columns {
8391
Some(columns) => SimpleQueryRow::new(columns.clone(), body)?,
8492
None => return Poll::Ready(Some(Err(Error::unexpected_message()))),
8593
};

0 commit comments

Comments
 (0)