Skip to content

Commit a59d25f

Browse files
lmossman and Copilot authored
fix: make builder test read work when a mix of static and dynamic streams are passed in (#676)
Co-authored-by: Copilot <[email protected]>
1 parent 51cfea5 commit a59d25f

File tree

8 files changed

+179
-75
lines changed

8 files changed

+179
-75
lines changed

airbyte_cdk/connector_builder/connector_builder_handler.py

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -108,7 +108,12 @@ def read_stream(
108108
stream_name = configured_catalog.streams[0].stream.name
109109

110110
stream_read = test_read_handler.run_test_read(
111-
source, config, configured_catalog, state, limits.max_records
111+
source,
112+
config,
113+
configured_catalog,
114+
stream_name,
115+
state,
116+
limits.max_records,
112117
)
113118

114119
return AirbyteMessage(

airbyte_cdk/connector_builder/test_reader/helpers.py

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -269,6 +269,37 @@ def should_close_page_for_slice(at_least_one_page_in_group: bool, message: Airby
269269
return at_least_one_page_in_group and should_process_slice_descriptor(message)
270270

271271

272+
def is_page_http_request_for_different_stream(
    json_message: Optional[Dict[str, Any]], stream_name: str
) -> bool:
    """
    Determines whether a given JSON message represents a page HTTP request for a different stream.

    This function checks if the provided JSON message is a page HTTP request, and if the stream name in the log
    (read from ``json_message["airbyte_cdk"]["stream"]["name"]``) is different from the provided stream name.

    This is needed because dynamic streams result in extra page HTTP requests for the dynamic streams that we want
    to ignore when they do not match the stream that is being read.

    Args:
        json_message (Optional[Dict[str, Any]]): The JSON message to evaluate.
        stream_name (str): The name of the stream to compare against.

    Returns:
        bool: True if the JSON message is a page HTTP request that names a different stream. False if the
            message is empty, is not a page HTTP request, or does not carry a stream name.
    """
    if not json_message or not is_page_http_request(json_message):
        return False

    # Walk "airbyte_cdk" -> "stream" -> "name" one level at a time. A chained
    # `.get(key, {})` only falls back to `{}` when the key is absent; a key that is
    # present with a null / non-dict value would raise AttributeError on the next
    # `.get`. Such a message carries no usable stream name, so it is not treated
    # as belonging to a different stream.
    node: Any = json_message
    for key in ("airbyte_cdk", "stream", "name"):
        if not isinstance(node, dict):
            return False
        node = node.get(key)

    if node is None:
        return False

    return bool(node != stream_name)
301+
302+
272303
def is_page_http_request(json_message: Optional[Dict[str, Any]]) -> bool:
273304
"""
274305
Determines whether a given JSON message represents a page HTTP request.

airbyte_cdk/connector_builder/test_reader/message_grouper.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
is_async_auxiliary_request,
2929
is_config_update_message,
3030
is_log_message,
31+
is_page_http_request_for_different_stream,
3132
is_record_message,
3233
is_state_message,
3334
is_trace_with_error,
@@ -44,6 +45,7 @@ def get_message_groups(
4445
schema_inferrer: SchemaInferrer,
4546
datetime_format_inferrer: DatetimeFormatInferrer,
4647
limit: int,
48+
stream_name: str,
4749
) -> MESSAGE_GROUPS:
4850
"""
4951
Processes an iterator of AirbyteMessage objects to group and yield messages based on their type and sequence.
@@ -96,6 +98,9 @@ def get_message_groups(
9698
while records_count < limit and (message := next(messages, None)):
9799
json_message = airbyte_message_to_json(message)
98100

101+
if is_page_http_request_for_different_stream(json_message, stream_name):
102+
continue
103+
99104
if should_close_page(at_least_one_page_in_group, message, json_message):
100105
current_page_request, current_page_response = handle_current_page(
101106
current_page_request,

airbyte_cdk/connector_builder/test_reader/reader.py

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,7 @@ def run_test_read(
8686
source: DeclarativeSource,
8787
config: Mapping[str, Any],
8888
configured_catalog: ConfiguredAirbyteCatalog,
89+
stream_name: str,
8990
state: List[AirbyteStateMessage],
9091
record_limit: Optional[int] = None,
9192
) -> StreamRead:
@@ -112,14 +113,17 @@ def run_test_read(
112113

113114
record_limit = self._check_record_limit(record_limit)
114115
# The connector builder currently only supports reading from a single stream at a time
115-
stream = source.streams(config)[0]
116+
streams = source.streams(config)
117+
stream = next((stream for stream in streams if stream.name == stream_name), None)
116118

117119
# get any deprecation warnings during the component creation
118120
deprecation_warnings: List[LogMessage] = source.deprecation_warnings()
119121

120122
schema_inferrer = SchemaInferrer(
121-
self._pk_to_nested_and_composite_field(stream.primary_key),
122-
self._cursor_field_to_nested_and_composite_field(stream.cursor_field),
123+
self._pk_to_nested_and_composite_field(stream.primary_key) if stream else None,
124+
self._cursor_field_to_nested_and_composite_field(stream.cursor_field)
125+
if stream
126+
else None,
123127
)
124128
datetime_format_inferrer = DatetimeFormatInferrer()
125129

@@ -128,6 +132,7 @@ def run_test_read(
128132
schema_inferrer,
129133
datetime_format_inferrer,
130134
record_limit,
135+
stream_name,
131136
)
132137

133138
slices, log_messages, auxiliary_requests, latest_config_update = self._categorise_groups(

airbyte_cdk/sources/declarative/manifest_declarative_source.py

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -542,11 +542,19 @@ def _dynamic_stream_configs(
542542
components_resolver_config["retriever"]["requester"]["use_cache"] = True
543543

544544
# Create a resolver for dynamic components based on type
545-
components_resolver = self._constructor.create_component(
546-
COMPONENTS_RESOLVER_TYPE_MAPPING[resolver_type],
547-
components_resolver_config,
548-
config,
549-
)
545+
if resolver_type == "HttpComponentsResolver":
546+
components_resolver = self._constructor.create_component(
547+
model_type=COMPONENTS_RESOLVER_TYPE_MAPPING[resolver_type],
548+
component_definition=components_resolver_config,
549+
config=config,
550+
stream_name=dynamic_definition.get("name"),
551+
)
552+
else:
553+
components_resolver = self._constructor.create_component(
554+
model_type=COMPONENTS_RESOLVER_TYPE_MAPPING[resolver_type],
555+
component_definition=components_resolver_config,
556+
config=config,
557+
)
550558

551559
stream_template_config = dynamic_definition["stream_template"]
552560

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 7 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3493,10 +3493,11 @@ def _get_download_retriever() -> SimpleRetriever:
34933493
requester=download_requester,
34943494
record_selector=record_selector,
34953495
primary_key=None,
3496-
name=job_download_components_name,
3496+
name=name,
34973497
paginator=paginator,
34983498
config=config,
34993499
parameters={},
3500+
log_formatter=self._get_log_formatter(None, name),
35003501
)
35013502

35023503
def _get_job_timeout() -> datetime.timedelta:
@@ -3805,15 +3806,15 @@ def create_components_mapping_definition(
38053806
)
38063807

38073808
def create_http_components_resolver(
3808-
self, model: HttpComponentsResolverModel, config: Config
3809+
self, model: HttpComponentsResolverModel, config: Config, stream_name: Optional[str] = None
38093810
) -> Any:
38103811
stream_slicer = self._build_stream_slicer_from_partition_router(model.retriever, config)
38113812
combined_slicers = self._build_resumable_cursor(model.retriever, stream_slicer)
38123813

38133814
retriever = self._create_component_from_model(
38143815
model=model.retriever,
38153816
config=config,
3816-
name="",
3817+
name=f"{stream_name if stream_name else '__http_components_resolver'}",
38173818
primary_key=None,
38183819
stream_slicer=stream_slicer if stream_slicer else combined_slicers,
38193820
transformations=[],
@@ -3890,7 +3891,9 @@ def create_config_components_resolver(
38903891
)
38913892

38923893
def create_parametrized_components_resolver(
3893-
self, model: ParametrizedComponentsResolverModel, config: Config
3894+
self,
3895+
model: ParametrizedComponentsResolverModel,
3896+
config: Config,
38943897
) -> ParametrizedComponentsResolver:
38953898
stream_parameters = StreamParametersDefinition(
38963899
list_of_parameters_for_stream=model.stream_parameters.list_of_parameters_for_stream

unit_tests/connector_builder/test_connector_builder_handler.py

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -752,6 +752,7 @@ def test_read():
752752
source,
753753
config,
754754
ConfiguredAirbyteCatalogSerializer.load(CONFIGURED_CATALOG),
755+
_stream_name,
755756
_A_STATE,
756757
limits.max_records,
757758
)
@@ -812,6 +813,10 @@ def primary_key(self):
812813
def cursor_field(self):
813814
return []
814815

816+
@property
817+
def name(self):
818+
return _stream_name
819+
815820
class MockManifestDeclarativeSource:
816821
def streams(self, config):
817822
return [MockDeclarativeStream()]

0 commit comments

Comments
 (0)