Skip to content

Commit

Permalink
feat(homeserver): add a subscribe query param to /events/ endpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
Nuhvi committed Nov 3, 2024
1 parent d1b2210 commit 6e3d80c
Show file tree
Hide file tree
Showing 8 changed files with 93 additions and 21 deletions.
13 changes: 13 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pubky-homeserver/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ tower-http = { version = "0.5.2", features = ["cors", "trace"] }
tracing = "0.1.40"
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
url = "2.5.2"
tokio-stream = { version = "0.1.16", features = ["sync"] }

[dev-dependencies]
reqwest = "0.12.8"
37 changes: 25 additions & 12 deletions pubky-homeserver/src/database/tables/entries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,12 +36,16 @@ impl DB {
EntryWriter::new(self, public_key, path)
}

pub fn delete_entry(&mut self, public_key: &PublicKey, path: &str) -> anyhow::Result<bool> {
pub fn delete_entry(
&mut self,
public_key: &PublicKey,
path: &str,
) -> anyhow::Result<(bool, Option<(Timestamp, Event)>)> {
let mut wtxn = self.env.write_txn()?;

let key = format!("{public_key}/{path}");

let deleted = if let Some(bytes) = self.tables.entries.get(&wtxn, &key)? {
let tuple = if let Some(bytes) = self.tables.entries.get(&wtxn, &key)? {
let entry = Entry::deserialize(bytes)?;

let mut deleted_chunks = false;
Expand All @@ -62,28 +66,33 @@ impl DB {
let deleted_entry = self.tables.entries.delete(&mut wtxn, &key)?;

// create DELETE event
if path.starts_with("pub/") {
let sse_event = if path.starts_with("pub/") {
let url = format!("pubky://{key}");

let event = Event::delete(&url);
let value = event.serialize();

let key = Timestamp::now().to_string();
let timestamp = Timestamp::now();
let key = timestamp.to_string();

self.tables.events.put(&mut wtxn, &key, &value)?;

// TODO: delete events older than a threshold.
// TODO: move to events.rs
}
//
Some((timestamp, event))
} else {
None
};

deleted_entry && deleted_chunks
(deleted_entry && deleted_chunks, sse_event)
} else {
false
(false, None)
};

wtxn.commit()?;

Ok(deleted)
Ok(tuple)
}

pub fn get_entry(
Expand Down Expand Up @@ -359,7 +368,7 @@ impl<'db> EntryWriter<'db> {

/// Commit blob from the filesystem buffer to LMDB,
/// write the [Entry], and commit the write transaction.
pub fn commit(&self) -> anyhow::Result<Entry> {
pub fn commit(&self) -> anyhow::Result<(Entry, Option<(Timestamp, Event)>)> {
let hash = self.hasher.finalize();

let mut buffer = File::open(&self.buffer_path)?;
Expand Down Expand Up @@ -404,7 +413,7 @@ impl<'db> EntryWriter<'db> {
.put(&mut wtxn, &self.entry_key, &entry.serialize())?;

// Write a public [Event].
if self.is_public {
let sse_event = if self.is_public {
let url = format!("pubky://{}", self.entry_key);
let event = Event::put(&url);
let value = event.serialize();
Expand All @@ -415,13 +424,17 @@ impl<'db> EntryWriter<'db> {

// TODO: delete events older than a threshold.
// TODO: move to events.rs
}

Some((entry.timestamp, event))
} else {
None
};

wtxn.commit()?;

std::fs::remove_file(&self.buffer_path)?;

Ok(entry)
Ok((entry, sse_event))
}
}

Expand Down
9 changes: 6 additions & 3 deletions pubky-homeserver/src/database/tables/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ impl Event {
}

pub fn serialize(&self) -> Vec<u8> {
to_allocvec(self).expect("Session::serialize")
to_allocvec(self).expect("Event::serialize")
}

pub fn deserialize(bytes: &[u8]) -> core::result::Result<Self, postcard::Error> {
Expand All @@ -57,6 +57,10 @@ impl Event {
Event::Delete(_) => "DEL",
}
}

pub fn into_event_line(self) -> String {
format!("{} {}", self.operation(), self.url())
}
}

impl DB {
Expand Down Expand Up @@ -85,10 +89,9 @@ impl DB {
Some((timestamp, event_bytes)) => {
let event = Event::deserialize(event_bytes)?;

let line = format!("{} {}", event.operation(), event.url());
next_cursor = timestamp.to_string();

result.push(line);
result.push(event.into_event_line());
}
None => break,
};
Expand Down
3 changes: 3 additions & 0 deletions pubky-homeserver/src/extractors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,7 @@ pub struct ListQueryParams {
pub cursor: Option<String>,
pub reverse: bool,
pub shallow: bool,
pub subscribe: bool,
}

#[async_trait]
Expand Down Expand Up @@ -112,12 +113,14 @@ where
Some(c.to_string())
}
});
let subscribe = params.contains_key("subscribe");

Ok(ListQueryParams {
reverse,
shallow,
limit,
cursor,
subscribe,
})
}
}
25 changes: 24 additions & 1 deletion pubky-homeserver/src/routes/feed.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,13 @@ use axum::{
body::Body,
extract::State,
http::{header, Response, StatusCode},
response::IntoResponse,
response::{
sse::{Event, Sse},
IntoResponse,
},
};
use tokio_stream::{wrappers::BroadcastStream, StreamExt};

use pubky_common::timestamp::Timestamp;

use crate::{
Expand All @@ -16,6 +21,24 @@ pub async fn feed(
State(state): State<AppState>,
params: ListQueryParams,
) -> Result<impl IntoResponse> {
if params.subscribe {
let rx = state.events.subscribe();

let stream = BroadcastStream::new(rx).filter_map(|result| match result {
Ok((timestamp, event)) => Some(Ok::<Event, String>(
Event::default()
.id(timestamp.to_string())
.event(event.operation())
.data(event.url()),
)),
Err(_) => None,
});

return Ok(Sse::new(stream)
.keep_alive(Default::default())
.into_response());
}

if let Some(ref cursor) = params.cursor {
if Timestamp::try_from(cursor.to_string()).is_err() {
Err(Error::new(
Expand Down
12 changes: 10 additions & 2 deletions pubky-homeserver/src/routes/public.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,11 @@ pub async fn put(
entry_writer.write_all(&chunk)?;
}

let _entry = entry_writer.commit()?;
let (_entry, sse_event) = entry_writer.commit()?;

if let Some(sse_event) = sse_event {
let _ = state.events.send(sse_event);
}

// TODO: return relevant headers, like Etag?

Expand Down Expand Up @@ -198,13 +202,17 @@ pub async fn delete(
verify(path)?;

// TODO: should we wrap this with `tokio::task::spawn_blocking` in case it takes too long?
let deleted = state.db.delete_entry(&public_key, path)?;
let (deleted, sse_event) = state.db.delete_entry(&public_key, path)?;

if !deleted {
// TODO: if the path ends with `/` return a `CONFLICT` error?
return Err(Error::with_status(StatusCode::NOT_FOUND));
};

if let Some(event) = sse_event {
let _ = state.events.send(event);
}

Ok(())
}

Expand Down
14 changes: 11 additions & 3 deletions pubky-homeserver/src/server.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,20 @@
use std::{future::IntoFuture, net::SocketAddr};

use anyhow::{Error, Result};
use pubky_common::auth::AuthVerifier;
use tokio::{net::TcpListener, signal, task::JoinSet};
use pubky_common::{auth::AuthVerifier, timestamp::Timestamp};
use tokio::{net::TcpListener, signal, sync::broadcast, task::JoinSet};
use tracing::{debug, info, warn};

use pkarr::{
mainline::dht::{DhtSettings, Testnet},
PkarrClient, PkarrClientAsync, PublicKey, Settings,
};

use crate::{config::Config, database::DB, pkarr::publish_server_packet};
use crate::{
config::Config,
database::{tables::events::Event, DB},
pkarr::publish_server_packet,
};

#[derive(Debug)]
pub struct Homeserver {
Expand All @@ -25,6 +29,7 @@ pub(crate) struct AppState {
pub(crate) pkarr_client: PkarrClientAsync,
pub(crate) config: Config,
pub(crate) port: u16,
pub(crate) events: broadcast::Sender<(Timestamp, Event)>,
}

impl Homeserver {
Expand All @@ -49,12 +54,15 @@ impl Homeserver {

let port = listener.local_addr()?.port();

let (tx, _) = broadcast::channel(1024);

let state = AppState {
verifier: AuthVerifier::default(),
db,
pkarr_client,
config: config.clone(),
port,
events: tx,
};

let app = crate::routes::create_app(state.clone());
Expand Down

0 comments on commit 6e3d80c

Please sign in to comment.