Skip to content

Commit 51cfea5

Browse files
authored
feat: add condition to component mapping definition (#667)
1 parent 410571c commit 51cfea5

File tree

6 files changed

+233
-20
lines changed

6 files changed

+233
-20
lines changed

airbyte_cdk/sources/declarative/declarative_component_schema.yaml

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4173,6 +4173,18 @@ definitions:
41734173
description: Determines whether to create a new path if it doesn't exist (true) or only update existing paths (false). When set to true, the resolver will create new paths in the stream template if they don't exist. When false (default), it will only update existing paths.
41744174
type: boolean
41754175
default: false
4176+
condition:
4177+
title: Condition
4178+
description: A condition that must be met for the mapping to be applied. This property is only supported for `ConfigComponentsResolver`.
4179+
type: string
4180+
interpolation_context:
4181+
- config
4182+
- stream_template_config
4183+
- components_values
4184+
- stream_slice
4185+
examples:
4186+
- "{{ components_values.get('cursor_field', None) }}"
4187+
- "{{ '_incremental' in components_values.get('stream_name', '') }}"
41764188
$parameters:
41774189
type: object
41784190
additionalProperties: true

airbyte_cdk/sources/declarative/models/declarative_component_schema.py

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,5 @@
1+
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
2+
13
# generated by datamodel-codegen:
24
# filename: declarative_component_schema.yaml
35

@@ -1492,6 +1494,15 @@ class ComponentMappingDefinition(BaseModel):
14921494
description="Determines whether to create a new path if it doesn't exist (true) or only update existing paths (false). When set to true, the resolver will create new paths in the stream template if they don't exist. When false (default), it will only update existing paths.",
14931495
title="Create or Update",
14941496
)
1497+
condition: Optional[str] = Field(
1498+
None,
1499+
description="A condition that must be met for the mapping to be applied. This property is only supported for `ConfigComponentsResolver`.",
1500+
examples=[
1501+
"{{ components_values.get('cursor_field', None) }}",
1502+
"{{ '_incremental' in components_values.get('stream_name', '') }}",
1503+
],
1504+
title="Condition",
1505+
)
14951506
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
14961507

14971508

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 30 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -3800,6 +3800,7 @@ def create_components_mapping_definition(
38003800
value=interpolated_value,
38013801
value_type=ModelToComponentFactory._json_schema_type_name_to_type(model.value_type),
38023802
create_or_update=model.create_or_update,
3803+
condition=model.condition,
38033804
parameters=model.parameters or {},
38043805
)
38053806

@@ -3818,16 +3819,19 @@ def create_http_components_resolver(
38183819
transformations=[],
38193820
)
38203821

3821-
components_mapping = [
3822-
self._create_component_from_model(
3823-
model=components_mapping_definition_model,
3824-
value_type=ModelToComponentFactory._json_schema_type_name_to_type(
3825-
components_mapping_definition_model.value_type
3826-
),
3827-
config=config,
3822+
components_mapping = []
3823+
for component_mapping_definition_model in model.components_mapping:
3824+
if component_mapping_definition_model.condition:
3825+
raise ValueError("`condition` is only supported for `ConfigComponentsResolver`")
3826+
components_mapping.append(
3827+
self._create_component_from_model(
3828+
model=component_mapping_definition_model,
3829+
value_type=ModelToComponentFactory._json_schema_type_name_to_type(
3830+
component_mapping_definition_model.value_type
3831+
),
3832+
config=config,
3833+
)
38283834
)
3829-
for components_mapping_definition_model in model.components_mapping
3830-
]
38313835

38323836
return HttpComponentsResolver(
38333837
retriever=retriever,
@@ -3851,7 +3855,9 @@ def create_stream_config(
38513855
)
38523856

38533857
def create_config_components_resolver(
3854-
self, model: ConfigComponentsResolverModel, config: Config
3858+
self,
3859+
model: ConfigComponentsResolverModel,
3860+
config: Config,
38553861
) -> Any:
38563862
model_stream_configs = (
38573863
model.stream_config if isinstance(model.stream_config, list) else [model.stream_config]
@@ -3871,6 +3877,7 @@ def create_config_components_resolver(
38713877
components_mapping_definition_model.value_type
38723878
),
38733879
config=config,
3880+
parameters=model.parameters,
38743881
)
38753882
for components_mapping_definition_model in model.components_mapping
38763883
]
@@ -3888,16 +3895,20 @@ def create_parametrized_components_resolver(
38883895
stream_parameters = StreamParametersDefinition(
38893896
list_of_parameters_for_stream=model.stream_parameters.list_of_parameters_for_stream
38903897
)
3891-
components_mapping = [
3892-
self._create_component_from_model(
3893-
model=components_mapping_definition_model,
3894-
value_type=ModelToComponentFactory._json_schema_type_name_to_type(
3895-
components_mapping_definition_model.value_type
3896-
),
3897-
config=config,
3898+
3899+
components_mapping = []
3900+
for components_mapping_definition_model in model.components_mapping:
3901+
if components_mapping_definition_model.condition:
3902+
raise ValueError("`condition` is only supported for `ConfigComponentsResolver`")
3903+
components_mapping.append(
3904+
self._create_component_from_model(
3905+
model=components_mapping_definition_model,
3906+
value_type=ModelToComponentFactory._json_schema_type_name_to_type(
3907+
components_mapping_definition_model.value_type
3908+
),
3909+
config=config,
3910+
)
38983911
)
3899-
for components_mapping_definition_model in model.components_mapping
3900-
]
39013912
return ParametrizedComponentsResolver(
39023913
stream_parameters=stream_parameters,
39033914
config=config,

airbyte_cdk/sources/declarative/resolvers/components_resolver.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
from typing_extensions import deprecated
1010

1111
from airbyte_cdk.sources.declarative.interpolation import InterpolatedString
12+
from airbyte_cdk.sources.declarative.interpolation.interpolated_boolean import InterpolatedBoolean
1213
from airbyte_cdk.sources.source import ExperimentalClassWarning
1314

1415

@@ -22,6 +23,7 @@ class ComponentMappingDefinition:
2223
value: Union["InterpolatedString", str]
2324
value_type: Optional[Type[Any]]
2425
parameters: InitVar[Mapping[str, Any]]
26+
condition: Optional[str] = None
2527
create_or_update: Optional[bool] = False
2628

2729

@@ -35,6 +37,7 @@ class ResolvedComponentMappingDefinition:
3537
value: "InterpolatedString"
3638
value_type: Optional[Type[Any]]
3739
parameters: InitVar[Mapping[str, Any]]
40+
condition: Optional[InterpolatedBoolean] = None
3841
create_or_update: Optional[bool] = False
3942

4043

airbyte_cdk/sources/declarative/resolvers/config_components_resolver.py

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
from yaml.scanner import ScannerError
1515

1616
from airbyte_cdk.sources.declarative.interpolation import InterpolatedString
17+
from airbyte_cdk.sources.declarative.interpolation.interpolated_boolean import InterpolatedBoolean
1718
from airbyte_cdk.sources.declarative.resolvers.components_resolver import (
1819
ComponentMappingDefinition,
1920
ComponentsResolver,
@@ -70,6 +71,12 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None:
7071
"""
7172

7273
for component_mapping in self.components_mapping:
74+
interpolated_condition = (
75+
InterpolatedBoolean(condition=component_mapping.condition, parameters=parameters)
76+
if component_mapping.condition
77+
else None
78+
)
79+
7380
if isinstance(component_mapping.value, (str, InterpolatedString)):
7481
interpolated_value = (
7582
InterpolatedString.create(component_mapping.value, parameters=parameters)
@@ -89,6 +96,7 @@ def __post_init__(self, parameters: Mapping[str, Any]) -> None:
8996
value_type=component_mapping.value_type,
9097
create_or_update=component_mapping.create_or_update,
9198
parameters=parameters,
99+
condition=interpolated_condition,
92100
)
93101
)
94102
else:
@@ -155,6 +163,12 @@ def resolve_components(
155163
kwargs["components_values"] = components_values # type: ignore[assignment] # component_values will always be of type Mapping[str, Any]
156164

157165
for resolved_component in self._resolved_components:
166+
if (
167+
resolved_component.condition is not None
168+
and not resolved_component.condition.eval(self.config, **kwargs)
169+
):
170+
continue
171+
158172
valid_types = (
159173
(resolved_component.value_type,) if resolved_component.value_type else None
160174
)

unit_tests/sources/declarative/resolvers/test_config_components_resolver.py

Lines changed: 163 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,7 @@ def to_configured_catalog(
145145
STREAM_CONFIG
146146
]
147147

148-
# Manifest with component definition with value that is fails when trying
148+
# Manifest with component definition with value that fails when trying
149149
# to parse yaml in _parse_yaml_if_possible but generally contains valid string
150150
_MANIFEST_WITH_SCANNER_ERROR = deepcopy(_MANIFEST)
151151
_MANIFEST_WITH_SCANNER_ERROR["dynamic_streams"][0]["components_resolver"][
@@ -173,6 +173,7 @@ def to_configured_catalog(
173173
(_MANIFEST_WITH_STREAM_CONFIGS_LIST, _CONFIG, None, ["item_1", "item_2", "default_item"]),
174174
(_MANIFEST_WITH_SCANNER_ERROR, _CONFIG, None, ["item_1", "item_2", "default_item"]),
175175
],
176+
ids=["no_duplicates", "duplicates", "stream_configs_list", "scanner_error"],
176177
)
177178
def test_dynamic_streams_read_with_config_components_resolver(
178179
manifest, config, expected_exception, expected_stream_names
@@ -223,3 +224,164 @@ def test_dynamic_streams_read_with_config_components_resolver(
223224
assert len(records) == len(expected_stream_names)
224225
# Use set comparison to avoid relying on deterministic ordering
225226
assert set(record.stream for record in records) == set(expected_stream_names)
227+
228+
229+
# Manifest with condition that always evaluates to true
230+
_MANIFEST_WITH_TRUE_CONDITION = deepcopy(_MANIFEST)
231+
_MANIFEST_WITH_TRUE_CONDITION["dynamic_streams"][0]["components_resolver"][
232+
"components_mapping"
233+
].append(
234+
{
235+
"type": "ComponentMappingDefinition",
236+
"field_path": ["retriever", "requester", "$parameters", "always_included"],
237+
"value": "true_condition_value",
238+
"create_or_update": True,
239+
"condition": "{{ True }}",
240+
}
241+
)
242+
243+
# Manifest with condition that always evaluates to false
244+
_MANIFEST_WITH_FALSE_CONDITION = deepcopy(_MANIFEST)
245+
_MANIFEST_WITH_FALSE_CONDITION["dynamic_streams"][0]["components_resolver"][
246+
"components_mapping"
247+
].append(
248+
{
249+
"type": "ComponentMappingDefinition",
250+
"field_path": ["retriever", "requester", "$parameters", "never_included"],
251+
"value": "false_condition_value",
252+
"create_or_update": True,
253+
"condition": "{{ False }}",
254+
}
255+
)
256+
257+
# Manifest with condition using components_values that evaluates to true for some items
258+
_MANIFEST_WITH_COMPONENTS_VALUES_TRUE_CONDITION = deepcopy(_MANIFEST)
259+
_MANIFEST_WITH_COMPONENTS_VALUES_TRUE_CONDITION["dynamic_streams"][0]["components_resolver"][
260+
"components_mapping"
261+
].append(
262+
{
263+
"type": "ComponentMappingDefinition",
264+
"field_path": ["retriever", "requester", "$parameters", "conditional_param"],
265+
"value": "item_1_special_value",
266+
"create_or_update": True,
267+
"condition": "{{ components_values['name'] == 'item_1' }}",
268+
}
269+
)
270+
271+
# Manifest with condition using components_values that evaluates to false for all items
272+
_MANIFEST_WITH_COMPONENTS_VALUES_FALSE_CONDITION = deepcopy(_MANIFEST)
273+
_MANIFEST_WITH_COMPONENTS_VALUES_FALSE_CONDITION["dynamic_streams"][0]["components_resolver"][
274+
"components_mapping"
275+
].append(
276+
{
277+
"type": "ComponentMappingDefinition",
278+
"field_path": ["retriever", "requester", "$parameters", "never_matching"],
279+
"value": "never_applied_value",
280+
"create_or_update": True,
281+
"condition": "{{ components_values['name'] == 'non_existent_item' }}",
282+
}
283+
)
284+
285+
# Manifest with multiple conditions - some true, some false
286+
_MANIFEST_WITH_MIXED_CONDITIONS = deepcopy(_MANIFEST)
287+
_MANIFEST_WITH_MIXED_CONDITIONS["dynamic_streams"][0]["components_resolver"][
288+
"components_mapping"
289+
].extend(
290+
[
291+
{
292+
"type": "ComponentMappingDefinition",
293+
"field_path": ["retriever", "requester", "$parameters", "always_true"],
294+
"value": "always_applied",
295+
"create_or_update": True,
296+
"condition": "{{ True }}",
297+
},
298+
{
299+
"type": "ComponentMappingDefinition",
300+
"field_path": ["retriever", "requester", "$parameters", "always_false"],
301+
"value": "never_applied",
302+
"create_or_update": True,
303+
"condition": "{{ False }}",
304+
},
305+
{
306+
"type": "ComponentMappingDefinition",
307+
"field_path": ["retriever", "requester", "$parameters", "item_specific"],
308+
"value": "applied_to_item_2",
309+
"create_or_update": True,
310+
"condition": "{{ components_values['id'] == 2 }}",
311+
},
312+
]
313+
)
314+
315+
316+
@pytest.mark.parametrize(
317+
"manifest, config, expected_conditional_params",
318+
[
319+
(
320+
_MANIFEST_WITH_TRUE_CONDITION,
321+
_CONFIG,
322+
{
323+
"item_1": {"always_included": "true_condition_value", "item_id": 1},
324+
"item_2": {"always_included": "true_condition_value", "item_id": 2},
325+
"default_item": {"always_included": "true_condition_value", "item_id": 4},
326+
},
327+
),
328+
(
329+
_MANIFEST_WITH_FALSE_CONDITION,
330+
_CONFIG,
331+
{
332+
"item_1": {"item_id": 1}, # never_included should not be present
333+
"item_2": {"item_id": 2},
334+
"default_item": {"item_id": 4},
335+
},
336+
),
337+
(
338+
_MANIFEST_WITH_COMPONENTS_VALUES_TRUE_CONDITION,
339+
_CONFIG,
340+
{
341+
"item_1": {"conditional_param": "item_1_special_value", "item_id": 1},
342+
"item_2": {"item_id": 2}, # condition false for item_2
343+
"default_item": {"item_id": 4}, # condition false for default_item
344+
},
345+
),
346+
(
347+
_MANIFEST_WITH_COMPONENTS_VALUES_FALSE_CONDITION,
348+
_CONFIG,
349+
{
350+
"item_1": {"item_id": 1}, # never_matching should not be present
351+
"item_2": {"item_id": 2},
352+
"default_item": {"item_id": 4},
353+
},
354+
),
355+
(
356+
_MANIFEST_WITH_MIXED_CONDITIONS,
357+
_CONFIG,
358+
{
359+
"item_1": {"always_true": "always_applied", "item_id": 1},
360+
"item_2": {
361+
"always_true": "always_applied",
362+
"item_specific": "applied_to_item_2",
363+
"item_id": 2,
364+
},
365+
"default_item": {"always_true": "always_applied", "item_id": 4},
366+
},
367+
),
368+
],
369+
ids=[
370+
"true_condition",
371+
"false_condition",
372+
"components_values_true_condition",
373+
"components_values_false_condition",
374+
"mixed_conditions",
375+
],
376+
)
377+
def test_component_mapping_conditions(manifest, config, expected_conditional_params):
378+
"""Test that ComponentMappingDefinition conditions work correctly for various scenarios."""
379+
source = ConcurrentDeclarativeSource(
380+
source_config=manifest, config=config, catalog=None, state=None
381+
)
382+
383+
for stream in source.streams(config):
384+
if stream.name in expected_conditional_params:
385+
assert (
386+
stream.retriever.requester._parameters == expected_conditional_params[stream.name]
387+
)

0 commit comments

Comments
 (0)