Skip to content

Commit ad087e1

Browse files
committed
Fix: Verbatim PFB handover fails when using a common prefix (#6843)
1 parent cd9b74d commit ad087e1

File tree

1 file changed

+30
-11
lines changed

1 file changed

+30
-11
lines changed

src/azul/service/manifest_service.py

+30-11
Original file line numberDiff line numberDiff line change
@@ -102,6 +102,9 @@
102102
from azul.deployment import (
103103
aws,
104104
)
105+
from azul.indexer import (
106+
SourceSpec,
107+
)
105108
from azul.indexer.document import (
106109
DocumentType,
107110
FieldPath,
@@ -2051,6 +2054,9 @@ class ReplicaKeys:
20512054
"""
20522055
hub_id: str
20532056
replica_ids: list[str]
2057+
# We need to track whether sources use a common prefix to avoid dangling
2058+
# references in PFB relations
2059+
source_spec: str
20542060

20552061
def _replica_keys(self) -> Iterable[ReplicaKeys]:
20562062
request = self._create_request()
@@ -2065,14 +2071,16 @@ def _replica_keys(self) -> Iterable[ReplicaKeys]:
20652071
for document_id in always_iterable(inner_entity['document_id'])
20662072
]
20672073
yield self.ReplicaKeys(hub_id=hit['entity_id'],
2068-
replica_ids=document_ids)
2074+
replica_ids=document_ids,
2075+
source_spec=one(hit['sources'])['spec'])
20692076

2070-
def _all_replicas(self) -> Iterable[JSON]:
2077+
def _all_replicas(self) -> Iterable[tuple[JSON, set[str]]]:
20712078
emitted_replica_ids = set()
20722079
page_size = 100
20732080
for page in chunked(self._replica_keys(), page_size):
20742081
num_replicas = 0
20752082
num_new_replicas = 0
2083+
sources = {keys.source_spec for keys in page}
20762084
for replica in self._join_replicas(page):
20772085
num_replicas += 1
20782086
# A single replica may have many hubs. To prevent replicas from
@@ -2081,7 +2089,7 @@ def _all_replicas(self) -> Iterable[JSON]:
20812089
replica_id = replica.meta.id
20822090
if replica_id not in emitted_replica_ids:
20832091
num_new_replicas += 1
2084-
yield replica.to_dict()
2092+
yield replica.to_dict(), sources
20852093
emitted_replica_ids.add(replica_id)
20862094
log.info('Found %d replicas (%d already emitted) from page of %d hubs',
20872095
num_replicas, num_replicas - num_new_replicas, len(page))
@@ -2119,7 +2127,7 @@ def create_file(self) -> tuple[str, str | None]:
21192127
fd, path = mkstemp(suffix=f'.{self.file_name_extension()}')
21202128
os.close(fd)
21212129
with open(path, 'w') as f:
2122-
for replica in self._all_replicas():
2130+
for replica, _ in self._all_replicas():
21232131
entry = {
21242132
'value': replica['contents'],
21252133
'type': replica['replica_type']
@@ -2143,7 +2151,14 @@ def file_name_extension(cls):
21432151
def format(cls) -> ManifestFormat:
21442152
return ManifestFormat.verbatim_pfb
21452153

2146-
def _include_relations(self) -> bool:
2154+
@property
2155+
def included_fields(self) -> list[FieldPath]:
2156+
return [
2157+
*super().included_fields,
2158+
('sources',)
2159+
]
2160+
2161+
def _include_relations(self, sources: Iterable[str]) -> bool:
21472162
# Terra will reject the handover if the manifest includes
21482163
# dangling relations, i.e., if any entity references another
21492164
# entity that isn't included in the manifest. There are three
@@ -2171,14 +2186,17 @@ def _include_relations(self) -> bool:
21712186
# See https://github.com/DataBiosphere/azul/issues/4440
21722187
#
21732188
# (1) can only occur when orphans are included, and (2) and (3)
2174-
# can only occur when orphans are *not* included. Because (1)
2175-
# can only occur on lower deployments, we make the inclusion of
2176-
# relations conditional on avoiding (2) and (3).
2189+
# can only occur when orphans are *not* included.
21772190
#
2178-
return config.enable_verbatim_relations and self.include_orphans
2191+
complete = not any(
2192+
SourceSpec.parse_prefix(source)[1].common
2193+
for source in sources
2194+
)
2195+
return config.enable_verbatim_relations and self.include_orphans and complete
21792196

21802197
def create_file(self) -> tuple[str, str | None]:
2181-
replicas = list(self._all_replicas())
2198+
replicas, sources = zip(*self._all_replicas())
2199+
sources = set().union(*sources)
21822200
plugin = self.metadata_plugin
21832201
replica_schemas = plugin.verbatim_pfb_schema(replicas)
21842202
# Ensure field order is consistent for unit tests
@@ -2192,10 +2210,11 @@ def create_file(self) -> tuple[str, str | None]:
21922210

21932211
def pfb_entities():
21942212
yield pfb_metadata_entity
2213+
include_relations = self._include_relations(sources)
21952214
for replica in replicas:
21962215
id = plugin.verbatim_pfb_entity_id(replica)
21972216
entity = avro_pfb.PFBEntity.for_replica(id, dict(replica))
2198-
if self._include_relations():
2217+
if include_relations:
21992218
relations = plugin.verbatim_pfb_relations(replica)
22002219
entity_relations = [
22012220
PFBRelation(dst_name=replica_type, dst_id=entity_id)

0 commit comments

Comments
 (0)