Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support async FSMap objects in zarr.open #2774

Open
wants to merge 23 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 18 additions & 3 deletions src/zarr/storage/_common.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
from __future__ import annotations

import importlib
import importlib.util
import json
from pathlib import Path
from typing import TYPE_CHECKING, Any, Literal
Expand All @@ -12,6 +14,10 @@
from zarr.storage._memory import MemoryStore
from zarr.storage._utils import normalize_path

_has_fsspec = importlib.util.find_spec("fsspec")
if _has_fsspec:
from fsspec.mapping import FSMap

if TYPE_CHECKING:
from zarr.core.buffer import BufferPrototype

Expand Down Expand Up @@ -228,7 +234,7 @@


async def make_store_path(
store_like: StoreLike | None,
store_like: StoreLike | FSMap | None,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Doesn't this fail if fsspec isn't present, because FSMap isn't imported? Maybe you need

if _has_fsspec:
    from fsspec.mapping import FSMap
else:
    FSMap = None

or make fsspec a required dependency.

*,
path: str | None = "",
mode: AccessModeLiteral | None = None,
Expand Down Expand Up @@ -311,9 +317,18 @@
# We deliberate only consider dict[str, Buffer] here, and not arbitrary mutable mappings.
# By only allowing dictionaries, which are in-memory, we know that MemoryStore appropriate.
store = await MemoryStore.open(store_dict=store_like, read_only=_read_only)
elif _has_fsspec:
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you

Suggested change
elif _has_fsspec:
elif _has_fsspec and isinstance(store_like, FSMap):

and then you fall to the same exception in else rather than having the Type Error twice

if not isinstance(store_like, FSMap):
raise (TypeError(f"Unsupported type for store_like: '{type(store_like).__name__}'"))
if path:
raise ValueError("'path' was provided but is not used for FSMap store_like objects")
if storage_options:
raise ValueError(
"'storage_options was provided but is not used for FSMap store_like objects"
)
store = FsspecStore.from_mapper(store_like, read_only=_read_only)
else:
msg = f"Unsupported type for store_like: '{type(store_like).__name__}'" # type: ignore[unreachable]
raise TypeError(msg)
raise (TypeError(f"Unsupported type for store_like: '{type(store_like).__name__}'"))

Check warning on line 331 in src/zarr/storage/_common.py

View check run for this annotation

Codecov / codecov/patch

src/zarr/storage/_common.py#L331

Added line #L331 was not covered by tests
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The linter requires the redundant outer parentheses?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Apparently this is not covered; switching the condition as I suggested above would make it so, or making a test explicitly with a bad type of store.

Copy link
Member Author

@maxrjones maxrjones Feb 28, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll check on the linter issue, as for the bad coverage I think we still need this in case fsspec isn't installed. #2872 should fix the code coverage gap, but it's taking longer than I expected to finish that PR because it uncovered other issues with the test matrix design (I'm working on it but might not finish today)


result = await StorePath.open(store, path=path_normalized, mode=mode)

Expand Down
85 changes: 75 additions & 10 deletions src/zarr/storage/_fsspec.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,9 @@
if TYPE_CHECKING:
from collections.abc import AsyncIterator, Iterable

from fsspec import AbstractFileSystem
from fsspec.asyn import AsyncFileSystem
from fsspec.mapping import FSMap

from zarr.core.buffer import BufferPrototype
from zarr.core.common import BytesLike
Expand All @@ -30,6 +32,45 @@
)


def _make_async(fs: AbstractFileSystem) -> AsyncFileSystem:
"""Convert a sync FSSpec filesystem to an async FFSpec filesystem

If the filesystem class supports async operations, a new async instance is created
from the existing instance.

If the filesystem class does not support async operations, the existing instance
is wrapped with AsyncFileSystemWrapper.
"""
import fsspec
from packaging.version import parse as parse_version

fsspec_version = parse_version(fsspec.__version__)
if fs.async_impl and fs.asynchronous:
# Already an async instance of an async filesystem, nothing to do
return fs
if fs.async_impl:
# Convert sync instance of an async fs to an async instance
import json

fs_dict = json.loads(fs.to_json())
fs_dict["asynchronous"] = True
return fsspec.AbstractFileSystem.from_json(json.dumps(fs_dict))

# Wrap sync filesystems with the async wrapper
if type(fs) is fsspec.implementations.local.LocalFileSystem and not fs.auto_mkdir:
raise ValueError(
f"LocalFilesystem {fs} was created with auto_mkdir=False but Zarr requires the filesystem to automatically create directories"
)
if fsspec_version < parse_version("2024.12.0"):
raise ImportError(

Check warning on line 65 in src/zarr/storage/_fsspec.py

View check run for this annotation

Codecov / codecov/patch

src/zarr/storage/_fsspec.py#L65

Added line #L65 was not covered by tests
"The filesystem '{fs}' is synchronous, and the required "
"AsyncFileSystemWrapper is not available. Upgrade fsspec to version "
"2024.12.0 or later to enable this functionality."
)

return fsspec.implementations.asyn_wrapper.AsyncFileSystemWrapper(fs, asynchronous=True)


class FsspecStore(Store):
"""
A remote Store based on FSSpec
Expand Down Expand Up @@ -137,6 +178,39 @@
allowed_exceptions=allowed_exceptions,
)

@classmethod
def from_mapper(
cls,
fs_map: FSMap,
read_only: bool = False,
allowed_exceptions: tuple[type[Exception], ...] = ALLOWED_EXCEPTIONS,
) -> FsspecStore:
"""
Create a FsspecStore from a FSMap object.

Parameters
----------
fs_map : FSMap
Fsspec mutable mapping object.
read_only : bool
Whether the store is read-only, defaults to False.
allowed_exceptions : tuple, optional
The exceptions that are allowed to be raised when accessing the
store. Defaults to ALLOWED_EXCEPTIONS.

Returns
-------
FsspecStore
"""
if not fs_map.fs.async_impl or not fs_map.fs.asynchronous:
fs_map.fs = _make_async(fs_map.fs)
return cls(
fs=fs_map.fs,
path=fs_map.root,
read_only=read_only,
allowed_exceptions=allowed_exceptions,
)

@classmethod
def from_url(
cls,
Expand Down Expand Up @@ -175,16 +249,7 @@

fs, path = url_to_fs(url, **opts)
if not fs.async_impl:
try:
from fsspec.implementations.asyn_wrapper import AsyncFileSystemWrapper

fs = AsyncFileSystemWrapper(fs, asynchronous=True)
except ImportError as e:
raise ImportError(
f"The filesystem for URL '{url}' is synchronous, and the required "
"AsyncFileSystemWrapper is not available. Upgrade fsspec to version "
"2024.12.0 or later to enable this functionality."
) from e
fs = _make_async(fs)

# fsspec is not consistent about removing the scheme from the path, so check and strip it here
# https://github.com/fsspec/filesystem_spec/issues/1722
Expand Down
103 changes: 97 additions & 6 deletions tests/test_store/test_fsspec.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,21 @@
import os
from typing import TYPE_CHECKING

import numpy as np
import pytest
from packaging.version import parse as parse_version

import zarr.api.asynchronous
from zarr import Array
from zarr.abc.store import OffsetByteRequest
from zarr.core.buffer import Buffer, cpu, default_buffer_prototype
from zarr.core.sync import _collect_aiterator, sync
from zarr.storage import FsspecStore
from zarr.storage._fsspec import _make_async
from zarr.testing.store import StoreTests

if TYPE_CHECKING:
import pathlib
from collections.abc import Generator

import botocore.client
Expand Down Expand Up @@ -240,32 +244,119 @@ async def test_delete_dir_unsupported_deletes(self, store: FsspecStore) -> None:
await store.delete_dir("test_prefix")


def array_roundtrip(store):
"""
Round trip an array using a Zarr store

Args:
store: Store-Like object (e.g., FSMap)
"""
arr = zarr.open(store=store, mode="w", shape=(3, 3))
assert isinstance(arr, Array)
# Set values
arr[:] = 1
# Read set values
arr = zarr.open(store=store, mode="r", shape=(3, 3))
assert isinstance(arr, Array)
np.testing.assert_array_equal(np.ones((3, 3)), arr[:])


@pytest.mark.skipif(
parse_version(fsspec.__version__) < parse_version("2024.12.0"),
reason="No AsyncFileSystemWrapper",
)
def test_wrap_sync_filesystem():
def test_wrap_sync_filesystem(tmp_path):
"""The local fs is not async so we should expect it to be wrapped automatically"""
from fsspec.implementations.asyn_wrapper import AsyncFileSystemWrapper

store = FsspecStore.from_url("local://test/path")

store = FsspecStore.from_url(f"local://{tmp_path}", storage_options={"auto_mkdir": True})
assert isinstance(store.fs, AsyncFileSystemWrapper)
assert store.fs.async_impl
array_roundtrip(store)


@pytest.mark.skipif(
parse_version(fsspec.__version__) >= parse_version("2024.12.0"),
reason="No AsyncFileSystemWrapper",
)
def test_wrap_sync_filesystem_raises(tmp_path):
"""The local fs is not async so we should expect it to be wrapped automatically"""
with pytest.raises(ImportError, match="The filesystem .*"):
FsspecStore.from_url(f"local://{tmp_path}", storage_options={"auto_mkdir": True})


@pytest.mark.skipif(
parse_version(fsspec.__version__) < parse_version("2024.12.0"),
reason="No AsyncFileSystemWrapper",
)
def test_no_wrap_async_filesystem():
"""An async fs should not be wrapped automatically; fsspec's https filesystem is such an fs"""
"""An async fs should not be wrapped automatically; fsspec's s3 filesystem is such an fs"""
from fsspec.implementations.asyn_wrapper import AsyncFileSystemWrapper

store = FsspecStore.from_url("https://test/path")

store = FsspecStore.from_url(
f"s3://{test_bucket_name}/foo/spam/",
storage_options={"endpoint_url": endpoint_url, "anon": False},
)
assert not isinstance(store.fs, AsyncFileSystemWrapper)
assert store.fs.async_impl
array_roundtrip(store)


@pytest.mark.skipif(
parse_version(fsspec.__version__) < parse_version("2024.12.0"),
reason="No AsyncFileSystemWrapper",
)
def test_open_fsmap_file(tmp_path: pathlib.Path) -> None:
fsspec = pytest.importorskip("fsspec")
fs = fsspec.filesystem("file", auto_mkdir=True)
mapper = fs.get_mapper(tmp_path)
array_roundtrip(mapper)


@pytest.mark.skipif(
parse_version(fsspec.__version__) < parse_version("2024.12.0"),
reason="No AsyncFileSystemWrapper",
)
def test_open_fsmap_file_raises(tmp_path: pathlib.Path) -> None:
fsspec = pytest.importorskip("fsspec.implementations.local")
fs = fsspec.LocalFileSystem(auto_mkdir=False)
mapper = fs.get_mapper(tmp_path)
with pytest.raises(ValueError, match="LocalFilesystem .*"):
array_roundtrip(mapper)


@pytest.mark.parametrize("asynchronous", [True, False])
def test_open_fsmap_s3(asynchronous: bool) -> None:
s3_filesystem = s3fs.S3FileSystem(
asynchronous=asynchronous, endpoint_url=endpoint_url, anon=False
)
mapper = s3_filesystem.get_mapper(f"s3://{test_bucket_name}/map/foo/")
array_roundtrip(mapper)


def test_open_s3map_raises() -> None:
with pytest.raises(TypeError, match="Unsupported type for store_like:.*"):
zarr.open(store=0, mode="w", shape=(3, 3))
s3_filesystem = s3fs.S3FileSystem(asynchronous=True, endpoint_url=endpoint_url, anon=False)
mapper = s3_filesystem.get_mapper(f"s3://{test_bucket_name}/map/foo/")
with pytest.raises(
ValueError, match="'path' was provided but is not used for FSMap store_like objects"
):
zarr.open(store=mapper, path="bar", mode="w", shape=(3, 3))
with pytest.raises(
ValueError,
match="'storage_options was provided but is not used for FSMap store_like objects",
):
zarr.open(store=mapper, storage_options={"anon": True}, mode="w", shape=(3, 3))


@pytest.mark.parametrize("asynchronous", [True, False])
def test_make_async(asynchronous: bool) -> None:
s3_filesystem = s3fs.S3FileSystem(
asynchronous=asynchronous, endpoint_url=endpoint_url, anon=False
)
fs = _make_async(s3_filesystem)
assert fs.asynchronous


@pytest.mark.skipif(
Expand Down
Loading