diff --git a/Cargo.lock b/Cargo.lock index 8af27f7..9900b17 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1730,6 +1730,7 @@ dependencies = [ "clap", "dirs-next", "flume", + "futures", "futures-util", "heed", "hex", diff --git a/pubky-homeserver/Cargo.toml b/pubky-homeserver/Cargo.toml index 894dc84..0ad4887 100644 --- a/pubky-homeserver/Cargo.toml +++ b/pubky-homeserver/Cargo.toml @@ -29,6 +29,7 @@ tracing = "0.1.40" tracing-subscriber = { version = "0.3.18", features = ["env-filter"] } url = "2.5.2" tokio-stream = { version = "0.1.16", features = ["sync"] } +futures = "0.3.31" [dev-dependencies] reqwest = "0.12.8" diff --git a/pubky-homeserver/src/database/tables/events.rs b/pubky-homeserver/src/database/tables/events.rs index f4814ee..b1e3aaf 100644 --- a/pubky-homeserver/src/database/tables/events.rs +++ b/pubky-homeserver/src/database/tables/events.rs @@ -58,7 +58,7 @@ impl Event { } } - pub fn into_event_line(self) -> String { + pub fn to_event_line(&self) -> String { format!("{} {}", self.operation(), self.url()) } } @@ -72,7 +72,7 @@ impl DB { &self, limit: Option, cursor: Option, - ) -> anyhow::Result> { + ) -> anyhow::Result> { let txn = self.env.read_txn()?; let limit = limit @@ -81,7 +81,7 @@ impl DB { let cursor = cursor.unwrap_or("0000000000000".to_string()); - let mut result: Vec = vec![]; + let mut result: Vec<(String, Event)> = vec![]; let mut next_cursor = cursor.to_string(); for _ in 0..limit { @@ -91,16 +91,12 @@ impl DB { next_cursor = timestamp.to_string(); - result.push(event.into_event_line()); + result.push((timestamp.to_string(), event)); } None => break, }; } - if !result.is_empty() { - result.push(format!("cursor: {next_cursor}")) - } - txn.commit()?; Ok(result) diff --git a/pubky-homeserver/src/routes/feed.rs b/pubky-homeserver/src/routes/feed.rs index d9a85ad..6690e54 100644 --- a/pubky-homeserver/src/routes/feed.rs +++ b/pubky-homeserver/src/routes/feed.rs @@ -1,12 +1,13 @@ use axum::{ body::Body, extract::State, - http::{header, Response, StatusCode}, + http::{header, HeaderMap, Response, StatusCode}, response::{ sse::{Event, Sse}, IntoResponse, }, }; +use futures::stream; use tokio_stream::{wrappers::BroadcastStream, StreamExt}; use pubky_common::timestamp::Timestamp; @@ -19,26 +20,9 @@ use crate::{ pub async fn feed( State(state): State, + headers: HeaderMap, params: ListQueryParams, ) -> Result { - if params.subscribe { - let rx = state.events.subscribe(); - - let stream = BroadcastStream::new(rx).filter_map(|result| match result { - Ok((timestamp, event)) => Some(Ok::( - Event::default() - .id(timestamp.to_string()) - .event(event.operation()) - .data(event.url()), - )), - Err(_) => None, - }); - - return Ok(Sse::new(stream) - .keep_alive(Default::default()) - .into_response()); - } - if let Some(ref cursor) = params.cursor { if Timestamp::try_from(cursor.to_string()).is_err() { Err(Error::new( @@ -48,7 +32,57 @@ pub async fn feed( } } - let result = state.db.list_events(params.limit, params.cursor)?; + if params.subscribe { + // Get the cursor from the last-event-id header or cursor param. + let cursor = headers + .get("last-event-id") + .and_then(|h| h.to_str().ok().map(|s| s.to_string())) + .or(params.cursor); + + let initial_events = stream::iter(if let Some(cursor) = cursor { + state + .db + .list_events(params.limit, Some(cursor))? + .iter() + .map(|(timestamp, event)| { + Ok(Event::default() + .id(timestamp) + .event(event.operation()) + .data(event.url())) + }) + .collect::>() + } else { + vec![] + }); + + let live_events = + BroadcastStream::new(state.events.subscribe()).filter_map(|result| match result { + Ok((timestamp, event)) => Some(Ok::( + Event::default() + .id(timestamp.to_string()) + .event(event.operation()) + .data(event.url()), + )), + Err(_) => None, + }); + + let combined_stream = initial_events.chain(live_events); + + return Ok(Sse::new(combined_stream) + .keep_alive(Default::default()) + .into_response()); + } + + let events = state.db.list_events(params.limit, params.cursor)?; + + let mut result = events + .iter() + .map(|(_, e)| e.to_event_line()) + .collect::>(); + + if let Some(last) = events.last() { + result.push(format!("cursor: {}", last.0)) + }; Ok(Response::builder() .status(StatusCode::OK)