Skip to content

Commit

Permalink
Merge pull request #21 from pubky/feat/basic-data-store
Browse files Browse the repository at this point in the history
Feat/basic data store
  • Loading branch information
Nuhvi committed Jul 23, 2024
2 parents 0109ab4 + cc97744 commit 5cdf299
Show file tree
Hide file tree
Showing 25 changed files with 891 additions and 344 deletions.
18 changes: 18 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions pubky-common/src/crypto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ pub type Hash = blake3::Hash;

pub use blake3::hash;

pub use blake3::Hasher;

pub fn random_hash() -> Hash {
let mut rng = rand::thread_rng();
Hash::from_bytes(rng.gen())
Expand Down
4 changes: 3 additions & 1 deletion pubky-homeserver/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@ edition = "2021"
[dependencies]
anyhow = "1.0.82"
axum = "0.7.5"
axum-extra = { version = "0.9.3", features = ["typed-header"] }
axum-extra = { version = "0.9.3", features = ["typed-header", "async-read-body"] }
base32 = "0.5.1"
bytes = "1.6.1"
dirs-next = "2.0.0"
flume = "0.11.0"
futures-util = "0.3.30"
heed = "0.20.3"
pkarr = { version = "2.1.0", features = ["async"] }
postcard = { version = "1.0.8", features = ["alloc"] }
Expand Down
15 changes: 9 additions & 6 deletions pubky-homeserver/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,13 +40,16 @@ impl Config {

/// Builds a configuration for tests.
///
/// The bootstrap nodes are copied from `testnet`, and storage is placed in a
/// timestamp-named directory under the system temp dir so that separate runs
/// do not share a database. Every other field keeps its default value.
pub fn test(testnet: &pkarr::mainline::Testnet) -> Self {
    let bootstrap = testnet.bootstrap.to_owned();

    // <temp dir>/<timestamp>/<DEFAULT_STORAGE_DIR>
    let mut storage = std::env::temp_dir();
    storage.push(Timestamp::now().to_string());
    storage.push(DEFAULT_STORAGE_DIR);

    Self {
        bootstrap: Some(bootstrap),
        storage: Some(storage),
        ..Default::default()
    }
}
Expand Down
108 changes: 102 additions & 6 deletions pubky-homeserver/src/database.rs
Original file line number Diff line number Diff line change
@@ -1,16 +1,23 @@
use std::fs;
use std::path::Path;

use bytes::Bytes;
use heed::{types::Str, Database, Env, EnvOpenOptions, RwTxn};

mod migrations;
pub mod tables;

use migrations::TABLES_COUNT;
use pubky_common::crypto::Hasher;

use tables::{entries::Entry, Tables, TABLES_COUNT};

use pkarr::PublicKey;
use tables::blobs::{BlobsTable, BLOBS_TABLE};

/// Database handle: a `heed` environment together with the tables opened from it.
///
/// Derives `Clone`, so a handle can be cloned and moved elsewhere (the test in
/// this file moves a clone into a blocking task).
#[derive(Debug, Clone)]
pub struct DB {
/// The `heed` environment that read and write transactions are started from.
pub(crate) env: Env,
/// Table handles returned by `migrations::run` when the database is opened.
pub(crate) tables: Tables,
}

impl DB {
Expand All @@ -19,21 +26,110 @@ impl DB {

let env = unsafe { EnvOpenOptions::new().max_dbs(TABLES_COUNT).open(storage) }?;

let db = DB { env };
let tables = migrations::run(&env)?;

db.run_migrations();
let db = DB { env, tables };

Ok(db)
}

fn run_migrations(&self) -> anyhow::Result<()> {
pub fn put_entry(
&mut self,
public_key: &PublicKey,
path: &str,
rx: flume::Receiver<Bytes>,
) -> anyhow::Result<()> {
let mut wtxn = self.env.write_txn()?;

migrations::create_users_table(&self.env, &mut wtxn);
migrations::create_sessions_table(&self.env, &mut wtxn);
let mut hasher = Hasher::new();
let mut bytes = vec![];
let mut length = 0;

while let Ok(chunk) = rx.recv() {
hasher.update(&chunk);
bytes.extend_from_slice(&chunk);
length += chunk.len();
}

let hash = hasher.finalize();

self.tables.blobs.put(&mut wtxn, hash.as_bytes(), &bytes)?;

let mut entry = Entry::new();

entry.set_content_hash(hash);
entry.set_content_length(length);

let mut key = vec![];
key.extend_from_slice(public_key.as_bytes());
key.extend_from_slice(path.as_bytes());

self.tables.entries.put(&mut wtxn, &key, &entry.serialize());

wtxn.commit()?;

Ok(())
}

/// Fetches the content stored at `path` for `public_key`.
///
/// The entry is looked up under `public_key` bytes followed by `path` bytes,
/// and its content hash is then used to read the blobs table.
///
/// Returns `Ok(None)` when no entry exists for that key, or when the entry's
/// content hash has no row in the blobs table.
pub fn get_blob(
    &mut self,
    public_key: &PublicKey,
    path: &str,
) -> anyhow::Result<Option<Bytes>> {
    let rtxn = self.env.read_txn()?;

    let mut entry_key = Vec::new();
    entry_key.extend_from_slice(public_key.as_bytes());
    entry_key.extend_from_slice(path.as_bytes());

    let raw_entry = match self.tables.entries.get(&rtxn, &entry_key)? {
        Some(raw) => raw,
        None => return Ok(None),
    };
    let entry = Entry::deserialize(raw_entry)?;

    let blob = self.tables.blobs.get(&rtxn, entry.content_hash())?;

    // Copy the bytes out so the result does not borrow from the transaction.
    Ok(blob.map(|data| Bytes::from(data.to_vec())))
}
}

#[cfg(test)]
mod tests {
    use pkarr::Keypair;
    use pubky_common::timestamp::Timestamp;

    use crate::config::Config;

    use super::{Bytes, DB};

    /// Streams a small payload into `put_entry` on a blocking thread and checks
    /// that `get_blob` returns the same bytes for the same key and path.
    #[tokio::test]
    async fn entries() {
        let storage = std::env::temp_dir()
            .join(Timestamp::now().to_string())
            .join("pubky");

        let mut db = DB::open(&storage).unwrap();

        let keypair = Keypair::random();
        let path = "/pub/foo.txt";

        // Zero-capacity channel: each `send` completes only once the writer
        // thread has taken the chunk.
        let (tx, rx) = flume::bounded::<Bytes>(0);

        let mut cloned = db.clone();
        let cloned_keypair = keypair.clone();

        // Return the result from the closure so a failed write fails the test
        // instead of being silently dropped.
        let done = tokio::task::spawn_blocking(move || {
            cloned.put_entry(&cloned_keypair.public_key(), path, rx)
        });

        tx.send(vec![1, 2, 3, 4, 5].into())
            .expect("receiver should be alive while the writer is running");
        // Dropping the sender ends the receive loop inside `put_entry`.
        drop(tx);

        done.await
            .expect("writer task should not panic")
            .expect("put_entry should succeed");

        let blob = db.get_blob(&keypair.public_key(), path).unwrap().unwrap();

        assert_eq!(blob, Bytes::from(vec![1, 2, 3, 4, 5]));
    }
}
20 changes: 9 additions & 11 deletions pubky-homeserver/src/database/migrations.rs
Original file line number Diff line number Diff line change
@@ -1,19 +1,17 @@
use heed::{types::Str, Database, Env, RwTxn};

use super::tables;
mod m0;

pub const TABLES_COUNT: u32 = 2;
use super::tables::Tables;

pub fn create_users_table(env: &Env, wtxn: &mut RwTxn) -> anyhow::Result<()> {
let _: tables::users::UsersTable =
env.create_database(wtxn, Some(tables::users::USERS_TABLE))?;
pub fn run(env: &Env) -> anyhow::Result<Tables> {
let mut wtxn = env.write_txn()?;

Ok(())
}
m0::run(env, &mut wtxn);

let tables = Tables::new(env, &mut wtxn)?;

pub fn create_sessions_table(env: &Env, wtxn: &mut RwTxn) -> anyhow::Result<()> {
let _: tables::sessions::SessionsTable =
env.create_database(wtxn, Some(tables::sessions::SESSIONS_TABLE))?;
wtxn.commit()?;

Ok(())
Ok(tables)
}
15 changes: 15 additions & 0 deletions pubky-homeserver/src/database/migrations/m0.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
use heed::{types::Str, Database, Env, RwTxn};

use crate::database::tables::{blobs, entries, sessions, users};

/// Initial migration: creates the users, sessions, blobs and entries tables.
///
/// Each table is created inside the caller's write transaction; this function
/// does not commit it. The first failure is returned immediately.
pub fn run(env: &Env, wtxn: &mut RwTxn) -> anyhow::Result<()> {
    // The handles are not kept; the type annotations only select the key and
    // value codecs each table is created with.
    let _users: users::UsersTable =
        env.create_database(wtxn, Some(users::USERS_TABLE))?;
    let _sessions: sessions::SessionsTable =
        env.create_database(wtxn, Some(sessions::SESSIONS_TABLE))?;
    let _blobs: blobs::BlobsTable =
        env.create_database(wtxn, Some(blobs::BLOBS_TABLE))?;
    let _entries: entries::EntriesTable =
        env.create_database(wtxn, Some(entries::ENTRIES_TABLE))?;

    Ok(())
}
28 changes: 28 additions & 0 deletions pubky-homeserver/src/database/tables.rs
Original file line number Diff line number Diff line change
@@ -1,2 +1,30 @@
pub mod blobs;
pub mod entries;
pub mod sessions;
pub mod users;

use heed::{Env, RwTxn};

use blobs::{BlobsTable, BLOBS_TABLE};
use entries::{EntriesTable, ENTRIES_TABLE};

pub const TABLES_COUNT: u32 = 4;

/// Handles to the tables that the database layer reads and writes directly.
#[derive(Debug, Clone)]
pub struct Tables {
    /// Blob content, keyed by the hash of the blob.
    pub blobs: BlobsTable,
    /// Entries table handle.
    pub entries: EntriesTable,
}

impl Tables {
    /// Opens the blobs and entries tables within `wtxn`.
    ///
    /// # Errors
    ///
    /// Returns an error if opening either table fails.
    ///
    /// # Panics
    ///
    /// Panics if either table does not exist yet; the tables are expected to
    /// have been created before this is called.
    pub fn new(env: &Env, wtxn: &mut RwTxn) -> anyhow::Result<Self> {
        let blobs: BlobsTable = env
            .open_database(wtxn, Some(BLOBS_TABLE))?
            .expect("Blobs table already created");

        let entries: EntriesTable = env
            .open_database(wtxn, Some(ENTRIES_TABLE))?
            .expect("Entries table already created");

        Ok(Self { blobs, entries })
    }
}
11 changes: 11 additions & 0 deletions pubky-homeserver/src/database/tables/blobs.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
use std::{borrow::Cow, time::SystemTime};

use heed::{
types::{Bytes, Str},
BoxedError, BytesDecode, BytesEncode, Database,
};

/// hash of the blob => bytes.
pub type BlobsTable = Database<Bytes, Bytes>;

pub const BLOBS_TABLE: &str = "blobs";
Loading

0 comments on commit 5cdf299

Please sign in to comment.