diff --git a/Cargo.lock b/Cargo.lock index f32cc60..8af27f7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1741,6 +1741,7 @@ dependencies = [ "reqwest", "serde", "tokio", + "tokio-stream", "toml", "tower-cookies", "tower-http", @@ -2556,6 +2557,18 @@ dependencies = [ "tokio", ] +[[package]] +name = "tokio-stream" +version = "0.1.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4f4e6ce100d0eb49a2734f8c0812bcd324cf357d21810932c5df6b96ef2b86f1" +dependencies = [ + "futures-core", + "pin-project-lite", + "tokio", + "tokio-util", +] + [[package]] name = "tokio-util" version = "0.7.12" diff --git a/pubky-homeserver/Cargo.toml b/pubky-homeserver/Cargo.toml index 100093e..894dc84 100644 --- a/pubky-homeserver/Cargo.toml +++ b/pubky-homeserver/Cargo.toml @@ -28,6 +28,7 @@ tower-http = { version = "0.5.2", features = ["cors", "trace"] } tracing = "0.1.40" tracing-subscriber = { version = "0.3.18", features = ["env-filter"] } url = "2.5.2" +tokio-stream = { version = "0.1.16", features = ["sync"] } [dev-dependencies] reqwest = "0.12.8" diff --git a/pubky-homeserver/src/database/tables/entries.rs b/pubky-homeserver/src/database/tables/entries.rs index 0079a4f..4a4897a 100644 --- a/pubky-homeserver/src/database/tables/entries.rs +++ b/pubky-homeserver/src/database/tables/entries.rs @@ -36,12 +36,16 @@ impl DB { EntryWriter::new(self, public_key, path) } - pub fn delete_entry(&mut self, public_key: &PublicKey, path: &str) -> anyhow::Result { + pub fn delete_entry( + &mut self, + public_key: &PublicKey, + path: &str, + ) -> anyhow::Result<(bool, Option<(Timestamp, Event)>)> { let mut wtxn = self.env.write_txn()?; let key = format!("{public_key}/{path}"); - let deleted = if let Some(bytes) = self.tables.entries.get(&wtxn, &key)? { + let tuple = if let Some(bytes) = self.tables.entries.get(&wtxn, &key)? { let entry = Entry::deserialize(bytes)?; let mut deleted_chunks = false; @@ -62,28 +66,33 @@ impl DB { let deleted_entry = self.tables.entries.delete(&mut wtxn, &key)?; // create DELETE event - if path.starts_with("pub/") { + let sse_event = if path.starts_with("pub/") { let url = format!("pubky://{key}"); let event = Event::delete(&url); let value = event.serialize(); - let key = Timestamp::now().to_string(); + let timestamp = Timestamp::now(); + let key = timestamp.to_string(); self.tables.events.put(&mut wtxn, &key, &value)?; // TODO: delete events older than a threshold. // TODO: move to events.rs - } + // + Some((timestamp, event)) + } else { + None + }; - deleted_entry && deleted_chunks + (deleted_entry && deleted_chunks, sse_event) } else { - false + (false, None) }; wtxn.commit()?; - Ok(deleted) + Ok(tuple) } pub fn get_entry( @@ -359,7 +368,7 @@ impl<'db> EntryWriter<'db> { /// Commit blob from the filesystem buffer to LMDB, /// write the [Entry], and commit the write transaction. - pub fn commit(&self) -> anyhow::Result { + pub fn commit(&self) -> anyhow::Result<(Entry, Option<(Timestamp, Event)>)> { let hash = self.hasher.finalize(); let mut buffer = File::open(&self.buffer_path)?; @@ -404,7 +413,7 @@ impl<'db> EntryWriter<'db> { .put(&mut wtxn, &self.entry_key, &entry.serialize())?; // Write a public [Event]. - if self.is_public { + let sse_event = if self.is_public { let url = format!("pubky://{}", self.entry_key); let event = Event::put(&url); let value = event.serialize(); @@ -415,13 +424,17 @@ impl<'db> EntryWriter<'db> { // TODO: delete events older than a threshold. // TODO: move to events.rs - } + + Some((entry.timestamp, event)) + } else { + None + }; wtxn.commit()?; std::fs::remove_file(&self.buffer_path)?; - Ok(entry) + Ok((entry, sse_event)) } } diff --git a/pubky-homeserver/src/database/tables/events.rs b/pubky-homeserver/src/database/tables/events.rs index 76a4d46..f4814ee 100644 --- a/pubky-homeserver/src/database/tables/events.rs +++ b/pubky-homeserver/src/database/tables/events.rs @@ -33,7 +33,7 @@ impl Event { } pub fn serialize(&self) -> Vec { - to_allocvec(self).expect("Session::serialize") + to_allocvec(self).expect("Event::serialize") } pub fn deserialize(bytes: &[u8]) -> core::result::Result { @@ -57,6 +57,10 @@ impl Event { Event::Delete(_) => "DEL", } } + + pub fn into_event_line(self) -> String { + format!("{} {}", self.operation(), self.url()) + } } impl DB { @@ -85,10 +89,9 @@ impl DB { Some((timestamp, event_bytes)) => { let event = Event::deserialize(event_bytes)?; - let line = format!("{} {}", event.operation(), event.url()); next_cursor = timestamp.to_string(); - result.push(line); + result.push(event.into_event_line()); } None => break, }; diff --git a/pubky-homeserver/src/extractors.rs b/pubky-homeserver/src/extractors.rs index 779ce65..f5fe430 100644 --- a/pubky-homeserver/src/extractors.rs +++ b/pubky-homeserver/src/extractors.rs @@ -81,6 +81,7 @@ pub struct ListQueryParams { pub cursor: Option, pub reverse: bool, pub shallow: bool, + pub subscribe: bool, } #[async_trait] @@ -112,12 +113,14 @@ where Some(c.to_string()) } }); + let subscribe = params.contains_key("subscribe"); Ok(ListQueryParams { reverse, shallow, limit, cursor, + subscribe, }) } } diff --git a/pubky-homeserver/src/routes/feed.rs b/pubky-homeserver/src/routes/feed.rs index a54b8a5..d9a85ad 100644 --- a/pubky-homeserver/src/routes/feed.rs +++ b/pubky-homeserver/src/routes/feed.rs @@ -2,8 +2,13 @@ use axum::{ body::Body, extract::State, http::{header, Response, StatusCode}, - response::IntoResponse, + response::{ + sse::{Event, Sse}, + IntoResponse, + }, }; +use tokio_stream::{wrappers::BroadcastStream, StreamExt}; + use pubky_common::timestamp::Timestamp; use crate::{ @@ -16,6 +21,24 @@ pub async fn feed( State(state): State, params: ListQueryParams, ) -> Result { + if params.subscribe { + let rx = state.events.subscribe(); + + let stream = BroadcastStream::new(rx).filter_map(|result| match result { + Ok((timestamp, event)) => Some(Ok::( + Event::default() + .id(timestamp.to_string()) + .event(event.operation()) + .data(event.url()), + )), + Err(_) => None, + }); + + return Ok(Sse::new(stream) + .keep_alive(Default::default()) + .into_response()); + } + if let Some(ref cursor) = params.cursor { if Timestamp::try_from(cursor.to_string()).is_err() { Err(Error::new( diff --git a/pubky-homeserver/src/routes/public.rs b/pubky-homeserver/src/routes/public.rs index 3b9963f..a181649 100644 --- a/pubky-homeserver/src/routes/public.rs +++ b/pubky-homeserver/src/routes/public.rs @@ -39,7 +39,11 @@ pub async fn put( entry_writer.write_all(&chunk)?; } - let _entry = entry_writer.commit()?; + let (_entry, sse_event) = entry_writer.commit()?; + + if let Some(sse_event) = sse_event { + let _ = state.events.send(sse_event); + } // TODO: return relevant headers, like Etag? @@ -198,13 +202,17 @@ pub async fn delete( verify(path)?; // TODO: should we wrap this with `tokio::task::spawn_blocking` in case it takes too long? - let deleted = state.db.delete_entry(&public_key, path)?; + let (deleted, sse_event) = state.db.delete_entry(&public_key, path)?; if !deleted { // TODO: if the path ends with `/` return a `CONFLICT` error? return Err(Error::with_status(StatusCode::NOT_FOUND)); }; + if let Some(event) = sse_event { + let _ = state.events.send(event); + } + Ok(()) } diff --git a/pubky-homeserver/src/server.rs b/pubky-homeserver/src/server.rs index c3f8719..9b89c72 100644 --- a/pubky-homeserver/src/server.rs +++ b/pubky-homeserver/src/server.rs @@ -1,8 +1,8 @@ use std::{future::IntoFuture, net::SocketAddr}; use anyhow::{Error, Result}; -use pubky_common::auth::AuthVerifier; -use tokio::{net::TcpListener, signal, task::JoinSet}; +use pubky_common::{auth::AuthVerifier, timestamp::Timestamp}; +use tokio::{net::TcpListener, signal, sync::broadcast, task::JoinSet}; use tracing::{debug, info, warn}; use pkarr::{ @@ -10,7 +10,11 @@ use pkarr::{ PkarrClient, PkarrClientAsync, PublicKey, Settings, }; -use crate::{config::Config, database::DB, pkarr::publish_server_packet}; +use crate::{ + config::Config, + database::{tables::events::Event, DB}, + pkarr::publish_server_packet, +}; #[derive(Debug)] pub struct Homeserver { @@ -25,6 +29,7 @@ pub(crate) struct AppState { pub(crate) pkarr_client: PkarrClientAsync, pub(crate) config: Config, pub(crate) port: u16, + pub(crate) events: broadcast::Sender<(Timestamp, Event)>, } impl Homeserver { @@ -49,12 +54,15 @@ impl Homeserver { let port = listener.local_addr()?.port(); + let (tx, _) = broadcast::channel(1024); + let state = AppState { verifier: AuthVerifier::default(), db, pkarr_client, config: config.clone(), port, + events: tx, }; let app = crate::routes::create_app(state.clone());