11
11
use futures:: ready;
12
12
use futures:: stream:: Stream ;
13
13
use futures:: task:: { Context , Poll } ;
14
- use mio:: event:: Evented ;
15
- use mio:: unix:: EventedFd ;
16
- use mio:: { PollOpt , Ready , Token } ;
17
- use tokio:: io:: PollEvented ;
14
+ use tokio:: io:: unix:: { AsyncFd , TryIoError } ;
18
15
19
- use std:: io;
20
16
use std:: os:: unix:: io:: AsRawFd ;
21
17
use std:: pin:: Pin ;
22
18
23
19
use super :: event_err;
24
20
use super :: { LineEvent , LineEventHandle , Result } ;
25
21
26
- struct PollWrapper {
27
- handle : LineEventHandle ,
28
- }
29
-
30
- impl Evented for PollWrapper {
31
- fn register (
32
- & self ,
33
- poll : & mio:: Poll ,
34
- token : Token ,
35
- interest : Ready ,
36
- opts : PollOpt ,
37
- ) -> io:: Result < ( ) > {
38
- EventedFd ( & self . handle . file . as_raw_fd ( ) ) . register ( poll, token, interest, opts)
39
- }
40
-
41
- fn reregister (
42
- & self ,
43
- poll : & mio:: Poll ,
44
- token : Token ,
45
- interest : Ready ,
46
- opts : PollOpt ,
47
- ) -> io:: Result < ( ) > {
48
- EventedFd ( & self . handle . file . as_raw_fd ( ) ) . reregister ( poll, token, interest, opts)
49
- }
50
-
51
- fn deregister ( & self , poll : & mio:: Poll ) -> io:: Result < ( ) > {
52
- EventedFd ( & self . handle . file . as_raw_fd ( ) ) . deregister ( poll)
53
- }
54
- }
55
-
56
22
/// Wrapper around a `LineEventHandle` which implements a `futures::stream::Stream` for interrupts.
57
23
///
58
24
/// # Example
@@ -88,7 +54,7 @@ impl Evented for PollWrapper {
88
54
/// # }
89
55
/// ```
90
56
pub struct AsyncLineEventHandle {
91
- evented : PollEvented < PollWrapper > ,
57
+ asyncfd : AsyncFd < LineEventHandle > ,
92
58
}
93
59
94
60
impl AsyncLineEventHandle {
@@ -106,36 +72,35 @@ impl AsyncLineEventHandle {
106
72
}
107
73
108
74
Ok ( AsyncLineEventHandle {
109
- evented : PollEvented :: new ( PollWrapper { handle } ) ?,
75
+ asyncfd : AsyncFd :: new ( handle) ?,
110
76
} )
111
77
}
112
78
}
113
79
114
80
impl Stream for AsyncLineEventHandle {
115
81
type Item = Result < LineEvent > ;
116
82
117
- fn poll_next ( self : Pin < & mut Self > , cx : & mut Context ) -> Poll < Option < Self :: Item > > {
118
- let ready = Ready :: readable ( ) ;
119
- if let Err ( e ) = ready ! ( self . evented . poll_read_ready ( cx, ready ) ) {
120
- return Poll :: Ready ( Some ( Err ( e . into ( ) ) ) ) ;
121
- }
122
-
123
- match self . evented . get_ref ( ) . handle . read_event ( ) {
124
- Ok ( Some ( event) ) => Poll :: Ready ( Some ( Ok ( event) ) ) ,
125
- Ok ( None ) => Poll :: Ready ( Some ( Err ( event_err ( nix :: Error :: Sys (
126
- nix :: errno :: Errno :: EIO ,
127
- ) ) ) ) ) ,
128
- Err ( nix :: Error :: Sys ( nix :: errno :: Errno :: EAGAIN ) ) => {
129
- self . evented . clear_read_ready ( cx , ready ) ? ;
130
- Poll :: Pending
83
+ fn poll_next ( mut self : Pin < & mut Self > , cx : & mut Context ) -> Poll < Option < Self :: Item > > {
84
+ loop {
85
+ let mut guard = ready ! ( self . asyncfd . poll_read_ready_mut ( cx) ) ? ;
86
+ match guard . try_io ( |inner| inner . get_mut ( ) . read_event ( ) ) {
87
+ Err ( TryIoError { .. } ) => {
88
+ // Continue
89
+ }
90
+ Ok ( Ok ( Some ( event) ) ) => return Poll :: Ready ( Some ( Ok ( event) ) ) ,
91
+ Ok ( Ok ( None ) ) => {
92
+ return Poll :: Ready ( Some ( Err ( event_err ( nix :: Error :: Sys (
93
+ nix :: errno :: Errno :: EIO ,
94
+ ) ) ) ) )
95
+ }
96
+ Ok ( Err ( err ) ) => return Poll :: Ready ( Some ( Err ( err . into ( ) ) ) ) ,
131
97
}
132
- Err ( e) => Poll :: Ready ( Some ( Err ( event_err ( e) ) ) ) ,
133
98
}
134
99
}
135
100
}
136
101
137
102
impl AsRef < LineEventHandle > for AsyncLineEventHandle {
138
103
fn as_ref ( & self ) -> & LineEventHandle {
139
- & self . evented . get_ref ( ) . handle
104
+ & self . asyncfd . get_ref ( )
140
105
}
141
106
}
0 commit comments