Commit 73d4ee6

feat: Add support for NDU storages

1 parent e747720 commit 73d4ee6

19 files changed (+2018, -66 lines)

src/crawlee/_types.py

Lines changed: 18 additions & 8 deletions
@@ -189,6 +189,7 @@ class PushDataFunctionCall(PushDataKwargs):
     data: list[dict[str, Any]] | dict[str, Any]
     dataset_id: str | None
     dataset_name: str | None
+    dataset_alias: str | None


 class KeyValueStoreInterface(Protocol):
@@ -255,7 +256,7 @@ def __init__(self, *, key_value_store_getter: GetKeyValueStoreFunction) -> None:
         self._key_value_store_getter = key_value_store_getter
         self.add_requests_calls = list[AddRequestsKwargs]()
         self.push_data_calls = list[PushDataFunctionCall]()
-        self.key_value_store_changes = dict[tuple[str | None, str | None], KeyValueStoreChangeRecords]()
+        self.key_value_store_changes = dict[tuple[str | None, str | None, str | None], KeyValueStoreChangeRecords]()

     async def add_requests(
         self,
@@ -270,6 +271,7 @@ async def push_data(
         data: list[dict[str, Any]] | dict[str, Any],
         dataset_id: str | None = None,
         dataset_name: str | None = None,
+        dataset_alias: str | None = None,
         **kwargs: Unpack[PushDataKwargs],
     ) -> None:
         """Track a call to the `push_data` context helper."""
@@ -278,6 +280,7 @@ async def push_data(
                 data=data,
                 dataset_id=dataset_id,
                 dataset_name=dataset_name,
+                dataset_alias=dataset_alias,
                 **kwargs,
             )
         )
@@ -287,13 +290,14 @@ async def get_key_value_store(
         *,
         id: str | None = None,
         name: str | None = None,
+        alias: str | None = None,
     ) -> KeyValueStoreInterface:
-        if (id, name) not in self.key_value_store_changes:
-            self.key_value_store_changes[id, name] = KeyValueStoreChangeRecords(
-                await self._key_value_store_getter(id=id, name=name)
+        if (id, name, alias) not in self.key_value_store_changes:
+            self.key_value_store_changes[id, name, alias] = KeyValueStoreChangeRecords(
+                await self._key_value_store_getter(id=id, name=name, alias=alias)
             )

-        return self.key_value_store_changes[id, name]
+        return self.key_value_store_changes[id, name, alias]


 @docs_group('Functions')
@@ -424,12 +428,14 @@ def __call__(
         *,
         id: str | None = None,
         name: str | None = None,
+        alias: str | None = None,
     ) -> Coroutine[None, None, KeyValueStore]:
         """Call dunder method.

         Args:
             id: The ID of the `KeyValueStore` to get.
-            name: The name of the `KeyValueStore` to get.
+            name: The name of the `KeyValueStore` to get (global scope).
+            alias: The alias of the `KeyValueStore` to get (run scope, unnamed).
         """

@@ -444,12 +450,14 @@ def __call__(
         *,
         id: str | None = None,
         name: str | None = None,
+        alias: str | None = None,
     ) -> Coroutine[None, None, KeyValueStoreInterface]:
         """Call dunder method.

         Args:
             id: The ID of the `KeyValueStore` to get.
-            name: The name of the `KeyValueStore` to get.
+            name: The name of the `KeyValueStore` to get (global scope).
+            alias: The alias of the `KeyValueStore` to get (run scope, unnamed).
         """

@@ -466,14 +474,16 @@ def __call__(
         data: list[dict[str, Any]] | dict[str, Any],
         dataset_id: str | None = None,
         dataset_name: str | None = None,
+        dataset_alias: str | None = None,
         **kwargs: Unpack[PushDataKwargs],
     ) -> Coroutine[None, None, None]:
         """Call dunder method.

         Args:
             data: The data to push to the `Dataset`.
             dataset_id: The ID of the `Dataset` to push the data to.
-            dataset_name: The name of the `Dataset` to push the data to.
+            dataset_name: The name of the `Dataset` to push the data to (global scope).
+            dataset_alias: The alias of the `Dataset` to push the data to (run scope, unnamed).
             **kwargs: Additional keyword arguments.
         """

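Taken together, the _types.py changes let a request handler target run-scoped, unnamed storages through the new alias arguments, while named storages stay global. A minimal sketch of a handler using them (the handler body and alias values are illustrative, not part of this commit):

    from crawlee.crawlers import BasicCrawler, BasicCrawlingContext

    crawler = BasicCrawler()

    @crawler.router.default_handler
    async def handler(context: BasicCrawlingContext) -> None:
        # Run-scoped, unnamed dataset addressed by alias; distinct from any
        # globally named dataset and from the default one.
        await context.push_data({'url': context.request.url}, dataset_alias='temp-results')

        # Run-scoped, unnamed key-value store; writes are recorded per
        # (id, name, alias) triple and committed after the handler succeeds.
        kvs = await context.get_key_value_store(alias='scratch')
        await kvs.set_value('last-url', context.request.url)
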
src/crawlee/crawlers/_basic/_basic_crawler.py

Lines changed: 18 additions & 10 deletions
@@ -557,18 +557,20 @@ async def get_dataset(
         *,
         id: str | None = None,
         name: str | None = None,
+        alias: str | None = None,
     ) -> Dataset:
         """Return the `Dataset` with the given ID or name. If none is provided, return the default one."""
-        return await Dataset.open(id=id, name=name)
+        return await Dataset.open(id=id, name=name, alias=alias)

     async def get_key_value_store(
         self,
         *,
         id: str | None = None,
         name: str | None = None,
+        alias: str | None = None,
     ) -> KeyValueStore:
         """Return the `KeyValueStore` with the given ID or name. If none is provided, return the default KVS."""
-        return await KeyValueStore.open(id=id, name=name)
+        return await KeyValueStore.open(id=id, name=name, alias=alias)

     def error_handler(
         self, handler: ErrorHandler[TCrawlingContext | BasicCrawlingContext]
@@ -772,6 +774,7 @@ async def get_data(
         self,
         dataset_id: str | None = None,
         dataset_name: str | None = None,
+        dataset_alias: str | None = None,
         **kwargs: Unpack[GetDataKwargs],
     ) -> DatasetItemsListPage:
         """Retrieve data from a `Dataset`.
@@ -781,20 +784,22 @@ async def get_data(

         Args:
             dataset_id: The ID of the `Dataset`.
-            dataset_name: The name of the `Dataset`.
+            dataset_name: The name of the `Dataset` (global scope).
+            dataset_alias: The alias of the `Dataset` (run scope, unnamed).
             kwargs: Keyword arguments to be passed to the `Dataset.get_data()` method.

         Returns:
             The retrieved data.
         """
-        dataset = await Dataset.open(id=dataset_id, name=dataset_name)
+        dataset = await Dataset.open(id=dataset_id, name=dataset_name, alias=dataset_alias)
         return await dataset.get_data(**kwargs)

     async def export_data(
         self,
         path: str | Path,
         dataset_id: str | None = None,
         dataset_name: str | None = None,
+        dataset_alias: str | None = None,
     ) -> None:
         """Export all items from a Dataset to a JSON or CSV file.

@@ -805,9 +810,10 @@ async def export_data(
         Args:
             path: The destination file path. Must end with '.json' or '.csv'.
             dataset_id: The ID of the Dataset to export from. If None, uses `name` parameter instead.
-            dataset_name: The name of the Dataset to export from. If None, uses `id` parameter instead.
+            dataset_name: The name of the Dataset to export from (global scope). If None, uses `id` parameter instead.
+            dataset_alias: The alias of the Dataset to export from (run scope, unnamed).
         """
-        dataset = await self.get_dataset(id=dataset_id, name=dataset_name)
+        dataset = await self.get_dataset(id=dataset_id, name=dataset_name, alias=dataset_alias)

         path = path if isinstance(path, Path) else Path(path)
         dst = path.open('w', newline='')
@@ -824,6 +830,7 @@ async def _push_data(
         data: list[dict[str, Any]] | dict[str, Any],
         dataset_id: str | None = None,
         dataset_name: str | None = None,
+        dataset_alias: str | None = None,
         **kwargs: Unpack[PushDataKwargs],
     ) -> None:
         """Push data to a `Dataset`.
@@ -834,10 +841,11 @@ async def _push_data(
         Args:
             data: The data to push to the `Dataset`.
             dataset_id: The ID of the `Dataset`.
-            dataset_name: The name of the `Dataset`.
+            dataset_name: The name of the `Dataset` (global scope).
+            dataset_alias: The alias of the `Dataset` (run scope, unnamed).
             kwargs: Keyword arguments to be passed to the `Dataset.push_data()` method.
         """
-        dataset = await self.get_dataset(id=dataset_id, name=dataset_name)
+        dataset = await self.get_dataset(id=dataset_id, name=dataset_name, alias=dataset_alias)
         await dataset.push_data(data, **kwargs)

     def _should_retry_request(self, context: BasicCrawlingContext, error: Exception) -> bool:
@@ -1226,8 +1234,8 @@ async def _commit_key_value_store_changes(
         result: RequestHandlerRunResult, get_kvs: GetKeyValueStoreFromRequestHandlerFunction
     ) -> None:
         """Store key value store changes recorded in result."""
-        for (id, name), changes in result.key_value_store_changes.items():
-            store = await get_kvs(id=id, name=name)
+        for (id, name, alias), changes in result.key_value_store_changes.items():
+            store = await get_kvs(id=id, name=name, alias=alias)
             for key, value in changes.updates.items():
                 await store.set_value(key, value.content, value.content_type)

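At the crawler level the alias flows through get_dataset, get_data, and export_data. A short usage sketch, assuming a crawler whose handlers already pushed items under the alias 'temp-results' (the alias and file name are illustrative):

    import asyncio

    from crawlee.crawlers import BasicCrawler

    async def main() -> None:
        crawler = BasicCrawler()
        # ... register handlers and run the crawler so the aliased dataset gets populated ...

        # Open the run-scoped dataset by alias (no global name is registered for it).
        dataset = await crawler.get_dataset(alias='temp-results')

        # Read the items back, then export everything to a JSON file.
        page = await crawler.get_data(dataset_alias='temp-results')
        print(page.count)
        await crawler.export_data('temp-results.json', dataset_alias='temp-results')

    asyncio.run(main())
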
src/crawlee/storage_clients/_base/_storage_client.py

Lines changed: 3 additions & 0 deletions
@@ -34,6 +34,7 @@ async def create_dataset_client(
         *,
         id: str | None = None,
         name: str | None = None,
+        alias: str | None = None,
         configuration: Configuration | None = None,
     ) -> DatasetClient:
         """Create a dataset client."""
@@ -44,6 +45,7 @@ async def create_kvs_client(
         *,
         id: str | None = None,
         name: str | None = None,
+        alias: str | None = None,
         configuration: Configuration | None = None,
     ) -> KeyValueStoreClient:
         """Create a key-value store client."""
@@ -54,6 +56,7 @@ async def create_rq_client(
         *,
         id: str | None = None,
         name: str | None = None,
+        alias: str | None = None,
         configuration: Configuration | None = None,
     ) -> RequestQueueClient:
         """Create a request queue client."""

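StorageClient is the abstract base, so any custom storage client must now accept alias in all three factory methods. A skeleton of one override, assuming the base classes are importable from crawlee.storage_clients._base as the file above suggests (MyStorageClient and its dispatch logic are hypothetical):

    from __future__ import annotations

    from crawlee.configuration import Configuration
    from crawlee.storage_clients import StorageClient
    from crawlee.storage_clients._base import DatasetClient

    class MyStorageClient(StorageClient):
        """Hypothetical custom storage client; only the signature matters here."""

        async def create_dataset_client(
            self,
            *,
            id: str | None = None,
            name: str | None = None,
            alias: str | None = None,
            configuration: Configuration | None = None,
        ) -> DatasetClient:
            # Would dispatch on the (id, name, alias) triple: named storages are
            # global, aliased ones are scoped to the current run.
            raise NotImplementedError

        # create_kvs_client and create_rq_client follow the same pattern.
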
src/crawlee/storage_clients/_file_system/_dataset_client.py

Lines changed: 43 additions & 5 deletions
@@ -58,6 +58,7 @@ def __init__(
         metadata: DatasetMetadata,
         storage_dir: Path,
         lock: asyncio.Lock,
+        directory_name: str | None = None,
     ) -> None:
         """Initialize a new instance.

@@ -68,6 +69,9 @@ def __init__(
         self._storage_dir = storage_dir
         """The base directory where the storage data are being persisted."""

+        self._directory_name = directory_name
+        """The directory name to use for this dataset. If None, uses metadata.name or default."""
+
         self._lock = lock
         """A lock to ensure that only one operation is performed at a time."""

@@ -78,6 +82,10 @@ async def get_metadata(self) -> DatasetMetadata:
     @property
     def path_to_dataset(self) -> Path:
         """The full path to the dataset directory."""
+        # Use the explicit directory name if provided, otherwise fall back to metadata.name or default.
+        if self._directory_name is not None:
+            return self._storage_dir / self._STORAGE_SUBDIR / self._directory_name
+
         if self._metadata.name is None:
             return self._storage_dir / self._STORAGE_SUBDIR / self._STORAGE_SUBSUBDIR_DEFAULT

@@ -94,6 +102,7 @@ async def open(
         *,
         id: str | None,
         name: str | None,
+        alias: str | None = None,
         configuration: Configuration,
     ) -> FileSystemDatasetClient:
         """Open or create a file system dataset client.
@@ -104,15 +113,21 @@ async def open(

         Args:
             id: The ID of the dataset to open. If provided, searches for existing dataset by ID.
-            name: The name of the dataset to open. If not provided, uses the default dataset.
+            name: The name of the dataset for named storages. Mutually exclusive with alias.
+            alias: The alias of the dataset for unnamed storages. Mutually exclusive with name.
             configuration: The configuration object containing storage directory settings.

         Returns:
             An instance for the opened or created storage client.

         Raises:
-            ValueError: If a dataset with the specified ID is not found, or if metadata is invalid.
+            ValueError: If a dataset with the specified ID is not found, if metadata is invalid,
+                or if both name and alias are provided.
         """
+        # Validate parameters - exactly one of name or alias should be provided (or neither for default).
+        if name is not None and alias is not None:
+            raise ValueError('Cannot specify both name and alias parameters')
+
         storage_dir = Path(configuration.storage_dir)
         dataset_base_path = storage_dir / cls._STORAGE_SUBDIR

@@ -140,6 +155,7 @@ async def open(
                         metadata=metadata,
                         storage_dir=storage_dir,
                         lock=asyncio.Lock(),
+                        directory_name=dataset_dir.name,  # Use the actual directory name.
                     )
                     await client._update_metadata(update_accessed_at=True)
                     found = True
@@ -152,10 +168,27 @@ async def open(
             if not found:
                 raise ValueError(f'Dataset with ID "{id}" not found')

-        # Get a new instance by name.
+        # Get a new instance by name or alias.
         else:
+            # Determine the directory name and metadata name based on whether this is a named or alias storage.
+            if alias is not None:
+                # For alias storages, use the alias as directory name and set metadata.name to None.
+                # Special case: alias='default' should use the same directory as default storage.
+                directory_name = None if alias == 'default' else alias
+                actual_name = None
+            elif name is not None:
+                # For named storages, use the name as both directory name and metadata.name.
+                directory_name = name
+                actual_name = name
+            else:
+                # For default storage (no name or alias), use None for both - same as alias='default'.
+                directory_name = None
+                actual_name = None

             dataset_path = (
-                dataset_base_path / cls._STORAGE_SUBSUBDIR_DEFAULT if name is None else dataset_base_path / name
+                dataset_base_path / cls._STORAGE_SUBSUBDIR_DEFAULT
+                if directory_name is None
+                else dataset_base_path / directory_name
             )
             metadata_path = dataset_path / METADATA_FILENAME

@@ -168,13 +201,17 @@ async def open(
                 await asyncio.to_thread(file.close)
                 try:
                     metadata = DatasetMetadata(**file_content)
+                    # For aliases, ensure the metadata.name is None.
+                    if alias is not None:
+                        metadata = metadata.model_copy(update={'name': None})
                 except ValidationError as exc:
                     raise ValueError(f'Invalid metadata file for dataset "{name}"') from exc

                 client = cls(
                     metadata=metadata,
                     storage_dir=storage_dir,
                     lock=asyncio.Lock(),
+                    directory_name=directory_name,
                 )

                 await client._update_metadata(update_accessed_at=True)
@@ -184,7 +221,7 @@ async def open(
             now = datetime.now(timezone.utc)
             metadata = DatasetMetadata(
                 id=crypto_random_object_id(),
-                name=name,
+                name=actual_name,  # Use actual_name which will be None for aliases.
                 created_at=now,
                 accessed_at=now,
                 modified_at=now,
@@ -194,6 +231,7 @@ async def open(
                 metadata=metadata,
                 storage_dir=storage_dir,
                 lock=asyncio.Lock(),
+                directory_name=directory_name,
             )
             await client._update_metadata()

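The branching added to open() reduces to a small mapping from the (name, alias) pair to a directory name and a metadata name. The following standalone restatement of those rules is not code from the commit; the literal 'default' stands in for cls._STORAGE_SUBSUBDIR_DEFAULT:

    from __future__ import annotations

    def resolve_dataset_dir(name: str | None, alias: str | None) -> tuple[str, str | None]:
        """Return (directory_name, metadata_name) for a name/alias pair."""
        if name is not None and alias is not None:
            raise ValueError('Cannot specify both name and alias parameters')
        if alias is not None and alias != 'default':
            return alias, None   # alias storage: unnamed, own directory
        if name is not None:
            return name, name    # named storage: global, directory == name
        return 'default', None   # default storage (also alias='default')

    assert resolve_dataset_dir(None, None) == ('default', None)
    assert resolve_dataset_dir('products', None) == ('products', 'products')
    assert resolve_dataset_dir(None, 'scratch') == ('scratch', None)
    assert resolve_dataset_dir(None, 'default') == ('default', None)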