Skip to content

Commit e13742e

Browse files
committed
feat: add LazyDirect watcher
1 parent 7a7cf4b commit e13742e

File tree

1 file changed

+75
-0
lines changed

1 file changed

+75
-0
lines changed

src/lib.rs

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,20 @@ impl<T: Clone + Eq> Watchable<T> {
179179
}
180180
}
181181

182+
/// Creates a [`LazyDirect`] [`Watcher`], allowing the value to be observed, but not modified.
183+
///
184+
/// The [`LazyDirect`] watcher does not store the current value, making it smaller. If the watchable
185+
/// is dropped, [`LazyDirect::get`] returns `T::default`.
186+
pub fn watch_lazy(&self) -> LazyDirect<T>
187+
where
188+
T: Default,
189+
{
190+
LazyDirect {
191+
epoch: self.shared.state().epoch,
192+
shared: Arc::downgrade(&self.shared),
193+
}
194+
}
195+
182196
/// Returns the currently stored value.
183197
pub fn get(&self) -> T {
184198
self.shared.get()
@@ -392,6 +406,48 @@ impl<T: Clone + Eq> Watcher for Direct<T> {
392406
}
393407
}
394408

409+
/// A lazy direct observer of a [`Watchable`] value.
410+
///
411+
/// Other than [`Direct`] it does not store the current value. It needs `T` to implement [`Default`].
412+
/// If the watchable is dropped, [`Self::get`] will return `T::default()`.
413+
///
414+
/// This type is mainly used via the [`Watcher`] interface.
415+
#[derive(Debug, Clone)]
416+
pub struct LazyDirect<T> {
417+
epoch: u64,
418+
shared: Weak<Shared<T>>,
419+
}
420+
421+
impl<T: Clone + Default + Eq> Watcher for LazyDirect<T> {
422+
type Value = T;
423+
424+
fn get(&mut self) -> Self::Value {
425+
if let Some(shared) = self.shared.upgrade() {
426+
let state = shared.state();
427+
self.epoch = state.epoch;
428+
state.value
429+
} else {
430+
T::default()
431+
}
432+
}
433+
434+
fn is_connected(&self) -> bool {
435+
self.shared.upgrade().is_some()
436+
}
437+
438+
fn poll_updated(
439+
&mut self,
440+
cx: &mut task::Context<'_>,
441+
) -> Poll<Result<Self::Value, Disconnected>> {
442+
let Some(shared) = self.shared.upgrade() else {
443+
return Poll::Ready(Err(Disconnected));
444+
};
445+
let state = ready!(shared.poll_updated(cx, self.epoch));
446+
self.epoch = state.epoch;
447+
Poll::Ready(Ok(state.value))
448+
}
449+
}
450+
395451
impl<S: Watcher, T: Watcher> Watcher for (S, T) {
396452
type Value = (S::Value, T::Value);
397453

@@ -1214,4 +1270,23 @@ mod tests {
12141270
assert!(!values.is_empty());
12151271
}
12161272
}
1273+
1274+
#[test]
1275+
fn test_lazy_direct() {
1276+
let a = Watchable::new(1u8);
1277+
let mut w1 = a.watch_lazy();
1278+
let mut w2 = a.watch_lazy();
1279+
assert_eq!(w1.get(), 1u8);
1280+
assert_eq!(w2.get(), 1u8);
1281+
a.set(2u8).unwrap();
1282+
assert_eq!(w1.get(), 2u8);
1283+
assert_eq!(w2.get(), 2u8);
1284+
let mut s1 = w1.stream_updates_only();
1285+
a.set(3u8).unwrap();
1286+
assert_eq!(n0_future::future::now_or_never(s1.next()), Some(Some(3u8)));
1287+
assert_eq!(w2.get(), 3u8);
1288+
drop(a);
1289+
assert_eq!(n0_future::future::now_or_never(s1.next()), Some(None));
1290+
assert_eq!(w2.get(), 0u8);
1291+
}
12171292
}

0 commit comments

Comments
 (0)