Skip to content

Commit fd22755

Browse files
committed
Merge branch 'master' into dev/statx
2 parents 8808611 + 2d36120 commit fd22755

File tree

12 files changed

+264
-51
lines changed

12 files changed

+264
-51
lines changed

compio-driver/Cargo.toml

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -60,24 +60,23 @@ windows-sys = { workspace = true, features = [
6060
# Linux specific dependencies
6161
[target.'cfg(target_os = "linux")'.dependencies]
6262
io-uring = { version = "0.6.2", optional = true }
63-
crossbeam-queue = { workspace = true, optional = true }
6463
polling = { version = "3.3.0", optional = true }
6564
paste = { workspace = true }
6665

6766
# Other platform dependencies
6867
[target.'cfg(all(not(target_os = "linux"), unix))'.dependencies]
69-
crossbeam-queue = { workspace = true }
7068
polling = "3.3.0"
7169

7270
[target.'cfg(unix)'.dependencies]
71+
crossbeam-queue = { workspace = true }
7372
libc = { workspace = true }
7473

7574
[dev-dependencies]
7675
compio-buf = { workspace = true, features = ["arrayvec"] }
7776

7877
[features]
7978
default = ["io-uring"]
80-
polling = ["dep:polling", "dep:crossbeam-queue"]
79+
polling = ["dep:polling"]
8180

8281
# Nightly features
8382
once_cell_try = []

compio-driver/src/fusion/mod.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,8 +11,8 @@ pub use std::os::fd::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
1111
use std::{io, task::Poll, time::Duration};
1212

1313
pub use driver_type::DriverType;
14-
pub use iour::OpCode as IourOpCode;
1514
pub(crate) use iour::{sockaddr_storage, socklen_t};
15+
pub use iour::{OpCode as IourOpCode, OpEntry};
1616
pub use poll::{Decision, OpCode as PollOpCode};
1717
use slab::Slab;
1818

compio-driver/src/fusion/op.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ macro_rules! op {
7979
}
8080

8181
impl<$($ty: $trait),*> iour::OpCode for $name<$($ty),*> {
82-
fn create_entry(self: std::pin::Pin<&mut Self>) -> io_uring::squeue::Entry {
82+
fn create_entry(self: std::pin::Pin<&mut Self>) -> OpEntry {
8383
unsafe { self.map_unchecked_mut(|x| x.inner.iour() ) }.create_entry()
8484
}
8585
}

compio-driver/src/iocp/op.rs

Lines changed: 25 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ use std::{
1010
};
1111

1212
use aligned_array::{Aligned, A8};
13-
use compio_buf::{IntoInner, IoBuf, IoBufMut, IoVectoredBuf, IoVectoredBufMut};
13+
use compio_buf::{BufResult, IntoInner, IoBuf, IoBufMut, IoVectoredBuf, IoVectoredBufMut};
1414
#[cfg(not(feature = "once_cell_try"))]
1515
use once_cell::sync::OnceCell as OnceLock;
1616
use socket2::SockAddr;
@@ -113,6 +113,30 @@ fn get_wsa_fn<F>(handle: RawFd, fguid: GUID) -> io::Result<Option<F>> {
113113
Ok(fptr)
114114
}
115115

116+
impl<
117+
D: std::marker::Send + Unpin + 'static,
118+
F: (FnOnce() -> BufResult<usize, D>) + std::marker::Send + std::marker::Sync + Unpin + 'static,
119+
> OpCode for Asyncify<F, D>
120+
{
121+
fn is_overlapped(&self) -> bool {
122+
false
123+
}
124+
125+
unsafe fn operate(mut self: Pin<&mut Self>, _optr: *mut OVERLAPPED) -> Poll<io::Result<usize>> {
126+
let f = self
127+
.f
128+
.take()
129+
.expect("the operate method could only be called once");
130+
let BufResult(res, data) = f();
131+
self.data = Some(data);
132+
Poll::Ready(res)
133+
}
134+
135+
unsafe fn cancel(self: Pin<&mut Self>, _optr: *mut OVERLAPPED) -> io::Result<()> {
136+
Ok(())
137+
}
138+
}
139+
116140
/// Open or create a file with flags and mode.
117141
pub struct OpenFile {
118142
pub(crate) path: U16CString,

compio-driver/src/iour/mod.rs

Lines changed: 67 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,13 @@
11
#[cfg_attr(all(doc, docsrs), doc(cfg(all())))]
22
#[allow(unused_imports)]
33
pub use std::os::fd::{AsRawFd, FromRawFd, IntoRawFd, RawFd};
4-
use std::{collections::VecDeque, io, os::fd::OwnedFd, pin::Pin, task::Poll, time::Duration};
4+
use std::{
5+
collections::VecDeque, io, os::fd::OwnedFd, pin::Pin, ptr::NonNull, sync::Arc, task::Poll,
6+
time::Duration,
7+
};
58

69
use compio_log::{instrument, trace};
10+
use crossbeam_queue::SegQueue;
711
use io_uring::{
812
cqueue,
913
opcode::{AsyncCancel, Read},
@@ -14,15 +18,35 @@ use io_uring::{
1418
pub(crate) use libc::{sockaddr_storage, socklen_t};
1519
use slab::Slab;
1620

17-
use crate::{syscall, Entry, ProactorBuilder};
21+
use crate::{syscall, AsyncifyPool, Entry, ProactorBuilder};
1822

1923
pub(crate) mod op;
2024
pub(crate) use crate::unix::RawOp;
2125

26+
/// The created entry of [`OpCode`].
27+
pub enum OpEntry {
28+
/// This operation creates an io-uring submission entry.
29+
Submission(squeue::Entry),
30+
/// This operation is a blocking one.
31+
Blocking,
32+
}
33+
34+
impl From<squeue::Entry> for OpEntry {
35+
fn from(value: squeue::Entry) -> Self {
36+
Self::Submission(value)
37+
}
38+
}
39+
2240
/// Abstraction of io-uring operations.
2341
pub trait OpCode {
2442
/// Create submission entry.
25-
fn create_entry(self: Pin<&mut Self>) -> squeue::Entry;
43+
fn create_entry(self: Pin<&mut Self>) -> OpEntry;
44+
45+
/// Call the operation in a blocking way. This method will only be called if
46+
/// [`create_entry`] returns [`OpEntry::Blocking`].
47+
fn call_blocking(self: Pin<&mut Self>) -> io::Result<usize> {
48+
unreachable!("this operation is asynchronous")
49+
}
2650
}
2751

2852
/// Low-level driver of io-uring.
@@ -31,6 +55,8 @@ pub(crate) struct Driver {
3155
squeue: VecDeque<squeue::Entry>,
3256
notifier: Notifier,
3357
notifier_registered: bool,
58+
pool: AsyncifyPool,
59+
pool_completed: Arc<SegQueue<Entry>>,
3460
}
3561

3662
impl Driver {
@@ -45,6 +71,8 @@ impl Driver {
4571
squeue: VecDeque::with_capacity(builder.capacity as usize),
4672
notifier: Notifier::new()?,
4773
notifier_registered: false,
74+
pool: builder.create_or_get_thread_pool(),
75+
pool_completed: Arc::new(SegQueue::new()),
4876
})
4977
}
5078

@@ -118,6 +146,10 @@ impl Driver {
118146
}
119147

120148
fn poll_entries(&mut self, entries: &mut impl Extend<Entry>) {
149+
while let Some(entry) = self.pool_completed.pop() {
150+
entries.extend(Some(entry));
151+
}
152+
121153
let mut cqueue = self.inner.completion();
122154
cqueue.sync();
123155
let completed_entries = cqueue.filter_map(|entry| match entry.user_data() {
@@ -147,11 +179,39 @@ impl Driver {
147179

148180
pub fn push(&mut self, user_data: usize, op: &mut RawOp) -> Poll<io::Result<usize>> {
149181
instrument!(compio_log::Level::TRACE, "push", user_data);
150-
let op = op.as_pin();
182+
let op_pin = op.as_pin();
151183
trace!("push RawOp");
152-
self.squeue
153-
.push_back(op.create_entry().user_data(user_data as _));
154-
Poll::Pending
184+
if let OpEntry::Submission(entry) = op_pin.create_entry() {
185+
self.squeue.push_back(entry.user_data(user_data as _));
186+
Poll::Pending
187+
} else if self.push_blocking(user_data, op)? {
188+
Poll::Pending
189+
} else {
190+
Poll::Ready(Err(io::Error::from_raw_os_error(libc::EBUSY)))
191+
}
192+
}
193+
194+
fn push_blocking(&mut self, user_data: usize, op: &mut RawOp) -> io::Result<bool> {
195+
// Safety: the RawOp is not released before the operation returns.
196+
struct SendWrapper<T>(T);
197+
unsafe impl<T> Send for SendWrapper<T> {}
198+
199+
let op = SendWrapper(NonNull::from(op));
200+
let handle = self.handle()?;
201+
let completed = self.pool_completed.clone();
202+
let is_ok = self
203+
.pool
204+
.dispatch(move || {
205+
#[allow(clippy::redundant_locals)]
206+
let mut op = op;
207+
let op = unsafe { op.0.as_mut() };
208+
let op_pin = op.as_pin();
209+
let res = op_pin.call_blocking();
210+
completed.push(Entry::new(user_data, res));
211+
handle.notify().ok();
212+
})
213+
.is_ok();
214+
Ok(is_ok)
155215
}
156216

157217
pub unsafe fn poll(

0 commit comments

Comments
 (0)