Skip to content

Commit 4f0fcbc

Browse files
maxi297, octavia-squidington-iii, aldogonzalez8, aaronsteers, devin-ai-integration[bot]
authored
feat(file-api): new upload component for declarative cdk and update protocol support for file-based connectors (#433)
Co-authored-by: octavia-squidington-iii <[email protected]>
Co-authored-by: Aldo Gonzalez <[email protected]>
Co-authored-by: Aldo Gonzalez <[email protected]>
Co-authored-by: Aaron ("AJ") Steers <[email protected]>
Co-authored-by: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com>
1 parent b148ca5 commit 4f0fcbc

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

41 files changed

+1338
-392
lines changed

airbyte_cdk/models/__init__.py

+1
Original file line number | Diff line number | Diff line change
@@ -19,6 +19,7 @@
1919
AirbyteMessage,
2020
AirbyteProtocol,
2121
AirbyteRecordMessage,
22+
AirbyteRecordMessageFileReference,
2223
AirbyteStateBlob,
2324
AirbyteStateMessage,
2425
AirbyteStateStats,

airbyte_cdk/models/airbyte_protocol.py

+1-3
Original file line number | Diff line number | Diff line change
@@ -8,8 +8,6 @@
88
from airbyte_protocol_dataclasses.models import * # noqa: F403 # Allow '*'
99
from serpyco_rs.metadata import Alias
1010

11-
from airbyte_cdk.models.file_transfer_record_message import AirbyteFileTransferRecordMessage
12-
1311
# ruff: noqa: F405 # ignore fuzzy import issues with 'import *'
1412

1513

@@ -84,7 +82,7 @@ class AirbyteMessage:
8482
spec: Optional[ConnectorSpecification] = None # type: ignore [name-defined]
8583
connectionStatus: Optional[AirbyteConnectionStatus] = None # type: ignore [name-defined]
8684
catalog: Optional[AirbyteCatalog] = None # type: ignore [name-defined]
87-
record: Optional[Union[AirbyteFileTransferRecordMessage, AirbyteRecordMessage]] = None # type: ignore [name-defined]
85+
record: Optional[AirbyteRecordMessage] = None # type: ignore [name-defined]
8886
state: Optional[AirbyteStateMessage] = None
8987
trace: Optional[AirbyteTraceMessage] = None # type: ignore [name-defined]
9088
control: Optional[AirbyteControlMessage] = None # type: ignore [name-defined]

airbyte_cdk/models/file_transfer_record_message.py

-13
This file was deleted.

airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py

+1-1
Original file line number | Diff line number | Diff line change
@@ -149,7 +149,7 @@ def on_record(self, record: Record) -> Iterable[AirbyteMessage]:
149149
message = stream_data_to_airbyte_message(
150150
stream_name=record.stream_name,
151151
data_or_message=record.data,
152-
is_file_transfer_message=record.is_file_transfer_message,
152+
file_reference=record.file_reference,
153153
)
154154
stream = self._stream_name_to_instance[record.stream_name]
155155

airbyte_cdk/sources/declarative/concurrent_declarative_source.py

+8
Original file line number | Diff line number | Diff line change
@@ -28,6 +28,7 @@
2828
PerPartitionWithGlobalCursor,
2929
)
3030
from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource
31+
from airbyte_cdk.sources.declarative.models import FileUploader
3132
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
3233
ConcurrencyLevel as ConcurrencyLevelModel,
3334
)
@@ -209,6 +210,10 @@ def _group_streams(
209210
# these legacy Python streams the way we do low-code streams to determine if they are concurrent compatible,
210211
# so we need to treat them as synchronous
211212

213+
supports_file_transfer = (
214+
"file_uploader" in name_to_stream_mapping[declarative_stream.name]
215+
)
216+
212217
if (
213218
isinstance(declarative_stream, DeclarativeStream)
214219
and name_to_stream_mapping[declarative_stream.name]["type"]
@@ -325,6 +330,7 @@ def _group_streams(
325330
else None,
326331
logger=self.logger,
327332
cursor=cursor,
333+
supports_file_transfer=supports_file_transfer,
328334
)
329335
)
330336
elif (
@@ -356,6 +362,7 @@ def _group_streams(
356362
cursor_field=None,
357363
logger=self.logger,
358364
cursor=final_state_cursor,
365+
supports_file_transfer=supports_file_transfer,
359366
)
360367
)
361368
elif (
@@ -410,6 +417,7 @@ def _group_streams(
410417
cursor_field=perpartition_cursor.cursor_field.cursor_field_key,
411418
logger=self.logger,
412419
cursor=perpartition_cursor,
420+
supports_file_transfer=supports_file_transfer,
413421
)
414422
)
415423
else:

airbyte_cdk/sources/declarative/declarative_component_schema.yaml

+36
Original file line number | Diff line number | Diff line change
@@ -1448,6 +1448,42 @@ definitions:
14481448
- "$ref": "#/definitions/LegacyToPerPartitionStateMigration"
14491449
- "$ref": "#/definitions/CustomStateMigration"
14501450
default: []
1451+
file_uploader:
1452+
title: File Uploader
1453+
description: (experimental) Describes how to fetch a file
1454+
type: object
1455+
required:
1456+
- type
1457+
- requester
1458+
- download_target_extractor
1459+
properties:
1460+
type:
1461+
type: string
1462+
enum: [ FileUploader ]
1463+
requester:
1464+
description: Requester component that describes how to prepare HTTP requests to send to the source API.
1465+
anyOf:
1466+
- "$ref": "#/definitions/CustomRequester"
1467+
- "$ref": "#/definitions/HttpRequester"
1468+
download_target_extractor:
1469+
description: Responsible for fetching the url where the file is located. This is applied on each records and not on the HTTP response
1470+
anyOf:
1471+
- "$ref": "#/definitions/CustomRecordExtractor"
1472+
- "$ref": "#/definitions/DpathExtractor"
1473+
file_extractor:
1474+
description: Responsible for fetching the content of the file. If not defined, the assumption is that the whole response body is the file content
1475+
anyOf:
1476+
- "$ref": "#/definitions/CustomRecordExtractor"
1477+
- "$ref": "#/definitions/DpathExtractor"
1478+
filename_extractor:
1479+
description: Defines the name to store the file. Stream name is automatically added to the file path. File unique ID can be used to avoid overwriting files. Random UUID will be used if the extractor is not provided.
1480+
type: string
1481+
interpolation_context:
1482+
- config
1483+
- record
1484+
examples:
1485+
- "{{ record.id }}/{{ record.file_name }}/"
1486+
- "{{ record.id }}_{{ record.file_name }}/"
14511487
$parameters:
14521488
type: object
14531489
additional_properties: true

airbyte_cdk/sources/declarative/extractors/record_selector.py

+6-1
Original file line number | Diff line number | Diff line change
@@ -15,6 +15,7 @@
1515
)
1616
from airbyte_cdk.sources.declarative.interpolation import InterpolatedString
1717
from airbyte_cdk.sources.declarative.models import SchemaNormalization
18+
from airbyte_cdk.sources.declarative.retrievers.file_uploader import FileUploader
1819
from airbyte_cdk.sources.declarative.transformations import RecordTransformation
1920
from airbyte_cdk.sources.types import Config, Record, StreamSlice, StreamState
2021
from airbyte_cdk.sources.utils.transform import TypeTransformer
@@ -42,6 +43,7 @@ class RecordSelector(HttpSelector):
4243
record_filter: Optional[RecordFilter] = None
4344
transformations: List[RecordTransformation] = field(default_factory=lambda: [])
4445
transform_before_filtering: bool = False
46+
file_uploader: Optional[FileUploader] = None
4547

4648
def __post_init__(self, parameters: Mapping[str, Any]) -> None:
4749
self._parameters = parameters
@@ -117,7 +119,10 @@ def filter_and_transform(
117119
transformed_filtered_data, schema=records_schema
118120
)
119121
for data in normalized_data:
120-
yield Record(data=data, stream_name=self.name, associated_slice=stream_slice)
122+
record = Record(data=data, stream_name=self.name, associated_slice=stream_slice)
123+
if self.file_uploader:
124+
self.file_uploader.upload(record)
125+
yield record
121126

122127
def _normalize_by_schema(
123128
self, records: Iterable[Mapping[str, Any]], schema: Optional[Mapping[str, Any]]

airbyte_cdk/sources/declarative/models/declarative_component_schema.py

+31
Original file line number | Diff line number | Diff line change
@@ -2066,6 +2066,31 @@ class Config:
20662066
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
20672067

20682068

2069+
class FileUploader(BaseModel):
2070+
type: Literal["FileUploader"]
2071+
requester: Union[CustomRequester, HttpRequester] = Field(
2072+
...,
2073+
description="Requester component that describes how to prepare HTTP requests to send to the source API.",
2074+
)
2075+
download_target_extractor: Union[CustomRecordExtractor, DpathExtractor] = Field(
2076+
...,
2077+
description="Responsible for fetching the url where the file is located. This is applied on each records and not on the HTTP response",
2078+
)
2079+
file_extractor: Optional[Union[CustomRecordExtractor, DpathExtractor]] = Field(
2080+
None,
2081+
description="Responsible for fetching the content of the file. If not defined, the assumption is that the whole response body is the file content",
2082+
)
2083+
filename_extractor: Optional[str] = Field(
2084+
None,
2085+
description="Defines the name to store the file. Stream name is automatically added to the file path. File unique ID can be used to avoid overwriting files. Random UUID will be used if the extractor is not provided.",
2086+
examples=[
2087+
"{{ record.id }}/{{ record.file_name }}/",
2088+
"{{ record.id }}_{{ record.file_name }}/",
2089+
],
2090+
)
2091+
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
2092+
2093+
20692094
class DeclarativeStream(BaseModel):
20702095
class Config:
20712096
extra = Extra.allow
@@ -2124,6 +2149,11 @@ class Config:
21242149
description="Array of state migrations to be applied on the input state",
21252150
title="State Migrations",
21262151
)
2152+
file_uploader: Optional[FileUploader] = Field(
2153+
None,
2154+
description="(experimental) Describes how to fetch a file",
2155+
title="File Uploader",
2156+
)
21272157
parameters: Optional[Dict[str, Any]] = Field(None, alias="$parameters")
21282158

21292159

@@ -2617,6 +2647,7 @@ class DynamicDeclarativeStream(BaseModel):
26172647
DeclarativeSource1.update_forward_refs()
26182648
DeclarativeSource2.update_forward_refs()
26192649
SelectiveAuthenticator.update_forward_refs()
2650+
FileUploader.update_forward_refs()
26202651
DeclarativeStream.update_forward_refs()
26212652
SessionTokenAuthenticator.update_forward_refs()
26222653
DynamicSchemaLoader.update_forward_refs()

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

+39-1
Original file line number | Diff line number | Diff line change
@@ -106,7 +106,6 @@
106106
)
107107
from airbyte_cdk.sources.declarative.models import (
108108
CustomStateMigration,
109-
GzipDecoder,
110109
)
111110
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
112111
AddedFieldDefinition as AddedFieldDefinitionModel,
@@ -228,6 +227,9 @@
228227
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
229228
ExponentialBackoffStrategy as ExponentialBackoffStrategyModel,
230229
)
230+
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
231+
FileUploader as FileUploaderModel,
232+
)
231233
from airbyte_cdk.sources.declarative.models.declarative_component_schema import (
232234
FixedWindowCallRatePolicy as FixedWindowCallRatePolicyModel,
233235
)
@@ -479,6 +481,7 @@
479481
SimpleRetriever,
480482
SimpleRetrieverTestReadDecorator,
481483
)
484+
from airbyte_cdk.sources.declarative.retrievers.file_uploader import FileUploader
482485
from airbyte_cdk.sources.declarative.schema import (
483486
ComplexFieldType,
484487
DefaultSchemaLoader,
@@ -676,6 +679,7 @@ def _init_mappings(self) -> None:
676679
ComponentMappingDefinitionModel: self.create_components_mapping_definition,
677680
ZipfileDecoderModel: self.create_zipfile_decoder,
678681
HTTPAPIBudgetModel: self.create_http_api_budget,
682+
FileUploaderModel: self.create_file_uploader,
679683
FixedWindowCallRatePolicyModel: self.create_fixed_window_call_rate_policy,
680684
MovingWindowCallRatePolicyModel: self.create_moving_window_call_rate_policy,
681685
UnlimitedCallRatePolicyModel: self.create_unlimited_call_rate_policy,
@@ -1840,6 +1844,11 @@ def create_declarative_stream(
18401844
transformations.append(
18411845
self._create_component_from_model(model=transformation_model, config=config)
18421846
)
1847+
file_uploader = None
1848+
if model.file_uploader:
1849+
file_uploader = self._create_component_from_model(
1850+
model=model.file_uploader, config=config
1851+
)
18431852

18441853
retriever = self._create_component_from_model(
18451854
model=model.retriever,
@@ -1851,6 +1860,7 @@ def create_declarative_stream(
18511860
stop_condition_on_cursor=stop_condition_on_cursor,
18521861
client_side_incremental_sync=client_side_incremental_sync,
18531862
transformations=transformations,
1863+
file_uploader=file_uploader,
18541864
incremental_sync=model.incremental_sync,
18551865
)
18561866
cursor_field = model.incremental_sync.cursor_field if model.incremental_sync else None
@@ -2796,6 +2806,7 @@ def create_record_selector(
27962806
transformations: List[RecordTransformation] | None = None,
27972807
decoder: Decoder | None = None,
27982808
client_side_incremental_sync: Dict[str, Any] | None = None,
2809+
file_uploader: Optional[FileUploader] = None,
27992810
**kwargs: Any,
28002811
) -> RecordSelector:
28012812
extractor = self._create_component_from_model(
@@ -2833,6 +2844,7 @@ def create_record_selector(
28332844
config=config,
28342845
record_filter=record_filter,
28352846
transformations=transformations or [],
2847+
file_uploader=file_uploader,
28362848
schema_normalization=schema_normalization,
28372849
parameters=model.parameters or {},
28382850
transform_before_filtering=transform_before_filtering,
@@ -2890,6 +2902,7 @@ def create_simple_retriever(
28902902
stop_condition_on_cursor: bool = False,
28912903
client_side_incremental_sync: Optional[Dict[str, Any]] = None,
28922904
transformations: List[RecordTransformation],
2905+
file_uploader: Optional[FileUploader] = None,
28932906
incremental_sync: Optional[
28942907
Union[
28952908
IncrementingCountCursorModel, DatetimeBasedCursorModel, CustomIncrementalSyncModel
@@ -2910,6 +2923,7 @@ def create_simple_retriever(
29102923
decoder=decoder,
29112924
transformations=transformations,
29122925
client_side_incremental_sync=client_side_incremental_sync,
2926+
file_uploader=file_uploader,
29132927
)
29142928

29152929
query_properties: Optional[QueryProperties] = None
@@ -3576,6 +3590,30 @@ def create_fixed_window_call_rate_policy(
35763590
matchers=matchers,
35773591
)
35783592

3593+
def create_file_uploader(
3594+
self, model: FileUploaderModel, config: Config, **kwargs: Any
3595+
) -> FileUploader:
3596+
name = "File Uploader"
3597+
requester = self._create_component_from_model(
3598+
model=model.requester,
3599+
config=config,
3600+
name=name,
3601+
**kwargs,
3602+
)
3603+
download_target_extractor = self._create_component_from_model(
3604+
model=model.download_target_extractor,
3605+
config=config,
3606+
name=name,
3607+
**kwargs,
3608+
)
3609+
return FileUploader(
3610+
requester=requester,
3611+
download_target_extractor=download_target_extractor,
3612+
config=config,
3613+
parameters=model.parameters or {},
3614+
filename_extractor=model.filename_extractor if model.filename_extractor else None,
3615+
)
3616+
35793617
def create_moving_window_call_rate_policy(
35803618
self, model: MovingWindowCallRatePolicyModel, config: Config, **kwargs: Any
35813619
) -> MovingWindowCallRatePolicy:

0 commit comments

Comments
 (0)