Skip to content

Commit

Permalink
[r] Improve handling of DUOS replicas (#6139, PR #6847)
Browse files Browse the repository at this point in the history
  • Loading branch information
dsotirho-ucsc committed Jan 30, 2025
2 parents c4a20e5 + 38a7270 commit 033d2ff
Show file tree
Hide file tree
Showing 16 changed files with 864 additions and 268 deletions.
20 changes: 12 additions & 8 deletions src/azul/indexer/transform.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,15 +54,18 @@ def entity_type(cls) -> EntityType:
"""
raise NotImplementedError

@abstractmethod
def _replicate(self, entity: EntityReference) -> tuple[str, JSON]:
def _replica_type(self, entity: EntityReference) -> str:
"""
A tuple consisting of:
1. The name of the type of replica emitted by this transformer for a
given entity. See :py:attr:`Replica.replica_type`.
The name of the type of replica emitted by this transformer for a given
entity. See :py:attr:`Replica.replica_type`.
"""
return entity.entity_type

2. The contents of the replica for that entity.
@abstractmethod
def _replica_contents(self, entity: EntityReference) -> JSON:
"""
The contents of the replica emitted by this transformer for a given
entity.
"""
raise NotImplementedError

Expand Down Expand Up @@ -136,7 +139,8 @@ def _replica(self,
root_hub: EntityID,
file_hub: EntityID | None,
) -> Replica:
replica_type, contents = self._replicate(entity)
replica_type = self._replica_type(entity)
contents = self._replica_contents(entity)
coordinates = ReplicaCoordinates(content_hash=json_hash(contents).hexdigest(),
entity=entity)
return Replica(coordinates=coordinates,
Expand Down
3 changes: 3 additions & 0 deletions src/azul/plugins/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -470,6 +470,9 @@ def facets(self) -> Sequence[str]:
def manifest_config(self) -> ManifestConfig:
raise NotImplementedError

def verbatim_pfb_entity_id(self, replica: JSON) -> str:
    """
    The ID of the PFB entity emitted for the given replica document.
    By default this is the entity ID recorded in the replica itself.
    """
    entity_id = replica['entity_id']
    return entity_id

def verbatim_pfb_schema(self,
replicas: list[JSON]
) -> list[JSON]:
Expand Down
29 changes: 9 additions & 20 deletions src/azul/plugins/metadata/anvil/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -333,6 +333,12 @@ def recurse(mapping: MetadataPlugin._FieldMapping, path: FieldPath):
result[('contents', 'files')]['file_url'] = 'files.azul_file_url'
return result

def verbatim_pfb_entity_id(self, replica: JSON) -> str:
    """
    The ID of the PFB entity emitted for the given replica. Replicas of
    type 'duos_dataset_registration' are identified by the DUOS ID in
    their contents; all other replicas use the default entity ID.
    """
    if replica['replica_type'] != 'duos_dataset_registration':
        return super().verbatim_pfb_entity_id(replica)
    return replica['contents']['duos_id']

def verbatim_pfb_schema(self,
replicas: list[JSON]
) -> list[JSON]:
Expand All @@ -349,26 +355,13 @@ def verbatim_pfb_schema(self,
entity_schemas = super().verbatim_pfb_schema(non_schema_replicas)
# For the rest, use the AnVIL schema as the basis of the PFB schema
for table_name, table_schema in table_schemas_by_name.items():
# FIXME: Improve handling of DUOS replicas
# https://github.com/DataBiosphere/azul/issues/6139
is_duos_type = table_name == 'anvil_dataset'
field_schemas = [
self._pfb_schema_from_anvil_column(table_name=table_name,
column_name='datarepo_row_id',
anvil_datatype='string',
is_optional=False,
is_polymorphic=is_duos_type)
is_optional=False)
]
if is_duos_type:
field_schemas.append(self._pfb_schema_from_anvil_column(table_name=table_name,
column_name='duos_id',
anvil_datatype='string',
is_polymorphic=True))
field_schemas.append(self._pfb_schema_from_anvil_column(table_name=table_name,
column_name='description',
anvil_datatype='string',
is_polymorphic=True))
elif table_name == 'anvil_file':
if table_name == 'anvil_file':
field_schemas.append(self._pfb_schema_from_anvil_column(table_name=table_name,
column_name='drs_uri',
anvil_datatype='string'))
Expand All @@ -378,8 +371,7 @@ def verbatim_pfb_schema(self,
column_name=column_schema['name'],
anvil_datatype=column_schema['datatype'],
is_array=column_schema['array_of'],
is_optional=not column_schema['required'],
is_polymorphic=is_duos_type)
is_optional=not column_schema['required'])
)

field_schemas.sort(key=itemgetter('name'))
Expand All @@ -397,7 +389,6 @@ def _pfb_schema_from_anvil_column(self,
anvil_datatype: str,
is_array: bool = False,
is_optional: bool = True,
is_polymorphic: bool = False
) -> AnyMutableJSON:
_anvil_to_pfb_types = {
'boolean': 'boolean',
Expand All @@ -414,8 +405,6 @@ def _pfb_schema_from_anvil_column(self,
'type': 'array',
'items': type_
}
if is_polymorphic and (is_array or not is_optional):
type_ = ['null', type_]
return {
'name': column_name,
'namespace': table_name,
Expand Down
18 changes: 14 additions & 4 deletions src/azul/plugins/metadata/anvil/indexer/transform.py
Original file line number Diff line number Diff line change
Expand Up @@ -192,9 +192,8 @@ def _transform(self,
) -> Iterable[Contribution | Replica]:
raise NotImplementedError

def _replicate(self, entity: EntityReference) -> tuple[str, JSON]:
content = ChainMap(self.bundle.entities, self.bundle.orphans)[entity]
return entity.entity_type, content
def _replica_contents(self, entity: EntityReference) -> JSON:
return ChainMap(self.bundle.entities, self.bundle.orphans)[entity]

def _convert_entity_type(self, entity_type: str) -> str:
assert entity_type == 'bundle' or entity_type.startswith('anvil_'), entity_type
Expand Down Expand Up @@ -507,14 +506,25 @@ def _duos(self, dataset: EntityReference) -> MutableJSON:
return self._entity(dataset, self._duos_types())

def _is_duos(self, dataset: EntityReference) -> bool:
return 'duos_id' in self.bundle.entities[dataset]
try:
contents = self.bundle.entities[dataset]
except KeyError:
return False
else:
return 'duos_id' in contents

def _dataset(self, dataset: EntityReference) -> MutableJSON:
if self._is_duos(dataset):
return self._duos(dataset)
else:
return super()._dataset(dataset)

def _replica_type(self, entity: EntityReference) -> str:
if entity.entity_type == 'anvil_dataset' and self._is_duos(entity):
return 'duos_dataset_registration'
else:
return super()._replica_type(entity)

def _list_entities(self) -> Iterable[EntityReference]:
# Suppress contributions for bundles that only contain orphans
if self.bundle.entities:
Expand Down
12 changes: 8 additions & 4 deletions src/azul/plugins/metadata/hca/indexer/transform.py
Original file line number Diff line number Diff line change
Expand Up @@ -497,13 +497,12 @@ def aggregator(cls, entity_type: EntityType) -> EntityAggregator | None:
agg_cls = SimpleAggregator
return agg_cls(entity_type)

def _replicate(self, entity: EntityReference) -> tuple[str, JSON]:
def _replica_contents(self, entity: EntityReference) -> JSON:
if entity == self.api_bundle.ref:
content = self.bundle.links
return self.bundle.links
else:
api_entity = self.api_bundle.entities[UUID(entity.entity_id)]
content = api_entity.json
return entity.entity_type, content
return api_entity.json

def _find_ancestor_samples(self,
entity: api.LinkedEntity,
Expand Down Expand Up @@ -1383,6 +1382,8 @@ def visit(self, entity: api.Entity) -> None:

@property
def entities(self) -> Iterable[EntityReference]:
# FIXME: Some replicas are still missing for HCA
# https://github.com/DataBiosphere/azul/issues/6597
for entity_dict in vars(self).values():
for entity in entity_dict.values():
yield EntityReference(entity_type=entity.schema_name,
Expand Down Expand Up @@ -1744,6 +1745,9 @@ def entity_type(cls) -> str:

class BundleTransformer(SingletonTransformer):

# FIXME: Some replicas are still missing for HCA
# https://github.com/DataBiosphere/azul/issues/6597

def _singleton_entity(self) -> DatedEntity:
return BundleAsEntity(self.api_bundle)

Expand Down
6 changes: 5 additions & 1 deletion src/azul/plugins/repository/tdr_anvil/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -491,7 +491,11 @@ def _duos_bundle(self, bundle_fqid: TDRAnvilBundleFQID) -> TDRAnvilBundle:
self.datarepo_row_uuid_version)
assert ref.entity_id == expected_entity_id, (ref, bundle_fqid)
bundle = TDRAnvilBundle(fqid=bundle_fqid)
entity_row = {'duos_id': duos_id, 'description': description}
entity_row = {
'duos_id': duos_id,
'description': description,
'dataset_id': row['dataset_id']
}
bundle.add_entity(ref, self._version, entity_row)
# Classify as orphan to suppress the emission of a contribution
bundle.add_entity(ref, self._version, dict(row), is_orphan=True)
Expand Down
34 changes: 13 additions & 21 deletions src/azul/service/avro_pfb.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,9 +127,9 @@ def add_doc(self, doc: JSON):
for entity in sorted(entities, key=itemgetter('document_id')):
if entity_type != self.entity_type:
_inject_reference_handover_values(entity, doc)
pfb_entity = PFBEntity.from_json(name=entity_type,
object_=entity,
schema=self.schema)
pfb_entity = PFBEntity.for_aggregate(name=entity_type,
object_=entity,
schema=self.schema)
if pfb_entity not in self._entities:
self._entities[pfb_entity] = set()
file_relations.add(PFBRelation.to_entity(pfb_entity))
Expand All @@ -138,9 +138,9 @@ def add_doc(self, doc: JSON):
for entity in chain([file_entity], related_files):
_inject_reference_handover_values(entity, doc)
# File entities are assumed to be unique
pfb_entity = PFBEntity.from_json(name=self.entity_type,
object_=entity,
schema=self.schema)
pfb_entity = PFBEntity.for_aggregate(name=self.entity_type,
object_=entity,
schema=self.schema)
assert pfb_entity not in self._entities
# Terra streams PFBs and requires entities be defined before they are
# referenced. Thus we add the file entity after all the entities
Expand Down Expand Up @@ -174,11 +174,11 @@ def __attrs_post_init__(self):
reject(len(self.id) > 254, 'Terra requires IDs be no longer than 254 chars', )

@classmethod
def from_json(cls,
name: str,
object_: MutableJSON,
schema: JSON
) -> Self:
def for_aggregate(cls,
name: str,
object_: MutableJSON,
schema: JSON
) -> Self:
"""
Derive ID from object in a reproducible way so that we can distinguish
entities by comparing their IDs.
Expand All @@ -193,17 +193,9 @@ def from_json(cls,
return cls(id=id_, name=name, object=object_)

@classmethod
def for_replica(cls, replica: MutableJSON, schema: JSON) -> Self:
def for_replica(cls, id: str, replica: MutableJSON) -> Self:
name, object_ = replica['replica_type'], replica['contents']
cls._add_missing_fields(name, object_, schema)
# Note that it is possible for two distinct replicas to have the same
# entity ID. For example, replicas representing the DUOS registration
# of AnVIL datasets have the same ID as the replica for the dataset
# itself. Terra appears to combine PFB entities with the same ID
# into a single row.
# FIXME: Improve handling of DUOS replicas
# https://github.com/DataBiosphere/azul/issues/6139
return cls(id=replica['entity_id'], name=name, object=object_)
return cls(id=id, name=name, object=object_)

@classmethod
def _add_missing_fields(cls, name: str, object_: MutableJSON, schema):
Expand Down
6 changes: 4 additions & 2 deletions src/azul/service/manifest_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -2139,7 +2139,8 @@ def format(cls) -> ManifestFormat:

def create_file(self) -> tuple[str, str | None]:
replicas = list(self._all_replicas())
replica_schemas = self.metadata_plugin.verbatim_pfb_schema(replicas)
plugin = self.metadata_plugin
replica_schemas = plugin.verbatim_pfb_schema(replicas)
# Ensure field order is consistent for unit tests
replica_schemas.sort(key=itemgetter('name'))
replica_types = [s['name'] for s in replica_schemas]
Expand All @@ -2149,7 +2150,8 @@ def create_file(self) -> tuple[str, str | None]:
def pfb_entities():
yield pfb_metadata_entity
for replica in replicas:
yield avro_pfb.PFBEntity.for_replica(dict(replica), pfb_schema).to_json(())
id = plugin.verbatim_pfb_entity_id(replica)
yield avro_pfb.PFBEntity.for_replica(id, dict(replica)).to_json(())

fd, path = mkstemp(suffix=f'.{self.file_name_extension()}')
os.close(fd)
Expand Down

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions test/indexer/test_anvil.py
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,8 @@ def test_dataset_description(self):
# These fields are populated only in the DUOS bundle
self.assertEqual('Study description from DUOS', contents['description'])
self.assertEqual('DUOS-000000', contents['duos_id'])
# This field is present in both bundles
self.assertEqual('52ee7665-7033-63f2-a8d9-ce8e32666739', contents['dataset_id'])
else:
self.fail(qualifier)
self.assertDictEqual(doc_counts, {
Expand Down
Loading

0 comments on commit 033d2ff

Please sign in to comment.