Skip to content
405 changes: 389 additions & 16 deletions src/zarr/abc/codec.py

Large diffs are not rendered by default.

44 changes: 43 additions & 1 deletion src/zarr/abc/store.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,17 @@

from zarr.core.buffer import Buffer, BufferPrototype

# Names exported by ``from <module> import *``. The sync-capability protocols
# are listed next to the existing store API names, kept in sorted order.
__all__ = [
    "ByteGetter",
    "ByteSetter",
    "Store",
    "SupportsDeleteSync",
    "SupportsGetSync",
    "SupportsSetRangeSync",
    "SupportsSetSync",
    "SupportsSyncStore",
    "set_or_delete",
]


@dataclass
Expand Down Expand Up @@ -700,6 +710,38 @@ async def delete(self) -> None: ...
async def set_if_not_exists(self, default: Buffer) -> None:
    """Interface stub that receives a fallback ``default`` buffer.

    The body is intentionally empty; concrete implementations supply the
    behavior.

    NOTE(review): judging by the name, ``default`` is presumably written only
    when no value is stored yet -- confirm against the implementations.
    """
    ...


@runtime_checkable
class SupportsGetSync(Protocol):
    """Structural type for objects that expose a non-async ``get_sync`` method.

    Being ``runtime_checkable``, ``isinstance`` checks only verify that a
    ``get_sync`` attribute exists; the signature is not inspected at runtime.
    """

    def get_sync(
        self,
        key: str,
        *,
        prototype: BufferPrototype | None = None,
        byte_range: ByteRequest | None = None,
    ) -> Buffer | None:
        """Fetch the value stored under ``key`` without awaiting.

        ``prototype`` and ``byte_range`` are keyword-only and optional.

        NOTE(review): a ``None`` result presumably signals a missing key --
        confirm against the store implementations.
        """
        ...


@runtime_checkable
class SupportsSetSync(Protocol):
    """Structural type for objects that expose a non-async ``set_sync`` method."""

    def set_sync(self, key: str, value: Buffer) -> None:
        """Store ``value`` under ``key`` without awaiting; nothing is returned."""
        ...


@runtime_checkable
class SupportsSetRangeSync(Protocol):
    """Structural type for objects that expose a non-async ``set_range_sync`` method."""

    def set_range_sync(self, key: str, value: Buffer, start: int) -> None:
        """Write ``value`` for ``key`` beginning at position ``start``.

        NOTE(review): ``start`` is presumably a byte offset into the existing
        value -- confirm against the store implementations.
        """
        ...


@runtime_checkable
class SupportsDeleteSync(Protocol):
    """Structural type for objects that expose a non-async ``delete_sync`` method."""

    def delete_sync(self, key: str) -> None:
        """Remove the entry for ``key`` without awaiting; nothing is returned."""
        ...


@runtime_checkable
class SupportsSyncStore(
    SupportsGetSync,
    SupportsSetSync,
    SupportsSetRangeSync,
    SupportsDeleteSync,
    Protocol,
):
    """Combined protocol requiring every synchronous store capability.

    An object satisfies it only when it provides ``get_sync``, ``set_sync``,
    ``set_range_sync`` and ``delete_sync``; no members are added here.
    """


async def set_or_delete(byte_setter: ByteSetter, value: Buffer | None) -> None:
"""Set or delete a value in a byte setter

Expand Down
27 changes: 17 additions & 10 deletions src/zarr/codecs/blosc.py
Original file line number Diff line number Diff line change
Expand Up @@ -299,28 +299,35 @@ def _blosc_codec(self) -> Blosc:
config_dict["typesize"] = self.typesize
return Blosc.from_config(config_dict)

def _decode_sync(
    self,
    chunk_bytes: Buffer,
    chunk_spec: ArraySpec,
) -> Buffer:
    """Decompress one chunk with the blosc codec in the calling thread.

    The codec's ``decode`` callable is handed to ``as_numpy_array_wrapper``
    together with the input buffer and the buffer prototype taken from
    ``chunk_spec``; the wrapper's result is returned as-is.
    """
    decompress = self._blosc_codec.decode
    target_prototype = chunk_spec.prototype
    return as_numpy_array_wrapper(decompress, chunk_bytes, target_prototype)

async def _decode_single(
self,
chunk_bytes: Buffer,
chunk_spec: ArraySpec,
) -> Buffer:
return await asyncio.to_thread(
as_numpy_array_wrapper, self._blosc_codec.decode, chunk_bytes, chunk_spec.prototype
return await asyncio.to_thread(self._decode_sync, chunk_bytes, chunk_spec)

def _encode_sync(
self,
chunk_bytes: Buffer,
chunk_spec: ArraySpec,
) -> Buffer | None:
return chunk_spec.prototype.buffer.from_bytes(
self._blosc_codec.encode(chunk_bytes.as_numpy_array())
)

async def _encode_single(
self,
chunk_bytes: Buffer,
chunk_spec: ArraySpec,
) -> Buffer | None:
# Since blosc only support host memory, we convert the input and output of the encoding
# between numpy array and buffer
return await asyncio.to_thread(
lambda chunk: chunk_spec.prototype.buffer.from_bytes(
self._blosc_codec.encode(chunk.as_numpy_array())
),
chunk_bytes,
)
return await asyncio.to_thread(self._encode_sync, chunk_bytes, chunk_spec)

def compute_encoded_size(
    self,
    _input_byte_length: int,
    _chunk_spec: ArraySpec,
) -> int:
    """Always raise ``NotImplementedError``; both arguments are ignored."""
    raise NotImplementedError
18 changes: 16 additions & 2 deletions src/zarr/codecs/bytes.py
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ def evolve_from_array_spec(self, array_spec: ArraySpec) -> Self:
)
return self

async def _decode_single(
def _decode_sync(
self,
chunk_bytes: Buffer,
chunk_spec: ArraySpec,
Expand All @@ -88,7 +88,14 @@ async def _decode_single(
)
return chunk_array

async def _encode_single(
async def _decode_single(
self,
chunk_bytes: Buffer,
chunk_spec: ArraySpec,
) -> NDBuffer:
return self._decode_sync(chunk_bytes, chunk_spec)

def _encode_sync(
self,
chunk_array: NDBuffer,
chunk_spec: ArraySpec,
Expand All @@ -109,5 +116,12 @@ async def _encode_single(
nd_array = nd_array.ravel().view(dtype="B")
return chunk_spec.prototype.buffer.from_array_like(nd_array)

async def _encode_single(
self,
chunk_array: NDBuffer,
chunk_spec: ArraySpec,
) -> Buffer | None:
return self._encode_sync(chunk_array, chunk_spec)

def compute_encoded_size(
    self,
    input_byte_length: int,
    _chunk_spec: ArraySpec,
) -> int:
    """Return ``input_byte_length`` unchanged; ``_chunk_spec`` is ignored."""
    encoded_length = input_byte_length
    return encoded_length
18 changes: 16 additions & 2 deletions src/zarr/codecs/crc32c_.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ def from_dict(cls, data: dict[str, JSON]) -> Self:
def to_dict(self) -> dict[str, JSON]:
return {"name": "crc32c"}

async def _decode_single(
def _decode_sync(
self,
chunk_bytes: Buffer,
chunk_spec: ArraySpec,
Expand All @@ -51,7 +51,14 @@ async def _decode_single(
)
return chunk_spec.prototype.buffer.from_array_like(inner_bytes)

async def _encode_single(
async def _decode_single(
self,
chunk_bytes: Buffer,
chunk_spec: ArraySpec,
) -> Buffer:
return self._decode_sync(chunk_bytes, chunk_spec)

def _encode_sync(
self,
chunk_bytes: Buffer,
chunk_spec: ArraySpec,
Expand All @@ -64,5 +71,12 @@ async def _encode_single(
# Append the checksum (as bytes) to the data
return chunk_spec.prototype.buffer.from_array_like(np.append(data, checksum.view("B")))

async def _encode_single(
self,
chunk_bytes: Buffer,
chunk_spec: ArraySpec,
) -> Buffer | None:
return self._encode_sync(chunk_bytes, chunk_spec)

def compute_encoded_size(
    self,
    input_byte_length: int,
    _chunk_spec: ArraySpec,
) -> int:
    """Return the encoded length: the input length plus four bytes.

    ``_chunk_spec`` is ignored. The extra four bytes presumably account for
    the checksum that encoding appends to the data.
    """
    trailer_bytes = 4
    return input_byte_length + trailer_bytes
27 changes: 21 additions & 6 deletions src/zarr/codecs/gzip.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

import asyncio
from dataclasses import dataclass
from functools import cached_property
from typing import TYPE_CHECKING

from numcodecs.gzip import GZip
Expand Down Expand Up @@ -48,23 +49,37 @@ def from_dict(cls, data: dict[str, JSON]) -> Self:
def to_dict(self) -> dict[str, JSON]:
return {"name": "gzip", "configuration": {"level": self.level}}

@cached_property
def _gzip_codec(self) -> GZip:
    """Build the ``GZip`` codec for ``self.level`` on first access.

    ``cached_property`` keeps the created object on the instance, so later
    accesses return the same codec instead of constructing a new one.
    """
    codec = GZip(self.level)
    return codec

def _decode_sync(
    self,
    chunk_bytes: Buffer,
    chunk_spec: ArraySpec,
) -> Buffer:
    """Decompress one chunk with the cached gzip codec in the calling thread.

    The codec's ``decode`` callable is handed to ``as_numpy_array_wrapper``
    together with the input buffer and the buffer prototype taken from
    ``chunk_spec``; the wrapper's result is returned as-is.
    """
    gunzip = self._gzip_codec.decode
    target_prototype = chunk_spec.prototype
    return as_numpy_array_wrapper(gunzip, chunk_bytes, target_prototype)

async def _decode_single(
self,
chunk_bytes: Buffer,
chunk_spec: ArraySpec,
) -> Buffer:
return await asyncio.to_thread(
as_numpy_array_wrapper, GZip(self.level).decode, chunk_bytes, chunk_spec.prototype
)
return await asyncio.to_thread(self._decode_sync, chunk_bytes, chunk_spec)

def _encode_sync(
    self,
    chunk_bytes: Buffer,
    chunk_spec: ArraySpec,
) -> Buffer | None:
    """Compress one chunk with the cached gzip codec in the calling thread.

    The codec's ``encode`` callable is handed to ``as_numpy_array_wrapper``
    together with the input buffer and the buffer prototype taken from
    ``chunk_spec``; the wrapper's result is returned as-is.
    """
    gzip_encode = self._gzip_codec.encode
    target_prototype = chunk_spec.prototype
    return as_numpy_array_wrapper(gzip_encode, chunk_bytes, target_prototype)

async def _encode_single(
self,
chunk_bytes: Buffer,
chunk_spec: ArraySpec,
) -> Buffer | None:
return await asyncio.to_thread(
as_numpy_array_wrapper, GZip(self.level).encode, chunk_bytes, chunk_spec.prototype
)
return await asyncio.to_thread(self._encode_sync, chunk_bytes, chunk_spec)

def compute_encoded_size(
self,
Expand Down
Loading
Loading