From 401872a61fa24ee89fc695e55c27d2b2a7b9aa4b Mon Sep 17 00:00:00 2001 From: nazeh Date: Tue, 23 Jul 2024 11:26:12 +0300 Subject: [PATCH 1/4] feat(homeserver): stream incoming body --- Cargo.lock | 17 +++++ pubky-common/src/crypto.rs | 2 + pubky-homeserver/Cargo.toml | 4 +- pubky-homeserver/src/config.rs | 15 +++-- pubky-homeserver/src/database.rs | 15 +---- pubky-homeserver/src/database/migrations.rs | 16 ++--- .../src/database/migrations/m0.rs | 15 +++++ pubky-homeserver/src/database/tables.rs | 2 + pubky-homeserver/src/database/tables/blobs.rs | 13 ++++ .../src/database/tables/entries.rs | 66 +++++++++++++++++++ pubky-homeserver/src/routes.rs | 6 +- pubky-homeserver/src/routes/auth.rs | 7 +- pubky-homeserver/src/routes/drive.rs | 60 +++++++++++++++-- pubky-homeserver/src/server.rs | 4 +- 14 files changed, 205 insertions(+), 37 deletions(-) create mode 100644 pubky-homeserver/src/database/migrations/m0.rs create mode 100644 pubky-homeserver/src/database/tables/blobs.rs create mode 100644 pubky-homeserver/src/database/tables/entries.rs diff --git a/Cargo.lock b/Cargo.lock index 26bae45..74ef877 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -142,6 +142,8 @@ dependencies = [ "mime", "pin-project-lite", "serde", + "tokio", + "tokio-util", "tower", "tower-layer", "tower-service", @@ -1283,6 +1285,8 @@ dependencies = [ "base32", "bytes", "dirs-next", + "flume", + "futures-util", "heed", "pkarr", "postcard", @@ -1826,6 +1830,19 @@ dependencies = [ "syn", ] +[[package]] +name = "tokio-util" +version = "0.7.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9cf6b47b3771c49ac75ad09a6162f53ad4b8088b76ac60e8ec1455b31a189fe1" +dependencies = [ + "bytes", + "futures-core", + "futures-sink", + "pin-project-lite", + "tokio", +] + [[package]] name = "tower" version = "0.4.13" diff --git a/pubky-common/src/crypto.rs b/pubky-common/src/crypto.rs index 2f8131c..ec8f58a 100644 --- a/pubky-common/src/crypto.rs +++ b/pubky-common/src/crypto.rs @@ -8,6 +8,8 @@ pub type Hash = blake3::Hash; pub use blake3::hash; +pub use blake3::Hasher; + pub fn random_hash() -> Hash { let mut rng = rand::thread_rng(); Hash::from_bytes(rng.gen()) diff --git a/pubky-homeserver/Cargo.toml b/pubky-homeserver/Cargo.toml index 33e18ab..da0c5c7 100644 --- a/pubky-homeserver/Cargo.toml +++ b/pubky-homeserver/Cargo.toml @@ -6,10 +6,12 @@ edition = "2021" [dependencies] anyhow = "1.0.82" axum = "0.7.5" -axum-extra = { version = "0.9.3", features = ["typed-header"] } +axum-extra = { version = "0.9.3", features = ["typed-header", "async-read-body"] } base32 = "0.5.1" bytes = "1.6.1" dirs-next = "2.0.0" +flume = "0.11.0" +futures-util = "0.3.30" heed = "0.20.3" pkarr = { version = "2.1.0", features = ["async"] } postcard = { version = "1.0.8", features = ["alloc"] } diff --git a/pubky-homeserver/src/config.rs b/pubky-homeserver/src/config.rs index 7c9fcfe..3657ecd 100644 --- a/pubky-homeserver/src/config.rs +++ b/pubky-homeserver/src/config.rs @@ -40,13 +40,16 @@ impl Config { /// Test configurations pub fn test(testnet: &pkarr::mainline::Testnet) -> Self { + let bootstrap = Some(testnet.bootstrap.to_owned()); + let storage = Some( + std::env::temp_dir() + .join(Timestamp::now().to_string()) + .join(DEFAULT_STORAGE_DIR), + ); + Self { - bootstrap: Some(testnet.bootstrap.to_owned()), - storage: Some( - std::env::temp_dir() - .join(Timestamp::now().to_string()) - .join(DEFAULT_STORAGE_DIR), - ), + bootstrap, + storage, ..Default::default() } } diff --git a/pubky-homeserver/src/database.rs 
b/pubky-homeserver/src/database.rs index 2f8d591..0eb3200 100644 --- a/pubky-homeserver/src/database.rs +++ b/pubky-homeserver/src/database.rs @@ -19,21 +19,10 @@ impl DB { let env = unsafe { EnvOpenOptions::new().max_dbs(TABLES_COUNT).open(storage) }?; - let db = DB { env }; + migrations::run(&env); - db.run_migrations(); + let db = DB { env }; Ok(db) } - - fn run_migrations(&self) -> anyhow::Result<()> { - let mut wtxn = self.env.write_txn()?; - - migrations::create_users_table(&self.env, &mut wtxn); - migrations::create_sessions_table(&self.env, &mut wtxn); - - wtxn.commit()?; - - Ok(()) - } } diff --git a/pubky-homeserver/src/database/migrations.rs b/pubky-homeserver/src/database/migrations.rs index 93c7631..dbead07 100644 --- a/pubky-homeserver/src/database/migrations.rs +++ b/pubky-homeserver/src/database/migrations.rs @@ -1,19 +1,17 @@ use heed::{types::Str, Database, Env, RwTxn}; +mod m0; + use super::tables; -pub const TABLES_COUNT: u32 = 2; +pub const TABLES_COUNT: u32 = 4; -pub fn create_users_table(env: &Env, wtxn: &mut RwTxn) -> anyhow::Result<()> { - let _: tables::users::UsersTable = - env.create_database(wtxn, Some(tables::users::USERS_TABLE))?; +pub fn run(env: &Env) -> anyhow::Result<()> { + let mut wtxn = env.write_txn()?; - Ok(()) -} + m0::run(env, &mut wtxn); -pub fn create_sessions_table(env: &Env, wtxn: &mut RwTxn) -> anyhow::Result<()> { - let _: tables::sessions::SessionsTable = - env.create_database(wtxn, Some(tables::sessions::SESSIONS_TABLE))?; + wtxn.commit()?; Ok(()) } diff --git a/pubky-homeserver/src/database/migrations/m0.rs b/pubky-homeserver/src/database/migrations/m0.rs new file mode 100644 index 0000000..74d89c4 --- /dev/null +++ b/pubky-homeserver/src/database/migrations/m0.rs @@ -0,0 +1,15 @@ +use heed::{types::Str, Database, Env, RwTxn}; + +use super::tables::{blobs, entries, sessions, users}; + +pub fn run(env: &Env, wtxn: &mut RwTxn) -> anyhow::Result<()> { + let _: users::UsersTable = env.create_database(wtxn, Some(users::USERS_TABLE))?; + + let _: sessions::SessionsTable = env.create_database(wtxn, Some(sessions::SESSIONS_TABLE))?; + + let _: blobs::BlobsTable = env.create_database(wtxn, Some(blobs::BLOBS_TABLE))?; + + let _: entries::EntriesTable = env.create_database(wtxn, Some(entries::ENTRIES_TABLE))?; + + Ok(()) +} diff --git a/pubky-homeserver/src/database/tables.rs b/pubky-homeserver/src/database/tables.rs index b6e3efc..4f0c1c5 100644 --- a/pubky-homeserver/src/database/tables.rs +++ b/pubky-homeserver/src/database/tables.rs @@ -1,2 +1,4 @@ +pub mod blobs; +pub mod entries; pub mod sessions; pub mod users; diff --git a/pubky-homeserver/src/database/tables/blobs.rs b/pubky-homeserver/src/database/tables/blobs.rs new file mode 100644 index 0000000..9cf1da1 --- /dev/null +++ b/pubky-homeserver/src/database/tables/blobs.rs @@ -0,0 +1,13 @@ +use std::{borrow::Cow, time::SystemTime}; + +use heed::{ + types::{Bytes, Str}, + BoxedError, BytesDecode, BytesEncode, Database, +}; + +use pubky_common::crypto::Hash; + +/// hash of the blob => bytes. 
+pub type BlobsTable = Database; + +pub const BLOBS_TABLE: &str = "blobs"; diff --git a/pubky-homeserver/src/database/tables/entries.rs b/pubky-homeserver/src/database/tables/entries.rs new file mode 100644 index 0000000..5a1cc8e --- /dev/null +++ b/pubky-homeserver/src/database/tables/entries.rs @@ -0,0 +1,66 @@ +use postcard::{from_bytes, to_allocvec}; +use serde::{Deserialize, Serialize}; +use std::{borrow::Cow, time::SystemTime}; + +use heed::{ + types::{Bytes, Str}, + BoxedError, BytesDecode, BytesEncode, Database, +}; + +use pubky_common::crypto::Hash; + +/// full_path(pubky/*path) => Entry. +pub type EntriesTable = Database; + +pub const ENTRIES_TABLE: &str = "entries"; + +#[derive(Clone, Default, Serialize, Deserialize, Debug, Eq, PartialEq)] +pub struct Entry { + /// Encoding version + version: usize, + /// Modified at + timestamp: u64, + content_hash: [u8; 32], + content_length: usize, + content_type: String, + // user_metadata: ? +} + +// TODO: get headers like Etag + +impl Entry { + pub fn new() -> Self { + Default::default() + } + + // === Setters === + + pub fn set_content_hash(&mut self, content_hash: Hash) -> &mut Self { + content_hash.as_bytes().clone_into(&mut self.content_hash); + self + } + + pub fn set_content_length(&mut self, content_length: usize) -> &mut Self { + self.content_length = content_length; + self + } + + pub fn set_content_type(&mut self, content_type: &str) -> &mut Self { + self.content_type = content_type.to_string(); + self + } + + // === Getters === + + pub fn content_hash(&self) -> &[u8; 32] { + &self.content_hash + } + + pub fn content_length(&self) -> usize { + self.content_length + } + + pub fn content_type(&self) -> &str { + &self.content_type + } +} diff --git a/pubky-homeserver/src/routes.rs b/pubky-homeserver/src/routes.rs index 86120c2..6099858 100644 --- a/pubky-homeserver/src/routes.rs +++ b/pubky-homeserver/src/routes.rs @@ -1,4 +1,5 @@ use axum::{ + extract::DefaultBodyLimit, routing::{delete, get, post, put}, Router, }; @@ -18,8 +19,11 @@ pub fn create_app(state: AppState) -> Router { .route("/:pubky/session", get(auth::session)) .route("/:pubky/session", post(auth::signin)) .route("/:pubky/session", delete(auth::signout)) - .route("/:pubky/*key", get(drive::put)) + .route("/:pubky/*key", put(drive::put)) .layer(TraceLayer::new_for_http()) .layer(CookieManagerLayer::new()) + // TODO: revisit if we enable streaming big payloads + // TODO: maybe add to a separate router (drive router?). + .layer(DefaultBodyLimit::max(16 * 1024)) .with_state(state) } diff --git a/pubky-homeserver/src/routes/auth.rs b/pubky-homeserver/src/routes/auth.rs index fceb6fe..c38aa38 100644 --- a/pubky-homeserver/src/routes/auth.rs +++ b/pubky-homeserver/src/routes/auth.rs @@ -2,7 +2,6 @@ use axum::{ extract::{Request, State}, http::{HeaderMap, StatusCode}, response::IntoResponse, - routing::get, Router, }; use axum_extra::{headers::UserAgent, TypedHeader}; @@ -103,7 +102,11 @@ pub async fn signin( state.verifier.verify(&body, public_key)?; let mut wtxn = state.db.env.write_txn()?; - let users: UsersTable = state.db.env.create_database(&mut wtxn, Some(USERS_TABLE))?; + let users: UsersTable = state + .db + .env + .open_database(&wtxn, Some(USERS_TABLE))? + .expect("Users table already created"); if let Some(existing) = users.get(&wtxn, public_key)? 
{ users.put(&mut wtxn, public_key, &existing)?; diff --git a/pubky-homeserver/src/routes/drive.rs b/pubky-homeserver/src/routes/drive.rs index 3050250..12f8fb8 100644 --- a/pubky-homeserver/src/routes/drive.rs +++ b/pubky-homeserver/src/routes/drive.rs @@ -1,11 +1,63 @@ -use axum::response::IntoResponse; +use axum::{ + body::{Body, Bytes}, + extract::{Path, State}, + http::StatusCode, + response::IntoResponse, + RequestExt, Router, +}; +use futures_util::stream::StreamExt; use tracing::debug; -use crate::extractors::Pubky; +use pubky_common::crypto::Hasher; -pub async fn put(pubky: Pubky) -> Result { - debug!(pubky=?pubky.public_key()); +use crate::{ + database::tables::blobs::{BlobsTable, BLOBS_TABLE}, + error::{Error, Result}, + extractors::Pubky, + server::AppState, +}; + +pub async fn put( + State(state): State, + pubky: Pubky, + // Path(key): Path, + mut body: Body, +) -> Result { + let mut stream = body.into_data_stream(); + + let (tx, rx) = flume::bounded::(1); + + // Offload the write transaction to a blocking task + let done = tokio::task::spawn_blocking(move || { + // TODO: this is a blocking operation, which is ok for small + // payloads (we have 16 kb limit for now) but later we need + // to stream this to filesystem, and keep track of any failed + // writes to GC these files later. + + let mut wtxn = state.db.env.write_txn().unwrap(); + let blobs: BlobsTable = state + .db + .env + .open_database(&wtxn, Some(BLOBS_TABLE)) + .unwrap() + .expect("Blobs table already created"); + + let hasher = Hasher::new(); + + while let Ok(chunk) = rx.recv() { + dbg!(chunk); + } + }); + + while let Some(next) = stream.next().await { + let chunk = next + .map_err(|err| Error::new(StatusCode::INTERNAL_SERVER_ERROR, Some(err.to_string())))?; + + tx.send(chunk); + } + + let _ = done.await; Ok("Pubky drive...".to_string()) } diff --git a/pubky-homeserver/src/server.rs b/pubky-homeserver/src/server.rs index f167d05..0a2f3ae 100644 --- a/pubky-homeserver/src/server.rs +++ b/pubky-homeserver/src/server.rs @@ -3,7 +3,7 @@ use std::{future::IntoFuture, net::SocketAddr}; use anyhow::{Error, Result}; use pubky_common::auth::AuthnVerifier; use tokio::{net::TcpListener, signal, task::JoinSet}; -use tracing::{info, warn}; +use tracing::{debug, info, warn}; use pkarr::{ mainline::dht::{DhtSettings, Testnet}, @@ -27,6 +27,8 @@ pub(crate) struct AppState { impl Homeserver { pub async fn start(config: Config) -> Result { + debug!(?config); + let public_key = config.keypair().public_key(); let db = DB::open(&config.storage()?)?; From 62cc13bca489e568882757ed851e06841ec3258a Mon Sep 17 00:00:00 2001 From: nazeh Date: Tue, 23 Jul 2024 11:35:53 +0300 Subject: [PATCH 2/4] refactor(pubky): separate PubkyClient implementation into modules --- pubky/src/client.rs | 245 +------------------------------------- pubky/src/client/auth.rs | 122 +++++++++++++++++++ pubky/src/client/pkarr.rs | 179 ++++++++++++++++++++++++++++ pubky/src/lib.rs | 51 -------- 4 files changed, 305 insertions(+), 292 deletions(-) create mode 100644 pubky/src/client/auth.rs create mode 100644 pubky/src/client/pkarr.rs diff --git a/pubky/src/client.rs b/pubky/src/client.rs index e353e98..69f418c 100644 --- a/pubky/src/client.rs +++ b/pubky/src/client.rs @@ -1,18 +1,14 @@ +mod auth; +mod pkarr; + use std::{collections::HashMap, fmt::format, time::Duration}; -use pkarr::{ - dns::{rdata::SVCB, Packet}, - mainline::{dht::DhtSettings, Testnet}, - Keypair, PkarrClient, PublicKey, Settings, SignedPacket, -}; use ureq::{Agent, Response}; use url::Url; -use 
pubky_common::{auth::AuthnSignature, session::Session}; - use crate::error::{Error, Result}; -const MAX_RECURSIVE_PUBKY_HOMESERVER_RESOLUTION: u8 = 3; +use pkarr::{DhtSettings, PkarrClient, Settings, Testnet}; #[derive(Debug, Clone)] pub struct PubkyClient { @@ -45,174 +41,8 @@ impl PubkyClient { // === Public Methods === - /// Signup to a homeserver and update Pkarr accordingly. - /// - /// The homeserver is a Pkarr domain name, where the TLD is a Pkarr public key - /// for example "pubky.o4dksfbqk85ogzdb5osziw6befigbuxmuxkuxq8434q89uj56uyy" - pub fn signup(&self, keypair: &Keypair, homeserver: &str) -> Result<()> { - let (audience, mut url) = self.resolve_endpoint(homeserver)?; - - url.set_path(&format!("/{}", keypair.public_key())); - - self.request(HttpMethod::Put, &url) - .send_bytes(AuthnSignature::generate(keypair, &audience).as_bytes()) - .map_err(Box::new)?; - - self.publish_pubky_homeserver(keypair, homeserver); - - Ok(()) - } - - /// Check the current sesison for a given Pubky in its homeserver. - pub fn session(&self, pubky: &PublicKey) -> Result { - let (homeserver, mut url) = self.resolve_pubky_homeserver(pubky)?; - - url.set_path(&format!("/{}/session", pubky)); - - let mut bytes = vec![]; - - let result = self.request(HttpMethod::Get, &url).call().map_err(Box::new); - - if let Ok(reader) = result { - reader.into_reader().read_to_end(&mut bytes); - } else { - return Err(Error::NotSignedIn); - } - - Ok(Session::deserialize(&bytes)?) - } - - /// Signout from a homeserver. - pub fn signout(&self, pubky: &PublicKey) -> Result<()> { - let (homeserver, mut url) = self.resolve_pubky_homeserver(pubky)?; - - url.set_path(&format!("/{}/session", pubky)); - - self.request(HttpMethod::Delete, &url) - .call() - .map_err(Box::new)?; - - Ok(()) - } - - /// Signin to a homeserver. - pub fn signin(&self, keypair: &Keypair) -> Result<()> { - let pubky = keypair.public_key(); - - let (audience, mut url) = self.resolve_pubky_homeserver(&pubky)?; - - url.set_path(&format!("/{}/session", &pubky)); - - self.request(HttpMethod::Post, &url) - .send_bytes(AuthnSignature::generate(keypair, &audience).as_bytes()) - .map_err(Box::new)?; - - Ok(()) - } - // === Private Methods === - /// Publish the SVCB record for `_pubky.`. - pub(crate) fn publish_pubky_homeserver(&self, keypair: &Keypair, host: &str) -> Result<()> { - let mut packet = Packet::new_reply(0); - - if let Some(existing) = self.pkarr.resolve(&keypair.public_key())? { - for answer in existing.packet().answers.iter().cloned() { - if !answer.name.to_string().starts_with("_pubky") { - packet.answers.push(answer.into_owned()) - } - } - } - - let svcb = SVCB::new(0, host.try_into()?); - - packet.answers.push(pkarr::dns::ResourceRecord::new( - "_pubky".try_into().unwrap(), - pkarr::dns::CLASS::IN, - 60 * 60, - pkarr::dns::rdata::RData::SVCB(svcb), - )); - - let signed_packet = SignedPacket::from_packet(keypair, &packet)?; - - self.pkarr.publish(&signed_packet)?; - - Ok(()) - } - - /// Resolve the homeserver for a pubky. - pub(crate) fn resolve_pubky_homeserver(&self, pubky: &PublicKey) -> Result<(PublicKey, Url)> { - let target = format!("_pubky.{}", pubky); - - self.resolve_endpoint(&target) - .map_err(|_| Error::Generic("Could not resolve homeserver".to_string())) - } - - /// Resolve a service's public_key and clearnet url from a Pubky domain - fn resolve_endpoint(&self, target: &str) -> Result<(PublicKey, Url)> { - // TODO: cache the result of this function? - // TODO: use MAX_RECURSIVE_PUBKY_HOMESERVER_RESOLUTION - // TODO: move to common? 
- - let mut target = target.to_string(); - let mut homeserver_public_key = None; - let mut host = target.clone(); - - // PublicKey is very good at extracting the Pkarr TLD from a string. - while let Ok(public_key) = PublicKey::try_from(target.clone()) { - if let Some(signed_packet) = self.pkarr.resolve(&public_key)? { - let mut prior = None; - - for answer in signed_packet.resource_records(&target) { - if let pkarr::dns::rdata::RData::SVCB(svcb) = &answer.rdata { - if svcb.priority == 0 { - prior = Some(svcb) - } else if let Some(sofar) = prior { - if svcb.priority >= sofar.priority { - prior = Some(svcb) - } - // TODO return random if priority is the same - } else { - prior = Some(svcb) - } - } - } - - if let Some(svcb) = prior { - homeserver_public_key = Some(public_key); - target = svcb.target.to_string(); - - if let Some(port) = svcb.get_param(pkarr::dns::rdata::SVCB::PORT) { - if port.len() < 2 { - // TODO: debug! Error encoding port! - } - let port = u16::from_be_bytes([port[0], port[1]]); - - host = format!("{target}:{port}"); - } else { - host.clone_from(&target); - }; - - continue; - } - }; - - break; - } - - if let Some(homeserver) = homeserver_public_key { - let url = if host.starts_with("localhost") { - format!("http://{host}") - } else { - format!("https://{host}") - }; - - return Ok((homeserver, Url::parse(&url)?)); - } - - Err(Error::Generic("Could not resolve endpoint".to_string())) - } - fn request(&self, method: HttpMethod, url: &Url) -> ureq::Request { self.agent.request_url(method.into(), url) } @@ -242,70 +72,3 @@ impl From for &str { } } } - -#[cfg(test)] -mod tests { - use super::*; - - use pkarr::{ - dns::{rdata::SVCB, Packet}, - mainline::{dht::DhtSettings, Testnet}, - Keypair, PkarrClient, Settings, SignedPacket, - }; - use pubky_homeserver::Homeserver; - - #[tokio::test] - async fn resolve_homeserver() { - let testnet = Testnet::new(3); - let server = Homeserver::start_test(&testnet).await.unwrap(); - - // Publish an intermediate controller of the homeserver - let pkarr_client = PkarrClient::new(Settings { - dht: DhtSettings { - bootstrap: Some(testnet.bootstrap.clone()), - ..Default::default() - }, - ..Default::default() - }) - .unwrap() - .as_async(); - - let intermediate = Keypair::random(); - - let mut packet = Packet::new_reply(0); - - let server_tld = server.public_key().to_string(); - - let mut svcb = SVCB::new(0, server_tld.as_str().try_into().unwrap()); - - packet.answers.push(pkarr::dns::ResourceRecord::new( - "pubky".try_into().unwrap(), - pkarr::dns::CLASS::IN, - 60 * 60, - pkarr::dns::rdata::RData::SVCB(svcb), - )); - - let signed_packet = SignedPacket::from_packet(&intermediate, &packet).unwrap(); - - pkarr_client.publish(&signed_packet).await.unwrap(); - - tokio::task::spawn_blocking(move || { - let client = PubkyClient::test(&testnet); - - let pubky = Keypair::random(); - - client - .publish_pubky_homeserver(&pubky, &format!("pubky.{}", &intermediate.public_key())); - - let (public_key, url) = client - .resolve_pubky_homeserver(&pubky.public_key()) - .unwrap(); - - assert_eq!(public_key, server.public_key()); - assert_eq!(url.host_str(), Some("localhost")); - assert_eq!(url.port(), Some(server.port())); - }) - .await - .expect("task failed") - } -} diff --git a/pubky/src/client/auth.rs b/pubky/src/client/auth.rs new file mode 100644 index 0000000..25b679c --- /dev/null +++ b/pubky/src/client/auth.rs @@ -0,0 +1,122 @@ +use crate::PubkyClient; + +use pubky_common::{auth::AuthnSignature, session::Session}; + +use super::{Error, HttpMethod, Result}; 
+use pkarr::{Keypair, PublicKey}; + +impl PubkyClient { + /// Signup to a homeserver and update Pkarr accordingly. + /// + /// The homeserver is a Pkarr domain name, where the TLD is a Pkarr public key + /// for example "pubky.o4dksfbqk85ogzdb5osziw6befigbuxmuxkuxq8434q89uj56uyy" + pub fn signup(&self, keypair: &Keypair, homeserver: &str) -> Result<()> { + let (audience, mut url) = self.resolve_endpoint(homeserver)?; + + url.set_path(&format!("/{}", keypair.public_key())); + + self.request(HttpMethod::Put, &url) + .send_bytes(AuthnSignature::generate(keypair, &audience).as_bytes()) + .map_err(Box::new)?; + + self.publish_pubky_homeserver(keypair, homeserver); + + Ok(()) + } + + /// Check the current sesison for a given Pubky in its homeserver. + pub fn session(&self, pubky: &PublicKey) -> Result { + let (homeserver, mut url) = self.resolve_pubky_homeserver(pubky)?; + + url.set_path(&format!("/{}/session", pubky)); + + let mut bytes = vec![]; + + let result = self.request(HttpMethod::Get, &url).call().map_err(Box::new); + + if let Ok(reader) = result { + reader.into_reader().read_to_end(&mut bytes); + } else { + return Err(Error::NotSignedIn); + } + + Ok(Session::deserialize(&bytes)?) + } + + /// Signout from a homeserver. + pub fn signout(&self, pubky: &PublicKey) -> Result<()> { + let (homeserver, mut url) = self.resolve_pubky_homeserver(pubky)?; + + url.set_path(&format!("/{}/session", pubky)); + + self.request(HttpMethod::Delete, &url) + .call() + .map_err(Box::new)?; + + Ok(()) + } + + /// Signin to a homeserver. + pub fn signin(&self, keypair: &Keypair) -> Result<()> { + let pubky = keypair.public_key(); + + let (audience, mut url) = self.resolve_pubky_homeserver(&pubky)?; + + url.set_path(&format!("/{}/session", &pubky)); + + self.request(HttpMethod::Post, &url) + .send_bytes(AuthnSignature::generate(keypair, &audience).as_bytes()) + .map_err(Box::new)?; + + Ok(()) + } +} + +#[cfg(test)] +mod tests { + use crate::*; + + use pkarr::{mainline::Testnet, Keypair}; + use pubky_common::session::Session; + use pubky_homeserver::Homeserver; + + #[tokio::test] + async fn basic_authn() { + let testnet = Testnet::new(3); + let server = Homeserver::start_test(&testnet).await.unwrap(); + + let client = PubkyClient::test(&testnet).as_async(); + + let keypair = Keypair::random(); + + client + .signup(&keypair, &server.public_key().to_string()) + .await + .unwrap(); + + let session = client.session(&keypair.public_key()).await.unwrap(); + + assert_eq!(session, Session { ..session.clone() }); + + client.signout(&keypair.public_key()).await.unwrap(); + + { + let session = client.session(&keypair.public_key()).await; + + assert!(session.is_err()); + + match session { + Err(Error::NotSignedIn) => {} + _ => assert!(false, "expected NotSignedInt error"), + } + } + + client.signin(&keypair).await.unwrap(); + + { + let session = client.session(&keypair.public_key()).await.unwrap(); + + assert_eq!(session, Session { ..session.clone() }); + } + } +} diff --git a/pubky/src/client/pkarr.rs b/pubky/src/client/pkarr.rs new file mode 100644 index 0000000..c036527 --- /dev/null +++ b/pubky/src/client/pkarr.rs @@ -0,0 +1,179 @@ +pub use pkarr::{ + dns::{rdata::SVCB, Packet}, + mainline::{dht::DhtSettings, Testnet}, + Keypair, PkarrClient, PublicKey, Settings, SignedPacket, +}; + +use super::{Error, PubkyClient, Result, Url}; + +const MAX_RECURSIVE_PUBKY_HOMESERVER_RESOLUTION: u8 = 3; + +impl PubkyClient { + /// Publish the SVCB record for `_pubky.`. 
+ pub(crate) fn publish_pubky_homeserver(&self, keypair: &Keypair, host: &str) -> Result<()> { + let mut packet = Packet::new_reply(0); + + if let Some(existing) = self.pkarr.resolve(&keypair.public_key())? { + for answer in existing.packet().answers.iter().cloned() { + if !answer.name.to_string().starts_with("_pubky") { + packet.answers.push(answer.into_owned()) + } + } + } + + let svcb = SVCB::new(0, host.try_into()?); + + packet.answers.push(pkarr::dns::ResourceRecord::new( + "_pubky".try_into().unwrap(), + pkarr::dns::CLASS::IN, + 60 * 60, + pkarr::dns::rdata::RData::SVCB(svcb), + )); + + let signed_packet = SignedPacket::from_packet(keypair, &packet)?; + + self.pkarr.publish(&signed_packet)?; + + Ok(()) + } + + /// Resolve the homeserver for a pubky. + pub(crate) fn resolve_pubky_homeserver(&self, pubky: &PublicKey) -> Result<(PublicKey, Url)> { + let target = format!("_pubky.{}", pubky); + + self.resolve_endpoint(&target) + .map_err(|_| Error::Generic("Could not resolve homeserver".to_string())) + } + + /// Resolve a service's public_key and clearnet url from a Pubky domain + pub(crate) fn resolve_endpoint(&self, target: &str) -> Result<(PublicKey, Url)> { + // TODO: cache the result of this function? + // TODO: use MAX_RECURSIVE_PUBKY_HOMESERVER_RESOLUTION + // TODO: move to common? + + let mut target = target.to_string(); + let mut homeserver_public_key = None; + let mut host = target.clone(); + + // PublicKey is very good at extracting the Pkarr TLD from a string. + while let Ok(public_key) = PublicKey::try_from(target.clone()) { + if let Some(signed_packet) = self.pkarr.resolve(&public_key)? { + let mut prior = None; + + for answer in signed_packet.resource_records(&target) { + if let pkarr::dns::rdata::RData::SVCB(svcb) = &answer.rdata { + if svcb.priority == 0 { + prior = Some(svcb) + } else if let Some(sofar) = prior { + if svcb.priority >= sofar.priority { + prior = Some(svcb) + } + // TODO return random if priority is the same + } else { + prior = Some(svcb) + } + } + } + + if let Some(svcb) = prior { + homeserver_public_key = Some(public_key); + target = svcb.target.to_string(); + + if let Some(port) = svcb.get_param(pkarr::dns::rdata::SVCB::PORT) { + if port.len() < 2 { + // TODO: debug! Error encoding port! 
+ } + let port = u16::from_be_bytes([port[0], port[1]]); + + host = format!("{target}:{port}"); + } else { + host.clone_from(&target); + }; + + continue; + } + }; + + break; + } + + if let Some(homeserver) = homeserver_public_key { + let url = if host.starts_with("localhost") { + format!("http://{host}") + } else { + format!("https://{host}") + }; + + return Ok((homeserver, Url::parse(&url)?)); + } + + Err(Error::Generic("Could not resolve endpoint".to_string())) + } +} + +#[cfg(test)] +mod tests { + use super::*; + + use pkarr::{ + dns::{rdata::SVCB, Packet}, + mainline::{dht::DhtSettings, Testnet}, + Keypair, PkarrClient, Settings, SignedPacket, + }; + use pubky_homeserver::Homeserver; + + #[tokio::test] + async fn resolve_homeserver() { + let testnet = Testnet::new(3); + let server = Homeserver::start_test(&testnet).await.unwrap(); + + // Publish an intermediate controller of the homeserver + let pkarr_client = PkarrClient::new(Settings { + dht: DhtSettings { + bootstrap: Some(testnet.bootstrap.clone()), + ..Default::default() + }, + ..Default::default() + }) + .unwrap() + .as_async(); + + let intermediate = Keypair::random(); + + let mut packet = Packet::new_reply(0); + + let server_tld = server.public_key().to_string(); + + let mut svcb = SVCB::new(0, server_tld.as_str().try_into().unwrap()); + + packet.answers.push(pkarr::dns::ResourceRecord::new( + "pubky".try_into().unwrap(), + pkarr::dns::CLASS::IN, + 60 * 60, + pkarr::dns::rdata::RData::SVCB(svcb), + )); + + let signed_packet = SignedPacket::from_packet(&intermediate, &packet).unwrap(); + + pkarr_client.publish(&signed_packet).await.unwrap(); + + tokio::task::spawn_blocking(move || { + let client = PubkyClient::test(&testnet); + + let pubky = Keypair::random(); + + client + .publish_pubky_homeserver(&pubky, &format!("pubky.{}", &intermediate.public_key())); + + let (public_key, url) = client + .resolve_pubky_homeserver(&pubky.public_key()) + .unwrap(); + + assert_eq!(public_key, server.public_key()); + assert_eq!(url.host_str(), Some("localhost")); + assert_eq!(url.port(), Some(server.port())); + }) + .await + .expect("task failed") + } +} diff --git a/pubky/src/lib.rs b/pubky/src/lib.rs index b05d067..7125ca1 100644 --- a/pubky/src/lib.rs +++ b/pubky/src/lib.rs @@ -6,54 +6,3 @@ mod error; pub use client::PubkyClient; pub use error::Error; - -#[cfg(test)] -mod tests { - use super::*; - - use super::error::Error; - - use pkarr::{mainline::Testnet, Keypair}; - use pubky_common::session::Session; - use pubky_homeserver::Homeserver; - - #[tokio::test] - async fn basic_authn() { - let testnet = Testnet::new(3); - let server = Homeserver::start_test(&testnet).await.unwrap(); - - let client = PubkyClient::test(&testnet).as_async(); - - let keypair = Keypair::random(); - - client - .signup(&keypair, &server.public_key().to_string()) - .await - .unwrap(); - - let session = client.session(&keypair.public_key()).await.unwrap(); - - assert_eq!(session, Session { ..session.clone() }); - - client.signout(&keypair.public_key()).await.unwrap(); - - { - let session = client.session(&keypair.public_key()).await; - - assert!(session.is_err()); - - match session { - Err(Error::NotSignedIn) => {} - _ => assert!(false, "expected NotSignedInt error"), - } - } - - client.signin(&keypair).await.unwrap(); - - { - let session = client.session(&keypair.public_key()).await.unwrap(); - - assert_eq!(session, Session { ..session.clone() }); - } - } -} From 8cf18a3c0c1566dbdc4ac0e3f15b4789eed02d56 Mon Sep 17 00:00:00 2001 From: nazeh Date: Tue, 23 Jul 2024 
19:04:02 +0300 Subject: [PATCH 3/4] feat(pubky): add get() --- pubky-homeserver/src/database/tables/blobs.rs | 4 +- .../src/database/tables/entries.rs | 23 ++- pubky-homeserver/src/error.rs | 40 +++-- pubky-homeserver/src/extractors.rs | 29 ++++ pubky-homeserver/src/routes.rs | 5 +- pubky-homeserver/src/routes/drive.rs | 63 -------- pubky-homeserver/src/routes/public.rs | 149 ++++++++++++++++++ pubky/Cargo.toml | 1 + pubky/src/client.rs | 1 + pubky/src/client/auth.rs | 20 ++- pubky/src/client/public.rs | 115 ++++++++++++++ pubky/src/client_async.rs | 29 ++++ pubky/src/error.rs | 6 + 13 files changed, 395 insertions(+), 90 deletions(-) delete mode 100644 pubky-homeserver/src/routes/drive.rs create mode 100644 pubky-homeserver/src/routes/public.rs create mode 100644 pubky/src/client/public.rs diff --git a/pubky-homeserver/src/database/tables/blobs.rs b/pubky-homeserver/src/database/tables/blobs.rs index 9cf1da1..0148d6f 100644 --- a/pubky-homeserver/src/database/tables/blobs.rs +++ b/pubky-homeserver/src/database/tables/blobs.rs @@ -5,9 +5,7 @@ use heed::{ BoxedError, BytesDecode, BytesEncode, Database, }; -use pubky_common::crypto::Hash; - /// hash of the blob => bytes. -pub type BlobsTable = Database; +pub type BlobsTable = Database; pub const BLOBS_TABLE: &str = "blobs"; diff --git a/pubky-homeserver/src/database/tables/entries.rs b/pubky-homeserver/src/database/tables/entries.rs index 5a1cc8e..1d2028e 100644 --- a/pubky-homeserver/src/database/tables/entries.rs +++ b/pubky-homeserver/src/database/tables/entries.rs @@ -7,10 +7,10 @@ use heed::{ BoxedError, BytesDecode, BytesEncode, Database, }; -use pubky_common::crypto::Hash; +use pubky_common::{crypto::Hash, timestamp::Timestamp}; /// full_path(pubky/*path) => Entry. -pub type EntriesTable = Database; +pub type EntriesTable = Database; pub const ENTRIES_TABLE: &str = "entries"; @@ -30,7 +30,10 @@ pub struct Entry { impl Entry { pub fn new() -> Self { - Default::default() + Self { + timestamp: Timestamp::now().into_inner(), + ..Default::default() + } } // === Setters === @@ -63,4 +66,18 @@ impl Entry { pub fn content_type(&self) -> &str { &self.content_type } + + // === Public Method === + + pub fn serialize(&self) -> Vec { + to_allocvec(self).expect("Session::serialize") + } + + pub fn deserialize(bytes: &[u8]) -> core::result::Result { + if bytes[0] > 0 { + panic!("Unknown Entry version"); + } + + Ok(from_bytes(bytes)?) 
+ } } diff --git a/pubky-homeserver/src/error.rs b/pubky-homeserver/src/error.rs index 46d37d6..57081ab 100644 --- a/pubky-homeserver/src/error.rs +++ b/pubky-homeserver/src/error.rs @@ -54,25 +54,49 @@ impl IntoResponse for Error { impl From for Error { fn from(error: QueryRejection) -> Self { - Self::new(StatusCode::BAD_REQUEST, Some(error)) + Self::new(StatusCode::BAD_REQUEST, error.into()) } } impl From for Error { fn from(error: ExtensionRejection) -> Self { - Self::new(StatusCode::BAD_REQUEST, Some(error)) + Self::new(StatusCode::BAD_REQUEST, error.into()) } } impl From for Error { fn from(error: PathRejection) -> Self { - Self::new(StatusCode::BAD_REQUEST, Some(error)) + Self::new(StatusCode::BAD_REQUEST, error.into()) } } impl From for Error { fn from(error: std::io::Error) -> Self { - Self::new(StatusCode::INTERNAL_SERVER_ERROR, Some(error)) + Self::new(StatusCode::INTERNAL_SERVER_ERROR, error.into()) + } +} + +impl From for Error { + fn from(error: heed::Error) -> Self { + Self::new(StatusCode::INTERNAL_SERVER_ERROR, error.into()) + } +} + +impl From for Error { + fn from(error: anyhow::Error) -> Self { + Self::new(StatusCode::INTERNAL_SERVER_ERROR, error.into()) + } +} + +impl From for Error { + fn from(error: postcard::Error) -> Self { + Self::new(StatusCode::INTERNAL_SERVER_ERROR, error.into()) + } +} + +impl From for Error { + fn from(error: axum::Error) -> Self { + Self::new(StatusCode::INTERNAL_SERVER_ERROR, error.into()) } } @@ -89,11 +113,3 @@ impl From for Error { Self::new(StatusCode::BAD_REQUEST, Some(error)) } } - -impl From for Error { - fn from(error: heed::Error) -> Self { - debug!(?error); - - Self::with_status(StatusCode::INTERNAL_SERVER_ERROR) - } -} diff --git a/pubky-homeserver/src/extractors.rs b/pubky-homeserver/src/extractors.rs index be65f13..e7192db 100644 --- a/pubky-homeserver/src/extractors.rs +++ b/pubky-homeserver/src/extractors.rs @@ -45,3 +45,32 @@ where Ok(Pubky(public_key)) } } + +pub struct EntryPath(pub(crate) String); + +impl EntryPath { + pub fn as_bytes(&self) -> &[u8] { + self.0.as_bytes() + } +} + +#[async_trait] +impl FromRequestParts for EntryPath +where + S: Send + Sync, +{ + type Rejection = Response; + + async fn from_request_parts(parts: &mut Parts, _state: &S) -> Result { + let params: Path> = + parts.extract().await.map_err(IntoResponse::into_response)?; + + // TODO: enforce path limits like no trailing '/' + + let path = params + .get("path") + .ok_or_else(|| (StatusCode::NOT_FOUND, "entry path missing").into_response())?; + + Ok(EntryPath(path.to_string())) + } +} diff --git a/pubky-homeserver/src/routes.rs b/pubky-homeserver/src/routes.rs index 6099858..3b872c1 100644 --- a/pubky-homeserver/src/routes.rs +++ b/pubky-homeserver/src/routes.rs @@ -9,7 +9,7 @@ use tower_http::trace::TraceLayer; use crate::server::AppState; mod auth; -mod drive; +mod public; mod root; pub fn create_app(state: AppState) -> Router { @@ -19,7 +19,8 @@ pub fn create_app(state: AppState) -> Router { .route("/:pubky/session", get(auth::session)) .route("/:pubky/session", post(auth::signin)) .route("/:pubky/session", delete(auth::signout)) - .route("/:pubky/*key", put(drive::put)) + .route("/:pubky/*path", put(public::put)) + .route("/:pubky/*path", get(public::get)) .layer(TraceLayer::new_for_http()) .layer(CookieManagerLayer::new()) // TODO: revisit if we enable streaming big payloads diff --git a/pubky-homeserver/src/routes/drive.rs b/pubky-homeserver/src/routes/drive.rs deleted file mode 100644 index 12f8fb8..0000000 --- 
a/pubky-homeserver/src/routes/drive.rs +++ /dev/null @@ -1,63 +0,0 @@ -use axum::{ - body::{Body, Bytes}, - extract::{Path, State}, - http::StatusCode, - response::IntoResponse, - RequestExt, Router, -}; -use futures_util::stream::StreamExt; - -use tracing::debug; - -use pubky_common::crypto::Hasher; - -use crate::{ - database::tables::blobs::{BlobsTable, BLOBS_TABLE}, - error::{Error, Result}, - extractors::Pubky, - server::AppState, -}; - -pub async fn put( - State(state): State, - pubky: Pubky, - // Path(key): Path, - mut body: Body, -) -> Result { - let mut stream = body.into_data_stream(); - - let (tx, rx) = flume::bounded::(1); - - // Offload the write transaction to a blocking task - let done = tokio::task::spawn_blocking(move || { - // TODO: this is a blocking operation, which is ok for small - // payloads (we have 16 kb limit for now) but later we need - // to stream this to filesystem, and keep track of any failed - // writes to GC these files later. - - let mut wtxn = state.db.env.write_txn().unwrap(); - let blobs: BlobsTable = state - .db - .env - .open_database(&wtxn, Some(BLOBS_TABLE)) - .unwrap() - .expect("Blobs table already created"); - - let hasher = Hasher::new(); - - while let Ok(chunk) = rx.recv() { - dbg!(chunk); - } - }); - - while let Some(next) = stream.next().await { - let chunk = next - .map_err(|err| Error::new(StatusCode::INTERNAL_SERVER_ERROR, Some(err.to_string())))?; - - tx.send(chunk); - } - - let _ = done.await; - - Ok("Pubky drive...".to_string()) -} diff --git a/pubky-homeserver/src/routes/public.rs b/pubky-homeserver/src/routes/public.rs new file mode 100644 index 0000000..38c2741 --- /dev/null +++ b/pubky-homeserver/src/routes/public.rs @@ -0,0 +1,149 @@ +use axum::{ + body::{Body, Bytes}, + extract::{Path, State}, + http::StatusCode, + response::IntoResponse, + RequestExt, Router, +}; +use axum_extra::body::AsyncReadBody; +use futures_util::stream::StreamExt; + +use tracing::debug; + +use pubky_common::crypto::Hasher; + +use crate::{ + database::tables::{ + blobs::{BlobsTable, BLOBS_TABLE}, + entries::{EntriesTable, Entry, ENTRIES_TABLE}, + }, + error::{Error, Result}, + extractors::{EntryPath, Pubky}, + server::AppState, +}; + +pub async fn put( + State(state): State, + pubky: Pubky, + path: EntryPath, + mut body: Body, +) -> Result { + // TODO: return an error if path does not start with '/pub/' + + let mut stream = body.into_data_stream(); + + let (tx, rx) = flume::bounded::(1); + + // TODO: refactor Database to clean up this scope. + let done = tokio::task::spawn_blocking(move || -> Result<()> { + // TODO: this is a blocking operation, which is ok for small + // payloads (we have 16 kb limit for now) but later we need + // to stream this to filesystem, and keep track of any failed + // writes to GC these files later. + + let public_key = pubky.public_key(); + + // TODO: Authorize + + let mut wtxn = state.db.env.write_txn()?; + let blobs: BlobsTable = state + .db + .env + .open_database(&wtxn, Some(BLOBS_TABLE))? + .expect("Blobs table already created"); + + let entries: EntriesTable = state + .db + .env + .open_database(&wtxn, Some(ENTRIES_TABLE))? 
+ .expect("Entries table already created"); + + let mut hasher = Hasher::new(); + let mut bytes = vec![]; + let mut length = 0; + + while let Ok(chunk) = rx.recv() { + hasher.update(&chunk); + bytes.extend_from_slice(&chunk); + length += chunk.len(); + } + + let hash = hasher.finalize(); + + blobs.put(&mut wtxn, hash.as_bytes(), &bytes)?; + + let mut entry = Entry::new(); + + entry.set_content_hash(hash); + entry.set_content_length(length); + + let mut key = vec![]; + key.extend_from_slice(public_key.as_bytes()); + key.extend_from_slice(path.as_bytes()); + + entries.put(&mut wtxn, &key, &entry.serialize()); + + Ok(()) + }); + + while let Some(next) = stream.next().await { + let chunk = next?; + + tx.send(chunk); + } + + drop(tx); + done.await.expect("join error")?; + + // TODO: return relevant headers, like Etag? + + Ok(()) +} + +pub async fn get( + State(state): State, + pubky: Pubky, + path: EntryPath, +) -> Result { + // TODO: check the path, return an error if doesn't start with `/pub/` + + // TODO: Enable streaming + + let public_key = pubky.public_key(); + + let mut rtxn = state.db.env.read_txn()?; + + let entries: EntriesTable = state + .db + .env + .open_database(&rtxn, Some(ENTRIES_TABLE))? + .expect("Entries table already created"); + + let blobs: BlobsTable = state + .db + .env + .open_database(&rtxn, Some(BLOBS_TABLE))? + .expect("Blobs table already created"); + + let mut count = 0; + + for x in entries.iter(&rtxn)? { + count += 1 + } + + return Err(Error::new(StatusCode::NOT_FOUND, count.to_string().into())); + + let mut key = vec![]; + key.extend_from_slice(public_key.as_bytes()); + key.extend_from_slice(path.as_bytes()); + + if let Some(bytes) = entries.get(&rtxn, &key)? { + let entry = Entry::deserialize(bytes)?; + + if let Some(blob) = blobs.get(&rtxn, entry.content_hash())? { + return Ok(blob.to_vec()); + }; + }; + + Err(Error::new(StatusCode::NOT_FOUND, path.0.into())) +} diff --git a/pubky/Cargo.toml b/pubky/Cargo.toml index e10789a..c40cd9c 100644 --- a/pubky/Cargo.toml +++ b/pubky/Cargo.toml @@ -11,6 +11,7 @@ ureq = { version = "2.10.0", features = ["cookies"] } thiserror = "1.0.62" url = "2.5.2" flume = { version = "0.11.0", features = ["select", "eventual-fairness"], default-features = false } +bytes = "1.6.1" [dev-dependencies] pubky_homeserver = { path = "../pubky-homeserver" } diff --git a/pubky/src/client.rs b/pubky/src/client.rs index 69f418c..95742df 100644 --- a/pubky/src/client.rs +++ b/pubky/src/client.rs @@ -1,5 +1,6 @@ mod auth; mod pkarr; +mod public; use std::{collections::HashMap, fmt::format, time::Duration}; diff --git a/pubky/src/client/auth.rs b/pubky/src/client/auth.rs index 25b679c..8445640 100644 --- a/pubky/src/client/auth.rs +++ b/pubky/src/client/auth.rs @@ -16,8 +16,7 @@ impl PubkyClient { url.set_path(&format!("/{}", keypair.public_key())); self.request(HttpMethod::Put, &url) - .send_bytes(AuthnSignature::generate(keypair, &audience).as_bytes()) - .map_err(Box::new)?; + .send_bytes(AuthnSignature::generate(keypair, &audience).as_bytes())?; self.publish_pubky_homeserver(keypair, homeserver); @@ -25,6 +24,9 @@ impl PubkyClient { } /// Check the current sesison for a given Pubky in its homeserver. + /// + /// Returns an [Error::NotSignedIn] if so, or [ureq::Error] if + /// the response has any other `>=400` status code. 
pub fn session(&self, pubky: &PublicKey) -> Result { let (homeserver, mut url) = self.resolve_pubky_homeserver(pubky)?; @@ -34,11 +36,15 @@ impl PubkyClient { let result = self.request(HttpMethod::Get, &url).call().map_err(Box::new); - if let Ok(reader) = result { - reader.into_reader().read_to_end(&mut bytes); - } else { - return Err(Error::NotSignedIn); - } + let reader = self.request(HttpMethod::Get, &url).call().map_err(|err| { + match err { + ureq::Error::Status(404, _) => Error::NotSignedIn, + // TODO: handle other types of errors + _ => err.into(), + } + })?; + + reader.into_reader().read_to_end(&mut bytes); Ok(Session::deserialize(&bytes)?) } diff --git a/pubky/src/client/public.rs b/pubky/src/client/public.rs new file mode 100644 index 0000000..1600c92 --- /dev/null +++ b/pubky/src/client/public.rs @@ -0,0 +1,115 @@ +use bytes::Bytes; + +use pkarr::PublicKey; + +use crate::PubkyClient; + +use super::Result; + +impl PubkyClient { + pub fn put(&self, pubky: &PublicKey, path: &str, content: &[u8]) -> Result<()> { + let path = normalize_path(path); + + let (_, mut url) = self.resolve_pubky_homeserver(pubky)?; + + url.set_path(&format!("/{pubky}/{path}")); + + self.request(super::HttpMethod::Put, &url) + .send_bytes(content)?; + + Ok(()) + } + + pub fn get(&self, pubky: &PublicKey, path: &str) -> Result { + let path = normalize_path(path); + + let (_, mut url) = self.resolve_pubky_homeserver(pubky)?; + + url.set_path(&format!("/{pubky}/{path}")); + + let result = self.request(super::HttpMethod::Get, &url).call(); + + if let Err(error) = result { + dbg!(&error); + + return Err(error)?; + } + + let response = result.unwrap(); + + let len = response + .header("Content-Length") + .and_then(|s| s.parse::().ok()) + // TODO: return an error in case content-length header is missing + .unwrap_or(0); + + // TODO: bail on too large files. + + let mut bytes = Vec::with_capacity(len as usize); + + response.into_reader().read_exact(&mut bytes); + + Ok(bytes.into()) + } +} + +fn normalize_path(path: &str) -> String { + let mut path = path.to_string(); + + if path.starts_with('/') { + path = path[1..].to_string() + } + + // TODO: should we return error instead? 
+ if path.ends_with('/') { + path = path[..path.len()].to_string() + } + + path +} + +#[cfg(test)] +mod tests { + use std::ops::Deref; + + use crate::*; + + use pkarr::{mainline::Testnet, Keypair}; + use pubky_common::session::Session; + use pubky_homeserver::Homeserver; + + #[tokio::test] + async fn put_get() { + let testnet = Testnet::new(3); + let server = Homeserver::start_test(&testnet).await.unwrap(); + + let client = PubkyClient::test(&testnet).as_async(); + + let keypair = Keypair::random(); + + client + .signup(&keypair, &server.public_key().to_string()) + .await + .unwrap(); + + let response = client + .put(&keypair.public_key(), "/pub/foo.txt", &[0, 1, 2, 3, 4]) + .await; + + if let Err(Error::Ureq(ureqerror)) = response { + if let Some(r) = ureqerror.into_response() { + dbg!(r.into_string()); + } + } + + let response = client.get(&keypair.public_key(), "/pub/foo.txt").await; + + if let Err(Error::Ureq(ureqerror)) = response { + if let Some(r) = ureqerror.into_response() { + dbg!(r.into_string()); + } + } + + // dbg!(response); + } +} diff --git a/pubky/src/client_async.rs b/pubky/src/client_async.rs index de9012c..2fb7bd5 100644 --- a/pubky/src/client_async.rs +++ b/pubky/src/client_async.rs @@ -1,5 +1,7 @@ use std::thread; +use bytes::Bytes; + use pkarr::{Keypair, PublicKey}; use pubky_common::session::Session; @@ -62,4 +64,31 @@ impl PubkyClientAsync { receiver.recv_async().await? } + + /// Async version of [PubkyClient::put] + pub async fn put(&self, pubky: &PublicKey, path: &str, content: &[u8]) -> Result<()> { + let (sender, receiver) = flume::bounded::>(1); + + let client = self.0.clone(); + let pubky = pubky.clone(); + let path = path.to_string(); + let content = content.to_vec(); + + thread::spawn(move || sender.send(client.put(&pubky, &path, &content))); + + receiver.recv_async().await? + } + + /// Async version of [PubkyClient::get] + pub async fn get(&self, pubky: &PublicKey, path: &str) -> Result { + let (sender, receiver) = flume::bounded::>(1); + + let client = self.0.clone(); + let pubky = pubky.clone(); + let path = path.to_string(); + + thread::spawn(move || sender.send(client.get(&pubky, &path))); + + receiver.recv_async().await? 
+ } } diff --git a/pubky/src/error.rs b/pubky/src/error.rs index 026382e..acbad4b 100644 --- a/pubky/src/error.rs +++ b/pubky/src/error.rs @@ -34,3 +34,9 @@ pub enum Error { #[error(transparent)] Session(#[from] pubky_common::session::Error), } + +impl From for Error { + fn from(error: ureq::Error) -> Self { + Error::Ureq(Box::new(error)) + } +} From cc97744f25c8a8e479f08b9231905f5cf6e4ddda Mon Sep 17 00:00:00 2001 From: nazeh Date: Tue, 23 Jul 2024 21:15:41 +0300 Subject: [PATCH 4/4] feat(pubky): get successful --- Cargo.lock | 1 + pubky-homeserver/src/database.rs | 113 +++++++++++++++++- pubky-homeserver/src/database/migrations.rs | 10 +- .../src/database/migrations/m0.rs | 2 +- pubky-homeserver/src/database/tables.rs | 26 ++++ .../src/database/tables/entries.rs | 2 +- pubky-homeserver/src/extractors.rs | 4 + pubky-homeserver/src/routes/public.rs | 80 ++----------- pubky/src/client/public.rs | 29 ++--- 9 files changed, 167 insertions(+), 100 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 74ef877..613c9c4 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1250,6 +1250,7 @@ dependencies = [ name = "pubky" version = "0.1.0" dependencies = [ + "bytes", "flume", "pkarr", "pubky-common", diff --git a/pubky-homeserver/src/database.rs b/pubky-homeserver/src/database.rs index 0eb3200..5bbe6c2 100644 --- a/pubky-homeserver/src/database.rs +++ b/pubky-homeserver/src/database.rs @@ -1,16 +1,23 @@ use std::fs; use std::path::Path; +use bytes::Bytes; use heed::{types::Str, Database, Env, EnvOpenOptions, RwTxn}; mod migrations; pub mod tables; -use migrations::TABLES_COUNT; +use pubky_common::crypto::Hasher; + +use tables::{entries::Entry, Tables, TABLES_COUNT}; + +use pkarr::PublicKey; +use tables::blobs::{BlobsTable, BLOBS_TABLE}; #[derive(Debug, Clone)] pub struct DB { pub(crate) env: Env, + pub(crate) tables: Tables, } impl DB { @@ -19,10 +26,110 @@ impl DB { let env = unsafe { EnvOpenOptions::new().max_dbs(TABLES_COUNT).open(storage) }?; - migrations::run(&env); + let tables = migrations::run(&env)?; - let db = DB { env }; + let db = DB { env, tables }; Ok(db) } + + pub fn put_entry( + &mut self, + public_key: &PublicKey, + path: &str, + rx: flume::Receiver, + ) -> anyhow::Result<()> { + let mut wtxn = self.env.write_txn()?; + + let mut hasher = Hasher::new(); + let mut bytes = vec![]; + let mut length = 0; + + while let Ok(chunk) = rx.recv() { + hasher.update(&chunk); + bytes.extend_from_slice(&chunk); + length += chunk.len(); + } + + let hash = hasher.finalize(); + + self.tables.blobs.put(&mut wtxn, hash.as_bytes(), &bytes)?; + + let mut entry = Entry::new(); + + entry.set_content_hash(hash); + entry.set_content_length(length); + + let mut key = vec![]; + key.extend_from_slice(public_key.as_bytes()); + key.extend_from_slice(path.as_bytes()); + + self.tables.entries.put(&mut wtxn, &key, &entry.serialize()); + + wtxn.commit()?; + + Ok(()) + } + + pub fn get_blob( + &mut self, + public_key: &PublicKey, + path: &str, + ) -> anyhow::Result> { + let mut rtxn = self.env.read_txn()?; + + let mut key = vec![]; + key.extend_from_slice(public_key.as_bytes()); + key.extend_from_slice(path.as_bytes()); + + if let Some(bytes) = self.tables.entries.get(&rtxn, &key)? { + let entry = Entry::deserialize(bytes)?; + + if let Some(blob) = self.tables.blobs.get(&rtxn, entry.content_hash())? 
{ + return Ok(Some(Bytes::from(blob.to_vec()))); + }; + }; + + Ok(None) + } +} + +#[cfg(test)] +mod tests { + use pkarr::Keypair; + use pubky_common::timestamp::Timestamp; + + use crate::config::Config; + + use super::{Bytes, DB}; + + #[tokio::test] + async fn entries() { + let storage = std::env::temp_dir() + .join(Timestamp::now().to_string()) + .join("pubky"); + + let mut db = DB::open(&storage).unwrap(); + + let keypair = Keypair::random(); + let path = "/pub/foo.txt"; + + let (tx, rx) = flume::bounded::(0); + + let mut cloned = db.clone(); + let cloned_keypair = keypair.clone(); + + let done = tokio::task::spawn_blocking(move || { + cloned.put_entry(&cloned_keypair.public_key(), path, rx); + }); + + tx.send(vec![1, 2, 3, 4, 5].into()); + drop(tx); + + done.await; + + let blob = db.get_blob(&keypair.public_key(), path).unwrap().unwrap(); + + assert_eq!(blob, Bytes::from(vec![1, 2, 3, 4, 5])); + } } diff --git a/pubky-homeserver/src/database/migrations.rs b/pubky-homeserver/src/database/migrations.rs index dbead07..32f2909 100644 --- a/pubky-homeserver/src/database/migrations.rs +++ b/pubky-homeserver/src/database/migrations.rs @@ -2,16 +2,16 @@ use heed::{types::Str, Database, Env, RwTxn}; mod m0; -use super::tables; +use super::tables::Tables; -pub const TABLES_COUNT: u32 = 4; - -pub fn run(env: &Env) -> anyhow::Result<()> { +pub fn run(env: &Env) -> anyhow::Result { let mut wtxn = env.write_txn()?; m0::run(env, &mut wtxn); + let tables = Tables::new(env, &mut wtxn)?; + wtxn.commit()?; - Ok(()) + Ok(tables) } diff --git a/pubky-homeserver/src/database/migrations/m0.rs b/pubky-homeserver/src/database/migrations/m0.rs index 74d89c4..d7dac79 100644 --- a/pubky-homeserver/src/database/migrations/m0.rs +++ b/pubky-homeserver/src/database/migrations/m0.rs @@ -1,6 +1,6 @@ use heed::{types::Str, Database, Env, RwTxn}; -use super::tables::{blobs, entries, sessions, users}; +use crate::database::tables::{blobs, entries, sessions, users}; pub fn run(env: &Env, wtxn: &mut RwTxn) -> anyhow::Result<()> { let _: users::UsersTable = env.create_database(wtxn, Some(users::USERS_TABLE))?; diff --git a/pubky-homeserver/src/database/tables.rs b/pubky-homeserver/src/database/tables.rs index 4f0c1c5..a019fbe 100644 --- a/pubky-homeserver/src/database/tables.rs +++ b/pubky-homeserver/src/database/tables.rs @@ -2,3 +2,29 @@ pub mod blobs; pub mod entries; pub mod sessions; pub mod users; + +use heed::{Env, RwTxn}; + +use blobs::{BlobsTable, BLOBS_TABLE}; +use entries::{EntriesTable, ENTRIES_TABLE}; + +pub const TABLES_COUNT: u32 = 4; + +#[derive(Debug, Clone)] +pub struct Tables { + pub blobs: BlobsTable, + pub entries: EntriesTable, +} + +impl Tables { + pub fn new(env: &Env, wtxn: &mut RwTxn) -> anyhow::Result { + Ok(Self { + blobs: env + .open_database(wtxn, Some(BLOBS_TABLE))? + .expect("Blobs table already created"), + entries: env + .open_database(wtxn, Some(ENTRIES_TABLE))? + .expect("Entries table already created"), + }) + } +} diff --git a/pubky-homeserver/src/database/tables/entries.rs b/pubky-homeserver/src/database/tables/entries.rs index 1d2028e..7c9e2e3 100644 --- a/pubky-homeserver/src/database/tables/entries.rs +++ b/pubky-homeserver/src/database/tables/entries.rs @@ -78,6 +78,6 @@ impl Entry { panic!("Unknown Entry version"); } - Ok(from_bytes(bytes)?) 
+ from_bytes(bytes) } } diff --git a/pubky-homeserver/src/extractors.rs b/pubky-homeserver/src/extractors.rs index e7192db..b89911c 100644 --- a/pubky-homeserver/src/extractors.rs +++ b/pubky-homeserver/src/extractors.rs @@ -49,6 +49,10 @@ where pub struct EntryPath(pub(crate) String); impl EntryPath { + pub fn as_str(&self) -> &str { + self.0.as_str() + } + pub fn as_bytes(&self) -> &[u8] { self.0.as_bytes() } diff --git a/pubky-homeserver/src/routes/public.rs b/pubky-homeserver/src/routes/public.rs index 38c2741..cb07cee 100644 --- a/pubky-homeserver/src/routes/public.rs +++ b/pubky-homeserver/src/routes/public.rs @@ -23,7 +23,7 @@ use crate::{ }; pub async fn put( - State(state): State, + State(mut state): State, pubky: Pubky, path: EntryPath, mut body: Body, @@ -45,43 +45,7 @@ pub async fn put( // TODO: Authorize - let mut wtxn = state.db.env.write_txn()?; - let blobs: BlobsTable = state - .db - .env - .open_database(&wtxn, Some(BLOBS_TABLE))? - .expect("Blobs table already created"); - - let entries: EntriesTable = state - .db - .env - .open_database(&wtxn, Some(ENTRIES_TABLE))? - .expect("Entries table already created"); - - let mut hasher = Hasher::new(); - let mut bytes = vec![]; - let mut length = 0; - - while let Ok(chunk) = rx.recv() { - hasher.update(&chunk); - bytes.extend_from_slice(&chunk); - length += chunk.len(); - } - - let hash = hasher.finalize(); - - blobs.put(&mut wtxn, hash.as_bytes(), &bytes)?; - - let mut entry = Entry::new(); - - entry.set_content_hash(hash); - entry.set_content_length(length); - - let mut key = vec![]; - key.extend_from_slice(public_key.as_bytes()); - key.extend_from_slice(path.as_bytes()); - - entries.put(&mut wtxn, &key, &entry.serialize()); + state.db.put_entry(public_key, path.as_str(), rx); Ok(()) }); @@ -101,7 +65,7 @@ pub async fn put( } pub async fn get( - State(state): State, + State(mut state): State, pubky: Pubky, path: EntryPath, ) -> Result { @@ -111,39 +75,9 @@ pub async fn get( let public_key = pubky.public_key(); - let mut rtxn = state.db.env.read_txn()?; - - let entries: EntriesTable = state - .db - .env - .open_database(&rtxn, Some(ENTRIES_TABLE))? - .expect("Entries table already created"); - - let blobs: BlobsTable = state - .db - .env - .open_database(&rtxn, Some(BLOBS_TABLE))? - .expect("Blobs table already created"); - - let mut count = 0; - - for x in entries.iter(&rtxn)? { - count += 1 + match state.db.get_blob(public_key, path.as_str()) { + Err(error) => Err(error)?, + Ok(Some(bytes)) => Ok(bytes), + Ok(None) => Err(Error::with_status(StatusCode::NOT_FOUND)), } - - return Err(Error::new(StatusCode::NOT_FOUND, count.to_string().into())); - - let mut key = vec![]; - key.extend_from_slice(public_key.as_bytes()); - key.extend_from_slice(path.as_bytes()); - - if let Some(bytes) = entries.get(&rtxn, &key)? { - let entry = Entry::deserialize(bytes)?; - - if let Some(blob) = blobs.get(&rtxn, entry.content_hash())? 
{ - return Ok(blob.to_vec()); - }; - }; - - Err(Error::new(StatusCode::NOT_FOUND, path.0.into())) } diff --git a/pubky/src/client/public.rs b/pubky/src/client/public.rs index 1600c92..7599e8f 100644 --- a/pubky/src/client/public.rs +++ b/pubky/src/client/public.rs @@ -27,15 +27,7 @@ impl PubkyClient { url.set_path(&format!("/{pubky}/{path}")); - let result = self.request(super::HttpMethod::Get, &url).call(); - - if let Err(error) = result { - dbg!(&error); - - return Err(error)?; - } - - let response = result.unwrap(); + let response = self.request(super::HttpMethod::Get, &url).call()?; let len = response .header("Content-Length") @@ -45,7 +37,7 @@ impl PubkyClient { // TODO: bail on too large files. - let mut bytes = Vec::with_capacity(len as usize); + let mut bytes = vec![0; len as usize]; response.into_reader().read_exact(&mut bytes); @@ -102,14 +94,17 @@ mod tests { } } - let response = client.get(&keypair.public_key(), "/pub/foo.txt").await; + let response = client + .get(&keypair.public_key(), "/pub/foo.txt") + .await + .unwrap(); - if let Err(Error::Ureq(ureqerror)) = response { - if let Some(r) = ureqerror.into_response() { - dbg!(r.into_string()); - } - } + // if let Err(Error::Ureq(ureqerror)) = response { + // if let Some(r) = ureqerror.into_response() { + // dbg!(r.into_string()); + // } + // } - // dbg!(response); + assert_eq!(response, bytes::Bytes::from(vec![0, 1, 2, 3, 4])) } }
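
For readers of this series: the streaming write path introduced in PATCH 1 and reused in PATCH 3/4 — forwarding the request body's data stream over a bounded flume channel into a spawn_blocking task that hashes and buffers the chunks before the LMDB write — can be read in isolation as the minimal sketch below. It is illustrative only, not part of the patches, and assumes the same dependencies the homeserver crate already pulls in (tokio with the rt/macros features, flume, futures-util, bytes, and pubky-common's re-exported blake3 Hasher); the standalone main and the sample chunks are hypothetical stand-ins for axum's Body::into_data_stream().

use bytes::Bytes;
use futures_util::{stream, StreamExt};
use pubky_common::crypto::Hasher;

#[tokio::main]
async fn main() {
    // Stand-in for `body.into_data_stream()` in `public::put`.
    let mut stream = stream::iter(vec![
        Ok::<Bytes, std::io::Error>(Bytes::from_static(b"hello ")),
        Ok(Bytes::from_static(b"world")),
    ]);

    // Bounded channel bridging the async request handler and the blocking
    // LMDB write, as in the handler.
    let (tx, rx) = flume::bounded::<Bytes>(1);

    // Blocking side: consume chunks until the sender is dropped, hashing and
    // buffering them (the real code then writes the blob and entry in one wtxn).
    let done = tokio::task::spawn_blocking(move || {
        let mut hasher = Hasher::new();
        let mut bytes = Vec::new();
        while let Ok(chunk) = rx.recv() {
            hasher.update(&chunk);
            bytes.extend_from_slice(&chunk);
        }
        (hasher.finalize(), bytes)
    });

    // Async side: forward each body chunk into the channel.
    while let Some(next) = stream.next().await {
        let chunk = next.expect("body error");
        tx.send(chunk).expect("receiver dropped");
    }
    drop(tx); // signal end-of-stream so the blocking task can return

    let (hash, bytes) = done.await.expect("join error");
    assert_eq!(bytes, b"hello world".to_vec());
    println!("content hash: {hash}, length: {}", bytes.len());
}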