Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions build.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,3 @@
for schema_file_path in schemas_file_paths:
# Step 3 - translate and build each openMINDS schema as JSON-Schema
JSONSchemaBuilder(schema_file_path, schema_loader.schemas_sources).build()




111 changes: 87 additions & 24 deletions pipeline/translator.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,14 @@
import json
import os.path

from pathlib import Path
from typing import List, Optional, Dict

from pipeline.utils import find_embedded_schema, find_source_schema_path

class JSONSchemaBuilder(object):
property_name = None
required_properties = []

def __init__(self, schema_file_path:str, root_path:str):
_relative_path_without_extension = schema_file_path[len(root_path)+1:].replace(".schema.omi.json", "").split("/")
Expand All @@ -14,21 +20,29 @@ def __init__(self, schema_file_path:str, root_path:str):
def _target_file_without_extension(self) -> str:
return os.path.join(self.version, "/".join(self.relative_path_without_extension))

def _resolve_string_property(self, property) -> Dict:
def _resolve_string_property(self, property: dict) -> Dict:
string_property_spec = {}
if "_formats" in property and property["_formats"]:
if len(property["_formats"]) == 1:
string_property_spec["format"] = property["_formats"][0]
string_property_spec["type"] = property["type"]
if self.property_name not in self.required_properties:
string_property_spec["type"] = [string_property_spec["type"], "null"]
elif len(property["_formats"]) > 1:
string_property_spec["anyOf"] = []
for format in property["_formats"]:
string_property_spec["anyOf"].append({
"type": property["type"],
"format": format
})
if self.property_name not in self.required_properties:
string_property_spec["anyOf"].append({
"type": "null"
})
else:
string_property_spec["type"] = property["type"]
if self.property_name not in self.required_properties:
string_property_spec["type"] = [string_property_spec["type"], "null"]
if "pattern" in property and property["pattern"]:
string_property_spec["pattern"] = property["pattern"]
if "minLength" in property and property["minLength"]:
Expand All @@ -37,8 +51,11 @@ def _resolve_string_property(self, property) -> Dict:
string_property_spec["maxLength"] = property["maxLength"]
return string_property_spec

def _resolve_number_property(self, property) -> Dict:
number_property_spec = {"type": property["type"]}
def _resolve_number_property(self, property: dict) -> Dict:
if self.property_name not in self.required_properties:
number_property_spec = {"type": [property["type"], "null"]}
else:
number_property_spec = {"type": property["type"]}
if "multipleOf" in property and property["multipleOf"]:
number_property_spec["multipleOf"] = property["multipleOf"]
if "minimum" in property and property["minimum"]:
Expand All @@ -47,37 +64,73 @@ def _resolve_number_property(self, property) -> Dict:
number_property_spec["maximum"] = property["maximum"]
return number_property_spec

def _resolve_object_property(self, property):
def _resolve_embedded_schema(self, embedded_type:str):
"""
Retrieves the embedded JSON Schema for the given type.

If a pre-generated embedded schema is found, it is returned directly.
Otherwise, the function attempts to locate the source schema and generates the embedded schema file.
"""
embedded = find_embedded_schema(self.version, embedded_type.split('/')[-1])
if embedded:
return embedded
else:
embedded_path = find_source_schema_path(self.version, embedded_type.split('/')[-1])
if embedded_path:
return JSONSchemaBuilder(embedded_path, os.path.join(os.path.realpath("."), "sources", "schemas")).build()
return None

def _resolve_object_property(self, property: dict):
object_property_spec = {}
if "_embeddedTypes" in property and property["_embeddedTypes"]:
if len(property["_embeddedTypes"]) == 1:
object_property_spec["type"] = "object"
object_property_spec["$ref"] = f"{property['_embeddedTypes'][0]}?format=json-schema"
embedded_schema = self._resolve_embedded_schema(property['_embeddedTypes'][0])
del embedded_schema['$schema']
object_property_spec = embedded_schema
elif len(property["_embeddedTypes"]) > 1:
object_property_spec["anyOf"] = []
for embedded_type in property["_embeddedTypes"]:
embedded_schema = self._resolve_embedded_schema(embedded_type)
del embedded_schema['$schema']
object_property_spec["anyOf"].append(embedded_schema)

if self.property_name not in self.required_properties:
object_property_spec["anyOf"].append({
"type": "object",
"$ref": f"{embedded_type}?format=json-schema"
"type": "null"
})
elif "_linkedTypes" in property and property["_linkedTypes"]:
object_property_spec["type"] = "object"
object_property_spec["if"] = {
"required": ["@type"]
}
object_property_spec["then"] = {
"properties": {
"@id": {
"type": "string",
"format": "iri"
},
"@type": {
"type": "string",
"format": "iri",
"enum": property["_linkedTypes"]
}},
"required": ["@id"]
}
if self.property_name not in self.required_properties:
object_property_spec["then"] = {
"properties": {
"@id": {
"type": "string",
"format": "iri"
},
"@type": {
"type": ["string", "null"],
"format": "iri",
"enum": property["_linkedTypes"] + [ "null" ]
}},
"required": ["@id"]
}
else:
object_property_spec["then"] = {
"properties": {
"@id": {
"type": "string",
"format": "iri"
},
"@type": {
"type": "string",
"format": "iri",
"enum": property["_linkedTypes"]
}},
"required": ["@id"]
}
object_property_spec["else"] = {
"properties": {
"@id": {
Expand All @@ -92,6 +145,8 @@ def _resolve_object_property(self, property):

def _resolve_array_property(self, property):
array_property_spec = {"type": property["type"]}
if self.property_name not in self.required_properties:
array_property_spec["type"] = [property["type"], "null"]
if "items" in property and property["items"]:
if "type" in property["items"] and property["items"]["type"]:
if property["items"]["type"] == "string":
Expand All @@ -114,7 +169,7 @@ def _resolve_array_property(self, property):
def _translate_property_specifications(self, property) -> Dict:
self._translated_prop_spec = {}

prop_description = property["description"] if "description" in property else None
prop_description = property["description"] if "description" in property else "null"
self._translated_prop_spec["description"] = prop_description

self._translated_prop_spec["name"] = property["name"]
Expand All @@ -137,9 +192,10 @@ def _translate_property_specifications(self, property) -> Dict:
def translate(self):
# set required properties
required_prop = self._schema_payload["required"] if "required" in self._schema_payload else []
self.required_properties = required_prop
required_prop.extend(["@id", "@type"])
# set description
description = self._schema_payload['description'] if "description" in self._schema_payload else None
description = self._schema_payload['description'] if "description" in self._schema_payload else "null"

self._translated_schema = {
"$id": f"{self._schema_payload['_type']}?format=json-schema",
Expand All @@ -163,13 +219,20 @@ def translate(self):

if "properties" in self._schema_payload and self._schema_payload["properties"]:
for prop_name, prop_spec in self._schema_payload["properties"].items():
self.property_name = prop_name
self._translated_schema["properties"][prop_name] = self._translate_property_specifications(prop_spec)

def build(self):
target_file = os.path.join("target", "schemas", f"{self._target_file_without_extension()}.schema.json")
path_target_file = Path(target_file)
if path_target_file.exists():
return json.loads(path_target_file.read_text())

os.makedirs(os.path.dirname(target_file), exist_ok=True)

self.translate()

with open(target_file, "w") as target_file:
target_file.write(json.dumps(self._translated_schema, indent=2, sort_keys=True))
target_file.write(json.dumps(self._translated_schema, indent=2, sort_keys=True))

return self._translated_schema
28 changes: 28 additions & 0 deletions pipeline/utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
import glob
import json
import os
import shutil
from pathlib import Path
from typing import List

from git import Repo, GitCommandError
Expand All @@ -10,6 +12,32 @@ def clone_sources():
shutil.rmtree("sources")
Repo.clone_from("https://github.com/openMetadataInitiative/openMINDS.git", to_path="sources", depth=1)

def find_embedded_schema(version, class_name):
"""
Searches a returns a pre-generated (embedded) JSON Schema file for a given class name and version.
"""
directory = Path(f"./target/schemas/{version}/")
camel_case = class_name[:1].lower() + class_name[1:]

# Search for exact filename matches
for case in [camel_case, class_name]:
for file_path in directory.glob(f"**/{case}.schema.json"):
return json.loads(file_path.read_text())
return None

def find_source_schema_path(version, class_name):
"""
Locates the source JSON Schema file path for a given class name and version.
"""
directory = Path(f"./sources/schemas/{version}/")
camel_case = class_name[:1].lower() + class_name[1:]

# Search for exact filename matches
for case in [camel_case, class_name]:
for file_path in directory.glob(f"**/{case}.schema.omi.json"):
return os.path.abspath(file_path)
return None

class SchemaLoader(object):

def __init__(self):
Expand Down