Skip to content

Commit f03dc9f

Browse files
fix(async-retriever): fix calling transformationws twice (#576)
Co-authored-by: ChristoGrab <[email protected]>
1 parent 8b534b0 commit f03dc9f

File tree

3 files changed

+35
-4
lines changed

3 files changed

+35
-4
lines changed

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3445,11 +3445,14 @@ def create_async_retriever(
34453445
**kwargs: Any,
34463446
) -> AsyncRetriever:
34473447
def _get_download_retriever() -> SimpleRetriever:
3448+
# We create a record selector for the download retriever
3449+
# with no schema normalization and no transformations, neither record filter
3450+
# as all this occurs in the record_selector of the AsyncRetriever
34483451
record_selector = RecordSelector(
34493452
extractor=download_extractor,
34503453
name=name,
34513454
record_filter=None,
3452-
transformations=transformations,
3455+
transformations=[],
34533456
schema_normalization=TypeTransformer(TransformConfig.NoTransform),
34543457
config=config,
34553458
parameters={},

unit_tests/sources/declarative/parsers/test_model_to_component_factory.py

Lines changed: 27 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3828,13 +3828,29 @@ def test_create_async_retriever():
38283828
},
38293829
}
38303830

3831+
transformations = [
3832+
AddFields(
3833+
fields=[
3834+
AddedFieldDefinition(
3835+
path=["field1"],
3836+
value=InterpolatedString(
3837+
string="static_value", default="static_value", parameters={}
3838+
),
3839+
value_type=None,
3840+
parameters={},
3841+
)
3842+
],
3843+
parameters={},
3844+
)
3845+
]
3846+
38313847
component = factory.create_component(
38323848
model_type=AsyncRetrieverModel,
38333849
component_definition=definition,
38343850
name="test_stream",
38353851
primary_key="id",
38363852
stream_slicer=None,
3837-
transformations=[],
3853+
transformations=transformations,
38383854
config=config,
38393855
)
38403856

@@ -3864,6 +3880,16 @@ def test_create_async_retriever():
38643880
assert isinstance(extractor, DpathExtractor)
38653881
assert extractor.field_path == ["data"]
38663882

3883+
# Validate the transformations are just passed to the async retriever record_selector but not the download retriever record_selector
3884+
assert selector.transformations == transformations
3885+
download_retriever_record_selector: RecordSelector = (
3886+
job_repository.download_retriever.record_selector
3887+
) # type: ignore
3888+
assert download_retriever_record_selector.transformations != transformations
3889+
assert not download_retriever_record_selector.transformations
3890+
assert download_retriever_record_selector.record_filter is None
3891+
assert download_retriever_record_selector.schema_normalization._config.name == "NoTransform"
3892+
38673893

38683894
def test_api_budget():
38693895
manifest = {

unit_tests/sources/declarative/resolvers/test_config_components_resolver.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,8 @@ def test_dynamic_streams_read_with_config_components_resolver(
203203
]
204204

205205
assert len(actual_catalog.streams) == len(expected_stream_names)
206-
assert [stream.name for stream in actual_catalog.streams] == expected_stream_names
206+
# Use set comparison to avoid relying on deterministic ordering
207+
assert set(stream.name for stream in actual_catalog.streams) == set(expected_stream_names)
207208
assert len(records) == len(expected_stream_names)
208-
assert [record.stream for record in records] == expected_stream_names
209+
# Use set comparison to avoid relying on deterministic ordering
210+
assert set(record.stream for record in records) == set(expected_stream_names)

0 commit comments

Comments
 (0)