@@ -148,9 +148,10 @@ def _read_updated_at(
148
148
index_json_content = None
149
149
assert isinstance (input_dir , Dir )
150
150
151
+ # Try to read index.json locally
151
152
if input_dir .path is not None and os .path .exists (os .path .join (input_dir .path , _INDEX_FILENAME )):
152
- # read index.json file and read last_updation_timestamp
153
153
index_json_content = load_index_file (input_dir .path )
154
+ # Try to read index.json remotely
154
155
elif input_dir .url is not None :
155
156
assert input_dir .url is not None
156
157
# download index.json file and read last_updation_timestamp
@@ -170,11 +171,14 @@ def _read_updated_at(
170
171
171
172
172
173
def _clear_cache_dir_if_updated (input_dir_hash_filepath : str , updated_at_hash : str ) -> None :
173
- """Clear cache dir if it is updated .
174
+ """Clear the cache directory if it is outdated.
174
175
175
- If last_updated has changed and /cache/chunks/{HASH(input_dir.url)} isn't empty, we remove all the files and then
176
- create the cache.
176
+ If the directory at `input_dir_hash_filepath` exists and does not contain only a single subdirectory named
177
+ `updated_at_hash`, the entire directory is deleted to prevent using stale or partial cache data.
177
178
179
+ Args:
180
+ input_dir_hash_filepath (str): Path to the hashed cache directory (e.g., /cache/chunks/{HASH(input_dir.url)}).
181
+ updated_at_hash (str): The expected hash or timestamp for the current dataset state.
178
182
"""
179
183
if os .path .exists (input_dir_hash_filepath ):
180
184
# check if it only contains one directory with updated_at_hash
@@ -189,24 +193,24 @@ def _try_create_cache_dir(
189
193
storage_options : Optional [Dict ] = {},
190
194
index_path : Optional [str ] = None ,
191
195
) -> Optional [str ]:
196
+ """Prepare and return the cache directory for a dataset."""
192
197
resolved_input_dir = _resolve_dir (input_dir )
193
198
updated_at = _read_updated_at (resolved_input_dir , storage_options , index_path )
194
199
200
+ # Fallback to a hash of the input_dir if updated_at is "0"
195
201
if updated_at == "0" and input_dir is not None :
196
202
updated_at = hashlib .md5 (input_dir .encode ()).hexdigest () # noqa: S324
197
203
198
204
dir_url_hash = hashlib .md5 ((resolved_input_dir .url or "" ).encode ()).hexdigest () # noqa: S324
199
205
200
- if "LIGHTNING_CLUSTER_ID" not in os .environ or "LIGHTNING_CLOUD_PROJECT_ID" not in os .environ :
201
- input_dir_hash_filepath = os .path .join (cache_dir or _DEFAULT_CACHE_DIR , dir_url_hash )
202
- _clear_cache_dir_if_updated (input_dir_hash_filepath , updated_at )
203
- cache_dir = os .path .join (input_dir_hash_filepath , updated_at )
204
- os .makedirs (cache_dir , exist_ok = True )
205
- return cache_dir
206
+ # Determine cache root based on environment
207
+ is_lightning_cloud = "LIGHTNING_CLUSTER_ID" in os .environ and "LIGHTNING_CLOUD_PROJECT_ID" in os .environ
208
+ default_cache_root = _DEFAULT_LIGHTNING_CACHE_DIR if is_lightning_cloud else _DEFAULT_CACHE_DIR
209
+ cache_root = cache_dir or default_cache_root
206
210
207
- input_dir_hash_filepath = os .path .join (cache_dir or _DEFAULT_LIGHTNING_CACHE_DIR , dir_url_hash )
208
- _clear_cache_dir_if_updated (input_dir_hash_filepath , updated_at )
209
- cache_dir = os .path .join (input_dir_hash_filepath , updated_at )
211
+ input_dir_hash_path = os .path .join (cache_root , dir_url_hash )
212
+ _clear_cache_dir_if_updated (input_dir_hash_path , updated_at )
213
+ cache_dir = os .path .join (input_dir_hash_path , updated_at )
210
214
os .makedirs (cache_dir , exist_ok = True )
211
215
return cache_dir
212
216
@@ -305,7 +309,7 @@ def copy_index_to_cache_index_filepath(index_path: str, cache_index_filepath: st
305
309
"""Copy Index file from index_path to cache_index_filepath."""
306
310
# If index_path is a directory, append "index.json"
307
311
if os .path .isdir (index_path ):
308
- index_path = os .path .join (index_path , "index.json" )
312
+ index_path = os .path .join (index_path , _INDEX_FILENAME )
309
313
# Check if the file exists before copying
310
314
if not os .path .isfile (index_path ):
311
315
raise FileNotFoundError (f"Index file not found: { index_path } " )
0 commit comments