Skip to content

Commit

Permalink
feat(pubky): get successful
Browse files Browse the repository at this point in the history
  • Loading branch information
Nuhvi committed Jul 23, 2024
1 parent 8cf18a3 commit cc97744
Show file tree
Hide file tree
Showing 9 changed files with 167 additions and 100 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

113 changes: 110 additions & 3 deletions pubky-homeserver/src/database.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,23 @@
use std::fs;
use std::path::Path;

use bytes::Bytes;
use heed::{types::Str, Database, Env, EnvOpenOptions, RwTxn};

mod migrations;
pub mod tables;

use migrations::TABLES_COUNT;
use pubky_common::crypto::Hasher;

use tables::{entries::Entry, Tables, TABLES_COUNT};

use pkarr::PublicKey;
use tables::blobs::{BlobsTable, BLOBS_TABLE};

#[derive(Debug, Clone)]
pub struct DB {
pub(crate) env: Env,
pub(crate) tables: Tables,
}

impl DB {
Expand All @@ -19,10 +26,110 @@ impl DB {

let env = unsafe { EnvOpenOptions::new().max_dbs(TABLES_COUNT).open(storage) }?;

migrations::run(&env);
let tables = migrations::run(&env)?;

let db = DB { env };
let db = DB { env, tables };

Ok(db)
}

pub fn put_entry(
&mut self,
public_key: &PublicKey,
path: &str,
rx: flume::Receiver<Bytes>,
) -> anyhow::Result<()> {
let mut wtxn = self.env.write_txn()?;

let mut hasher = Hasher::new();
let mut bytes = vec![];
let mut length = 0;

while let Ok(chunk) = rx.recv() {
hasher.update(&chunk);
bytes.extend_from_slice(&chunk);
length += chunk.len();
}

let hash = hasher.finalize();

self.tables.blobs.put(&mut wtxn, hash.as_bytes(), &bytes)?;

let mut entry = Entry::new();

entry.set_content_hash(hash);
entry.set_content_length(length);

let mut key = vec![];
key.extend_from_slice(public_key.as_bytes());
key.extend_from_slice(path.as_bytes());

self.tables.entries.put(&mut wtxn, &key, &entry.serialize());

wtxn.commit()?;

Ok(())
}

pub fn get_blob(
&mut self,
public_key: &PublicKey,
path: &str,
) -> anyhow::Result<Option<Bytes>> {
let mut rtxn = self.env.read_txn()?;

let mut key = vec![];
key.extend_from_slice(public_key.as_bytes());
key.extend_from_slice(path.as_bytes());

if let Some(bytes) = self.tables.entries.get(&rtxn, &key)? {
let entry = Entry::deserialize(bytes)?;

if let Some(blob) = self.tables.blobs.get(&rtxn, entry.content_hash())? {
return Ok(Some(Bytes::from(blob.to_vec())));
};
};

Ok(None)
}
}

#[cfg(test)]
mod tests {
use pkarr::Keypair;
use pubky_common::timestamp::Timestamp;

use crate::config::Config;

use super::{Bytes, DB};

#[tokio::test]
async fn entries() {
let storage = std::env::temp_dir()
.join(Timestamp::now().to_string())
.join("pubky");

let mut db = DB::open(&storage).unwrap();

let keypair = Keypair::random();
let path = "/pub/foo.txt";

let (tx, rx) = flume::bounded::<Bytes>(0);

let mut cloned = db.clone();
let cloned_keypair = keypair.clone();

let done = tokio::task::spawn_blocking(move || {
cloned.put_entry(&cloned_keypair.public_key(), path, rx);
});

tx.send(vec![1, 2, 3, 4, 5].into());
drop(tx);

done.await;

let blob = db.get_blob(&keypair.public_key(), path).unwrap().unwrap();

assert_eq!(blob, Bytes::from(vec![1, 2, 3, 4, 5]));
}
}
10 changes: 5 additions & 5 deletions pubky-homeserver/src/database/migrations.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,16 @@ use heed::{types::Str, Database, Env, RwTxn};

mod m0;

use super::tables;
use super::tables::Tables;

pub const TABLES_COUNT: u32 = 4;

pub fn run(env: &Env) -> anyhow::Result<()> {
pub fn run(env: &Env) -> anyhow::Result<Tables> {
let mut wtxn = env.write_txn()?;

m0::run(env, &mut wtxn);

let tables = Tables::new(env, &mut wtxn)?;

wtxn.commit()?;

Ok(())
Ok(tables)
}
2 changes: 1 addition & 1 deletion pubky-homeserver/src/database/migrations/m0.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
use heed::{types::Str, Database, Env, RwTxn};

use super::tables::{blobs, entries, sessions, users};
use crate::database::tables::{blobs, entries, sessions, users};

pub fn run(env: &Env, wtxn: &mut RwTxn) -> anyhow::Result<()> {
let _: users::UsersTable = env.create_database(wtxn, Some(users::USERS_TABLE))?;
Expand Down
26 changes: 26 additions & 0 deletions pubky-homeserver/src/database/tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,3 +2,29 @@ pub mod blobs;
pub mod entries;
pub mod sessions;
pub mod users;

use heed::{Env, RwTxn};

use blobs::{BlobsTable, BLOBS_TABLE};
use entries::{EntriesTable, ENTRIES_TABLE};

pub const TABLES_COUNT: u32 = 4;

#[derive(Debug, Clone)]
pub struct Tables {
pub blobs: BlobsTable,
pub entries: EntriesTable,
}

impl Tables {
pub fn new(env: &Env, wtxn: &mut RwTxn) -> anyhow::Result<Self> {
Ok(Self {
blobs: env
.open_database(wtxn, Some(BLOBS_TABLE))?
.expect("Blobs table already created"),
entries: env
.open_database(wtxn, Some(ENTRIES_TABLE))?
.expect("Entries table already created"),
})
}
}
2 changes: 1 addition & 1 deletion pubky-homeserver/src/database/tables/entries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,6 @@ impl Entry {
panic!("Unknown Entry version");
}

Ok(from_bytes(bytes)?)
from_bytes(bytes)
}
}
4 changes: 4 additions & 0 deletions pubky-homeserver/src/extractors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,10 @@ where
pub struct EntryPath(pub(crate) String);

impl EntryPath {
pub fn as_str(&self) -> &str {
self.0.as_str()
}

pub fn as_bytes(&self) -> &[u8] {
self.0.as_bytes()
}
Expand Down
80 changes: 7 additions & 73 deletions pubky-homeserver/src/routes/public.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use crate::{
};

pub async fn put(
State(state): State<AppState>,
State(mut state): State<AppState>,
pubky: Pubky,
path: EntryPath,
mut body: Body,
Expand All @@ -45,43 +45,7 @@ pub async fn put(

// TODO: Authorize

let mut wtxn = state.db.env.write_txn()?;
let blobs: BlobsTable = state
.db
.env
.open_database(&wtxn, Some(BLOBS_TABLE))?
.expect("Blobs table already created");

let entries: EntriesTable = state
.db
.env
.open_database(&wtxn, Some(ENTRIES_TABLE))?
.expect("Entries table already created");

let mut hasher = Hasher::new();
let mut bytes = vec![];
let mut length = 0;

while let Ok(chunk) = rx.recv() {
hasher.update(&chunk);
bytes.extend_from_slice(&chunk);
length += chunk.len();
}

let hash = hasher.finalize();

blobs.put(&mut wtxn, hash.as_bytes(), &bytes)?;

let mut entry = Entry::new();

entry.set_content_hash(hash);
entry.set_content_length(length);

let mut key = vec![];
key.extend_from_slice(public_key.as_bytes());
key.extend_from_slice(path.as_bytes());

entries.put(&mut wtxn, &key, &entry.serialize());
state.db.put_entry(public_key, path.as_str(), rx);

Ok(())
});
Expand All @@ -101,7 +65,7 @@ pub async fn put(
}

pub async fn get(
State(state): State<AppState>,
State(mut state): State<AppState>,
pubky: Pubky,
path: EntryPath,
) -> Result<impl IntoResponse> {
Expand All @@ -111,39 +75,9 @@ pub async fn get(

let public_key = pubky.public_key();

let mut rtxn = state.db.env.read_txn()?;

let entries: EntriesTable = state
.db
.env
.open_database(&rtxn, Some(ENTRIES_TABLE))?
.expect("Entries table already created");

let blobs: BlobsTable = state
.db
.env
.open_database(&rtxn, Some(BLOBS_TABLE))?
.expect("Blobs table already created");

let mut count = 0;

for x in entries.iter(&rtxn)? {
count += 1
match state.db.get_blob(public_key, path.as_str()) {
Err(error) => Err(error)?,
Ok(Some(bytes)) => Ok(bytes),
Ok(None) => Err(Error::with_status(StatusCode::NOT_FOUND)),
}

return Err(Error::new(StatusCode::NOT_FOUND, count.to_string().into()));

let mut key = vec![];
key.extend_from_slice(public_key.as_bytes());
key.extend_from_slice(path.as_bytes());

if let Some(bytes) = entries.get(&rtxn, &key)? {
let entry = Entry::deserialize(bytes)?;

if let Some(blob) = blobs.get(&rtxn, entry.content_hash())? {
return Ok(blob.to_vec());
};
};

Err(Error::new(StatusCode::NOT_FOUND, path.0.into()))
}
29 changes: 12 additions & 17 deletions pubky/src/client/public.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,15 +27,7 @@ impl PubkyClient {

url.set_path(&format!("/{pubky}/{path}"));

let result = self.request(super::HttpMethod::Get, &url).call();

if let Err(error) = result {
dbg!(&error);

return Err(error)?;
}

let response = result.unwrap();
let response = self.request(super::HttpMethod::Get, &url).call()?;

let len = response
.header("Content-Length")
Expand All @@ -45,7 +37,7 @@ impl PubkyClient {

// TODO: bail on too large files.

let mut bytes = Vec::with_capacity(len as usize);
let mut bytes = vec![0; len as usize];

response.into_reader().read_exact(&mut bytes);

Expand Down Expand Up @@ -102,14 +94,17 @@ mod tests {
}
}

let response = client.get(&keypair.public_key(), "/pub/foo.txt").await;
let response = client
.get(&keypair.public_key(), "/pub/foo.txt")
.await
.unwrap();

if let Err(Error::Ureq(ureqerror)) = response {
if let Some(r) = ureqerror.into_response() {
dbg!(r.into_string());
}
}
// if let Err(Error::Ureq(ureqerror)) = response {
// if let Some(r) = ureqerror.into_response() {
// dbg!(r.into_string());
// }
// }

// dbg!(response);
assert_eq!(response, bytes::Bytes::from(vec![0, 1, 2, 3, 4]))
}
}

0 comments on commit cc97744

Please sign in to comment.