Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion python/zarrs/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
CollapsedDimensionError,
DiscontiguousArrayError,
FillValueNoneError,
get_implicit_fill_value,
make_chunk_info_for_rust_with_indices,
)

Expand Down Expand Up @@ -62,8 +63,10 @@ def get_codec_pipeline_impl(
),
num_threads=config.get("threading.max_workers", None),
direct_io=config.get("codec_pipeline.direct_io", False),
fill_value=get_implicit_fill_value(metadata.dtype, metadata.fill_value),
dtype_str=str(metadata.dtype.to_native_dtype()),
)
except TypeError as e:
except (TypeError, ValueError) as e:
warn(
f"Array is unsupported by ZarrsCodecPipeline: {e}",
category=UserWarning,
Expand Down
6 changes: 3 additions & 3 deletions python/zarrs/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from zarr.core.array_spec import ArraySpec
from zarr.core.indexing import SelectorTuple, is_integer

from zarrs._internal import Basic, WithSubset
from zarrs._internal import WithSubset

if TYPE_CHECKING:
from collections.abc import Iterable
Expand Down Expand Up @@ -178,7 +178,6 @@ def make_chunk_info_for_rust_with_indices(
chunk_spec.config,
chunk_spec.prototype,
)
chunk_info = Basic(byte_getter, chunk_spec)
out_selection_as_slices = selector_tuple_to_slice_selection(out_selection)
chunk_selection_as_slices = selector_tuple_to_slice_selection(chunk_selection)
shape_chunk_selection_slices = get_shape_for_selector(
Expand All @@ -196,8 +195,9 @@ def make_chunk_info_for_rust_with_indices(
)
chunk_info_with_indices.append(
WithSubset(
chunk_info,
key=byte_getter.path,
chunk_subset=chunk_selection_as_slices,
chunk_shape=chunk_spec.shape,
subset=out_selection_as_slices,
shape=shape,
)
Expand Down
113 changes: 40 additions & 73 deletions src/chunk_item.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,34 +2,20 @@ use std::num::NonZeroU64;

use pyo3::{
Bound, PyAny, PyErr, PyResult,
exceptions::{PyIndexError, PyRuntimeError, PyValueError},
exceptions::{PyIndexError, PyValueError},
pyclass, pymethods,
types::{PyAnyMethods, PyBytes, PyBytesMethods, PyInt, PySlice, PySliceMethods as _},
};
use pyo3_stub_gen::derive::{gen_stub_pyclass, gen_stub_pymethods};
use zarrs::{
array::{ChunkRepresentation, DataType, FillValue},
array_subset::ArraySubset,
metadata::v3::MetadataV3,
storage::StoreKey,
};

use crate::utils::PyErrExt;

pub(crate) trait ChunksItem {
fn key(&self) -> &StoreKey;
fn representation(&self) -> &ChunkRepresentation;
}

#[derive(Clone)]
#[gen_stub_pyclass]
#[pyclass]
pub(crate) struct Basic {
key: StoreKey,
representation: ChunkRepresentation,
}

fn fill_value_to_bytes(dtype: &str, fill_value: &Bound<'_, PyAny>) -> PyResult<Vec<u8>> {
pub fn fill_value_to_bytes(dtype: &str, fill_value: &Bound<'_, PyAny>) -> PyResult<Vec<u8>> {
if dtype == "string" {
// Match zarr-python 2.x.x string fill value behaviour with a 0 fill value
// See https://github.com/zarr-developers/zarr-python/issues/2792#issuecomment-2644362122
Expand All @@ -55,40 +41,34 @@ fn fill_value_to_bytes(dtype: &str, fill_value: &Bound<'_, PyAny>) -> PyResult<V
}
}

#[gen_stub_pymethods]
#[pymethods]
impl Basic {
#[new]
fn new(byte_interface: &Bound<'_, PyAny>, chunk_spec: &Bound<'_, PyAny>) -> PyResult<Self> {
let path: String = byte_interface.getattr("path")?.extract()?;

let chunk_shape = chunk_spec.getattr("shape")?.extract()?;
let mut dtype: String = chunk_spec
.getattr("dtype")?
.call_method0("to_native_dtype")?
.call_method0("__str__")?
.extract()?;
if dtype == "object" {
// zarrs doesn't understand `object` which is the output of `np.dtype("|O").__str__()`
// but maps it to "string" internally https://github.com/LDeakin/zarrs/blob/0532fe983b7b42b59dbf84e50a2fe5e6f7bad4ce/zarrs_metadata/src/v2_to_v3.rs#L288
dtype = String::from("string");
}
let fill_value: Bound<'_, PyAny> = chunk_spec.getattr("fill_value")?;
let fill_value_bytes = fill_value_to_bytes(&dtype, &fill_value)?;
Ok(Self {
key: StoreKey::new(path).map_py_err::<PyValueError>()?,
representation: get_chunk_representation(chunk_shape, &dtype, fill_value_bytes)?,
})
}
}

#[derive(Clone)]
#[gen_stub_pyclass]
#[pyclass]
pub(crate) struct WithSubset {
pub item: Basic,
pub key: StoreKey,
pub chunk_subset: ArraySubset,
pub subset: ArraySubset,
chunk_shape: Vec<u64>,
}

/// Conversion from a collection of chunk items into `(ChunkRepresentation, WithSubset)` pairs,
/// using a data type and fill value supplied by the caller.
pub trait WithRepresentations {
/// Consumes `self` and returns the paired representations and items.
///
/// # Errors
///
/// Returns a Python error (`PyResult`) if the pairs cannot be produced.
fn with_representations(
self,
data_type: DataType,
fill_value: FillValue,
) -> PyResult<Vec<(ChunkRepresentation, WithSubset)>>;
}

impl WithRepresentations for Vec<WithSubset> {
    /// Pairs every [`WithSubset`] in the vector with its [`ChunkRepresentation`].
    ///
    /// `data_type` and `fill_value` are cloned once per item. Processing stops at the
    /// first item whose representation cannot be built, and that error is returned.
    fn with_representations(
        self,
        data_type: DataType,
        fill_value: FillValue,
    ) -> PyResult<Vec<(ChunkRepresentation, WithSubset)>> {
        let mut paired = Vec::with_capacity(self.len());
        for item in self {
            let representation = item.representation(data_type.clone(), fill_value.clone())?;
            paired.push((representation, item));
        }
        Ok(paired)
    }
}

#[gen_stub_pymethods]
Expand All @@ -97,13 +77,13 @@ impl WithSubset {
#[new]
#[allow(clippy::needless_pass_by_value)]
fn new(
item: Basic,
key: String,
chunk_subset: Vec<Bound<'_, PySlice>>,
chunk_shape: Vec<u64>,
subset: Vec<Bound<'_, PySlice>>,
shape: Vec<u64>,
) -> PyResult<Self> {
let chunk_subset =
selection_to_array_subset(&chunk_subset, &item.representation.shape_u64())?;
let chunk_subset = selection_to_array_subset(&chunk_subset, &chunk_shape)?;
let subset = selection_to_array_subset(&subset, &shape)?;
// Check that subset and chunk_subset have the same number of elements.
// This permits broadcasting of a constant input.
Expand All @@ -112,50 +92,37 @@ impl WithSubset {
"the size of the chunk subset {chunk_subset} and input/output subset {subset} are incompatible",
)));
}

Ok(Self {
item,
key: StoreKey::new(key).map_py_err::<PyValueError>()?,
chunk_subset,
subset,
chunk_shape,
})
}
}

impl ChunksItem for Basic {
fn key(&self) -> &StoreKey {
&self.key
}
fn representation(&self) -> &ChunkRepresentation {
&self.representation
}
}

impl ChunksItem for WithSubset {
fn key(&self) -> &StoreKey {
&self.item.key
}
fn representation(&self) -> &ChunkRepresentation {
&self.item.representation
impl WithSubset {
    /// Builds the [`ChunkRepresentation`] for this item from its stored chunk shape
    /// and the supplied data type and fill value.
    ///
    /// # Errors
    ///
    /// Propagates any error returned by `get_chunk_representation`.
    fn representation(
        &self,
        dtype: DataType,
        fill_value: FillValue,
    ) -> PyResult<ChunkRepresentation> {
        // The helper takes the shape by value, so hand it an owned copy.
        let shape = self.chunk_shape.clone();
        get_chunk_representation(shape, dtype, fill_value)
    }
}

fn get_chunk_representation(
chunk_shape: Vec<u64>,
dtype: &str,
fill_value: Vec<u8>,
dtype: DataType,
fill_value: FillValue,
) -> PyResult<ChunkRepresentation> {
// Get the chunk representation
let data_type = DataType::from_metadata(
&MetadataV3::new(dtype),
zarrs::config::global_config().data_type_aliases_v3(),
)
.map_py_err::<PyRuntimeError>()?;
let chunk_shape = chunk_shape
.into_iter()
.map(|x| NonZeroU64::new(x).expect("chunk shapes should always be non-zero"))
.collect();
let chunk_representation =
ChunkRepresentation::new(chunk_shape, data_type, FillValue::new(fill_value))
.map_py_err::<PyValueError>()?;
ChunkRepresentation::new(chunk_shape, dtype, fill_value).map_py_err::<PyValueError>()?;
Ok(chunk_representation)
}

Expand Down
12 changes: 4 additions & 8 deletions src/concurrency.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
use pyo3::PyResult;
use zarrs::array::{
ArrayCodecTraits, RecommendedConcurrency, codec::CodecOptions,
ArrayCodecTraits, ChunkRepresentation, RecommendedConcurrency, codec::CodecOptions,
concurrency::calc_concurrency_outer_inner,
};

use crate::{CodecPipelineImpl, chunk_item::ChunksItem, utils::PyCodecErrExt as _};
use crate::{CodecPipelineImpl, chunk_item::WithSubset, utils::PyCodecErrExt as _};

pub trait ChunkConcurrentLimitAndCodecOptions {
fn get_chunk_concurrent_limit_and_codec_options(
Expand All @@ -13,19 +13,15 @@ pub trait ChunkConcurrentLimitAndCodecOptions {
) -> PyResult<Option<(usize, CodecOptions)>>;
}

impl<T> ChunkConcurrentLimitAndCodecOptions for Vec<T>
where
T: ChunksItem,
{
impl ChunkConcurrentLimitAndCodecOptions for Vec<(ChunkRepresentation, WithSubset)> {
fn get_chunk_concurrent_limit_and_codec_options(
&self,
codec_pipeline_impl: &CodecPipelineImpl,
) -> PyResult<Option<(usize, CodecOptions)>> {
let num_chunks = self.len();
let Some(chunk_descriptions0) = self.first() else {
let Some((chunk_representation, _)) = self.first() else {
return Ok(None);
};
let chunk_representation = chunk_descriptions0.representation();

let codec_concurrency = codec_pipeline_impl
.codec_chain
Expand Down
Loading
Loading