Skip to content

Commit f87f844

Browse files
authored
fix: Forward list operations to runtime to avoid hang (#17842)
1 parent d92611d commit f87f844

File tree

1 file changed

+81
-4
lines changed

1 file changed

+81
-4
lines changed

src/common/storage/src/runtime_layer.rs

Lines changed: 81 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ use opendal::raw::RpRead;
3535
use opendal::raw::RpStat;
3636
use opendal::raw::RpWrite;
3737
use opendal::Buffer;
38+
use opendal::Metadata;
3839
use opendal::Result;
3940

4041
/// # TODO
@@ -89,11 +90,11 @@ impl<A: Access> LayeredAccess for RuntimeAccessor<A> {
8990
type Inner = A;
9091
type Reader = RuntimeIO<A::Reader>;
9192
type BlockingReader = A::BlockingReader;
92-
type Writer = A::Writer;
93+
type Writer = RuntimeIO<A::Writer>;
9394
type BlockingWriter = A::BlockingWriter;
94-
type Lister = A::Lister;
95+
type Lister = RuntimeIO<A::Lister>;
9596
type BlockingLister = A::BlockingLister;
96-
type Deleter = RuntimeIO<A::Deleter>;
97+
type Deleter = RuntimeIO<RuntimeIO<A::Deleter>>;
9798
type BlockingDeleter = A::BlockingDeleter;
9899

99100
fn inner(&self) -> &Self::Inner {
@@ -130,6 +131,10 @@ impl<A: Access> LayeredAccess for RuntimeAccessor<A> {
130131
.spawn(async move { op.write(&path, args).await })
131132
.await
132133
.expect("join must success")
134+
.map(|(rp, r)| {
135+
let r = RuntimeIO::new(r, self.runtime.clone());
136+
(rp, r)
137+
})
133138
}
134139

135140
async fn stat(&self, path: &str, args: OpStat) -> Result<RpStat> {
@@ -152,6 +157,10 @@ impl<A: Access> LayeredAccess for RuntimeAccessor<A> {
152157
let r = RuntimeIO::new(r, self.runtime.clone());
153158
(rp, r)
154159
})
160+
.map(|(rp, r)| {
161+
let r = RuntimeIO::new(r, self.runtime.clone());
162+
(rp, r)
163+
})
155164
}
156165

157166
async fn list(&self, path: &str, args: OpList) -> Result<(RpList, Self::Lister)> {
@@ -161,6 +170,10 @@ impl<A: Access> LayeredAccess for RuntimeAccessor<A> {
161170
.spawn(async move { op.list(&path, args).await })
162171
.await
163172
.expect("join must success")
173+
.map(|(rp, r)| {
174+
let r = RuntimeIO::new(r, self.runtime.clone());
175+
(rp, r)
176+
})
164177
}
165178

166179
fn blocking_read(&self, path: &str, args: OpRead) -> Result<(RpRead, Self::BlockingReader)> {
@@ -211,13 +224,77 @@ impl<R: oio::Read> oio::Read for RuntimeIO<R> {
211224
}
212225
}
213226

227+
impl<R: oio::Write> oio::Write for RuntimeIO<R> {
228+
async fn write(&mut self, bs: Buffer) -> Result<()> {
229+
let mut r = self.inner.take().expect("writer must be valid");
230+
let runtime = self.runtime.clone();
231+
232+
let (r, res) = runtime
233+
.spawn(async move {
234+
let res = r.write(bs).await;
235+
(r, res)
236+
})
237+
.await
238+
.expect("join must success");
239+
self.inner = Some(r);
240+
res
241+
}
242+
243+
async fn close(&mut self) -> Result<Metadata> {
244+
let mut r = self.inner.take().expect("writer must be valid");
245+
let runtime = self.runtime.clone();
246+
247+
let (r, res) = runtime
248+
.spawn(async move {
249+
let res = r.close().await;
250+
(r, res)
251+
})
252+
.await
253+
.expect("join must success");
254+
self.inner = Some(r);
255+
res
256+
}
257+
258+
async fn abort(&mut self) -> Result<()> {
259+
let mut r = self.inner.take().expect("writer must be valid");
260+
let runtime = self.runtime.clone();
261+
262+
let (r, res) = runtime
263+
.spawn(async move {
264+
let res = r.abort().await;
265+
(r, res)
266+
})
267+
.await
268+
.expect("join must success");
269+
self.inner = Some(r);
270+
res
271+
}
272+
}
273+
274+
impl<R: oio::List> oio::List for RuntimeIO<R> {
275+
async fn next(&mut self) -> Result<Option<oio::Entry>> {
276+
let mut r = self.inner.take().expect("lister must be valid");
277+
let runtime = self.runtime.clone();
278+
279+
let (r, res) = runtime
280+
.spawn(async move {
281+
let res = r.next().await;
282+
(r, res)
283+
})
284+
.await
285+
.expect("join must success");
286+
self.inner = Some(r);
287+
res
288+
}
289+
}
290+
214291
impl<R: oio::Delete> oio::Delete for RuntimeIO<R> {
215292
fn delete(&mut self, path: &str, args: OpDelete) -> Result<()> {
216293
self.inner.as_mut().unwrap().delete(path, args)
217294
}
218295

219296
async fn flush(&mut self) -> Result<usize> {
220-
let mut r = self.inner.take().expect("reader must be valid");
297+
let mut r = self.inner.take().expect("deleter must be valid");
221298
let runtime = self.runtime.clone();
222299

223300
let (r, res) = runtime

0 commit comments

Comments
 (0)