Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
149 changes: 117 additions & 32 deletions keepercommander/commands/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@

BREACHWATCH_MAX = 5
KEEPER_API_BATCH_LIMIT = 999 # Maximum objects per pre_delete API request
KEEPER_API_FOLDER_BATCH_LIMIT = 500 # Maximum folders per pre_delete API request

is_windows = sys.platform.startswith('win')

Expand Down Expand Up @@ -634,7 +635,15 @@ def print_device_info(params: KeeperParams):

class RecordDeleteAllCommand(Command):
"""
Delete all records and folders from the vault.
Delete all records and folders from the user vault.

IMPORTANT: This command automatically skips shared folders and their records.
For shared folders, use the recommended workflow:
1. 'transform-folder <folder_name>' to move records to user folders (fast)
2. Run delete-all to clean remaining user vault content

The command will alert you about any skipped shared folder content.
If the vault is already empty, no confirmation is required.

Args:
params: KeeperParams instance
Expand All @@ -643,34 +652,53 @@ class RecordDeleteAllCommand(Command):
Returns:
None

Note:
Records in multiple folders will be processed using the first
folder found. Deletions are processed in batches of 999 items.
Note:
Only processes user folders and records. Uses batches of up to 999 items per
API call. Records in multiple folders will be processed using the first user
folder found.
"""

def get_parser(self):
return delete_all_parser

def execute(self, params, **kwargs):
"""Delete all records and folders from vault."""
"""Delete all records and folders from vault. Skips confirmation if vault is already empty."""
try:
DeletionValidator.validate_params(params)
except ValueError:
logging.error("Invalid parameters provided")
return

api.sync_down(params)

if self._is_vault_empty(params):
print("No records or folders to delete. Vault is already empty.")
return

if not self._confirm_user_wants_deletion(kwargs):
return

kwargs['force'] = True
api.sync_down(params)

record_stats = self._process_record_deletion(params)
folder_stats = self._process_folder_deletion(params)

self._display_summary(record_stats, folder_stats)
self._finalize_deletion(params, record_stats, folder_stats)

def _is_vault_empty(self, params):
"""Check if vault is completely empty (no records, no folders)."""
from ..subfolder import BaseFolderNode

if len(params.record_cache) > 0:
return False

for folder_uid, folder in params.folder_cache.items():
if folder.type != BaseFolderNode.RootFolderType:
return False

return True

def _confirm_user_wants_deletion(self, kwargs):
"""Show warning and get user confirmation."""
from ..display import bcolors
Expand Down Expand Up @@ -699,15 +727,68 @@ def _confirm_user_wants_deletion(self, kwargs):
return True

def _process_record_deletion(self, params):
"""Collect and delete all records."""
records_with_folders = self._collect_records_with_folders(params)
"""Collect and delete all records, avoiding shared folder records."""
records_with_folders, skipped_stats = self._collect_records_with_folders_safe(params)
if not records_with_folders:
logging.info('No records found to delete')
return DeletionStats()

logging.info('Preparing to delete %s records from Keeper', len(records_with_folders))
if skipped_stats['shared_folders'] > 0 or skipped_stats['shared_records'] > 0:
print(f"\nSHARED FOLDER CONTENT SKIPPED:")
print(f" • {skipped_stats['shared_folders']} shared folders avoided")
print(f" • {skipped_stats['shared_records']} records in shared folders avoided")
print(f"\nFor shared folders with many records, use this workflow:")
print(f" 1. Run 'transform-folder <shared_folder_uid>' to convert shared folder to user folder (fast)")
print(f" 2. Then run delete-all to clean remaining user vault content\n")

logging.info('Preparing to delete %s records from user folders', len(records_with_folders))
return self._delete_objects_in_batches(params, records_with_folders, 'records')

def _collect_records_with_folders_safe(self, params):
"""Collect records from user folders only, avoiding shared folders."""
from ..subfolder import BaseFolderNode

safe_records = []
skipped_stats = {
'shared_folders': 0,
'shared_records': 0
}

shared_folder_uids = set()
shared_record_uids = set()

for folder_uid, folder in params.folder_cache.items():
if folder.type in (BaseFolderNode.SharedFolderType, BaseFolderNode.SharedFolderFolderType):
shared_folder_uids.add(folder_uid)
skipped_stats['shared_folders'] += 1

folder_records = params.subfolder_record_cache.get(folder_uid, set())
shared_record_uids.update(folder_records)

skipped_stats['shared_records'] = len(shared_record_uids)

for record_uid in params.record_cache:
if record_uid in shared_record_uids:
continue

record_folder = None
for folder_uid in params.subfolder_record_cache:
if record_uid in params.subfolder_record_cache.get(folder_uid, set()):
folder = params.folder_cache.get(folder_uid)
if folder and folder.type == BaseFolderNode.UserFolderType:
record_folder = folder
break

if record_folder is None:
root_records = params.subfolder_record_cache.get('', set())
if record_uid in root_records:
record_folder = params.root_folder

if record_folder is not None:
safe_records.append((record_folder, record_uid))

return safe_records, skipped_stats

def _collect_records_with_folders(self, params):
"""Collect all records with their folder contexts."""
from ..subfolder import find_all_folders
Expand Down Expand Up @@ -738,7 +819,7 @@ def _collect_records_with_folders(self, params):
return records_with_folders

def _process_folder_deletion(self, params):
"""Delete all empty folders."""
"""Delete all empty user folders, avoiding shared folders."""
return self._delete_empty_folders(params)

def _delete_objects_in_batches(self, params, objects_with_context, object_type_name):
Expand Down Expand Up @@ -863,7 +944,7 @@ def _finalize_deletion(self, params, record_stats, folder_stats):

def _delete_empty_folders(self, params):
"""
Delete all empty folders after records have been deleted.
Delete all empty user folders after records have been deleted.

Args:
params: KeeperParams instance
Expand All @@ -872,9 +953,8 @@ def _delete_empty_folders(self, params):
int: Number of folders successfully deleted

Note:
Only deletes folders that are completely empty (no records and no subfolders).
Root folder is never deleted. Folders are sorted by depth (deepest first)
to avoid parent-child dependency issues.
Only deletes user folders that are completely empty.
Skips shared folders entirely. Root folder is never deleted.
"""
from ..subfolder import BaseFolderNode

Expand All @@ -886,41 +966,46 @@ def _delete_empty_folders(self, params):
# Sync down to get latest folder state after record deletion
api.sync_down(params)

# Find all folders that are safe to delete
empty_folders = []
empty_user_folders = []
skipped_shared_folders = 0

for folder_uid, folder in params.folder_cache.items():
# Security check: Skip if folder or folder_uid is invalid
if not folder_uid or not folder or not hasattr(folder, 'type'):
logging.warning("Skipping invalid folder entry")
continue

# Security check: Do not attempt to delete root folder
# Delete all other folder types (user folders, shared folders, shared folder subfolders)
if folder.type == BaseFolderNode.RootFolderType:
continue

# Check if folder is empty (no records)
records_in_folder = params.subfolder_record_cache.get(folder_uid, set())
if len(records_in_folder) == 0:
# Also check if it has any subfolders - only delete if completely empty
if not folder.subfolders or len(folder.subfolders) == 0:
empty_folders.append(folder)

if not empty_folders:
logging.info("No folders found to delete")
if folder.type in (BaseFolderNode.SharedFolderType, BaseFolderNode.SharedFolderFolderType):
skipped_shared_folders += 1
continue

if folder.type == BaseFolderNode.UserFolderType:
records_in_folder = params.subfolder_record_cache.get(folder_uid, set())
if len(records_in_folder) == 0:
if not folder.subfolders or len(folder.subfolders) == 0:
empty_user_folders.append(folder)

if skipped_shared_folders > 0:
logging.info(f"Skipped {skipped_shared_folders} shared folders (use transform-folder workflow instead)")

if not empty_user_folders:
logging.info("No empty user folders found to delete")
return 0

logging.info('Found %s folders to delete', len(empty_folders))
logging.info('Found %s empty user folders to delete', len(empty_user_folders))

# Sort folders by depth (deepest first) to avoid parent-child dependency issues
# Folders with longer paths are deeper in the hierarchy
from ..subfolder import get_folder_path
empty_folders.sort(key=lambda f: len(get_folder_path(params, f.uid).split('/')), reverse=True)
empty_user_folders.sort(key=lambda f: len(get_folder_path(params, f.uid).split('/')), reverse=True)

total_folders_deleted = 0
total_folders_failed = 0

folders_to_delete = empty_folders[:]
folders_to_delete = empty_user_folders[:]
while len(folders_to_delete) > 0:
# Prepare pre_delete request for folders
rq = {
Expand All @@ -929,8 +1014,8 @@ def _delete_empty_folders(self, params):
}

# Process chunk of folders
chunk = folders_to_delete[:KEEPER_API_BATCH_LIMIT]
folders_to_delete = folders_to_delete[KEEPER_API_BATCH_LIMIT:]
chunk = folders_to_delete[:KEEPER_API_FOLDER_BATCH_LIMIT]
folders_to_delete = folders_to_delete[KEEPER_API_FOLDER_BATCH_LIMIT:]

for folder in chunk:
# Security check: Validate folder has required attributes
Expand Down