Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 7 additions & 18 deletions nmdc_runtime/api/core/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
import pandas as pd
import pandas as pds
from fastapi import HTTPException
from jsonschema import Draft7Validator
from linkml_runtime.utils.schemaview import SchemaView
from nmdc_schema import nmdc
from nmdc_schema.nmdc_data import get_nmdc_schema_definition
Expand All @@ -21,7 +20,7 @@
from toolz.dicttoolz import dissoc, assoc_in, get_in

from nmdc_runtime.api.models.metadata import ChangesheetIn
from nmdc_runtime.util import get_nmdc_jsonschema_dict, collection_name_to_class_names
from nmdc_runtime.util import collection_name_to_class_names, get_nmdc_schema_validator

# custom named tuple to hold path property information
SchemaPathProperties = namedtuple(
Expand Down Expand Up @@ -687,40 +686,30 @@ def update_mongo_db(mdb: MongoDatabase, update_cmd: Dict):
mdb : MongoDatabase
Mongo database to be updated.
update_cmd : Dict
Contians update commands to be executed.
Contains update commands to be executed.

Returns
-------
results: Dict
Information about what was updated in the Mongo database.
"""
results = []
validator_strict = Draft7Validator(get_nmdc_jsonschema_dict())
validator_noidpatterns = Draft7Validator(
get_nmdc_jsonschema_dict(enforce_id_patterns=False)
)

validator = get_nmdc_schema_validator()
for id_, update_cmd_doc in update_cmd.items():
collection_name = update_cmd_doc["update"]
doc_before = dissoc(mdb[collection_name].find_one({"id": id_}), "_id")
update_result = json.loads(bson_dumps(mdb.command(update_cmd_doc)))
doc_after = dissoc(mdb[collection_name].find_one({"id": id_}), "_id")
if collection_name in {
"study_set",
"biosample_set",
"omics_processing_set",
} and id_.split(":")[0] in {"gold", "emsl", "igsn"}:
validator = validator_noidpatterns
else:
validator = validator_strict
errors = list(validator.iter_errors({collection_name: [doc_after]}))
report = validator.validate(
{collection_name: [doc_after]}, target_class="Database"
)
results.append(
{
"id": id_,
"doc_before": doc_before,
"update_info": update_result,
"doc_after": doc_after,
"validation_errors": [e.message for e in errors],
"validation_errors": [e.message for e in report.results],
}
)

Expand Down
9 changes: 5 additions & 4 deletions nmdc_runtime/api/db/mongo.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
from uuid import uuid4

import bson
from jsonschema import Draft7Validator
from nmdc_schema.nmdc import Database as NMDCDatabase
from pymongo.errors import AutoReconnect, OperationFailure
from refscan.lib.Finder import Finder
Expand All @@ -22,9 +21,9 @@
nmdc_schema_view,
collection_name_to_class_names,
ensure_unique_id_indexes,
get_nmdc_jsonschema_dict,
nmdc_database_collection_names,
get_allowed_references,
get_nmdc_schema_validator,
)
from pymongo import MongoClient
from pymongo.database import Database as MongoDatabase
Expand Down Expand Up @@ -311,7 +310,7 @@ def validate_json(
the database. In other words, set this to `True` if you want this
function to perform referential integrity checks.
"""
validator = Draft7Validator(get_nmdc_jsonschema_dict())
validator = get_nmdc_schema_validator()
docs = deepcopy(in_docs)
validation_errors = {}

Expand All @@ -333,7 +332,9 @@ def validate_json(
]
continue

errors = list(validator.iter_errors({coll_name: coll_docs}))
errors = list(
validator.iter_results({coll_name: coll_docs}, target_class="Database")
)
validation_errors[coll_name] = [e.message for e in errors]
if coll_docs:
if not isinstance(coll_docs, list):
Expand Down
83 changes: 41 additions & 42 deletions nmdc_runtime/site/resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
StringSource,
InitResourceContext,
)
from fastjsonschema import JsonSchemaValueException
from frozendict import frozendict
from linkml_runtime.dumpers import json_dumper
from pydantic import BaseModel, AnyUrl
Expand All @@ -27,7 +26,7 @@
from nmdc_runtime.api.models.operation import ListOperationsResponse
from nmdc_runtime.api.models.util import ListRequest
from nmdc_runtime.site.normalization.gold import normalize_gold_id
from nmdc_runtime.util import unfreeze, nmdc_jsonschema_validator_noidpatterns
from nmdc_runtime.util import unfreeze, get_nmdc_schema_validator
from nmdc_schema import nmdc


Expand Down Expand Up @@ -534,46 +533,46 @@ def add_docs(self, docs, validate=True, replace=True):
"""
TODO: Document this function.
"""
try:
if validate:
nmdc_jsonschema_validator_noidpatterns(docs)
rv = {}
for collection_name, collection_docs in docs.items():
# If `collection_docs` is empty, abort this iteration.
#
# Note: We do this because the `bulk_write` method called below will raise
# an `InvalidOperation` exception if it is passed 0 operations.
#
# Reference: https://pymongo.readthedocs.io/en/stable/api/pymongo/collection.html#pymongo.collection.Collection.bulk_write
#
if len(collection_docs) == 0:
continue

rv[collection_name] = self.db[collection_name].bulk_write(
[
(
ReplaceOne({"id": d["id"]}, d, upsert=True)
if replace
else InsertOne(d)
)
for d in collection_docs
]
)
now = datetime.now(timezone.utc)
self.db.txn_log.insert_many(
[
{
"tgt": {"id": d.get("id"), "c": collection_name},
"type": "upsert",
"ts": now,
# "dtl": {},
}
for d in collection_docs
]
)
return rv
except JsonSchemaValueException as e:
raise ValueError(e.message)
if validate:
validator = get_nmdc_schema_validator()
# Fail fast on first validation error.
for result in validator.iter_results(docs, target_class="Database"):
raise ValueError(result.message)
rv = {}
for collection_name, collection_docs in docs.items():
# If `collection_docs` is empty, abort this iteration.
#
# Note: We do this because the `bulk_write` method called below will raise
# an `InvalidOperation` exception if it is passed 0 operations.
#
# Reference: https://pymongo.readthedocs.io/en/stable/api/pymongo/collection.html#pymongo.collection.Collection.bulk_write
#
if len(collection_docs) == 0:
continue

rv[collection_name] = self.db[collection_name].bulk_write(
[
(
ReplaceOne({"id": d["id"]}, d, upsert=True)
if replace
else InsertOne(d)
)
for d in collection_docs
]
)
now = datetime.now(timezone.utc)
self.db.txn_log.insert_many(
[
{
"tgt": {"id": d.get("id"), "c": collection_name},
"type": "upsert",
"ts": now,
# "dtl": {},
}
for d in collection_docs
]
)
return rv


@resource(
Expand Down
13 changes: 6 additions & 7 deletions nmdc_runtime/site/validation/util.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
from dagster import op, AssetMaterialization, AssetKey, MetadataValue
from jsonschema import Draft7Validator
from nmdc_runtime.util import get_nmdc_jsonschema_dict
from nmdc_runtime.util import get_nmdc_schema_validator
from toolz import dissoc

from nmdc_runtime.site.resources import mongo_resource
Expand Down Expand Up @@ -61,19 +60,19 @@ def validate_mongo_collection(context, collection_name: str):
collection = mongo_db[collection_name] # get mongo collection
db_set = collection_name.split(".")[0]

validator = Draft7Validator(get_nmdc_jsonschema_dict())
validator = get_nmdc_schema_validator()
validation_errors = []

for count, doc in enumerate(collection.find()):
# add logging for progress?
# e.g.: if count % 1000 == 0: context.log.info("done X of Y")
doc = dissoc(doc, "_id") # dissoc _id
errors = list(validator.iter_errors({f"{db_set}": [doc]}))
if len(errors) > 0:
report = validator.validate({f"{db_set}": [doc]}, target_class="Database")
if len(report.results) > 0:
if "id" in doc.keys():
errors = {doc["id"]: [e.message for e in errors]}
errors = {doc["id"]: [r.message for r in report.results]}
else:
errors = {f"missing id ({count})": [e.message for e in errors]}
errors = {f"missing id ({count})": [r.message for r in report.results]}
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think in the validator plugin architecture, I can specify lower-severity results such as warnings and info, right? Would those still count as errors here?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, individual validation results can have various severities (info, warning, error, fatal). In practice the two validation plugins we're using only ever produce results with the error severity level.

validation_errors.append(errors)

return {"collection_name": collection_name, "errors": validation_errors}
Expand Down
63 changes: 32 additions & 31 deletions nmdc_runtime/util.py
Original file line number Diff line number Diff line change
@@ -1,22 +1,22 @@
import importlib.resources
import json
import mimetypes
import os
import pkgutil
from collections import defaultdict
from collections.abc import Iterable
from copy import deepcopy
from datetime import datetime, timezone
from functools import lru_cache
from io import BytesIO
from itertools import chain
from pathlib import Path
from typing import Callable, List, Optional, Set, Dict

import fastjsonschema
import requests
from bson.son import SON
from frozendict import frozendict
from linkml.validator import Validator
from linkml.validator.plugins import JsonschemaValidationPlugin
from linkml_runtime import SchemaView
from nmdc_schema import NmdcSchemaValidationPlugin
from nmdc_schema.get_nmdc_view import ViewGetter
from pymongo.database import Database as MongoDatabase
from pymongo.errors import OperationFailure
Expand Down Expand Up @@ -111,39 +111,23 @@ def get_type_collections() -> dict:
return mappings


def without_id_patterns(nmdc_jsonschema):
# !!! Return the unmodified schema !!!
# This is an experimental change to determine if removing the ID pattern exception would cause
# any issues in practice. If not, we can remove this function and the associated logic.
return nmdc_jsonschema


@lru_cache
def get_nmdc_jsonschema_dict(enforce_id_patterns=True):
"""Get NMDC JSON Schema with materialized patterns (for identifier regexes)."""
d = json.loads(
BytesIO(
pkgutil.get_data("nmdc_schema", "nmdc_materialized_patterns.schema.json")
)
.getvalue()
.decode("utf-8")
)
return d if enforce_id_patterns else without_id_patterns(d)
@lru_cache
def get_nmdc_jsonschema_path() -> Path:
    """Get path to NMDC JSON Schema file.

    Locates ``nmdc_materialized_patterns.schema.json`` inside the installed
    ``nmdc_schema`` package and returns a filesystem path to it.

    Returns
    -------
    Path
        Path to the JSON Schema file. The path stays usable until the
        interpreter exits.
    """
    # Function-scope imports so this block does not depend on additional
    # module-level imports.
    import atexit
    from contextlib import ExitStack

    resource = importlib.resources.files("nmdc_schema").joinpath(
        "nmdc_materialized_patterns.schema.json"
    )
    # `as_file` may extract the resource to a temporary file (e.g. when the
    # package is imported from a zip) and removes it when its context exits.
    # Returning the path from inside a `with` block would therefore hand the
    # caller a path that may no longer exist. Keep the context open for the
    # lifetime of the process instead, and release it at interpreter exit.
    # The `lru_cache` decorator ensures this registration happens only once.
    file_manager = ExitStack()
    atexit.register(file_manager.close)
    return file_manager.enter_context(importlib.resources.as_file(resource))


@lru_cache
def get_nmdc_jsonschema_validator(enforce_id_patterns=True):
return fastjsonschema.compile(
get_nmdc_jsonschema_dict(enforce_id_patterns=enforce_id_patterns)
)
@lru_cache()
def get_nmdc_jsonschema_dict() -> dict:
    """Get NMDC JSON Schema with materialized patterns (for identifier regexes).

    The file located by `get_nmdc_jsonschema_path` is parsed as JSON.

    Returns
    -------
    dict
        The parsed JSON Schema. Because of the `lru_cache` decorator, repeated
        calls return the same `dict` object, so callers should copy it before
        mutating it.
    """
    # Decode explicitly as UTF-8 rather than relying on the platform's locale
    # default encoding, which is not UTF-8 everywhere.
    with open(get_nmdc_jsonschema_path(), "r", encoding="utf-8") as f:
        return json.load(f)


nmdc_jsonschema = get_nmdc_jsonschema_dict()
nmdc_jsonschema_validator = get_nmdc_jsonschema_validator()
nmdc_jsonschema_noidpatterns = get_nmdc_jsonschema_dict(enforce_id_patterns=False)
nmdc_jsonschema_validator_noidpatterns = get_nmdc_jsonschema_validator(
enforce_id_patterns=False
)

REPO_ROOT_DIR = Path(__file__).parent.parent

Expand Down Expand Up @@ -312,6 +296,23 @@ def nmdc_schema_view():
return ViewGetter().get_view()


@lru_cache()
def get_nmdc_schema_validator() -> Validator:
    """Build the `Validator` used to check documents against the NMDC schema.

    The validator is constructed from the schema of `nmdc_schema_view()` and
    runs two plugins, in order: a closed `JsonschemaValidationPlugin` and an
    `NmdcSchemaValidationPlugin`. Because of the `lru_cache` decorator, the
    same `Validator` instance is returned on every call.
    """
    view = nmdc_schema_view()

    # Since the `nmdc-schema` package exports a pre-built JSON Schema file, use that
    # instead of relying on the plugin to generate one on the fly.
    jsonschema_plugin = JsonschemaValidationPlugin(
        closed=True,
        json_schema_path=get_nmdc_jsonschema_path(),
    )
    nmdc_plugin = NmdcSchemaValidationPlugin()

    return Validator(
        view.schema,
        validation_plugins=[jsonschema_plugin, nmdc_plugin],
    )


@lru_cache
def get_class_name_to_collection_names_map(
schema_view: SchemaView,
Expand Down
1 change: 0 additions & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ dependencies = [
# Note: FastAPI 0.115.0 introduced support for encapsulating request _query_ parameters in Pydantic models.
# Docs: https://fastapi.tiangolo.com/
"fastapi >= 0.115.0",
"fastjsonschema",
"frozendict",
"git-root",
"jq",
Expand Down
Loading