1
1
use std:: {
2
2
io:: { self , ErrorKind , SeekFrom } ,
3
- ops:: DerefMut ,
4
3
pin:: Pin ,
5
- sync:: { Arc , Mutex } ,
6
4
task:: { Context , Poll } ,
7
5
} ;
8
6
@@ -17,7 +15,7 @@ use crate::api::{
17
15
pub struct Reader {
18
16
blobs : Blobs ,
19
17
options : ReaderOptions ,
20
- state : Arc < Mutex < ReaderState > > ,
18
+ state : ReaderState ,
21
19
}
22
20
23
21
#[ derive( Default , derive_more:: Debug ) ]
@@ -42,7 +40,7 @@ impl Reader {
42
40
Self {
43
41
blobs,
44
42
options,
45
- state : Arc :: new ( Mutex :: new ( ReaderState :: Idle { position : 0 } ) ) ,
43
+ state : ReaderState :: Idle { position : 0 } ,
46
44
}
47
45
}
48
46
}
@@ -56,8 +54,8 @@ impl tokio::io::AsyncRead for Reader {
56
54
let this = self . get_mut ( ) ;
57
55
let mut position1 = None ;
58
56
loop {
59
- let mut guard = this. state . lock ( ) . unwrap ( ) ;
60
- match std:: mem:: take ( guard. deref_mut ( ) ) {
57
+ let guard = & mut this. state ;
58
+ match std:: mem:: take ( guard) {
61
59
ReaderState :: Idle { position } => {
62
60
// todo: read until next page boundary instead of fixed size
63
61
let len = buf. remaining ( ) as u64 ;
@@ -78,6 +76,9 @@ impl tokio::io::AsyncRead for Reader {
78
76
ReaderState :: Reading { position, mut op } => {
79
77
let position1 = position1. get_or_insert ( position) ;
80
78
match op. poll_next ( cx) {
79
+ Poll :: Ready ( Some ( ExportRangesItem :: Size ( _) ) ) => {
80
+ * guard = ReaderState :: Reading { position, op } ;
81
+ }
81
82
Poll :: Ready ( Some ( ExportRangesItem :: Data ( data) ) ) => {
82
83
if data. offset != * position1 {
83
84
break Poll :: Ready ( Err ( io:: Error :: other (
@@ -96,13 +97,9 @@ impl tokio::io::AsyncRead for Reader {
96
97
}
97
98
Poll :: Ready ( Some ( ExportRangesItem :: Error ( err) ) ) => {
98
99
* guard = ReaderState :: Idle { position } ;
99
- break Poll :: Ready ( Err ( io:: Error :: other (
100
- format ! ( "Error reading data: {err}" ) ,
101
- ) ) ) ;
102
- }
103
- Poll :: Ready ( Some ( ExportRangesItem :: Size ( _size) ) ) => {
104
- // put back the state and continue reading
105
- * guard = ReaderState :: Reading { position, op } ;
100
+ break Poll :: Ready ( Err ( io:: Error :: other ( format ! (
101
+ "Error reading data: {err}"
102
+ ) ) ) ) ;
106
103
}
107
104
Poll :: Ready ( None ) => {
108
105
// done with the stream, go back in idle.
@@ -134,10 +131,9 @@ impl tokio::io::AsyncRead for Reader {
134
131
}
135
132
}
136
133
state @ ReaderState :: Seeking { .. } => {
137
- * this. state . lock ( ) . unwrap ( ) = state;
138
- break Poll :: Ready ( Err ( io:: Error :: other (
139
- "Can't read while seeking" ,
140
- ) ) ) ;
134
+ // should I try to recover from this or just keep it poisoned?
135
+ this. state = state;
136
+ break Poll :: Ready ( Err ( io:: Error :: other ( "Can't read while seeking" ) ) ) ;
141
137
}
142
138
ReaderState :: Poisoned => {
143
139
break Poll :: Ready ( Err ( io:: Error :: other ( "Reader is poisoned" ) ) ) ;
@@ -153,8 +149,8 @@ impl tokio::io::AsyncSeek for Reader {
153
149
seek_from : tokio:: io:: SeekFrom ,
154
150
) -> io:: Result < ( ) > {
155
151
let this = self . get_mut ( ) ;
156
- let mut guard = this. state . lock ( ) . unwrap ( ) ;
157
- match std:: mem:: take ( guard. deref_mut ( ) ) {
152
+ let guard = & mut this. state ;
153
+ match std:: mem:: take ( guard) {
158
154
ReaderState :: Idle { position } => {
159
155
let position1 = match seek_from {
160
156
SeekFrom :: Start ( pos) => pos,
@@ -187,8 +183,8 @@ impl tokio::io::AsyncSeek for Reader {
187
183
188
184
fn poll_complete ( self : Pin < & mut Self > , _cx : & mut Context < ' _ > ) -> Poll < io:: Result < u64 > > {
189
185
let this = self . get_mut ( ) ;
190
- let mut guard = this. state . lock ( ) . unwrap ( ) ;
191
- Poll :: Ready ( match std:: mem:: take ( guard. deref_mut ( ) ) {
186
+ let guard = & mut this. state ;
187
+ Poll :: Ready ( match std:: mem:: take ( guard) {
192
188
ReaderState :: Seeking { position } => {
193
189
* guard = ReaderState :: Idle { position } ;
194
190
Ok ( position)
@@ -199,7 +195,11 @@ impl tokio::io::AsyncSeek for Reader {
199
195
* guard = ReaderState :: Idle { position } ;
200
196
Ok ( position)
201
197
}
202
- ReaderState :: Reading { .. } => Err ( io:: Error :: other ( "Can't seek while reading" ) ) ,
198
+ state @ ReaderState :: Reading { .. } => {
199
+ // should I try to recover from this or just keep it poisoned?
200
+ * guard = state;
201
+ Err ( io:: Error :: other ( "Can't seek while reading" ) )
202
+ }
203
203
ReaderState :: Poisoned => Err ( io:: Error :: other ( "Reader is poisoned" ) ) ,
204
204
} )
205
205
}
0 commit comments