Skip to content

Commit b5b8197

Browse files
aldogonzalez8maxi297octavia-squidington-iiiaaronsteersdevin-ai-integration[bot]
authored
feat(PoC): adjust file-based and file uploader component to latest protocol changes. (#457)
Co-authored-by: Maxime Carbonneau-Leclerc <[email protected]> Co-authored-by: octavia-squidington-iii <[email protected]> Co-authored-by: Aaron ("AJ") Steers <[email protected]> Co-authored-by: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com>
1 parent 4c29c91 commit b5b8197

File tree

59 files changed

+1397
-276
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

59 files changed

+1397
-276
lines changed

airbyte_cdk/models/airbyte_protocol.py

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,8 +8,6 @@
88
from airbyte_protocol_dataclasses.models import * # noqa: F403 # Allow '*'
99
from serpyco_rs.metadata import Alias
1010

11-
from airbyte_cdk.models.file_transfer_record_message import AirbyteFileTransferRecordMessage
12-
1311
# ruff: noqa: F405 # ignore fuzzy import issues with 'import *'
1412

1513

@@ -84,7 +82,7 @@ class AirbyteMessage:
8482
spec: Optional[ConnectorSpecification] = None # type: ignore [name-defined]
8583
connectionStatus: Optional[AirbyteConnectionStatus] = None # type: ignore [name-defined]
8684
catalog: Optional[AirbyteCatalog] = None # type: ignore [name-defined]
87-
record: Optional[Union[AirbyteFileTransferRecordMessage, AirbyteRecordMessage]] = None # type: ignore [name-defined]
85+
record: Optional[AirbyteRecordMessage] = None # type: ignore [name-defined]
8886
state: Optional[AirbyteStateMessage] = None
8987
trace: Optional[AirbyteTraceMessage] = None # type: ignore [name-defined]
9088
control: Optional[AirbyteControlMessage] = None # type: ignore [name-defined]

airbyte_cdk/models/file_transfer_record_message.py

Lines changed: 0 additions & 13 deletions
This file was deleted.

airbyte_cdk/sources/concurrent_source/concurrent_read_processor.py

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -149,7 +149,6 @@ def on_record(self, record: Record) -> Iterable[AirbyteMessage]:
149149
message = stream_data_to_airbyte_message(
150150
stream_name=record.stream_name,
151151
data_or_message=record.data,
152-
is_file_transfer_message=record.is_file_transfer_message,
153152
file_reference=record.file_reference,
154153
)
155154
stream = self._stream_name_to_instance[record.stream_name]

airbyte_cdk/sources/declarative/retrievers/file_uploader.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,7 @@ def upload(self, record: Record) -> None:
8383
logger.info(f"File relative path: {str(file_relative_path)}")
8484

8585
record.file_reference = AirbyteRecordMessageFileReference(
86-
file_url=str(full_path),
87-
file_relative_path=str(file_relative_path),
86+
staging_file_url=str(full_path),
87+
source_file_relative_path=str(file_relative_path),
8888
file_size_bytes=file_size_bytes,
8989
)

airbyte_cdk/sources/file_based/file_based_stream_reader.py

Lines changed: 38 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -8,16 +8,18 @@
88
from enum import Enum
99
from io import IOBase
1010
from os import makedirs, path
11-
from typing import Any, Dict, Iterable, List, Optional, Set
11+
from typing import Any, Callable, Iterable, List, MutableMapping, Optional, Set, Tuple
1212

1313
from wcmatch.glob import GLOBSTAR, globmatch
1414

15+
from airbyte_cdk.models import AirbyteRecordMessageFileReference
1516
from airbyte_cdk.sources.file_based.config.abstract_file_based_spec import AbstractFileBasedSpec
1617
from airbyte_cdk.sources.file_based.config.validate_config_transfer_modes import (
1718
include_identities_stream,
1819
preserve_directory_structure,
1920
use_file_transfer,
2021
)
22+
from airbyte_cdk.sources.file_based.file_record_data import FileRecordData
2123
from airbyte_cdk.sources.file_based.remote_file import RemoteFile
2224

2325

@@ -28,6 +30,10 @@ class FileReadMode(Enum):
2830

2931
class AbstractFileBasedStreamReader(ABC):
3032
DATE_TIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"
33+
FILE_RELATIVE_PATH = "file_relative_path"
34+
FILE_NAME = "file_name"
35+
LOCAL_FILE_PATH = "local_file_path"
36+
FILE_FOLDER = "file_folder"
3137

3238
def __init__(self) -> None:
3339
self._config = None
@@ -148,9 +154,9 @@ def include_identities_stream(self) -> bool:
148154
return False
149155

150156
@abstractmethod
151-
def get_file(
157+
def upload(
152158
self, file: RemoteFile, local_directory: str, logger: logging.Logger
153-
) -> Dict[str, Any]:
159+
) -> Tuple[FileRecordData, AirbyteRecordMessageFileReference]:
154160
"""
155161
This is required for connectors that will support writing to
156162
files. It will handle the logic to download,get,read,acquire or
@@ -162,25 +168,41 @@ def get_file(
162168
logger (logging.Logger): Logger for logging information and errors.
163169
164170
Returns:
165-
dict: A dictionary containing the following:
166-
- "file_url" (str): The absolute path of the downloaded file.
167-
- "bytes" (int): The file size in bytes.
168-
- "file_relative_path" (str): The relative path of the file for local storage. Is relative to local_directory as
169-
this a mounted volume in the pod container.
170-
171+
AirbyteRecordMessageFileReference: A file reference object containing:
172+
- staging_file_url (str): The absolute path to the referenced file in the staging area.
173+
- file_size_bytes (int): The size of the referenced file in bytes.
174+
- source_file_relative_path (str): The relative path to the referenced file in source.
171175
"""
172176
...
173177

174-
def _get_file_transfer_paths(self, file: RemoteFile, local_directory: str) -> List[str]:
178+
def _get_file_transfer_paths(
179+
self, source_file_relative_path: str, staging_directory: str
180+
) -> MutableMapping[str, Any]:
181+
"""
182+
This method is used to get the file transfer paths for a given source file relative path and local directory.
183+
It returns a dictionary with the following keys:
184+
- FILE_RELATIVE_PATH: The relative path to file in reference to the staging directory.
185+
- LOCAL_FILE_PATH: The absolute path to the file.
186+
- FILE_NAME: The name of the referenced file.
187+
- FILE_FOLDER: The folder of the referenced file.
188+
"""
175189
preserve_directory_structure = self.preserve_directory_structure()
190+
191+
file_name = path.basename(source_file_relative_path)
192+
file_folder = path.dirname(source_file_relative_path)
176193
if preserve_directory_structure:
177194
# Remove left slashes from source path format to make relative path for writing locally
178-
file_relative_path = file.uri.lstrip("/")
195+
file_relative_path = source_file_relative_path.lstrip("/")
179196
else:
180-
file_relative_path = path.basename(file.uri)
181-
local_file_path = path.join(local_directory, file_relative_path)
182-
197+
file_relative_path = file_name
198+
local_file_path = path.join(staging_directory, file_relative_path)
183199
# Ensure the local directory exists
184200
makedirs(path.dirname(local_file_path), exist_ok=True)
185-
absolute_file_path = path.abspath(local_file_path)
186-
return [file_relative_path, local_file_path, absolute_file_path]
201+
202+
file_paths = {
203+
self.FILE_RELATIVE_PATH: file_relative_path,
204+
self.LOCAL_FILE_PATH: local_file_path,
205+
self.FILE_NAME: file_name,
206+
self.FILE_FOLDER: file_folder,
207+
}
208+
return file_paths
Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
#
2+
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
3+
#
4+
5+
from datetime import datetime
6+
from typing import Optional
7+
8+
from pydantic.v1 import BaseModel
9+
10+
11+
class FileRecordData(BaseModel):
12+
"""
13+
A record in a file-based stream.
14+
"""
15+
16+
folder: str
17+
filename: str
18+
bytes: int
19+
source_uri: str
20+
id: Optional[str] = None
21+
created_at: Optional[str] = None
22+
updated_at: Optional[str] = None
23+
mime_type: Optional[str] = None

airbyte_cdk/sources/file_based/file_types/file_transfer.py

Lines changed: 6 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,11 +2,11 @@
22
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
33
#
44
import logging
5-
import os
6-
from typing import Any, Dict, Iterable
5+
from typing import Iterable, Tuple
76

8-
from airbyte_cdk.sources.file_based.config.file_based_stream_config import FileBasedStreamConfig
7+
from airbyte_cdk.models import AirbyteRecordMessageFileReference
98
from airbyte_cdk.sources.file_based.file_based_stream_reader import AbstractFileBasedStreamReader
9+
from airbyte_cdk.sources.file_based.file_record_data import FileRecordData
1010
from airbyte_cdk.sources.file_based.remote_file import RemoteFile
1111
from airbyte_cdk.sources.utils.files_directory import get_files_directory
1212

@@ -15,15 +15,14 @@ class FileTransfer:
1515
def __init__(self) -> None:
1616
self._local_directory = get_files_directory()
1717

18-
def get_file(
18+
def upload(
1919
self,
20-
config: FileBasedStreamConfig,
2120
file: RemoteFile,
2221
stream_reader: AbstractFileBasedStreamReader,
2322
logger: logging.Logger,
24-
) -> Iterable[Dict[str, Any]]:
23+
) -> Iterable[Tuple[FileRecordData, AirbyteRecordMessageFileReference]]:
2524
try:
26-
yield stream_reader.get_file(
25+
yield stream_reader.upload(
2726
file=file, local_directory=self._local_directory, logger=logger
2827
)
2928
except Exception as ex:

airbyte_cdk/sources/file_based/schema_helpers.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,9 +18,19 @@
1818
SchemaType = Mapping[str, Mapping[str, JsonSchemaSupportedType]]
1919

2020
schemaless_schema = {"type": "object", "properties": {"data": {"type": "object"}}}
21+
2122
file_transfer_schema = {
2223
"type": "object",
23-
"properties": {"data": {"type": "object"}, "file": {"type": "object"}},
24+
"properties": {
25+
"folder": {"type": "string"},
26+
"file_name": {"type": "string"},
27+
"source_uri": {"type": "string"},
28+
"bytes": {"type": "integer"},
29+
"id": {"type": ["null", "string"]},
30+
"created_at": {"type": ["null", "string"]},
31+
"updated_at": {"type": ["null", "string"]},
32+
"mime_type": {"type": ["null", "string"]},
33+
},
2434
}
2535

2636

airbyte_cdk/sources/file_based/stream/concurrent/adapters.py

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44

55
import copy
66
import logging
7-
from functools import cache, lru_cache
7+
from functools import lru_cache
88
from typing import TYPE_CHECKING, Any, Iterable, List, Mapping, MutableMapping, Optional, Union
99

1010
from typing_extensions import deprecated
@@ -258,19 +258,14 @@ def read(self) -> Iterable[Record]:
258258
and record_data.record is not None
259259
):
260260
# `AirbyteMessage`s of type `Record` should also be yielded so they are enqueued
261-
# If stream is flagged for file_transfer the record should data in file key
262-
record_message_data = (
263-
record_data.record.file
264-
if self._use_file_transfer()
265-
else record_data.record.data
266-
)
261+
record_message_data = record_data.record.data
267262
if not record_message_data:
268263
raise ExceptionWithDisplayMessage("A record without data was found")
269264
else:
270265
yield Record(
271266
data=record_message_data,
272267
stream_name=self.stream_name(),
273-
is_file_transfer_message=self._use_file_transfer(),
268+
file_reference=record_data.record.file_reference,
274269
)
275270
else:
276271
self._message_repository.emit_message(record_data)
@@ -306,10 +301,6 @@ def __hash__(self) -> int:
306301
def stream_name(self) -> str:
307302
return self._stream.name
308303

309-
@cache
310-
def _use_file_transfer(self) -> bool:
311-
return hasattr(self._stream, "use_file_transfer") and self._stream.use_file_transfer
312-
313304
def __repr__(self) -> str:
314305
return f"FileBasedStreamPartition({self._stream.name}, {self._slice})"
315306

airbyte_cdk/sources/file_based/stream/default_file_based_stream.py

Lines changed: 15 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
from os import path
1212
from typing import Any, Dict, Iterable, List, Mapping, MutableMapping, Optional, Set, Tuple, Union
1313

14-
from airbyte_cdk.models import AirbyteLogMessage, AirbyteMessage, FailureType, Level
14+
from airbyte_cdk.models import AirbyteLogMessage, AirbyteMessage, AirbyteStream, FailureType, Level
1515
from airbyte_cdk.models import Type as MessageType
1616
from airbyte_cdk.sources.file_based.config.file_based_stream_config import PrimaryKeyType
1717
from airbyte_cdk.sources.file_based.exceptions import (
@@ -56,6 +56,7 @@ class DefaultFileBasedStream(AbstractFileBasedStream, IncrementalMixin):
5656
airbyte_columns = [ab_last_mod_col, ab_file_name_col]
5757
use_file_transfer = False
5858
preserve_directory_structure = True
59+
_file_transfer = FileTransfer()
5960

6061
def __init__(self, **kwargs: Any):
6162
if self.FILE_TRANSFER_KW in kwargs:
@@ -93,21 +94,6 @@ def primary_key(self) -> PrimaryKeyType:
9394
self.config
9495
)
9596

96-
def _filter_schema_invalid_properties(
97-
self, configured_catalog_json_schema: Dict[str, Any]
98-
) -> Dict[str, Any]:
99-
if self.use_file_transfer:
100-
return {
101-
"type": "object",
102-
"properties": {
103-
"file_path": {"type": "string"},
104-
"file_size": {"type": "string"},
105-
self.ab_file_name_col: {"type": "string"},
106-
},
107-
}
108-
else:
109-
return super()._filter_schema_invalid_properties(configured_catalog_json_schema)
110-
11197
def _duplicated_files_names(
11298
self, slices: List[dict[str, List[RemoteFile]]]
11399
) -> List[dict[str, List[str]]]:
@@ -145,14 +131,6 @@ def transform_record(
145131
record[self.ab_file_name_col] = file.uri
146132
return record
147133

148-
def transform_record_for_file_transfer(
149-
self, record: dict[str, Any], file: RemoteFile
150-
) -> dict[str, Any]:
151-
# timstamp() returns a float representing the number of seconds since the unix epoch
152-
record[self.modified] = int(file.last_modified.timestamp()) * 1000
153-
record[self.source_file_url] = file.uri
154-
return record
155-
156134
def read_records_from_slice(self, stream_slice: StreamSlice) -> Iterable[AirbyteMessage]:
157135
"""
158136
Yield all records from all remote files in `list_files_for_this_sync`.
@@ -173,19 +151,13 @@ def read_records_from_slice(self, stream_slice: StreamSlice) -> Iterable[Airbyte
173151

174152
try:
175153
if self.use_file_transfer:
176-
self.logger.info(f"{self.name}: {file} file-based syncing")
177-
# todo: complete here the code to not rely on local parser
178-
file_transfer = FileTransfer()
179-
for record in file_transfer.get_file(
180-
self.config, file, self.stream_reader, self.logger
154+
for file_record_data, file_reference in self._file_transfer.upload(
155+
file=file, stream_reader=self.stream_reader, logger=self.logger
181156
):
182-
line_no += 1
183-
if not self.record_passes_validation_policy(record):
184-
n_skipped += 1
185-
continue
186-
record = self.transform_record_for_file_transfer(record, file)
187157
yield stream_data_to_airbyte_message(
188-
self.name, record, is_file_transfer_message=True
158+
self.name,
159+
file_record_data.dict(exclude_none=True),
160+
file_reference=file_reference,
189161
)
190162
else:
191163
for record in parser.parse_records(
@@ -259,6 +231,8 @@ def cursor_field(self) -> Union[str, List[str]]:
259231

260232
@cache
261233
def get_json_schema(self) -> JsonSchema:
234+
if self.use_file_transfer:
235+
return file_transfer_schema
262236
extra_fields = {
263237
self.ab_last_mod_col: {"type": "string"},
264238
self.ab_file_name_col: {"type": "string"},
@@ -282,9 +256,7 @@ def get_json_schema(self) -> JsonSchema:
282256
return {"type": "object", "properties": {**extra_fields, **schema["properties"]}}
283257

284258
def _get_raw_json_schema(self) -> JsonSchema:
285-
if self.use_file_transfer:
286-
return file_transfer_schema
287-
elif self.config.input_schema:
259+
if self.config.input_schema:
288260
return self.config.get_input_schema() # type: ignore
289261
elif self.config.schemaless:
290262
return schemaless_schema
@@ -341,6 +313,11 @@ def get_files(self) -> Iterable[RemoteFile]:
341313
self.config.globs or [], self.config.legacy_prefix, self.logger
342314
)
343315

316+
def as_airbyte_stream(self) -> AirbyteStream:
317+
file_stream = super().as_airbyte_stream()
318+
file_stream.is_file_based = self.use_file_transfer
319+
return file_stream
320+
344321
def infer_schema(self, files: List[RemoteFile]) -> Mapping[str, Any]:
345322
loop = asyncio.get_event_loop()
346323
schema = loop.run_until_complete(self._infer_schema(files))

0 commit comments

Comments
 (0)