Feat/non blocking writes #44

Merged 6 commits on Oct 16, 2024
37 changes: 19 additions & 18 deletions Cargo.lock


1 change: 1 addition & 0 deletions pubky-homeserver/Cargo.toml
@@ -15,6 +15,7 @@ flume = "0.11.0"
 futures-util = "0.3.30"
 heed = "0.20.3"
 hex = "0.4.3"
+libc = "0.2.159"
 pkarr = { workspace = true }
 postcard = { version = "1.0.8", features = ["alloc"] }
 pubky-common = { version = "0.1.0", path = "../pubky-common" }
61 changes: 23 additions & 38 deletions pubky-homeserver/src/database.rs
@@ -1,4 +1,4 @@
-use std::fs;
+use std::{fs, path::PathBuf};

 use heed::{Env, EnvOpenOptions};

@@ -14,11 +14,17 @@ pub struct DB {
     pub(crate) env: Env,
     pub(crate) tables: Tables,
     pub(crate) config: Config,
+    pub(crate) buffers_dir: PathBuf,
+    pub(crate) max_chunk_size: usize,
 }

 impl DB {
     pub fn open(config: Config) -> anyhow::Result<Self> {
         fs::create_dir_all(config.storage())?;
+        let buffers_dir = config.storage().clone().join("buffers");
+        // Cleanup buffers.
+        let _ = fs::remove_dir(&buffers_dir);
+        fs::create_dir_all(&buffers_dir)?;

         let env = unsafe {
             EnvOpenOptions::new()
@@ -33,46 +39,25 @@ impl DB {
             env,
             tables,
             config,
+            buffers_dir,
+            max_chunk_size: max_chunk_size(),
         };

         Ok(db)
     }
 }

-#[cfg(test)]
-mod tests {
-    use bytes::Bytes;
-    use pkarr::{mainline::Testnet, Keypair};
-
-    use crate::config::Config;
-
-    use super::DB;
-
-    #[tokio::test]
-    async fn entries() {
-        let db = DB::open(Config::test(&Testnet::new(0))).unwrap();
-
-        let keypair = Keypair::random();
-        let path = "/pub/foo.txt";
-
-        let (tx, rx) = flume::bounded::<Bytes>(0);
-
-        let mut cloned = db.clone();
-        let cloned_keypair = keypair.clone();
-
-        let done = tokio::task::spawn_blocking(move || {
-            cloned
-                .put_entry(&cloned_keypair.public_key(), path, rx)
-                .unwrap();
-        });
-
-        tx.send(vec![1, 2, 3, 4, 5].into()).unwrap();
-        drop(tx);
-
-        done.await.unwrap();
-
-        let blob = db.get_blob(&keypair.public_key(), path).unwrap().unwrap();
-
-        assert_eq!(blob, Bytes::from(vec![1, 2, 3, 4, 5]));
-    }
-}
+/// Calculate the optimal chunk size:
+/// - https://lmdb.readthedocs.io/en/release/#storage-efficiency-limits
+/// - https://github.com/lmdbjava/benchmarks/blob/master/results/20160710/README.md#test-2-determine-24816-kb-byte-values
+fn max_chunk_size() -> usize {
+    let page_size = unsafe { libc::sysconf(libc::_SC_PAGESIZE) as usize };
+
+    // - 16 bytes header per page (LMDB)
+    // - each page has to contain 2 records
+    // - 8 bytes per record (LMDB) (empirically, it seems to be 10, not 8)
+    // - 12 bytes key:
+    //   - timestamp:   8 bytes
+    //   - chunk index: 4 bytes
+    ((page_size - 16) / 2) - (8 + 2) - 12
+}
36 changes: 11 additions & 25 deletions pubky-homeserver/src/database/tables/blobs.rs
@@ -1,38 +1,24 @@
-use heed::{types::Bytes, Database};
-use pkarr::PublicKey;
+use heed::{types::Bytes, Database, RoTxn};

 use crate::database::DB;

 use super::entries::Entry;

-/// hash of the blob => bytes.
+/// (entry timestamp | chunk_index BE) => bytes
 pub type BlobsTable = Database<Bytes, Bytes>;

 pub const BLOBS_TABLE: &str = "blobs";

 impl DB {
-    pub fn get_blob(
+    pub fn read_entry_content<'txn>(
         &self,
-        public_key: &PublicKey,
-        path: &str,
-    ) -> anyhow::Result<Option<bytes::Bytes>> {
-        let rtxn = self.env.read_txn()?;
-
-        let key = format!("{public_key}/{path}");
-
-        let result = if let Some(bytes) = self.tables.entries.get(&rtxn, &key)? {
-            let entry = Entry::deserialize(bytes)?;
-
-            self.tables
-                .blobs
-                .get(&rtxn, entry.content_hash())?
-                .map(|blob| bytes::Bytes::from(blob[8..].to_vec()))
-        } else {
-            None
-        };
-
-        rtxn.commit()?;
-
-        Ok(result)
+        rtxn: &'txn RoTxn,
+        entry: &Entry,
+    ) -> anyhow::Result<impl Iterator<Item = Result<&'txn [u8], heed::Error>> + 'txn> {
+        Ok(self
+            .tables
+            .blobs
+            .prefix_iter(rtxn, &entry.timestamp().to_bytes())?
+            .map(|i| i.map(|(_, bytes)| bytes)))
     }
 }
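For context, a hypothetical call site (not part of this diff) that reassembles an entry's content with the new API; `read_all` is an illustrative name, while `DB`, `Entry`, `env`, `tables.entries`, and `Entry::deserialize` all appear in the code removed above and are assumed to be in scope inside the crate:

```rust
use pkarr::PublicKey;

// Reassemble an entry's content chunk by chunk. Keys are
// (timestamp | chunk_index BE), and LMDB iterates keys lexicographically,
// so big-endian indexes guarantee chunks stream back in numeric order
// without any sorting.
fn read_all(db: &DB, public_key: &PublicKey, path: &str) -> anyhow::Result<Option<Vec<u8>>> {
    let rtxn = db.env.read_txn()?;
    let key = format!("{public_key}/{path}");

    let Some(bytes) = db.tables.entries.get(&rtxn, &key)? else {
        return Ok(None);
    };
    let entry = Entry::deserialize(bytes)?;

    let mut content = Vec::new();
    for chunk in db.read_entry_content(&rtxn, &entry)? {
        content.extend_from_slice(chunk?);
    }
    Ok(Some(content))
}
```

Note that the old `get_blob` sliced an 8-byte prefix off the stored blob (`blob[8..]`) and copied the rest into an owned `Bytes`, whereas the new layout stores chunk values as-is and lends them out as `&[u8]` borrowed from the read transaction, so callers can stream without an intermediate copy.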