diff --git a/src/azul/plugins/__init__.py b/src/azul/plugins/__init__.py index 6aa1c59d8c..4afab46d37 100644 --- a/src/azul/plugins/__init__.py +++ b/src/azul/plugins/__init__.py @@ -469,6 +469,24 @@ def verbatim_pfb_schema(self, ) return avro_pfb.pfb_schema_from_replicas(replicas) + def verbatim_pfb_relations(self, + replicas: Iterable[JSON] + ) -> dict[str, list[tuple[str, str]]]: + """ + :param replicas: a topologically sorted list of replicas. + :return: A mapping from the replicas' entity IDs to the replicas they + reference/depend on, represented as (replica_type, entity_id) + pairs. + """ + return {} + + def verbatim_pfb_links(self, replica_types: Iterable[str]) -> MutableJSON: + """ + Express the relationships between the given replica types as PFB links + (https://uc-cdis.github.io/pypfb/#link). + """ + return {replica_type: [] for replica_type in replica_types} + @abstractmethod def document_slice(self, entity_type: str) -> DocumentSlice | None: raise NotImplementedError diff --git a/src/azul/service/avro_pfb.py b/src/azul/service/avro_pfb.py index eb1f3b7fbe..c51f9f350d 100644 --- a/src/azul/service/avro_pfb.py +++ b/src/azul/service/avro_pfb.py @@ -15,6 +15,7 @@ ) from typing import ( ClassVar, + Mapping, MutableSet, Self, ) @@ -63,6 +64,7 @@ AnyMutableJSON, JSON, MutableJSON, + MutableJSONs, ) log = logging.getLogger(__name__) @@ -269,8 +271,18 @@ def to_entity(cls, entity: PFBEntity) -> Self: return cls(dst_id=entity.id, dst_name=entity.name) -def pfb_metadata_entity(entity_types: Iterable[str], - links: bool = True +def pfb_links_from_field_types(field_types: FieldTypes) -> MutableJSON: + return { + entity_type: [] if entity_type == 'files' else [{ + 'multiplicity': 'MANY_TO_MANY', + 'dst': 'files', + 'name': 'files' + }] + for entity_type in field_types + } + + +def pfb_metadata_entity(links_by_entity_type: Mapping[str, MutableJSONs], ) -> MutableJSON: """ The Metadata entity encodes the possible relationships between tables. @@ -286,13 +298,9 @@ def pfb_metadata_entity(entity_types: Iterable[str], 'name': entity_type, 'ontology_reference': '', 'values': {}, - 'links': [] if not links or entity_type == 'files' else [{ - 'multiplicity': 'MANY_TO_MANY', - 'dst': 'files', - 'name': 'files' - }], + 'links': links, 'properties': [] - } for entity_type in entity_types + } for entity_type, links in links_by_entity_type.items() ], 'misc': {} } diff --git a/src/azul/service/manifest_service.py b/src/azul/service/manifest_service.py index e700df99b3..8b7445cb4c 100644 --- a/src/azul/service/manifest_service.py +++ b/src/azul/service/manifest_service.py @@ -127,6 +127,9 @@ Filters, avro_pfb, ) +from azul.service.avro_pfb import ( + PFBRelation, +) from azul.service.elasticsearch_service import ( ElasticsearchService, Pagination, @@ -1734,7 +1737,8 @@ def create_file(self) -> tuple[str, Optional[str]]: for doc in self._all_docs_sorted(): converter.add_doc(doc) - entity = avro_pfb.pfb_metadata_entity(field_types) + links = avro_pfb.pfb_links_from_field_types(field_types) + entity = avro_pfb.pfb_metadata_entity(links) entities = itertools.chain([entity], converter.entities()) fd, path = mkstemp(suffix='.avro') @@ -2127,17 +2131,23 @@ def format(cls) -> ManifestFormat: def create_file(self) -> tuple[str, Optional[str]]: plugin = self.service.metadata_plugin(self.catalog) replicas = list(self._all_replicas()) + relations = plugin.verbatim_pfb_relations(replicas) replica_schemas = plugin.verbatim_pfb_schema(replicas) # Ensure field order is consistent for unit tests replica_schemas.sort(key=itemgetter('name')) - replica_types = [s['name'] for s in replica_schemas] + links = plugin.verbatim_pfb_links(s['name'] for s in replica_schemas) pfb_schema = avro_pfb.avro_pfb_schema(replica_schemas) - pfb_metadata_entity = avro_pfb.pfb_metadata_entity(replica_types, links=False) + pfb_metadata_entity = avro_pfb.pfb_metadata_entity(links) def pfb_entities(): yield pfb_metadata_entity for replica in replicas: - yield avro_pfb.PFBEntity.for_replica(dict(replica), pfb_schema).to_json(()) + entity = avro_pfb.PFBEntity.for_replica(dict(replica), pfb_schema) + entity_relations = [ + PFBRelation(dst_name=replica_type, dst_id=entity_id) + for replica_type, entity_id in relations.get(replica['entity_id'], ()) + ] + yield entity.to_json(entity_relations) fd, path = mkstemp(suffix=f'.{self.file_name_extension()}') os.close(fd) diff --git a/test/service/test_manifest.py b/test/service/test_manifest.py index 8fe99a5893..8505308e27 100644 --- a/test/service/test_manifest.py +++ b/test/service/test_manifest.py @@ -2240,7 +2240,8 @@ def test_terra_pfb_schema(self): self._assert_pfb_schema(schema) def test_pfb_metadata_object(self): - metadata_entity = avro_pfb.pfb_metadata_entity(FileTransformer.field_types()) + links = avro_pfb.pfb_links_from_field_types(FileTransformer.field_types()) + metadata_entity = avro_pfb.pfb_metadata_entity(links) field_types = FileTransformer.field_types() schema = avro_pfb.pfb_schema_from_field_types(field_types) parsed_schema = fastavro.parse_schema(cast(dict, schema))