Skip to content

Commit

Permalink
migrate to using plain futures crate
Browse files Browse the repository at this point in the history
  • Loading branch information
rgbkrk committed Dec 13, 2024
1 parent aa75913 commit ae112cc
Show file tree
Hide file tree
Showing 20 changed files with 32 additions and 36 deletions.
5 changes: 1 addition & 4 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,7 @@ tcp-transport = []
[dependencies]
async-dispatcher = { version = "0.1", optional = true }
thiserror = "1"
futures-channel = { version = "0.3", features = ["sink"] }
futures-io = "0.3"
futures-task = "0.3"
futures-util = { version = "0.3", features = ["sink"] }
futures = "0.3"
async-trait = "0.1"
parking_lot = "0.12"
rand = "0.8"
Expand Down
2 changes: 1 addition & 1 deletion examples/task_worker.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
mod async_helpers;

use futures_util::{select, FutureExt};
use futures::{select, FutureExt};
use std::io::Write;
use std::{error::Error, time::Duration};
use zeromq::{Socket, SocketRecv, SocketSend};
Expand Down
4 changes: 2 additions & 2 deletions src/backend.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ use crate::{
use async_trait::async_trait;
use crossbeam_queue::SegQueue;
use dashmap::DashMap;
use futures_channel::mpsc;
use futures_util::SinkExt;
use futures::channel::mpsc;
use futures::SinkExt;
use parking_lot::Mutex;

use std::sync::Arc;
Expand Down
2 changes: 1 addition & 1 deletion src/codec/framed.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::codec::ZmqCodec;

use asynchronous_codec::{FramedRead, FramedWrite};
use futures_io::{AsyncRead, AsyncWrite};
use futures::{AsyncRead, AsyncWrite};

// Enables us to have multiple bounds on the dyn trait in `InnerFramed`
pub trait FrameableRead: AsyncRead + Unpin + Send + Sync {}
Expand Down
5 changes: 2 additions & 3 deletions src/codec/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,8 @@ pub(crate) use zmq_codec::ZmqCodec;

use crate::message::ZmqMessage;
use crate::{ZmqError, ZmqResult};

use futures_task::noop_waker;
use futures_util::Sink;
use futures::task::noop_waker;
use futures::Sink;

use std::pin::Pin;
use std::task::{Context, Poll};
Expand Down
4 changes: 2 additions & 2 deletions src/dealer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ use crate::{
};

use async_trait::async_trait;
use futures_channel::mpsc;
use futures_util::StreamExt;
use futures::channel::mpsc;
use futures::StreamExt;

use std::collections::hash_map::RandomState;
use std::collections::HashMap;
Expand Down
2 changes: 1 addition & 1 deletion src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use crate::endpoint::EndpointError;
use crate::task_handle::TaskError;
use crate::ZmqMessage;

use futures_channel::mpsc;
use futures::channel::mpsc;
use thiserror::Error;

pub type ZmqResult<T> = Result<T, ZmqError>;
Expand Down
6 changes: 3 additions & 3 deletions src/fair_queue.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
use futures_task::{waker_ref, ArcWake};
use futures_util::Stream;
use futures::task::{waker_ref, ArcWake};
use futures::Stream;
use parking_lot::Mutex;

use std::cmp::Ordering;
Expand Down Expand Up @@ -164,7 +164,7 @@ impl<S, K: Clone> FairQueue<S, K> {
mod test {
use crate::async_rt;
use crate::fair_queue::FairQueue;
use futures_util::{stream, StreamExt};
use futures::{stream, StreamExt};

#[async_rt::test]
async fn test_fair_queue_ready() {
Expand Down
4 changes: 2 additions & 2 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ use util::PeerIdentity;

use async_trait::async_trait;
use asynchronous_codec::FramedWrite;
use futures_channel::mpsc;
use futures_util::{select, FutureExt};
use futures::channel::mpsc;
use futures::{select, FutureExt};
use parking_lot::Mutex;

use std::collections::HashMap;
Expand Down
4 changes: 2 additions & 2 deletions src/pub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,8 @@ use crate::{

use async_trait::async_trait;
use dashmap::DashMap;
use futures_channel::{mpsc, oneshot};
use futures_util::{select, FutureExt, StreamExt};
use futures::channel::{mpsc, oneshot};
use futures::{select, FutureExt, StreamExt};
use parking_lot::Mutex;

use std::collections::HashMap;
Expand Down
4 changes: 2 additions & 2 deletions src/pull.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,8 @@ use crate::{
};

use async_trait::async_trait;
use futures_channel::mpsc;
use futures_util::StreamExt;
use futures::channel::mpsc;
use futures::StreamExt;

use std::collections::hash_map::RandomState;
use std::collections::HashMap;
Expand Down
2 changes: 1 addition & 1 deletion src/push.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use crate::{
};

use async_trait::async_trait;
use futures_channel::mpsc;
use futures::channel::mpsc;

use std::collections::hash_map::RandomState;
use std::collections::HashMap;
Expand Down
2 changes: 1 addition & 1 deletion src/rep.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use crate::{SocketType, ZmqResult};

use async_trait::async_trait;
use dashmap::DashMap;
use futures_util::{SinkExt, StreamExt};
use futures::{SinkExt, StreamExt};
use parking_lot::Mutex;

use std::collections::HashMap;
Expand Down
2 changes: 1 addition & 1 deletion src/req.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use async_trait::async_trait;
use bytes::Bytes;
use crossbeam_queue::SegQueue;
use dashmap::DashMap;
use futures_util::{SinkExt, StreamExt};
use futures::{SinkExt, StreamExt};

use std::collections::HashMap;
use std::sync::Arc;
Expand Down
4 changes: 2 additions & 2 deletions src/router.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,8 @@ use crate::{MultiPeerBackend, SocketEvent, SocketOptions, SocketRecv, SocketSend
use crate::{Socket, SocketBackend};

use async_trait::async_trait;
use futures_channel::mpsc;
use futures_util::{SinkExt, StreamExt};
use futures::channel::mpsc;
use futures::{SinkExt, StreamExt};

use std::collections::HashMap;
use std::convert::TryInto;
Expand Down
4 changes: 2 additions & 2 deletions src/sub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ use async_trait::async_trait;
use bytes::{BufMut, BytesMut};
use crossbeam_queue::SegQueue;
use dashmap::DashMap;
use futures_channel::mpsc;
use futures_util::{SinkExt, StreamExt};
use futures::channel::mpsc;
use futures::{SinkExt, StreamExt};
use parking_lot::Mutex;

use std::collections::{HashMap, HashSet};
Expand Down
2 changes: 1 addition & 1 deletion src/task_handle.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use crate::async_rt;
use crate::error::{ZmqError, ZmqResult};

use futures_channel::oneshot;
use futures::channel::oneshot;
use thiserror::Error;

#[derive(Error, Debug)]
Expand Down
4 changes: 2 additions & 2 deletions src/transport/ipc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ use crate::endpoint::Endpoint;
use crate::task_handle::TaskHandle;
use crate::ZmqResult;

use futures_channel::oneshot;
use futures_util::{select, FutureExt};
use futures::channel::oneshot;
use futures::{select, FutureExt};

use std::path::Path;

Expand Down
4 changes: 2 additions & 2 deletions src/transport/tcp.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use crate::endpoint::{Endpoint, Host, Port};
use crate::task_handle::TaskHandle;
use crate::ZmqResult;

use futures_util::{select, FutureExt};
use futures::{select, FutureExt};

pub(crate) async fn connect(host: &Host, port: Port) -> ZmqResult<(FramedIo, Endpoint)> {
let raw_socket = TcpStream::connect((host.to_string().as_str(), port)).await?;
Expand All @@ -35,7 +35,7 @@ where
{
let listener = TcpListener::bind((host.to_string().as_str(), port)).await?;
let resolved_addr = listener.local_addr()?;
let (stop_channel, stop_callback) = futures_channel::oneshot::channel::<()>();
let (stop_channel, stop_callback) = futures::channel::oneshot::channel::<()>();
let task_handle = async_rt::task::spawn(async move {
let mut stop_callback = stop_callback.fuse();
loop {
Expand Down
2 changes: 1 addition & 1 deletion src/util.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::*;

use asynchronous_codec::FramedRead;
use bytes::Bytes;
use futures_util::{SinkExt, StreamExt};
use futures::{SinkExt, StreamExt};
use rand::Rng;

use std::convert::{TryFrom, TryInto};
Expand Down

0 comments on commit ae112cc

Please sign in to comment.