Skip to content

Commit 90dba8d

Browse files
committed
chore: add outgoing Readable stream from WASM DataStore.get
Add an implementation for the outgoing StreamReadable from DataStore.get, which uses wrapped EventEmitters to build the output stream.
1 parent ecf1a1a commit 90dba8d

File tree

3 files changed

+48
-7
lines changed

3 files changed

+48
-7
lines changed

crates/dwn-rs-wasm/src/streams/stream.rs

Lines changed: 39 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,13 @@ use std::{
33
task::{Context, Poll},
44
};
55

6+
use futures_util::{pin_mut, StreamExt};
67
use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver};
78
use tokio_stream::Stream;
89
use wasm_bindgen::prelude::*;
910

11+
use crate::streams::sys::EventEmitter;
12+
1013
use super::sys::Readable;
1114

1215
#[derive(Clone, Debug)]
@@ -40,6 +43,42 @@ impl StreamReadable {
4043
&self.readable
4144
}
4245

46+
/// from_stream creates a new StreamReadable from a Rust Stream. This function will return a
47+
/// new StreamReadable, and the Readable (accessible as as_raw) will stream data to the
48+
/// JavaScript stream, as JsValues.
49+
pub async fn from_stream<St>(stream: St) -> Self
50+
where
51+
St: Stream<Item = Result<JsValue, JsValue>>,
52+
{
53+
// TODO: this is an extremely "hacky" implementation, that uses a legacy trait of
54+
// streams in Node, and wraps an EventEmitter, emitting data from the Rust Stream,
55+
// using the Readable.wrap, turning it into a proper Node ReadableStream. Once (if)
56+
// dwn-sdk-js is on Web Streams, we can remove this (or someone can find a better way).
57+
let ee = EventEmitter::new();
58+
let readable = Readable::new().wrap(JsCast::unchecked_into::<Readable>(ee.clone()));
59+
readable.resume();
60+
61+
pin_mut!(stream);
62+
63+
while let Some(item) = stream.next().await {
64+
let item = match item {
65+
Ok(i) => i,
66+
Err(e) => {
67+
if e.is_null() {
68+
ee.emit("end", JsValue::NULL);
69+
} else {
70+
ee.emit("error", e);
71+
}
72+
return Self::new(readable);
73+
}
74+
};
75+
76+
ee.emit("data", item.clone());
77+
}
78+
79+
Self::new(readable)
80+
}
81+
4382
/// into_stream creates a new Stream from the StreamReadable stream. This function locks the StreamReadable in
4483
/// JavaScript, and attaches the handlers for data and end events. It then returns a new Stream
4584
/// from the locked data, and passes the values through unbounded channels.

crates/dwn-rs-wasm/src/streams/sys.rs

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,9 @@ extern "C" {
99
/// This is a very simple wrapper on the EventEmitter class in node.js (and provided by the `events module).
1010
pub type EventEmitter;
1111

12+
#[wasm_bindgen(constructor)]
13+
pub fn new() -> EventEmitter;
14+
1215
#[wasm_bindgen(method, js_name = on)]
1316
pub fn on(this: &EventEmitter, event: &str, callback: &Function);
1417

@@ -19,7 +22,7 @@ extern "C" {
1922
pub fn off(this: &EventEmitter, event: &str, callback: &Function);
2023

2124
#[wasm_bindgen(method, js_name = emit)]
22-
pub fn emit(this: &EventEmitter, event: &str, args: Box<[JsValue]>);
25+
pub fn emit(this: &EventEmitter, event: &str, args: JsValue);
2326
}
2427

2528
// Basic bindings to the node.js Writable stream module.

crates/dwn-rs-wasm/src/surrealdb/data_store.rs

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,5 @@
11
use async_stream::stream;
2-
use core::iter::IntoIterator;
3-
use futures_util::{stream, StreamExt, TryStreamExt};
2+
use futures_util::StreamExt;
43
use js_sys::{Object, Reflect};
54
use thiserror::Error;
65
use wasm_bindgen::prelude::*;
@@ -115,18 +114,18 @@ impl SurrealDataStore {
115114
let size = v.size;
116115
let reader = stream! {
117116
while let Some(chunk) = v.data.next().await {
118-
yield Ok(wasm_bindgen::JsCast::unchecked_into(js_sys::Uint8Array::from(chunk.as_slice())));
117+
yield Ok(js_sys::Uint8Array::from(chunk.as_slice()).into());
119118
}
119+
120+
yield Err(JsValue::NULL);
120121
};
121122

122123
let obj: DataStoreGetResult = JsCast::unchecked_into(Object::new());
123124
Reflect::set(&obj, &"dataSize".into(), &size.into())?;
124125
Reflect::set(
125126
&obj,
126127
&"dataStream".into(),
127-
&Readable::from_web(JsCast::unchecked_into(
128-
wasm_streams::ReadableStream::from_stream(reader).into_raw(),
129-
)),
128+
StreamReadable::from_stream(reader).await.as_raw(),
130129
)?;
131130

132131
Ok(Some(obj))

0 commit comments

Comments
 (0)