Skip to content

Commit d19f3f0

Browse files
Ignore stale children when reconsolidating metadata (#2980)
Co-authored-by: David Stansby <[email protected]>
1 parent 36ca497 commit d19f3f0

File tree

4 files changed

+109
-8
lines changed

4 files changed

+109
-8
lines changed

changes/2921.bugfix.rst

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
Ignore stale child metadata when reconsolidating metadata.

src/zarr/api/asynchronous.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -201,7 +201,10 @@ async def consolidate_metadata(
201201
group = await AsyncGroup.open(store_path, zarr_format=zarr_format, use_consolidated=False)
202202
group.store_path.store._check_writable()
203203

204-
members_metadata = {k: v.metadata async for k, v in group.members(max_depth=None)}
204+
members_metadata = {
205+
k: v.metadata
206+
async for k, v in group.members(max_depth=None, use_consolidated_for_children=False)
207+
}
205208
# While consolidating, we want to be explicit about when child groups
206209
# are empty by inserting an empty dict for consolidated_metadata.metadata
207210
for k, v in members_metadata.items():

src/zarr/core/group.py

Lines changed: 61 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -1305,6 +1305,8 @@ async def nmembers(
13051305
async def members(
13061306
self,
13071307
max_depth: int | None = 0,
1308+
*,
1309+
use_consolidated_for_children: bool = True,
13081310
) -> AsyncGenerator[
13091311
tuple[str, AsyncArray[ArrayV2Metadata] | AsyncArray[ArrayV3Metadata] | AsyncGroup],
13101312
None,
@@ -1322,6 +1324,11 @@ async def members(
13221324
default, (``max_depth=0``) only immediate children are included. Set
13231325
``max_depth=None`` to include all nodes, and some positive integer
13241326
to consider children within that many levels of the root Group.
1327+
use_consolidated_for_children : bool, default True
1328+
Whether to use the consolidated metadata of child groups loaded
1329+
from the store. Note that this only affects groups loaded from the
1330+
store. If the current Group already has consolidated metadata, it
1331+
will always be used.
13251332
13261333
Returns
13271334
-------
@@ -1332,7 +1339,9 @@ async def members(
13321339
"""
13331340
if max_depth is not None and max_depth < 0:
13341341
raise ValueError(f"max_depth must be None or >= 0. Got '{max_depth}' instead")
1335-
async for item in self._members(max_depth=max_depth):
1342+
async for item in self._members(
1343+
max_depth=max_depth, use_consolidated_for_children=use_consolidated_for_children
1344+
):
13361345
yield item
13371346

13381347
def _members_consolidated(
@@ -1362,7 +1371,7 @@ def _members_consolidated(
13621371
yield from obj._members_consolidated(new_depth, prefix=key)
13631372

13641373
async def _members(
1365-
self, max_depth: int | None
1374+
self, max_depth: int | None, *, use_consolidated_for_children: bool = True
13661375
) -> AsyncGenerator[
13671376
tuple[str, AsyncArray[ArrayV3Metadata] | AsyncArray[ArrayV2Metadata] | AsyncGroup], None
13681377
]:
@@ -1392,7 +1401,11 @@ async def _members(
13921401
# enforce a concurrency limit by passing a semaphore to all the recursive functions
13931402
semaphore = asyncio.Semaphore(config.get("async.concurrency"))
13941403
async for member in _iter_members_deep(
1395-
self, max_depth=max_depth, skip_keys=skip_keys, semaphore=semaphore
1404+
self,
1405+
max_depth=max_depth,
1406+
skip_keys=skip_keys,
1407+
semaphore=semaphore,
1408+
use_consolidated_for_children=use_consolidated_for_children,
13961409
):
13971410
yield member
13981411

@@ -2092,10 +2105,34 @@ def nmembers(self, max_depth: int | None = 0) -> int:
20922105

20932106
return self._sync(self._async_group.nmembers(max_depth=max_depth))
20942107

2095-
def members(self, max_depth: int | None = 0) -> tuple[tuple[str, Array | Group], ...]:
2108+
def members(
2109+
self, max_depth: int | None = 0, *, use_consolidated_for_children: bool = True
2110+
) -> tuple[tuple[str, Array | Group], ...]:
20962111
"""
2097-
Return the sub-arrays and sub-groups of this group as a tuple of (name, array | group)
2098-
pairs
2112+
Return the arrays and groups contained in this group as a tuple of (name, array | group) pairs.
2113+
This method requires that `store_path.store` supports directory listing.
2114+
2115+
The results are not guaranteed to be ordered.
2116+
2117+
Parameters
2118+
----------
2119+
max_depth : int or None, default 0
2120+
The maximum number of levels of the hierarchy to include. By
2121+
default, (``max_depth=0``) only immediate children are included. Set
2122+
``max_depth=None`` to include all nodes, and some positive integer
2123+
to consider children within that many levels of the root Group.
2124+
use_consolidated_for_children : bool, default True
2125+
Whether to use the consolidated metadata of child groups loaded
2126+
from the store. Note that this only affects groups loaded from the
2127+
store. If the current Group already has consolidated metadata, it
2128+
will always be used.
2129+
2130+
Returns
2131+
-------
2132+
path:
2133+
A string giving the path to the target, relative to the Group ``self``.
2134+
value: Array or Group
2135+
The Array or Group that is a child of ``self``.
20992136
"""
21002137
_members = self._sync_iter(self._async_group.members(max_depth=max_depth))
21012138

@@ -3329,6 +3366,7 @@ async def _iter_members_deep(
33293366
max_depth: int | None,
33303367
skip_keys: tuple[str, ...],
33313368
semaphore: asyncio.Semaphore | None = None,
3369+
use_consolidated_for_children: bool = True,
33323370
) -> AsyncGenerator[
33333371
tuple[str, AsyncArray[ArrayV3Metadata] | AsyncArray[ArrayV2Metadata] | AsyncGroup], None
33343372
]:
@@ -3346,6 +3384,11 @@ async def _iter_members_deep(
33463384
A tuple of keys to skip when iterating over the possible members of the group.
33473385
semaphore : asyncio.Semaphore | None
33483386
An optional semaphore to use for concurrency control.
3387+
use_consolidated_for_children : bool, default True
3388+
Whether to use the consolidated metadata of child groups loaded
3389+
from the store. Note that this only affects groups loaded from the
3390+
store. If the current Group already has consolidated metadata, it
3391+
will always be used.
33493392
33503393
Yields
33513394
------
@@ -3360,8 +3403,19 @@ async def _iter_members_deep(
33603403
else:
33613404
new_depth = max_depth - 1
33623405
async for name, node in _iter_members(group, skip_keys=skip_keys, semaphore=semaphore):
3406+
is_group = isinstance(node, AsyncGroup)
3407+
if (
3408+
is_group
3409+
and not use_consolidated_for_children
3410+
and node.metadata.consolidated_metadata is not None # type: ignore [union-attr]
3411+
):
3412+
node = cast("AsyncGroup", node)
3413+
# The caller asked us not to trust consolidated metadata stored on child groups (for
3414+
# example, because the metadata is being reconsolidated), so drop it from this node.
3415+
node = replace(node, metadata=replace(node.metadata, consolidated_metadata=None))
33633416
yield name, node
3364-
if isinstance(node, AsyncGroup) and do_recursion:
3417+
if is_group and do_recursion:
3418+
node = cast("AsyncGroup", node)
33653419
to_recurse[name] = _iter_members_deep(
33663420
node, max_depth=new_depth, skip_keys=skip_keys, semaphore=semaphore
33673421
)

tests/test_metadata/test_consolidated.py

Lines changed: 43 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -574,6 +574,49 @@ async def test_use_consolidated_false(
574574
assert good.metadata.consolidated_metadata
575575
assert sorted(good.metadata.consolidated_metadata.metadata) == ["a", "b"]
576576

577+
async def test_stale_child_metadata_ignored(self, memory_store: zarr.storage.MemoryStore):
578+
# https://github.com/zarr-developers/zarr-python/issues/2921
579+
# When consolidating metadata, we should ignore any (possibly stale) metadata
580+
# from previous consolidations, *including at child nodes*.
581+
root = await zarr.api.asynchronous.group(store=memory_store, zarr_format=3)
582+
await root.create_group("foo")
583+
await zarr.api.asynchronous.consolidate_metadata(memory_store, path="foo")
584+
await root.create_group("foo/bar/spam")
585+
586+
await zarr.api.asynchronous.consolidate_metadata(memory_store)
587+
588+
reopened = await zarr.api.asynchronous.open_consolidated(store=memory_store, zarr_format=3)
589+
result = [x[0] async for x in reopened.members(max_depth=None)]
590+
expected = ["foo", "foo/bar", "foo/bar/spam"]
591+
assert result == expected
592+
593+
async def test_use_consolidated_for_children_members(
594+
self, memory_store: zarr.storage.MemoryStore
595+
):
596+
# A test that has *unconsolidated* metadata at the root group, but discovers
597+
# a child group with consolidated metadata.
598+
599+
root = await zarr.api.asynchronous.create_group(store=memory_store)
600+
await root.create_group("a/b")
601+
# Consolidate metadata at "a/b"
602+
await zarr.api.asynchronous.consolidate_metadata(memory_store, path="a/b")
603+
604+
# Add a new group a/b/c, that's not present in the CM at "a/b"
605+
await root.create_group("a/b/c")
606+
607+
# Now according to the (stale) consolidated metadata at "a/b", the members of "a" are ["b"],
608+
# but according to the unconsolidated metadata, the members of "a" are ["b", "b/c"]
609+
group = await zarr.api.asynchronous.open_group(store=memory_store, path="a")
610+
with pytest.warns(UserWarning, match="Object at 'c' not found"):
611+
result = sorted([x[0] async for x in group.members(max_depth=None)])
612+
expected = ["b"]
613+
assert result == expected
614+
615+
result = sorted(
616+
[x[0] async for x in group.members(max_depth=None, use_consolidated_for_children=False)]
617+
)
618+
expected = ["b", "b/c"]
619+
assert result == expected
577620

578621
@pytest.mark.parametrize("fill_value", [np.nan, np.inf, -np.inf])
579622
async def test_consolidated_metadata_encodes_special_chars(

0 commit comments

Comments
 (0)