
Commit

Filter uploads.
milesj committed Jan 27, 2025
1 parent f82aa71 commit 2198643
Showing 2 changed files with 26 additions and 4 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
@@ -12,6 +12,7 @@
   based endpoints. Can also be used to set headers for all requests.
 - Added support for Depot cloud-based caching: https://depot.dev/docs/cache/overview
 - Added support for the HTTP protocol: https://bazel.build/remote/caching#http-caching
+- Added timeout and concurrency limit support (non-configurable at the moment).
 - Added a `MOON_DEBUG_REMOTE` environment variable, which can be used to debug internal errors for
   diagnosing connection/integration issues.

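Note on the changelog entry added above: the timeout and concurrency limits it mentions are not shown in this commit's code changes. As a hedged sketch only, assuming a tonic-based gRPC transport (moon's actual wiring may differ), both limits can be set when building the channel; the 60-second and 8-request values are invented for illustration:

use std::time::Duration;
use tonic::transport::{Channel, Endpoint, Error};

// Sketch: cap request duration and in-flight requests on the channel.
async fn connect(url: &'static str) -> Result<Channel, Error> {
    Endpoint::from_static(url)
        .timeout(Duration::from_secs(60)) // assumed timeout value
        .concurrency_limit(8)             // assumed in-flight cap
        .connect()
        .await
}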
29 changes: 25 additions & 4 deletions crates/remote/src/remote_service.rs
@@ -56,7 +56,7 @@ impl RemoteService {
             .install_default()
             .is_err()
         {
-            error!("Failed to initialize cryptography for gRPC!");
+            error!("Failed to initialize cryptography for TLS/mTLS!");

             return Ok(());
         }
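The error message corrected above guards a crypto-provider install. A minimal sketch of the pattern, assuming the rustls 0.23 CryptoProvider API with the "ring" feature (the diff only shows the .install_default().is_err() guard):

// Assumption: rustls 0.23+ with the "ring" feature enabled.
fn init_crypto() {
    if rustls::crypto::ring::default_provider()
        .install_default()
        .is_err()
    {
        // The provider could not be installed process-wide; TLS/mTLS
        // connections would fail, so report it and bail out early.
        eprintln!("Failed to initialize cryptography for TLS/mTLS!");
    }
}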
@@ -185,7 +185,7 @@ impl RemoteService {
         // compression level, digest strings, etc. All of these "add up" and can
         // bump the total body size larger than the actual limit. Is there a better
         // way to handle this? Probably...
-        max - (1024 * 10)
+        max - (1024 * 25)
     }

     #[instrument(skip(self, state))]
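The hunk above widens the headroom subtracted from the server's byte-size limit from 10 KiB to 25 KiB, since headers, digest strings, and compression metadata all count toward the encoded body. A worked example of the arithmetic, assuming the common 4 MiB gRPC message limit (the real limit comes from the server):

const ASSUMED_MAX_MESSAGE_SIZE: usize = 4 * 1024 * 1024; // 4 MiB

fn max_batch_payload(max: usize) -> usize {
    // Reserve 25 KiB so request overhead cannot push the encoded
    // body past the hard limit.
    max - (1024 * 25)
}

fn main() {
    // 4_194_304 - 25_600 = 4_168_704 bytes available for blob payloads.
    println!("{}", max_batch_payload(ASSUMED_MAX_MESSAGE_SIZE));
}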
@@ -261,6 +261,7 @@ impl RemoteService {
         )
         .await;

+        // Don't save the action result if some of the blobs failed to upload
         if upload_result.is_err() || upload_result.is_ok_and(|res| !res) {
             return;
         }
@@ -364,7 +365,21 @@ async fn batch_upload_blobs(
     blobs: Vec<Blob>,
     max_size: usize,
 ) -> miette::Result<bool> {
-    let blob_groups = partition_into_groups(blobs, max_size, |blob| blob.bytes.len());
+    let missing_blob_digests = client
+        .find_missing_blobs(blobs.iter().map(|blob| blob.digest.clone()).collect())
+        .await?;
+
+    // Everything exists in CAS already!
+    if missing_blob_digests.is_empty() {
+        return Ok(true);
+    }
+
+    let blob_groups = partition_into_groups(
+        blobs,
+        max_size,
+        |blob| blob.bytes.len(),
+        |blob| missing_blob_digests.contains(&blob.digest),
+    );

     if blob_groups.is_empty() {
         return Ok(false);
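The rewritten upload path mirrors the Remote Execution API's ContentAddressableStorage.FindMissingBlobs call: ask the server which digests it lacks, then upload only those. A self-contained sketch of the filtering step, using hypothetical Blob and Digest stand-ins rather than moon's real types:

use std::collections::HashSet;

// Hypothetical stand-ins for moon's types.
#[derive(Clone, PartialEq, Eq, Hash)]
struct Digest {
    hash: String,
    size_bytes: i64,
}

struct Blob {
    digest: Digest,
    bytes: Vec<u8>,
}

// Keep only blobs the server reported missing; everything else is
// already present in the CAS and can be skipped entirely.
fn filter_missing(blobs: Vec<Blob>, missing: &HashSet<Digest>) -> Vec<Blob> {
    blobs
        .into_iter()
        .filter(|blob| missing.contains(&blob.digest))
        .collect()
}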
@@ -428,7 +443,8 @@ async fn batch_download_blobs(
         }
     }

-    let digest_groups = partition_into_groups(digests, max_size, |dig| dig.size_bytes as usize);
+    let digest_groups =
+        partition_into_groups(digests, max_size, |dig| dig.size_bytes as usize, |_| true);

     if digest_groups.is_empty() {
         return Ok(());
@@ -486,10 +502,15 @@ fn partition_into_groups<T>(
     items: Vec<T>,
     max_size: usize,
     get_size: impl Fn(&T) -> usize,
+    is_filtered: impl Fn(&T) -> bool,
 ) -> BTreeMap<i32, Partition<T>> {
     let mut groups = BTreeMap::<i32, Partition<T>>::default();

     for item in items {
+        if !is_filtered(&item) {
+            continue;
+        }
+
         let item_size = get_size(&item);
         let mut index_to_use = -1;

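For readers following the partition_into_groups changes, here is a simplified, self-contained sketch of the whole routine with the new predicate: items failing the predicate are skipped, and the rest are packed first-fit into size-capped groups. The Partition fields and the first-fit strategy are assumptions; moon's real implementation (with its index_to_use bookkeeping) lives in crates/remote/src/remote_service.rs:

use std::collections::BTreeMap;

struct Partition<T> {
    items: Vec<T>,
    size: usize,
}

impl<T> Default for Partition<T> {
    fn default() -> Self {
        Self { items: Vec::new(), size: 0 }
    }
}

fn partition_into_groups<T>(
    items: Vec<T>,
    max_size: usize,
    get_size: impl Fn(&T) -> usize,
    is_filtered: impl Fn(&T) -> bool,
) -> BTreeMap<i32, Partition<T>> {
    let mut groups = BTreeMap::<i32, Partition<T>>::default();

    for item in items {
        // Skip items excluded by the predicate, e.g. blobs the
        // server already has in its CAS.
        if !is_filtered(&item) {
            continue;
        }

        let item_size = get_size(&item);

        // First-fit: reuse the first group with room, else open a new one.
        let index = (0..groups.len() as i32)
            .find(|i| groups.get(i).is_some_and(|g| g.size + item_size <= max_size))
            .unwrap_or(groups.len() as i32);

        let group = groups.entry(index).or_default();
        group.size += item_size;
        group.items.push(item);
    }

    groups
}

Uploads pass a predicate built from the FindMissingBlobs response, while batch_download_blobs passes |_| true since downloads need no filtering.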
