From cc97744f25c8a8e479f08b9231905f5cf6e4ddda Mon Sep 17 00:00:00 2001
From: nazeh
Date: Tue, 23 Jul 2024 21:15:41 +0300
Subject: [PATCH] feat(pubky): get successful

---
 Cargo.lock                                  |   1 +
 pubky-homeserver/src/database.rs            | 113 +++++++++++++++++-
 pubky-homeserver/src/database/migrations.rs |  10 +-
 .../src/database/migrations/m0.rs           |   2 +-
 pubky-homeserver/src/database/tables.rs     |  26 ++++
 .../src/database/tables/entries.rs          |   2 +-
 pubky-homeserver/src/extractors.rs          |   4 +
 pubky-homeserver/src/routes/public.rs       |  80 ++-----------
 pubky/src/client/public.rs                  |  29 ++---
 9 files changed, 167 insertions(+), 100 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 74ef877..613c9c4 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1250,6 +1250,7 @@ dependencies = [
 name = "pubky"
 version = "0.1.0"
 dependencies = [
+ "bytes",
  "flume",
  "pkarr",
  "pubky-common",
diff --git a/pubky-homeserver/src/database.rs b/pubky-homeserver/src/database.rs
index 0eb3200..5bbe6c2 100644
--- a/pubky-homeserver/src/database.rs
+++ b/pubky-homeserver/src/database.rs
@@ -1,16 +1,23 @@
 use std::fs;
 use std::path::Path;
 
+use bytes::Bytes;
 use heed::{types::Str, Database, Env, EnvOpenOptions, RwTxn};
 
 mod migrations;
 pub mod tables;
 
-use migrations::TABLES_COUNT;
+use pubky_common::crypto::Hasher;
+
+use tables::{entries::Entry, Tables, TABLES_COUNT};
+
+use pkarr::PublicKey;
+use tables::blobs::{BlobsTable, BLOBS_TABLE};
 
 #[derive(Debug, Clone)]
 pub struct DB {
     pub(crate) env: Env,
+    pub(crate) tables: Tables,
 }
 
 impl DB {
@@ -19,10 +26,110 @@ impl DB {
 
         let env = unsafe { EnvOpenOptions::new().max_dbs(TABLES_COUNT).open(storage) }?;
 
-        migrations::run(&env);
+        let tables = migrations::run(&env)?;
 
-        let db = DB { env };
+        let db = DB { env, tables };
 
         Ok(db)
     }
+
+    pub fn put_entry(
+        &mut self,
+        public_key: &PublicKey,
+        path: &str,
+        rx: flume::Receiver<Bytes>,
+    ) -> anyhow::Result<()> {
+        let mut wtxn = self.env.write_txn()?;
+
+        let mut hasher = Hasher::new();
+        let mut bytes = vec![];
+        let mut length = 0;
+
+        while let Ok(chunk) = rx.recv() {
+            hasher.update(&chunk);
+            bytes.extend_from_slice(&chunk);
+            length += chunk.len();
+        }
+
+        let hash = hasher.finalize();
+
+        self.tables.blobs.put(&mut wtxn, hash.as_bytes(), &bytes)?;
+
+        let mut entry = Entry::new();
+
+        entry.set_content_hash(hash);
+        entry.set_content_length(length);
+
+        let mut key = vec![];
+        key.extend_from_slice(public_key.as_bytes());
+        key.extend_from_slice(path.as_bytes());
+
+        self.tables.entries.put(&mut wtxn, &key, &entry.serialize());
+
+        wtxn.commit()?;
+
+        Ok(())
+    }
+
+    pub fn get_blob(
+        &mut self,
+        public_key: &PublicKey,
+        path: &str,
+    ) -> anyhow::Result<Option<Bytes>> {
+        let mut rtxn = self.env.read_txn()?;
+
+        let mut key = vec![];
+        key.extend_from_slice(public_key.as_bytes());
+        key.extend_from_slice(path.as_bytes());
+
+        if let Some(bytes) = self.tables.entries.get(&rtxn, &key)? {
+            let entry = Entry::deserialize(bytes)?;
+
+            if let Some(blob) = self.tables.blobs.get(&rtxn, entry.content_hash())? {
+                return Ok(Some(Bytes::from(blob.to_vec())));
+            };
+        };
+
+        Ok(None)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use pkarr::Keypair;
+    use pubky_common::timestamp::Timestamp;
+
+    use crate::config::Config;
+
+    use super::{Bytes, DB};
+
+    #[tokio::test]
+    async fn entries() {
+        let storage = std::env::temp_dir()
+            .join(Timestamp::now().to_string())
+            .join("pubky");
+
+        let mut db = DB::open(&storage).unwrap();
+
+        let keypair = Keypair::random();
+        let path = "/pub/foo.txt";
+
+        let (tx, rx) = flume::bounded::<Bytes>(0);
+
+        let mut cloned = db.clone();
+        let cloned_keypair = keypair.clone();
+
+        let done = tokio::task::spawn_blocking(move || {
+            cloned.put_entry(&cloned_keypair.public_key(), path, rx);
+        });
+
+        tx.send(vec![1, 2, 3, 4, 5].into());
+        drop(tx);
+
+        done.await;
+
+        let blob = db.get_blob(&keypair.public_key(), path).unwrap().unwrap();
+
+        assert_eq!(blob, Bytes::from(vec![1, 2, 3, 4, 5]));
+    }
 }
diff --git a/pubky-homeserver/src/database/migrations.rs b/pubky-homeserver/src/database/migrations.rs
index dbead07..32f2909 100644
--- a/pubky-homeserver/src/database/migrations.rs
+++ b/pubky-homeserver/src/database/migrations.rs
@@ -2,16 +2,16 @@ use heed::{types::Str, Database, Env, RwTxn};
 
 mod m0;
 
-use super::tables;
+use super::tables::Tables;
 
-pub const TABLES_COUNT: u32 = 4;
-
-pub fn run(env: &Env) -> anyhow::Result<()> {
+pub fn run(env: &Env) -> anyhow::Result<Tables> {
     let mut wtxn = env.write_txn()?;
 
     m0::run(env, &mut wtxn);
 
+    let tables = Tables::new(env, &mut wtxn)?;
+
     wtxn.commit()?;
 
-    Ok(())
+    Ok(tables)
 }
diff --git a/pubky-homeserver/src/database/migrations/m0.rs b/pubky-homeserver/src/database/migrations/m0.rs
index 74d89c4..d7dac79 100644
--- a/pubky-homeserver/src/database/migrations/m0.rs
+++ b/pubky-homeserver/src/database/migrations/m0.rs
@@ -1,6 +1,6 @@
 use heed::{types::Str, Database, Env, RwTxn};
 
-use super::tables::{blobs, entries, sessions, users};
+use crate::database::tables::{blobs, entries, sessions, users};
 
 pub fn run(env: &Env, wtxn: &mut RwTxn) -> anyhow::Result<()> {
     let _: users::UsersTable = env.create_database(wtxn, Some(users::USERS_TABLE))?;
diff --git a/pubky-homeserver/src/database/tables.rs b/pubky-homeserver/src/database/tables.rs
index 4f0c1c5..a019fbe 100644
--- a/pubky-homeserver/src/database/tables.rs
+++ b/pubky-homeserver/src/database/tables.rs
@@ -2,3 +2,29 @@ pub mod blobs;
 pub mod entries;
 pub mod sessions;
 pub mod users;
+
+use heed::{Env, RwTxn};
+
+use blobs::{BlobsTable, BLOBS_TABLE};
+use entries::{EntriesTable, ENTRIES_TABLE};
+
+pub const TABLES_COUNT: u32 = 4;
+
+#[derive(Debug, Clone)]
+pub struct Tables {
+    pub blobs: BlobsTable,
+    pub entries: EntriesTable,
+}
+
+impl Tables {
+    pub fn new(env: &Env, wtxn: &mut RwTxn) -> anyhow::Result<Self> {
+        Ok(Self {
+            blobs: env
+                .open_database(wtxn, Some(BLOBS_TABLE))?
+                .expect("Blobs table already created"),
+            entries: env
+                .open_database(wtxn, Some(ENTRIES_TABLE))?
+                .expect("Entries table already created"),
+        })
+    }
+}
diff --git a/pubky-homeserver/src/database/tables/entries.rs b/pubky-homeserver/src/database/tables/entries.rs
index 1d2028e..7c9e2e3 100644
--- a/pubky-homeserver/src/database/tables/entries.rs
+++ b/pubky-homeserver/src/database/tables/entries.rs
@@ -78,6 +78,6 @@ impl Entry {
             panic!("Unknown Entry version");
         }
 
-        Ok(from_bytes(bytes)?)
+        from_bytes(bytes)
     }
 }
diff --git a/pubky-homeserver/src/extractors.rs b/pubky-homeserver/src/extractors.rs
index e7192db..b89911c 100644
--- a/pubky-homeserver/src/extractors.rs
+++ b/pubky-homeserver/src/extractors.rs
@@ -49,6 +49,10 @@ where
 pub struct EntryPath(pub(crate) String);
 
 impl EntryPath {
+    pub fn as_str(&self) -> &str {
+        self.0.as_str()
+    }
+
     pub fn as_bytes(&self) -> &[u8] {
         self.0.as_bytes()
     }
diff --git a/pubky-homeserver/src/routes/public.rs b/pubky-homeserver/src/routes/public.rs
index 38c2741..cb07cee 100644
--- a/pubky-homeserver/src/routes/public.rs
+++ b/pubky-homeserver/src/routes/public.rs
@@ -23,7 +23,7 @@ use crate::{
 };
 
 pub async fn put(
-    State(state): State<AppState>,
+    State(mut state): State<AppState>,
     pubky: Pubky,
     path: EntryPath,
     mut body: Body,
@@ -45,43 +45,7 @@ pub async fn put(
 
         // TODO: Authorize
 
-        let mut wtxn = state.db.env.write_txn()?;
-        let blobs: BlobsTable = state
-            .db
-            .env
-            .open_database(&wtxn, Some(BLOBS_TABLE))?
-            .expect("Blobs table already created");
-
-        let entries: EntriesTable = state
-            .db
-            .env
-            .open_database(&wtxn, Some(ENTRIES_TABLE))?
-            .expect("Entries table already created");
-
-        let mut hasher = Hasher::new();
-        let mut bytes = vec![];
-        let mut length = 0;
-
-        while let Ok(chunk) = rx.recv() {
-            hasher.update(&chunk);
-            bytes.extend_from_slice(&chunk);
-            length += chunk.len();
-        }
-
-        let hash = hasher.finalize();
-
-        blobs.put(&mut wtxn, hash.as_bytes(), &bytes)?;
-
-        let mut entry = Entry::new();
-
-        entry.set_content_hash(hash);
-        entry.set_content_length(length);
-
-        let mut key = vec![];
-        key.extend_from_slice(public_key.as_bytes());
-        key.extend_from_slice(path.as_bytes());
-
-        entries.put(&mut wtxn, &key, &entry.serialize());
+        state.db.put_entry(public_key, path.as_str(), rx);
 
         Ok(())
     });
@@ -101,7 +65,7 @@
 }
 
 pub async fn get(
-    State(state): State<AppState>,
+    State(mut state): State<AppState>,
     pubky: Pubky,
     path: EntryPath,
 ) -> Result<impl IntoResponse> {
@@ -111,39 +75,9 @@
 
     let public_key = pubky.public_key();
 
-    let mut rtxn = state.db.env.read_txn()?;
-
-    let entries: EntriesTable = state
-        .db
-        .env
-        .open_database(&rtxn, Some(ENTRIES_TABLE))?
-        .expect("Entries table already created");
-
-    let blobs: BlobsTable = state
-        .db
-        .env
-        .open_database(&rtxn, Some(BLOBS_TABLE))?
-        .expect("Blobs table already created");
-
-    let mut count = 0;
-
-    for x in entries.iter(&rtxn)? {
-        count += 1
+    match state.db.get_blob(public_key, path.as_str()) {
+        Err(error) => Err(error)?,
+        Ok(Some(bytes)) => Ok(bytes),
+        Ok(None) => Err(Error::with_status(StatusCode::NOT_FOUND)),
     }
-
-    return Err(Error::new(StatusCode::NOT_FOUND, count.to_string().into()));
-
-    let mut key = vec![];
-    key.extend_from_slice(public_key.as_bytes());
-    key.extend_from_slice(path.as_bytes());
-
-    if let Some(bytes) = entries.get(&rtxn, &key)? {
-        let entry = Entry::deserialize(bytes)?;
-
-        if let Some(blob) = blobs.get(&rtxn, entry.content_hash())? {
-            return Ok(blob.to_vec());
-        };
-    };
-
-    Err(Error::new(StatusCode::NOT_FOUND, path.0.into()))
 }
diff --git a/pubky/src/client/public.rs b/pubky/src/client/public.rs
index 1600c92..7599e8f 100644
--- a/pubky/src/client/public.rs
+++ b/pubky/src/client/public.rs
@@ -27,15 +27,7 @@ impl PubkyClient {
 
         url.set_path(&format!("/{pubky}/{path}"));
 
-        let result = self.request(super::HttpMethod::Get, &url).call();
-
-        if let Err(error) = result {
-            dbg!(&error);
-
-            return Err(error)?;
-        }
-
-        let response = result.unwrap();
+        let response = self.request(super::HttpMethod::Get, &url).call()?;
 
         let len = response
             .header("Content-Length")
@@ -45,7 +37,7 @@ impl PubkyClient {
 
         // TODO: bail on too large files.
 
-        let mut bytes = Vec::with_capacity(len as usize);
+        let mut bytes = vec![0; len as usize];
 
         response.into_reader().read_exact(&mut bytes);
 
@@ -102,14 +94,17 @@ mod tests {
             }
         }
 
-        let response = client.get(&keypair.public_key(), "/pub/foo.txt").await;
+        let response = client
+            .get(&keypair.public_key(), "/pub/foo.txt")
+            .await
+            .unwrap();
 
-        if let Err(Error::Ureq(ureqerror)) = response {
-            if let Some(r) = ureqerror.into_response() {
-                dbg!(r.into_string());
-            }
-        }
+        // if let Err(Error::Ureq(ureqerror)) = response {
+        //     if let Some(r) = ureqerror.into_response() {
+        //         dbg!(r.into_string());
+        //     }
+        // }
 
-        // dbg!(response);
+        assert_eq!(response, bytes::Bytes::from(vec![0, 1, 2, 3, 4]))
     }
 }