Skip to content

Commit

Permalink
Merge pull request #44 from pubky/feat/non-blocking-writes
Browse files Browse the repository at this point in the history
Feat/non blocking writes
  • Loading branch information
Nuhvi authored Oct 16, 2024
2 parents 279db97 + ffd3cfb commit eb5beea
Show file tree
Hide file tree
Showing 9 changed files with 467 additions and 214 deletions.
37 changes: 19 additions & 18 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions pubky-homeserver/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ flume = "0.11.0"
futures-util = "0.3.30"
heed = "0.20.3"
hex = "0.4.3"
libc = "0.2.159"
pkarr = { workspace = true }
postcard = { version = "1.0.8", features = ["alloc"] }
pubky-common = { version = "0.1.0", path = "../pubky-common" }
Expand Down
61 changes: 23 additions & 38 deletions pubky-homeserver/src/database.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::fs;
use std::{fs, path::PathBuf};

use heed::{Env, EnvOpenOptions};

Expand All @@ -14,11 +14,17 @@ pub struct DB {
pub(crate) env: Env,
pub(crate) tables: Tables,
pub(crate) config: Config,
pub(crate) buffers_dir: PathBuf,
pub(crate) max_chunk_size: usize,
}

impl DB {
pub fn open(config: Config) -> anyhow::Result<Self> {
fs::create_dir_all(config.storage())?;
let buffers_dir = config.storage().clone().join("buffers");

// Cleanup buffers.
let _ = fs::remove_dir(&buffers_dir);
fs::create_dir_all(&buffers_dir)?;

let env = unsafe {
EnvOpenOptions::new()
Expand All @@ -33,46 +39,25 @@ impl DB {
env,
tables,
config,
buffers_dir,
max_chunk_size: max_chunk_size(),
};

Ok(db)
}
}

#[cfg(test)]
mod tests {
use bytes::Bytes;
use pkarr::{mainline::Testnet, Keypair};

use crate::config::Config;

use super::DB;

#[tokio::test]
async fn entries() {
let db = DB::open(Config::test(&Testnet::new(0))).unwrap();

let keypair = Keypair::random();
let path = "/pub/foo.txt";

let (tx, rx) = flume::bounded::<Bytes>(0);

let mut cloned = db.clone();
let cloned_keypair = keypair.clone();

let done = tokio::task::spawn_blocking(move || {
cloned
.put_entry(&cloned_keypair.public_key(), path, rx)
.unwrap();
});

tx.send(vec![1, 2, 3, 4, 5].into()).unwrap();
drop(tx);

done.await.unwrap();

let blob = db.get_blob(&keypair.public_key(), path).unwrap().unwrap();

assert_eq!(blob, Bytes::from(vec![1, 2, 3, 4, 5]));
}
/// calculate optimal chunk size:
/// - https://lmdb.readthedocs.io/en/release/#storage-efficiency-limits
/// - https://github.com/lmdbjava/benchmarks/blob/master/results/20160710/README.md#test-2-determine-24816-kb-byte-values
fn max_chunk_size() -> usize {
let page_size = unsafe { libc::sysconf(libc::_SC_PAGESIZE) as usize };

// - 16 bytes Header per page (LMDB)
// - Each page has to contain 2 records
// - 8 bytes per record (LMDB) (imperically, it seems to be 10 not 8)
// - 12 bytes key:
// - timestamp : 8 bytes
// - chunk index: 4 bytes
((page_size - 16) / 2) - (8 + 2) - 12
}
36 changes: 11 additions & 25 deletions pubky-homeserver/src/database/tables/blobs.rs
Original file line number Diff line number Diff line change
@@ -1,38 +1,24 @@
use heed::{types::Bytes, Database};
use pkarr::PublicKey;
use heed::{types::Bytes, Database, RoTxn};

use crate::database::DB;

use super::entries::Entry;

/// hash of the blob => bytes.
/// (entry timestamp | chunk_index BE) => bytes
pub type BlobsTable = Database<Bytes, Bytes>;

pub const BLOBS_TABLE: &str = "blobs";

impl DB {
pub fn get_blob(
pub fn read_entry_content<'txn>(
&self,
public_key: &PublicKey,
path: &str,
) -> anyhow::Result<Option<bytes::Bytes>> {
let rtxn = self.env.read_txn()?;

let key = format!("{public_key}/{path}");

let result = if let Some(bytes) = self.tables.entries.get(&rtxn, &key)? {
let entry = Entry::deserialize(bytes)?;

self.tables
.blobs
.get(&rtxn, entry.content_hash())?
.map(|blob| bytes::Bytes::from(blob[8..].to_vec()))
} else {
None
};

rtxn.commit()?;

Ok(result)
rtxn: &'txn RoTxn,
entry: &Entry,
) -> anyhow::Result<impl Iterator<Item = Result<&'txn [u8], heed::Error>> + 'txn> {
Ok(self
.tables
.blobs
.prefix_iter(rtxn, &entry.timestamp().to_bytes())?
.map(|i| i.map(|(_, bytes)| bytes)))
}
}
Loading

0 comments on commit eb5beea

Please sign in to comment.