Skip to content

Commit

Permalink
feat(homeserver): support cursor param in events/?subscribe
Browse files Browse the repository at this point in the history
  • Loading branch information
Nuhvi committed Nov 3, 2024
1 parent 6e3d80c commit eb9cff4
Show file tree
Hide file tree
Showing 4 changed files with 60 additions and 28 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pubky-homeserver/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ tracing = "0.1.40"
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
url = "2.5.2"
tokio-stream = { version = "0.1.16", features = ["sync"] }
futures = "0.3.31"

[dev-dependencies]
reqwest = "0.12.8"
12 changes: 4 additions & 8 deletions pubky-homeserver/src/database/tables/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ impl Event {
}
}

pub fn into_event_line(self) -> String {
pub fn to_event_line(&self) -> String {
format!("{} {}", self.operation(), self.url())
}
}
Expand All @@ -72,7 +72,7 @@ impl DB {
&self,
limit: Option<u16>,
cursor: Option<String>,
) -> anyhow::Result<Vec<String>> {
) -> anyhow::Result<Vec<(String, Event)>> {
let txn = self.env.read_txn()?;

let limit = limit
Expand All @@ -81,7 +81,7 @@ impl DB {

let cursor = cursor.unwrap_or("0000000000000".to_string());

let mut result: Vec<String> = vec![];
let mut result: Vec<(String, Event)> = vec![];
let mut next_cursor = cursor.to_string();

for _ in 0..limit {
Expand All @@ -91,16 +91,12 @@ impl DB {

next_cursor = timestamp.to_string();

result.push(event.into_event_line());
result.push((timestamp.to_string(), event));
}
None => break,
};
}

if !result.is_empty() {
result.push(format!("cursor: {next_cursor}"))
}

txn.commit()?;

Ok(result)
Expand Down
74 changes: 54 additions & 20 deletions pubky-homeserver/src/routes/feed.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
use axum::{
body::Body,
extract::State,
http::{header, Response, StatusCode},
http::{header, HeaderMap, Response, StatusCode},
response::{
sse::{Event, Sse},
IntoResponse,
},
};
use futures::stream;
use tokio_stream::{wrappers::BroadcastStream, StreamExt};

use pubky_common::timestamp::Timestamp;
Expand All @@ -19,26 +20,9 @@ use crate::{

pub async fn feed(
State(state): State<AppState>,
headers: HeaderMap,
params: ListQueryParams,
) -> Result<impl IntoResponse> {
if params.subscribe {
let rx = state.events.subscribe();

let stream = BroadcastStream::new(rx).filter_map(|result| match result {
Ok((timestamp, event)) => Some(Ok::<Event, String>(
Event::default()
.id(timestamp.to_string())
.event(event.operation())
.data(event.url()),
)),
Err(_) => None,
});

return Ok(Sse::new(stream)
.keep_alive(Default::default())
.into_response());
}

if let Some(ref cursor) = params.cursor {
if Timestamp::try_from(cursor.to_string()).is_err() {
Err(Error::new(
Expand All @@ -48,7 +32,57 @@ pub async fn feed(
}
}

let result = state.db.list_events(params.limit, params.cursor)?;
if params.subscribe {
// Get the cursor from the last-event-id header or cursor param.
let cursor = headers
.get("last-event-id")
.and_then(|h| h.to_str().ok().map(|s| s.to_string()))
.or(params.cursor);

let initial_events = stream::iter(if let Some(cursor) = cursor {
state
.db
.list_events(params.limit, Some(cursor))?
.iter()
.map(|(timestamp, event)| {
Ok(Event::default()
.id(timestamp)
.event(event.operation())
.data(event.url()))
})
.collect::<Vec<_>>()
} else {
vec![]
});

let live_events =
BroadcastStream::new(state.events.subscribe()).filter_map(|result| match result {
Ok((timestamp, event)) => Some(Ok::<Event, String>(
Event::default()
.id(timestamp.to_string())
.event(event.operation())
.data(event.url()),
)),
Err(_) => None,
});

let combined_stream = initial_events.chain(live_events);

return Ok(Sse::new(combined_stream)
.keep_alive(Default::default())
.into_response());
}

let events = state.db.list_events(params.limit, params.cursor)?;

let mut result = events
.iter()
.map(|(_, e)| e.to_event_line())
.collect::<Vec<_>>();

if let Some(last) = events.last() {
result.push(format!("cursor: {}", last.0))
};

Ok(Response::builder()
.status(StatusCode::OK)
Expand Down

0 comments on commit eb9cff4

Please sign in to comment.