From 9e6b65460c7fd264f35f6693d79ca872ec2dff13 Mon Sep 17 00:00:00 2001 From: nazeh Date: Thu, 12 Sep 2024 17:56:53 +0300 Subject: [PATCH 1/3] fix(homeserver): add a reference counter for chunks to avoid deleting shared blobs --- pubky-homeserver/src/database/tables/blobs.rs | 2 +- .../src/database/tables/entries.rs | 44 ++++++++++++++++++- pubky/src/shared/public.rs | 33 ++++++++++++++ 3 files changed, 77 insertions(+), 2 deletions(-) diff --git a/pubky-homeserver/src/database/tables/blobs.rs b/pubky-homeserver/src/database/tables/blobs.rs index 25f57c0..c430a58 100644 --- a/pubky-homeserver/src/database/tables/blobs.rs +++ b/pubky-homeserver/src/database/tables/blobs.rs @@ -26,7 +26,7 @@ impl DB { self.tables .blobs .get(&rtxn, entry.content_hash())? - .map(|blob| bytes::Bytes::from(blob.to_vec())) + .map(|blob| bytes::Bytes::from(blob[8..].to_vec())) } else { None }; diff --git a/pubky-homeserver/src/database/tables/entries.rs b/pubky-homeserver/src/database/tables/entries.rs index e41a5df..4e61405 100644 --- a/pubky-homeserver/src/database/tables/entries.rs +++ b/pubky-homeserver/src/database/tables/entries.rs @@ -43,7 +43,26 @@ impl DB { let hash = hasher.finalize(); - self.tables.blobs.put(&mut wtxn, hash.as_bytes(), &bytes)?; + let key = hash.as_bytes(); + + let mut bytes_with_ref_count = Vec::with_capacity(bytes.len() + 8); + bytes_with_ref_count.extend_from_slice(&u64::to_be_bytes(0)); + bytes_with_ref_count.extend_from_slice(&bytes); + + // TODO: For now, we set the first 8 bytes to a reference counter + let exists = self + .tables + .blobs + .get(&wtxn, key)? + .unwrap_or(bytes_with_ref_count.as_slice()); + + let new_count = u64::from_be_bytes(exists[0..8].try_into().unwrap()) + 1; + + bytes_with_ref_count[0..8].copy_from_slice(&u64::to_be_bytes(new_count)); + + self.tables + .blobs + .put(&mut wtxn, hash.as_bytes(), &bytes_with_ref_count)?; let mut entry = Entry::new(); @@ -82,6 +101,29 @@ impl DB { let deleted = if let Some(bytes) = self.tables.entries.get(&wtxn, &key)? { let entry = Entry::deserialize(bytes)?; + let mut bytes_with_ref_count = self + .tables + .blobs + .get(&wtxn, entry.content_hash())? + .map_or(vec![], |s| s.to_vec()); + + let arr: [u8; 8] = bytes_with_ref_count[0..8].try_into().unwrap_or([0; 8]); + let reference_count = u64::from_be_bytes(arr); + + if reference_count > 1 { + // decrement reference count + + bytes_with_ref_count[0..8].copy_from_slice(&(reference_count - 1).to_be_bytes()); + + self.tables + .blobs + .put(&mut wtxn, entry.content_hash(), &bytes_with_ref_count)?; + + let deleted_entry = self.tables.entries.delete(&mut wtxn, &key)?; + + return Ok(deleted_entry); + } + // TODO: reference counting of blobs let deleted_blobs = self.tables.blobs.delete(&mut wtxn, entry.content_hash())?; diff --git a/pubky/src/shared/public.rs b/pubky/src/shared/public.rs index febd1fe..d6c90f1 100644 --- a/pubky/src/shared/public.rs +++ b/pubky/src/shared/public.rs @@ -765,4 +765,37 @@ mod tests { let get = client.get(url.as_str()).await.unwrap(); dbg!(get); } + + #[tokio::test] + async fn dont_delete_shared_blobs() { + let testnet = Testnet::new(10); + let homeserver = Homeserver::start_test(&testnet).await.unwrap(); + let client = PubkyClient::test(&testnet); + + // Step 1: Create first user (follower) + let keypair = Keypair::random(); + + let user_id = keypair.public_key().to_z32(); + client + .signup(&keypair, &homeserver.public_key()) + .await + .unwrap(); + + // Both files are identical, leads to error + let file_1 = vec![1]; + let file_2 = vec![1]; + + let url_1 = format!("pubky://{}/pub/pubky.app/file/file_1", user_id); + let url_2 = format!("pubky://{}/pub/pubky.app/file/file_1", user_id); + + client.put(url_1.as_str(), &file_1).await.unwrap(); + client.put(url_2.as_str(), &file_2).await.unwrap(); + + // Delete file 1 + client.delete(url_1.as_str()).await.unwrap(); + + let blob = client.get(url_2.as_str()).await.unwrap().unwrap(); + + assert_eq!(blob, vec![1]) + } } From f381f9f1a3b8eea315410261e6db8e028e16308e Mon Sep 17 00:00:00 2001 From: nazeh Date: Fri, 13 Sep 2024 10:53:15 +0300 Subject: [PATCH 2/3] fix(homeserver): add DEL event when entry is deleted but not its blob --- .../src/database/tables/entries.rs | 15 +++--- pubky/src/shared/public.rs | 53 +++++++++++++------ 2 files changed, 44 insertions(+), 24 deletions(-) diff --git a/pubky-homeserver/src/database/tables/entries.rs b/pubky-homeserver/src/database/tables/entries.rs index 4e61405..081f606 100644 --- a/pubky-homeserver/src/database/tables/entries.rs +++ b/pubky-homeserver/src/database/tables/entries.rs @@ -110,7 +110,7 @@ impl DB { let arr: [u8; 8] = bytes_with_ref_count[0..8].try_into().unwrap_or([0; 8]); let reference_count = u64::from_be_bytes(arr); - if reference_count > 1 { + let deleted_blobs = if reference_count > 1 { // decrement reference count bytes_with_ref_count[0..8].copy_from_slice(&(reference_count - 1).to_be_bytes()); @@ -119,13 +119,10 @@ impl DB { .blobs .put(&mut wtxn, entry.content_hash(), &bytes_with_ref_count)?; - let deleted_entry = self.tables.entries.delete(&mut wtxn, &key)?; - - return Ok(deleted_entry); - } - - // TODO: reference counting of blobs - let deleted_blobs = self.tables.blobs.delete(&mut wtxn, entry.content_hash())?; + true + } else { + self.tables.blobs.delete(&mut wtxn, entry.content_hash())? + }; let deleted_entry = self.tables.entries.delete(&mut wtxn, &key)?; @@ -144,7 +141,7 @@ impl DB { // TODO: move to events.rs } - deleted_entry & deleted_blobs + deleted_entry && deleted_blobs } else { false }; diff --git a/pubky/src/shared/public.rs b/pubky/src/shared/public.rs index d6c90f1..becf2fb 100644 --- a/pubky/src/shared/public.rs +++ b/pubky/src/shared/public.rs @@ -772,30 +772,53 @@ mod tests { let homeserver = Homeserver::start_test(&testnet).await.unwrap(); let client = PubkyClient::test(&testnet); - // Step 1: Create first user (follower) - let keypair = Keypair::random(); + let homeserver_pubky = homeserver.public_key(); - let user_id = keypair.public_key().to_z32(); - client - .signup(&keypair, &homeserver.public_key()) - .await - .unwrap(); + let user_1 = Keypair::random(); + let user_2 = Keypair::random(); - // Both files are identical, leads to error - let file_1 = vec![1]; - let file_2 = vec![1]; + client.signup(&user_1, &homeserver_pubky).await.unwrap(); + client.signup(&user_2, &homeserver_pubky).await.unwrap(); - let url_1 = format!("pubky://{}/pub/pubky.app/file/file_1", user_id); - let url_2 = format!("pubky://{}/pub/pubky.app/file/file_1", user_id); + let user_1_id = user_1.public_key(); + let user_2_id = user_2.public_key(); - client.put(url_1.as_str(), &file_1).await.unwrap(); - client.put(url_2.as_str(), &file_2).await.unwrap(); + let url_1 = format!("pubky://{user_1_id}/pub/pubky.app/file/file_1"); + let url_2 = format!("pubky://{user_2_id}/pub/pubky.app/file/file_1"); + + let file = vec![1]; + client.put(url_1.as_str(), &file).await.unwrap(); + client.put(url_2.as_str(), &file).await.unwrap(); // Delete file 1 client.delete(url_1.as_str()).await.unwrap(); let blob = client.get(url_2.as_str()).await.unwrap().unwrap(); - assert_eq!(blob, vec![1]) + assert_eq!(blob, file); + + let feed_url = format!("http://localhost:{}/events/", homeserver.port()); + + let response = client + .request( + Method::GET, + format!("{feed_url}").as_str().try_into().unwrap(), + ) + .send() + .await + .unwrap(); + + let text = response.text().await.unwrap(); + let lines = text.split('\n').collect::>(); + + assert_eq!( + lines, + vec![ + format!("PUT pubky://{user_1_id}/pub/pubky.app/file/file_1",), + format!("PUT pubky://{user_2_id}/pub/pubky.app/file/file_1",), + format!("DEL pubky://{user_1_id}/pub/pubky.app/file/file_1",), + lines.last().unwrap().to_string() + ] + ) } } From f999475f2bbfe3f17631c2d7b223955132604ae6 Mon Sep 17 00:00:00 2001 From: nazeh Date: Fri, 13 Sep 2024 10:56:09 +0300 Subject: [PATCH 3/3] refactor(pubky): remove publickey from endpoint struct for clippy --- pubky/src/shared/pkarr.rs | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/pubky/src/shared/pkarr.rs b/pubky/src/shared/pkarr.rs index d01eded..85055ef 100644 --- a/pubky/src/shared/pkarr.rs +++ b/pubky/src/shared/pkarr.rs @@ -141,7 +141,7 @@ impl PubkyClient { return Err(Error::ResolveEndpoint(original_target.into())); } - if let Some(public_key) = endpoint_public_key { + if endpoint_public_key.is_some() { let url = Url::parse(&format!( "{}://{}", if origin.starts_with("localhost") { @@ -152,7 +152,7 @@ impl PubkyClient { origin ))?; - return Ok(Endpoint { public_key, url }); + return Ok(Endpoint { url }); } Err(Error::ResolveEndpoint(original_target.into())) @@ -173,8 +173,6 @@ impl PubkyClient { #[derive(Debug)] pub(crate) struct Endpoint { - // TODO: we don't use this at all? - pub public_key: PublicKey, pub url: Url, } @@ -326,12 +324,11 @@ mod tests { .await .unwrap(); - let Endpoint { public_key, url } = client + let Endpoint { url, .. } = client .resolve_pubky_homeserver(&pubky.public_key()) .await .unwrap(); - assert_eq!(public_key, server.public_key()); assert_eq!(url.host_str(), Some("localhost")); assert_eq!(url.port(), Some(server.port())); }