Skip to content

Commit 11d488d

Browse files
maxrjonesd-v-bdcherianmartindurantdstansby
authored
Support async FSMap objects in zarr.open (#2774)
* WIP: Support fsspec mutable mapping objects in zarr.open * Simplify library availability checking * Improve test coverage * Improve error messages * Consolidate code * Make test more readable * Make async instances from sync fsmap objects * Move test to fsspec store * Re-add type ignore * "Update docstring" * Add another test * Require auto_mkdir for LocalFileSystem * Update test location * Convert older filesystems to async * Use if on fsspec versions rather than try; else * Always use asynchronous=True in _make_async * Improve tests * Apply suggestions from code review Co-authored-by: Martin Durant <[email protected]> * Apply more code suggestions * Fix typing error * Test remote stores in min_deps env * Remove redundant import * Test warning * Lint * Add pytest pin * Add release note * Generate coverage on min_deps and upstream jobs * Update src/zarr/storage/_fsspec.py Co-authored-by: Davis Bennett <[email protected]> * More useful error messages * Add TypeAlias * Fix typing for no fsspec installation * Move imports * Don't mutate FSMap object --------- Co-authored-by: Davis Bennett <[email protected]> Co-authored-by: Deepak Cherian <[email protected]> Co-authored-by: Martin Durant <[email protected]> Co-authored-by: David Stansby <[email protected]>
1 parent c972f7f commit 11d488d

File tree

6 files changed

+216
-23
lines changed

6 files changed

+216
-23
lines changed

.github/workflows/test.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -104,7 +104,7 @@ jobs:
104104
hatch env run -e ${{ matrix.dependency-set }} list-env
105105
- name: Run Tests
106106
run: |
107-
hatch env run --env ${{ matrix.dependency-set }} run
107+
hatch env run --env ${{ matrix.dependency-set }} run-coverage
108108
- name: Upload coverage
109109
uses: codecov/codecov-action@v5
110110
with:

changes/2774.feature.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Add `zarr.storage.FsspecStore.from_mapper()` so that `zarr.open()` supports stores of type `fsspec.mapping.FSMap`.

pyproject.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -253,6 +253,7 @@ dependencies = [
253253
'obstore==0.5.*',
254254
# test deps
255255
'zarr[test]',
256+
'zarr[remote_tests]',
256257
]
257258

258259
[tool.hatch.envs.min_deps.scripts]

src/zarr/storage/_common.py

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,9 @@
11
from __future__ import annotations
22

3+
import importlib.util
34
import json
45
from pathlib import Path
5-
from typing import TYPE_CHECKING, Any, Literal, Self
6+
from typing import TYPE_CHECKING, Any, Literal, Self, TypeAlias
67

78
from zarr.abc.store import ByteRequest, Store
89
from zarr.core.buffer import Buffer, default_buffer_prototype
@@ -12,6 +13,12 @@
1213
from zarr.storage._memory import MemoryStore
1314
from zarr.storage._utils import normalize_path
1415

16+
_has_fsspec = importlib.util.find_spec("fsspec")
17+
if _has_fsspec:
18+
from fsspec.mapping import FSMap
19+
else:
20+
FSMap = None
21+
1522
if TYPE_CHECKING:
1623
from zarr.core.buffer import BufferPrototype
1724

@@ -227,7 +234,7 @@ def __eq__(self, other: object) -> bool:
227234
return False
228235

229236

230-
StoreLike = Store | StorePath | Path | str | dict[str, Buffer]
237+
StoreLike: TypeAlias = Store | StorePath | FSMap | Path | str | dict[str, Buffer]
231238

232239

233240
async def make_store_path(
@@ -314,9 +321,18 @@ async def make_store_path(
314321
# We deliberately only consider dict[str, Buffer] here, and not arbitrary mutable mappings.
315322
# By only allowing dictionaries, which are in-memory, we know that MemoryStore is appropriate.
316323
store = await MemoryStore.open(store_dict=store_like, read_only=_read_only)
324+
elif _has_fsspec and isinstance(store_like, FSMap):
325+
if path:
326+
raise ValueError(
327+
"'path' was provided but is not used for FSMap store_like objects. Specify the path when creating the FSMap instance instead."
328+
)
329+
if storage_options:
330+
raise ValueError(
331+
"'storage_options was provided but is not used for FSMap store_like objects. Specify the storage options when creating the FSMap instance instead."
332+
)
333+
store = FsspecStore.from_mapper(store_like, read_only=_read_only)
317334
else:
318-
msg = f"Unsupported type for store_like: '{type(store_like).__name__}'" # type: ignore[unreachable]
319-
raise TypeError(msg)
335+
raise TypeError(f"Unsupported type for store_like: '{type(store_like).__name__}'")
320336

321337
result = await StorePath.open(store, path=path_normalized, mode=mode)
322338

src/zarr/storage/_fsspec.py

Lines changed: 74 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,12 @@
11
from __future__ import annotations
22

3+
import json
34
import warnings
45
from contextlib import suppress
56
from typing import TYPE_CHECKING, Any
67

8+
from packaging.version import parse as parse_version
9+
710
from zarr.abc.store import (
811
ByteRequest,
912
OffsetByteRequest,
@@ -17,7 +20,9 @@
1720
if TYPE_CHECKING:
1821
from collections.abc import AsyncIterator, Iterable
1922

23+
from fsspec import AbstractFileSystem
2024
from fsspec.asyn import AsyncFileSystem
25+
from fsspec.mapping import FSMap
2126

2227
from zarr.core.buffer import BufferPrototype
2328
from zarr.core.common import BytesLike
@@ -30,6 +35,42 @@
3035
)
3136

3237

38+
def _make_async(fs: AbstractFileSystem) -> AsyncFileSystem:
    """Convert a sync FSSpec filesystem to an async FSSpec filesystem.

    Three cases are handled:

    - ``fs`` is an async implementation that is already in asynchronous mode:
      it is returned unchanged.
    - ``fs`` is an async implementation in synchronous mode: a new instance is
      created from the JSON serialisation of ``fs`` with ``asynchronous=True``.
    - ``fs`` is a synchronous implementation: it is wrapped with
      ``AsyncFileSystemWrapper``.

    Parameters
    ----------
    fs : AbstractFileSystem
        The filesystem to convert.

    Returns
    -------
    AsyncFileSystem

    Raises
    ------
    ValueError
        If ``fs`` is exactly a ``LocalFileSystem`` created with ``auto_mkdir=False``.
    ImportError
        If ``fs`` is synchronous and the installed fsspec is older than 2024.12.0,
        the version this function requires for ``AsyncFileSystemWrapper``.
    """
    if fs.async_impl and fs.asynchronous:
        # Already an async instance of an async filesystem, nothing to do
        return fs

    import fsspec

    if fs.async_impl:
        # Convert sync instance of an async fs to an async instance
        fs_dict = json.loads(fs.to_json())
        fs_dict["asynchronous"] = True
        return fsspec.AbstractFileSystem.from_json(json.dumps(fs_dict))

    # Wrap sync filesystems with the async wrapper.
    # Import the submodules explicitly: ``import fsspec`` alone does not guarantee
    # that ``fsspec.implementations.<name>`` is available as an attribute.
    from fsspec.implementations.local import LocalFileSystem

    if type(fs) is LocalFileSystem and not fs.auto_mkdir:
        raise ValueError(
            f"LocalFilesystem {fs} was created with auto_mkdir=False but Zarr requires the filesystem to automatically create directories"
        )
    if parse_version(fsspec.__version__) < parse_version("2024.12.0"):
        raise ImportError(
            f"The filesystem '{fs}' is synchronous, and the required "
            "AsyncFileSystemWrapper is not available. Upgrade fsspec to version "
            "2024.12.0 or later to enable this functionality."
        )

    # Only imported after the version check above has passed.
    from fsspec.implementations.asyn_wrapper import AsyncFileSystemWrapper

    return AsyncFileSystemWrapper(fs, asynchronous=True)
72+
73+
3374
class FsspecStore(Store):
3475
"""
3576
Store for remote data based on FSSpec.
@@ -137,6 +178,38 @@ def from_upath(
137178
allowed_exceptions=allowed_exceptions,
138179
)
139180

181+
@classmethod
def from_mapper(
    cls,
    fs_map: FSMap,
    read_only: bool = False,
    allowed_exceptions: tuple[type[Exception], ...] = ALLOWED_EXCEPTIONS,
) -> FsspecStore:
    """
    Build a FsspecStore from an fsspec FSMap object.

    The mapper's filesystem (``fs_map.fs``) is passed through ``_make_async``
    and the mapper's ``root`` is used as the store path.

    Parameters
    ----------
    fs_map : FSMap
        Fsspec mutable mapping object.
    read_only : bool
        Whether the store is read-only, defaults to False.
    allowed_exceptions : tuple, optional
        The exceptions that are allowed to be raised when accessing the
        store. Defaults to ALLOWED_EXCEPTIONS.

    Returns
    -------
    FsspecStore
    """
    return cls(
        fs=_make_async(fs_map.fs),
        path=fs_map.root,
        read_only=read_only,
        allowed_exceptions=allowed_exceptions,
    )
212+
140213
@classmethod
141214
def from_url(
142215
cls,
@@ -175,16 +248,7 @@ def from_url(
175248

176249
fs, path = url_to_fs(url, **opts)
177250
if not fs.async_impl:
178-
try:
179-
from fsspec.implementations.asyn_wrapper import AsyncFileSystemWrapper
180-
181-
fs = AsyncFileSystemWrapper(fs, asynchronous=True)
182-
except ImportError as e:
183-
raise ImportError(
184-
f"The filesystem for URL '{url}' is synchronous, and the required "
185-
"AsyncFileSystemWrapper is not available. Upgrade fsspec to version "
186-
"2024.12.0 or later to enable this functionality."
187-
) from e
251+
fs = _make_async(fs)
188252

189253
# fsspec is not consistent about removing the scheme from the path, so check and strip it here
190254
# https://github.com/fsspec/filesystem_spec/issues/1722

tests/test_store/test_fsspec.py

Lines changed: 119 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -5,17 +5,21 @@
55
import re
66
from typing import TYPE_CHECKING, Any
77

8+
import numpy as np
89
import pytest
910
from packaging.version import parse as parse_version
1011

1112
import zarr.api.asynchronous
13+
from zarr import Array
1214
from zarr.abc.store import OffsetByteRequest
1315
from zarr.core.buffer import Buffer, cpu, default_buffer_prototype
1416
from zarr.core.sync import _collect_aiterator, sync
1517
from zarr.storage import FsspecStore
18+
from zarr.storage._fsspec import _make_async
1619
from zarr.testing.store import StoreTests
1720

1821
if TYPE_CHECKING:
22+
import pathlib
1923
from collections.abc import Generator
2024
from pathlib import Path
2125

@@ -191,7 +195,11 @@ async def test_fsspec_store_from_uri(self, store: FsspecStore) -> None:
191195
)
192196
assert dict(group.attrs) == {"key": "value"}
193197

194-
meta["attributes"]["key"] = "value-2" # type: ignore[index]
198+
meta = {
199+
"attributes": {"key": "value-2"},
200+
"zarr_format": 3,
201+
"node_type": "group",
202+
}
195203
await store.set(
196204
"directory-2/zarr.json",
197205
self.buffer_cls.from_bytes(json.dumps(meta).encode()),
@@ -201,7 +209,11 @@ async def test_fsspec_store_from_uri(self, store: FsspecStore) -> None:
201209
)
202210
assert dict(group.attrs) == {"key": "value-2"}
203211

204-
meta["attributes"]["key"] = "value-3" # type: ignore[index]
212+
meta = {
213+
"attributes": {"key": "value-3"},
214+
"zarr_format": 3,
215+
"node_type": "group",
216+
}
205217
await store.set(
206218
"directory-3/zarr.json",
207219
self.buffer_cls.from_bytes(json.dumps(meta).encode()),
@@ -264,32 +276,131 @@ async def test_delete_dir_unsupported_deletes(self, store: FsspecStore) -> None:
264276
await store.delete_dir("test_prefix")
265277

266278

279+
def array_roundtrip(store: FsspecStore) -> None:
    """
    Round trip an array using a Zarr store.

    Writes a small array to ``store``, re-opens it from the same store, and
    checks that both the created and the re-opened array return the written data.

    Parameters
    ----------
    store : FsspecStore
        Store passed to ``zarr.create_array`` and ``zarr.open_array``.
    """
    data = np.ones((3, 3))
    arr = zarr.create_array(store=store, overwrite=True, data=data)
    assert isinstance(arr, Array)
    np.testing.assert_array_equal(arr[:], data)
    # Re-open from the store and compare the re-opened array as well, so the
    # read-back path is verified and not only the handle that wrote the data.
    arr2 = zarr.open_array(store=store)
    assert isinstance(arr2, Array)
    np.testing.assert_array_equal(arr2[:], data)
293+
294+
267295
@pytest.mark.skipif(
268296
parse_version(fsspec.__version__) < parse_version("2024.12.0"),
269297
reason="No AsyncFileSystemWrapper",
270298
)
271-
def test_wrap_sync_filesystem() -> None:
299+
def test_wrap_sync_filesystem(tmp_path: pathlib.Path) -> None:
272300
"""The local fs is not async so we should expect it to be wrapped automatically"""
273301
from fsspec.implementations.asyn_wrapper import AsyncFileSystemWrapper
274302

275-
store = FsspecStore.from_url("local://test/path")
276-
303+
store = FsspecStore.from_url(f"file://{tmp_path}", storage_options={"auto_mkdir": True})
277304
assert isinstance(store.fs, AsyncFileSystemWrapper)
278305
assert store.fs.async_impl
306+
array_roundtrip(store)
307+
308+
309+
@pytest.mark.skipif(
    parse_version(fsspec.__version__) >= parse_version("2024.12.0"),
    reason="AsyncFileSystemWrapper is available, so wrapping a sync filesystem does not raise",
)
def test_wrap_sync_filesystem_raises(tmp_path: pathlib.Path) -> None:
    """The local fs is not async; on fsspec < 2024.12.0 creating a store from it must raise ImportError"""
    with pytest.raises(ImportError, match="The filesystem .*"):
        FsspecStore.from_url(f"file://{tmp_path}", storage_options={"auto_mkdir": True})
279317

280318

281319
@pytest.mark.skipif(
282320
parse_version(fsspec.__version__) < parse_version("2024.12.0"),
283321
reason="No AsyncFileSystemWrapper",
284322
)
285323
def test_no_wrap_async_filesystem() -> None:
286-
"""An async fs should not be wrapped automatically; fsspec's https filesystem is such an fs"""
324+
"""An async fs should not be wrapped automatically; fsspec's s3 filesystem is such an fs"""
287325
from fsspec.implementations.asyn_wrapper import AsyncFileSystemWrapper
288326

289-
store = FsspecStore.from_url("https://test/path")
290-
327+
store = FsspecStore.from_url(
328+
f"s3://{test_bucket_name}/foo/spam/",
329+
storage_options={"endpoint_url": endpoint_url, "anon": False, "asynchronous": True},
330+
read_only=False,
331+
)
291332
assert not isinstance(store.fs, AsyncFileSystemWrapper)
292333
assert store.fs.async_impl
334+
array_roundtrip(store)
335+
336+
337+
@pytest.mark.skipif(
    parse_version(fsspec.__version__) < parse_version("2024.12.0"),
    reason="No AsyncFileSystemWrapper",
)
def test_open_fsmap_file(tmp_path: pathlib.Path) -> None:
    """Round trip an array through an FSMap created from the local filesystem"""
    # The skipif above means this body only runs on fsspec >= 2024.12.0, so no
    # branch for older versions is needed here.
    fs = fsspec.filesystem("file", auto_mkdir=True)
    mapper = fs.get_mapper(tmp_path)
    array_roundtrip(mapper)
358+
359+
360+
@pytest.mark.skipif(
    parse_version(fsspec.__version__) < parse_version("2024.12.0"),
    reason="No AsyncFileSystemWrapper",
)
def test_open_fsmap_file_raises(tmp_path: pathlib.Path) -> None:
    """An FSMap over a LocalFileSystem created with auto_mkdir=False must raise ValueError"""
    local_module = pytest.importorskip("fsspec.implementations.local")
    mapper = local_module.LocalFileSystem(auto_mkdir=False).get_mapper(tmp_path)
    with pytest.raises(ValueError, match="LocalFilesystem .*"):
        array_roundtrip(mapper)
370+
371+
372+
@pytest.mark.parametrize("asynchronous", [True, False])
def test_open_fsmap_s3(asynchronous: bool) -> None:
    """Round trip an array through an FSMap created from a sync or async S3 filesystem"""
    mapper = s3fs.S3FileSystem(
        asynchronous=asynchronous, endpoint_url=endpoint_url, anon=False
    ).get_mapper(f"s3://{test_bucket_name}/map/foo/")
    array_roundtrip(mapper)
379+
380+
381+
def test_open_s3map_raises() -> None:
    """Check the errors raised by zarr.open for an unsupported store type and for FSMap misuse"""
    with pytest.raises(TypeError, match="Unsupported type for store_like:.*"):
        zarr.open(store=0, mode="w", shape=(3, 3))

    mapper = s3fs.S3FileSystem(
        asynchronous=True, endpoint_url=endpoint_url, anon=False
    ).get_mapper(f"s3://{test_bucket_name}/map/foo/")

    # 'path' cannot be combined with an FSMap store
    path_error = "'path' was provided but is not used for FSMap store_like objects"
    with pytest.raises(ValueError, match=path_error):
        zarr.open(store=mapper, path="bar", mode="w", shape=(3, 3))

    # 'storage_options' cannot be combined with an FSMap store
    options_error = "'storage_options was provided but is not used for FSMap store_like objects"
    with pytest.raises(ValueError, match=options_error):
        zarr.open(store=mapper, storage_options={"anon": True}, mode="w", shape=(3, 3))
395+
396+
397+
@pytest.mark.parametrize("asynchronous", [True, False])
def test_make_async(asynchronous: bool) -> None:
    """_make_async yields an asynchronous filesystem whether the S3 filesystem starts sync or async"""
    converted = _make_async(
        s3fs.S3FileSystem(asynchronous=asynchronous, endpoint_url=endpoint_url, anon=False)
    )
    assert converted.asynchronous
293404

294405

295406
@pytest.mark.skipif(

0 commit comments

Comments
 (0)