drop intershard reference struct

seanses committed Sep 10, 2024
1 parent 2c94529 commit 6916dae
Showing 11 changed files with 21 additions and 659 deletions.
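In short, the commit deletes the intershard-reference ("shard hint") machinery — the IntershardReferenceSequence struct, the get_hinted_shard_list_for_file accessors, and the fetch_hinted_shards_for_dedup prefetch path — and drops the dedup-tracking argument from add_file_reconstruction_info. A minimal, self-contained sketch of the call-site simplification; the types below are illustrative stand-ins, not the crate's real ShardFileManager or MDBFileInfo:

    use std::collections::HashMap;

    // Illustrative stand-ins for MDBFileInfo and ShardFileManager.
    struct FileInfo {
        file_hash: u64,
    }

    #[derive(Default)]
    struct ShardManager {
        files: HashMap<u64, FileInfo>,
    }

    impl ShardManager {
        // After this commit, registration takes only the file info; the old
        // second argument (an Option<_> of dedup-tracking state) is gone
        // from every call site in the hunks below.
        fn add_file_reconstruction_info(&mut self, fi: FileInfo) {
            self.files.insert(fi.file_hash, fi);
        }
    }

    fn main() {
        let mut mgr = ShardManager::default();
        mgr.add_file_reconstruction_info(FileInfo { file_hash: 42 });
        println!("registered {} file(s)", mgr.files.len());
    }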
36 changes: 2 additions & 34 deletions rust/gitxetcore/src/data/data_processing_v2.rs
@@ -25,7 +25,6 @@ use mdb_shard::cas_structs::{CASChunkSequenceEntry, CASChunkSequenceHeader, MDBC
 use mdb_shard::error::MDBShardError;
 use mdb_shard::file_structs::{FileDataSequenceEntry, FileDataSequenceHeader, MDBFileInfo};
 use mdb_shard::hash_is_global_dedup_eligible;
-use mdb_shard::intershard_reference_structs::IntershardReferenceSequence;
 use mdb_shard::shard_file_handle::MDBShardFile;
 use mdb_shard::shard_file_manager::ShardFileManager;
 use mdb_shard::shard_file_reconstructor::FileReconstructor;
@@ -302,33 +301,6 @@ impl PointerFileTranslatorV2 {
         }
     }

-    /// Fetches all the shards in the shard hints that correspond to a given file hash.
-    pub async fn get_hinted_shard_list_for_file(
-        &self,
-        file_hash: &MerkleHash,
-    ) -> Result<IntershardReferenceSequence> {
-        // First, get the shard corresponding to the file hash
-
-        let Some((_, shard_hash_opt)) = self
-            .remote_shards
-            .get_file_reconstruction_info(file_hash)
-            .await?
-        else {
-            warn!("get_hinted_shard_list_for_file: file reconstruction not found; ignoring.");
-            return Ok(<_>::default());
-        };
-
-        let Some(shard_hash) = shard_hash_opt else {
-            debug!("get_hinted_shard_list_for_file: file reconstruction found in non-permanent shard, ignoring.");
-            return Ok(<_>::default());
-        };
-
-        debug!("Retrieving shard hints associated with {shard_hash:?}");
-        let shard_file = self.open_or_fetch_shard(&shard_hash).await?;
-
-        Ok(shard_file.get_intershard_references()?)
-    }
-
     /** Cleans the file.
      */
     pub async fn clean_file_and_report_progress(
@@ -835,17 +807,13 @@
         }

         // Now register any new files as needed.
-        for (mut fi, chunk_hash_indices, shard_dedup_tracking) in
-            take(&mut cas_data.pending_file_info)
-        {
+        for (mut fi, chunk_hash_indices, _) in take(&mut cas_data.pending_file_info) {
             for i in chunk_hash_indices {
                 debug_assert_eq!(fi.segments[i].cas_hash, MerkleHash::default());
                 fi.segments[i].cas_hash = cas_hash;
             }

-            self.shard_manager
-                .add_file_reconstruction_info(fi, Some(shard_dedup_tracking))
-                .await?;
+            self.shard_manager.add_file_reconstruction_info(fi).await?;
         }

         FILTER_CAS_BYTES_PRODUCED.inc_by(compressed_bytes_len as u64);
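For context, the accessor deleted above resolved a file's shard hints in two steps: find the permanent shard holding the file's reconstruction info, then read that shard's intershard reference sequence. A standalone sketch of that flow, with stand-in types for MerkleHash, MDBShardFile, and IntershardReferenceSequence:

    use std::collections::HashMap;

    type Hash = u64; // stand-in for MerkleHash

    #[derive(Clone, Default)]
    struct IntershardRefs {
        entries: Vec<Hash>, // shards referenced by the owning shard
    }

    struct Translator {
        // file hash -> shard holding its reconstruction info (None while pending)
        file_to_shard: HashMap<Hash, Option<Hash>>,
        // shard hash -> that shard's intershard references
        shard_refs: HashMap<Hash, IntershardRefs>,
    }

    impl Translator {
        // Mirrors the removed get_hinted_shard_list_for_file: unknown files and
        // files still in non-permanent shards fall back to an empty default.
        fn hinted_shards_for_file(&self, file: Hash) -> IntershardRefs {
            let Some(Some(shard_hash)) = self.file_to_shard.get(&file) else {
                return IntershardRefs::default();
            };
            self.shard_refs.get(shard_hash).cloned().unwrap_or_default()
        }
    }

    fn main() {
        let t = Translator {
            file_to_shard: HashMap::from([(1, Some(7))]),
            shard_refs: HashMap::from([(7, IntershardRefs { entries: vec![8, 9] })]),
        };
        assert_eq!(t.hinted_shards_for_file(1).entries, vec![8, 9]);
    }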
13 changes: 2 additions & 11 deletions rust/gitxetcore/src/data/data_processing_v3.rs
@@ -16,7 +16,7 @@ use futures::stream::iter;
 use futures::StreamExt;
 use mdb_shard::cas_structs::{CASChunkSequenceEntry, CASChunkSequenceHeader, MDBCASInfo};
 use mdb_shard::file_structs::MDBFileInfo;
-use mdb_shard::{IntershardReferenceSequence, ShardFileManager};
+use mdb_shard::ShardFileManager;
 use merkledb::aggregate_hashes::cas_node_hash;
 use merkledb::ObjectRange;
 use merklehash::MerkleHash;
@@ -240,7 +240,7 @@ pub async fn register_new_cas_block(
             fi.segments[i].cas_hash = cas_hash;
         }

-        shard_manager.add_file_reconstruction_info(fi, None).await?;
+        shard_manager.add_file_reconstruction_info(fi).await?;
     }

     FILTER_CAS_BYTES_PRODUCED.inc_by(compressed_bytes_len as u64);
@@ -736,15 +736,6 @@ impl PointerFileTranslatorV3 {
         self.shard_manager.clone()
     }

-    pub async fn get_hinted_shard_list_for_file(
-        &self,
-        _file_hash: &MerkleHash,
-    ) -> Result<IntershardReferenceSequence> {
-        Err(DataProcessingError::DeprecatedError(
-            "getting hinted shard list for file is a deprecated feature".to_owned(),
-        ))
-    }
-
     pub fn get_config(&self) -> XetConfig {
         self.xet.clone()
     }
118 changes: 1 addition & 117 deletions rust/gitxetcore/src/xetblob/xet_repo.rs
@@ -6,9 +6,7 @@ use super::*;
 use crate::command::CliOverrides;
 use crate::config::remote_to_repo_info;
 use crate::config::{ConfigGitPathOption, XetConfig};
-use crate::constants::{
-    GIT_NOTES_MERKLEDB_V1_REF_NAME, GIT_NOTES_MERKLEDB_V2_REF_NAME, MAX_CONCURRENT_DOWNLOADS,
-};
+use crate::constants::{GIT_NOTES_MERKLEDB_V1_REF_NAME, GIT_NOTES_MERKLEDB_V2_REF_NAME};
 use crate::data::cas_interface::old_create_cas_client;
 use crate::data::configurations::GlobalDedupPolicy;
 use crate::data::*;
@@ -24,16 +22,13 @@ use lazy_static::lazy_static;
 use mdb_shard::constants::MDB_SHARD_MIN_TARGET_SIZE;
 use mdb_shard::session_directory::consolidate_shards_in_directory;
 use mdb_shard::shard_version::ShardVersion;
-use merkledb::constants::TARGET_CDC_CHUNK_SIZE;
 use merkledb::MerkleMemDB;
 use merklehash::MerkleHash;
 use std::collections::{HashMap, HashSet};
 use std::mem::take;
 use std::path::{Path, PathBuf};
 use std::sync::atomic::AtomicUsize;
 use std::sync::Arc;
 use tempdir::TempDir;
-use tokio::sync::Mutex;
 use tracing::{debug, error, info};
 use url::Url;

@@ -549,117 +544,6 @@ impl XetRepo {
             transaction_tag,
         })
     }
-
-    /// Fetches all the shard in the hints corresponding to one or more source endpoints.
-    /// The reference files for preparing the dedup are specified by a list of (branch, path)
-    /// tuples.
-    ///
-    /// As a further criteria, only shards that define chunks in the reference files with dedupable size
-    /// exceeding min_dedup_byte_threshholds are downloaded.
-    pub async fn fetch_hinted_shards_for_dedup(
-        &self,
-        reference_files: &[(&str, &str)],
-        min_dedup_byte_threshhold: usize,
-    ) -> anyhow::Result<()> {
-        let PFTRouter::V2(ref tr_v2) = &self.translator.pft else {
-            return Ok(());
-        };
-
-        debug!(
-            "fetch_hinted_shards_for_dedup: Called with reference files {:?}.",
-            reference_files
-        );
-
-        // Go through and fetch all the shards needed for deduplication, building a list of new shards.
-        let shard_download_info = Arc::new(Mutex::new(HashMap::<MerkleHash, usize>::new()));
-        let shard_download_info_ref = &shard_download_info;
-
-        // Download all the shard hints in parallel.
-        let min_dedup_chunk_count = min_dedup_byte_threshhold / TARGET_CDC_CHUNK_SIZE;
-
-        parutils::tokio_par_for_each(
-            Vec::from(reference_files),
-            *MAX_CONCURRENT_DOWNLOADS,
-            |(branch, filename), _| async move {
-                let shard_download_info = shard_download_info_ref.clone();
-                if let Ok(body) = self
-                    .bbq_client
-                    .perform_bbq_query(self.remote_base_url.clone(), branch, filename)
-                    .await
-                {
-                    debug!("Querying shard hints associated with {filename}");
-
-                    let file_string = std::str::from_utf8(&body).unwrap_or("");
-
-                    let ptr_file =
-                        PointerFile::init_from_string(file_string, filename);
-
-                    if ptr_file.is_valid() {
-                        let filename = filename.to_owned();
-
-                        info!("fetch_hinted_shards_for_dedup: Retrieving shard hints associated with {filename}");
-
-                        // TODO: strategies to limit this, and limit the number of shards downloaded?
-                        let file_hash = ptr_file.hash()?;
-                        let shard_list = tr_v2.get_hinted_shard_list_for_file(&file_hash).await?;
-
-                        if !shard_list.is_empty() {
-                            let mut downloads = shard_download_info.lock().await;
-
-                            for e in shard_list.entries {
-                                if !tr_v2.get_shard_manager().shard_is_registered(&e.shard_hash).await {
-                                    *downloads.entry(e.shard_hash).or_default() +=
-                                        e.total_dedup_chunks as usize;
-                                }
-                            }
-                        }
-                    } else {
-                        debug!("Destination for {filename} not a pointer file.");
-                    }
-                } else {
-                    debug!("No destination value found for {filename}");
-                }
-
-                Ok(())
-            },
-        )
-        .await
-        .map_err(|e| match e {
-            parutils::ParallelError::JoinError => {
-                anyhow::anyhow!("Join Error")
-            }
-            parutils::ParallelError::TaskError(e) => e,
-        })?;
-
-        // Now, go through and exclude the ones that don't meet a dedup criteria cutoff.
-        let shard_download_list: Vec<MerkleHash> = shard_download_info
-            .lock()
-            .await
-            .iter()
-            .filter_map(|(k, v)| {
-                if *v >= min_dedup_chunk_count {
-                    Some(*k)
-                } else {
-                    None
-                }
-            })
-            .collect();
-
-        let hinted_shards = mdb::download_shards_to_cache(
-            &self.config,
-            &self.config.merkledb_v2_cache,
-            shard_download_list,
-        )
-        .await?;
-
-        // Register all the new shards.
-        tr_v2
-            .get_shard_manager()
-            .register_shards_by_path(&hinted_shards, true)
-            .await?;
-
-        Ok(())
-    }
 }

 impl XetRepoWriteTransaction {
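For reference, the removed prefetch above gated downloads on estimated dedup benefit: it converted the caller's byte threshold into a chunk count via TARGET_CDC_CHUNK_SIZE and kept only candidate shards at or above that count. A minimal sketch of that cutoff; the constant's value here is an assumption for illustration, not the crate's:

    use std::collections::HashMap;

    const TARGET_CDC_CHUNK_SIZE: usize = 16 * 1024; // illustrative value only

    // Mirrors the filter in the deleted fetch_hinted_shards_for_dedup: keep a
    // candidate shard only if its tallied dedupable chunk count clears the
    // threshold implied by min_dedup_bytes.
    fn shards_to_download(counts: &HashMap<u64, usize>, min_dedup_bytes: usize) -> Vec<u64> {
        let min_chunks = min_dedup_bytes / TARGET_CDC_CHUNK_SIZE;
        counts
            .iter()
            .filter_map(|(shard, chunks)| (*chunks >= min_chunks).then_some(*shard))
            .collect()
    }

    fn main() {
        let counts = HashMap::from([(1, 5), (2, 4096)]);
        // With a 32 MiB threshold, min_chunks = 2048, so only shard 2 survives.
        let picked = shards_to_download(&counts, 32 * 1024 * 1024);
        assert_eq!(picked, vec![2]);
    }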
