Skip to content

Commit 5801cd8

Browse files
lazebnyi and octavia-squidington-iii authored
feat(low-code cdk): add dynamic schema loader (#104)
Co-authored-by: octavia-squidington-iii <[email protected]>
1 parent ed9a5e7 commit 5801cd8

File tree

7 files changed

+709
-11
lines changed

7 files changed

+709
-11
lines changed

airbyte_cdk/sources/declarative/declarative_component_schema.yaml

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1218,6 +1218,7 @@ definitions:
12181218
title: Schema Loader
12191219
description: Component used to retrieve the schema for the current stream.
12201220
anyOf:
1221+
- "$ref": "#/definitions/DynamicSchemaLoader"
12211222
- "$ref": "#/definitions/InlineSchemaLoader"
12221223
- "$ref": "#/definitions/JsonFileSchemaLoader"
12231224
- "$ref": "#/definitions/CustomSchemaLoader"
@@ -1684,6 +1685,92 @@ definitions:
16841685
$parameters:
16851686
type: object
16861687
additionalProperties: true
1688+
TypesMap:
  # Declares one current-type -> target-type pair. Both keys are required and
  # each accepts either a single string or an array of strings.
  title: Types Map
  description: (This component is experimental. Use at your own risk.) Represents a mapping between a current type and its corresponding target type.
  type: object
  required: [target_type, current_type]
  properties:
    target_type:
      anyOf:
        - { type: string }
        - type: array
          items: { type: string }
    current_type:
      anyOf:
        - { type: string }
        - type: array
          items: { type: string }
1708+
SchemaTypeIdentifier:
  # Only key_pointer is required. schema_pointer defaults to an empty list;
  # type_pointer and types_mapping may be omitted. All three pointers are arrays
  # of strings that declare `config` as their interpolation context.
  title: Schema Type Identifier
  description: (This component is experimental. Use at your own risk.) Identifies schema details for dynamic schema extraction and processing.
  type: object
  required: [key_pointer]
  properties:
    type:
      type: string
      enum:
        - SchemaTypeIdentifier
    schema_pointer:
      title: Schema Path
      description: List of nested fields defining the schema field path to extract. Defaults to [].
      type: array
      default: []
      items:
        - type: string
      interpolation_context: [config]
    key_pointer:
      title: Key Path
      description: List of potentially nested fields describing the full path of the field key to extract.
      type: array
      items:
        - type: string
      interpolation_context: [config]
    type_pointer:
      title: Type Path
      description: List of potentially nested fields describing the full path of the field type to extract.
      type: array
      items:
        - type: string
      interpolation_context: [config]
    types_mapping:
      # Each entry follows the TypesMap definition.
      type: array
      items:
        - "$ref": "#/definitions/TypesMap"
    $parameters:
      type: object
      additionalProperties: true
1750+
DynamicSchemaLoader:
  # Requires a retriever (AsyncRetriever, CustomRetriever or SimpleRetriever)
  # and a SchemaTypeIdentifier, in addition to the `type` discriminator.
  title: Dynamic Schema Loader
  description: (This component is experimental. Use at your own risk.) Loads a schema by extracting data from retrieved records.
  type: object
  required: [type, retriever, schema_type_identifier]
  properties:
    type:
      type: string
      enum:
        - DynamicSchemaLoader
    retriever:
      title: Retriever
      description: Component used to coordinate how records are extracted across stream slices and request pages.
      anyOf:
        - { "$ref": "#/definitions/AsyncRetriever" }
        - { "$ref": "#/definitions/CustomRetriever" }
        - { "$ref": "#/definitions/SimpleRetriever" }
    schema_type_identifier:
      "$ref": "#/definitions/SchemaTypeIdentifier"
    $parameters:
      type: object
      additionalProperties: true
16871774
InlineSchemaLoader:
16881775
title: Inline Schema Loader
16891776
description: Loads a schema that is defined directly in the manifest file.

airbyte_cdk/sources/declarative/models/declarative_component_schema.py

Lines changed: 53 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -650,6 +650,32 @@ class HttpResponseFilter(BaseModel):
650650
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
651651

652652

653+
class TypesMap(BaseModel):
    # Mirrors the `TypesMap` schema definition: both fields are required and
    # each holds either one type name or a list of type names.

    # Type (or list of types) that the mapping resolves to.
    target_type: Union[str, List[str]]
    # Type (or list of types) that the mapping applies to.
    current_type: Union[str, List[str]]
656+
657+
658+
class SchemaTypeIdentifier(BaseModel):
    # Mirrors the `SchemaTypeIdentifier` schema definition. Only `key_pointer`
    # is required; every other field carries a default.

    type: Optional[Literal["SchemaTypeIdentifier"]] = None
    schema_pointer: Optional[List[str]] = Field(
        default=[],
        title="Schema Path",
        description="List of nested fields defining the schema field path to extract. Defaults to [].",
    )
    key_pointer: List[str] = Field(
        default=...,
        title="Key Path",
        description="List of potentially nested fields describing the full path of the field key to extract.",
    )
    type_pointer: Optional[List[str]] = Field(
        default=None,
        title="Type Path",
        description="List of potentially nested fields describing the full path of the field type to extract.",
    )
    # Optional list of current-type -> target-type pairs.
    types_mapping: Optional[List[TypesMap]] = None
    # Populated from the `$parameters` key.
    parameters: Optional[Dict[str, Any]] = Field(default=None, alias="$parameters")
677+
678+
653679
class InlineSchemaLoader(BaseModel):
654680
type: Literal["InlineSchemaLoader"]
655681
schema_: Optional[Dict[str, Any]] = Field(
@@ -822,13 +848,13 @@ class Config:
822848
)
823849
extract_output: List[str] = Field(
824850
...,
825-
description="The DeclarativeOAuth Specific list of strings to indicate which keys should be extracted and returned back to the input config. ",
851+
description="The DeclarativeOAuth Specific list of strings to indicate which keys should be extracted and returned back to the input config.",
826852
examples=[{"extract_output": ["access_token", "refresh_token", "other_field"]}],
827853
title="DeclarativeOAuth Extract Output",
828854
)
829855
state: Optional[State] = Field(
830856
None,
831-
description="The DeclarativeOAuth Specific object to provide the criteria of how the `state` query param should be constructed,\nincluding length and complexity. ",
857+
description="The DeclarativeOAuth Specific object to provide the criteria of how the `state` query param should be constructed,\nincluding length and complexity.",
832858
examples=[{"state": {"min": 7, "max": 128}}],
833859
title="(Optional) DeclarativeOAuth Configurable State Query Param",
834860
)
@@ -852,13 +878,13 @@ class Config:
852878
)
853879
state_key: Optional[str] = Field(
854880
None,
855-
description="The DeclarativeOAuth Specific optional override to provide the custom `state` key name, if required by data-provider. ",
881+
description="The DeclarativeOAuth Specific optional override to provide the custom `state` key name, if required by data-provider.",
856882
examples=[{"state_key": "my_custom_state_key_key_name"}],
857883
title="(Optional) DeclarativeOAuth State Key Override",
858884
)
859885
auth_code_key: Optional[str] = Field(
860886
None,
861-
description="The DeclarativeOAuth Specific optional override to provide the custom `code` key name to something like `auth_code` or `custom_auth_code`, if required by data-provider. ",
887+
description="The DeclarativeOAuth Specific optional override to provide the custom `code` key name to something like `auth_code` or `custom_auth_code`, if required by data-provider.",
862888
examples=[{"auth_code_key": "my_custom_auth_code_key_name"}],
863889
title="(Optional) DeclarativeOAuth Auth Code Key Override",
864890
)
@@ -1609,12 +1635,17 @@ class Config:
16091635
primary_key: Optional[PrimaryKey] = Field(
16101636
"", description="The primary key of the stream.", title="Primary Key"
16111637
)
1612-
schema_loader: Optional[Union[InlineSchemaLoader, JsonFileSchemaLoader, CustomSchemaLoader]] = (
1613-
Field(
1614-
None,
1615-
description="Component used to retrieve the schema for the current stream.",
1616-
title="Schema Loader",
1617-
)
1638+
schema_loader: Optional[
1639+
Union[
1640+
DynamicSchemaLoader,
1641+
InlineSchemaLoader,
1642+
JsonFileSchemaLoader,
1643+
CustomSchemaLoader,
1644+
]
1645+
] = Field(
1646+
None,
1647+
description="Component used to retrieve the schema for the current stream.",
1648+
title="Schema Loader",
16181649
)
16191650
transformations: Optional[
16201651
List[Union[AddFields, CustomTransformation, RemoveFields, KeysToLower]]
@@ -1774,6 +1805,17 @@ class HttpRequester(BaseModel):
17741805
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
17751806

17761807

1808+
class DynamicSchemaLoader(BaseModel):
    # Mirrors the `DynamicSchemaLoader` schema definition: `type`, `retriever`
    # and `schema_type_identifier` are all required.

    type: Literal["DynamicSchemaLoader"]
    retriever: Union[AsyncRetriever, CustomRetriever, SimpleRetriever] = Field(
        default=...,
        title="Retriever",
        description="Component used to coordinate how records are extracted across stream slices and request pages.",
    )
    schema_type_identifier: SchemaTypeIdentifier
    # Populated from the `$parameters` key.
    parameters: Optional[Dict[str, Any]] = Field(default=None, alias="$parameters")
1817+
1818+
17771819
class ParentStreamConfig(BaseModel):
17781820
type: Literal["ParentStreamConfig"]
17791821
parent_key: str = Field(
@@ -1981,5 +2023,6 @@ class DynamicDeclarativeStream(BaseModel):
19812023
SelectiveAuthenticator.update_forward_refs()
19822024
DeclarativeStream.update_forward_refs()
19832025
SessionTokenAuthenticator.update_forward_refs()
2026+
DynamicSchemaLoader.update_forward_refs()
19842027
SimpleRetriever.update_forward_refs()
19852028
AsyncRetriever.update_forward_refs()

airbyte_cdk/sources/declarative/parsers/manifest_component_transformer.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,10 @@
6464
"AddFields.fields": "AddedFieldDefinition",
6565
# CustomPartitionRouter
6666
"CustomPartitionRouter.parent_stream_configs": "ParentStreamConfig",
67+
# DynamicSchemaLoader
68+
"DynamicSchemaLoader.retriever": "SimpleRetriever",
69+
# SchemaTypeIdentifier
70+
"SchemaTypeIdentifier.types_map": "TypesMap",
6771
}
6872

6973
# We retain a separate registry for custom components to automatically insert the type if it is missing. This is intended to

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -188,6 +188,9 @@
188188
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
189189
DpathExtractor as DpathExtractorModel,
190190
)
191+
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
192+
DynamicSchemaLoader as DynamicSchemaLoaderModel,
193+
)
191194
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
192195
ExponentialBackoffStrategy as ExponentialBackoffStrategyModel,
193196
)
@@ -278,6 +281,9 @@
278281
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
279282
ResponseToFileExtractor as ResponseToFileExtractorModel,
280283
)
284+
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
285+
SchemaTypeIdentifier as SchemaTypeIdentifierModel,
286+
)
281287
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
282288
SelectiveAuthenticator as SelectiveAuthenticatorModel,
283289
)
@@ -291,6 +297,9 @@
291297
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
292298
SubstreamPartitionRouter as SubstreamPartitionRouterModel,
293299
)
300+
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
301+
TypesMap as TypesMapModel,
302+
)
294303
from airbyte_cdk.sources.declarative.models.declarative_component_schema import ValueType
295304
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
296305
WaitTimeFromHeader as WaitTimeFromHeaderModel,
@@ -356,8 +365,11 @@
356365
)
357366
from airbyte_cdk.sources.declarative.schema import (
358367
DefaultSchemaLoader,
368+
DynamicSchemaLoader,
359369
InlineSchemaLoader,
360370
JsonFileSchemaLoader,
371+
SchemaTypeIdentifier,
372+
TypesMap,
361373
)
362374
from airbyte_cdk.sources.declarative.spec import Spec
363375
from airbyte_cdk.sources.declarative.stream_slicers import StreamSlicer
@@ -455,6 +467,9 @@ def _init_mappings(self) -> None:
455467
IterableDecoderModel: self.create_iterable_decoder,
456468
XmlDecoderModel: self.create_xml_decoder,
457469
JsonFileSchemaLoaderModel: self.create_json_file_schema_loader,
470+
DynamicSchemaLoaderModel: self.create_dynamic_schema_loader,
471+
SchemaTypeIdentifierModel: self.create_schema_type_identifier,
472+
TypesMapModel: self.create_types_map,
458473
JwtAuthenticatorModel: self.create_jwt_authenticator,
459474
LegacyToPerPartitionStateMigrationModel: self.create_legacy_to_per_partition_state_migration,
460475
ListPartitionRouterModel: self.create_list_partition_router,
@@ -1574,6 +1589,63 @@ def create_inline_schema_loader(
15741589
) -> InlineSchemaLoader:
15751590
return InlineSchemaLoader(schema=model.schema_ or {}, parameters={})
15761591

1592+
@staticmethod
def create_types_map(model: TypesMapModel, **kwargs: Any) -> TypesMap:
    """Build the runtime ``TypesMap`` from its parsed manifest model.

    :param model: parsed model carrying ``target_type`` and ``current_type``
    :param kwargs: accepted for signature compatibility; not used here
    :return: a ``TypesMap`` holding the same target and current types
    """
    target, current = model.target_type, model.current_type
    return TypesMap(target_type=target, current_type=current)
1595+
1596+
def create_schema_type_identifier(
    self, model: SchemaTypeIdentifierModel, config: Config, **kwargs: Any
) -> SchemaTypeIdentifier:
    """Build the runtime ``SchemaTypeIdentifier`` from its parsed manifest model.

    :param model: parsed model holding the pointers and the optional types mapping
    :param config: connector config, forwarded when building each types-map entry
    :param kwargs: accepted for signature compatibility; not used here
    :return: a ``SchemaTypeIdentifier`` with copied pointer lists and built mappings
    """
    # A missing or empty `types_mapping` yields an empty list.
    built_mappings = [
        self._create_component_from_model(entry, config=config)
        for entry in (model.types_mapping or [])
    ]

    # Pointers are copied into fresh lists; a falsy schema_pointer becomes [].
    schema_path: List[Union[InterpolatedString, str]] = list(model.schema_pointer or [])
    key_path: List[Union[InterpolatedString, str]] = list(model.key_pointer)

    # A falsy type_pointer (None or empty) is passed on as None, not [].
    type_path: Optional[List[Union[InterpolatedString, str]]] = None
    if model.type_pointer:
        type_path = list(model.type_pointer)

    return SchemaTypeIdentifier(
        schema_pointer=schema_path,
        key_pointer=key_path,
        type_pointer=type_path,
        types_mapping=built_mappings,
        parameters=model.parameters or {},
    )
1622+
1623+
def create_dynamic_schema_loader(
    self, model: DynamicSchemaLoaderModel, config: Config, **kwargs: Any
) -> DynamicSchemaLoader:
    """Build the runtime ``DynamicSchemaLoader`` from its parsed manifest model.

    The retriever is created with an empty name, no primary key and no
    transformations; its stream slicer is the result of
    ``_build_resumable_cursor_from_paginator`` applied to the slicer returned by
    ``_build_stream_slicer_from_partition_router``.

    :param model: parsed model holding the retriever and schema type identifier
    :param config: connector config, forwarded to every sub-component
    :param kwargs: accepted for signature compatibility; not used here
    :return: the assembled ``DynamicSchemaLoader``
    """
    retriever_model = model.retriever

    partition_slicer = self._build_stream_slicer_from_partition_router(retriever_model, config)
    slicer = self._build_resumable_cursor_from_paginator(retriever_model, partition_slicer)

    built_retriever = self._create_component_from_model(
        model=retriever_model,
        config=config,
        name="",
        primary_key=None,
        stream_slicer=slicer,
        transformations=[],
    )

    # The loader's own parameters are also handed to the identifier's factory call.
    identifier = self._create_component_from_model(
        model.schema_type_identifier,
        config=config,
        parameters=model.parameters or {},
    )

    return DynamicSchemaLoader(
        retriever=built_retriever,
        config=config,
        schema_type_identifier=identifier,
        parameters=model.parameters or {},
    )
1648+
15771649
@staticmethod
15781650
def create_json_decoder(model: JsonDecoderModel, config: Config, **kwargs: Any) -> JsonDecoder:
15791651
return JsonDecoder(parameters={})

airbyte_cdk/sources/declarative/schema/__init__.py

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,5 +6,6 @@
66
from airbyte_cdk.sources.declarative.schema.inline_schema_loader import InlineSchemaLoader
77
from airbyte_cdk.sources.declarative.schema.json_file_schema_loader import JsonFileSchemaLoader
88
from airbyte_cdk.sources.declarative.schema.schema_loader import SchemaLoader
9+
from airbyte_cdk.sources.declarative.schema.dynamic_schema_loader import DynamicSchemaLoader, TypesMap, SchemaTypeIdentifier
910

10-
__all__ = ["JsonFileSchemaLoader", "DefaultSchemaLoader", "SchemaLoader", "InlineSchemaLoader"]
11+
# Public names re-exported by the schema package.
__all__ = [
    "JsonFileSchemaLoader",
    "DefaultSchemaLoader",
    "SchemaLoader",
    "InlineSchemaLoader",
    "DynamicSchemaLoader",
    "TypesMap",
    "SchemaTypeIdentifier",
]

0 commit comments

Comments
 (0)