Skip to content

Commit 320e6dc

Browse files
authored
Merge pull request #167 from Berrysoft/dev/rawop-drop
feat(driver): drop completed `RawOp` on `Proactor` dropping
2 parents b692a4d + 293d5a3 commit 320e6dc

File tree

8 files changed

+146
-52
lines changed

8 files changed

+146
-52
lines changed

compio-driver/src/fusion/mod.rs

+4-5
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@ pub use poll::{Decision, OpCode as PollOpCode};
1717
use slab::Slab;
1818

1919
pub(crate) use crate::unix::RawOp;
20-
use crate::{Entry, ProactorBuilder};
20+
use crate::{Entry, OutEntries, ProactorBuilder};
2121

2222
mod driver_type {
2323
use std::sync::atomic::{AtomicU8, Ordering};
@@ -153,12 +153,11 @@ impl Driver {
153153
pub unsafe fn poll(
154154
&mut self,
155155
timeout: Option<Duration>,
156-
entries: &mut impl Extend<Entry>,
157-
registry: &mut Slab<RawOp>,
156+
entries: OutEntries<impl Extend<Entry>>,
158157
) -> io::Result<()> {
159158
match &mut self.fuse {
160-
FuseDriver::Poll(driver) => driver.poll(timeout, entries, registry),
161-
FuseDriver::IoUring(driver) => driver.poll(timeout, entries, registry),
159+
FuseDriver::Poll(driver) => driver.poll(timeout, entries),
160+
FuseDriver::IoUring(driver) => driver.poll(timeout, entries),
162161
}
163162
}
164163

compio-driver/src/iocp/mod.rs

+35-8
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ use windows_sys::Win32::{
3535
},
3636
};
3737

38-
use crate::{syscall, AsyncifyPool, Entry, ProactorBuilder};
38+
use crate::{syscall, AsyncifyPool, Entry, OutEntries, ProactorBuilder};
3939

4040
pub(crate) mod op;
4141

@@ -330,8 +330,7 @@ impl Driver {
330330
pub unsafe fn poll(
331331
&mut self,
332332
timeout: Option<Duration>,
333-
entries: &mut impl Extend<Entry>,
334-
_registry: &mut Slab<RawOp>,
333+
mut entries: OutEntries<impl Extend<Entry>>,
335334
) -> io::Result<()> {
336335
instrument!(compio_log::Level::TRACE, "poll", ?timeout);
337336
// Prevent stack growth.
@@ -430,28 +429,56 @@ impl<T> Overlapped<T> {
430429
}
431430
}
432431

433-
pub(crate) struct RawOp(NonNull<Overlapped<dyn OpCode>>);
432+
pub(crate) struct RawOp {
433+
op: NonNull<Overlapped<dyn OpCode>>,
434+
// The two flags here are manual reference counting. The driver holds the strong ref until it
435+
// completes; the runtime holds the strong ref until the future is dropped.
436+
completed: bool,
437+
cancelled: bool,
438+
}
434439

435440
impl RawOp {
436441
pub(crate) fn new(user_data: usize, op: impl OpCode + 'static) -> Self {
437442
let op = Overlapped::new(user_data, op);
438443
let op = Box::new(op) as Box<Overlapped<dyn OpCode>>;
439-
Self(unsafe { NonNull::new_unchecked(Box::into_raw(op)) })
444+
Self {
445+
op: unsafe { NonNull::new_unchecked(Box::into_raw(op)) },
446+
completed: false,
447+
cancelled: false,
448+
}
440449
}
441450

442451
pub fn as_op_pin(&mut self) -> Pin<&mut dyn OpCode> {
443-
unsafe { Pin::new_unchecked(&mut self.0.as_mut().op) }
452+
unsafe { Pin::new_unchecked(&mut self.op.as_mut().op) }
444453
}
445454

446455
pub fn as_mut_ptr(&mut self) -> *mut Overlapped<dyn OpCode> {
447-
self.0.as_ptr()
456+
self.op.as_ptr()
457+
}
458+
459+
pub fn set_completed(&mut self) -> bool {
460+
self.completed = true;
461+
self.cancelled
462+
}
463+
464+
pub fn set_cancelled(&mut self) -> bool {
465+
self.cancelled = true;
466+
self.completed
448467
}
449468

450469
/// # Safety
451470
/// The caller should ensure the correct type.
452471
pub unsafe fn into_inner<T: OpCode>(self) -> T {
453472
let this = ManuallyDrop::new(self);
454-
let this: Box<Overlapped<T>> = Box::from_raw(this.0.cast().as_ptr());
473+
let this: Box<Overlapped<T>> = Box::from_raw(this.op.cast().as_ptr());
455474
this.op
456475
}
457476
}
477+
478+
impl Drop for RawOp {
479+
fn drop(&mut self) {
480+
if self.completed {
481+
let _ = unsafe { Box::from_raw(self.op.as_ptr()) };
482+
}
483+
}
484+
}

compio-driver/src/iour/mod.rs

+3-4
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@ use io_uring::{
1818
pub(crate) use libc::{sockaddr_storage, socklen_t};
1919
use slab::Slab;
2020

21-
use crate::{syscall, AsyncifyPool, Entry, ProactorBuilder};
21+
use crate::{syscall, AsyncifyPool, Entry, OutEntries, ProactorBuilder};
2222

2323
pub(crate) mod op;
2424
pub(crate) use crate::unix::RawOp;
@@ -217,8 +217,7 @@ impl Driver {
217217
pub unsafe fn poll(
218218
&mut self,
219219
timeout: Option<Duration>,
220-
entries: &mut impl Extend<Entry>,
221-
_registry: &mut Slab<RawOp>,
220+
mut entries: OutEntries<impl Extend<Entry>>,
222221
) -> io::Result<()> {
223222
instrument!(compio_log::Level::TRACE, "poll", ?timeout);
224223
if !self.notifier_registered {
@@ -239,7 +238,7 @@ impl Driver {
239238

240239
self.submit_auto(timeout, ended)?;
241240

242-
self.poll_entries(entries);
241+
self.poll_entries(&mut entries);
243242

244243
if ended {
245244
trace!("polling ended");

compio-driver/src/lib.rs

+41-2
Original file line numberDiff line numberDiff line change
@@ -197,9 +197,18 @@ impl Proactor {
197197
/// but just don't return from [`Proactor::poll`]. Therefore, although an
198198
/// operation is cancelled, you should not reuse its `user_data`.
199199
///
200-
/// It is well-defined to cancel before polling. If the submitted operation
200+
/// It is *safe* to cancel before polling. If the submitted operation
201201
/// contains a cancelled user-defined data, the operation will be ignored.
202+
/// However, to make the operation dropped correctly, you should cancel
203+
/// after push.
202204
pub fn cancel(&mut self, user_data: usize) {
205+
if let Some(op) = self.ops.get_mut(user_data) {
206+
if op.set_cancelled() {
207+
// The op is completed.
208+
self.ops.remove(user_data);
209+
return;
210+
}
211+
}
203212
self.driver.cancel(user_data, &mut self.ops);
204213
}
205214

@@ -227,7 +236,8 @@ impl Proactor {
227236
entries: &mut impl Extend<Entry>,
228237
) -> io::Result<()> {
229238
unsafe {
230-
self.driver.poll(timeout, entries, &mut self.ops)?;
239+
self.driver
240+
.poll(timeout, OutEntries::new(entries, &mut self.ops))?;
231241
}
232242
Ok(())
233243
}
@@ -320,6 +330,35 @@ impl Entry {
320330
}
321331
}
322332

333+
// The output entries need to be marked as `completed`. If an entry has been
334+
// marked as `cancelled`, it will be removed from the registry.
335+
struct OutEntries<'a, 'b, E> {
336+
entries: &'b mut E,
337+
registry: &'a mut Slab<RawOp>,
338+
}
339+
340+
impl<'a, 'b, E> OutEntries<'a, 'b, E> {
341+
pub fn new(entries: &'b mut E, registry: &'a mut Slab<RawOp>) -> Self {
342+
Self { entries, registry }
343+
}
344+
345+
#[allow(dead_code)]
346+
pub fn registry(&mut self) -> &mut Slab<RawOp> {
347+
self.registry
348+
}
349+
}
350+
351+
impl<E: Extend<Entry>> Extend<Entry> for OutEntries<'_, '_, E> {
352+
fn extend<T: IntoIterator<Item = Entry>>(&mut self, iter: T) {
353+
self.entries.extend(iter.into_iter().map(|e| {
354+
if self.registry[e.user_data()].set_completed() {
355+
self.registry.remove(e.user_data());
356+
}
357+
e
358+
}))
359+
}
360+
}
361+
323362
#[derive(Debug, Clone)]
324363
enum ThreadPoolBuilder {
325364
Create { limit: usize, recv_limit: Duration },

compio-driver/src/poll/mod.rs

+3-4
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ pub(crate) use libc::{sockaddr_storage, socklen_t};
1919
use polling::{Event, Events, Poller};
2020
use slab::Slab;
2121

22-
use crate::{syscall, AsyncifyPool, Entry, ProactorBuilder};
22+
use crate::{syscall, AsyncifyPool, Entry, OutEntries, ProactorBuilder};
2323

2424
pub(crate) mod op;
2525

@@ -268,8 +268,7 @@ impl Driver {
268268
pub unsafe fn poll(
269269
&mut self,
270270
timeout: Option<Duration>,
271-
entries: &mut impl Extend<Entry>,
272-
registry: &mut Slab<RawOp>,
271+
mut entries: OutEntries<impl Extend<Entry>>,
273272
) -> io::Result<()> {
274273
self.poll.wait(&mut self.events, timeout)?;
275274
if self.events.is_empty() && self.pool_completed.is_empty() && timeout.is_some() {
@@ -288,7 +287,7 @@ impl Driver {
288287
if self.cancelled.remove(&user_data) {
289288
entries.extend(Some(entry_cancelled(user_data)));
290289
} else {
291-
let op = registry[user_data].as_pin();
290+
let op = entries.registry()[user_data].as_pin();
292291
let res = match op.on_event(&event) {
293292
Poll::Pending => {
294293
// The operation should go back to the front.

compio-driver/src/unix/mod.rs

+32-4
Original file line numberDiff line numberDiff line change
@@ -7,22 +7,50 @@ use std::{mem::ManuallyDrop, pin::Pin, ptr::NonNull};
77

88
use crate::OpCode;
99

10-
pub(crate) struct RawOp(NonNull<dyn OpCode>);
10+
pub(crate) struct RawOp {
11+
op: NonNull<dyn OpCode>,
12+
// The two flags here are manual reference counting. The driver holds the strong ref until it
13+
// completes; the runtime holds the strong ref until the future is dropped.
14+
completed: bool,
15+
cancelled: bool,
16+
}
1117

1218
impl RawOp {
1319
pub(crate) fn new(_user_data: usize, op: impl OpCode + 'static) -> Self {
1420
let op = Box::new(op);
15-
Self(unsafe { NonNull::new_unchecked(Box::into_raw(op as Box<dyn OpCode>)) })
21+
Self {
22+
op: unsafe { NonNull::new_unchecked(Box::into_raw(op as Box<dyn OpCode>)) },
23+
completed: false,
24+
cancelled: false,
25+
}
1626
}
1727

1828
pub fn as_pin(&mut self) -> Pin<&mut dyn OpCode> {
19-
unsafe { Pin::new_unchecked(self.0.as_mut()) }
29+
unsafe { Pin::new_unchecked(self.op.as_mut()) }
30+
}
31+
32+
pub fn set_completed(&mut self) -> bool {
33+
self.completed = true;
34+
self.cancelled
35+
}
36+
37+
pub fn set_cancelled(&mut self) -> bool {
38+
self.cancelled = true;
39+
self.completed
2040
}
2141

2242
/// # Safety
2343
/// The caller should ensure the correct type.
2444
pub unsafe fn into_inner<T: OpCode>(self) -> T {
2545
let this = ManuallyDrop::new(self);
26-
*Box::from_raw(this.0.cast().as_ptr())
46+
*Box::from_raw(this.op.cast().as_ptr())
47+
}
48+
}
49+
50+
impl Drop for RawOp {
51+
fn drop(&mut self) {
52+
if self.completed {
53+
let _ = unsafe { Box::from_raw(self.op.as_ptr()) };
54+
}
2755
}
2856
}

compio-runtime/src/runtime/mod.rs

+3-5
Original file line numberDiff line numberDiff line change
@@ -143,8 +143,8 @@ impl RuntimeInner {
143143
}
144144

145145
pub fn cancel_op<T>(&self, user_data: Key<T>) {
146+
self.op_runtime.borrow_mut().remove(*user_data);
146147
self.driver.borrow_mut().cancel(*user_data);
147-
self.op_runtime.borrow_mut().cancel(*user_data);
148148
}
149149

150150
#[cfg(feature = "time")]
@@ -165,7 +165,7 @@ impl RuntimeInner {
165165
let res = self
166166
.driver
167167
.borrow_mut()
168-
.pop(&mut op.entry.into_iter())
168+
.pop(&mut op.into_completed().into_iter())
169169
.next()
170170
.expect("the result should have come");
171171
Poll::Ready(res.map_buffer(|op| unsafe { op.into_op::<T>() }))
@@ -204,9 +204,7 @@ impl RuntimeInner {
204204
Ok(_) => {
205205
debug!("poll driver ok, entries: {}", entries.len());
206206
for entry in entries {
207-
self.op_runtime
208-
.borrow_mut()
209-
.update_result(entry.user_data(), entry);
207+
self.op_runtime.borrow_mut().update_result(entry);
210208
}
211209
}
212210
Err(e) => match e.kind() {

compio-runtime/src/runtime/op.rs

+25-20
Original file line numberDiff line numberDiff line change
@@ -11,10 +11,20 @@ use compio_driver::{Entry, OpCode};
1111
use crate::{key::Key, Runtime};
1212

1313
#[derive(Default)]
14-
pub(crate) struct RegisteredOp {
15-
pub waker: Option<Waker>,
16-
pub entry: Option<Entry>,
17-
pub cancelled: bool,
14+
pub(crate) enum RegisteredOp {
15+
#[default]
16+
Default,
17+
Submitted(Waker),
18+
Completed(Entry),
19+
}
20+
21+
impl RegisteredOp {
22+
pub fn into_completed(self) -> Option<Entry> {
23+
match self {
24+
Self::Completed(op) => Some(op),
25+
_ => None,
26+
}
27+
}
1828
}
1929

2030
#[derive(Default)]
@@ -24,29 +34,24 @@ pub(crate) struct OpRuntime {
2434

2535
impl OpRuntime {
2636
pub fn update_waker(&mut self, key: usize, waker: Waker) {
27-
self.ops.entry(key).or_default().waker = Some(waker);
37+
*self.ops.entry(key).or_default() = RegisteredOp::Submitted(waker)
2838
}
2939

30-
pub fn update_result(&mut self, key: usize, entry: Entry) {
40+
pub fn update_result(&mut self, entry: Entry) {
41+
let key = entry.user_data();
3142
let op = self.ops.entry(key).or_default();
32-
if let Some(waker) = op.waker.take() {
33-
waker.wake();
34-
}
35-
op.entry = Some(entry);
36-
if op.cancelled {
37-
self.remove(key);
43+
match op {
44+
RegisteredOp::Default => *op = RegisteredOp::Completed(entry),
45+
RegisteredOp::Submitted(waker) => {
46+
waker.wake_by_ref();
47+
*op = RegisteredOp::Completed(entry);
48+
}
49+
RegisteredOp::Completed(res) => *res = entry,
3850
}
3951
}
4052

4153
pub fn has_result(&mut self, key: usize) -> bool {
42-
self.ops
43-
.get_mut(&key)
44-
.map(|op| op.entry.is_some())
45-
.unwrap_or_default()
46-
}
47-
48-
pub fn cancel(&mut self, key: usize) {
49-
self.ops.entry(key).or_default().cancelled = true;
54+
matches!(self.ops.get_mut(&key), Some(RegisteredOp::Completed(_)))
5055
}
5156

5257
pub fn remove(&mut self, key: usize) -> RegisteredOp {

0 commit comments

Comments
 (0)