Skip to content

Commit 4475c57

Browse files
authored
chore(concurrent_declarative_source): Integrate ManifestDeclarativeSource into ConcurrentDeclarativeSource (#704)
1 parent 08ac482 commit 4475c57

33 files changed

+3153
-100
lines changed

airbyte_cdk/__init__.py

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,6 @@
107107
from .sources.declarative.extractors.record_filter import RecordFilter
108108
from .sources.declarative.incremental import DatetimeBasedCursor
109109
from .sources.declarative.interpolation import InterpolatedBoolean, InterpolatedString
110-
from .sources.declarative.manifest_declarative_source import ManifestDeclarativeSource
111110
from .sources.declarative.migrations.legacy_to_per_partition_state_migration import (
112111
LegacyToPerPartitionStateMigration,
113112
)
@@ -253,7 +252,6 @@
253252
"JsonDecoder",
254253
"JsonFileSchemaLoader",
255254
"LegacyToPerPartitionStateMigration",
256-
"ManifestDeclarativeSource",
257255
"MinMaxDatetime",
258256
"NoAuth",
259257
"OffsetIncrement",

airbyte_cdk/connector_builder/connector_builder_handler.py

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,6 @@
1919
ConcurrentDeclarativeSource,
2020
TestLimits,
2121
)
22-
from airbyte_cdk.sources.declarative.declarative_source import DeclarativeSource
23-
from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource
2422
from airbyte_cdk.utils.airbyte_secrets_utils import filter_secrets
2523
from airbyte_cdk.utils.datetime_helpers import ab_datetime_now
2624
from airbyte_cdk.utils.traced_exception import AirbyteTracedException
@@ -90,7 +88,7 @@ def create_source(
9088

9189

9290
def read_stream(
93-
source: DeclarativeSource,
91+
source: ConcurrentDeclarativeSource[Optional[List[AirbyteStateMessage]]],
9492
config: Mapping[str, Any],
9593
configured_catalog: ConfiguredAirbyteCatalog,
9694
state: List[AirbyteStateMessage],
@@ -128,7 +126,9 @@ def read_stream(
128126
return error.as_airbyte_message()
129127

130128

131-
def resolve_manifest(source: ManifestDeclarativeSource) -> AirbyteMessage:
129+
def resolve_manifest(
130+
source: ConcurrentDeclarativeSource[Optional[List[AirbyteStateMessage]]],
131+
) -> AirbyteMessage:
132132
try:
133133
return AirbyteMessage(
134134
type=Type.RECORD,
@@ -145,7 +145,9 @@ def resolve_manifest(source: ManifestDeclarativeSource) -> AirbyteMessage:
145145
return error.as_airbyte_message()
146146

147147

148-
def full_resolve_manifest(source: ManifestDeclarativeSource, limits: TestLimits) -> AirbyteMessage:
148+
def full_resolve_manifest(
149+
source: ConcurrentDeclarativeSource[Optional[List[AirbyteStateMessage]]], limits: TestLimits
150+
) -> AirbyteMessage:
149151
try:
150152
manifest = {**source.resolved_manifest}
151153
streams = manifest.get("streams", [])

airbyte_cdk/connector_builder/main.py

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -25,7 +25,9 @@
2525
ConfiguredAirbyteCatalog,
2626
ConfiguredAirbyteCatalogSerializer,
2727
)
28-
from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource
28+
from airbyte_cdk.sources.declarative.concurrent_declarative_source import (
29+
ConcurrentDeclarativeSource,
30+
)
2931
from airbyte_cdk.sources.source import Source
3032
from airbyte_cdk.utils.traced_exception import AirbyteTracedException
3133

@@ -68,7 +70,7 @@ def get_config_and_catalog_from_args(
6870

6971

7072
def handle_connector_builder_request(
71-
source: ManifestDeclarativeSource,
73+
source: ConcurrentDeclarativeSource[Optional[List[AirbyteStateMessage]]],
7274
command: str,
7375
config: Mapping[str, Any],
7476
catalog: Optional[ConfiguredAirbyteCatalog],

airbyte_cdk/connector_builder/test_reader/reader.py

Lines changed: 10 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,9 @@
2323
ConfiguredAirbyteCatalog,
2424
TraceType,
2525
)
26-
from airbyte_cdk.sources.declarative.declarative_source import DeclarativeSource
26+
from airbyte_cdk.sources.declarative.concurrent_declarative_source import (
27+
ConcurrentDeclarativeSource,
28+
)
2729
from airbyte_cdk.utils import AirbyteTracedException
2830
from airbyte_cdk.utils.datetime_format_inferrer import DatetimeFormatInferrer
2931
from airbyte_cdk.utils.schema_inferrer import (
@@ -55,7 +57,7 @@ class TestReader:
5557
that contains slices of data, log messages, auxiliary requests, and any inferred schema or datetime formats.
5658
5759
Parameters:
58-
source (DeclarativeSource): The data source to read from.
60+
source (ConcurrentDeclarativeSource): The data source to read from.
5961
config (Mapping[str, Any]): Configuration parameters for the source.
6062
configured_catalog (ConfiguredAirbyteCatalog): Catalog containing stream configuration.
6163
state (List[AirbyteStateMessage]): Current state information for the read.
@@ -83,7 +85,7 @@ def __init__(
8385

8486
def run_test_read(
8587
self,
86-
source: DeclarativeSource,
88+
source: ConcurrentDeclarativeSource[Optional[List[AirbyteStateMessage]]],
8789
config: Mapping[str, Any],
8890
configured_catalog: ConfiguredAirbyteCatalog,
8991
stream_name: str,
@@ -94,7 +96,7 @@ def run_test_read(
9496
Run a test read for the connector by reading from a single stream and inferring schema and datetime formats.
9597
9698
Parameters:
97-
source (DeclarativeSource): The source instance providing the streams.
99+
source (ConcurrentDeclarativeSource): The source instance providing the streams.
98100
config (Mapping[str, Any]): The configuration settings to use for reading.
99101
configured_catalog (ConfiguredAirbyteCatalog): The catalog specifying the stream configuration.
100102
state (List[AirbyteStateMessage]): A list of state messages to resume the read.
@@ -126,7 +128,7 @@ def run_test_read(
126128
if stream
127129
else None,
128130
self._cursor_field_to_nested_and_composite_field(stream.cursor_field)
129-
if stream
131+
if stream and stream.cursor_field
130132
else None,
131133
)
132134
datetime_format_inferrer = DatetimeFormatInferrer()
@@ -381,13 +383,13 @@ def _get_latest_config_update(
381383

382384
def _read_stream(
383385
self,
384-
source: DeclarativeSource,
386+
source: ConcurrentDeclarativeSource[Optional[List[AirbyteStateMessage]]],
385387
config: Mapping[str, Any],
386388
configured_catalog: ConfiguredAirbyteCatalog,
387389
state: List[AirbyteStateMessage],
388390
) -> Iterator[AirbyteMessage]:
389391
"""
390-
Reads messages from the given DeclarativeSource using an AirbyteEntrypoint.
392+
Reads messages from the given ConcurrentDeclarativeSource using an AirbyteEntrypoint.
391393
392394
This method attempts to yield messages from the source's read generator. If the generator
393395
raises an AirbyteTracedException, it checks whether the exception message indicates a non-actionable
@@ -396,7 +398,7 @@ def _read_stream(
396398
wrapped into an AirbyteTracedException, and yielded as an AirbyteMessage.
397399
398400
Parameters:
399-
source (DeclarativeSource): The source object that provides data reading logic.
401+
source (ConcurrentDeclarativeSource): The source object that provides data reading logic.
400402
config (Mapping[str, Any]): The configuration dictionary for the source.
401403
configured_catalog (ConfiguredAirbyteCatalog): The catalog defining the streams and their configurations.
402404
state (List[AirbyteStateMessage]): A list representing the current state for incremental sync.

airbyte_cdk/legacy/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
File renamed without changes.
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
File renamed without changes.

0 commit comments

Comments
 (0)