Skip to content

Commit

Permalink
Refactor and pluginify verbatim PFB manifest creation
Browse files Browse the repository at this point in the history
  • Loading branch information
nadove-ucsc committed Jan 10, 2025
1 parent d5c8415 commit 58c931c
Show file tree
Hide file tree
Showing 4 changed files with 50 additions and 13 deletions.
18 changes: 18 additions & 0 deletions src/azul/plugins/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -469,6 +469,24 @@ def verbatim_pfb_schema(self,
)
return avro_pfb.pfb_schema_from_replicas(replicas)

def verbatim_pfb_relations(self,
replicas: Iterable[JSON]
) -> dict[str, list[tuple[str, str]]]:
"""
:param replicas: a topologically sorted list of replicas.
:return: A mapping from the replicas' entity IDs to the replicas they
reference/depend on, represented as (replica_type, entity_id)
pairs.
"""
return {}

def verbatim_pfb_links(self, replica_types: Iterable[str]) -> MutableJSON:
"""
Express the relationships between the given replica types as PFB links
(https://uc-cdis.github.io/pypfb/#link).
"""
return {replica_type: [] for replica_type in replica_types}

@abstractmethod
def document_slice(self, entity_type: str) -> DocumentSlice | None:
raise NotImplementedError
Expand Down
24 changes: 16 additions & 8 deletions src/azul/service/avro_pfb.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
)
from typing import (
ClassVar,
Mapping,
MutableSet,
Self,
)
Expand Down Expand Up @@ -63,6 +64,7 @@
AnyMutableJSON,
JSON,
MutableJSON,
MutableJSONs,
)

log = logging.getLogger(__name__)
Expand Down Expand Up @@ -269,8 +271,18 @@ def to_entity(cls, entity: PFBEntity) -> Self:
return cls(dst_id=entity.id, dst_name=entity.name)


def pfb_metadata_entity(entity_types: Iterable[str],
links: bool = True
def pfb_links_from_field_types(field_types: FieldTypes) -> MutableJSON:
return {
entity_type: [] if entity_type == 'files' else [{
'multiplicity': 'MANY_TO_MANY',
'dst': 'files',
'name': 'files'
}]
for entity_type in field_types
}


def pfb_metadata_entity(links_by_entity_type: Mapping[str, MutableJSONs],
) -> MutableJSON:
"""
The Metadata entity encodes the possible relationships between tables.
Expand All @@ -286,13 +298,9 @@ def pfb_metadata_entity(entity_types: Iterable[str],
'name': entity_type,
'ontology_reference': '',
'values': {},
'links': [] if not links or entity_type == 'files' else [{
'multiplicity': 'MANY_TO_MANY',
'dst': 'files',
'name': 'files'
}],
'links': links,
'properties': []
} for entity_type in entity_types
} for entity_type, links in links_by_entity_type.items()
],
'misc': {}
}
Expand Down
18 changes: 14 additions & 4 deletions src/azul/service/manifest_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,9 @@
Filters,
avro_pfb,
)
from azul.service.avro_pfb import (
PFBRelation,
)
from azul.service.elasticsearch_service import (
ElasticsearchService,
Pagination,
Expand Down Expand Up @@ -1734,7 +1737,8 @@ def create_file(self) -> tuple[str, Optional[str]]:
for doc in self._all_docs_sorted():
converter.add_doc(doc)

entity = avro_pfb.pfb_metadata_entity(field_types)
links = avro_pfb.pfb_links_from_field_types(field_types)
entity = avro_pfb.pfb_metadata_entity(links)
entities = itertools.chain([entity], converter.entities())

fd, path = mkstemp(suffix='.avro')
Expand Down Expand Up @@ -2127,17 +2131,23 @@ def format(cls) -> ManifestFormat:
def create_file(self) -> tuple[str, Optional[str]]:
plugin = self.service.metadata_plugin(self.catalog)
replicas = list(self._all_replicas())
relations = plugin.verbatim_pfb_relations(replicas)
replica_schemas = plugin.verbatim_pfb_schema(replicas)
# Ensure field order is consistent for unit tests
replica_schemas.sort(key=itemgetter('name'))
replica_types = [s['name'] for s in replica_schemas]
links = plugin.verbatim_pfb_links(s['name'] for s in replica_schemas)
pfb_schema = avro_pfb.avro_pfb_schema(replica_schemas)
pfb_metadata_entity = avro_pfb.pfb_metadata_entity(replica_types, links=False)
pfb_metadata_entity = avro_pfb.pfb_metadata_entity(links)

def pfb_entities():
yield pfb_metadata_entity
for replica in replicas:
yield avro_pfb.PFBEntity.for_replica(dict(replica), pfb_schema).to_json(())
entity = avro_pfb.PFBEntity.for_replica(dict(replica), pfb_schema)
entity_relations = [
PFBRelation(dst_name=replica_type, dst_id=entity_id)
for replica_type, entity_id in relations.get(replica['entity_id'], ())
]
yield entity.to_json(entity_relations)

fd, path = mkstemp(suffix=f'.{self.file_name_extension()}')
os.close(fd)
Expand Down
3 changes: 2 additions & 1 deletion test/service/test_manifest.py
Original file line number Diff line number Diff line change
Expand Up @@ -2240,7 +2240,8 @@ def test_terra_pfb_schema(self):
self._assert_pfb_schema(schema)

def test_pfb_metadata_object(self):
metadata_entity = avro_pfb.pfb_metadata_entity(FileTransformer.field_types())
links = avro_pfb.pfb_links_from_field_types(FileTransformer.field_types())
metadata_entity = avro_pfb.pfb_metadata_entity(links)
field_types = FileTransformer.field_types()
schema = avro_pfb.pfb_schema_from_field_types(field_types)
parsed_schema = fastavro.parse_schema(cast(dict, schema))
Expand Down

0 comments on commit 58c931c

Please sign in to comment.