diff --git a/Cargo.lock b/Cargo.lock index 52ce106a..68f3ab51 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -786,9 +786,9 @@ dependencies = [ [[package]] name = "futures" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "645c6916888f6cb6350d2550b80fb63e734897a8498abe35cfb732b6487804b0" +checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876" dependencies = [ "futures-channel", "futures-core", @@ -801,9 +801,9 @@ dependencies = [ [[package]] name = "futures-channel" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "eac8f7d7865dcb88bd4373ab671c8cf4508703796caa2b1985a9ca867b3fcb78" +checksum = "2dff15bf788c671c1934e366d07e30c1814a8ef514e1af724a602e8a2fbe1b10" dependencies = [ "futures-core", "futures-sink", @@ -811,15 +811,15 @@ dependencies = [ [[package]] name = "futures-core" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "dfc6580bb841c5a68e9ef15c77ccc837b40a7504914d52e47b8b0e9bbda25a1d" +checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e" [[package]] name = "futures-executor" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a576fc72ae164fca6b9db127eaa9a9dda0d61316034f33a0a0d4eda41f02b01d" +checksum = "1e28d1d997f585e54aebc3f97d39e72338912123a67330d723fdbb564d646c9f" dependencies = [ "futures-core", "futures-task", @@ -828,15 +828,15 @@ dependencies = [ [[package]] name = "futures-io" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a44623e20b9681a318efdd71c299b6b222ed6f231972bfe2f224ebad6311f0c1" +checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6" [[package]] name = "futures-macro" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" +checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" dependencies = [ "proc-macro2", "quote", @@ -845,21 +845,21 @@ dependencies = [ [[package]] name = "futures-sink" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9fb8e00e87438d937621c1c6269e53f536c14d3fbd6a042bb24879e57d474fb5" +checksum = "e575fab7d1e0dcb8d0c7bcf9a63ee213816ab51902e6d244a95819acacf1d4f7" [[package]] name = "futures-task" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "38d84fa142264698cdce1a9f9172cf383a0c82de1bddcf3092901442c4097004" +checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988" [[package]] name = "futures-util" -version = "0.3.30" +version = "0.3.31" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3d6401deb83407ab3da39eba7e33987a73c3df0c82b4bb5813ee871c19c41d48" +checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81" dependencies = [ "futures-channel", "futures-core", @@ -1725,6 +1725,7 @@ dependencies = [ "futures-util", "heed", "hex", + "libc", "pkarr", "postcard", "pubky-common", diff --git a/pubky-homeserver/Cargo.toml b/pubky-homeserver/Cargo.toml index ea55cf25..bb1a908b 100644 --- a/pubky-homeserver/Cargo.toml +++ b/pubky-homeserver/Cargo.toml @@ -15,6 +15,7 @@ flume = "0.11.0" futures-util = 
"0.3.30" heed = "0.20.3" hex = "0.4.3" +libc = "0.2.159" pkarr = { workspace = true } postcard = { version = "1.0.8", features = ["alloc"] } pubky-common = { version = "0.1.0", path = "../pubky-common" } diff --git a/pubky-homeserver/src/database.rs b/pubky-homeserver/src/database.rs index e95d1d28..8ea4f480 100644 --- a/pubky-homeserver/src/database.rs +++ b/pubky-homeserver/src/database.rs @@ -1,4 +1,4 @@ -use std::fs; +use std::{fs, path::PathBuf}; use heed::{Env, EnvOpenOptions}; @@ -14,11 +14,17 @@ pub struct DB { pub(crate) env: Env, pub(crate) tables: Tables, pub(crate) config: Config, + pub(crate) buffers_dir: PathBuf, + pub(crate) max_chunk_size: usize, } impl DB { pub fn open(config: Config) -> anyhow::Result { - fs::create_dir_all(config.storage())?; + let buffers_dir = config.storage().clone().join("buffers"); + + // Cleanup buffers. + let _ = fs::remove_dir(&buffers_dir); + fs::create_dir_all(&buffers_dir)?; let env = unsafe { EnvOpenOptions::new() @@ -33,46 +39,25 @@ impl DB { env, tables, config, + buffers_dir, + max_chunk_size: max_chunk_size(), }; Ok(db) } } -#[cfg(test)] -mod tests { - use bytes::Bytes; - use pkarr::{mainline::Testnet, Keypair}; - - use crate::config::Config; - - use super::DB; - - #[tokio::test] - async fn entries() { - let db = DB::open(Config::test(&Testnet::new(0))).unwrap(); - - let keypair = Keypair::random(); - let path = "/pub/foo.txt"; - - let (tx, rx) = flume::bounded::(0); - - let mut cloned = db.clone(); - let cloned_keypair = keypair.clone(); - - let done = tokio::task::spawn_blocking(move || { - cloned - .put_entry(&cloned_keypair.public_key(), path, rx) - .unwrap(); - }); - - tx.send(vec![1, 2, 3, 4, 5].into()).unwrap(); - drop(tx); - - done.await.unwrap(); - - let blob = db.get_blob(&keypair.public_key(), path).unwrap().unwrap(); - - assert_eq!(blob, Bytes::from(vec![1, 2, 3, 4, 5])); - } +/// calculate optimal chunk size: +/// - https://lmdb.readthedocs.io/en/release/#storage-efficiency-limits +/// - https://github.com/lmdbjava/benchmarks/blob/master/results/20160710/README.md#test-2-determine-24816-kb-byte-values +fn max_chunk_size() -> usize { + let page_size = unsafe { libc::sysconf(libc::_SC_PAGESIZE) as usize }; + + // - 16 bytes Header per page (LMDB) + // - Each page has to contain 2 records + // - 8 bytes per record (LMDB) (imperically, it seems to be 10 not 8) + // - 12 bytes key: + // - timestamp : 8 bytes + // - chunk index: 4 bytes + ((page_size - 16) / 2) - (8 + 2) - 12 } diff --git a/pubky-homeserver/src/database/tables/blobs.rs b/pubky-homeserver/src/database/tables/blobs.rs index c430a588..18ec7242 100644 --- a/pubky-homeserver/src/database/tables/blobs.rs +++ b/pubky-homeserver/src/database/tables/blobs.rs @@ -1,38 +1,24 @@ -use heed::{types::Bytes, Database}; -use pkarr::PublicKey; +use heed::{types::Bytes, Database, RoTxn}; use crate::database::DB; use super::entries::Entry; -/// hash of the blob => bytes. +/// (entry timestamp | chunk_index BE) => bytes pub type BlobsTable = Database; pub const BLOBS_TABLE: &str = "blobs"; impl DB { - pub fn get_blob( + pub fn read_entry_content<'txn>( &self, - public_key: &PublicKey, - path: &str, - ) -> anyhow::Result> { - let rtxn = self.env.read_txn()?; - - let key = format!("{public_key}/{path}"); - - let result = if let Some(bytes) = self.tables.entries.get(&rtxn, &key)? { - let entry = Entry::deserialize(bytes)?; - - self.tables - .blobs - .get(&rtxn, entry.content_hash())? 
- .map(|blob| bytes::Bytes::from(blob[8..].to_vec())) - } else { - None - }; - - rtxn.commit()?; - - Ok(result) + rtxn: &'txn RoTxn, + entry: &Entry, + ) -> anyhow::Result<impl Iterator<Item = Result<&'txn [u8], heed::Error>> + 'txn> { + Ok(self + .tables + .blobs + .prefix_iter(rtxn, &entry.timestamp().to_bytes())? + .map(|i| i.map(|(_, bytes)| bytes))) } } diff --git a/pubky-homeserver/src/database/tables/entries.rs b/pubky-homeserver/src/database/tables/entries.rs index b1c70392..18995780 100644 --- a/pubky-homeserver/src/database/tables/entries.rs +++ b/pubky-homeserver/src/database/tables/entries.rs @@ -1,6 +1,7 @@ use pkarr::PublicKey; use postcard::{from_bytes, to_allocvec}; use serde::{Deserialize, Serialize}; +use std::{fs::File, io::Read, path::PathBuf}; use tracing::instrument; use heed::{ @@ -23,74 +24,12 @@ pub type EntriesTable = Database<Str, Bytes>; pub const ENTRIES_TABLE: &str = "entries"; impl DB { - pub fn put_entry( + pub fn write_entry( &mut self, public_key: &PublicKey, path: &str, - rx: flume::Receiver<Bytes>, - ) -> anyhow::Result<()> { - let mut wtxn = self.env.write_txn()?; - - let mut hasher = Hasher::new(); - let mut bytes = vec![]; - let mut length = 0; - - while let Ok(chunk) = rx.recv() { - hasher.update(&chunk); - bytes.extend_from_slice(&chunk); - length += chunk.len(); - } - - let hash = hasher.finalize(); - - let key = hash.as_bytes(); - - let mut bytes_with_ref_count = Vec::with_capacity(bytes.len() + 8); - bytes_with_ref_count.extend_from_slice(&u64::to_be_bytes(0)); - bytes_with_ref_count.extend_from_slice(&bytes); - - // TODO: For now, we set the first 8 bytes to a reference counter - let exists = self - .tables - .blobs - .get(&wtxn, key)? - .unwrap_or(bytes_with_ref_count.as_slice()); - - let new_count = u64::from_be_bytes(exists[0..8].try_into().unwrap()) + 1; - - bytes_with_ref_count[0..8].copy_from_slice(&u64::to_be_bytes(new_count)); - - self.tables - .blobs - .put(&mut wtxn, hash.as_bytes(), &bytes_with_ref_count)?; - - let mut entry = Entry::new(); - - entry.set_content_hash(hash); - entry.set_content_length(length); - - let key = format!("{public_key}/{path}"); - - self.tables - .entries - .put(&mut wtxn, &key, &entry.serialize())?; - - if path.starts_with("pub/") { - let url = format!("pubky://{key}"); - let event = Event::put(&url); - let value = event.serialize(); - - let key = entry.timestamp.to_string(); - - self.tables.events.put(&mut wtxn, &key, &value)?; - - // TODO: delete older events. - // TODO: move to events.rs - } - - wtxn.commit()?; - - Ok(()) + ) -> anyhow::Result<EntryWriter> { + EntryWriter::new(self, public_key, path) } pub fn delete_entry(&mut self, public_key: &PublicKey, path: &str) -> anyhow::Result<bool> { @@ -101,28 +40,20 @@ let deleted = if let Some(bytes) = self.tables.entries.get(&wtxn, &key)? { let entry = Entry::deserialize(bytes)?; - let mut bytes_with_ref_count = self - .tables - .blobs - .get(&wtxn, entry.content_hash())? - .map_or(vec![], |s| s.to_vec()); - - let arr: [u8; 8] = bytes_with_ref_count[0..8].try_into().unwrap_or([0; 8]); - let reference_count = u64::from_be_bytes(arr); + let mut deleted_chunks = false; - let deleted_blobs = if reference_count > 1 { - // decrement reference count - - bytes_with_ref_count[0..8].copy_from_slice(&(reference_count - 1).to_be_bytes()); - - self.tables + { + let mut iter = self + .tables .blobs - .put(&mut wtxn, entry.content_hash(), &bytes_with_ref_count)?; + .prefix_iter_mut(&mut wtxn, &entry.timestamp.to_bytes())?; - true - } else { - self.tables.blobs.delete(&mut wtxn, entry.content_hash())? 
- }; + while iter.next().is_some() { + unsafe { + deleted_chunks = iter.del_current()?; + } + } + } let deleted_entry = self.tables.entries.delete(&mut wtxn, &key)?; @@ -137,11 +68,11 @@ self.tables.events.put(&mut wtxn, &key, &value)?; - // TODO: delete older events. + // TODO: delete events older than a threshold. // TODO: move to events.rs } - deleted_entry && deleted_blobs + deleted_entry && deleted_chunks } else { false }; @@ -151,6 +82,21 @@ Ok(deleted) } + pub fn get_entry( + &self, + txn: &RoTxn, + public_key: &PublicKey, + path: &str, + ) -> anyhow::Result<Option<Entry>> { + let key = format!("{public_key}/{path}"); + + if let Some(bytes) = self.tables.entries.get(txn, &key)? { + return Ok(Some(Entry::deserialize(bytes)?)); + } + + Ok(None) + } + pub fn contains_directory(&self, txn: &RoTxn, path: &str) -> anyhow::Result<bool> { Ok(self.tables.entries.get_greater_than(txn, path)?.is_some()) } @@ -268,13 +214,40 @@ pub struct Entry { version: usize, /// Modified at timestamp: Timestamp, - content_hash: [u8; 32], + content_hash: EntryHash, content_length: usize, content_type: String, // user_metadata: ? } -// TODO: get headers like Etag +#[derive(Clone, Debug, Eq, PartialEq)] +struct EntryHash(Hash); + +impl Default for EntryHash { + fn default() -> Self { + Self(Hash::from_bytes([0; 32])) + } +} + +impl Serialize for EntryHash { + fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error> + where + S: serde::Serializer, + { + let bytes = self.0.as_bytes(); + bytes.serialize(serializer) + } +} + +impl<'de> Deserialize<'de> for EntryHash { + fn deserialize<D>(deserializer: D) -> Result<Self, D::Error> + where + D: serde::Deserializer<'de>, + { + let bytes: [u8; 32] = Deserialize::deserialize(deserializer)?; + Ok(Self(Hash::from_bytes(bytes))) + } +} impl Entry { pub fn new() -> Self { @@ -283,8 +256,13 @@ // === Setters === + pub fn set_timestamp(&mut self, timestamp: &Timestamp) -> &mut Self { + self.timestamp = timestamp.clone(); + self + } + pub fn set_content_hash(&mut self, content_hash: Hash) -> &mut Self { - content_hash.as_bytes().clone_into(&mut self.content_hash); + EntryHash(content_hash).clone_into(&mut self.content_hash); self } @@ -295,12 +273,32 @@ // === Getters === - pub fn content_hash(&self) -> &[u8; 32] { - &self.content_hash + pub fn timestamp(&self) -> &Timestamp { + &self.timestamp + } + + pub fn content_hash(&self) -> &Hash { + &self.content_hash.0 + } + + pub fn content_length(&self) -> usize { + self.content_length + } + + pub fn content_type(&self) -> &str { + &self.content_type } // === Public Method === + pub fn read_content<'txn>( + &self, + db: &'txn DB, + rtxn: &'txn RoTxn, + ) -> anyhow::Result<impl Iterator<Item = Result<&'txn [u8], heed::Error>> + 'txn> { + db.read_entry_content(rtxn, self) + } + pub fn serialize(&self) -> Vec<u8> { to_allocvec(self).expect("Session::serialize") } @@ -313,3 +311,216 @@ from_bytes(bytes) } } + +pub struct EntryWriter<'db> { + db: &'db DB, + buffer: File, + hasher: Hasher, + buffer_path: PathBuf, + entry_key: String, + timestamp: Timestamp, + is_public: bool, +} + +impl<'db> EntryWriter<'db> { + pub fn new(db: &'db DB, public_key: &PublicKey, path: &str) -> anyhow::Result<Self> { + let hasher = Hasher::new(); + + let timestamp = Timestamp::now(); + + let buffer_path = db.buffers_dir.join(timestamp.to_string()); + + let buffer = File::create(&buffer_path)?; + + let entry_key = format!("{public_key}/{path}"); + + Ok(Self { + db, + buffer, + hasher, + buffer_path, + entry_key, + timestamp, + is_public: path.starts_with("pub/"), + }) + } + + /// Commit the blob from the filesystem 
buffer to LMDB, + /// write the [Entry], and commit the write transaction. + pub fn commit(&self) -> anyhow::Result<Entry> { + let hash = self.hasher.finalize(); + + let mut buffer = File::open(&self.buffer_path)?; + + let mut wtxn = self.db.env.write_txn()?; + + let mut chunk_key = [0; 12]; + chunk_key[0..8].copy_from_slice(&self.timestamp.to_bytes()); + + let mut chunk_index: u32 = 0; + + loop { + let mut chunk = vec![0_u8; self.db.max_chunk_size]; + + let bytes_read = buffer.read(&mut chunk)?; + + if bytes_read == 0 { + break; // EOF reached + } + + chunk_key[8..].copy_from_slice(&chunk_index.to_be_bytes()); + + self.db + .tables + .blobs + .put(&mut wtxn, &chunk_key, &chunk[..bytes_read])?; + + chunk_index += 1; + } + + let mut entry = Entry::new(); + entry.set_timestamp(&self.timestamp); + + entry.set_content_hash(hash); + + let length = buffer.metadata()?.len(); + entry.set_content_length(length as usize); + + self.db + .tables + .entries + .put(&mut wtxn, &self.entry_key, &entry.serialize())?; + + // Write a public [Event]. + if self.is_public { + let url = format!("pubky://{}", self.entry_key); + let event = Event::put(&url); + let value = event.serialize(); + + let key = entry.timestamp.to_string(); + + self.db.tables.events.put(&mut wtxn, &key, &value)?; + + // TODO: delete events older than a threshold. + // TODO: move to events.rs + } + + wtxn.commit()?; + + std::fs::remove_file(&self.buffer_path)?; + + Ok(entry) + } +} + +impl<'db> std::io::Write for EntryWriter<'db> { + /// Write a chunk to a filesystem-based buffer. + #[inline] + fn write(&mut self, chunk: &[u8]) -> std::io::Result<usize> { + self.hasher.update(chunk); + self.buffer.write_all(chunk)?; + + Ok(chunk.len()) + } + + /// Does not do anything; you need to call [Self::commit]. + #[inline] + fn flush(&mut self) -> std::io::Result<()> { + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use std::io::Write; + + use bytes::Bytes; + use pkarr::{mainline::Testnet, Keypair}; + + use crate::config::Config; + + use super::DB; + + #[tokio::test] + async fn entries() { + let mut db = DB::open(Config::test(&Testnet::new(0))).unwrap(); + + let keypair = Keypair::random(); + let public_key = keypair.public_key(); + let path = "/pub/foo.txt"; + + let chunk = Bytes::from(vec![1, 2, 3, 4, 5]); + + let mut entry_writer = db.write_entry(&public_key, path).unwrap(); + entry_writer.write_all(&chunk).unwrap(); + entry_writer.commit().unwrap(); + + let rtxn = db.env.read_txn().unwrap(); + let entry = db.get_entry(&rtxn, &public_key, path).unwrap().unwrap(); + + assert_eq!( + entry.content_hash(), + &[ + 2, 79, 103, 192, 66, 90, 61, 192, 47, 186, 245, 140, 185, 61, 229, 19, 46, 61, 117, + 197, 25, 250, 160, 186, 218, 33, 73, 29, 136, 201, 112, 87 + ] + ); + + let mut blob = vec![]; + + { + let mut iter = entry.read_content(&db, &rtxn).unwrap(); + + while let Some(Ok(chunk)) = iter.next() { + blob.extend_from_slice(&chunk); + } + } + + assert_eq!(blob, vec![1, 2, 3, 4, 5]); + + rtxn.commit().unwrap(); + } + + #[tokio::test] + async fn chunked_entry() { + let mut db = DB::open(Config::test(&Testnet::new(0))).unwrap(); + + let keypair = Keypair::random(); + let public_key = keypair.public_key(); + let path = "/pub/foo.txt"; + + let chunk = Bytes::from(vec![0; 1024 * 1024]); + + let mut entry_writer = db.write_entry(&public_key, path).unwrap(); + entry_writer.write_all(&chunk).unwrap(); + entry_writer.commit().unwrap(); + + let rtxn = db.env.read_txn().unwrap(); + let entry = db.get_entry(&rtxn, &public_key, path).unwrap().unwrap(); + + assert_eq!( + 
entry.content_hash(), + &[ + 72, 141, 226, 2, 247, 59, 217, 118, 222, 78, 112, 72, 244, 225, 243, 154, 119, 109, + 134, 213, 130, 183, 52, 143, 245, 59, 244, 50, 185, 135, 252, 168 + ] + ); + + let mut blob = vec![]; + + { + let mut iter = entry.read_content(&db, &rtxn).unwrap(); + + while let Some(Ok(chunk)) = iter.next() { + blob.extend_from_slice(&chunk); + } + } + + assert_eq!(blob, vec![0; 1024 * 1024]); + + let stats = db.tables.blobs.stat(&rtxn).unwrap(); + assert_eq!(stats.overflow_pages, 0); + + rtxn.commit().unwrap(); + } +} diff --git a/pubky-homeserver/src/error.rs b/pubky-homeserver/src/error.rs index 8aa58d27..1a8f8a02 100644 --- a/pubky-homeserver/src/error.rs +++ b/pubky-homeserver/src/error.rs @@ -5,6 +5,7 @@ use axum::{ http::StatusCode, response::IntoResponse, }; +use tokio::task::JoinError; use tracing::debug; pub type Result<T, E = Error> = core::result::Result<T, E>; @@ -126,3 +127,24 @@ impl From<flume::SendError<bytes::Bytes>> for Error { Self::new(StatusCode::INTERNAL_SERVER_ERROR, error.into()) } } + +impl From<flume::RecvError> for Error { + fn from(error: flume::RecvError) -> Self { + debug!(?error); + Self::new(StatusCode::INTERNAL_SERVER_ERROR, error.into()) + } +} + +impl From<JoinError> for Error { + fn from(error: JoinError) -> Self { + debug!(?error); + Self::new(StatusCode::INTERNAL_SERVER_ERROR, error.into()) + } +} + +impl From<axum::http::Error> for Error { + fn from(error: axum::http::Error) -> Self { + debug!(?error); + Self::new(StatusCode::INTERNAL_SERVER_ERROR, error.into()) + } +} diff --git a/pubky-homeserver/src/routes/auth.rs b/pubky-homeserver/src/routes/auth.rs index a4f36fcd..1075f488 100644 --- a/pubky-homeserver/src/routes/auth.rs +++ b/pubky-homeserver/src/routes/auth.rs @@ -1,5 +1,4 @@ use axum::{ - debug_handler, extract::{Host, State}, http::StatusCode, response::IntoResponse, @@ -20,7 +19,6 @@ use crate::{ server::AppState, }; -#[debug_handler] pub async fn signup( State(state): State<AppState>, user_agent: Option<TypedHeader<UserAgent>>, diff --git a/pubky-homeserver/src/routes/public.rs b/pubky-homeserver/src/routes/public.rs index 8c6b2b97..bd61e237 100644 --- a/pubky-homeserver/src/routes/public.rs +++ b/pubky-homeserver/src/routes/public.rs @@ -1,14 +1,16 @@ use axum::{ - body::{Body, Bytes}, + body::Body, extract::State, http::{header, Response, StatusCode}, response::IntoResponse, }; use futures_util::stream::StreamExt; use pkarr::PublicKey; +use std::io::Write; use tower_cookies::Cookies; use crate::{ + database::tables::entries::Entry, error::{Error, Result}, extractors::{EntryPath, ListQueryParams, Pubky}, server::AppState, }; @@ -22,37 +24,20 @@ pub async fn put( body: Body, ) -> Result<impl IntoResponse> { let public_key = pubky.public_key().clone(); - let path = path.as_str(); - - verify(path)?; - authorize(&mut state, cookies, &public_key, path)?; - - let mut stream = body.into_data_stream(); - - let (tx, rx) = flume::bounded::<Bytes>(1); - - let path = path.to_string(); + let path = path.as_str().to_string(); - // TODO: refactor Database to clean up this scope. - let done = tokio::task::spawn_blocking(move || -> Result<()> { - // TODO: this is a blocking operation, which is ok for small - // payloads (we have 16 kb limit for now) but later we need - // to stream this to filesystem, and keep track of any failed - // writes to GC these files later. 
+ verify(&path)?; + authorize(&mut state, cookies, &public_key, &path)?; - state.db.put_entry(&public_key, &path, rx)?; - - Ok(()) - }); + let mut entry_writer = state.db.write_entry(&public_key, &path)?; + let mut stream = body.into_data_stream(); while let Some(next) = stream.next().await { let chunk = next?; - - tx.send(chunk)?; + entry_writer.write_all(&chunk)?; } - drop(tx); - done.await.expect("join error")?; + let _entry = entry_writer.commit()?; // TODO: return relevant headers, like Etag? @@ -66,9 +51,8 @@ pub async fn get( params: ListQueryParams, ) -> Result<impl IntoResponse> { verify(path.as_str())?; - let public_key = pubky.public_key(); - - let path = path.as_str(); + let public_key = pubky.public_key().clone(); + let path = path.as_str().to_string(); if path.ends_with('/') { let txn = state.db.env.read_txn()?; @@ -95,16 +79,49 @@ return Ok(Response::builder() .status(StatusCode::OK) .header(header::CONTENT_TYPE, "text/plain") - .body(Body::from(vec.join("\n"))) - .unwrap()); + .body(Body::from(vec.join("\n")))?); } - // TODO: Enable streaming + let (entry_tx, entry_rx) = flume::bounded::<Option<Entry>>(1); + let (chunks_tx, chunks_rx) = flume::unbounded::<Result<Vec<u8>, heed::Error>>(); + + tokio::task::spawn_blocking(move || -> anyhow::Result<()> { + let rtxn = state.db.env.read_txn()?; + + let option = state.db.get_entry(&rtxn, &public_key, &path)?; + + if let Some(entry) = option { + let iter = entry.read_content(&state.db, &rtxn)?; - match state.db.get_blob(public_key, path) { - Err(error) => Err(error)?, - Ok(Some(bytes)) => Ok(Response::builder().body(Body::from(bytes)).unwrap()), - Ok(None) => Err(Error::new(StatusCode::NOT_FOUND, "File Not Found".into())), + entry_tx.send(Some(entry))?; + + for next in iter { + chunks_tx.send(next.map(|b| b.to_vec()))?; + } + }; + + entry_tx.send(None)?; + + Ok(()) + }); + + if let Some(entry) = entry_rx.recv_async().await? { + // TODO: add HEAD endpoint + // TODO: Enable seek API (range requests) + // TODO: Gzip? or brotli? + + Ok(Response::builder() + .status(StatusCode::OK) + .header(header::CONTENT_LENGTH, entry.content_length()) + .header(header::CONTENT_TYPE, entry.content_type()) + .header( + header::ETAG, + format!("\"{}\"", entry.content_hash().to_hex()), + ) + .body(Body::from_stream(chunks_rx.into_stream())) + .unwrap()) + } else { + Err(Error::with_status(StatusCode::NOT_FOUND))? } } @@ -120,14 +137,13 @@ pub async fn delete( authorize(&mut state, cookies, &public_key, path)?; verify(path)?; + // TODO: should we wrap this with `tokio::task::spawn_blocking` in case it takes too long? let deleted = state.db.delete_entry(&public_key, path)?; if !deleted { // TODO: if the path ends with `/` return a `CONFLICT` error? return Err(Error::with_status(StatusCode::NOT_FOUND)); - } - - // TODO: return relevant headers, like Etag? 
+ }; Ok(()) } diff --git a/pubky/src/shared/public.rs b/pubky/src/shared/public.rs index 21d3cbc6..81118f78 100644 --- a/pubky/src/shared/public.rs +++ b/pubky/src/shared/public.rs @@ -98,6 +98,7 @@ mod tests { use crate::*; + use bytes::Bytes; use pkarr::{mainline::Testnet, Keypair}; use pubky_homeserver::Homeserver; use reqwest::{Method, StatusCode}; @@ -762,8 +763,9 @@ mod tests { ); } - let get = client.get(url.as_str()).await.unwrap(); - dbg!(get); + let resolved = client.get(url.as_str()).await.unwrap().unwrap(); + + assert_eq!(&resolved[..], &[0]); } #[tokio::test] @@ -818,4 +820,35 @@ mod tests { ] ) } + + #[tokio::test] + async fn stream() { + // TODO: test better streaming API + + let testnet = Testnet::new(10); + let server = Homeserver::start_test(&testnet).await.unwrap(); + + let client = PubkyClient::test(&testnet); + + let keypair = Keypair::random(); + + client.signup(&keypair, &server.public_key()).await.unwrap(); + + let url = format!("pubky://{}/pub/foo.txt", keypair.public_key()); + let url = url.as_str(); + + let bytes = Bytes::from(vec![0; 1024 * 1024]); + + client.put(url, &bytes).await.unwrap(); + + let response = client.get(url).await.unwrap().unwrap(); + + assert_eq!(response, bytes); + + client.delete(url).await.unwrap(); + + let response = client.get(url).await.unwrap(); + + assert_eq!(response, None); + } }
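For reference, a minimal standalone sketch of the chunking math and key layout introduced above, assuming a typical 4 KiB page; the real max_chunk_size() queries the page size via libc::sysconf, and real chunk keys use pubky_common's Timestamp::to_bytes() (big-endian, as the key ordering implies) rather than the raw u64 stand-in used here:

fn main() {
    // Assumed page size; in the homeserver this comes from libc::sysconf(libc::_SC_PAGESIZE).
    let page_size: usize = 4096;

    // Mirrors max_chunk_size(): 16-byte page header, two records per page,
    // ~10 bytes of per-record overhead, and a 12-byte key (8-byte timestamp + 4-byte chunk index).
    let max_chunk_size = ((page_size - 16) / 2) - (8 + 2) - 12;
    assert_eq!(max_chunk_size, 2018);

    // Chunk keys sort by (timestamp, chunk index), so prefix_iter over the
    // 8-byte timestamp yields a blob's chunks in insertion order.
    let timestamp: u64 = 1_234_567_890; // stand-in value
    let chunk_index: u32 = 3;
    let mut chunk_key = [0u8; 12];
    chunk_key[0..8].copy_from_slice(&timestamp.to_be_bytes());
    chunk_key[8..].copy_from_slice(&chunk_index.to_be_bytes());
    assert_eq!(chunk_key, [0, 0, 0, 0, 0x49, 0x96, 0x02, 0xD2, 0, 0, 0, 3]);
}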