Skip to content

Commit 0a5dee2

Browse files
aldogonzalez8, octavia-squidington-iii, coderabbitai[bot]
authored
feat(cdk): connector builder support for file uploader (#503)
Co-authored-by: octavia-squidington-iii <[email protected]> Co-authored-by: coderabbitai[bot] <136622811+coderabbitai[bot]@users.noreply.github.com>
1 parent 3a9d54b commit 0a5dee2

File tree

10 files changed

+236
-36
lines changed

10 files changed

+236
-36
lines changed

airbyte_cdk/sources/declarative/extractors/record_selector.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
)
1616
from airbyte_cdk.sources.declarative.interpolation import InterpolatedString
1717
from airbyte_cdk.sources.declarative.models import SchemaNormalization
18-
from airbyte_cdk.sources.declarative.retrievers.file_uploader import FileUploader
18+
from airbyte_cdk.sources.declarative.retrievers.file_uploader import DefaultFileUploader
1919
from airbyte_cdk.sources.declarative.transformations import RecordTransformation
2020
from airbyte_cdk.sources.types import Config, Record, StreamSlice, StreamState
2121
from airbyte_cdk.sources.utils.transform import TypeTransformer
@@ -43,7 +43,7 @@ class RecordSelector(HttpSelector):
4343
record_filter: Optional[RecordFilter] = None
4444
transformations: List[RecordTransformation] = field(default_factory=lambda: [])
4545
transform_before_filtering: bool = False
46-
file_uploader: Optional[FileUploader] = None
46+
file_uploader: Optional[DefaultFileUploader] = None
4747

4848
def __post_init__(self, parameters: Mapping[str, Any]) -> None:
4949
self._parameters = parameters

airbyte_cdk/sources/declarative/parsers/model_to_component_factory.py

Lines changed: 20 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -481,7 +481,13 @@
481481
SimpleRetriever,
482482
SimpleRetrieverTestReadDecorator,
483483
)
484-
from airbyte_cdk.sources.declarative.retrievers.file_uploader import FileUploader
484+
from airbyte_cdk.sources.declarative.retrievers.file_uploader import (
485+
ConnectorBuilderFileUploader,
486+
DefaultFileUploader,
487+
FileUploader,
488+
LocalFileSystemFileWriter,
489+
NoopFileWriter,
490+
)
485491
from airbyte_cdk.sources.declarative.schema import (
486492
ComplexFieldType,
487493
DefaultSchemaLoader,
@@ -2815,7 +2821,7 @@ def create_record_selector(
28152821
transformations: List[RecordTransformation] | None = None,
28162822
decoder: Decoder | None = None,
28172823
client_side_incremental_sync: Dict[str, Any] | None = None,
2818-
file_uploader: Optional[FileUploader] = None,
2824+
file_uploader: Optional[DefaultFileUploader] = None,
28192825
**kwargs: Any,
28202826
) -> RecordSelector:
28212827
extractor = self._create_component_from_model(
@@ -2919,7 +2925,7 @@ def create_simple_retriever(
29192925
stop_condition_on_cursor: bool = False,
29202926
client_side_incremental_sync: Optional[Dict[str, Any]] = None,
29212927
transformations: List[RecordTransformation],
2922-
file_uploader: Optional[FileUploader] = None,
2928+
file_uploader: Optional[DefaultFileUploader] = None,
29232929
incremental_sync: Optional[
29242930
Union[
29252931
IncrementingCountCursorModel, DatetimeBasedCursorModel, CustomIncrementalSyncModel
@@ -3606,14 +3612,24 @@ def create_file_uploader(
36063612
name=name,
36073613
**kwargs,
36083614
)
3609-
return FileUploader(
3615+
emit_connector_builder_messages = self._emit_connector_builder_messages
3616+
file_uploader = DefaultFileUploader(
36103617
requester=requester,
36113618
download_target_extractor=download_target_extractor,
36123619
config=config,
3620+
file_writer=NoopFileWriter()
3621+
if emit_connector_builder_messages
3622+
else LocalFileSystemFileWriter(),
36133623
parameters=model.parameters or {},
36143624
filename_extractor=model.filename_extractor if model.filename_extractor else None,
36153625
)
36163626

3627+
return (
3628+
ConnectorBuilderFileUploader(file_uploader)
3629+
if emit_connector_builder_messages
3630+
else file_uploader
3631+
)
3632+
36173633
def create_moving_window_call_rate_policy(
36183634
self, model: MovingWindowCallRatePolicyModel, config: Config, **kwargs: Any
36193635
) -> MovingWindowCallRatePolicy:
Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,15 @@
1+
from .connector_builder_file_uploader import ConnectorBuilderFileUploader
2+
from .default_file_uploader import DefaultFileUploader
3+
from .file_uploader import FileUploader
4+
from .file_writer import FileWriter
5+
from .local_file_system_file_writer import LocalFileSystemFileWriter
6+
from .noop_file_writer import NoopFileWriter
7+
8+
__all__ = [
9+
"DefaultFileUploader",
10+
"LocalFileSystemFileWriter",
11+
"NoopFileWriter",
12+
"ConnectorBuilderFileUploader",
13+
"FileUploader",
14+
"FileWriter",
15+
]
Lines changed: 26 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,26 @@
1+
#
2+
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
3+
#
4+
5+
from dataclasses import dataclass
6+
7+
from airbyte_cdk.sources.declarative.types import Record
8+
9+
from .default_file_uploader import DefaultFileUploader
10+
from .file_uploader import FileUploader
11+
12+
13+
@dataclass
class ConnectorBuilderFileUploader(FileUploader):
    """
    File uploader used when running for the connector builder.

    Wraps a DefaultFileUploader: the wrapped uploader performs the upload, and afterwards every
    public attribute of the resulting record.file_reference is mirrored into record.data.
    """

    # Uploader that does the actual work and sets record.file_reference.
    file_uploader: DefaultFileUploader

    def upload(self, record: Record) -> None:
        """
        Delegate the upload to the wrapped uploader, then copy the file reference into the record data.

        Attributes of record.file_reference whose name starts with an underscore are skipped.
        """
        self.file_uploader.upload(record=record)

        reference_attributes = record.file_reference.__dict__
        for attribute_name, attribute_value in reference_attributes.items():
            if attribute_name.startswith("_"):
                # Underscore-prefixed attributes are treated as private and are not copied.
                continue
            record.data[attribute_name] = attribute_value  # type: ignore

airbyte_cdk/sources/declarative/retrievers/file_uploader.py renamed to airbyte_cdk/sources/declarative/retrievers/file_uploader/default_file_uploader.py

Lines changed: 31 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -22,18 +22,27 @@
2222
from airbyte_cdk.sources.types import Config
2323
from airbyte_cdk.sources.utils.files_directory import get_files_directory
2424

25+
from .file_uploader import FileUploader
26+
from .file_writer import FileWriter
27+
2528
logger = logging.getLogger("airbyte")
2629

2730

2831
@dataclass
29-
class FileUploader:
32+
class DefaultFileUploader(FileUploader):
33+
"""
34+
File uploader class
35+
Handles the upload logic: fetching the download target, making the request via its requester, determining the file path, and calling self.file_writer.write()
36+
Different types of file_writer:BaseFileWriter can be injected to handle different file writing strategies.
37+
"""
38+
3039
requester: Requester
3140
download_target_extractor: RecordExtractor
3241
config: Config
42+
file_writer: FileWriter
3343
parameters: InitVar[Mapping[str, Any]]
3444

3545
filename_extractor: Optional[Union[InterpolatedString, str]] = None
36-
content_extractor: Optional[RecordExtractor] = None
3746

3847
def __post_init__(self, parameters: Mapping[str, Any]) -> None:
3948
if self.filename_extractor:
@@ -61,33 +70,28 @@ def upload(self, record: Record) -> None:
6170
),
6271
)
6372

64-
if self.content_extractor:
65-
raise NotImplementedError("TODO")
66-
else:
67-
files_directory = Path(get_files_directory())
73+
files_directory = Path(get_files_directory())
6874

69-
file_name = (
70-
self.filename_extractor.eval(self.config, record=record)
71-
if self.filename_extractor
72-
else str(uuid.uuid4())
73-
)
74-
file_name = file_name.lstrip("/")
75-
file_relative_path = Path(record.stream_name) / Path(file_name)
75+
file_name = (
76+
self.filename_extractor.eval(self.config, record=record)
77+
if self.filename_extractor
78+
else str(uuid.uuid4())
79+
)
80+
file_name = file_name.lstrip("/")
81+
file_relative_path = Path(record.stream_name) / Path(file_name)
7682

77-
full_path = files_directory / file_relative_path
78-
full_path.parent.mkdir(parents=True, exist_ok=True)
83+
full_path = files_directory / file_relative_path
84+
full_path.parent.mkdir(parents=True, exist_ok=True)
7985

80-
with open(str(full_path), "wb") as f:
81-
f.write(response.content)
82-
file_size_bytes = full_path.stat().st_size
86+
file_size_bytes = self.file_writer.write(full_path, content=response.content)
8387

84-
logger.info("File uploaded successfully")
85-
logger.info(f"File url: {str(full_path)}")
86-
logger.info(f"File size: {file_size_bytes / 1024} KB")
87-
logger.info(f"File relative path: {str(file_relative_path)}")
88+
logger.info("File uploaded successfully")
89+
logger.info(f"File url: {str(full_path)}")
90+
logger.info(f"File size: {file_size_bytes / 1024} KB")
91+
logger.info(f"File relative path: {str(file_relative_path)}")
8892

89-
record.file_reference = AirbyteRecordMessageFileReference(
90-
staging_file_url=str(full_path),
91-
source_file_relative_path=str(file_relative_path),
92-
file_size_bytes=file_size_bytes,
93-
)
93+
record.file_reference = AirbyteRecordMessageFileReference(
94+
staging_file_url=str(full_path),
95+
source_file_relative_path=str(file_relative_path),
96+
file_size_bytes=file_size_bytes,
97+
)
Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
#
2+
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
3+
#
4+
5+
from abc import ABC, abstractmethod
6+
from dataclasses import dataclass
7+
8+
from airbyte_cdk.sources.declarative.types import Record
9+
10+
11+
@dataclass
class FileUploader(ABC):
    """
    Abstract interface shared by all file uploaders.

    Concrete uploaders implement `upload`, which receives the record to process and returns nothing.
    """

    @abstractmethod
    def upload(self, record: Record) -> None:
        """
        Upload the file associated with `record`.

        :param record: the record whose file should be uploaded
        """
        ...
Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,19 @@
1+
#
2+
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
3+
#
4+
5+
from abc import ABC, abstractmethod
6+
from pathlib import Path
7+
8+
9+
class FileWriter(ABC):
    """
    Abstract interface for file writers.

    Implementations receive a destination path and the raw bytes to write, and return an integer
    (the local file system implementation returns the written file's size in bytes).
    """

    @abstractmethod
    def write(self, file_path: Path, content: bytes) -> int:
        """
        Write `content` to `file_path`.

        :param file_path: destination of the file
        :param content: raw bytes of the file
        :return: an integer describing the written file, as defined by the implementation
        """
        ...
Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,18 @@
1+
#
2+
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
3+
#
4+
5+
from pathlib import Path
6+
7+
from .file_writer import FileWriter
8+
9+
10+
class LocalFileSystemFileWriter(FileWriter):
    """
    File writer that stores the content on the local file system.
    """

    def write(self, file_path: Path, content: bytes) -> int:
        """
        Write `content` to `file_path` in binary mode, replacing any existing file.

        :param file_path: destination of the file; its parent directory is not created here
        :param content: raw bytes of the file
        :return: size in bytes of the file on disk, read back after the write
        """
        with file_path.open("wb") as destination:
            destination.write(content)

        size_on_disk = file_path.stat().st_size
        return size_on_disk
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
#
2+
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
3+
#
4+
5+
from pathlib import Path
6+
7+
from .file_writer import FileWriter
8+
9+
10+
class NoopFileWriter(FileWriter):
    """
    File writer that writes nothing.

    Both arguments of `write` are ignored and a sentinel size is reported instead.
    """

    # Sentinel returned in place of a real file size, since no file is ever written.
    NOOP_FILE_SIZE = -1

    def write(self, file_path: Path, content: bytes) -> int:
        """
        Skip the write entirely.

        :param file_path: ignored
        :param content: ignored
        :return: NOOP_FILE_SIZE
        """
        return self.NOOP_FILE_SIZE

unit_tests/sources/declarative/file/test_file_stream.py

Lines changed: 66 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -3,9 +3,13 @@
33
from pathlib import Path
44
from typing import Any, Dict, List, Optional
55
from unittest import TestCase
6-
from unittest.mock import Mock
6+
from unittest.mock import Mock, patch
77

88
from airbyte_cdk.models import AirbyteStateMessage, ConfiguredAirbyteCatalog, Status
9+
from airbyte_cdk.sources.declarative.parsers.model_to_component_factory import (
10+
ModelToComponentFactory as OriginalModelToComponentFactory,
11+
)
12+
from airbyte_cdk.sources.declarative.retrievers.file_uploader.noop_file_writer import NoopFileWriter
913
from airbyte_cdk.sources.declarative.yaml_declarative_source import YamlDeclarativeSource
1014
from airbyte_cdk.test.catalog_builder import CatalogBuilder, ConfiguredAirbyteStreamBuilder
1115
from airbyte_cdk.test.entrypoint_wrapper import EntrypointOutput
@@ -55,7 +59,11 @@ def read(
5559
config = config_builder.build()
5660
state = state_builder.build() if state_builder else StateBuilder().build()
5761
return entrypoint_read(
58-
_source(catalog, config, state, yaml_file), config, catalog, state, expecting_exception
62+
_source(catalog, config, state, yaml_file),
63+
config,
64+
catalog,
65+
state,
66+
expecting_exception,
5967
)
6068

6169

@@ -177,7 +185,7 @@ def test_get_article_attachments_with_filename_extractor(self) -> None:
177185
yaml_file="test_file_stream_with_filename_extractor.yaml",
178186
)
179187

180-
assert output.records
188+
assert len(output.records) == 1
181189
file_reference = output.records[0].record.file_reference
182190
assert file_reference
183191
assert (
@@ -190,6 +198,61 @@ def test_get_article_attachments_with_filename_extractor(self) -> None:
190198
)
191199
assert file_reference.file_size_bytes
192200

201+
def test_get_article_attachments_messages_for_connector_builder(self) -> None:
    """
    Read article attachments with connector builder messages enabled.

    Expects a single record whose file reference carries the noop file size, and whose data
    contains a copy of the file reference fields.
    """

    # Factory subclass that forces emit_connector_builder_messages=True
    class MockModelToComponentFactory(OriginalModelToComponentFactory):
        def __init__(self, *args, **kwargs):
            kwargs["emit_connector_builder_messages"] = True
            super().__init__(*args, **kwargs)

    with HttpMocker() as http_mocker:
        articles_response = HttpResponse(
            json.dumps(find_template("file_api/articles", __file__)), 200
        )
        http_mocker.get(HttpRequest(url=STREAM_URL), articles_response)

        attachments_response = HttpResponse(
            json.dumps(find_template("file_api/article_attachments", __file__)), 200
        )
        http_mocker.get(HttpRequest(url=STREAM_ATTACHMENTS_URL), attachments_response)

        attachment_content_response = HttpResponse(
            find_binary_response("file_api/article_attachment_content.png", __file__), 200
        )
        http_mocker.get(
            HttpRequest(url=STREAM_ATTACHMENT_CONTENT_URL), attachment_content_response
        )

        # Patch the factory class where ConcurrentDeclarativeSource (parent of YamlDeclarativeSource) imports it
        factory_patch = patch(
            "airbyte_cdk.sources.declarative.concurrent_declarative_source.ModelToComponentFactory",
            new=MockModelToComponentFactory,
        )
        with factory_patch:
            catalog = (
                CatalogBuilder()
                .with_stream(ConfiguredAirbyteStreamBuilder().with_name("article_attachments"))
                .build()
            )
            output = read(
                self._config(),
                catalog,
                yaml_file="test_file_stream_with_filename_extractor.yaml",
            )

            assert len(output.records) == 1
            file_reference = output.records[0].record.file_reference
            assert file_reference
            assert file_reference.staging_file_url
            assert file_reference.source_file_relative_path
            # Nothing was written to disk, so the size is the noop sentinel
            assert file_reference.file_size_bytes == NoopFileWriter.NOOP_FILE_SIZE

            # The file reference fields must also be present in the record data
            record_data = output.records[0].record.data
            for reference_field in (
                "staging_file_url",
                "source_file_relative_path",
                "file_size_bytes",
            ):
                assert record_data[reference_field] == getattr(file_reference, reference_field)
193256
def test_discover_article_attachments(self) -> None:
194257
output = discover(self._config())
195258

0 commit comments

Comments
 (0)