From 401872a61fa24ee89fc695e55c27d2b2a7b9aa4b Mon Sep 17 00:00:00 2001
From: nazeh <ar.nazeh@gmail.com>
Date: Tue, 23 Jul 2024 11:26:12 +0300
Subject: [PATCH 1/4] feat(homeserver): stream incoming body

---
 Cargo.lock                                    | 17 +++++
 pubky-common/src/crypto.rs                    |  2 +
 pubky-homeserver/Cargo.toml                   |  4 +-
 pubky-homeserver/src/config.rs                | 15 +++--
 pubky-homeserver/src/database.rs              | 15 +----
 pubky-homeserver/src/database/migrations.rs   | 16 ++---
 .../src/database/migrations/m0.rs             | 15 +++++
 pubky-homeserver/src/database/tables.rs       |  2 +
 pubky-homeserver/src/database/tables/blobs.rs | 13 ++++
 .../src/database/tables/entries.rs            | 66 +++++++++++++++++++
 pubky-homeserver/src/routes.rs                |  6 +-
 pubky-homeserver/src/routes/auth.rs           |  7 +-
 pubky-homeserver/src/routes/drive.rs          | 60 +++++++++++++++--
 pubky-homeserver/src/server.rs                |  4 +-
 14 files changed, 205 insertions(+), 37 deletions(-)
 create mode 100644 pubky-homeserver/src/database/migrations/m0.rs
 create mode 100644 pubky-homeserver/src/database/tables/blobs.rs
 create mode 100644 pubky-homeserver/src/database/tables/entries.rs

diff --git a/Cargo.lock b/Cargo.lock
index 26bae45..74ef877 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -142,6 +142,8 @@ dependencies = [
  "mime",
  "pin-project-lite",
  "serde",
+ "tokio",
+ "tokio-util",
  "tower",
  "tower-layer",
  "tower-service",
@@ -1283,6 +1285,8 @@ dependencies = [
  "base32",
  "bytes",
  "dirs-next",
+ "flume",
+ "futures-util",
  "heed",
  "pkarr",
  "postcard",
@@ -1826,6 +1830,19 @@ dependencies = [
  "syn",
 ]
 
+[[package]]
+name = "tokio-util"
+version = "0.7.11"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9cf6b47b3771c49ac75ad09a6162f53ad4b8088b76ac60e8ec1455b31a189fe1"
+dependencies = [
+ "bytes",
+ "futures-core",
+ "futures-sink",
+ "pin-project-lite",
+ "tokio",
+]
+
 [[package]]
 name = "tower"
 version = "0.4.13"
diff --git a/pubky-common/src/crypto.rs b/pubky-common/src/crypto.rs
index 2f8131c..ec8f58a 100644
--- a/pubky-common/src/crypto.rs
+++ b/pubky-common/src/crypto.rs
@@ -8,6 +8,8 @@ pub type Hash = blake3::Hash;
 
 pub use blake3::hash;
 
+pub use blake3::Hasher;
+
 pub fn random_hash() -> Hash {
     let mut rng = rand::thread_rng();
     Hash::from_bytes(rng.gen())
diff --git a/pubky-homeserver/Cargo.toml b/pubky-homeserver/Cargo.toml
index 33e18ab..da0c5c7 100644
--- a/pubky-homeserver/Cargo.toml
+++ b/pubky-homeserver/Cargo.toml
@@ -6,10 +6,12 @@ edition = "2021"
 [dependencies]
 anyhow = "1.0.82"
 axum = "0.7.5"
-axum-extra = { version = "0.9.3", features = ["typed-header"] }
+axum-extra = { version = "0.9.3", features = ["typed-header", "async-read-body"] }
 base32 = "0.5.1"
 bytes = "1.6.1"
 dirs-next = "2.0.0"
+flume = "0.11.0"
+futures-util = "0.3.30"
 heed = "0.20.3"
 pkarr = { version = "2.1.0", features = ["async"] }
 postcard = { version = "1.0.8", features = ["alloc"] }
diff --git a/pubky-homeserver/src/config.rs b/pubky-homeserver/src/config.rs
index 7c9fcfe..3657ecd 100644
--- a/pubky-homeserver/src/config.rs
+++ b/pubky-homeserver/src/config.rs
@@ -40,13 +40,16 @@ impl Config {
 
     /// Test configurations
     pub fn test(testnet: &pkarr::mainline::Testnet) -> Self {
+        let bootstrap = Some(testnet.bootstrap.to_owned());
+        let storage = Some(
+            std::env::temp_dir()
+                .join(Timestamp::now().to_string())
+                .join(DEFAULT_STORAGE_DIR),
+        );
+
         Self {
-            bootstrap: Some(testnet.bootstrap.to_owned()),
-            storage: Some(
-                std::env::temp_dir()
-                    .join(Timestamp::now().to_string())
-                    .join(DEFAULT_STORAGE_DIR),
-            ),
+            bootstrap,
+            storage,
             ..Default::default()
         }
     }
diff --git a/pubky-homeserver/src/database.rs b/pubky-homeserver/src/database.rs
index 2f8d591..0eb3200 100644
--- a/pubky-homeserver/src/database.rs
+++ b/pubky-homeserver/src/database.rs
@@ -19,21 +19,10 @@ impl DB {
 
         let env = unsafe { EnvOpenOptions::new().max_dbs(TABLES_COUNT).open(storage) }?;
 
-        let db = DB { env };
+        migrations::run(&env);
 
-        db.run_migrations();
+        let db = DB { env };
 
         Ok(db)
     }
-
-    fn run_migrations(&self) -> anyhow::Result<()> {
-        let mut wtxn = self.env.write_txn()?;
-
-        migrations::create_users_table(&self.env, &mut wtxn);
-        migrations::create_sessions_table(&self.env, &mut wtxn);
-
-        wtxn.commit()?;
-
-        Ok(())
-    }
 }
diff --git a/pubky-homeserver/src/database/migrations.rs b/pubky-homeserver/src/database/migrations.rs
index 93c7631..dbead07 100644
--- a/pubky-homeserver/src/database/migrations.rs
+++ b/pubky-homeserver/src/database/migrations.rs
@@ -1,19 +1,17 @@
 use heed::{types::Str, Database, Env, RwTxn};
 
+mod m0;
+
 use super::tables;
 
-pub const TABLES_COUNT: u32 = 2;
+pub const TABLES_COUNT: u32 = 4;
 
-pub fn create_users_table(env: &Env, wtxn: &mut RwTxn) -> anyhow::Result<()> {
-    let _: tables::users::UsersTable =
-        env.create_database(wtxn, Some(tables::users::USERS_TABLE))?;
+pub fn run(env: &Env) -> anyhow::Result<()> {
+    let mut wtxn = env.write_txn()?;
 
-    Ok(())
-}
+    m0::run(env, &mut wtxn);
 
-pub fn create_sessions_table(env: &Env, wtxn: &mut RwTxn) -> anyhow::Result<()> {
-    let _: tables::sessions::SessionsTable =
-        env.create_database(wtxn, Some(tables::sessions::SESSIONS_TABLE))?;
+    wtxn.commit()?;
 
     Ok(())
 }
diff --git a/pubky-homeserver/src/database/migrations/m0.rs b/pubky-homeserver/src/database/migrations/m0.rs
new file mode 100644
index 0000000..74d89c4
--- /dev/null
+++ b/pubky-homeserver/src/database/migrations/m0.rs
@@ -0,0 +1,15 @@
+use heed::{types::Str, Database, Env, RwTxn};
+
+use super::tables::{blobs, entries, sessions, users};
+
+pub fn run(env: &Env, wtxn: &mut RwTxn) -> anyhow::Result<()> {
+    let _: users::UsersTable = env.create_database(wtxn, Some(users::USERS_TABLE))?;
+
+    let _: sessions::SessionsTable = env.create_database(wtxn, Some(sessions::SESSIONS_TABLE))?;
+
+    let _: blobs::BlobsTable = env.create_database(wtxn, Some(blobs::BLOBS_TABLE))?;
+
+    let _: entries::EntriesTable = env.create_database(wtxn, Some(entries::ENTRIES_TABLE))?;
+
+    Ok(())
+}
diff --git a/pubky-homeserver/src/database/tables.rs b/pubky-homeserver/src/database/tables.rs
index b6e3efc..4f0c1c5 100644
--- a/pubky-homeserver/src/database/tables.rs
+++ b/pubky-homeserver/src/database/tables.rs
@@ -1,2 +1,4 @@
+pub mod blobs;
+pub mod entries;
 pub mod sessions;
 pub mod users;
diff --git a/pubky-homeserver/src/database/tables/blobs.rs b/pubky-homeserver/src/database/tables/blobs.rs
new file mode 100644
index 0000000..9cf1da1
--- /dev/null
+++ b/pubky-homeserver/src/database/tables/blobs.rs
@@ -0,0 +1,13 @@
+use std::{borrow::Cow, time::SystemTime};
+
+use heed::{
+    types::{Bytes, Str},
+    BoxedError, BytesDecode, BytesEncode, Database,
+};
+
+use pubky_common::crypto::Hash;
+
+/// hash of the blob => bytes.
+pub type BlobsTable = Database<Hash, Bytes>;
+
+pub const BLOBS_TABLE: &str = "blobs";
diff --git a/pubky-homeserver/src/database/tables/entries.rs b/pubky-homeserver/src/database/tables/entries.rs
new file mode 100644
index 0000000..5a1cc8e
--- /dev/null
+++ b/pubky-homeserver/src/database/tables/entries.rs
@@ -0,0 +1,66 @@
+use postcard::{from_bytes, to_allocvec};
+use serde::{Deserialize, Serialize};
+use std::{borrow::Cow, time::SystemTime};
+
+use heed::{
+    types::{Bytes, Str},
+    BoxedError, BytesDecode, BytesEncode, Database,
+};
+
+use pubky_common::crypto::Hash;
+
+/// full_path(pubky/*path) => Entry.
+pub type EntriesTable = Database<Hash, Entry>;
+
+pub const ENTRIES_TABLE: &str = "entries";
+
+#[derive(Clone, Default, Serialize, Deserialize, Debug, Eq, PartialEq)]
+pub struct Entry {
+    /// Encoding version
+    version: usize,
+    /// Modified at
+    timestamp: u64,
+    content_hash: [u8; 32],
+    content_length: usize,
+    content_type: String,
+    // user_metadata: ?
+}
+
+// TODO: get headers like Etag
+
+impl Entry {
+    pub fn new() -> Self {
+        Default::default()
+    }
+
+    // === Setters ===
+
+    pub fn set_content_hash(&mut self, content_hash: Hash) -> &mut Self {
+        content_hash.as_bytes().clone_into(&mut self.content_hash);
+        self
+    }
+
+    pub fn set_content_length(&mut self, content_length: usize) -> &mut Self {
+        self.content_length = content_length;
+        self
+    }
+
+    pub fn set_content_type(&mut self, content_type: &str) -> &mut Self {
+        self.content_type = content_type.to_string();
+        self
+    }
+
+    // === Getters ===
+
+    pub fn content_hash(&self) -> &[u8; 32] {
+        &self.content_hash
+    }
+
+    pub fn content_length(&self) -> usize {
+        self.content_length
+    }
+
+    pub fn content_type(&self) -> &str {
+        &self.content_type
+    }
+}
diff --git a/pubky-homeserver/src/routes.rs b/pubky-homeserver/src/routes.rs
index 86120c2..6099858 100644
--- a/pubky-homeserver/src/routes.rs
+++ b/pubky-homeserver/src/routes.rs
@@ -1,4 +1,5 @@
 use axum::{
+    extract::DefaultBodyLimit,
     routing::{delete, get, post, put},
     Router,
 };
@@ -18,8 +19,11 @@ pub fn create_app(state: AppState) -> Router {
         .route("/:pubky/session", get(auth::session))
         .route("/:pubky/session", post(auth::signin))
         .route("/:pubky/session", delete(auth::signout))
-        .route("/:pubky/*key", get(drive::put))
+        .route("/:pubky/*key", put(drive::put))
         .layer(TraceLayer::new_for_http())
         .layer(CookieManagerLayer::new())
+        // TODO: revisit if we enable streaming big payloads
+        // TODO: maybe add to a separate router (drive router?).
+        .layer(DefaultBodyLimit::max(16 * 1024))
         .with_state(state)
 }
diff --git a/pubky-homeserver/src/routes/auth.rs b/pubky-homeserver/src/routes/auth.rs
index fceb6fe..c38aa38 100644
--- a/pubky-homeserver/src/routes/auth.rs
+++ b/pubky-homeserver/src/routes/auth.rs
@@ -2,7 +2,6 @@ use axum::{
     extract::{Request, State},
     http::{HeaderMap, StatusCode},
     response::IntoResponse,
-    routing::get,
     Router,
 };
 use axum_extra::{headers::UserAgent, TypedHeader};
@@ -103,7 +102,11 @@ pub async fn signin(
     state.verifier.verify(&body, public_key)?;
 
     let mut wtxn = state.db.env.write_txn()?;
-    let users: UsersTable = state.db.env.create_database(&mut wtxn, Some(USERS_TABLE))?;
+    let users: UsersTable = state
+        .db
+        .env
+        .open_database(&wtxn, Some(USERS_TABLE))?
+        .expect("Users table already created");
 
     if let Some(existing) = users.get(&wtxn, public_key)? {
         users.put(&mut wtxn, public_key, &existing)?;
diff --git a/pubky-homeserver/src/routes/drive.rs b/pubky-homeserver/src/routes/drive.rs
index 3050250..12f8fb8 100644
--- a/pubky-homeserver/src/routes/drive.rs
+++ b/pubky-homeserver/src/routes/drive.rs
@@ -1,11 +1,63 @@
-use axum::response::IntoResponse;
+use axum::{
+    body::{Body, Bytes},
+    extract::{Path, State},
+    http::StatusCode,
+    response::IntoResponse,
+    RequestExt, Router,
+};
+use futures_util::stream::StreamExt;
 
 use tracing::debug;
 
-use crate::extractors::Pubky;
+use pubky_common::crypto::Hasher;
 
-pub async fn put(pubky: Pubky) -> Result<impl IntoResponse, String> {
-    debug!(pubky=?pubky.public_key());
+use crate::{
+    database::tables::blobs::{BlobsTable, BLOBS_TABLE},
+    error::{Error, Result},
+    extractors::Pubky,
+    server::AppState,
+};
+
+pub async fn put(
+    State(state): State<AppState>,
+    pubky: Pubky,
+    // Path(key): Path<String>,
+    mut body: Body,
+) -> Result<impl IntoResponse> {
+    let mut stream = body.into_data_stream();
+
+    let (tx, rx) = flume::bounded::<Bytes>(1);
+
+    // Offload the write transaction to a blocking task
+    let done = tokio::task::spawn_blocking(move || {
+        // TODO: this is a blocking operation, which is ok for small
+        // payloads (we have 16 kb limit for now) but later we need
+        // to stream this to filesystem, and keep track of any failed
+        // writes to GC these files later.
+
+        let mut wtxn = state.db.env.write_txn().unwrap();
+        let blobs: BlobsTable = state
+            .db
+            .env
+            .open_database(&wtxn, Some(BLOBS_TABLE))
+            .unwrap()
+            .expect("Blobs table already created");
+
+        let hasher = Hasher::new();
+
+        while let Ok(chunk) = rx.recv() {
+            dbg!(chunk);
+        }
+    });
+
+    while let Some(next) = stream.next().await {
+        let chunk = next
+            .map_err(|err| Error::new(StatusCode::INTERNAL_SERVER_ERROR, Some(err.to_string())))?;
+
+        tx.send(chunk);
+    }
+
+    let _ = done.await;
 
     Ok("Pubky drive...".to_string())
 }
diff --git a/pubky-homeserver/src/server.rs b/pubky-homeserver/src/server.rs
index f167d05..0a2f3ae 100644
--- a/pubky-homeserver/src/server.rs
+++ b/pubky-homeserver/src/server.rs
@@ -3,7 +3,7 @@ use std::{future::IntoFuture, net::SocketAddr};
 use anyhow::{Error, Result};
 use pubky_common::auth::AuthnVerifier;
 use tokio::{net::TcpListener, signal, task::JoinSet};
-use tracing::{info, warn};
+use tracing::{debug, info, warn};
 
 use pkarr::{
     mainline::dht::{DhtSettings, Testnet},
@@ -27,6 +27,8 @@ pub(crate) struct AppState {
 
 impl Homeserver {
     pub async fn start(config: Config) -> Result<Self> {
+        debug!(?config);
+
         let public_key = config.keypair().public_key();
 
         let db = DB::open(&config.storage()?)?;

From 62cc13bca489e568882757ed851e06841ec3258a Mon Sep 17 00:00:00 2001
From: nazeh <ar.nazeh@gmail.com>
Date: Tue, 23 Jul 2024 11:35:53 +0300
Subject: [PATCH 2/4] refactor(pubky): separate PubkyClient implementation into
 modules

---
 pubky/src/client.rs       | 245 +-------------------------------------
 pubky/src/client/auth.rs  | 122 +++++++++++++++++++
 pubky/src/client/pkarr.rs | 179 ++++++++++++++++++++++++++++
 pubky/src/lib.rs          |  51 --------
 4 files changed, 305 insertions(+), 292 deletions(-)
 create mode 100644 pubky/src/client/auth.rs
 create mode 100644 pubky/src/client/pkarr.rs

diff --git a/pubky/src/client.rs b/pubky/src/client.rs
index e353e98..69f418c 100644
--- a/pubky/src/client.rs
+++ b/pubky/src/client.rs
@@ -1,18 +1,14 @@
+mod auth;
+mod pkarr;
+
 use std::{collections::HashMap, fmt::format, time::Duration};
 
-use pkarr::{
-    dns::{rdata::SVCB, Packet},
-    mainline::{dht::DhtSettings, Testnet},
-    Keypair, PkarrClient, PublicKey, Settings, SignedPacket,
-};
 use ureq::{Agent, Response};
 use url::Url;
 
-use pubky_common::{auth::AuthnSignature, session::Session};
-
 use crate::error::{Error, Result};
 
-const MAX_RECURSIVE_PUBKY_HOMESERVER_RESOLUTION: u8 = 3;
+use pkarr::{DhtSettings, PkarrClient, Settings, Testnet};
 
 #[derive(Debug, Clone)]
 pub struct PubkyClient {
@@ -45,174 +41,8 @@ impl PubkyClient {
 
     // === Public Methods ===
 
-    /// Signup to a homeserver and update Pkarr accordingly.
-    ///
-    /// The homeserver is a Pkarr domain name, where the TLD is a Pkarr public key
-    /// for example "pubky.o4dksfbqk85ogzdb5osziw6befigbuxmuxkuxq8434q89uj56uyy"
-    pub fn signup(&self, keypair: &Keypair, homeserver: &str) -> Result<()> {
-        let (audience, mut url) = self.resolve_endpoint(homeserver)?;
-
-        url.set_path(&format!("/{}", keypair.public_key()));
-
-        self.request(HttpMethod::Put, &url)
-            .send_bytes(AuthnSignature::generate(keypair, &audience).as_bytes())
-            .map_err(Box::new)?;
-
-        self.publish_pubky_homeserver(keypair, homeserver);
-
-        Ok(())
-    }
-
-    /// Check the current sesison for a given Pubky in its homeserver.
-    pub fn session(&self, pubky: &PublicKey) -> Result<Session> {
-        let (homeserver, mut url) = self.resolve_pubky_homeserver(pubky)?;
-
-        url.set_path(&format!("/{}/session", pubky));
-
-        let mut bytes = vec![];
-
-        let result = self.request(HttpMethod::Get, &url).call().map_err(Box::new);
-
-        if let Ok(reader) = result {
-            reader.into_reader().read_to_end(&mut bytes);
-        } else {
-            return Err(Error::NotSignedIn);
-        }
-
-        Ok(Session::deserialize(&bytes)?)
-    }
-
-    /// Signout from a homeserver.
-    pub fn signout(&self, pubky: &PublicKey) -> Result<()> {
-        let (homeserver, mut url) = self.resolve_pubky_homeserver(pubky)?;
-
-        url.set_path(&format!("/{}/session", pubky));
-
-        self.request(HttpMethod::Delete, &url)
-            .call()
-            .map_err(Box::new)?;
-
-        Ok(())
-    }
-
-    /// Signin to a homeserver.
-    pub fn signin(&self, keypair: &Keypair) -> Result<()> {
-        let pubky = keypair.public_key();
-
-        let (audience, mut url) = self.resolve_pubky_homeserver(&pubky)?;
-
-        url.set_path(&format!("/{}/session", &pubky));
-
-        self.request(HttpMethod::Post, &url)
-            .send_bytes(AuthnSignature::generate(keypair, &audience).as_bytes())
-            .map_err(Box::new)?;
-
-        Ok(())
-    }
-
     // === Private Methods ===
 
-    /// Publish the SVCB record for `_pubky.<public_key>`.
-    pub(crate) fn publish_pubky_homeserver(&self, keypair: &Keypair, host: &str) -> Result<()> {
-        let mut packet = Packet::new_reply(0);
-
-        if let Some(existing) = self.pkarr.resolve(&keypair.public_key())? {
-            for answer in existing.packet().answers.iter().cloned() {
-                if !answer.name.to_string().starts_with("_pubky") {
-                    packet.answers.push(answer.into_owned())
-                }
-            }
-        }
-
-        let svcb = SVCB::new(0, host.try_into()?);
-
-        packet.answers.push(pkarr::dns::ResourceRecord::new(
-            "_pubky".try_into().unwrap(),
-            pkarr::dns::CLASS::IN,
-            60 * 60,
-            pkarr::dns::rdata::RData::SVCB(svcb),
-        ));
-
-        let signed_packet = SignedPacket::from_packet(keypair, &packet)?;
-
-        self.pkarr.publish(&signed_packet)?;
-
-        Ok(())
-    }
-
-    /// Resolve the homeserver for a pubky.
-    pub(crate) fn resolve_pubky_homeserver(&self, pubky: &PublicKey) -> Result<(PublicKey, Url)> {
-        let target = format!("_pubky.{}", pubky);
-
-        self.resolve_endpoint(&target)
-            .map_err(|_| Error::Generic("Could not resolve homeserver".to_string()))
-    }
-
-    /// Resolve a service's public_key and clearnet url from a Pubky domain
-    fn resolve_endpoint(&self, target: &str) -> Result<(PublicKey, Url)> {
-        // TODO: cache the result of this function?
-        // TODO: use MAX_RECURSIVE_PUBKY_HOMESERVER_RESOLUTION
-        // TODO: move to common?
-
-        let mut target = target.to_string();
-        let mut homeserver_public_key = None;
-        let mut host = target.clone();
-
-        // PublicKey is very good at extracting the Pkarr TLD from a string.
-        while let Ok(public_key) = PublicKey::try_from(target.clone()) {
-            if let Some(signed_packet) = self.pkarr.resolve(&public_key)? {
-                let mut prior = None;
-
-                for answer in signed_packet.resource_records(&target) {
-                    if let pkarr::dns::rdata::RData::SVCB(svcb) = &answer.rdata {
-                        if svcb.priority == 0 {
-                            prior = Some(svcb)
-                        } else if let Some(sofar) = prior {
-                            if svcb.priority >= sofar.priority {
-                                prior = Some(svcb)
-                            }
-                            // TODO return random if priority is the same
-                        } else {
-                            prior = Some(svcb)
-                        }
-                    }
-                }
-
-                if let Some(svcb) = prior {
-                    homeserver_public_key = Some(public_key);
-                    target = svcb.target.to_string();
-
-                    if let Some(port) = svcb.get_param(pkarr::dns::rdata::SVCB::PORT) {
-                        if port.len() < 2 {
-                            // TODO: debug! Error encoding port!
-                        }
-                        let port = u16::from_be_bytes([port[0], port[1]]);
-
-                        host = format!("{target}:{port}");
-                    } else {
-                        host.clone_from(&target);
-                    };
-
-                    continue;
-                }
-            };
-
-            break;
-        }
-
-        if let Some(homeserver) = homeserver_public_key {
-            let url = if host.starts_with("localhost") {
-                format!("http://{host}")
-            } else {
-                format!("https://{host}")
-            };
-
-            return Ok((homeserver, Url::parse(&url)?));
-        }
-
-        Err(Error::Generic("Could not resolve endpoint".to_string()))
-    }
-
     fn request(&self, method: HttpMethod, url: &Url) -> ureq::Request {
         self.agent.request_url(method.into(), url)
     }
@@ -242,70 +72,3 @@ impl From<HttpMethod> for &str {
         }
     }
 }
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-
-    use pkarr::{
-        dns::{rdata::SVCB, Packet},
-        mainline::{dht::DhtSettings, Testnet},
-        Keypair, PkarrClient, Settings, SignedPacket,
-    };
-    use pubky_homeserver::Homeserver;
-
-    #[tokio::test]
-    async fn resolve_homeserver() {
-        let testnet = Testnet::new(3);
-        let server = Homeserver::start_test(&testnet).await.unwrap();
-
-        // Publish an intermediate controller of the homeserver
-        let pkarr_client = PkarrClient::new(Settings {
-            dht: DhtSettings {
-                bootstrap: Some(testnet.bootstrap.clone()),
-                ..Default::default()
-            },
-            ..Default::default()
-        })
-        .unwrap()
-        .as_async();
-
-        let intermediate = Keypair::random();
-
-        let mut packet = Packet::new_reply(0);
-
-        let server_tld = server.public_key().to_string();
-
-        let mut svcb = SVCB::new(0, server_tld.as_str().try_into().unwrap());
-
-        packet.answers.push(pkarr::dns::ResourceRecord::new(
-            "pubky".try_into().unwrap(),
-            pkarr::dns::CLASS::IN,
-            60 * 60,
-            pkarr::dns::rdata::RData::SVCB(svcb),
-        ));
-
-        let signed_packet = SignedPacket::from_packet(&intermediate, &packet).unwrap();
-
-        pkarr_client.publish(&signed_packet).await.unwrap();
-
-        tokio::task::spawn_blocking(move || {
-            let client = PubkyClient::test(&testnet);
-
-            let pubky = Keypair::random();
-
-            client
-                .publish_pubky_homeserver(&pubky, &format!("pubky.{}", &intermediate.public_key()));
-
-            let (public_key, url) = client
-                .resolve_pubky_homeserver(&pubky.public_key())
-                .unwrap();
-
-            assert_eq!(public_key, server.public_key());
-            assert_eq!(url.host_str(), Some("localhost"));
-            assert_eq!(url.port(), Some(server.port()));
-        })
-        .await
-        .expect("task failed")
-    }
-}
diff --git a/pubky/src/client/auth.rs b/pubky/src/client/auth.rs
new file mode 100644
index 0000000..25b679c
--- /dev/null
+++ b/pubky/src/client/auth.rs
@@ -0,0 +1,122 @@
+use crate::PubkyClient;
+
+use pubky_common::{auth::AuthnSignature, session::Session};
+
+use super::{Error, HttpMethod, Result};
+use pkarr::{Keypair, PublicKey};
+
+impl PubkyClient {
+    /// Signup to a homeserver and update Pkarr accordingly.
+    ///
+    /// The homeserver is a Pkarr domain name, where the TLD is a Pkarr public key
+    /// for example "pubky.o4dksfbqk85ogzdb5osziw6befigbuxmuxkuxq8434q89uj56uyy"
+    pub fn signup(&self, keypair: &Keypair, homeserver: &str) -> Result<()> {
+        let (audience, mut url) = self.resolve_endpoint(homeserver)?;
+
+        url.set_path(&format!("/{}", keypair.public_key()));
+
+        self.request(HttpMethod::Put, &url)
+            .send_bytes(AuthnSignature::generate(keypair, &audience).as_bytes())
+            .map_err(Box::new)?;
+
+        self.publish_pubky_homeserver(keypair, homeserver);
+
+        Ok(())
+    }
+
+    /// Check the current sesison for a given Pubky in its homeserver.
+    pub fn session(&self, pubky: &PublicKey) -> Result<Session> {
+        let (homeserver, mut url) = self.resolve_pubky_homeserver(pubky)?;
+
+        url.set_path(&format!("/{}/session", pubky));
+
+        let mut bytes = vec![];
+
+        let result = self.request(HttpMethod::Get, &url).call().map_err(Box::new);
+
+        if let Ok(reader) = result {
+            reader.into_reader().read_to_end(&mut bytes);
+        } else {
+            return Err(Error::NotSignedIn);
+        }
+
+        Ok(Session::deserialize(&bytes)?)
+    }
+
+    /// Signout from a homeserver.
+    pub fn signout(&self, pubky: &PublicKey) -> Result<()> {
+        let (homeserver, mut url) = self.resolve_pubky_homeserver(pubky)?;
+
+        url.set_path(&format!("/{}/session", pubky));
+
+        self.request(HttpMethod::Delete, &url)
+            .call()
+            .map_err(Box::new)?;
+
+        Ok(())
+    }
+
+    /// Signin to a homeserver.
+    pub fn signin(&self, keypair: &Keypair) -> Result<()> {
+        let pubky = keypair.public_key();
+
+        let (audience, mut url) = self.resolve_pubky_homeserver(&pubky)?;
+
+        url.set_path(&format!("/{}/session", &pubky));
+
+        self.request(HttpMethod::Post, &url)
+            .send_bytes(AuthnSignature::generate(keypair, &audience).as_bytes())
+            .map_err(Box::new)?;
+
+        Ok(())
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use crate::*;
+
+    use pkarr::{mainline::Testnet, Keypair};
+    use pubky_common::session::Session;
+    use pubky_homeserver::Homeserver;
+
+    #[tokio::test]
+    async fn basic_authn() {
+        let testnet = Testnet::new(3);
+        let server = Homeserver::start_test(&testnet).await.unwrap();
+
+        let client = PubkyClient::test(&testnet).as_async();
+
+        let keypair = Keypair::random();
+
+        client
+            .signup(&keypair, &server.public_key().to_string())
+            .await
+            .unwrap();
+
+        let session = client.session(&keypair.public_key()).await.unwrap();
+
+        assert_eq!(session, Session { ..session.clone() });
+
+        client.signout(&keypair.public_key()).await.unwrap();
+
+        {
+            let session = client.session(&keypair.public_key()).await;
+
+            assert!(session.is_err());
+
+            match session {
+                Err(Error::NotSignedIn) => {}
+                _ => assert!(false, "expected NotSignedInt error"),
+            }
+        }
+
+        client.signin(&keypair).await.unwrap();
+
+        {
+            let session = client.session(&keypair.public_key()).await.unwrap();
+
+            assert_eq!(session, Session { ..session.clone() });
+        }
+    }
+}
diff --git a/pubky/src/client/pkarr.rs b/pubky/src/client/pkarr.rs
new file mode 100644
index 0000000..c036527
--- /dev/null
+++ b/pubky/src/client/pkarr.rs
@@ -0,0 +1,179 @@
+pub use pkarr::{
+    dns::{rdata::SVCB, Packet},
+    mainline::{dht::DhtSettings, Testnet},
+    Keypair, PkarrClient, PublicKey, Settings, SignedPacket,
+};
+
+use super::{Error, PubkyClient, Result, Url};
+
+const MAX_RECURSIVE_PUBKY_HOMESERVER_RESOLUTION: u8 = 3;
+
+impl PubkyClient {
+    /// Publish the SVCB record for `_pubky.<public_key>`.
+    pub(crate) fn publish_pubky_homeserver(&self, keypair: &Keypair, host: &str) -> Result<()> {
+        let mut packet = Packet::new_reply(0);
+
+        if let Some(existing) = self.pkarr.resolve(&keypair.public_key())? {
+            for answer in existing.packet().answers.iter().cloned() {
+                if !answer.name.to_string().starts_with("_pubky") {
+                    packet.answers.push(answer.into_owned())
+                }
+            }
+        }
+
+        let svcb = SVCB::new(0, host.try_into()?);
+
+        packet.answers.push(pkarr::dns::ResourceRecord::new(
+            "_pubky".try_into().unwrap(),
+            pkarr::dns::CLASS::IN,
+            60 * 60,
+            pkarr::dns::rdata::RData::SVCB(svcb),
+        ));
+
+        let signed_packet = SignedPacket::from_packet(keypair, &packet)?;
+
+        self.pkarr.publish(&signed_packet)?;
+
+        Ok(())
+    }
+
+    /// Resolve the homeserver for a pubky.
+    pub(crate) fn resolve_pubky_homeserver(&self, pubky: &PublicKey) -> Result<(PublicKey, Url)> {
+        let target = format!("_pubky.{}", pubky);
+
+        self.resolve_endpoint(&target)
+            .map_err(|_| Error::Generic("Could not resolve homeserver".to_string()))
+    }
+
+    /// Resolve a service's public_key and clearnet url from a Pubky domain
+    pub(crate) fn resolve_endpoint(&self, target: &str) -> Result<(PublicKey, Url)> {
+        // TODO: cache the result of this function?
+        // TODO: use MAX_RECURSIVE_PUBKY_HOMESERVER_RESOLUTION
+        // TODO: move to common?
+
+        let mut target = target.to_string();
+        let mut homeserver_public_key = None;
+        let mut host = target.clone();
+
+        // PublicKey is very good at extracting the Pkarr TLD from a string.
+        while let Ok(public_key) = PublicKey::try_from(target.clone()) {
+            if let Some(signed_packet) = self.pkarr.resolve(&public_key)? {
+                let mut prior = None;
+
+                for answer in signed_packet.resource_records(&target) {
+                    if let pkarr::dns::rdata::RData::SVCB(svcb) = &answer.rdata {
+                        if svcb.priority == 0 {
+                            prior = Some(svcb)
+                        } else if let Some(sofar) = prior {
+                            if svcb.priority >= sofar.priority {
+                                prior = Some(svcb)
+                            }
+                            // TODO return random if priority is the same
+                        } else {
+                            prior = Some(svcb)
+                        }
+                    }
+                }
+
+                if let Some(svcb) = prior {
+                    homeserver_public_key = Some(public_key);
+                    target = svcb.target.to_string();
+
+                    if let Some(port) = svcb.get_param(pkarr::dns::rdata::SVCB::PORT) {
+                        if port.len() < 2 {
+                            // TODO: debug! Error encoding port!
+                        }
+                        let port = u16::from_be_bytes([port[0], port[1]]);
+
+                        host = format!("{target}:{port}");
+                    } else {
+                        host.clone_from(&target);
+                    };
+
+                    continue;
+                }
+            };
+
+            break;
+        }
+
+        if let Some(homeserver) = homeserver_public_key {
+            let url = if host.starts_with("localhost") {
+                format!("http://{host}")
+            } else {
+                format!("https://{host}")
+            };
+
+            return Ok((homeserver, Url::parse(&url)?));
+        }
+
+        Err(Error::Generic("Could not resolve endpoint".to_string()))
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+
+    use pkarr::{
+        dns::{rdata::SVCB, Packet},
+        mainline::{dht::DhtSettings, Testnet},
+        Keypair, PkarrClient, Settings, SignedPacket,
+    };
+    use pubky_homeserver::Homeserver;
+
+    #[tokio::test]
+    async fn resolve_homeserver() {
+        let testnet = Testnet::new(3);
+        let server = Homeserver::start_test(&testnet).await.unwrap();
+
+        // Publish an intermediate controller of the homeserver
+        let pkarr_client = PkarrClient::new(Settings {
+            dht: DhtSettings {
+                bootstrap: Some(testnet.bootstrap.clone()),
+                ..Default::default()
+            },
+            ..Default::default()
+        })
+        .unwrap()
+        .as_async();
+
+        let intermediate = Keypair::random();
+
+        let mut packet = Packet::new_reply(0);
+
+        let server_tld = server.public_key().to_string();
+
+        let mut svcb = SVCB::new(0, server_tld.as_str().try_into().unwrap());
+
+        packet.answers.push(pkarr::dns::ResourceRecord::new(
+            "pubky".try_into().unwrap(),
+            pkarr::dns::CLASS::IN,
+            60 * 60,
+            pkarr::dns::rdata::RData::SVCB(svcb),
+        ));
+
+        let signed_packet = SignedPacket::from_packet(&intermediate, &packet).unwrap();
+
+        pkarr_client.publish(&signed_packet).await.unwrap();
+
+        tokio::task::spawn_blocking(move || {
+            let client = PubkyClient::test(&testnet);
+
+            let pubky = Keypair::random();
+
+            client
+                .publish_pubky_homeserver(&pubky, &format!("pubky.{}", &intermediate.public_key()));
+
+            let (public_key, url) = client
+                .resolve_pubky_homeserver(&pubky.public_key())
+                .unwrap();
+
+            assert_eq!(public_key, server.public_key());
+            assert_eq!(url.host_str(), Some("localhost"));
+            assert_eq!(url.port(), Some(server.port()));
+        })
+        .await
+        .expect("task failed")
+    }
+}
diff --git a/pubky/src/lib.rs b/pubky/src/lib.rs
index b05d067..7125ca1 100644
--- a/pubky/src/lib.rs
+++ b/pubky/src/lib.rs
@@ -6,54 +6,3 @@ mod error;
 
 pub use client::PubkyClient;
 pub use error::Error;
-
-#[cfg(test)]
-mod tests {
-    use super::*;
-
-    use super::error::Error;
-
-    use pkarr::{mainline::Testnet, Keypair};
-    use pubky_common::session::Session;
-    use pubky_homeserver::Homeserver;
-
-    #[tokio::test]
-    async fn basic_authn() {
-        let testnet = Testnet::new(3);
-        let server = Homeserver::start_test(&testnet).await.unwrap();
-
-        let client = PubkyClient::test(&testnet).as_async();
-
-        let keypair = Keypair::random();
-
-        client
-            .signup(&keypair, &server.public_key().to_string())
-            .await
-            .unwrap();
-
-        let session = client.session(&keypair.public_key()).await.unwrap();
-
-        assert_eq!(session, Session { ..session.clone() });
-
-        client.signout(&keypair.public_key()).await.unwrap();
-
-        {
-            let session = client.session(&keypair.public_key()).await;
-
-            assert!(session.is_err());
-
-            match session {
-                Err(Error::NotSignedIn) => {}
-                _ => assert!(false, "expected NotSignedInt error"),
-            }
-        }
-
-        client.signin(&keypair).await.unwrap();
-
-        {
-            let session = client.session(&keypair.public_key()).await.unwrap();
-
-            assert_eq!(session, Session { ..session.clone() });
-        }
-    }
-}

From 8cf18a3c0c1566dbdc4ac0e3f15b4789eed02d56 Mon Sep 17 00:00:00 2001
From: nazeh <ar.nazeh@gmail.com>
Date: Tue, 23 Jul 2024 19:04:02 +0300
Subject: [PATCH 3/4] feat(pubky): add get()

---
 pubky-homeserver/src/database/tables/blobs.rs |   4 +-
 .../src/database/tables/entries.rs            |  23 ++-
 pubky-homeserver/src/error.rs                 |  40 +++--
 pubky-homeserver/src/extractors.rs            |  29 ++++
 pubky-homeserver/src/routes.rs                |   5 +-
 pubky-homeserver/src/routes/drive.rs          |  63 --------
 pubky-homeserver/src/routes/public.rs         | 149 ++++++++++++++++++
 pubky/Cargo.toml                              |   1 +
 pubky/src/client.rs                           |   1 +
 pubky/src/client/auth.rs                      |  20 ++-
 pubky/src/client/public.rs                    | 115 ++++++++++++++
 pubky/src/client_async.rs                     |  29 ++++
 pubky/src/error.rs                            |   6 +
 13 files changed, 395 insertions(+), 90 deletions(-)
 delete mode 100644 pubky-homeserver/src/routes/drive.rs
 create mode 100644 pubky-homeserver/src/routes/public.rs
 create mode 100644 pubky/src/client/public.rs

diff --git a/pubky-homeserver/src/database/tables/blobs.rs b/pubky-homeserver/src/database/tables/blobs.rs
index 9cf1da1..0148d6f 100644
--- a/pubky-homeserver/src/database/tables/blobs.rs
+++ b/pubky-homeserver/src/database/tables/blobs.rs
@@ -5,9 +5,7 @@ use heed::{
     BoxedError, BytesDecode, BytesEncode, Database,
 };
 
-use pubky_common::crypto::Hash;
-
 /// hash of the blob => bytes.
-pub type BlobsTable = Database<Hash, Bytes>;
+pub type BlobsTable = Database<Bytes, Bytes>;
 
 pub const BLOBS_TABLE: &str = "blobs";
diff --git a/pubky-homeserver/src/database/tables/entries.rs b/pubky-homeserver/src/database/tables/entries.rs
index 5a1cc8e..1d2028e 100644
--- a/pubky-homeserver/src/database/tables/entries.rs
+++ b/pubky-homeserver/src/database/tables/entries.rs
@@ -7,10 +7,10 @@ use heed::{
     BoxedError, BytesDecode, BytesEncode, Database,
 };
 
-use pubky_common::crypto::Hash;
+use pubky_common::{crypto::Hash, timestamp::Timestamp};
 
 /// full_path(pubky/*path) => Entry.
-pub type EntriesTable = Database<Hash, Entry>;
+pub type EntriesTable = Database<Bytes, Bytes>;
 
 pub const ENTRIES_TABLE: &str = "entries";
 
@@ -30,7 +30,10 @@ pub struct Entry {
 
 impl Entry {
     pub fn new() -> Self {
-        Default::default()
+        Self {
+            timestamp: Timestamp::now().into_inner(),
+            ..Default::default()
+        }
     }
 
     // === Setters ===
@@ -63,4 +66,18 @@ impl Entry {
     pub fn content_type(&self) -> &str {
         &self.content_type
     }
+
+    // === Public Method ===
+
+    pub fn serialize(&self) -> Vec<u8> {
+        to_allocvec(self).expect("Session::serialize")
+    }
+
+    pub fn deserialize(bytes: &[u8]) -> core::result::Result<Self, postcard::Error> {
+        if bytes[0] > 0 {
+            panic!("Unknown Entry version");
+        }
+
+        Ok(from_bytes(bytes)?)
+    }
 }
diff --git a/pubky-homeserver/src/error.rs b/pubky-homeserver/src/error.rs
index 46d37d6..57081ab 100644
--- a/pubky-homeserver/src/error.rs
+++ b/pubky-homeserver/src/error.rs
@@ -54,25 +54,49 @@ impl IntoResponse for Error {
 
 impl From<QueryRejection> for Error {
     fn from(error: QueryRejection) -> Self {
-        Self::new(StatusCode::BAD_REQUEST, Some(error))
+        Self::new(StatusCode::BAD_REQUEST, error.into())
     }
 }
 
 impl From<ExtensionRejection> for Error {
     fn from(error: ExtensionRejection) -> Self {
-        Self::new(StatusCode::BAD_REQUEST, Some(error))
+        Self::new(StatusCode::BAD_REQUEST, error.into())
     }
 }
 
 impl From<PathRejection> for Error {
     fn from(error: PathRejection) -> Self {
-        Self::new(StatusCode::BAD_REQUEST, Some(error))
+        Self::new(StatusCode::BAD_REQUEST, error.into())
     }
 }
 
 impl From<std::io::Error> for Error {
     fn from(error: std::io::Error) -> Self {
-        Self::new(StatusCode::INTERNAL_SERVER_ERROR, Some(error))
+        Self::new(StatusCode::INTERNAL_SERVER_ERROR, error.into())
+    }
+}
+
+impl From<heed::Error> for Error {
+    fn from(error: heed::Error) -> Self {
+        Self::new(StatusCode::INTERNAL_SERVER_ERROR, error.into())
+    }
+}
+
+impl From<anyhow::Error> for Error {
+    fn from(error: anyhow::Error) -> Self {
+        Self::new(StatusCode::INTERNAL_SERVER_ERROR, error.into())
+    }
+}
+
+impl From<postcard::Error> for Error {
+    fn from(error: postcard::Error) -> Self {
+        Self::new(StatusCode::INTERNAL_SERVER_ERROR, error.into())
+    }
+}
+
+impl From<axum::Error> for Error {
+    fn from(error: axum::Error) -> Self {
+        Self::new(StatusCode::INTERNAL_SERVER_ERROR, error.into())
     }
 }
 
@@ -89,11 +113,3 @@ impl From<pkarr::Error> for Error {
         Self::new(StatusCode::BAD_REQUEST, Some(error))
     }
 }
-
-impl From<heed::Error> for Error {
-    fn from(error: heed::Error) -> Self {
-        debug!(?error);
-
-        Self::with_status(StatusCode::INTERNAL_SERVER_ERROR)
-    }
-}
diff --git a/pubky-homeserver/src/extractors.rs b/pubky-homeserver/src/extractors.rs
index be65f13..e7192db 100644
--- a/pubky-homeserver/src/extractors.rs
+++ b/pubky-homeserver/src/extractors.rs
@@ -45,3 +45,32 @@ where
         Ok(Pubky(public_key))
     }
 }
+
+pub struct EntryPath(pub(crate) String);
+
+impl EntryPath {
+    pub fn as_bytes(&self) -> &[u8] {
+        self.0.as_bytes()
+    }
+}
+
+#[async_trait]
+impl<S> FromRequestParts<S> for EntryPath
+where
+    S: Send + Sync,
+{
+    type Rejection = Response;
+
+    async fn from_request_parts(parts: &mut Parts, _state: &S) -> Result<Self, Self::Rejection> {
+        let params: Path<HashMap<String, String>> =
+            parts.extract().await.map_err(IntoResponse::into_response)?;
+
+        // TODO: enforce path limits like no trailing '/'
+
+        let path = params
+            .get("path")
+            .ok_or_else(|| (StatusCode::NOT_FOUND, "entry path missing").into_response())?;
+
+        Ok(EntryPath(path.to_string()))
+    }
+}
diff --git a/pubky-homeserver/src/routes.rs b/pubky-homeserver/src/routes.rs
index 6099858..3b872c1 100644
--- a/pubky-homeserver/src/routes.rs
+++ b/pubky-homeserver/src/routes.rs
@@ -9,7 +9,7 @@ use tower_http::trace::TraceLayer;
 use crate::server::AppState;
 
 mod auth;
-mod drive;
+mod public;
 mod root;
 
 pub fn create_app(state: AppState) -> Router {
@@ -19,7 +19,8 @@ pub fn create_app(state: AppState) -> Router {
         .route("/:pubky/session", get(auth::session))
         .route("/:pubky/session", post(auth::signin))
         .route("/:pubky/session", delete(auth::signout))
-        .route("/:pubky/*key", put(drive::put))
+        .route("/:pubky/*path", put(public::put))
+        .route("/:pubky/*path", get(public::get))
         .layer(TraceLayer::new_for_http())
         .layer(CookieManagerLayer::new())
         // TODO: revisit if we enable streaming big payloads
diff --git a/pubky-homeserver/src/routes/drive.rs b/pubky-homeserver/src/routes/drive.rs
deleted file mode 100644
index 12f8fb8..0000000
--- a/pubky-homeserver/src/routes/drive.rs
+++ /dev/null
@@ -1,63 +0,0 @@
-use axum::{
-    body::{Body, Bytes},
-    extract::{Path, State},
-    http::StatusCode,
-    response::IntoResponse,
-    RequestExt, Router,
-};
-use futures_util::stream::StreamExt;
-
-use tracing::debug;
-
-use pubky_common::crypto::Hasher;
-
-use crate::{
-    database::tables::blobs::{BlobsTable, BLOBS_TABLE},
-    error::{Error, Result},
-    extractors::Pubky,
-    server::AppState,
-};
-
-pub async fn put(
-    State(state): State<AppState>,
-    pubky: Pubky,
-    // Path(key): Path<String>,
-    mut body: Body,
-) -> Result<impl IntoResponse> {
-    let mut stream = body.into_data_stream();
-
-    let (tx, rx) = flume::bounded::<Bytes>(1);
-
-    // Offload the write transaction to a blocking task
-    let done = tokio::task::spawn_blocking(move || {
-        // TODO: this is a blocking operation, which is ok for small
-        // payloads (we have 16 kb limit for now) but later we need
-        // to stream this to filesystem, and keep track of any failed
-        // writes to GC these files later.
-
-        let mut wtxn = state.db.env.write_txn().unwrap();
-        let blobs: BlobsTable = state
-            .db
-            .env
-            .open_database(&wtxn, Some(BLOBS_TABLE))
-            .unwrap()
-            .expect("Blobs table already created");
-
-        let hasher = Hasher::new();
-
-        while let Ok(chunk) = rx.recv() {
-            dbg!(chunk);
-        }
-    });
-
-    while let Some(next) = stream.next().await {
-        let chunk = next
-            .map_err(|err| Error::new(StatusCode::INTERNAL_SERVER_ERROR, Some(err.to_string())))?;
-
-        tx.send(chunk);
-    }
-
-    let _ = done.await;
-
-    Ok("Pubky drive...".to_string())
-}
diff --git a/pubky-homeserver/src/routes/public.rs b/pubky-homeserver/src/routes/public.rs
new file mode 100644
index 0000000..38c2741
--- /dev/null
+++ b/pubky-homeserver/src/routes/public.rs
@@ -0,0 +1,149 @@
+use axum::{
+    body::{Body, Bytes},
+    extract::{Path, State},
+    http::StatusCode,
+    response::IntoResponse,
+    RequestExt, Router,
+};
+use axum_extra::body::AsyncReadBody;
+use futures_util::stream::StreamExt;
+
+use tracing::debug;
+
+use pubky_common::crypto::Hasher;
+
+use crate::{
+    database::tables::{
+        blobs::{BlobsTable, BLOBS_TABLE},
+        entries::{EntriesTable, Entry, ENTRIES_TABLE},
+    },
+    error::{Error, Result},
+    extractors::{EntryPath, Pubky},
+    server::AppState,
+};
+
+pub async fn put(
+    State(state): State<AppState>,
+    pubky: Pubky,
+    path: EntryPath,
+    mut body: Body,
+) -> Result<impl IntoResponse> {
+    // TODO: return an error if path does not start with '/pub/'
+
+    let mut stream = body.into_data_stream();
+
+    let (tx, rx) = flume::bounded::<Bytes>(1);
+
+    // TODO: refactor Database to clean up this scope.
+    let done = tokio::task::spawn_blocking(move || -> Result<()> {
+        // TODO: this is a blocking operation, which is ok for small
+        // payloads (we have 16 kb limit for now) but later we need
+        // to stream this to filesystem, and keep track of any failed
+        // writes to GC these files later.
+
+        let public_key = pubky.public_key();
+
+        // TODO: Authorize
+
+        let mut wtxn = state.db.env.write_txn()?;
+        let blobs: BlobsTable = state
+            .db
+            .env
+            .open_database(&wtxn, Some(BLOBS_TABLE))?
+            .expect("Blobs table already created");
+
+        let entries: EntriesTable = state
+            .db
+            .env
+            .open_database(&wtxn, Some(ENTRIES_TABLE))?
+            .expect("Entries table already created");
+
+        let mut hasher = Hasher::new();
+        let mut bytes = vec![];
+        let mut length = 0;
+
+        while let Ok(chunk) = rx.recv() {
+            hasher.update(&chunk);
+            bytes.extend_from_slice(&chunk);
+            length += chunk.len();
+        }
+
+        let hash = hasher.finalize();
+
+        blobs.put(&mut wtxn, hash.as_bytes(), &bytes)?;
+
+        let mut entry = Entry::new();
+
+        entry.set_content_hash(hash);
+        entry.set_content_length(length);
+
+        let mut key = vec![];
+        key.extend_from_slice(public_key.as_bytes());
+        key.extend_from_slice(path.as_bytes());
+
+        entries.put(&mut wtxn, &key, &entry.serialize());
+
+        Ok(())
+    });
+
+    while let Some(next) = stream.next().await {
+        let chunk = next?;
+
+        tx.send(chunk);
+    }
+
+    drop(tx);
+    done.await.expect("join error")?;
+
+    // TODO: return relevant headers, like Etag?
+
+    Ok(())
+}
+
+pub async fn get(
+    State(state): State<AppState>,
+    pubky: Pubky,
+    path: EntryPath,
+) -> Result<impl IntoResponse> {
+    // TODO: check the path, return an error if doesn't start with `/pub/`
+
+    // TODO: Enable streaming
+
+    let public_key = pubky.public_key();
+
+    let mut rtxn = state.db.env.read_txn()?;
+
+    let entries: EntriesTable = state
+        .db
+        .env
+        .open_database(&rtxn, Some(ENTRIES_TABLE))?
+        .expect("Entries table already created");
+
+    let blobs: BlobsTable = state
+        .db
+        .env
+        .open_database(&rtxn, Some(BLOBS_TABLE))?
+        .expect("Blobs table already created");
+
+    let mut count = 0;
+
+    for x in entries.iter(&rtxn)? {
+        count += 1
+    }
+
+    return Err(Error::new(StatusCode::NOT_FOUND, count.to_string().into()));
+
+    let mut key = vec![];
+    key.extend_from_slice(public_key.as_bytes());
+    key.extend_from_slice(path.as_bytes());
+
+    if let Some(bytes) = entries.get(&rtxn, &key)? {
+        let entry = Entry::deserialize(bytes)?;
+
+        if let Some(blob) = blobs.get(&rtxn, entry.content_hash())? {
+            return Ok(blob.to_vec());
+        };
+    };
+
+    Err(Error::new(StatusCode::NOT_FOUND, path.0.into()))
+}
diff --git a/pubky/Cargo.toml b/pubky/Cargo.toml
index e10789a..c40cd9c 100644
--- a/pubky/Cargo.toml
+++ b/pubky/Cargo.toml
@@ -11,6 +11,7 @@ ureq = { version = "2.10.0", features = ["cookies"] }
 thiserror = "1.0.62"
 url = "2.5.2"
 flume = { version = "0.11.0", features = ["select", "eventual-fairness"], default-features = false }
+bytes = "1.6.1"
 
 [dev-dependencies]
 pubky_homeserver = { path = "../pubky-homeserver" }
diff --git a/pubky/src/client.rs b/pubky/src/client.rs
index 69f418c..95742df 100644
--- a/pubky/src/client.rs
+++ b/pubky/src/client.rs
@@ -1,5 +1,6 @@
 mod auth;
 mod pkarr;
+mod public;
 
 use std::{collections::HashMap, fmt::format, time::Duration};
 
diff --git a/pubky/src/client/auth.rs b/pubky/src/client/auth.rs
index 25b679c..8445640 100644
--- a/pubky/src/client/auth.rs
+++ b/pubky/src/client/auth.rs
@@ -16,8 +16,7 @@ impl PubkyClient {
         url.set_path(&format!("/{}", keypair.public_key()));
 
         self.request(HttpMethod::Put, &url)
-            .send_bytes(AuthnSignature::generate(keypair, &audience).as_bytes())
-            .map_err(Box::new)?;
+            .send_bytes(AuthnSignature::generate(keypair, &audience).as_bytes())?;
 
         self.publish_pubky_homeserver(keypair, homeserver);
 
@@ -25,6 +24,9 @@ impl PubkyClient {
     }
 
     /// Check the current sesison for a given Pubky in its homeserver.
+    ///
+    /// Returns an [Error::NotSignedIn] if so, or [ureq::Error] if
+    /// the response has any other `>=400` status code.
     pub fn session(&self, pubky: &PublicKey) -> Result<Session> {
         let (homeserver, mut url) = self.resolve_pubky_homeserver(pubky)?;
 
@@ -34,11 +36,15 @@ impl PubkyClient {
 
         let result = self.request(HttpMethod::Get, &url).call().map_err(Box::new);
 
-        if let Ok(reader) = result {
-            reader.into_reader().read_to_end(&mut bytes);
-        } else {
-            return Err(Error::NotSignedIn);
-        }
+        let reader = self.request(HttpMethod::Get, &url).call().map_err(|err| {
+            match err {
+                ureq::Error::Status(404, _) => Error::NotSignedIn,
+                // TODO: handle other types of errors
+                _ => err.into(),
+            }
+        })?;
+
+        reader.into_reader().read_to_end(&mut bytes);
 
         Ok(Session::deserialize(&bytes)?)
     }
diff --git a/pubky/src/client/public.rs b/pubky/src/client/public.rs
new file mode 100644
index 0000000..1600c92
--- /dev/null
+++ b/pubky/src/client/public.rs
@@ -0,0 +1,115 @@
+use bytes::Bytes;
+
+use pkarr::PublicKey;
+
+use crate::PubkyClient;
+
+use super::Result;
+
+impl PubkyClient {
+    pub fn put(&self, pubky: &PublicKey, path: &str, content: &[u8]) -> Result<()> {
+        let path = normalize_path(path);
+
+        let (_, mut url) = self.resolve_pubky_homeserver(pubky)?;
+
+        url.set_path(&format!("/{pubky}/{path}"));
+
+        self.request(super::HttpMethod::Put, &url)
+            .send_bytes(content)?;
+
+        Ok(())
+    }
+
+    pub fn get(&self, pubky: &PublicKey, path: &str) -> Result<Bytes> {
+        let path = normalize_path(path);
+
+        let (_, mut url) = self.resolve_pubky_homeserver(pubky)?;
+
+        url.set_path(&format!("/{pubky}/{path}"));
+
+        let result = self.request(super::HttpMethod::Get, &url).call();
+
+        if let Err(error) = result {
+            dbg!(&error);
+
+            return Err(error)?;
+        }
+
+        let response = result.unwrap();
+
+        let len = response
+            .header("Content-Length")
+            .and_then(|s| s.parse::<u64>().ok())
+            // TODO: return an error in case content-length header is missing
+            .unwrap_or(0);
+
+        // TODO: bail on too large files.
+
+        let mut bytes = Vec::with_capacity(len as usize);
+
+        response.into_reader().read_exact(&mut bytes);
+
+        Ok(bytes.into())
+    }
+}
+
+fn normalize_path(path: &str) -> String {
+    let mut path = path.to_string();
+
+    if path.starts_with('/') {
+        path = path[1..].to_string()
+    }
+
+    // TODO: should we return error instead?
+    if path.ends_with('/') {
+        path = path[..path.len()].to_string()
+    }
+
+    path
+}
+
+#[cfg(test)]
+mod tests {
+    use std::ops::Deref;
+
+    use crate::*;
+
+    use pkarr::{mainline::Testnet, Keypair};
+    use pubky_common::session::Session;
+    use pubky_homeserver::Homeserver;
+
+    #[tokio::test]
+    async fn put_get() {
+        let testnet = Testnet::new(3);
+        let server = Homeserver::start_test(&testnet).await.unwrap();
+
+        let client = PubkyClient::test(&testnet).as_async();
+
+        let keypair = Keypair::random();
+
+        client
+            .signup(&keypair, &server.public_key().to_string())
+            .await
+            .unwrap();
+
+        let response = client
+            .put(&keypair.public_key(), "/pub/foo.txt", &[0, 1, 2, 3, 4])
+            .await;
+
+        if let Err(Error::Ureq(ureqerror)) = response {
+            if let Some(r) = ureqerror.into_response() {
+                dbg!(r.into_string());
+            }
+        }
+
+        let response = client.get(&keypair.public_key(), "/pub/foo.txt").await;
+
+        if let Err(Error::Ureq(ureqerror)) = response {
+            if let Some(r) = ureqerror.into_response() {
+                dbg!(r.into_string());
+            }
+        }
+
+        // dbg!(response);
+    }
+}
diff --git a/pubky/src/client_async.rs b/pubky/src/client_async.rs
index de9012c..2fb7bd5 100644
--- a/pubky/src/client_async.rs
+++ b/pubky/src/client_async.rs
@@ -1,5 +1,7 @@
 use std::thread;
 
+use bytes::Bytes;
+
 use pkarr::{Keypair, PublicKey};
 use pubky_common::session::Session;
 
@@ -62,4 +64,31 @@ impl PubkyClientAsync {
 
         receiver.recv_async().await?
     }
+
+    /// Async version of [PubkyClient::put]
+    pub async fn put(&self, pubky: &PublicKey, path: &str, content: &[u8]) -> Result<()> {
+        let (sender, receiver) = flume::bounded::<Result<()>>(1);
+
+        let client = self.0.clone();
+        let pubky = pubky.clone();
+        let path = path.to_string();
+        let content = content.to_vec();
+
+        thread::spawn(move || sender.send(client.put(&pubky, &path, &content)));
+
+        receiver.recv_async().await?
+    }
+
+    /// Async version of [PubkyClient::get]
+    pub async fn get(&self, pubky: &PublicKey, path: &str) -> Result<Bytes> {
+        let (sender, receiver) = flume::bounded::<Result<Bytes>>(1);
+
+        let client = self.0.clone();
+        let pubky = pubky.clone();
+        let path = path.to_string();
+
+        thread::spawn(move || sender.send(client.get(&pubky, &path)));
+
+        receiver.recv_async().await?
+    }
 }
diff --git a/pubky/src/error.rs b/pubky/src/error.rs
index 026382e..acbad4b 100644
--- a/pubky/src/error.rs
+++ b/pubky/src/error.rs
@@ -34,3 +34,9 @@ pub enum Error {
     #[error(transparent)]
     Session(#[from] pubky_common::session::Error),
 }
+
+impl From<ureq::Error> for Error {
+    fn from(error: ureq::Error) -> Self {
+        Error::Ureq(Box::new(error))
+    }
+}

From cc97744f25c8a8e479f08b9231905f5cf6e4ddda Mon Sep 17 00:00:00 2001
From: nazeh <ar.nazeh@gmail.com>
Date: Tue, 23 Jul 2024 21:15:41 +0300
Subject: [PATCH 4/4] feat(pubky): get successful

---
 Cargo.lock                                    |   1 +
 pubky-homeserver/src/database.rs              | 113 +++++++++++++++++-
 pubky-homeserver/src/database/migrations.rs   |  10 +-
 .../src/database/migrations/m0.rs             |   2 +-
 pubky-homeserver/src/database/tables.rs       |  26 ++++
 .../src/database/tables/entries.rs            |   2 +-
 pubky-homeserver/src/extractors.rs            |   4 +
 pubky-homeserver/src/routes/public.rs         |  80 ++-----------
 pubky/src/client/public.rs                    |  29 ++---
 9 files changed, 167 insertions(+), 100 deletions(-)

diff --git a/Cargo.lock b/Cargo.lock
index 74ef877..613c9c4 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -1250,6 +1250,7 @@ dependencies = [
 name = "pubky"
 version = "0.1.0"
 dependencies = [
+ "bytes",
  "flume",
  "pkarr",
  "pubky-common",
diff --git a/pubky-homeserver/src/database.rs b/pubky-homeserver/src/database.rs
index 0eb3200..5bbe6c2 100644
--- a/pubky-homeserver/src/database.rs
+++ b/pubky-homeserver/src/database.rs
@@ -1,16 +1,23 @@
 use std::fs;
 use std::path::Path;
 
+use bytes::Bytes;
 use heed::{types::Str, Database, Env, EnvOpenOptions, RwTxn};
 
 mod migrations;
 pub mod tables;
 
-use migrations::TABLES_COUNT;
+use pubky_common::crypto::Hasher;
+
+use tables::{entries::Entry, Tables, TABLES_COUNT};
+
+use pkarr::PublicKey;
+use tables::blobs::{BlobsTable, BLOBS_TABLE};
 
 #[derive(Debug, Clone)]
 pub struct DB {
     pub(crate) env: Env,
+    pub(crate) tables: Tables,
 }
 
 impl DB {
@@ -19,10 +26,110 @@ impl DB {
 
         let env = unsafe { EnvOpenOptions::new().max_dbs(TABLES_COUNT).open(storage) }?;
 
-        migrations::run(&env);
+        let tables = migrations::run(&env)?;
 
-        let db = DB { env };
+        let db = DB { env, tables };
 
         Ok(db)
     }
+
+    pub fn put_entry(
+        &mut self,
+        public_key: &PublicKey,
+        path: &str,
+        rx: flume::Receiver<Bytes>,
+    ) -> anyhow::Result<()> {
+        let mut wtxn = self.env.write_txn()?;
+
+        let mut hasher = Hasher::new();
+        let mut bytes = vec![];
+        let mut length = 0;
+
+        while let Ok(chunk) = rx.recv() {
+            hasher.update(&chunk);
+            bytes.extend_from_slice(&chunk);
+            length += chunk.len();
+        }
+
+        let hash = hasher.finalize();
+
+        self.tables.blobs.put(&mut wtxn, hash.as_bytes(), &bytes)?;
+
+        let mut entry = Entry::new();
+
+        entry.set_content_hash(hash);
+        entry.set_content_length(length);
+
+        let mut key = vec![];
+        key.extend_from_slice(public_key.as_bytes());
+        key.extend_from_slice(path.as_bytes());
+
+        self.tables.entries.put(&mut wtxn, &key, &entry.serialize());
+
+        wtxn.commit()?;
+
+        Ok(())
+    }
+
+    pub fn get_blob(
+        &mut self,
+        public_key: &PublicKey,
+        path: &str,
+    ) -> anyhow::Result<Option<Bytes>> {
+        let mut rtxn = self.env.read_txn()?;
+
+        let mut key = vec![];
+        key.extend_from_slice(public_key.as_bytes());
+        key.extend_from_slice(path.as_bytes());
+
+        if let Some(bytes) = self.tables.entries.get(&rtxn, &key)? {
+            let entry = Entry::deserialize(bytes)?;
+
+            if let Some(blob) = self.tables.blobs.get(&rtxn, entry.content_hash())? {
+                return Ok(Some(Bytes::from(blob.to_vec())));
+            };
+        };
+
+        Ok(None)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use pkarr::Keypair;
+    use pubky_common::timestamp::Timestamp;
+
+    use crate::config::Config;
+
+    use super::{Bytes, DB};
+
+    #[tokio::test]
+    async fn entries() {
+        let storage = std::env::temp_dir()
+            .join(Timestamp::now().to_string())
+            .join("pubky");
+
+        let mut db = DB::open(&storage).unwrap();
+
+        let keypair = Keypair::random();
+        let path = "/pub/foo.txt";
+
+        let (tx, rx) = flume::bounded::<Bytes>(0);
+
+        let mut cloned = db.clone();
+        let cloned_keypair = keypair.clone();
+
+        let done = tokio::task::spawn_blocking(move || {
+            cloned.put_entry(&cloned_keypair.public_key(), path, rx);
+        });
+
+        tx.send(vec![1, 2, 3, 4, 5].into());
+        drop(tx);
+
+        done.await;
+
+        let blob = db.get_blob(&keypair.public_key(), path).unwrap().unwrap();
+
+        assert_eq!(blob, Bytes::from(vec![1, 2, 3, 4, 5]));
+    }
 }
diff --git a/pubky-homeserver/src/database/migrations.rs b/pubky-homeserver/src/database/migrations.rs
index dbead07..32f2909 100644
--- a/pubky-homeserver/src/database/migrations.rs
+++ b/pubky-homeserver/src/database/migrations.rs
@@ -2,16 +2,16 @@ use heed::{types::Str, Database, Env, RwTxn};
 
 mod m0;
 
-use super::tables;
+use super::tables::Tables;
 
-pub const TABLES_COUNT: u32 = 4;
-
-pub fn run(env: &Env) -> anyhow::Result<()> {
+pub fn run(env: &Env) -> anyhow::Result<Tables> {
     let mut wtxn = env.write_txn()?;
 
     m0::run(env, &mut wtxn);
 
+    let tables = Tables::new(env, &mut wtxn)?;
+
     wtxn.commit()?;
 
-    Ok(())
+    Ok(tables)
 }
diff --git a/pubky-homeserver/src/database/migrations/m0.rs b/pubky-homeserver/src/database/migrations/m0.rs
index 74d89c4..d7dac79 100644
--- a/pubky-homeserver/src/database/migrations/m0.rs
+++ b/pubky-homeserver/src/database/migrations/m0.rs
@@ -1,6 +1,6 @@
 use heed::{types::Str, Database, Env, RwTxn};
 
-use super::tables::{blobs, entries, sessions, users};
+use crate::database::tables::{blobs, entries, sessions, users};
 
 pub fn run(env: &Env, wtxn: &mut RwTxn) -> anyhow::Result<()> {
     let _: users::UsersTable = env.create_database(wtxn, Some(users::USERS_TABLE))?;
diff --git a/pubky-homeserver/src/database/tables.rs b/pubky-homeserver/src/database/tables.rs
index 4f0c1c5..a019fbe 100644
--- a/pubky-homeserver/src/database/tables.rs
+++ b/pubky-homeserver/src/database/tables.rs
@@ -2,3 +2,29 @@ pub mod blobs;
 pub mod entries;
 pub mod sessions;
 pub mod users;
+
+use heed::{Env, RwTxn};
+
+use blobs::{BlobsTable, BLOBS_TABLE};
+use entries::{EntriesTable, ENTRIES_TABLE};
+
+pub const TABLES_COUNT: u32 = 4;
+
+#[derive(Debug, Clone)]
+pub struct Tables {
+    pub blobs: BlobsTable,
+    pub entries: EntriesTable,
+}
+
+impl Tables {
+    pub fn new(env: &Env, wtxn: &mut RwTxn) -> anyhow::Result<Self> {
+        Ok(Self {
+            blobs: env
+                .open_database(wtxn, Some(BLOBS_TABLE))?
+                .expect("Blobs table already created"),
+            entries: env
+                .open_database(wtxn, Some(ENTRIES_TABLE))?
+                .expect("Entries table already created"),
+        })
+    }
+}
diff --git a/pubky-homeserver/src/database/tables/entries.rs b/pubky-homeserver/src/database/tables/entries.rs
index 1d2028e..7c9e2e3 100644
--- a/pubky-homeserver/src/database/tables/entries.rs
+++ b/pubky-homeserver/src/database/tables/entries.rs
@@ -78,6 +78,6 @@ impl Entry {
             panic!("Unknown Entry version");
         }
 
-        Ok(from_bytes(bytes)?)
+        from_bytes(bytes)
     }
 }
diff --git a/pubky-homeserver/src/extractors.rs b/pubky-homeserver/src/extractors.rs
index e7192db..b89911c 100644
--- a/pubky-homeserver/src/extractors.rs
+++ b/pubky-homeserver/src/extractors.rs
@@ -49,6 +49,10 @@ where
 pub struct EntryPath(pub(crate) String);
 
 impl EntryPath {
+    pub fn as_str(&self) -> &str {
+        self.0.as_str()
+    }
+
     pub fn as_bytes(&self) -> &[u8] {
         self.0.as_bytes()
     }
diff --git a/pubky-homeserver/src/routes/public.rs b/pubky-homeserver/src/routes/public.rs
index 38c2741..cb07cee 100644
--- a/pubky-homeserver/src/routes/public.rs
+++ b/pubky-homeserver/src/routes/public.rs
@@ -23,7 +23,7 @@ use crate::{
 };
 
 pub async fn put(
-    State(state): State<AppState>,
+    State(mut state): State<AppState>,
     pubky: Pubky,
     path: EntryPath,
     mut body: Body,
@@ -45,43 +45,7 @@ pub async fn put(
 
         // TODO: Authorize
 
-        let mut wtxn = state.db.env.write_txn()?;
-        let blobs: BlobsTable = state
-            .db
-            .env
-            .open_database(&wtxn, Some(BLOBS_TABLE))?
-            .expect("Blobs table already created");
-
-        let entries: EntriesTable = state
-            .db
-            .env
-            .open_database(&wtxn, Some(ENTRIES_TABLE))?
-            .expect("Entries table already created");
-
-        let mut hasher = Hasher::new();
-        let mut bytes = vec![];
-        let mut length = 0;
-
-        while let Ok(chunk) = rx.recv() {
-            hasher.update(&chunk);
-            bytes.extend_from_slice(&chunk);
-            length += chunk.len();
-        }
-
-        let hash = hasher.finalize();
-
-        blobs.put(&mut wtxn, hash.as_bytes(), &bytes)?;
-
-        let mut entry = Entry::new();
-
-        entry.set_content_hash(hash);
-        entry.set_content_length(length);
-
-        let mut key = vec![];
-        key.extend_from_slice(public_key.as_bytes());
-        key.extend_from_slice(path.as_bytes());
-
-        entries.put(&mut wtxn, &key, &entry.serialize());
+        state.db.put_entry(public_key, path.as_str(), rx);
 
         Ok(())
     });
@@ -101,7 +65,7 @@ pub async fn put(
 }
 
 pub async fn get(
-    State(state): State<AppState>,
+    State(mut state): State<AppState>,
     pubky: Pubky,
     path: EntryPath,
 ) -> Result<impl IntoResponse> {
@@ -111,39 +75,9 @@ pub async fn get(
 
     let public_key = pubky.public_key();
 
-    let mut rtxn = state.db.env.read_txn()?;
-
-    let entries: EntriesTable = state
-        .db
-        .env
-        .open_database(&rtxn, Some(ENTRIES_TABLE))?
-        .expect("Entries table already created");
-
-    let blobs: BlobsTable = state
-        .db
-        .env
-        .open_database(&rtxn, Some(BLOBS_TABLE))?
-        .expect("Blobs table already created");
-
-    let mut count = 0;
-
-    for x in entries.iter(&rtxn)? {
-        count += 1
+    match state.db.get_blob(public_key, path.as_str()) {
+        Err(error) => Err(error)?,
+        Ok(Some(bytes)) => Ok(bytes),
+        Ok(None) => Err(Error::with_status(StatusCode::NOT_FOUND)),
     }
-
-    return Err(Error::new(StatusCode::NOT_FOUND, count.to_string().into()));
-
-    let mut key = vec![];
-    key.extend_from_slice(public_key.as_bytes());
-    key.extend_from_slice(path.as_bytes());
-
-    if let Some(bytes) = entries.get(&rtxn, &key)? {
-        let entry = Entry::deserialize(bytes)?;
-
-        if let Some(blob) = blobs.get(&rtxn, entry.content_hash())? {
-            return Ok(blob.to_vec());
-        };
-    };
-
-    Err(Error::new(StatusCode::NOT_FOUND, path.0.into()))
 }
diff --git a/pubky/src/client/public.rs b/pubky/src/client/public.rs
index 1600c92..7599e8f 100644
--- a/pubky/src/client/public.rs
+++ b/pubky/src/client/public.rs
@@ -27,15 +27,7 @@ impl PubkyClient {
 
         url.set_path(&format!("/{pubky}/{path}"));
 
-        let result = self.request(super::HttpMethod::Get, &url).call();
-
-        if let Err(error) = result {
-            dbg!(&error);
-
-            return Err(error)?;
-        }
-
-        let response = result.unwrap();
+        let response = self.request(super::HttpMethod::Get, &url).call()?;
 
         let len = response
             .header("Content-Length")
@@ -45,7 +37,7 @@ impl PubkyClient {
 
         // TODO: bail on too large files.
 
-        let mut bytes = Vec::with_capacity(len as usize);
+        let mut bytes = vec![0; len as usize];
 
         response.into_reader().read_exact(&mut bytes);
 
@@ -102,14 +94,17 @@ mod tests {
             }
         }
 
-        let response = client.get(&keypair.public_key(), "/pub/foo.txt").await;
+        let response = client
+            .get(&keypair.public_key(), "/pub/foo.txt")
+            .await
+            .unwrap();
 
-        if let Err(Error::Ureq(ureqerror)) = response {
-            if let Some(r) = ureqerror.into_response() {
-                dbg!(r.into_string());
-            }
-        }
+        // if let Err(Error::Ureq(ureqerror)) = response {
+        //     if let Some(r) = ureqerror.into_response() {
+        //         dbg!(r.into_string());
+        //     }
+        // }
 
-        // dbg!(response);
+        assert_eq!(response, bytes::Bytes::from(vec![0, 1, 2, 3, 4]))
     }
 }