Skip to content

Commit 895756d

Browse files
darynaishchenkooctavia-squidington-iii
andauthored
feat(low-code): pass top level params to PropertiesFromEndpoint requester (#543)
Co-authored-by: octavia-squidington-iii <[email protected]>
1 parent 8f96aee commit 895756d

File tree

2 files changed

+160
-9
lines changed

2 files changed

+160
-9
lines changed

airbyte_cdk/sources/declarative/parsers/manifest_component_transformer.py

Lines changed: 51 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -120,14 +120,6 @@ def propagate_types_and_parameters(
120120
if found_type:
121121
propagated_component["type"] = found_type
122122

123-
# When there is no resolved type, we're not processing a component (likely a regular object) and don't need to propagate parameters
124-
# When the type refers to a json schema, we're not processing a component as well. This check is currently imperfect as there could
125-
# be json_schema are not objects but we believe this is not likely in our case because:
126-
# * records are Mapping so objects hence SchemaLoader root should be an object
127-
# * connection_specification is a Mapping
128-
if "type" not in propagated_component or self._is_json_schema_object(propagated_component):
129-
return propagated_component
130-
131123
# Combines parameters defined at the current level with parameters from parent components. Parameters at the current
132124
# level take precedence
133125
current_parameters = dict(copy.deepcopy(parent_parameters))
@@ -138,6 +130,27 @@ def propagate_types_and_parameters(
138130
else {**current_parameters, **component_parameters}
139131
)
140132

133+
# When there is no resolved type, we're not processing a component (likely a regular object) and don't need to propagate parameters
134+
# When the type refers to a json schema, we're not processing a component as well. This check is currently imperfect as there could
135+
# be json_schema are not objects but we believe this is not likely in our case because:
136+
# * records are Mapping so objects hence SchemaLoader root should be an object
137+
# * connection_specification is a Mapping
138+
if self._is_json_schema_object(propagated_component):
139+
return propagated_component
140+
141+
# For objects that don't have type check if their object fields have nested components which should have `$parameters` in it.
142+
# For example, QueryProperties in requester.request_parameters, etc.
143+
# Update propagated_component value with nested components with parent `$parameters` if needed and return propagated_component.
144+
if "type" not in propagated_component:
145+
if self._has_nested_components(propagated_component):
146+
propagated_component = self._process_nested_components(
147+
propagated_component,
148+
parent_field_identifier,
149+
current_parameters,
150+
use_parent_parameters,
151+
)
152+
return propagated_component
153+
141154
# Parameters should be applied to the current component fields with the existing field taking precedence over parameters if
142155
# both exist
143156
for parameter_key, parameter_value in current_parameters.items():
@@ -181,4 +194,33 @@ def propagate_types_and_parameters(
181194

182195
@staticmethod
183196
def _is_json_schema_object(propagated_component: Mapping[str, Any]) -> bool:
184-
return propagated_component.get("type") == "object"
197+
return propagated_component.get("type") == "object" or propagated_component.get("type") == [
198+
"null",
199+
"object",
200+
]
201+
202+
@staticmethod
203+
def _has_nested_components(propagated_component: Dict[str, Any]) -> bool:
204+
for k, v in propagated_component.items():
205+
if isinstance(v, dict) and v.get("type"):
206+
return True
207+
return False
208+
209+
def _process_nested_components(
210+
self,
211+
propagated_component: Dict[str, Any],
212+
parent_field_identifier: str,
213+
current_parameters: Mapping[str, Any],
214+
use_parent_parameters: Optional[bool] = None,
215+
) -> Dict[str, Any]:
216+
for field_name, field_value in propagated_component.items():
217+
if isinstance(field_value, dict) and field_value.get("type"):
218+
nested_component_with_parameters = self.propagate_types_and_parameters(
219+
parent_field_identifier,
220+
field_value,
221+
current_parameters,
222+
use_parent_parameters=use_parent_parameters,
223+
)
224+
propagated_component[field_name] = nested_component_with_parameters
225+
226+
return propagated_component

unit_tests/sources/declarative/parsers/test_manifest_component_transformer.py

Lines changed: 109 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -460,3 +460,112 @@ def test_do_not_propagate_parameters_on_json_schema_object():
460460
actual_component = transformer.propagate_types_and_parameters("", component, {})
461461

462462
assert actual_component == expected_component
463+
464+
465+
def test_propagate_property_chunking():
466+
component = {
467+
"type": "DeclarativeStream",
468+
"streams": [
469+
{
470+
"type": "DeclarativeStream",
471+
"retriever": {
472+
"type": "SimpleRetriever",
473+
"requester": {
474+
"type": "HttpRequester",
475+
"url_base": "https://test.com",
476+
"request_parameters": {
477+
"properties": {
478+
"type": "QueryProperties",
479+
"property_list": {
480+
"type": "PropertiesFromEndpoint",
481+
"property_field_path": ["name"],
482+
"retriever": {
483+
"type": "SimpleRetriever",
484+
"requester": {
485+
"type": "HttpRequester",
486+
"url_base": "https://test.com",
487+
"authenticator": {
488+
"$ref": "#/definitions/authenticator"
489+
},
490+
"path": "/properties/{{ parameters.entity }}/properties",
491+
"http_method": "GET",
492+
"request_headers": {"Content-Type": "application/json"},
493+
},
494+
},
495+
},
496+
"property_chunking": {
497+
"type": "PropertyChunking",
498+
"property_limit_type": "characters",
499+
"property_limit": 15000,
500+
},
501+
}
502+
},
503+
},
504+
},
505+
"$parameters": {"entity": "test_entity"},
506+
}
507+
],
508+
}
509+
expected_component = {
510+
"streams": [
511+
{
512+
"$parameters": {"entity": "test_entity"},
513+
"entity": "test_entity",
514+
"retriever": {
515+
"$parameters": {"entity": "test_entity"},
516+
"entity": "test_entity",
517+
"requester": {
518+
"$parameters": {"entity": "test_entity"},
519+
"entity": "test_entity",
520+
"request_parameters": {
521+
"properties": {
522+
"$parameters": {"entity": "test_entity"},
523+
"entity": "test_entity",
524+
"property_chunking": {
525+
"$parameters": {"entity": "test_entity"},
526+
"entity": "test_entity",
527+
"property_limit": 15000,
528+
"property_limit_type": "characters",
529+
"type": "PropertyChunking",
530+
},
531+
"property_list": {
532+
"$parameters": {"entity": "test_entity"},
533+
"entity": "test_entity",
534+
"property_field_path": ["name"],
535+
"retriever": {
536+
"$parameters": {"entity": "test_entity"},
537+
"entity": "test_entity",
538+
"requester": {
539+
"$parameters": {"entity": "test_entity"},
540+
"authenticator": {
541+
"$ref": "#/definitions/authenticator"
542+
},
543+
"entity": "test_entity",
544+
"http_method": "GET",
545+
"path": "/properties/{{ "
546+
"parameters.entity "
547+
"}}/properties",
548+
"request_headers": {"Content-Type": "application/json"},
549+
"type": "HttpRequester",
550+
"url_base": "https://test.com",
551+
},
552+
"type": "SimpleRetriever",
553+
},
554+
"type": "PropertiesFromEndpoint",
555+
},
556+
"type": "QueryProperties",
557+
}
558+
},
559+
"type": "HttpRequester",
560+
"url_base": "https://test.com",
561+
},
562+
"type": "SimpleRetriever",
563+
},
564+
"type": "DeclarativeStream",
565+
}
566+
],
567+
"type": "DeclarativeStream",
568+
}
569+
transformer = ManifestComponentTransformer()
570+
actual_component = transformer.propagate_types_and_parameters("", component, {})
571+
assert actual_component == expected_component

0 commit comments

Comments
 (0)