#!/usr/bin/env python3
import os
import re
import sys
import fnmatch
import argparse
from datetime import datetime, timedelta
from msys2_devtools.db import ExtTarFile
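
# ExtTarFile is assumed here to behave like tarfile.TarFile while also
# handling the compression formats pacman databases ship with (e.g. zstd).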

def get_safe_patterns_for_db(db_path):
    """Returns a set of filename patterns that are 'referenced' by the DB.

    Any files matching any of the patterns should not be deleted.
    """

    safe_patterns = {
        "*.db.*",
        "*.files.*",
        "*.db",
        "*.files",
    }

    with ExtTarFile.open(db_path, mode='r') as tar:
        for info in tar.getmembers():
            file_name = info.name.rsplit("/", 1)[-1]
            if file_name == "desc":
                infodata = tar.extractfile(info).read().decode()
                lines = infodata.splitlines()

                def get_value(key, default=None):
                    if key in lines:
                        return lines[lines.index(key) + 1]
                    assert default is not None
                    return default

                filename = get_value("%FILENAME%")
                assert fnmatch.fnmatchcase(filename, "*.pkg.*")
                filename += "*"
                sourcename = get_value("%BASE%", get_value("%NAME%"))
                sourcename += "-" + get_value("%VERSION%") + "*.src.*"
                safe_patterns.add(filename)
                safe_patterns.add(sourcename)

    return safe_patterns
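
# A pacman "desc" entry stores a key like %FILENAME% on one line and its value
# on the next, which is what get_value() above exploits. Hypothetical example:
# a desc containing
#     %FILENAME%
#     foo-1.0-1-any.pkg.tar.zst
#     %BASE%
#     foo
#     %VERSION%
#     1.0-1
# adds the safe patterns "foo-1.0-1-any.pkg.tar.zst*" and "foo-1.0-1*.src.*".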


def log(*message):
    """Print to stderr, so we can redirect stdout for the file list"""

    print('\033[94m' + "[LOG]" + '\033[0m', end=" ", file=sys.stderr)
    print(*message, file=sys.stderr)


def find_dbs(target_dir):
    """Recursively look for DB files"""

    db_paths = set()
    target_dir = os.path.realpath(target_dir)
    for root, dirs, files in os.walk(target_dir):
        for name in files:
            if fnmatch.fnmatch(name, '*.db'):
                db_paths.add(os.path.join(root, name))
    return db_paths


def get_dirs_to_prune(db_path):
    """For every DB file we also have a sources directory"""

    dir_ = os.path.dirname(db_path)
    sources = os.path.normpath(os.path.join(dir_, '..', 'sources'))
    assert os.path.exists(dir_)
    assert os.path.exists(sources)
    return {dir_, sources}
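
# Sketch of the assumed repo layout (paths are hypothetical): for a DB at
# /srv/repo/mingw/x86_64/mingw64.db this returns both /srv/repo/mingw/x86_64
# and its sibling /srv/repo/mingw/sources.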


def fnmatch_filter_case_multi(names, patterns):
    """Like fnmatch.filter() but case sensitive and supports multiple patterns"""

    regex = re.compile('|'.join(fnmatch.translate(p) for p in patterns))
    return [e for e in names if regex.match(e)]
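
# Unlike fnmatch.filter(), which goes through os.path.normcase() and is thus
# case insensitive on Windows, this matches exactly:
#     fnmatch_filter_case_multi(["a.db", "A.DB"], ["*.db"])  ->  ["a.db"]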


def get_files_to_prune(target_dir, time_delta):
    """Gives the set of paths to delete"""

    newest_mtime = 0.0
    prune_mapping = {}
    for db_path in find_dbs(target_dir):
        # Make sure we don't look at one repo alone, otherwise we might delete
        # sources referenced from another repo
        if os.path.samefile(os.path.dirname(db_path), target_dir):
            raise SystemExit(
                "Error: the root dir is the same as the repo dir; "
                "pass a directory at least one level up")
        log("Found DB:", db_path)
        db_mtime = os.path.getmtime(db_path)
        if db_mtime > newest_mtime:
            newest_mtime = db_mtime
        patterns = get_safe_patterns_for_db(db_path)
        for prune_dir in get_dirs_to_prune(db_path):
            if prune_dir not in prune_mapping:
                log("Found prune location:", prune_dir)
                prune_mapping[prune_dir] = set(patterns)
            else:
                prune_mapping[prune_dir].update(patterns)
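            # Note: a shared location (e.g. one sources dir serving several
            # repos) ends up with the merged safe patterns of all of them.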
log("Searching...")
ref_mtime = datetime.fromtimestamp(newest_mtime)
paths_to_delete = set()
for prune_dir, safe_patterns in sorted(prune_mapping.items()):
log("Dir:", prune_dir)

        def get_entries_not_to_delete(entries):
            return set(fnmatch_filter_case_multi(entries, safe_patterns))

        def group_by_package_name_and_ext(entries):
            groups = {}
            for e in entries:
                ext = e.rsplit("-", 1)[-1].split(".", 1)[-1]
                # normalize different compression types
                ext = ext.replace(".xz", "").replace(".zst", "").replace(".gz", "")
                if ext.startswith("src."):
                    name = e.rsplit("-", 2)[0]
                else:
                    name = e.rsplit("-", 3)[0]
                groups.setdefault((name, ext), []).append(e)
            return list(groups.values())
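
        # Grouping sketch (hypothetical file names):
        #     "foo-bar-1.2-3-x86_64.pkg.tar.zst" -> ("foo-bar", "pkg.tar")
        #     "foo-bar-1.2-3.src.tar.gz"         -> ("foo-bar", "src.tar")
        # i.e. all versions of a package form one group per artifact type.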

        def get_timestamp(entry):
            path = os.path.join(prune_dir, entry)
            return datetime.fromtimestamp(os.path.getmtime(path))

        def is_too_old(entry, factor=1):
            dt = get_timestamp(entry)
            return dt <= ref_mtime and ref_mtime - dt > time_delta * factor

        def group_get_paths_too_old(group, not_to_delete):
            # For every group we keep one package that is older than the
            # cut-off point, so that at the cut-off point all packages of the
            # DB synced at the time still exist
            group.sort(key=get_timestamp)
            maybe_too_old = set()
            for i, e in enumerate(group):
                if not is_too_old(e):
                    maybe_too_old.update(group[:max(0, i - 1)])
                    break
            else:
                # XXX: In case a package got removed from the DB and all are
                # too old we might still want to keep the last one since it
                # might have been in the DB back then. We can't be sure though,
                # so just keep it if it's not older than 4 * time_delta
                if is_too_old(group[-1], 4):
                    maybe_too_old.update(group)
                else:
                    maybe_too_old.update(group[:-1])
            return [os.path.join(prune_dir, e) for e in maybe_too_old
                    if e not in not_to_delete]
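
        # Worked example (hypothetical mtimes): for a group sorted oldest to
        # newest as [v1, v2, v3, v4] where only v1 and v2 fall past the
        # cut-off, just v1 is deleted; v2 survives as the one package older
        # than the cut-off, so the DB state from back then stays servable.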

        # we group packages by type and name and keep only one package per
        # group around that is older than the prune date
        entries = sorted(os.listdir(prune_dir))
        not_to_delete = get_entries_not_to_delete(entries)
        for group in group_by_package_name_and_ext(entries):
            paths_to_delete.update(group_get_paths_too_old(group, not_to_delete))

    def get_related_files(path: str) -> list[str]:
        """Returns a list of related files which should be deleted together
        with the given file"""

        if not path.endswith(".sig"):
            return [path + ".sig"]
        else:
            return [path[:-4]]
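
    # e.g. "foo-1.0-1-any.pkg.tar.zst" pairs with "foo-1.0-1-any.pkg.tar.zst.sig"
    # and vice versa, so a package and its signature only get pruned together.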

    def related_also_to_delete(path: str) -> bool:
        """Returns True if all related files are also to be deleted, or don't exist"""

        for related in get_related_files(path):
            if related not in paths_to_delete and os.path.exists(related):
                return False
        return True

    paths_to_delete = set(p for p in paths_to_delete if related_also_to_delete(p))

    return paths_to_delete


def main(argv):
    parser = argparse.ArgumentParser(
        description="List old packages to prune", allow_abbrev=False)
    parser.add_argument("root", help="path to root dir")
    parser.add_argument("--days", default=int(365 * 1.75), type=int,
                        help="days after which a package can be pruned")
    args = parser.parse_args(argv[1:])

    log(f"Pruning unused files older than {args.days} days")
    time_delta = timedelta(days=args.days)
    paths = get_files_to_prune(args.root, time_delta)
    log(f"Found {len(paths)} files to prune")

    size = 0
    for path in sorted(paths):
        size += os.path.getsize(path)
        log(str(datetime.fromtimestamp(os.path.getmtime(path))).split()[0], path)
    log(f"Removing would save {size / 1024 ** 3: .3f} GB")

    choice = input("OK to delete? [y/N] ")
    if choice.upper() != "Y":
        print("Aborting")
        return

    for path in sorted(paths):
        log(f"Removing {path}")
        os.remove(path)


if __name__ == '__main__':
    main(sys.argv)
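
# Typical invocation (path is hypothetical):
#     ./msys2-repo-prune /srv/msys2-repo --days 600
# Candidate files and sizes are logged to stderr and nothing is removed until
# the "OK to delete?" prompt is answered with "y".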