diff --git a/pubky-homeserver/src/database/tables/blobs.rs b/pubky-homeserver/src/database/tables/blobs.rs index 25f57c0..c430a58 100644 --- a/pubky-homeserver/src/database/tables/blobs.rs +++ b/pubky-homeserver/src/database/tables/blobs.rs @@ -26,7 +26,7 @@ impl DB { self.tables .blobs .get(&rtxn, entry.content_hash())? - .map(|blob| bytes::Bytes::from(blob.to_vec())) + .map(|blob| bytes::Bytes::from(blob[8..].to_vec())) } else { None }; diff --git a/pubky-homeserver/src/database/tables/entries.rs b/pubky-homeserver/src/database/tables/entries.rs index e41a5df..081f606 100644 --- a/pubky-homeserver/src/database/tables/entries.rs +++ b/pubky-homeserver/src/database/tables/entries.rs @@ -43,7 +43,26 @@ impl DB { let hash = hasher.finalize(); - self.tables.blobs.put(&mut wtxn, hash.as_bytes(), &bytes)?; + let key = hash.as_bytes(); + + let mut bytes_with_ref_count = Vec::with_capacity(bytes.len() + 8); + bytes_with_ref_count.extend_from_slice(&u64::to_be_bytes(0)); + bytes_with_ref_count.extend_from_slice(&bytes); + + // TODO: For now, we set the first 8 bytes to a reference counter + let exists = self + .tables + .blobs + .get(&wtxn, key)? + .unwrap_or(bytes_with_ref_count.as_slice()); + + let new_count = u64::from_be_bytes(exists[0..8].try_into().unwrap()) + 1; + + bytes_with_ref_count[0..8].copy_from_slice(&u64::to_be_bytes(new_count)); + + self.tables + .blobs + .put(&mut wtxn, hash.as_bytes(), &bytes_with_ref_count)?; let mut entry = Entry::new(); @@ -82,8 +101,28 @@ impl DB { let deleted = if let Some(bytes) = self.tables.entries.get(&wtxn, &key)? { let entry = Entry::deserialize(bytes)?; - // TODO: reference counting of blobs - let deleted_blobs = self.tables.blobs.delete(&mut wtxn, entry.content_hash())?; + let mut bytes_with_ref_count = self + .tables + .blobs + .get(&wtxn, entry.content_hash())? + .map_or(vec![], |s| s.to_vec()); + + let arr: [u8; 8] = bytes_with_ref_count[0..8].try_into().unwrap_or([0; 8]); + let reference_count = u64::from_be_bytes(arr); + + let deleted_blobs = if reference_count > 1 { + // decrement reference count + + bytes_with_ref_count[0..8].copy_from_slice(&(reference_count - 1).to_be_bytes()); + + self.tables + .blobs + .put(&mut wtxn, entry.content_hash(), &bytes_with_ref_count)?; + + true + } else { + self.tables.blobs.delete(&mut wtxn, entry.content_hash())? + }; let deleted_entry = self.tables.entries.delete(&mut wtxn, &key)?; @@ -102,7 +141,7 @@ impl DB { // TODO: move to events.rs } - deleted_entry & deleted_blobs + deleted_entry && deleted_blobs } else { false }; diff --git a/pubky/src/shared/pkarr.rs b/pubky/src/shared/pkarr.rs index d01eded..85055ef 100644 --- a/pubky/src/shared/pkarr.rs +++ b/pubky/src/shared/pkarr.rs @@ -141,7 +141,7 @@ impl PubkyClient { return Err(Error::ResolveEndpoint(original_target.into())); } - if let Some(public_key) = endpoint_public_key { + if endpoint_public_key.is_some() { let url = Url::parse(&format!( "{}://{}", if origin.starts_with("localhost") { @@ -152,7 +152,7 @@ impl PubkyClient { origin ))?; - return Ok(Endpoint { public_key, url }); + return Ok(Endpoint { url }); } Err(Error::ResolveEndpoint(original_target.into())) @@ -173,8 +173,6 @@ impl PubkyClient { #[derive(Debug)] pub(crate) struct Endpoint { - // TODO: we don't use this at all? - pub public_key: PublicKey, pub url: Url, } @@ -326,12 +324,11 @@ mod tests { .await .unwrap(); - let Endpoint { public_key, url } = client + let Endpoint { url, .. } = client .resolve_pubky_homeserver(&pubky.public_key()) .await .unwrap(); - assert_eq!(public_key, server.public_key()); assert_eq!(url.host_str(), Some("localhost")); assert_eq!(url.port(), Some(server.port())); } diff --git a/pubky/src/shared/public.rs b/pubky/src/shared/public.rs index febd1fe..becf2fb 100644 --- a/pubky/src/shared/public.rs +++ b/pubky/src/shared/public.rs @@ -765,4 +765,60 @@ mod tests { let get = client.get(url.as_str()).await.unwrap(); dbg!(get); } + + #[tokio::test] + async fn dont_delete_shared_blobs() { + let testnet = Testnet::new(10); + let homeserver = Homeserver::start_test(&testnet).await.unwrap(); + let client = PubkyClient::test(&testnet); + + let homeserver_pubky = homeserver.public_key(); + + let user_1 = Keypair::random(); + let user_2 = Keypair::random(); + + client.signup(&user_1, &homeserver_pubky).await.unwrap(); + client.signup(&user_2, &homeserver_pubky).await.unwrap(); + + let user_1_id = user_1.public_key(); + let user_2_id = user_2.public_key(); + + let url_1 = format!("pubky://{user_1_id}/pub/pubky.app/file/file_1"); + let url_2 = format!("pubky://{user_2_id}/pub/pubky.app/file/file_1"); + + let file = vec![1]; + client.put(url_1.as_str(), &file).await.unwrap(); + client.put(url_2.as_str(), &file).await.unwrap(); + + // Delete file 1 + client.delete(url_1.as_str()).await.unwrap(); + + let blob = client.get(url_2.as_str()).await.unwrap().unwrap(); + + assert_eq!(blob, file); + + let feed_url = format!("http://localhost:{}/events/", homeserver.port()); + + let response = client + .request( + Method::GET, + format!("{feed_url}").as_str().try_into().unwrap(), + ) + .send() + .await + .unwrap(); + + let text = response.text().await.unwrap(); + let lines = text.split('\n').collect::>(); + + assert_eq!( + lines, + vec![ + format!("PUT pubky://{user_1_id}/pub/pubky.app/file/file_1",), + format!("PUT pubky://{user_2_id}/pub/pubky.app/file/file_1",), + format!("DEL pubky://{user_1_id}/pub/pubky.app/file/file_1",), + lines.last().unwrap().to_string() + ] + ) + } }