Skip to content

Commit 906de8e

Browse files
author
Stjepan Glavina
committed
Update async-io
1 parent 9b6ba0e commit 906de8e

File tree

3 files changed

+62
-21
lines changed

3 files changed

+62
-21
lines changed

Cargo.toml

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,17 +16,21 @@ readme = "README.md"
1616
concurrent-queue = "1.2.2"
1717
fastrand = "1.3.5"
1818
futures-lite = "1.4.0"
19+
libc = "0.2.78"
1920
log = "0.4.11"
2021
nb-connect = "1.0.0"
2122
once_cell = "1.4.1"
2223
parking = "2.0.0"
23-
polling = "1.0.1"
24+
polling = "2.0.0"
2425
vec-arena = "1.0.0"
2526
waker-fn = "1.1.0"
2627

28+
[target.'cfg(windows)'.dependencies]
29+
winapi = { version = "0.3.9", features = ["winsock2"] }
30+
2731
[dev-dependencies]
2832
async-channel = "1.4.2"
29-
async-net = "1.3.0"
33+
# async-net = "1.3.0"
3034
blocking = "1.0.0"
3135
signal-hook = "0.1.16"
3236
tempfile = "3.1.0"

src/lib.rs

Lines changed: 34 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,6 @@
5353
//! # std::io::Result::Ok(()) });
5454
//! ```
5555
56-
#![forbid(unsafe_code)]
5756
#![warn(missing_docs, missing_debug_implementations, rust_2018_idioms)]
5857

5958
use std::convert::TryFrom;
@@ -73,7 +72,7 @@ use std::{
7372
};
7473

7574
#[cfg(windows)]
76-
use std::os::windows::io::{AsRawSocket, FromRawSocket, RawSocket};
75+
use std::os::windows::io::{AsRawSocket, RawSocket};
7776

7877
use futures_lite::io::{AsyncRead, AsyncWrite};
7978
use futures_lite::stream::{self, Stream};
@@ -393,8 +392,21 @@ impl<T: AsRawFd> Async<T> {
393392
/// # std::io::Result::Ok(()) });
394393
/// ```
395394
pub fn new(io: T) -> io::Result<Async<T>> {
395+
let fd = io.as_raw_fd();
396+
397+
// Put the file descriptor in non-blocking mode.
398+
unsafe {
399+
let mut res = libc::fcntl(fd, libc::F_GETFL);
400+
if res != -1 {
401+
res = libc::fcntl(fd, libc::F_SETFL, res | libc::O_NONBLOCK);
402+
}
403+
if res == -1 {
404+
return Err(io::Error::last_os_error());
405+
}
406+
}
407+
396408
Ok(Async {
397-
source: Reactor::get().insert_io(io.as_raw_fd())?,
409+
source: Reactor::get().insert_io(fd)?,
398410
io: Some(io),
399411
})
400412
}
@@ -434,8 +446,26 @@ impl<T: AsRawSocket> Async<T> {
434446
/// # std::io::Result::Ok(()) });
435447
/// ```
436448
pub fn new(io: T) -> io::Result<Async<T>> {
449+
let sock = io.as_raw_socket();
450+
451+
// Put the socket in non-blocking mode.
452+
unsafe {
453+
use winapi::ctypes;
454+
use winapi::um::winsock2;
455+
456+
let mut nonblocking = true as ctypes::c_ulong;
457+
let res = winsock2::ioctlsocket(
458+
sock as winsock2::SOCKET,
459+
winsock2::FIONBIO,
460+
&mut nonblocking,
461+
);
462+
if res != 0 {
463+
return Err(io::Error::last_os_error());
464+
}
465+
}
466+
437467
Ok(Async {
438-
source: Reactor::get().insert_io(io.as_raw_socket())?,
468+
source: Reactor::get().insert_io(sock)?,
439469
io: Some(io),
440470
})
441471
}

src/reactor.rs

Lines changed: 22 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -87,18 +87,25 @@ impl Reactor {
8787
#[cfg(unix)] raw: RawFd,
8888
#[cfg(windows)] raw: RawSocket,
8989
) -> io::Result<Arc<Source>> {
90-
// Register the file descriptor.
91-
self.poller.insert(raw)?;
92-
9390
// Create an I/O source for this file descriptor.
94-
let mut sources = self.sources.lock().unwrap();
95-
let key = sources.next_vacant();
96-
let source = Arc::new(Source {
97-
raw,
98-
key,
99-
state: Default::default(),
100-
});
101-
sources.insert(source.clone());
91+
let source = {
92+
let mut sources = self.sources.lock().unwrap();
93+
let key = sources.next_vacant();
94+
let source = Arc::new(Source {
95+
raw,
96+
key,
97+
state: Default::default(),
98+
});
99+
sources.insert(source.clone());
100+
source
101+
};
102+
103+
// Register the file descriptor.
104+
if let Err(err) = self.poller.add(raw, Event::none(source.key)) {
105+
let mut sources = self.sources.lock().unwrap();
106+
sources.remove(source.key);
107+
return Err(err);
108+
}
102109

103110
Ok(source)
104111
}
@@ -107,7 +114,7 @@ impl Reactor {
107114
pub(crate) fn remove_io(&self, source: &Source) -> io::Result<()> {
108115
let mut sources = self.sources.lock().unwrap();
109116
sources.remove(source.key);
110-
self.poller.remove(source.raw)
117+
self.poller.delete(source.raw)
111118
}
112119

113120
/// Registers a timer in the reactor.
@@ -283,7 +290,7 @@ impl ReactorLock<'_> {
283290
// e.g. we were previously interested in both readability and writability,
284291
// but only one of them was emitted.
285292
if !state[READ].is_empty() || !state[WRITE].is_empty() {
286-
self.reactor.poller.interest(
293+
self.reactor.poller.modify(
287294
source.raw,
288295
Event {
289296
key: source.key,
@@ -415,7 +422,7 @@ impl Source {
415422

416423
// Update interest in this I/O handle.
417424
if was_empty {
418-
Reactor::get().poller.interest(
425+
Reactor::get().poller.modify(
419426
self.raw,
420427
Event {
421428
key: self.key,
@@ -466,7 +473,7 @@ impl Source {
466473

467474
// Update interest in this I/O handle.
468475
if was_empty {
469-
Reactor::get().poller.interest(
476+
Reactor::get().poller.modify(
470477
self.raw,
471478
Event {
472479
key: self.key,

0 commit comments

Comments
 (0)