1
+ import asyncio
2
+ from dataclasses import dataclass , field
3
+ import os
4
+ from pathlib import Path
5
+ import shutil
6
+ from typing import Any , Iterable , Optional
7
+
8
+ import sysrsync
9
+
10
+ from snakemake_interface_storage_plugins .settings import StorageProviderSettingsBase
11
+ from snakemake_interface_storage_plugins .storage_provider import (
12
+ StorageProviderBase ,
13
+ StorageQueryValidationResult ,
14
+ )
15
+ from snakemake_interface_storage_plugins .storage_object import (
16
+ StorageObjectRead ,
17
+ StorageObjectWrite ,
18
+ StorageObjectGlob ,
19
+ retry_decorator ,
20
+ )
21
+ from snakemake_interface_storage_plugins .io import (
22
+ IOCacheStorageInterface ,
23
+ get_constant_prefix ,
24
+ )
25
+
26
+
27
+ # Required:
28
+ # Implementation of your storage provider
29
+ # This class can be empty as the one below.
30
+ # You can however use it to store global information or maintain e.g. a connection
31
+ # pool.
32
class StorageProvider(StorageProviderBase):
    """Storage provider whose queries are plain filesystem paths.

    The provider keeps no state of its own; it only validates queries and
    lists the filesystem entries that a query refers to.
    """

    # For compatibility with future changes, you should not overwrite the __init__
    # method. Instead, use __post_init__ to set additional attributes and initialize
    # further stuff.

    def __post_init__(self):
        # Nothing to set up: no connection or additional attribute is needed.
        pass

    @classmethod
    def is_valid_query(cls, query: str) -> StorageQueryValidationResult:
        """Return whether the given query is valid for this storage provider.

        A query is considered valid if it can be turned into a ``Path``.
        Queries that still contain wildcards (e.g. ``{sample}``) are accepted as
        well, because the wildcards are resolved before the storage object is
        actually used.

        Returns:
            A ``StorageQueryValidationResult`` with ``valid`` set accordingly
            and, for invalid queries, a ``reason`` describing the problem.
        """
        try:
            Path(query)
        except Exception as e:
            # Always report through the result object (never a bare bool), so
            # that callers can access ``valid`` and ``reason`` uniformly.
            return StorageQueryValidationResult(
                query=query,
                valid=False,
                reason=f"Query is not a valid path: {e}",
            )
        return StorageQueryValidationResult(query=query, valid=True)

    def list_objects(self, query: Any) -> Iterable[str]:
        """Return an iterator over all objects in the storage that match the query.

        If the query is a directory, all entries below it are yielded
        (recursively). If it is any other existing path, only that path is
        yielded. If it does not exist, the result is empty. All items are
        strings.
        """
        path = Path(query)
        if path.is_dir():
            return map(str, path.rglob("*"))
        if path.exists():
            # Convert to str so that every branch yields the same item type.
            return (str(path),)
        return ()
66
+
67
+
68
+ # Required:
69
+ # Implementation of storage object. If certain methods cannot be supported by your
70
+ # storage (e.g. because it is read-only see
71
+ # snakemake-storage-http for comparison), remove the corresponding base classes
72
+ # from the list of inherited items.
73
class StorageObject(StorageObjectRead, StorageObjectWrite, StorageObjectGlob):
    """Storage object that maps a query onto a path in the filesystem.

    Objects are copied between ``self.query_path`` (the storage location) and
    ``self.local_path()`` with rsync (via ``sysrsync``).
    """

    # For compatibility with future changes, you should not overwrite the __init__
    # method. Instead, use __post_init__ to set additional attributes and initialize
    # further stuff.

    def __post_init__(self):
        # Keep the query as a Path, since all filesystem operations below use it.
        self.query_path = Path(self.query)

    async def inventory(self, cache: IOCacheStorageInterface):
        """From this file, try to find as much existence and modification date
        information as possible. Only retrieve that information that comes for free
        given the current object.

        Results are stored in the given cache under ``self.cache_key()``:
        existence, size (of the symlink target, if the path is a symlink) and
        modification time (of the symlink itself, if the path is a symlink).
        """
        key = self.cache_key()
        try:
            stat = self._stat()
        except FileNotFoundError:
            cache.exists_in_storage[key] = False
            # Nothing else can be recorded for a missing object. Returning here
            # also avoids touching the unbound ``stat`` below.
            return
        if self.query_path.is_symlink():
            # mtime is taken from the link itself, not from its target
            lstat = self._stat(follow_symlinks=False)
        else:
            lstat = stat
        cache.mtime[key] = lstat.st_mtime
        cache.size[key] = stat.st_size
        cache.exists_in_storage[key] = True

    def get_inventory_parent(self) -> Optional[str]:
        """Return the parent directory of this object.

        Returns ``None`` if the parent is the current directory (``.``),
        otherwise the parent path as a string.
        """
        parent = self.query_path.parent
        if parent == Path("."):
            return None
        # Convert to str to match the declared return type.
        return str(parent)

    def local_suffix(self) -> str:
        """Return a unique suffix for the local path, determined from self.query.

        Absolute queries are mapped below ``__abspath__/`` so that they stay
        distinguishable from relative queries with the same components
        (``/a/b`` vs. ``a/b``).
        """
        suffix = self.query
        if suffix.startswith("/"):
            # convert absolute path to unique relative path
            suffix = f"__abspath__/{suffix[1:]}"
        return suffix

    def close(self):
        # Nothing to be done here.
        pass

    # Fallible methods should implement some retry logic.
    # The easiest way to do this (but not the only one) is to use the retry_decorator
    # provided by snakemake-interface-storage-plugins.
    def exists(self) -> bool:
        """Return True if the object exists in the storage."""
        return self.query_path.exists()

    def mtime(self) -> float:
        """Return the modification time (of the link itself for symlinks)."""
        return self._stat(follow_symlinks=False).st_mtime

    def size(self) -> int:
        """Return the size in bytes (of the link target for symlinks)."""
        return self._stat().st_size

    def retrieve_object(self):
        """Ensure that the object is accessible locally under self.local_path()."""
        # sysrsync.run() takes source and destination as keyword arguments;
        # positional arguments would be bound to its ``cwd``/``strict``
        # parameters instead. "-a" makes rsync recurse into directories.
        sysrsync.run(
            source=str(self.query_path),
            destination=str(self.local_path()),
            options=["-a"],
        )

    def store_object(self):
        """Store the object found at self.local_path() under self.query_path."""
        # rsync does not create missing parent directories of the destination.
        self.query_path.parent.mkdir(parents=True, exist_ok=True)
        sysrsync.run(
            source=str(self.local_path()),
            destination=str(self.query_path),
            options=["-a"],
        )

    def remove(self):
        """Remove the object from the storage.

        Real directories are removed recursively; files and symlinks (including
        symlinks that point to directories) are unlinked.
        """
        path = self.query_path
        if path.is_dir() and not path.is_symlink():
            shutil.rmtree(path)
        else:
            # shutil.rmtree() refuses regular files and symlinks.
            path.unlink()

    def list_candidate_matches(self) -> Iterable[str]:
        """Return a list of candidate matches in the storage for the query.

        This is used by glob_wildcards() to find matches for wildcards in the
        query. The method has to return concretized queries without any
        remaining wildcards. All items are strings.
        """
        prefix = Path(get_constant_prefix(self.query))
        if prefix.is_dir():
            return map(str, prefix.rglob("*"))
        # Convert to str so that both branches yield the same item type.
        return (str(prefix),)

    def _stat(self, follow_symlinks: bool = True):
        """Return a fresh ``os.stat`` result for ``self.query_path``."""
        # We don't want the cached variant (Path.stat), as we cache ourselves in
        # inventory and afterwards the information may change.
        return os.stat(self.query_path, follow_symlinks=follow_symlinks)
0 commit comments