Skip to content

Commit bcfcf04

Browse files
authored
feat(low-code cdk): Allow for multiple schema_loaders to be defined for a stream (#536)
1 parent 8906ed2 commit bcfcf04

File tree

6 files changed

+352
-5
lines changed

6 files changed

+352
-5
lines changed

airbyte_cdk/sources/declarative/declarative_component_schema.yaml

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1421,12 +1421,22 @@ definitions:
14211421
default: ""
14221422
schema_loader:
14231423
title: Schema Loader
1424-
description: Component used to retrieve the schema for the current stream.
1424+
description:
1425+
One or many schema loaders can be used to retrieve the schema for the current stream. When
1426+
multiple schema loaders are defined, schema properties will be merged together. Schema
1427+
loaders defined first taking precedence in the event of a conflict.
14251428
anyOf:
14261429
- "$ref": "#/definitions/InlineSchemaLoader"
14271430
- "$ref": "#/definitions/DynamicSchemaLoader"
14281431
- "$ref": "#/definitions/JsonFileSchemaLoader"
14291432
- "$ref": "#/definitions/CustomSchemaLoader"
1433+
- type: array
1434+
items:
1435+
anyOf:
1436+
- "$ref": "#/definitions/InlineSchemaLoader"
1437+
- "$ref": "#/definitions/DynamicSchemaLoader"
1438+
- "$ref": "#/definitions/JsonFileSchemaLoader"
1439+
- "$ref": "#/definitions/CustomSchemaLoader"
14301440
# TODO we have move the transformation to the RecordSelector level in the code but kept this here for
14311441
# compatibility reason. We should eventually move this to align with the code.
14321442
transformations:
@@ -4362,4 +4372,4 @@ interpolation:
43624372
regex: The regular expression to search for. It must include a capture group.
43634373
return_type: str
43644374
examples:
4365-
- '{{ "goodbye, cruel world" | regex_search("goodbye,\s(.*)$") }} -> "cruel world"'
4375+
- '{{ "goodbye, cruel world" | regex_search("goodbye,\s(.*)$") }} -> "cruel world"'

airbyte_cdk/sources/declarative/models/declarative_component_schema.py

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
2+
13
# generated by datamodel-codegen:
24
# filename: declarative_component_schema.yaml
35

@@ -2151,6 +2153,14 @@ class Config:
21512153
DynamicSchemaLoader,
21522154
JsonFileSchemaLoader,
21532155
CustomSchemaLoader,
2156+
List[
2157+
Union[
2158+
InlineSchemaLoader,
2159+
DynamicSchemaLoader,
2160+
JsonFileSchemaLoader,
2161+
CustomSchemaLoader,
2162+
]
2163+
],
21542164
]
21552165
] = Field(
21562166
None,

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 19 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -504,6 +504,7 @@
504504
SchemaTypeIdentifier,
505505
TypesMap,
506506
)
507+
from airbyte_cdk.sources.declarative.schema.composite_schema_loader import CompositeSchemaLoader
507508
from airbyte_cdk.sources.declarative.spec import Spec
508509
from airbyte_cdk.sources.declarative.stream_slicers import StreamSlicer
509510
from airbyte_cdk.sources.declarative.transformations import (
@@ -1914,9 +1915,25 @@ def create_declarative_stream(
19141915
else:
19151916
state_transformations = []
19161917

1917-
if model.schema_loader:
1918+
schema_loader: Union[
1919+
CompositeSchemaLoader,
1920+
DefaultSchemaLoader,
1921+
DynamicSchemaLoader,
1922+
InlineSchemaLoader,
1923+
JsonFileSchemaLoader,
1924+
]
1925+
if model.schema_loader and isinstance(model.schema_loader, list):
1926+
nested_schema_loaders = [
1927+
self._create_component_from_model(model=nested_schema_loader, config=config)
1928+
for nested_schema_loader in model.schema_loader
1929+
]
1930+
schema_loader = CompositeSchemaLoader(
1931+
schema_loaders=nested_schema_loaders, parameters={}
1932+
)
1933+
elif model.schema_loader:
19181934
schema_loader = self._create_component_from_model(
1919-
model=model.schema_loader, config=config
1935+
model=model.schema_loader, # type: ignore # If defined, schema_loader is guaranteed not to be a list and will be one of the existing base models
1936+
config=config,
19201937
)
19211938
else:
19221939
options = model.parameters or {}
Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
#
2+
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
3+
#
4+
5+
from dataclasses import InitVar, dataclass
6+
from typing import Any, Dict, List, Mapping
7+
8+
from airbyte_cdk.sources.declarative.schema.schema_loader import SchemaLoader
9+
10+
11+
@dataclass
12+
class CompositeSchemaLoader(SchemaLoader):
13+
"""
14+
Schema loader that consists of multiple schema loaders that are combined into a single
15+
schema. Subsequent schemas do not overwrite existing values so the schema loaders with
16+
a higher priority should be defined first.
17+
"""
18+
19+
schema_loaders: List[SchemaLoader]
20+
parameters: InitVar[Mapping[str, Any]]
21+
22+
def get_json_schema(self) -> Mapping[str, Any]:
23+
combined_schema: Dict[str, Any] = {
24+
"$schema": "http://json-schema.org/draft-07/schema#",
25+
"type": ["null", "object"],
26+
"properties": {},
27+
}
28+
for schema_loader in self.schema_loaders:
29+
schema_properties = schema_loader.get_json_schema()["properties"]
30+
combined_schema["properties"] = {**schema_properties, **combined_schema["properties"]}
31+
return combined_schema

unit_tests/sources/declarative/parsers/test_model_to_component_factory.py

Lines changed: 70 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,8 @@
152152
SimpleRetriever,
153153
SimpleRetrieverTestReadDecorator,
154154
)
155-
from airbyte_cdk.sources.declarative.schema import JsonFileSchemaLoader
155+
from airbyte_cdk.sources.declarative.schema import InlineSchemaLoader, JsonFileSchemaLoader
156+
from airbyte_cdk.sources.declarative.schema.composite_schema_loader import CompositeSchemaLoader
156157
from airbyte_cdk.sources.declarative.schema.schema_loader import SchemaLoader
157158
from airbyte_cdk.sources.declarative.spec import Spec
158159
from airbyte_cdk.sources.declarative.transformations import AddFields, RemoveFields
@@ -4561,3 +4562,71 @@ def test_create_property_chunking_invalid_property_limit_type():
45614562
component_definition=property_chunking_model,
45624563
config={},
45634564
)
4565+
4566+
4567+
def test_create_stream_with_multiple_schema_loaders():
4568+
content = """
4569+
retriever:
4570+
requester:
4571+
type: "HttpRequester"
4572+
path: "example"
4573+
record_selector:
4574+
extractor:
4575+
field_path: []
4576+
stream_A:
4577+
type: DeclarativeStream
4578+
name: "A"
4579+
primary_key: "id"
4580+
schema_loader:
4581+
- type: InlineSchemaLoader
4582+
schema:
4583+
"#/schemas/first_schema"
4584+
- type: InlineSchemaLoader
4585+
schema:
4586+
"#/schemas/second_schema"
4587+
$parameters:
4588+
retriever: "#/retriever"
4589+
url_base: "https://airbyte.io"
4590+
schemas:
4591+
first_schema:
4592+
$schema: "http://json-schema.org/draft-07/schema"
4593+
type:
4594+
- "null"
4595+
- object
4596+
additionalProperties: true
4597+
properties:
4598+
id:
4599+
description: The user ID
4600+
type:
4601+
- "null"
4602+
- string
4603+
second_schema:
4604+
$schema: "http://json-schema.org/draft-07/schema"
4605+
type:
4606+
- "null"
4607+
- object
4608+
additionalProperties: true
4609+
properties:
4610+
name:
4611+
description: The user name
4612+
type:
4613+
- "null"
4614+
- string
4615+
"""
4616+
parsed_manifest = YamlDeclarativeSource._parse(content)
4617+
resolved_manifest = resolver.preprocess_manifest(parsed_manifest)
4618+
partition_router_manifest = transformer.propagate_types_and_parameters(
4619+
"", resolved_manifest["stream_A"], {}
4620+
)
4621+
4622+
declarative_stream = factory.create_component(
4623+
model_type=DeclarativeStreamModel,
4624+
component_definition=partition_router_manifest,
4625+
config=input_config,
4626+
)
4627+
4628+
schema_loader = declarative_stream.schema_loader
4629+
assert isinstance(schema_loader, CompositeSchemaLoader)
4630+
assert len(schema_loader.schema_loaders) == 2
4631+
assert isinstance(schema_loader.schema_loaders[0], InlineSchemaLoader)
4632+
assert isinstance(schema_loader.schema_loaders[1], InlineSchemaLoader)

0 commit comments

Comments
 (0)