-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathutil.py
163 lines (136 loc) · 5.12 KB
/
util.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
from contextlib import contextmanager
import gzip
import json
import logging
import os
from tempfile import NamedTemporaryFile
from typing import (
List,
Optional,
Sequence,
Union,
MutableMapping,
)
import uuid
from _pathlib import Path
def open_maybe_gz(path, mode: str, **kwargs):
    """
    Open a file for reading, transparently decompressing it if it is gzipped.

    Compression is detected from the first two bytes of the file rather than
    from its name. The returned file object can be used as a context manager,
    just like the result of the open() built-in.

    :param path: The file to open.
    :param mode: Either 'rb' or 'rt'. Any other value raises ValueError.
    :param kwargs: Passed through to gzip.open() or open(), whichever is used.
    """
    # `open` and `gzip.open` disagree on the default mode and on whether a
    # plain 'r' implies 'b' or 't', so the caller must specify the mode
    # unambiguously. None of the write modes are supported.
    if mode not in ('rb', 'rt'):
        raise ValueError("Unsupported mode (must be 'rb' or 'rt'):", mode)
    gzip_magic = b'\x1f\x8b'
    with open(path, 'rb') as probe:
        is_gzipped = probe.read(2) == gzip_magic
    opener = gzip.open if is_gzipped else open
    return opener(path, mode, **kwargs)
def generate_project_uuid(accessions: Union[str, Sequence[str]]) -> str:
    """
    Derive a stable project UUID from one or more GEO accession ids.

    A single accession may be passed as a plain string. Multiple accessions
    are sorted before hashing, so the result does not depend on their order.
    Note that the sorted ids are concatenated without a separator.

    :param accessions: One accession id or a sequence of accession ids.
    :return: The string form of a version 5 UUID.
    """
    ids = [accessions] if isinstance(accessions, str) else accessions
    name = ''.join(sorted(ids))
    namespace = uuid.UUID('296e1002-1c99-4877-bb7f-bb6a3b752638')
    return str(uuid.uuid5(namespace, name))
def generate_file_uuid(bundle_uuid: str, file_name: str) -> str:
    """
    Derive a stable file UUID from the UUID of the parent bundle and the name
    of the file within it.

    :param bundle_uuid: The UUID of the bundle the file belongs to.
    :param file_name: The name of the file.
    :return: The string form of a version 5 UUID.
    """
    # The two parts are concatenated directly, without a separator.
    name = bundle_uuid + file_name
    namespace = uuid.UUID('4c52e3d0-ffe5-4b4d-a4d0-cb6a6f372b31')
    file_uuid = uuid.uuid5(namespace, name)
    return str(file_uuid)
# Name of the environment variable that restricts processing to a working set
# of accessions (see get_skunk_accessions for how its value is parsed).
WORKING_SET_ENV_VAR = 'SKUNK_ACCESSIONS'


def is_working_set_defined() -> bool:
    """
    Tell whether the working set environment variable is set, even if its
    value is empty.
    """
    return os.environ.get(WORKING_SET_ENV_VAR) is not None
def get_skunk_accessions() -> Optional[List[str]]:
    """
    Return the accessions listed in the working set environment variable.

    The value of the variable is split on commas, surrounding whitespace is
    removed from every item and empty items are dropped.

    :return: None if the variable is not set, otherwise the list of
             accessions, which is empty if the variable holds no items.
    """
    raw = os.environ.get(WORKING_SET_ENV_VAR)
    if raw is None:
        return None
    stripped = (item.strip() for item in raw.split(','))
    return [item for item in stripped if item]
def get_target_spreadsheets() -> MutableMapping[str, Path]:
    """
    Map accessions to the paths of their spreadsheets.

    Looks for regular files whose name ends in '.0.xlsx' in the directories
    'spreadsheets/existing' and 'spreadsheets/new', both relative to the
    current working directory. The accession is the file name without that
    ending. If a working set of accessions is configured, only spreadsheets
    for those accessions are returned.

    :return: A dict from accession to spreadsheet path. For an accession
             present in both directories, the path from 'new' wins.
    """
    suffix = '.0.xlsx'
    wanted = get_skunk_accessions()
    result = {}
    for sub_dir in ('existing', 'new'):
        folder = Path('spreadsheets') / sub_dir
        spreadsheets = [
            entry
            for entry in folder.iterdir()
            if entry.is_file() and entry.name.endswith(suffix)
        ]
        by_accession = {}
        for spreadsheet in spreadsheets:
            file_name = spreadsheet.name
            assert file_name.endswith(suffix)
            by_accession[file_name[:-len(suffix)]] = spreadsheet
        # Every spreadsheet must have yielded a distinct accession
        assert len(spreadsheets) == len(by_accession)
        for accession, spreadsheet in by_accession.items():
            # Without a working set, every accession is a target
            if wanted is None or accession in wanted:
                result[accession] = spreadsheet
    return result
def get_target_project_dirs(follow_links: bool = False) -> List[Path]:
    """
    Return the project directories, or only those in the working set of
    accessions if one is configured.

    Candidates are the entries of the 'projects' directory (relative to the
    current working directory) that are symbolic links to directories and
    whose name is one of the targeted accessions.

    :param follow_links: If True, return Path instances referring to the
                         UUID-named directories the accession links point to.
                         Otherwise return Path instances referring to the
                         accession links themselves.
    """
    root = Path('projects')
    wanted = get_skunk_accessions()
    links = []
    for entry in root.iterdir():
        if not (entry.is_dir() and entry.is_symlink()):
            continue
        if wanted is None or entry.name in wanted:
            links.append(entry)
    # The links are validated in any case, even though strictly speaking it's
    # only necessary to follow them when follow_links is on.
    targets = []
    for link in links:
        # NOTE(review): follow() comes from the project's own Path class;
        # presumably it resolves the link by one level -- confirm.
        target = link.follow()
        assert target.is_dir()
        assert not target.is_symlink()
        assert not target.is_absolute()
        assert target.parent == root
        # The link is named after the accession, its target after the
        # project UUID generated from that accession.
        expected_name = generate_project_uuid([link.name])
        assert target.name == expected_name
        targets.append(target)
    return targets if follow_links else links
@contextmanager
def update_project_stats(project_dir: Path):
    """
    Read a project's stats.json and yield its contents as a dict that will
    then be written back to the stats.json file.

    If stats.json does not exist yet, a new dict containing only a
    'project_uuid' entry generated from the name of the project directory is
    yielded instead. When the body of the `with` statement completes, the dict
    is written to a temporary file next to stats.json, which is then renamed
    to stats.json. If the body raises, nothing is written.

    :param project_dir: The project directory containing stats.json.
    :raises: Whatever opening or serializing to the temporary file raises. In
             that case the temporary file is removed and an existing
             stats.json is left untouched.
    """
    stats_file = project_dir / 'stats.json'
    try:
        with open(str(stats_file), 'r') as f:
            stats = json.load(f)
    except FileNotFoundError:
        stats = {
            'project_uuid': generate_project_uuid(project_dir.name)
        }
    yield stats
    temporary_file = stats_file.with_suffix(stats_file.suffix + '.tmp')
    try:
        with open(str(temporary_file), 'w') as f:
            json.dump(stats, f, sort_keys=True, indent=4)
    except BaseException:
        # Refer to the temporary file by its own path rather than via the file
        # object: if open() itself failed, `f` is either unbound or still the
        # handle that was used to read stats.json above, and unlinking its
        # name would delete the very file we are trying to protect.
        try:
            temporary_file.unlink()
        except OSError:
            # Best-effort cleanup: the temporary file may never have been
            # created. Don't let that mask the original exception.
            pass
        raise
    else:
        logging.info('Writing to %s', stats_file)
        temporary_file.rename(stats_file)