Skip to content

Commit 069a06e

Browse files
authored
feat(concurrent cdk): Emit incomplete status for missing streams (#754)
1 parent 9a075a1 commit 069a06e

File tree

2 files changed

+58
-4
lines changed

2 files changed

+58
-4
lines changed

airbyte_cdk/sources/declarative/concurrent_declarative_source.py

Lines changed: 12 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,7 @@
2020

2121
import orjson
2222
import yaml
23-
from airbyte_protocol_dataclasses.models import Level
23+
from airbyte_protocol_dataclasses.models import AirbyteStreamStatus, Level, StreamDescriptor
2424
from jsonschema.exceptions import ValidationError
2525
from jsonschema.validators import validate
2626

@@ -88,6 +88,7 @@
8888
DebugSliceLogger,
8989
SliceLogger,
9090
)
91+
from airbyte_cdk.utils.stream_status_utils import as_airbyte_message
9192
from airbyte_cdk.utils.traced_exception import AirbyteTracedException
9293

9394

@@ -630,15 +631,22 @@ def _dynamic_stream_configs(
630631

631632
return dynamic_stream_configs
632633

633-
@staticmethod
634634
def _select_streams(
635-
streams: List[AbstractStream], configured_catalog: ConfiguredAirbyteCatalog
635+
self, streams: List[AbstractStream], configured_catalog: ConfiguredAirbyteCatalog
636636
) -> List[AbstractStream]:
637637
stream_name_to_instance: Mapping[str, AbstractStream] = {s.name: s for s in streams}
638638
abstract_streams: List[AbstractStream] = []
639639
for configured_stream in configured_catalog.streams:
640640
stream_instance = stream_name_to_instance.get(configured_stream.stream.name)
641641
if stream_instance:
642642
abstract_streams.append(stream_instance)
643-
643+
else:
644+
# Previous behavior in the legacy synchronous CDK was to also raise an error TRACE message if
645+
# the source was configured with raise_exception_on_missing_stream=True. This was used on very
646+
# few sources like facebook-marketing and google-ads. We decided not to port this feature over,
647+
# but we can do so if we feel it necessary. With the current behavior, we should still result
648+
# in a partial failure since missing streams will be marked as INCOMPLETE.
649+
self._message_repository.emit_message(
650+
as_airbyte_message(configured_stream.stream, AirbyteStreamStatus.INCOMPLETE)
651+
)
644652
return abstract_streams

unit_tests/sources/declarative/test_concurrent_declarative_source.py

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,12 @@
1919
import pytest
2020
import requests
2121
import yaml
22+
from airbyte_protocol_dataclasses.models import (
23+
AirbyteStreamStatus,
24+
AirbyteStreamStatusTraceMessage,
25+
AirbyteTraceMessage,
26+
TraceType,
27+
)
2228
from jsonschema.exceptions import ValidationError
2329
from typing_extensions import deprecated
2430

@@ -1898,6 +1904,46 @@ def get_mocked_read_records_output(stream_name: str) -> Mapping[tuple[str, str],
18981904
}
18991905

19001906

1907+
@freezegun.freeze_time("2025-01-01T00:00:00")
1908+
def test_catalog_contains_missing_stream_in_source():
1909+
expected_messages = [
1910+
AirbyteMessage(
1911+
type=Type.TRACE,
1912+
trace=AirbyteTraceMessage(
1913+
type=TraceType.STREAM_STATUS,
1914+
stream_status=AirbyteStreamStatusTraceMessage(
1915+
stream_descriptor=StreamDescriptor(name="missing"),
1916+
status=AirbyteStreamStatus.INCOMPLETE,
1917+
),
1918+
emitted_at=1735689600000.0,
1919+
),
1920+
),
1921+
]
1922+
1923+
catalog = ConfiguredAirbyteCatalog(
1924+
streams=[
1925+
ConfiguredAirbyteStream(
1926+
stream=AirbyteStream(
1927+
name="missing", json_schema={}, supported_sync_modes=[SyncMode.full_refresh]
1928+
),
1929+
sync_mode=SyncMode.full_refresh,
1930+
destination_sync_mode=DestinationSyncMode.append,
1931+
),
1932+
]
1933+
)
1934+
1935+
source = ConcurrentDeclarativeSource(
1936+
source_config=_MANIFEST, config=_CONFIG, catalog=catalog, state=[]
1937+
)
1938+
1939+
list(source.read(logger=source.logger, config=_CONFIG, catalog=catalog, state=[]))
1940+
queue = source._concurrent_source._queue
1941+
1942+
for expected_message in expected_messages:
1943+
queue_message = queue.get()
1944+
assert queue_message == expected_message
1945+
1946+
19011947
def get_records_for_stream(
19021948
stream_name: str, messages: List[AirbyteMessage]
19031949
) -> List[AirbyteRecordMessage]:

0 commit comments

Comments
 (0)