Skip to content

Commit cf01341

Browse files
committed
feat: fix support for stream.Readable in browsers
- Since wasm-bindgen doesn't support a way to extend JavaScript classes (rustwasm/wasm-bindgen#210), add a basic JavaScript shim, that pushes the data available in Rust to the Readable in JavaScript. - use channels and spawn_local in Rust to move along Stream iterator and pass results to enqueue for read() (in the JavaScript shim) - adds an abort controller that will abort the stream in JavaScript is a stream error occurs in Rust
1 parent 90dba8d commit cf01341

File tree

3 files changed

+55
-21
lines changed

3 files changed

+55
-21
lines changed
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
// Provide a JavaScript shim for the WAS-based ReadableStream, so we can use it in
2+
// the browser and we can use the ES6-style classes
3+
4+
const { Readable } = require("readable-stream");
5+
6+
const makeReadable = (readFn, abort) => {
7+
return new Readable({
8+
read(size) {
9+
this.push(readFn(size));
10+
},
11+
12+
signal: abort,
13+
});
14+
};
15+
16+
module.exports = { makeReadable };

crates/dwn-rs-wasm/src/streams/stream.rs

Lines changed: 32 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -7,8 +7,10 @@ use futures_util::{pin_mut, StreamExt};
77
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver};
88
use tokio_stream::Stream;
99
use wasm_bindgen::prelude::*;
10+
use wasm_bindgen_futures::spawn_local;
11+
use web_sys::AbortController;
1012

11-
use crate::streams::sys::EventEmitter;
13+
use crate::streams::sys::make_readable;
1214

1315
use super::sys::Readable;
1416

@@ -50,33 +52,42 @@ impl StreamReadable {
5052
where
5153
St: Stream<Item = Result<JsValue, JsValue>>,
5254
{
53-
// TODO: this is an extremely "hacky" implementation, that uses a legacy trait of
54-
// streams in Node, and wraps an EventEmitter, emitting data from the Rust Stream,
55-
// using the Readable.wrap, turning it into a proper Node ReadableStream. Once (if)
56-
// dwn-sdk-js is on Web Streams, we can remove this (or someone can find a better way).
57-
let ee = EventEmitter::new();
58-
let readable = Readable::new().wrap(JsCast::unchecked_into::<Readable>(ee.clone()));
59-
readable.resume();
60-
6155
pin_mut!(stream);
56+
let (data_tx, mut data_rx) = unbounded_channel::<JsValue>();
57+
let controller = AbortController::new().unwrap();
6258

6359
while let Some(item) = stream.next().await {
64-
let item = match item {
65-
Ok(i) => i,
66-
Err(e) => {
67-
if e.is_null() {
68-
ee.emit("end", JsValue::NULL);
69-
} else {
70-
ee.emit("error", e);
60+
let data_tx = data_tx.clone();
61+
let controller = controller.clone();
62+
spawn_local(async move {
63+
match item {
64+
Ok(i) => {
65+
data_tx.send(i).unwrap();
66+
}
67+
Err(e) => {
68+
if e.is_null() {
69+
data_tx.send(JsValue::NULL).unwrap();
70+
} else {
71+
controller.abort();
72+
}
7173
}
72-
return Self::new(readable);
7374
}
74-
};
75-
76-
ee.emit("data", item.clone());
75+
});
7776
}
7877

79-
Self::new(readable)
78+
let newr = make_readable(
79+
// TODO: the closure should take a `size` argument, and properly buffer the data
80+
Closure::wrap(Box::new(move |_size| -> JsValue {
81+
match data_rx.blocking_recv() {
82+
Some(d) => d,
83+
None => JsValue::NULL,
84+
}
85+
}) as Box<dyn FnMut(JsValue) -> JsValue>)
86+
.into_js_value(),
87+
controller.signal(),
88+
);
89+
90+
Self::new(newr)
8091
}
8192

8293
/// into_stream creates a new Stream from the StreamReadable stream. This function locks the StreamReadable in

crates/dwn-rs-wasm/src/streams/sys.rs

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use js_sys::{AsyncIterator, Function, Iterator, Object};
22
use wasm_bindgen::prelude::*;
3+
use web_sys::AbortSignal;
34

45
#[wasm_bindgen(module = "events")]
56
extern "C" {
@@ -142,3 +143,9 @@ extern "C" {
142143
#[derive(Debug, Clone)]
143144
pub type PassThrough;
144145
}
146+
147+
#[wasm_bindgen(module = "/src/streams/readable.js")]
148+
extern "C" {
149+
#[wasm_bindgen(js_name = makeReadable)]
150+
pub fn make_readable(write: JsValue, abort: AbortSignal) -> Readable;
151+
}

0 commit comments

Comments
 (0)