1
1
from __future__ import annotations
2
2
3
3
import logging
4
+ import os
5
+ import shutil
4
6
import subprocess
5
7
from contextlib import contextmanager
6
8
from pathlib import Path
16
18
17
19
_SQUASHFS_DIR : Final = Path ("/var/run/squashfs/" )
18
20
21
+ def _squashfs_removemount (mount_point : Path ):
22
+ if mount_point .exists ():
23
+ if mount_point .is_mount ():
24
+ return_code = subprocess .call (["umount" , mount_point ])
25
+ if return_code != 0 :
26
+ _logger .debug ("squashfs_rom: unmounting %s failed" , mount_point )
27
+ raise BatoceraException (f"Unable to unmount the file { mount_point } " )
28
+ mount_point .rmdir ()
29
+
30
+ def _squashfs_rom_cleanup (mount_point : Path , overlay : bool ):
31
+ _logger .debug ("squashfs_rom: cleaning up %s" , mount_point )
32
+ os .chdir (_SQUASHFS_DIR )
33
+ _squashfs_removemount (mount_point )
34
+
35
+ if overlay :
36
+ overlay_root = mount_point .parent
37
+ mount_point_rw = overlay_root / "overlay"
38
+
39
+ if overlay_root .exists ():
40
+ _logger .debug ("squashfs_rom: cleaning up overlay root %s" , overlay_root )
41
+ _squashfs_removemount (mount_point_rw )
42
+ shutil .rmtree (overlay_root )
19
43
20
44
@contextmanager
21
- def squashfs_rom (rom : Path , / ) -> Iterator [Path ]:
45
+ def squashfs_rom (rom : Path , overlay : bool ) -> Iterator [Path ]:
22
46
_logger .debug ("squashfs_rom(%s)" , rom )
23
- mount_point = _SQUASHFS_DIR / rom .stem
24
47
25
48
mkdir_if_not_exists (_SQUASHFS_DIR )
26
49
27
- # first, try to clean an empty remaining directory (for example because of a crash)
28
- if mount_point .exists () and mount_point .is_dir ():
29
- _logger .debug ("squashfs_rom: %s already exists" , mount_point )
30
- # try to remove an empty directory, else, run the directory, ignoring the .squashfs
31
- try :
32
- mount_point .rmdir ()
33
- except (FileNotFoundError , OSError ):
34
- _logger .debug ("squashfs_rom: failed to rmdir %s" , mount_point )
35
- yield mount_point
36
- # No cleanup is necessary
37
- return
50
+ if overlay :
51
+ overlay_root = _SQUASHFS_DIR / rom .stem
52
+ mount_point = overlay_root / "rom"
53
+ overlay_delta = overlay_root / "delta"
54
+ overlay_tmp = overlay_root / "tmp"
55
+ mount_point_rw = overlay_root / "overlay"
56
+
57
+ if overlay_root .exists () and overlay_root .is_dir ():
58
+ _logger .debug ("squashfs_rom: %s overlay root already exists" , overlay_root )
59
+ _squashfs_rom_cleanup (mount_point , overlay )
60
+ else :
61
+ mount_point = _SQUASHFS_DIR / rom .stem
62
+
63
+ # first, try to clean an empty remaining directories (for example because of a crash)
64
+ if mount_point .exists () and mount_point .is_dir ():
65
+ _logger .debug ("squashfs_rom: %s already exists" , mount_point )
66
+
67
+ # try to remove an empty directory, else, run the directory, ignoring the .squashfs
68
+ try :
69
+ mount_point .rmdir ()
70
+ except (FileNotFoundError , OSError ):
71
+ _logger .debug ("squashfs_rom: failed to rmdir %s" , mount_point )
72
+ yield mount_point
73
+ # No cleanup is necessary
74
+ return
38
75
39
76
# ok, the base directory doesn't exist, let's create it and mount the squashfs on it
40
- mount_point .mkdir ()
77
+ mount_point .mkdir (parents = True )
41
78
42
79
return_code = subprocess .call (["mount" , rom , mount_point ])
43
80
if return_code != 0 :
@@ -48,28 +85,33 @@ def squashfs_rom(rom: Path, /) -> Iterator[Path]:
48
85
pass
49
86
raise BatoceraException (f"Unable to mount the file { rom } " )
50
87
88
+ if overlay :
89
+ for dir in [overlay_delta , overlay_tmp , mount_point_rw ]: dir .mkdir (exist_ok = True )
90
+ return_code = subprocess .call (["mount" ,
91
+ "-t" , "overlay" ,
92
+ "overlay" ,
93
+ "-o" , f"lowerdir={ mount_point } ,upperdir={ overlay_delta } ,workdir={ overlay_tmp } " ,
94
+ mount_point_rw ])
95
+ if return_code != 0 :
96
+ _logger .debug ("squashfs_rom: mounting overlay %s failed" , mount_point_rw )
97
+ _squashfs_rom_cleanup (mount_point , overlay )
98
+ raise BatoceraException (f"Unable to mount the file { rom } " )
99
+
51
100
try :
101
+ working_mount_point = mount_point_rw if overlay else mount_point
102
+
52
103
# if the squashfs contains a single file with the same name, take it as the rom file
53
- rom_single = mount_point / rom .stem
104
+ rom_single = working_mount_point / rom .stem
54
105
if len (list (mount_point .iterdir ())) == 1 and rom_single .exists ():
55
106
_logger .debug ("squashfs: single rom %s" , rom_single )
56
107
yield rom_single
57
108
else :
58
109
try :
59
- rom_linked = (mount_point / ".ROM" ).resolve (strict = True )
110
+ rom_linked = (working_mount_point / ".ROM" ).resolve (strict = True )
60
111
except OSError :
61
- yield mount_point
112
+ yield working_mount_point
62
113
else :
63
114
_logger .debug ("squashfs: linked rom %s" , rom_linked )
64
115
yield rom_linked
65
116
finally :
66
- _logger .debug ("squashfs_rom: cleaning up %s" , mount_point )
67
-
68
- # unmount
69
- return_code = subprocess .call (["umount" , mount_point ])
70
- if return_code != 0 :
71
- _logger .debug ("squashfs_rom: unmounting %s failed" , mount_point )
72
- raise BatoceraException (f"Unable to unmount the file { mount_point } " )
73
-
74
- # cleaning the empty directory
75
- mount_point .rmdir ()
117
+ _squashfs_rom_cleanup (mount_point , overlay )
0 commit comments