Skip to content

Commit

Permalink
Merge pull request #48 from pubky/feat/304
Browse files Browse the repository at this point in the history
Feat/304
  • Loading branch information
Nuhvi authored Oct 17, 2024
2 parents 3b7dec7 + 6592b87 commit c40f18b
Show file tree
Hide file tree
Showing 5 changed files with 204 additions and 38 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions pubky-homeserver/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ flume = "0.11.0"
futures-util = "0.3.30"
heed = "0.20.3"
hex = "0.4.3"
httpdate = "1.0.3"
libc = "0.2.159"
pkarr = { workspace = true }
postcard = { version = "1.0.8", features = ["alloc"] }
Expand All @@ -27,3 +28,6 @@ tower-http = { version = "0.5.2", features = ["cors", "trace"] }
tracing = "0.1.40"
tracing-subscriber = { version = "0.3.18", features = ["env-filter"] }
url = "2.5.2"

[dev-dependencies]
reqwest = "0.12.8"
36 changes: 25 additions & 11 deletions pubky-homeserver/src/database/tables/entries.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
use pkarr::PublicKey;
use postcard::{from_bytes, to_allocvec};
use serde::{Deserialize, Serialize};
use std::{fs::File, io::Read, path::PathBuf};
use std::{
fs::File,
io::{Read, Write},
path::PathBuf,
};
use tracing::instrument;

use heed::{
Expand Down Expand Up @@ -345,6 +349,14 @@ impl<'db> EntryWriter<'db> {
})
}

/// Same ase [EntryWriter::write_all] but returns a Result of a mutable reference of itself
/// to enable chaining with [Self::commit].
pub fn update(&mut self, chunk: &[u8]) -> Result<&mut Self, std::io::Error> {
self.write_all(chunk)?;

Ok(self)
}

/// Commit blob from the filesystem buffer to LMDB,
/// write the [Entry], and commit the write transaction.
pub fn commit(&self) -> anyhow::Result<Entry> {
Expand Down Expand Up @@ -432,8 +444,6 @@ impl<'db> std::io::Write for EntryWriter<'db> {

#[cfg(test)]
mod tests {
use std::io::Write;

use bytes::Bytes;
use pkarr::{mainline::Testnet, Keypair};

Expand All @@ -442,7 +452,7 @@ mod tests {
use super::DB;

#[tokio::test]
async fn entries() {
async fn entries() -> anyhow::Result<()> {
let mut db = DB::open(Config::test(&Testnet::new(0))).unwrap();

let keypair = Keypair::random();
Expand All @@ -451,9 +461,9 @@ mod tests {

let chunk = Bytes::from(vec![1, 2, 3, 4, 5]);

let mut entry_writer = db.write_entry(&public_key, path).unwrap();
entry_writer.write_all(&chunk).unwrap();
entry_writer.commit().unwrap();
db.write_entry(&public_key, path)?
.update(&chunk)?
.commit()?;

let rtxn = db.env.read_txn().unwrap();
let entry = db.get_entry(&rtxn, &public_key, path).unwrap().unwrap();
Expand All @@ -479,10 +489,12 @@ mod tests {
assert_eq!(blob, vec![1, 2, 3, 4, 5]);

rtxn.commit().unwrap();

Ok(())
}

#[tokio::test]
async fn chunked_entry() {
async fn chunked_entry() -> anyhow::Result<()> {
let mut db = DB::open(Config::test(&Testnet::new(0))).unwrap();

let keypair = Keypair::random();
Expand All @@ -491,9 +503,9 @@ mod tests {

let chunk = Bytes::from(vec![0; 1024 * 1024]);

let mut entry_writer = db.write_entry(&public_key, path).unwrap();
entry_writer.write_all(&chunk).unwrap();
entry_writer.commit().unwrap();
db.write_entry(&public_key, path)?
.update(&chunk)?
.commit()?;

let rtxn = db.env.read_txn().unwrap();
let entry = db.get_entry(&rtxn, &public_key, path).unwrap().unwrap();
Expand Down Expand Up @@ -522,5 +534,7 @@ mod tests {
assert_eq!(stats.overflow_pages, 0);

rtxn.commit().unwrap();

Ok(())
}
}
195 changes: 168 additions & 27 deletions pubky-homeserver/src/routes/public.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,14 @@
use axum::{
body::Body,
debug_handler,
extract::State,
http::{header, HeaderMap, HeaderValue, Response, StatusCode},
response::IntoResponse,
};
use futures_util::stream::StreamExt;
use httpdate::HttpDate;
use pkarr::PublicKey;
use std::io::Write;
use std::{io::Write, str::FromStr};
use tower_cookies::Cookies;

use crate::{
Expand Down Expand Up @@ -44,8 +46,10 @@ pub async fn put(
Ok(())
}

#[debug_handler]
pub async fn get(
State(state): State<AppState>,
headers: HeaderMap,
pubky: Pubky,
path: EntryPath,
params: ListQueryParams,
Expand Down Expand Up @@ -105,43 +109,79 @@ pub async fn get(
Ok(())
});

if let Some(entry) = entry_rx.recv_async().await? {
// TODO: add HEAD endpoint
// TODO: Enable seek API (range requests)
// TODO: Gzip? or brotli?

Ok(Response::builder()
.status(StatusCode::OK)
.header(header::CONTENT_LENGTH, entry.content_length())
.header(header::CONTENT_TYPE, entry.content_type())
.header(
header::ETAG,
format!("\"{}\"", entry.content_hash().to_hex()),
)
.body(Body::from_stream(chunks_rx.into_stream()))
.unwrap())
} else {
Err(Error::with_status(StatusCode::NOT_FOUND))?
}
get_entry(
headers,
entry_rx.recv_async().await?,
Some(Body::from_stream(chunks_rx.into_stream())),
)
}

pub async fn head(
State(state): State<AppState>,
headers: HeaderMap,
pubky: Pubky,
path: EntryPath,
) -> Result<impl IntoResponse> {
verify(path.as_str())?;

let rtxn = state.db.env.read_txn()?;

match state
.db
.get_entry(&rtxn, pubky.public_key(), path.as_str())?
.as_ref()
.map(HeaderMap::from)
{
Some(headers) => Ok(headers),
None => Err(Error::with_status(StatusCode::NOT_FOUND)),
get_entry(
headers,
state
.db
.get_entry(&rtxn, pubky.public_key(), path.as_str())?,
None,
)
}

pub fn get_entry(
headers: HeaderMap,
entry: Option<Entry>,
body: Option<Body>,
) -> Result<Response<Body>> {
if let Some(entry) = entry {
// TODO: Enable seek API (range requests)
// TODO: Gzip? or brotli?

let mut response = HeaderMap::from(&entry).into_response();

// Handle IF_MODIFIED_SINCE
if let Some(condition_http_date) = headers
.get(header::IF_MODIFIED_SINCE)
.and_then(|h| h.to_str().ok())
.and_then(|s| HttpDate::from_str(s).ok())
{
let entry_http_date: HttpDate = entry.timestamp().to_owned().into();

if condition_http_date >= entry_http_date {
*response.status_mut() = StatusCode::NOT_MODIFIED;
}
};

// Handle IF_NONE_MATCH
if let Some(str) = headers
.get(header::IF_NONE_MATCH)
.and_then(|h| h.to_str().ok())
{
let etag = format!("\"{}\"", entry.content_hash());
if str
.trim()
.split(',')
.collect::<Vec<_>>()
.contains(&etag.as_str())
{
*response.status_mut() = StatusCode::NOT_MODIFIED;
};
}

if let Some(body) = body {
*response.body_mut() = body;
};

Ok(response)
} else {
Err(Error::with_status(StatusCode::NOT_FOUND))?
}
}

Expand Down Expand Up @@ -237,3 +277,104 @@ impl From<&Entry> for HeaderMap {
headers
}
}

#[cfg(test)]
mod tests {
use axum::http::header;
use pkarr::{mainline::Testnet, Keypair};
use reqwest::{self, Method, StatusCode};

use crate::Homeserver;

#[tokio::test]
async fn if_last_modified() -> anyhow::Result<()> {
let testnet = Testnet::new(3);
let mut server = Homeserver::start_test(&testnet).await?;

let public_key = Keypair::random().public_key();

let data = &[1, 2, 3, 4, 5];

server
.database_mut()
.write_entry(&public_key, "pub/foo")?
.update(data)?
.commit()?;

let client = reqwest::Client::builder().build()?;

let url = format!("http://localhost:{}/{public_key}/pub/foo", server.port());

let response = client.request(Method::GET, &url).send().await?;

let response = client
.request(Method::GET, &url)
.header(
header::IF_MODIFIED_SINCE,
response.headers().get(header::LAST_MODIFIED).unwrap(),
)
.send()
.await?;

assert_eq!(response.status(), StatusCode::NOT_MODIFIED);

let response = client
.request(Method::HEAD, &url)
.header(
header::IF_MODIFIED_SINCE,
response.headers().get(header::LAST_MODIFIED).unwrap(),
)
.send()
.await?;

assert_eq!(response.status(), StatusCode::NOT_MODIFIED);

Ok(())
}

#[tokio::test]
async fn if_none_match() -> anyhow::Result<()> {
let testnet = Testnet::new(3);
let mut server = Homeserver::start_test(&testnet).await?;

let public_key = Keypair::random().public_key();

let data = &[1, 2, 3, 4, 5];

server
.database_mut()
.write_entry(&public_key, "pub/foo")?
.update(data)?
.commit()?;

let client = reqwest::Client::builder().build()?;

let url = format!("http://localhost:{}/{public_key}/pub/foo", server.port());

let response = client.request(Method::GET, &url).send().await?;

let response = client
.request(Method::GET, &url)
.header(
header::IF_NONE_MATCH,
response.headers().get(header::ETAG).unwrap(),
)
.send()
.await?;

assert_eq!(response.status(), StatusCode::NOT_MODIFIED);

let response = client
.request(Method::HEAD, &url)
.header(
header::IF_NONE_MATCH,
response.headers().get(header::ETAG).unwrap(),
)
.send()
.await?;

assert_eq!(response.status(), StatusCode::NOT_MODIFIED);

Ok(())
}
}
5 changes: 5 additions & 0 deletions pubky-homeserver/src/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,11 @@ impl Homeserver {
self.state.config.keypair().public_key()
}

#[cfg(test)]
pub(crate) fn database_mut(&mut self) -> &mut DB {
&mut self.state.db
}

// === Public Methods ===

/// Shutdown the server and wait for all tasks to complete.
Expand Down

0 comments on commit c40f18b

Please sign in to comment.