Skip to content

Commit ba75a26

Browse files
committed
Use async-channel for GatedReader
1 parent 8429064 commit ba75a26

File tree

2 files changed

+11
-9
lines changed

2 files changed

+11
-9
lines changed

crates/bevy_asset/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ bevy_utils = { path = "../bevy_utils", version = "0.14.0-dev" }
3131
async-broadcast = "0.5"
3232
async-fs = "2.0"
3333
async-lock = "3.0"
34+
async-channel = "2.2"
3435
concurrent-queue = "2.4"
3536
downcast-rs = "1.2"
3637
futures-io = "0.3"

crates/bevy_asset/src/io/gated.rs

+10-9
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use crate::io::{AssetReader, AssetReaderError, PathStream, Reader};
2+
use async_channel::{Sender, Receiver};
23
use bevy_utils::HashMap;
3-
use concurrent_queue::ConcurrentQueue;
44
use parking_lot::RwLock;
55
use std::{path::Path, sync::Arc};
66

@@ -10,7 +10,7 @@ use std::{path::Path, sync::Arc};
1010
/// This is built primarily for unit tests.
1111
pub struct GatedReader<R: AssetReader> {
1212
reader: R,
13-
gates: Arc<RwLock<HashMap<Box<Path>, ConcurrentQueue<()>>>>,
13+
gates: Arc<RwLock<HashMap<Box<Path>, (Sender<()>, Receiver<()>)>>>,
1414
}
1515

1616
impl<R: AssetReader + Clone> Clone for GatedReader<R> {
@@ -24,7 +24,7 @@ impl<R: AssetReader + Clone> Clone for GatedReader<R> {
2424

2525
/// Opens path "gates" for a [`GatedReader`].
2626
pub struct GateOpener {
27-
gates: Arc<RwLock<HashMap<Box<Path>, ConcurrentQueue<()>>>>,
27+
gates: Arc<RwLock<HashMap<Box<Path>, (Sender<()>, Receiver<()>)>>>,
2828
}
2929

3030
impl GateOpener {
@@ -34,8 +34,8 @@ impl GateOpener {
3434
let mut gates = self.gates.write();
3535
let gates = gates
3636
.entry_ref(path.as_ref())
37-
.or_insert_with(ConcurrentQueue::unbounded);
38-
gates.push(()).unwrap();
37+
.or_insert_with(async_channel::unbounded);
38+
gates.0.send(()).unwrap();
3939
}
4040
}
4141

@@ -56,13 +56,14 @@ impl<R: AssetReader> GatedReader<R> {
5656

5757
impl<R: AssetReader> AssetReader for GatedReader<R> {
5858
async fn read<'a>(&'a self, path: &'a Path) -> Result<Box<Reader<'a>>, AssetReaderError> {
59-
{
59+
let receiver = {
6060
let mut gates = self.gates.write();
6161
let gates = gates
6262
.entry_ref(path.as_ref())
63-
.or_insert_with(ConcurrentQueue::unbounded);
64-
gates.pop().unwrap();
65-
}
63+
.or_insert_with(async_channel::unbounded);
64+
gates.1.clone()
65+
};
66+
receiver.recv().await;
6667
let result = self.reader.read(path).await?;
6768
Ok(result)
6869
}

0 commit comments

Comments
 (0)