Commit 5eb6897

Use block processing to compute basic statistics from a DelayedArray. (#59)
This re-uses some of the existing code for the SparseNdarray statistics.
1 parent 5721e67 commit 5eb6897
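
A quick usage sketch of the new interface (this sketch assumes the package root exports DelayedArray and that a plain NumPy array can be wrapped directly as the seed; neither is shown in this commit):

    import numpy
    from delayedarray import DelayedArray

    x = DelayedArray(numpy.random.rand(100, 50))

    # A NumPy seed provides its own sum()/mean()/var(), so these calls are
    # forwarded to it; seeds without those methods now fall back to block
    # processing instead of densifying the entire array as before.
    total = x.sum()
    colmeans = x.mean(axis=0)
    rowvars = x.var(axis=1, ddof=1)

    # buffer_size (in bytes) caps the memory used per block in the fallback path.
    capped = x.sum(axis=0, buffer_size=10**7)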

File tree

5 files changed: +527 -142 lines changed

src/delayedarray/DelayedArray.py: +203 -15
@@ -1,6 +1,7 @@
-from typing import Sequence, Tuple, Union
+from typing import Sequence, Tuple, Union, Optional, List, Callable
 import numpy
 from numpy import array, dtype, integer, issubdtype, ndarray, prod, array2string
+from collections import namedtuple

 from .SparseNdarray import SparseNdarray
 from .BinaryIsometricOp import BinaryIsometricOp
@@ -12,15 +13,18 @@
 from .UnaryIsometricOpSimple import UnaryIsometricOpSimple
 from .UnaryIsometricOpWithArgs import UnaryIsometricOpWithArgs

-from ._subset import _getitem_subset_preserves_dimensions, _getitem_subset_discards_dimensions, _repr_subset
-from ._isometric import translate_ufunc_to_op_simple, translate_ufunc_to_op_with_args
 from .extract_dense_array import extract_dense_array, to_dense_array
 from .extract_sparse_array import extract_sparse_array
+from .apply_over_blocks import apply_over_blocks, choose_block_shape_for_iteration
 from .create_dask_array import create_dask_array
 from .chunk_shape import chunk_shape
 from .is_sparse import is_sparse
 from .is_masked import is_masked

+from ._subset import _getitem_subset_preserves_dimensions, _getitem_subset_discards_dimensions, _repr_subset
+from ._isometric import translate_ufunc_to_op_simple, translate_ufunc_to_op_with_args
+from ._statistics import array_mean, array_var, array_sum, _create_offset_multipliers
+
 __author__ = "ltla"
 __copyright__ = "ltla"
 __license__ = "MIT"
@@ -711,20 +715,117 @@ def __getitem__(self, subset: Tuple[Union[slice, Sequence], ...]) -> Union["Dela


     # For python-level compute.
-    def sum(self, *args, **kwargs):
-        """See :py:meth:`~numpy.sums` for details."""
-        target = to_dense_array(self._seed)
-        return target.sum(*args, **kwargs)
+    def sum(self, axis: Optional[Union[int, Tuple[int, ...]]] = None, dtype: Optional[numpy.dtype] = None, buffer_size: int = 1e8) -> numpy.ndarray:
+        """
+        Take the sum of values across the ``DelayedArray``, possibly over a
+        given axis or set of axes. If the seed has a ``sum()`` method, that
+        method is called directly with the supplied arguments.
+
+        Args:
+            axis:
+                A single integer specifying the axis to perform the calculation.
+                Alternatively a tuple or None, see ``numpy.sum`` for details.
+
+            dtype:
+                NumPy type for the output array. If None, this is automatically
+                chosen based on the type of the ``DelayedArray``, see
+                ``numpy.sum`` for details.
+
+            buffer_size:
+                Buffer size in bytes to use for block processing. Larger values
+                generally improve speed at the cost of memory.
+
+        Returns:
+            A NumPy array containing the summed values. If ``axis = None``,
+            this will be a NumPy scalar instead.
+        """
+        if hasattr(self._seed, "sum"):
+            return self._seed.sum(axis=axis, dtype=dtype)
+        else:
+            return array_sum(
+                self,
+                axis=axis,
+                dtype=dtype,
+                reduce_over_x=lambda x, axes, op : _reduce(x, axes, op, buffer_size),
+                masked=is_masked(self),
+            )

-    def var(self, *args, **kwargs):
-        """See :py:meth:`~numpy.vars` for details."""
-        target = to_dense_array(self._seed)
-        return target.var(*args, **kwargs)

-    def mean(self, *args, **kwargs):
-        """See :py:meth:`~numpy.means` for details."""
-        target = to_dense_array(self._seed)
-        return target.mean(*args, **kwargs)
+    def mean(self, axis: Optional[Union[int, Tuple[int, ...]]] = None, dtype: Optional[numpy.dtype] = None, buffer_size: int = 1e8) -> numpy.ndarray:
+        """
+        Take the mean of values across the ``DelayedArray``, possibly over a
+        given axis or set of axes. If the seed has a ``mean()`` method, that
+        method is called directly with the supplied arguments.
+
+        Args:
+            axis:
+                A single integer specifying the axis to perform the calculation.
+                Alternatively a tuple or None, see ``numpy.mean`` for details.
+
+            dtype:
+                NumPy type for the output array. If None, this is automatically
+                chosen based on the type of the ``DelayedArray``, see
+                ``numpy.mean`` for details.
+
+            buffer_size:
+                Buffer size in bytes to use for block processing. Larger values
+                generally improve speed at the cost of memory.
+
+        Returns:
+            A NumPy array containing the mean values. If ``axis = None``,
+            this will be a NumPy scalar instead.
+        """
+        if hasattr(self._seed, "mean"):
+            return self._seed.mean(axis=axis, dtype=dtype)
+        else:
+            return array_mean(
+                self,
+                axis=axis,
+                dtype=dtype,
+                reduce_over_x=lambda x, axes, op : _reduce(x, axes, op, buffer_size),
+                masked=is_masked(self),
+            )
+
+
+    def var(self, axis: Optional[Union[int, Tuple[int, ...]]] = None, dtype: Optional[numpy.dtype] = None, ddof: int = 0, buffer_size: int = 1e8) -> numpy.ndarray:
+        """
+        Take the variances of values across the ``DelayedArray``, possibly over
+        a given axis or set of axes. If the seed has a ``var()`` method, that
+        method is called directly with the supplied arguments.
+
+        Args:
+            axis:
+                A single integer specifying the axis to perform the calculation.
+                Alternatively a tuple or None, see ``numpy.var`` for details.
+
+            dtype:
+                NumPy type for the output array. If None, this is automatically
+                chosen based on the type of the ``DelayedArray``, see
+                ``numpy.var`` for details.
+
+            ddof:
+                Delta in the degrees of freedom to subtract from the denominator.
+                Typically set to 1 to obtain the sample variance.
+
+            buffer_size:
+                Buffer size in bytes to use for block processing. Larger values
+                generally improve speed at the cost of memory.
+
+        Returns:
+            A NumPy array containing the variances. If ``axis = None``,
+            this will be a NumPy scalar instead.
+        """
+        if hasattr(self._seed, "var"):
+            return self._seed.var(axis=axis, dtype=dtype, ddof=ddof)
+        else:
+            return array_var(
+                self,
+                axis=axis,
+                dtype=dtype,
+                ddof=ddof,
+                reduce_over_x=lambda x, axes, op : _reduce(x, axes, op, buffer_size),
+                masked=is_masked(self),
+            )


 @extract_dense_array.register
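
The array_sum, array_mean and array_var helpers invoked above live in ._statistics and are not part of this diff. A hypothetical sketch of the reduce_over_x contract they appear to rely on; sketch_array_sum and out_shape are invented names for illustration:

    import numpy

    def sketch_array_sum(x, axes, reduce_over_x, out_shape):
        # The callback walks x block by block and calls op(offset, value)
        # for each element, where offset addresses a flat view of the output.
        output = numpy.zeros(out_shape)
        flat = output.reshape(-1)  # a view of a fresh contiguous array, so writes land in `output`

        def op(offset, value):
            flat[offset] += value

        reduce_over_x(x, axes, op)
        return output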
@@ -761,3 +862,90 @@ def is_sparse_DelayedArray(x: DelayedArray):
 def is_masked_DelayedArray(x: DelayedArray):
     """See :py:meth:`~delayedarray.is_masked.is_masked`."""
     return is_masked(x._seed)
+
+
+#########################################################
+#########################################################
+
+
+_StatisticsPayload = namedtuple("_StatisticsPayload", [ "operation", "multipliers", "starts" ])
+
+
+def _reduce_1darray(val: numpy.ndarray, payload: _StatisticsPayload, offset: int = 0):
+    for i, v in enumerate(val):
+        extra = payload.multipliers[-1] * (i + payload.starts[-1])
+        payload.operation(offset + extra, v)
+    return
+
+
+def _recursive_reduce_ndarray(x: numpy.ndarray, payload: _StatisticsPayload, dim: int, offset: int = 0):
+    mult = payload.multipliers[dim]
+    shift = payload.starts[dim]
+    if len(x.shape) == 2:
+        for i in range(x.shape[0]):
+            _reduce_1darray(x[i], payload, offset = offset + mult * (shift + i))
+    else:
+        for i in range(x.shape[0]):
+            _recursive_reduce_ndarray(x[i], payload, dim = dim + 1, offset = offset + mult * (shift + i))
+    return
+
+
+def _reduce_ndarray(block: numpy.ndarray, multipliers: List[int], axes: List[int], position: Tuple[Tuple[int, int], ...], operation: Callable):
+    ndim = len(block.shape)
+    payload = _StatisticsPayload(operation=operation, multipliers=multipliers, starts=(*(s[0] for s in position),))
+    if ndim == 1:
+        _reduce_1darray(block, payload)
+    else:
+        _recursive_reduce_ndarray(block, payload, dim=0)
+    return
+
+
+def _reduce_sparse_vector(idx: numpy.ndarray, val: numpy.ndarray, payload: _StatisticsPayload, offset: int = 0):
+    for j, ix in enumerate(idx):
+        extra = payload.multipliers[0] * (ix + payload.starts[0])
+        payload.operation(offset + extra, val[j])
+    return
+
+
+def _recursive_reduce_SparseNdarray(contents, payload: _StatisticsPayload, dim: int, offset: int = 0):
+    mult = payload.multipliers[dim]
+    start = payload.starts[dim]
+    if dim == 1:
+        for i, con in enumerate(contents):
+            if con is not None:
+                _reduce_sparse_vector(con[0], con[1], payload, offset = offset + mult * (i + start))
+    else:
+        for i, con in enumerate(contents):
+            if con is not None:
+                _recursive_reduce_SparseNdarray(con, payload, dim = dim - 1, offset = offset + mult * (i + start))
+    return
+
+
+def _reduce_SparseNdarray(x: SparseNdarray, multipliers: List[int], axes: List[int], position: Tuple[Tuple[int, int], ...], operation: Callable):
+    if x.contents is not None:
+        payload = _StatisticsPayload(operation=operation, multipliers=multipliers, starts=(*(s[0] for s in position),))
+        ndim = len(x.shape)
+        if ndim == 1:
+            _reduce_sparse_vector(x.contents[0], x.contents[1], payload)
+        else:
+            _recursive_reduce_SparseNdarray(x.contents, payload, dim=ndim - 1)
+    return
+
+
+def _reduce(x: DelayedArray, axes: List[int], operation: Callable, buffer_size: int):
+    block_shape = choose_block_shape_for_iteration(x, memory = buffer_size)
+    multipliers = _create_offset_multipliers(x.shape, axes)
+    if is_sparse(x):
+        apply_over_blocks(
+            x,
+            lambda position, block : _reduce_SparseNdarray(block, multipliers, axes, position, operation),
+            block_shape=block_shape,
+            allow_sparse=True
+        )
+    else:
+        apply_over_blocks(
+            x,
+            lambda position, block : _reduce_ndarray(block, multipliers, axes, position, operation),
+            block_shape=block_shape
+        )
+    return
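
_create_offset_multipliers is imported from ._statistics but its body is not shown in this diff. A hypothetical reconstruction of the idea, assuming `axes` names the dimensions being reduced away (the real helper may differ in its conventions):

    def sketch_create_offset_multipliers(shape, axes):
        # Reduced axes get a multiplier of zero, so every element along them
        # collapses onto the same output offset; retained axes get row-major
        # strides over the flattened output buffer.
        multipliers = [0] * len(shape)
        stride = 1
        for d in range(len(shape) - 1, -1, -1):  # last retained axis varies fastest
            if d not in axes:
                multipliers[d] = stride
                stride *= shape[d]
        return multipliers

    # Example: shape (3, 4, 5) with axes = [1] gives multipliers == [5, 0, 1],
    # so element (i, j, k) accumulates into flat offset 5*i + k of a (3, 5) output.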
