Skip to content

Commit 65229f7

Browse files
authoredFeb 2, 2025··
Replace some futures_util APIs with std variants (#3721)
1 parent 5b26369 commit 65229f7

File tree

12 files changed

+35
-51
lines changed

12 files changed

+35
-51
lines changed
 

‎sqlx-core/src/io/write_and_flush.rs

+2-3
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,9 @@
11
use crate::error::Error;
2-
use futures_core::Future;
3-
use futures_util::ready;
42
use sqlx_rt::AsyncWrite;
3+
use std::future::Future;
54
use std::io::{BufRead, Cursor};
65
use std::pin::Pin;
7-
use std::task::{Context, Poll};
6+
use std::task::{ready, Context, Poll};
87

98
// Atomic operation that writes the full buffer to the stream, flushes the stream, and then
109
// clears the buffer (even if either of the two previous operations failed).

‎sqlx-core/src/net/socket/mod.rs

+1-2
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,9 @@ use std::future::Future;
22
use std::io;
33
use std::path::Path;
44
use std::pin::Pin;
5-
use std::task::{Context, Poll};
5+
use std::task::{ready, Context, Poll};
66

77
use bytes::BufMut;
8-
use futures_core::ready;
98

109
pub use buffered::{BufferedSocket, WriteBuffer};
1110

‎sqlx-core/src/net/tls/util.rs

+1-2
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,8 @@
11
use crate::net::Socket;
22

33
use std::io::{self, Read, Write};
4-
use std::task::{Context, Poll};
4+
use std::task::{ready, Context, Poll};
55

6-
use futures_core::ready;
76
use futures_util::future;
87

98
pub struct StdSocket<S> {

‎sqlx-core/src/pool/inner.rs

+5-11
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ use crate::sync::{AsyncSemaphore, AsyncSemaphoreReleaser};
1010

1111
use std::cmp;
1212
use std::future::Future;
13+
use std::pin::pin;
1314
use std::sync::atomic::{AtomicBool, AtomicU32, AtomicUsize, Ordering};
1415
use std::sync::{Arc, RwLock};
1516
use std::task::Poll;
@@ -130,19 +131,12 @@ impl<DB: Database> PoolInner<DB> {
130131
// This is just going to cause unnecessary churn in `acquire()`.
131132
.filter(|_| self.size() < self.options.max_connections);
132133

133-
let acquire_self = self.semaphore.acquire(1).fuse();
134-
let mut close_event = self.close_event();
134+
let mut acquire_self = pin!(self.semaphore.acquire(1).fuse());
135+
let mut close_event = pin!(self.close_event());
135136

136137
if let Some(parent) = parent {
137-
let acquire_parent = parent.0.semaphore.acquire(1);
138-
let parent_close_event = parent.0.close_event();
139-
140-
futures_util::pin_mut!(
141-
acquire_parent,
142-
acquire_self,
143-
close_event,
144-
parent_close_event
145-
);
138+
let mut acquire_parent = pin!(parent.0.semaphore.acquire(1));
139+
let mut parent_close_event = pin!(parent.0.close_event());
146140

147141
let mut poll_parent = false;
148142

‎sqlx-core/src/pool/mod.rs

+3-3
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@
5656
5757
use std::fmt;
5858
use std::future::Future;
59-
use std::pin::Pin;
59+
use std::pin::{pin, Pin};
6060
use std::sync::Arc;
6161
use std::task::{Context, Poll};
6262
use std::time::{Duration, Instant};
@@ -565,11 +565,11 @@ impl CloseEvent {
565565
.await
566566
.map_or(Ok(()), |_| Err(Error::PoolClosed))?;
567567

568-
futures_util::pin_mut!(fut);
568+
let mut fut = pin!(fut);
569569

570570
// I find that this is clearer in intent than `futures_util::future::select()`
571571
// or `futures_util::select_biased!{}` (which isn't enabled anyway).
572-
futures_util::future::poll_fn(|cx| {
572+
std::future::poll_fn(|cx| {
573573
// Poll `fut` first as the wakeup event is more likely for it than `self`.
574574
if let Poll::Ready(ret) = fut.as_mut().poll(cx) {
575575
return Poll::Ready(Ok(ret));

‎sqlx-mysql/src/any.rs

+2-3
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@ use sqlx_core::database::Database;
1616
use sqlx_core::describe::Describe;
1717
use sqlx_core::executor::Executor;
1818
use sqlx_core::transaction::TransactionManager;
19-
use std::future;
19+
use std::{future, pin::pin};
2020

2121
sqlx_core::declare_driver_with_optional_migrate!(DRIVER = MySql);
2222

@@ -113,8 +113,7 @@ impl AnyConnectionBackend for MySqlConnection {
113113

114114
Box::pin(async move {
115115
let arguments = arguments?;
116-
let stream = self.run(query, arguments, persistent).await?;
117-
futures_util::pin_mut!(stream);
116+
let mut stream = pin!(self.run(query, arguments, persistent).await?);
118117

119118
while let Some(result) = stream.try_next().await? {
120119
if let Either::Right(row) = result {

‎sqlx-mysql/src/connection/executor.rs

+3-4
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,8 @@ use either::Either;
2121
use futures_core::future::BoxFuture;
2222
use futures_core::stream::BoxStream;
2323
use futures_core::Stream;
24-
use futures_util::{pin_mut, TryStreamExt};
25-
use std::{borrow::Cow, sync::Arc};
24+
use futures_util::TryStreamExt;
25+
use std::{borrow::Cow, pin::pin, sync::Arc};
2626

2727
impl MySqlConnection {
2828
async fn prepare_statement<'c>(
@@ -263,8 +263,7 @@ impl<'c> Executor<'c> for &'c mut MySqlConnection {
263263

264264
Box::pin(try_stream! {
265265
let arguments = arguments?;
266-
let s = self.run(sql, arguments, persistent).await?;
267-
pin_mut!(s);
266+
let mut s = pin!(self.run(sql, arguments, persistent).await?);
268267

269268
while let Some(v) = s.try_next().await? {
270269
r#yield!(v);

‎sqlx-postgres/src/any.rs

+2-3
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ use crate::{
55
use futures_core::future::BoxFuture;
66
use futures_core::stream::BoxStream;
77
use futures_util::{stream, StreamExt, TryFutureExt, TryStreamExt};
8-
use std::future;
8+
use std::{future, pin::pin};
99

1010
use sqlx_core::any::{
1111
Any, AnyArguments, AnyColumn, AnyConnectOptions, AnyConnectionBackend, AnyQueryResult, AnyRow,
@@ -115,8 +115,7 @@ impl AnyConnectionBackend for PgConnection {
115115

116116
Box::pin(async move {
117117
let arguments = arguments?;
118-
let stream = self.run(query, arguments, 1, persistent, None).await?;
119-
futures_util::pin_mut!(stream);
118+
let mut stream = pin!(self.run(query, arguments, 1, persistent, None).await?);
120119

121120
if let Some(Either::Right(row)) = stream.try_next().await? {
122121
return Ok(Some(AnyRow::try_from(&row)?));

‎sqlx-postgres/src/connection/executor.rs

+4-6
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,10 @@ use crate::{
1515
use futures_core::future::BoxFuture;
1616
use futures_core::stream::BoxStream;
1717
use futures_core::Stream;
18-
use futures_util::{pin_mut, TryStreamExt};
18+
use futures_util::TryStreamExt;
1919
use sqlx_core::arguments::Arguments;
2020
use sqlx_core::Either;
21-
use std::{borrow::Cow, sync::Arc};
21+
use std::{borrow::Cow, pin::pin, sync::Arc};
2222

2323
async fn prepare(
2424
conn: &mut PgConnection,
@@ -393,8 +393,7 @@ impl<'c> Executor<'c> for &'c mut PgConnection {
393393

394394
Box::pin(try_stream! {
395395
let arguments = arguments?;
396-
let s = self.run(sql, arguments, 0, persistent, metadata).await?;
397-
pin_mut!(s);
396+
let mut s = pin!(self.run(sql, arguments, 0, persistent, metadata).await?);
398397

399398
while let Some(v) = s.try_next().await? {
400399
r#yield!(v);
@@ -420,8 +419,7 @@ impl<'c> Executor<'c> for &'c mut PgConnection {
420419

421420
Box::pin(async move {
422421
let arguments = arguments?;
423-
let s = self.run(sql, arguments, 1, persistent, metadata).await?;
424-
pin_mut!(s);
422+
let mut s = pin!(self.run(sql, arguments, 1, persistent, metadata).await?);
425423

426424
// With deferred constraints we need to check all responses as we
427425
// could get a OK response (with uncommitted data), only to get an

‎sqlx-sqlite/src/any.rs

+7-6
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ use sqlx_core::database::Database;
1717
use sqlx_core::describe::Describe;
1818
use sqlx_core::executor::Executor;
1919
use sqlx_core::transaction::TransactionManager;
20+
use std::pin::pin;
2021

2122
sqlx_core::declare_driver_with_optional_migrate!(DRIVER = Sqlite);
2223

@@ -105,12 +106,12 @@ impl AnyConnectionBackend for SqliteConnection {
105106
let args = arguments.map(map_arguments);
106107

107108
Box::pin(async move {
108-
let stream = self
109-
.worker
110-
.execute(query, args, self.row_channel_size, persistent, Some(1))
111-
.map_ok(flume::Receiver::into_stream)
112-
.await?;
113-
futures_util::pin_mut!(stream);
109+
let mut stream = pin!(
110+
self.worker
111+
.execute(query, args, self.row_channel_size, persistent, Some(1))
112+
.map_ok(flume::Receiver::into_stream)
113+
.await?
114+
);
114115

115116
if let Some(Either::Right(row)) = stream.try_next().await? {
116117
return Ok(Some(AnyRow::try_from(&row)?));

‎sqlx-sqlite/src/connection/executor.rs

+3-5
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,7 @@ use sqlx_core::describe::Describe;
88
use sqlx_core::error::Error;
99
use sqlx_core::executor::{Execute, Executor};
1010
use sqlx_core::Either;
11-
use std::future;
11+
use std::{future, pin::pin};
1212

1313
impl<'c> Executor<'c> for &'c mut SqliteConnection {
1414
type Database = Sqlite;
@@ -56,13 +56,11 @@ impl<'c> Executor<'c> for &'c mut SqliteConnection {
5656
let persistent = query.persistent() && arguments.is_some();
5757

5858
Box::pin(async move {
59-
let stream = self
59+
let mut stream = pin!(self
6060
.worker
6161
.execute(sql, arguments, self.row_channel_size, persistent, Some(1))
6262
.map_ok(flume::Receiver::into_stream)
63-
.try_flatten_stream();
64-
65-
futures_util::pin_mut!(stream);
63+
.try_flatten_stream());
6664

6765
while let Some(res) = stream.try_next().await? {
6866
if let Either::Right(row) = res {

‎tests/postgres/postgres.rs

+2-3
Original file line numberDiff line numberDiff line change
@@ -9,7 +9,7 @@ use sqlx::{Column, Connection, Executor, Row, Statement, TypeInfo};
99
use sqlx_core::{bytes::Bytes, error::BoxDynError};
1010
use sqlx_test::{new, pool, setup_if_needed};
1111
use std::env;
12-
use std::pin::Pin;
12+
use std::pin::{pin, Pin};
1313
use std::sync::Arc;
1414
use std::time::Duration;
1515

@@ -637,8 +637,7 @@ async fn pool_smoke_test() -> anyhow::Result<()> {
637637
let pool = pool.clone();
638638
sqlx_core::rt::spawn(async move {
639639
while !pool.is_closed() {
640-
let acquire = pool.acquire();
641-
futures::pin_mut!(acquire);
640+
let mut acquire = pin!(pool.acquire());
642641

643642
// poll the acquire future once to put the waiter in the queue
644643
future::poll_fn(move |cx| {

0 commit comments

Comments
 (0)
Please sign in to comment.