Skip to content

Commit d6add16

Browse files
authored
Adds prune_tag_stream public api method to spfs Cleaner (#1171)
Refactors prune_tag_stream_and_walk method into prune_tag_stream and walk_attached_objects. Signed-off-by: David Gilligan-Cook <[email protected]>
1 parent 8bcd163 commit d6add16

File tree

2 files changed

+37
-19
lines changed

2 files changed

+37
-19
lines changed

crates/spfs/src/clean.rs

+36-18
Original file line numberDiff line numberDiff line change
@@ -358,6 +358,34 @@ where
358358
}
359359

360360
async fn prune_tag_stream_and_walk(&self, tag_spec: tracking::TagSpec) -> Result<CleanResult> {
361+
let (mut result, to_keep) = self.prune_tag_stream(tag_spec).await?;
362+
result += self.walk_attached_objects(&to_keep).await?;
363+
364+
Ok(result)
365+
}
366+
367+
#[async_recursion::async_recursion]
368+
async fn walk_attached_objects(&self, digests: &[encoding::Digest]) -> Result<CleanResult> {
369+
let mut result = CleanResult::default();
370+
371+
let mut walk_stream = futures::stream::iter(digests.iter())
372+
.then(|digest| ready(self.visit_attached_objects(*digest).boxed()))
373+
.buffer_unordered(self.discover_concurrency)
374+
.boxed();
375+
while let Some(res) = walk_stream.try_next().await? {
376+
result += res;
377+
}
378+
379+
Ok(result)
380+
}
381+
382+
/// Visit the tag and its history, pruning as configured and
383+
/// returning a cleaning (pruning) result and list of tags that
384+
/// were kept.
385+
pub async fn prune_tag_stream(
386+
&self,
387+
tag_spec: tracking::TagSpec,
388+
) -> Result<(CleanResult, Vec<encoding::Digest>)> {
361389
let history = self
362390
.repo
363391
.read_tag(&tag_spec)
@@ -377,7 +405,7 @@ where
377405
if self.prune_params.should_prune(&spec, &tag) {
378406
to_prune.push(tag);
379407
} else {
380-
to_keep.push(tag);
408+
to_keep.push(tag.target);
381409
}
382410
}
383411

@@ -395,19 +423,11 @@ where
395423

396424
result.pruned_tags.insert(tag_spec, to_prune);
397425

398-
let mut walk_stream = futures::stream::iter(to_keep.iter())
399-
.then(|tag| ready(self.discover_attached_objects(tag.target).boxed()))
400-
.buffer_unordered(self.discover_concurrency)
401-
.boxed();
402-
while let Some(res) = walk_stream.try_next().await? {
403-
result += res;
404-
}
405-
406-
Ok(result)
426+
Ok((result, to_keep))
407427
}
408428

409429
#[async_recursion::async_recursion]
410-
async fn discover_attached_objects(&self, digest: encoding::Digest) -> Result<CleanResult> {
430+
async fn visit_attached_objects(&self, digest: encoding::Digest) -> Result<CleanResult> {
411431
let mut result = CleanResult::default();
412432
if !self.attached.insert(digest) {
413433
return Ok(result);
@@ -432,13 +452,11 @@ where
432452
result.visited_payloads += 1;
433453
self.reporter.visit_payload(&b);
434454
}
435-
let mut walk_stream = futures::stream::iter(obj.child_objects())
436-
.then(|child| ready(self.discover_attached_objects(child).boxed()))
437-
.buffer_unordered(self.discover_concurrency)
438-
.boxed();
439-
while let Some(res) = walk_stream.try_next().await? {
440-
result += res;
441-
}
455+
456+
// This recursively calls visit_attached_objects() (this
457+
// method) on any child objects.
458+
result += self.walk_attached_objects(&obj.child_objects()).await?;
459+
442460
Ok(result)
443461
}
444462

crates/spfs/src/clean_test.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ async fn test_attached_objects(#[future] tmprepo: TempRepo) {
2424

2525
let cleaner = Cleaner::new(&tmprepo).with_reporter(TracingCleanReporter);
2626
cleaner
27-
.discover_attached_objects(manifest.digest().unwrap())
27+
.visit_attached_objects(manifest.digest().unwrap())
2828
.await
2929
.unwrap();
3030

0 commit comments

Comments
 (0)