2 changes: 2 additions & 0 deletions dvc/cli/parser.py
@@ -39,6 +39,7 @@
move,
params,
plots,
purge,
queue,
remote,
remove,
@@ -90,6 +91,7 @@
move,
params,
plots,
purge,
queue,
remote,
remove,
108 changes: 108 additions & 0 deletions dvc/commands/purge.py
@@ -0,0 +1,108 @@
import os

from dvc.cli import formatter
from dvc.cli.command import CmdBase
from dvc.cli.utils import append_doc_link
from dvc.log import logger
from dvc.ui import ui

logger = logger.getChild(__name__)


class CmdPurge(CmdBase):
def run(self):
if not self.args.dry_run:
msg = "This will permanently remove local DVC-tracked outputs "
else:
msg = "This will show what local DVC-tracked outputs would be removed "
if self.args.targets:
msg += "for the following targets:\n - " + "\n - ".join(
[os.path.abspath(t) for t in self.args.targets]
)
else:
msg += "for the entire workspace."

if self.args.recursive:
msg += "\nRecursive purge is enabled."

if self.args.dry_run:
msg += "\n(dry-run: showing what would be removed, no changes)."

logger.warning(msg)

if (
not self.args.force
and not self.args.dry_run
and not self.args.yes
and not ui.confirm("Are you sure you want to proceed?")
):
return 1

# Call repo API
self.repo.purge(
targets=self.args.targets,
recursive=self.args.recursive,
force=self.args.force,
dry_run=self.args.dry_run,
unused_cache=self.args.unused_cache,
)
return 0


def add_parser(subparsers, parent_parser):
PURGE_HELP = "Remove tracked outputs and their cache."
PURGE_DESCRIPTION = (
"Removes cache objects and workspace copies of DVC-tracked outputs.\n"
"Metadata remains intact, and non-DVC files are untouched.\n\n"
"`--unused-cache` mode will clear the cache of any files not checked\n"
"out in the current workspace."
)
purge_parser = subparsers.add_parser(
"purge",
parents=[parent_parser],
description=append_doc_link(PURGE_DESCRIPTION, "purge"),
help=PURGE_HELP,
formatter_class=formatter.RawDescriptionHelpFormatter,
)

purge_parser.add_argument(
"targets",
nargs="*",
help="Optional list of files/directories to purge (default: entire repo).",
)
purge_parser.add_argument(
"-r",
"--recursive",
action="store_true",
default=False,
help="Recursively purge directories.",
)
purge_parser.add_argument(
"--dry-run",
dest="dry_run",
action="store_true",
default=False,
help="Only print what would be removed without actually removing.",
)
purge_parser.add_argument(
"-f",
"--force",
action="store_true",
default=False,
help="Force purge, bypassing safety checks and prompts.",
)
purge_parser.add_argument(
"-y",
"--yes",
action="store_true",
default=False,
help="Do not prompt for confirmation (respects safety checks).",
)
purge_parser.add_argument(
"--unused-cache",
action="store_true",
default=False,
help="Remove cache objects not currently checked out in the workspace.",
)

purge_parser.set_defaults(func=CmdPurge)
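
A minimal sketch of how the new subcommand could be exercised end to end through the existing CLI plumbing, assuming it is run from inside an initialized DVC repository; the `data/raw.csv` target is a placeholder:

from dvc.cli import parse_args

# Parse a hypothetical invocation; `--dry-run` keeps this side-effect free.
args = parse_args(["purge", "data/raw.csv", "--dry-run"])

# `set_defaults(func=CmdPurge)` above makes `args.func` the command class.
cmd = args.func(args)
ret = cmd.run()  # logs what would be removed and returns 0 on success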
1 change: 1 addition & 0 deletions dvc/repo/__init__.py
@@ -83,6 +83,7 @@ class Repo:
from dvc.repo.ls_url import ls_url as _ls_url # type: ignore[misc]
from dvc.repo.move import move # type: ignore[misc]
from dvc.repo.pull import pull # type: ignore[misc]
from dvc.repo.purge import purge # type: ignore[misc]
from dvc.repo.push import push # type: ignore[misc]
from dvc.repo.remove import remove # type: ignore[misc]
from dvc.repo.reproduce import reproduce # type: ignore[misc]
234 changes: 234 additions & 0 deletions dvc/repo/purge.py
@@ -0,0 +1,234 @@
from typing import TYPE_CHECKING, Optional

from dvc.config import NoRemoteError, RemoteNotFoundError
from dvc.exceptions import DvcException
from dvc.log import logger

from . import locked

if TYPE_CHECKING:
from dvc.output import Output
from dvc.repo import Repo

logger = logger.getChild(__name__)


class PurgeError(DvcException):
"""Raised when purge fails due to safety or internal errors."""


def _flatten_stages_or_outs(items) -> list["Output"]:
"""Normalize collect() results into a flat list of Output objects."""
outs = []
for item in items:
if isinstance(item, list):
outs.extend(_flatten_stages_or_outs(item))
elif hasattr(item, "outs"): # Stage
outs.extend(item.outs)
elif hasattr(item, "use_cache"): # Already an Output
outs.append(item)
else:
logger.debug("Skipping non-stage item in collect(): %r", item)
return outs


def _check_dirty(outs, force: bool) -> None:
dirty = [o for o in outs if o.use_cache and o.changed()]
if dirty and not force:
raise PurgeError(
"Some tracked outputs have uncommitted changes. "
"Use `--force` to purge anyway.\n - "
+ "\n - ".join(str(o) for o in dirty)
)


def _get_remote_odb(repo: "Repo"):
try:
return repo.cloud.get_remote_odb(None)
except (RemoteNotFoundError, NoRemoteError):
return None


def _check_remote_backup(repo: "Repo", outs, force: bool) -> None:
remote_odb = _get_remote_odb(repo)

if not remote_odb:
if not force:
raise PurgeError(
"No default remote configured. "
"Cannot safely purge outputs without verifying remote backup.\n"
"Use `--force` to purge anyway."
)
logger.warning(
"No default remote configured. Proceeding with purge due to --force. "
"Outputs may be permanently lost."
)
return

# remote exists, check objects
not_in_remote = [
str(o)
for o in outs
if o.use_cache
and o.hash_info
and o.hash_info.value
and not remote_odb.exists(o.hash_info.value)
]
if not_in_remote and not force:
raise PurgeError(
"Some outputs are not present in the remote cache and would be "
"permanently lost if purged:\n - "
+ "\n - ".join(not_in_remote)
+ "\nUse `--force` to purge anyway."
)
if not_in_remote and force:
logger.warning(
"Some outputs are not present in the remote cache and may be "
"permanently lost:\n - %s",
"\n - ".join(not_in_remote),
)


def _remove_outs(outs, dry_run: bool) -> int:
removed = 0
for out in outs:
if dry_run:
logger.info("[dry-run] Would remove %s", out)
continue

try:
# remove workspace file
if out.exists:
out.remove(ignore_remove=False)

# remove cache entry
if out.use_cache and out.hash_info:
cache_path = out.cache.oid_to_path(out.hash_info.value)
if out.cache.fs.exists(cache_path):
out.cache.fs.remove(cache_path, recursive=True)

removed += 1
except Exception:
logger.exception("Failed to remove %s", out)
return removed


def _compute_checked_out_hashes(repo: "Repo"):
# Collect all stages
items = list(repo.index.stages)

# Flatten to outs
all_outs = []
for st in items:
all_outs.extend(st.outs)

# Keep only outs that actually exist in the workspace
used = set()
for out in all_outs:
if out.use_cache and out.exists and out.hash_info and out.hash_info.value:
used.add(out.hash_info.value)

return used


def _remove_unused_cache(repo: "Repo", dry_run: bool) -> int:
"""
Remove cache objects whose outputs are not currently checked out.
A 'used' object is defined as: workspace file exists AND has a hash.
"""
# Compute hashes for outputs that are currently checked out
used_hashes = _compute_checked_out_hashes(repo)

removed = 0

# Iterate through all local cache ODBs
for _scheme, odb in repo.cache.by_scheme():
if not odb:
continue

# Iterate through all cached object IDs
for obj_id in list(odb.all()):
if obj_id in used_hashes:
continue

cache_path = odb.oid_to_path(obj_id)

if dry_run:
logger.info("[dry-run] Would remove unused cache %s", cache_path)
else:
try:
odb.fs.remove(cache_path, recursive=True)
removed += 1
except Exception:
logger.exception("Failed to remove unused cache %s", cache_path)

return removed


@locked
def purge(
self: "Repo",
targets: Optional[list[str]] = None,
recursive: bool = False,
force: bool = False,
dry_run: bool = False,
unused_cache: bool = False,
) -> int:
"""
Purge removes local copies of DVC-tracked outputs and their cache.

- Collects outs from .dvc files and dvc.yaml.
- Ensures safety (no dirty outs unless --force).
- Ensures outputs are backed up to remote (unless --force).
- Removes both workspace copies and cache objects.
- Metadata remains intact.
"""
from dvc.repo.collect import collect
from dvc.stage.exceptions import StageFileDoesNotExistError

try:
items = (
collect(self, targets=targets, recursive=recursive)
if targets
else list(self.index.stages)
)
except StageFileDoesNotExistError as e:
raise PurgeError(str(e)) from e

outs = _flatten_stages_or_outs(items)
if not outs:
logger.info("No DVC-tracked outputs found to purge.")
return 0

# Determine whether we should remove outs.
# If unused_cache mode, don't remove anything.
remove_outs = not unused_cache
    if unused_cache and targets:
        logger.warning(
            "`--unused-cache` is meant to be used on its own;"
            " the provided targets will be ignored."
)

removed = 0
if remove_outs:
# Run safety checks
_check_dirty(outs, force)
_check_remote_backup(self, outs, force)

# Remove outs
removed = _remove_outs(outs, dry_run)

# Remove unused cache if requested
if unused_cache:
logger.info("Removing unused cache objects...")
unused_removed = _remove_unused_cache(self, dry_run=dry_run)
if unused_removed:
logger.info("Removed %d unused cache objects.", unused_removed)
else:
logger.info("No unused cache objects to remove.")

if removed:
logger.info("Removed %d outputs (workspace + cache).", removed)
else:
logger.info("Nothing to purge.")
return 0
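
The same behaviour is reachable from the Python API once `purge` is attached to `Repo` in `dvc/repo/__init__.py`. A minimal sketch, again assuming an initialized DVC repository; `data/features` is a placeholder target:

from dvc.repo import Repo

repo = Repo(".")

# Preview what would be purged without touching the workspace or the cache.
repo.purge(targets=["data/features"], recursive=True, dry_run=True)

# Dirty outputs or outputs missing from the default remote raise PurgeError
# unless force=True is passed.
repo.purge(force=True)

# Cache-only mode: drop cache objects that are not checked out; outputs stay.
repo.purge(unused_cache=True)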