Skip to content

Commit 8ef954c

Browse files
darynaishchenkooctavia-squidington-iii
andauthored
fix(low-code): set transform_before_filtering from manifest if client_side_incremental_sync=true (#515)
Co-authored-by: octavia-squidington-iii <[email protected]>
1 parent 9c1f74f commit 8ef954c

File tree

2 files changed

+72
-8
lines changed

2 files changed

+72
-8
lines changed

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -2833,13 +2833,9 @@ def create_record_selector(
28332833
else None
28342834
)
28352835

2836-
if model.transform_before_filtering is None:
2837-
# default to False if not set
2838-
model.transform_before_filtering = False
2839-
2840-
assert model.transform_before_filtering is not None # for mypy
2841-
2842-
transform_before_filtering = model.transform_before_filtering
2836+
transform_before_filtering = (
2837+
False if model.transform_before_filtering is None else model.transform_before_filtering
2838+
)
28432839
if client_side_incremental_sync:
28442840
record_filter = ClientSideIncrementalRecordFilterDecorator(
28452841
config=config,
@@ -2849,7 +2845,11 @@ def create_record_selector(
28492845
else None,
28502846
**client_side_incremental_sync,
28512847
)
2852-
transform_before_filtering = True
2848+
transform_before_filtering = (
2849+
True
2850+
if model.transform_before_filtering is None
2851+
else model.transform_before_filtering
2852+
)
28532853

28542854
if model.schema_normalization is None:
28552855
# default to no schema normalization if not set

unit_tests/sources/declarative/test_concurrent_declarative_source.py

Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111

1212
import freezegun
1313
import isodate
14+
import pytest
1415
from typing_extensions import deprecated
1516

1617
from airbyte_cdk.models import (
@@ -1876,6 +1877,69 @@ def test_stream_using_is_client_side_incremental_has_cursor_state():
18761877
assert client_side_incremental_cursor_state == expected_cursor_value
18771878

18781879

1880+
@pytest.mark.parametrize(
1881+
"expected_transform_before_filtering",
1882+
[
1883+
pytest.param(
1884+
True,
1885+
id="transform before filtering",
1886+
),
1887+
pytest.param(
1888+
False,
1889+
id="transform after filtering",
1890+
),
1891+
pytest.param(
1892+
None,
1893+
id="default transform before filtering",
1894+
),
1895+
],
1896+
)
1897+
def test_stream_using_is_client_side_incremental_has_transform_before_filtering_according_to_manifest(
1898+
expected_transform_before_filtering,
1899+
):
1900+
expected_cursor_value = "2024-07-01"
1901+
state = [
1902+
AirbyteStateMessage(
1903+
type=AirbyteStateType.STREAM,
1904+
stream=AirbyteStreamState(
1905+
stream_descriptor=StreamDescriptor(name="locations", namespace=None),
1906+
stream_state=AirbyteStateBlob(updated_at=expected_cursor_value),
1907+
),
1908+
)
1909+
]
1910+
1911+
manifest_with_stream_state_interpolation = copy.deepcopy(_MANIFEST)
1912+
1913+
# Enable semi-incremental on the locations stream
1914+
manifest_with_stream_state_interpolation["definitions"]["locations_stream"]["incremental_sync"][
1915+
"is_client_side_incremental"
1916+
] = True
1917+
1918+
if expected_transform_before_filtering is not None:
1919+
manifest_with_stream_state_interpolation["definitions"]["locations_stream"]["retriever"][
1920+
"record_selector"
1921+
]["transform_before_filtering"] = expected_transform_before_filtering
1922+
1923+
source = ConcurrentDeclarativeSource(
1924+
source_config=manifest_with_stream_state_interpolation,
1925+
config=_CONFIG,
1926+
catalog=_CATALOG,
1927+
state=state,
1928+
)
1929+
concurrent_streams, synchronous_streams = source._group_streams(config=_CONFIG)
1930+
1931+
locations_stream = concurrent_streams[2]
1932+
assert isinstance(locations_stream, DefaultStream)
1933+
1934+
simple_retriever = locations_stream._stream_partition_generator._partition_factory._retriever
1935+
record_selector = simple_retriever.record_selector
1936+
1937+
if expected_transform_before_filtering is not None:
1938+
assert record_selector.transform_before_filtering == expected_transform_before_filtering
1939+
else:
1940+
assert record_selector.transform_before_filtering is True
1941+
1942+
18791943
def create_wrapped_stream(stream: DeclarativeStream) -> Stream:
18801944
slice_to_records_mapping = get_mocked_read_records_output(stream_name=stream.name)
18811945

0 commit comments

Comments
 (0)