Skip to content

feat(PoC): adjust file-based and file uploader component to latest protocol changes. #457

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Changes from all commits
Commits
Show all changes
47 commits
Select commit Hold shift + click to select a range
ac278b9
files-mode-api: initial changes to emit record
aldogonzalez8 Mar 25, 2025
7a02ec8
Add file_based information in discovered catalog (#446)
maxi297 Mar 26, 2025
962ddbe
file-mode-api: make uploader updates record file_reference field
aldogonzalez8 Mar 26, 2025
05bc2cb
file-mode-api: add logging to file creation
aldogonzalez8 Mar 27, 2025
3565455
file-mode-api: fix file_url to be full path
aldogonzalez8 Mar 28, 2025
188f9a5
feat(file-mode-api): move file uploader to record selector level. (#449)
aldogonzalez8 Mar 31, 2025
68480b7
feat(file-mode-api): add filename extractor component (#453)
aldogonzalez8 Mar 31, 2025
6ebfc05
file-mode-api: merge from maxi297/pox-file-upload
aldogonzalez8 Apr 2, 2025
b7bfef5
file-mode-api: initial commit for file based cdk to align with new pr…
aldogonzalez8 Apr 3, 2025
6445211
file-mode-api: merge from maxi297/pox-file-upload
aldogonzalez8 Apr 3, 2025
2cbd065
Auto-fix lint and format issues
Apr 3, 2025
69b7835
file-mode-api: fix lint
aldogonzalez8 Apr 3, 2025
682318e
file-mode-api: bump protocol to latest pre-dev
aldogonzalez8 Apr 4, 2025
8701c27
file-mode-api: add more tests
aldogonzalez8 Apr 4, 2025
9fb3b8c
file-mode-api: limit schema on cdk side with a model
aldogonzalez8 Apr 4, 2025
61ccf7c
file-mode-api: remove unnecessary imports
aldogonzalez8 Apr 4, 2025
aa1f394
file-mode-api: fix for mypy
aldogonzalez8 Apr 4, 2025
abeba93
Auto-fix lint and format issues
Apr 4, 2025
e68873a
file-mode-api: minor fix for schema not having extra fields
aldogonzalez8 Apr 4, 2025
2be82e9
file-mode-api: refactor _get_file_transfer_paths
aldogonzalez8 Apr 5, 2025
8080867
file-mode-api: remove file reference of the FileBasedStreamFacade
aldogonzalez8 Apr 7, 2025
6db0406
file-mode-api: add new file_reference in the FileBasedStreamFacade wh…
aldogonzalez8 Apr 7, 2025
199d99a
file-mode-api: remove unnecessary absolute_path field from _get_file_tr…
aldogonzalez8 Apr 7, 2025
8920154
file-mode-api: provide ability to source to provide a path extractor …
aldogonzalez8 Apr 7, 2025
cb5884e
file-mode-api: bump to latest version (non-dev) of airbyte-protocol-m…
aldogonzalez8 Apr 7, 2025
fe77b13
file-mode-api: fix lint, add ruff
aldogonzalez8 Apr 8, 2025
b4ce3fa
file-mode-api: fix for file_uploader as protocol attributes name chan…
aldogonzalez8 Apr 8, 2025
e746feb
Auto-fix lint and format issues
Apr 8, 2025
8d567ac
file-mode-api: add missing test file
aldogonzalez8 Apr 8, 2025
703c268
file-mode-api: add missing file_based key for some scenarios
aldogonzalez8 Apr 8, 2025
18a31d2
file-mode-api: fix tests for new is_file_based field in Stream
aldogonzalez8 Apr 8, 2025
682598b
Auto-fix lint and format issues
Apr 8, 2025
f8179ec
file-mode-api: remove unused fields from file-based schema
aldogonzalez8 Apr 9, 2025
9369f26
file-mode-api: make file_transfer an internal field and remove unneces…
aldogonzalez8 Apr 14, 2025
c758f3b
merge from main
aldogonzalez8 Apr 14, 2025
9dc319e
file-mode-api: poetry lock
aldogonzalez8 Apr 14, 2025
a4156fb
file-mode-api: uri make connectors pass the relative path of the file…
aldogonzalez8 Apr 14, 2025
8693830
file-mode-api: ruff format
aldogonzalez8 Apr 14, 2025
8623551
file-mode-api: add doc strings to _get_file_transfer_paths
aldogonzalez8 Apr 14, 2025
a50905b
file-mode-api: add source_uri to schema
aldogonzalez8 Apr 14, 2025
4658790
file-mode-api: fix unit tests
aldogonzalez8 Apr 14, 2025
0ac0d5c
merge from main
aldogonzalez8 Apr 15, 2025
e9474c7
merge from maxi297/poc-file-upload
aldogonzalez8 Apr 15, 2025
f144f98
file-api: add created_at to schema
aldogonzalez8 Apr 16, 2025
21b6413
Feat: New CDK-Native FAST Standard tests, replaces CAT (#349)
aaronsteers Apr 16, 2025
47abb52
merge from main
aldogonzalez8 Apr 16, 2025
3fc844c
poetry lock
aldogonzalez8 Apr 16, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions airbyte_cdk/models/airbyte_protocol.py
Original file line number Diff line number Diff line change
@@ -8,8 +8,6 @@
from airbyte_protocol_dataclasses.models import * # noqa: F403 # Allow '*'
from serpyco_rs.metadata import Alias

from airbyte_cdk.models.file_transfer_record_message import AirbyteFileTransferRecordMessage

# ruff: noqa: F405 # ignore fuzzy import issues with 'import *'


@@ -84,7 +82,7 @@ class AirbyteMessage:
spec: Optional[ConnectorSpecification] = None # type: ignore [name-defined]
connectionStatus: Optional[AirbyteConnectionStatus] = None # type: ignore [name-defined]
catalog: Optional[AirbyteCatalog] = None # type: ignore [name-defined]
record: Optional[Union[AirbyteFileTransferRecordMessage, AirbyteRecordMessage]] = None # type: ignore [name-defined]
record: Optional[AirbyteRecordMessage] = None # type: ignore [name-defined]
state: Optional[AirbyteStateMessage] = None
trace: Optional[AirbyteTraceMessage] = None # type: ignore [name-defined]
control: Optional[AirbyteControlMessage] = None # type: ignore [name-defined]
13 changes: 0 additions & 13 deletions airbyte_cdk/models/file_transfer_record_message.py

This file was deleted.

Original file line number Diff line number Diff line change
@@ -149,7 +149,6 @@ def on_record(self, record: Record) -> Iterable[AirbyteMessage]:
message = stream_data_to_airbyte_message(
stream_name=record.stream_name,
data_or_message=record.data,
is_file_transfer_message=record.is_file_transfer_message,
file_reference=record.file_reference,
)
stream = self._stream_name_to_instance[record.stream_name]
4 changes: 2 additions & 2 deletions airbyte_cdk/sources/declarative/retrievers/file_uploader.py
Original file line number Diff line number Diff line change
@@ -83,7 +83,7 @@ def upload(self, record: Record) -> None:
logger.info(f"File relative path: {str(file_relative_path)}")

record.file_reference = AirbyteRecordMessageFileReference(
file_url=str(full_path),
file_relative_path=str(file_relative_path),
staging_file_url=str(full_path),
source_file_relative_path=str(file_relative_path),
file_size_bytes=file_size_bytes,
)
54 changes: 38 additions & 16 deletions airbyte_cdk/sources/file_based/file_based_stream_reader.py
Original file line number Diff line number Diff line change
@@ -8,16 +8,18 @@
from enum import Enum
from io import IOBase
from os import makedirs, path
from typing import Any, Dict, Iterable, List, Optional, Set
from typing import Any, Callable, Iterable, List, MutableMapping, Optional, Set, Tuple

from wcmatch.glob import GLOBSTAR, globmatch

from airbyte_cdk.models import AirbyteRecordMessageFileReference
from airbyte_cdk.sources.file_based.config.abstract_file_based_spec import AbstractFileBasedSpec
from airbyte_cdk.sources.file_based.config.validate_config_transfer_modes import (
include_identities_stream,
preserve_directory_structure,
use_file_transfer,
)
from airbyte_cdk.sources.file_based.file_record_data import FileRecordData
from airbyte_cdk.sources.file_based.remote_file import RemoteFile


@@ -28,6 +30,10 @@ class FileReadMode(Enum):

class AbstractFileBasedStreamReader(ABC):
DATE_TIME_FORMAT = "%Y-%m-%dT%H:%M:%S.%fZ"
FILE_RELATIVE_PATH = "file_relative_path"
FILE_NAME = "file_name"
LOCAL_FILE_PATH = "local_file_path"
FILE_FOLDER = "file_folder"

def __init__(self) -> None:
self._config = None
@@ -148,9 +154,9 @@ def include_identities_stream(self) -> bool:
return False

@abstractmethod
def get_file(
def upload(
self, file: RemoteFile, local_directory: str, logger: logging.Logger
) -> Dict[str, Any]:
) -> Tuple[FileRecordData, AirbyteRecordMessageFileReference]:
"""
This is required for connectors that will support writing to
files. It will handle the logic to download,get,read,acquire or
@@ -162,25 +168,41 @@ def get_file(
logger (logging.Logger): Logger for logging information and errors.

Returns:
dict: A dictionary containing the following:
- "file_url" (str): The absolute path of the downloaded file.
- "bytes" (int): The file size in bytes.
- "file_relative_path" (str): The relative path of the file for local storage. Is relative to local_directory as
this a mounted volume in the pod container.

AirbyteRecordMessageFileReference: A file reference object containing:
- staging_file_url (str): The absolute path to the referenced file in the staging area.
- file_size_bytes (int): The size of the referenced file in bytes.
- source_file_relative_path (str): The relative path to the referenced file in source.
"""
...

def _get_file_transfer_paths(self, file: RemoteFile, local_directory: str) -> List[str]:
def _get_file_transfer_paths(
self, source_file_relative_path: str, staging_directory: str
) -> MutableMapping[str, Any]:
"""
This method is used to get the file transfer paths for a given source file relative path and local directory.
It returns a dictionary with the following keys:
- FILE_RELATIVE_PATH: The relative path to file in reference to the staging directory.
- LOCAL_FILE_PATH: The absolute path to the file.
- FILE_NAME: The name of the referenced file.
- FILE_FOLDER: The folder of the referenced file.
"""
preserve_directory_structure = self.preserve_directory_structure()

file_name = path.basename(source_file_relative_path)
file_folder = path.dirname(source_file_relative_path)
if preserve_directory_structure:
# Remove left slashes from source path format to make relative path for writing locally
file_relative_path = file.uri.lstrip("/")
file_relative_path = source_file_relative_path.lstrip("/")
else:
file_relative_path = path.basename(file.uri)
local_file_path = path.join(local_directory, file_relative_path)

file_relative_path = file_name
local_file_path = path.join(staging_directory, file_relative_path)
# Ensure the local directory exists
makedirs(path.dirname(local_file_path), exist_ok=True)
absolute_file_path = path.abspath(local_file_path)
return [file_relative_path, local_file_path, absolute_file_path]

file_paths = {
self.FILE_RELATIVE_PATH: file_relative_path,
self.LOCAL_FILE_PATH: local_file_path,
self.FILE_NAME: file_name,
self.FILE_FOLDER: file_folder,
}
return file_paths
23 changes: 23 additions & 0 deletions airbyte_cdk/sources/file_based/file_record_data.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
#
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
#

from datetime import datetime
from typing import Optional

from pydantic.v1 import BaseModel


class FileRecordData(BaseModel):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe future us problem: Can/Should we allow for additional properties so that sources can add whatever they want? If we do, I think it'll have impact on airbyte_cdk/sources/file_based/schema_helpers.py which seems a bit more challenging as it can't be modified based on the source as it is right now

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can/Should we allow for additional properties so that sources can add whatever they want?

For now, this is not a concern per the discussion in our last meetings with the team.

Users are getting this metadata "for free" (nobody requested it), but if we need to allow it in the future, we could provide an interface in stream reader similar to what we do to model permissions schema.

I agree this would mean maintenance, but we don't have a current use case, so my belief is that we are ok as it is.

cc @girarda @tryangul

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If allowing additional fields is difficult, I'd suggest adding an "additional_properties" field that sources can populate however they want

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Okay, that would make sense. I will write a follow-up issue in the epic for this, as I don't see it as a must for this iteration.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, issue created. I also added created_at to the schema, I feel this should be enough for this iteration.

"""
A record in a file-based stream.
"""

folder: str
filename: str
bytes: int
source_uri: str
id: Optional[str] = None
created_at: Optional[str] = None
updated_at: Optional[str] = None
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should we also have a created_at?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, makes sense, I will add it.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is now there

mime_type: Optional[str] = None
13 changes: 6 additions & 7 deletions airbyte_cdk/sources/file_based/file_types/file_transfer.py
Original file line number Diff line number Diff line change
@@ -2,11 +2,11 @@
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#
import logging
import os
from typing import Any, Dict, Iterable
from typing import Iterable, Tuple

from airbyte_cdk.sources.file_based.config.file_based_stream_config import FileBasedStreamConfig
from airbyte_cdk.models import AirbyteRecordMessageFileReference
from airbyte_cdk.sources.file_based.file_based_stream_reader import AbstractFileBasedStreamReader
from airbyte_cdk.sources.file_based.file_record_data import FileRecordData
from airbyte_cdk.sources.file_based.remote_file import RemoteFile
from airbyte_cdk.sources.utils.files_directory import get_files_directory

@@ -15,15 +15,14 @@ class FileTransfer:
def __init__(self) -> None:
self._local_directory = get_files_directory()

def get_file(
def upload(
self,
config: FileBasedStreamConfig,
file: RemoteFile,
stream_reader: AbstractFileBasedStreamReader,
logger: logging.Logger,
) -> Iterable[Dict[str, Any]]:
) -> Iterable[Tuple[FileRecordData, AirbyteRecordMessageFileReference]]:
try:
yield stream_reader.get_file(
yield stream_reader.upload(
file=file, local_directory=self._local_directory, logger=logger
)
except Exception as ex:
12 changes: 11 additions & 1 deletion airbyte_cdk/sources/file_based/schema_helpers.py
Original file line number Diff line number Diff line change
@@ -18,9 +18,19 @@
SchemaType = Mapping[str, Mapping[str, JsonSchemaSupportedType]]

schemaless_schema = {"type": "object", "properties": {"data": {"type": "object"}}}

file_transfer_schema = {
"type": "object",
"properties": {"data": {"type": "object"}, "file": {"type": "object"}},
"properties": {
"folder": {"type": "string"},
"file_name": {"type": "string"},
"source_uri": {"type": "string"},
"bytes": {"type": "integer"},
"id": {"type": ["null", "string"]},
"created_at": {"type": ["null", "string"]},
"updated_at": {"type": ["null", "string"]},
"mime_type": {"type": ["null", "string"]},
},
}


15 changes: 3 additions & 12 deletions airbyte_cdk/sources/file_based/stream/concurrent/adapters.py
Original file line number Diff line number Diff line change
@@ -4,7 +4,7 @@

import copy
import logging
from functools import cache, lru_cache
from functools import lru_cache
from typing import TYPE_CHECKING, Any, Iterable, List, Mapping, MutableMapping, Optional, Union

from typing_extensions import deprecated
@@ -258,19 +258,14 @@ def read(self) -> Iterable[Record]:
and record_data.record is not None
):
# `AirbyteMessage`s of type `Record` should also be yielded so they are enqueued
# If stream is flagged for file_transfer the record should data in file key
record_message_data = (
record_data.record.file
if self._use_file_transfer()
else record_data.record.data
)
record_message_data = record_data.record.data
if not record_message_data:
raise ExceptionWithDisplayMessage("A record without data was found")
else:
yield Record(
data=record_message_data,
stream_name=self.stream_name(),
is_file_transfer_message=self._use_file_transfer(),
file_reference=record_data.record.file_reference,
)
else:
self._message_repository.emit_message(record_data)
@@ -306,10 +301,6 @@ def __hash__(self) -> int:
def stream_name(self) -> str:
return self._stream.name

@cache
def _use_file_transfer(self) -> bool:
return hasattr(self._stream, "use_file_transfer") and self._stream.use_file_transfer

def __repr__(self) -> str:
return f"FileBasedStreamPartition({self._stream.name}, {self._slice})"

53 changes: 15 additions & 38 deletions airbyte_cdk/sources/file_based/stream/default_file_based_stream.py
Original file line number Diff line number Diff line change
@@ -11,7 +11,7 @@
from os import path
from typing import Any, Dict, Iterable, List, Mapping, MutableMapping, Optional, Set, Tuple, Union

from airbyte_cdk.models import AirbyteLogMessage, AirbyteMessage, FailureType, Level
from airbyte_cdk.models import AirbyteLogMessage, AirbyteMessage, AirbyteStream, FailureType, Level
from airbyte_cdk.models import Type as MessageType
from airbyte_cdk.sources.file_based.config.file_based_stream_config import PrimaryKeyType
from airbyte_cdk.sources.file_based.exceptions import (
@@ -56,6 +56,7 @@ class DefaultFileBasedStream(AbstractFileBasedStream, IncrementalMixin):
airbyte_columns = [ab_last_mod_col, ab_file_name_col]
use_file_transfer = False
preserve_directory_structure = True
_file_transfer = FileTransfer()

def __init__(self, **kwargs: Any):
if self.FILE_TRANSFER_KW in kwargs:
@@ -93,21 +94,6 @@ def primary_key(self) -> PrimaryKeyType:
self.config
)

def _filter_schema_invalid_properties(
self, configured_catalog_json_schema: Dict[str, Any]
) -> Dict[str, Any]:
if self.use_file_transfer:
return {
"type": "object",
"properties": {
"file_path": {"type": "string"},
"file_size": {"type": "string"},
self.ab_file_name_col: {"type": "string"},
},
}
else:
return super()._filter_schema_invalid_properties(configured_catalog_json_schema)

def _duplicated_files_names(
self, slices: List[dict[str, List[RemoteFile]]]
) -> List[dict[str, List[str]]]:
@@ -145,14 +131,6 @@ def transform_record(
record[self.ab_file_name_col] = file.uri
return record

def transform_record_for_file_transfer(
self, record: dict[str, Any], file: RemoteFile
) -> dict[str, Any]:
# timstamp() returns a float representing the number of seconds since the unix epoch
record[self.modified] = int(file.last_modified.timestamp()) * 1000
record[self.source_file_url] = file.uri
return record

def read_records_from_slice(self, stream_slice: StreamSlice) -> Iterable[AirbyteMessage]:
"""
Yield all records from all remote files in `list_files_for_this_sync`.
@@ -173,19 +151,13 @@ def read_records_from_slice(self, stream_slice: StreamSlice) -> Iterable[Airbyte

try:
if self.use_file_transfer:
self.logger.info(f"{self.name}: {file} file-based syncing")
# todo: complete here the code to not rely on local parser
file_transfer = FileTransfer()
for record in file_transfer.get_file(
self.config, file, self.stream_reader, self.logger
for file_record_data, file_reference in self._file_transfer.upload(
file=file, stream_reader=self.stream_reader, logger=self.logger
):
line_no += 1
if not self.record_passes_validation_policy(record):
n_skipped += 1
continue
record = self.transform_record_for_file_transfer(record, file)
yield stream_data_to_airbyte_message(
self.name, record, is_file_transfer_message=True
self.name,
file_record_data.dict(exclude_none=True),
file_reference=file_reference,
)
else:
for record in parser.parse_records(
@@ -259,6 +231,8 @@ def cursor_field(self) -> Union[str, List[str]]:

@cache
def get_json_schema(self) -> JsonSchema:
if self.use_file_transfer:
return file_transfer_schema
extra_fields = {
self.ab_last_mod_col: {"type": "string"},
self.ab_file_name_col: {"type": "string"},
@@ -282,9 +256,7 @@ def get_json_schema(self) -> JsonSchema:
return {"type": "object", "properties": {**extra_fields, **schema["properties"]}}

def _get_raw_json_schema(self) -> JsonSchema:
if self.use_file_transfer:
return file_transfer_schema
elif self.config.input_schema:
if self.config.input_schema:
return self.config.get_input_schema() # type: ignore
elif self.config.schemaless:
return schemaless_schema
@@ -341,6 +313,11 @@ def get_files(self) -> Iterable[RemoteFile]:
self.config.globs or [], self.config.legacy_prefix, self.logger
)

def as_airbyte_stream(self) -> AirbyteStream:
file_stream = super().as_airbyte_stream()
file_stream.is_file_based = self.use_file_transfer
return file_stream

def infer_schema(self, files: List[RemoteFile]) -> Mapping[str, Any]:
loop = asyncio.get_event_loop()
schema = loop.run_until_complete(self._infer_schema(files))
Original file line number Diff line number Diff line change
@@ -61,9 +61,7 @@ def read_records_from_slice(self, stream_slice: StreamSlice) -> Iterable[Airbyte
permissions_record = self.transform_record(
permissions_record, file, file_datetime_string
)
yield stream_data_to_airbyte_message(
self.name, permissions_record, is_file_transfer_message=False
)
yield stream_data_to_airbyte_message(self.name, permissions_record)
except Exception as e:
self.logger.error(f"Failed to retrieve permissions for file {file.uri}: {str(e)}")
yield AirbyteMessage(
Loading