diff --git a/onedm/sdf/common.py b/onedm/sdf/common.py index 94e9e3b..cdd0725 100644 --- a/onedm/sdf/common.py +++ b/onedm/sdf/common.py @@ -5,7 +5,9 @@ class CommonQualities(BaseModel): - model_config = ConfigDict(extra="allow", alias_generator=to_camel) + model_config = ConfigDict( + extra="allow", alias_generator=to_camel, populate_by_name=True + ) label: str | None = None description: str | None = None diff --git a/onedm/sdf/data.py b/onedm/sdf/data.py index a17b45e..7cb56c5 100644 --- a/onedm/sdf/data.py +++ b/onedm/sdf/data.py @@ -9,6 +9,7 @@ import datetime from abc import ABC from enum import EnumMeta, IntEnum +from re import Pattern from typing import Annotated, Any, Literal, Union from pydantic import Field, NonNegativeInt, field_serializer @@ -187,7 +188,7 @@ class StringData(DataQualities): enum: list[str] | None = None min_length: NonNegativeInt = 0 max_length: NonNegativeInt | None = None - pattern: str | None = None + pattern: str | Pattern[str] | None = None format: str | None = None content_format: str | None = None choices: Annotated[dict[str, StringData] | None, Field(alias="sdfChoice")] = ( @@ -226,7 +227,7 @@ def _get_base_schema(self) -> core_schema.CoreSchema: class ArrayData(DataQualities): type: Literal["array"] = "array" - items: Data + items: Data | None = None min_items: NonNegativeInt = 0 max_items: NonNegativeInt | None = None unique_items: bool = False @@ -240,12 +241,12 @@ def always_include_type(self, type: str, _): def _get_base_schema(self) -> core_schema.ListSchema | core_schema.SetSchema: if self.unique_items: return core_schema.set_schema( - self.items.get_pydantic_schema(), + self.items.get_pydantic_schema() if self.items is not None else None, min_length=self.min_items, max_length=self.max_items, ) return core_schema.list_schema( - self.items.get_pydantic_schema(), + self.items.get_pydantic_schema() if self.items is not None else None, min_length=self.min_items, max_length=self.max_items, ) @@ -253,7 +254,7 @@ def _get_base_schema(self) -> core_schema.ListSchema | core_schema.SetSchema: class ObjectData(DataQualities): type: Literal["object"] = "object" - properties: dict[str, Data] + properties: dict[str, Data] | None = None required: list[str] = Field(default_factory=list) const: dict[str, Any] | None = None default: dict[str, Any] | None = None @@ -262,7 +263,9 @@ class ObjectData(DataQualities): def always_include_type(self, type: str, _): return type - def _get_base_schema(self) -> core_schema.TypedDictSchema: + def _get_base_schema(self) -> core_schema.CoreSchema: + if self.properties is None: + return core_schema.dict_schema() required = self.required or [] fields = { name: core_schema.typed_dict_field( diff --git a/onedm/sdf/from_type.py b/onedm/sdf/from_type.py index 8d29162..225f1d2 100644 --- a/onedm/sdf/from_type.py +++ b/onedm/sdf/from_type.py @@ -1,32 +1,252 @@ """Conversion from native types to sdfData.""" -from enum import Enum from typing import Type from pydantic import TypeAdapter +from pydantic_core import core_schema -from .data import Data, IntegerData -from .json_schema import from_json_schema +from . import data -def data_from_type(type_: Type) -> Data | None: +def data_from_type(type_: Type) -> data.Data | None: """Create from a native Python or Pydantic type. None or null is not a supported type in SDF. In this case the return value will be None. """ - schema = TypeAdapter(type_).json_schema() + return data_from_schema(TypeAdapter(type_).core_schema) - if schema.get("type") == "null": - # Null types not supported + +def data_from_schema(schema: core_schema.CoreSchema) -> data.Data | None: + schema_type = schema["type"] + data_type: data.Data + if schema_type == "none": return None + if schema_type == "int": + data_type = data_from_int_schema(schema) # type: ignore + elif schema_type == "float": + data_type = data_from_float_schema(schema) # type: ignore + elif schema_type == "bool": + data_type = data_from_bool_schema(schema) # type: ignore + elif schema_type == "str": + data_type = data_from_str_schema(schema) # type: ignore + elif schema_type == "bytes": + data_type = data_from_bytes_schema(schema) # type: ignore + elif schema_type == "model": + data_type = data_from_model_schema(schema) # type: ignore + elif schema_type == "model-fields": + data_type = data_from_model_fields_schema(schema) # type: ignore + elif schema_type == "dataclass": + data_type = data_from_dataclass_schema(schema) # type: ignore + elif schema_type == "list": + data_type = data_from_list_schema(schema) # type: ignore + elif schema_type == "set": + data_type = data_from_set_schema(schema) # type: ignore + elif schema_type == "dict": + data_type = data_from_dict_schema(schema) # type: ignore + elif schema_type == "typed-dict": + data_type = data_from_typed_dict_schema(schema) # type: ignore + elif schema_type == "enum": + data_type = data_from_enum_schema(schema) # type: ignore + elif schema_type == "literal": + data_type = data_from_literal_schema(schema) # type: ignore + elif schema_type == "any": + data_type = data_from_any_schema(schema) # type: ignore + elif schema_type == "nullable": + data_type = data_from_nullable_schema(schema) # type: ignore + elif schema_type == "default": + data_type = data_from_default_schema(schema) # type: ignore + elif schema_type == "datetime": + data_type = data_from_datetime_schema(schema) # type: ignore + else: + raise NotImplementedError(f"Unsupported schema '{schema['type']}'") + + # data_type.label = schema["metadata"].get("title") + return data_type + + +def data_from_any_schema(schema: core_schema.AnySchema): + return data.AnyData(nullable=False) + + +def data_from_nullable_schema(schema: core_schema.NullableSchema): + data_type = data_from_schema(schema["schema"]) + data_type.nullable = True + return data_type + + +def data_from_default_schema(schema: core_schema.WithDefaultSchema): + data_type = data_from_schema(schema["schema"]) + data_type.default = schema["default"] + return data_type + + +def data_from_model_schema(schema: core_schema.ModelSchema): + data_type = data_from_schema(schema["schema"]) + return data_type + + +def data_from_model_fields_schema(schema: core_schema.ModelFieldsSchema): + return data.ObjectData( + label=schema.get("model_name"), + properties={ + prop_schema.get("serialization_alias", name): data_from_schema( + prop_schema["schema"] + ) + for name, prop_schema in schema["fields"].items() + }, + nullable=False, + ) + + +def data_from_dataclass_args_schema(schema: core_schema.DataclassArgsSchema): + return data.ObjectData( + properties={ + field.get("serialization_alias", field["name"]): data_from_schema( + field["schema"] + ) + for field in schema["fields"] + }, + nullable=False, + ) + + +def data_from_dataclass_schema(schema: core_schema.DataclassSchema): + return data_from_dataclass_args_schema(schema["schema"]) # type: ignore + + +def data_from_typed_dict_schema(schema: core_schema.TypedDictSchema): + return data.ObjectData( + properties={ + field.get("serialization_alias", name): data_from_schema(field["schema"]) + for name, field in schema["fields"].items() + }, + required=[ + field.get("serialization_alias", name) + for name, field in schema["fields"].items() + if field.get("required", False) + ], + nullable=False, + ) + + +def data_from_list_schema(schema: core_schema.ListSchema): + return data.ArrayData( + items=( + data_from_schema(schema["items_schema"]) + if "items_schema" in schema + else None + ), + min_items=schema.get("min_length", 0), + max_items=schema.get("max_length"), + nullable=False, + ) + + +def data_from_set_schema(schema: core_schema.SetSchema): + return data.ArrayData( + items=( + data_from_schema(schema["items_schema"]) + if "items_schema" in schema + else None + ), + min_items=schema.get("min_length", 0), + max_items=schema.get("max_length"), + unique_items=True, + nullable=False, + ) + + +def data_from_dict_schema(schema: core_schema.DictSchema): + return data.ObjectData(nullable=False) + + +def data_from_int_schema(schema: core_schema.IntSchema): + return data.IntegerData( + minimum=schema.get("ge"), + maximum=schema.get("le"), + exclusive_minimum=schema.get("gt"), + exclusive_maximum=schema.get("lt"), + multiple_of=schema.get("multiple_of"), + nullable=False, + ) + + +def data_from_float_schema(schema: core_schema.FloatSchema): + return data.NumberData( + minimum=schema.get("ge"), + maximum=schema.get("le"), + exclusive_minimum=schema.get("gt"), + exclusive_maximum=schema.get("lt"), + multiple_of=schema.get("multiple_of"), + nullable=False, + ) + + +def data_from_bool_schema(schema: core_schema.BoolSchema): + return data.BooleanData(nullable=False) + + +def data_from_str_schema(schema: core_schema.StringSchema): + return data.StringData( + pattern=schema.get("pattern"), + min_length=schema.get("min_length", 0), + max_length=schema.get("max_length"), + nullable=False, + ) + + +def data_from_bytes_schema(schema: core_schema.BytesSchema): + return data.StringData( + sdf_type="byte-string", + format="bytes", + min_length=schema.get("min_length", 0), + max_length=schema.get("max_length"), + nullable=False, + ) + + +def data_from_literal_schema(schema: core_schema.LiteralSchema): + choices = schema["expected"] + if len(choices) == 1: + return data.AnyData( + const=choices[0], + nullable=False, + ) + if all(isinstance(choice, str) for choice in choices): + return data.StringData( + enum=choices, + nullable=False, + ) + raise NotImplementedError(f"Literal with {choices} not supported") + - data = from_json_schema(schema) +def data_from_enum_schema(schema: core_schema.EnumSchema): + if "sub_type" not in schema: + return data.AnyData( + choices={ + member.name: data.AnyData(const=member.value) + for member in schema["members"] + }, + nullable=False, + ) + if schema["sub_type"] == "int": + return data.IntegerData( + choices={ + member.name: data.IntegerData(const=member.value) + for member in schema["members"] + }, + nullable=False, + ) + if schema["sub_type"] == "str": + return data.StringData( + choices={ + member.name: data.StringData(const=member.value) + for member in schema["members"] + }, + nullable=False, + ) - if isinstance(data, IntegerData) and data.enum and issubclass(type_, Enum): - data.choices = { - member.name: IntegerData(const=member.value) for member in type_ - } - data.enum = None - return data +def data_from_datetime_schema(schema: core_schema.DatetimeSchema): + return data.StringData(nullable=False, format="date-time") diff --git a/onedm/sdf/json_schema.py b/onedm/sdf/json_schema.py deleted file mode 100644 index c2f783a..0000000 --- a/onedm/sdf/json_schema.py +++ /dev/null @@ -1,82 +0,0 @@ -from pydantic import TypeAdapter - -from .data import Data - -DataModel = TypeAdapter[Data](Data) - - -def from_json_schema(definition: dict) -> Data: - definition = process_node(definition, definition) - - return DataModel.validate_python(definition) - - -def process_node(definition: dict, root: dict) -> dict: - if "$ref" in definition: - ref: str = definition.pop("$ref") - # Try to dereference for now, in the future we may want to use - # sdfData to store definitions - fragments: list[str] = ref.split("/") - assert fragments[0] == "#", "Only internal references supported" - referenced = root - for fragment in fragments[1:]: - referenced = referenced[fragment] - definition = {**referenced, **definition} - - if "title" in definition: - # SDF uses label instead of title - definition["label"] = definition.pop("title") - - if "anyOf" in definition: - definition = convert_anyof(definition["anyOf"], root) - else: - # Can't be null - definition["nullable"] = False - - if "enum" in definition: - # Could maybe be replaced with sdfChoice - definition = convert_enum(definition) - - if definition.get("format") == "binary": - definition["sdfType"] = "byte-string" - - if "items" in definition: - definition["items"] = process_node(definition["items"], root) - - if "properties" in definition: - for key, value in definition["properties"].items(): - definition["properties"][key] = process_node(value, root) - - if "$defs" in definition: - # Don't need these anymore - definition.pop("$defs") - - return definition - - -def convert_anyof(anyof: list[dict], root) -> dict: - nullable = False - for option in anyof: - option = process_node(option, root) - if option["type"] == "null": - # Replace this null option with nullable property - nullable = True - anyof.remove(option) - if len(anyof) > 1: - # TODO: Use sdfChoice - raise NotImplementedError("Unions not supported yet") - # Flatten - definition = anyof[0] - definition["nullable"] = nullable - return definition - - -def convert_enum(definition: dict) -> dict: - if len(definition["enum"]) == 1: - # Probably means its a constant - definition["const"] = definition["enum"][0] - del definition["enum"] - return definition - - -__all__ = ["from_json_schema"] diff --git a/tests/sdf/test_from_type.py b/tests/sdf/test_from_type.py new file mode 100644 index 0000000..8b33b74 --- /dev/null +++ b/tests/sdf/test_from_type.py @@ -0,0 +1,163 @@ +from dataclasses import dataclass +import enum +from pydantic import Field, BaseModel +from typing import Annotated, Literal + +import pytest + +from onedm import sdf +from onedm.sdf.from_type import data_from_type + + +def test_integer(): + data = data_from_type(int) + + assert isinstance(data, sdf.IntegerData) + assert not data.nullable + + +def test_float(): + data = data_from_type(float) + + assert isinstance(data, sdf.NumberData) + assert not data.nullable + + +def test_bool(): + data = data_from_type(bool) + + assert isinstance(data, sdf.BooleanData) + assert not data.nullable + + +def test_str(): + data = data_from_type(str) + + assert isinstance(data, sdf.StringData) + assert not data.nullable + + +def test_bytes(): + data = data_from_type(bytes) + + assert isinstance(data, sdf.StringData) + assert data.sdf_type == "byte-string" + assert not data.nullable + + +def test_enum(): + class MyEnum(enum.Enum): + ONE = 1 + TWO = "two" + + data = data_from_type(MyEnum) + + assert isinstance(data, sdf.AnyData) + assert data.choices["ONE"].const == 1 + assert data.choices["TWO"].const == "two" + assert not data.nullable + + +def test_int_enum(): + class MyEnum(enum.IntEnum): + ONE = 1 + TWO = 2 + + data = data_from_type(MyEnum) + + assert isinstance(data, sdf.IntegerData) + assert data.choices["ONE"].const == 1 + assert data.choices["TWO"].const == 2 + assert not data.nullable + + +def test_str_enum(): + class MyEnum(enum.StrEnum): + ONE = "one" + TWO = "two" + + data = data_from_type(MyEnum) + + assert isinstance(data, sdf.StringData) + assert data.choices["ONE"].const == "one" + assert data.choices["TWO"].const == "two" + assert not data.nullable + + +def test_const(): + data = data_from_type(Literal["const"]) + + assert data.const == "const" + + +def test_string_literals(): + data = data_from_type(Literal["one", "two"]) + + assert isinstance(data, sdf.StringData) + assert data.enum == ["one", "two"] + assert not data.nullable + + +def test_nullable(): + data = data_from_type(int | None) + + assert isinstance(data, sdf.IntegerData) + assert data.nullable + + +def test_list(): + data = data_from_type(list[str]) + + assert isinstance(data, sdf.ArrayData) + assert isinstance(data.items, sdf.StringData) + assert not data.unique_items + assert not data.nullable + + +def test_set(): + data = data_from_type(set[str]) + + assert isinstance(data, sdf.ArrayData) + assert isinstance(data.items, sdf.StringData) + assert data.unique_items + assert not data.nullable + + +def test_model(): + class TestModel(BaseModel): + with_default: int = 2 + with_alias: Annotated[int, Field(alias="withAlias")] + + data = data_from_type(TestModel) + + assert isinstance(data, sdf.ObjectData) + assert data.label == "TestModel" + assert not data.nullable + + assert isinstance(data.properties["with_default"], sdf.IntegerData) + assert data.properties["with_default"].default == 2 + assert not data.properties["with_default"].nullable + + assert "withAlias" in data.properties + + +def test_dataclass(): + @dataclass + class TestModel: + with_default: int = 2 + + data = data_from_type(TestModel) + + assert isinstance(data, sdf.ObjectData) + assert not data.nullable + + assert isinstance(data.properties["with_default"], sdf.IntegerData) + assert data.properties["with_default"].default == 2 + assert not data.properties["with_default"].nullable + + +@pytest.mark.xfail(reason="Not implemented") +def test_label(): + data = data_from_type(Annotated[int, Field(title="Test title")]) + + assert data.label == "Test title"