
feat: destination discover PoC and adding sync modes #527

Open · wants to merge 22 commits into base: main
128 changes: 78 additions & 50 deletions airbyte_cdk/destinations/destination.py
@@ -10,24 +10,93 @@
from typing import Any, Iterable, List, Mapping

import orjson
from airbyte_protocol_dataclasses.models import (
AirbyteMessage,
ConfiguredAirbyteCatalog,
DestinationCatalog,
Type,
)

from airbyte_cdk.connector import Connector
from airbyte_cdk.exception_handler import init_uncaught_exception_handler
from airbyte_cdk.models import (
AirbyteMessage,
AirbyteMessageSerializer,
ConfiguredAirbyteCatalog,
ConfiguredAirbyteCatalogSerializer,
Type,
)
from airbyte_cdk.sources.utils.schema_helpers import check_config_against_spec_or_exit
from airbyte_cdk.utils.traced_exception import AirbyteTracedException

logger = logging.getLogger("airbyte")


def parse_args(args: List[str]) -> argparse.Namespace:
"""
:param args: commandline arguments
:return:
"""

parent_parser = argparse.ArgumentParser(add_help=False)
parent_parser.add_argument(
"--debug", action="store_true", help="enables detailed debug logs related to the sync"
)
main_parser = argparse.ArgumentParser()
subparsers = main_parser.add_subparsers(title="commands", dest="command")

# spec
subparsers.add_parser(
"spec", help="outputs the json configuration specification", parents=[parent_parser]
)

# check
check_parser = subparsers.add_parser(
"check", help="checks the config can be used to connect", parents=[parent_parser]
)
required_check_parser = check_parser.add_argument_group("required named arguments")
required_check_parser.add_argument(
"--config", type=str, required=True, help="path to the json configuration file"
)

# discover
discover_parser = subparsers.add_parser(
"discover",
help="discover the objects available in the destination",
parents=[parent_parser],
)
required_discover_parser = discover_parser.add_argument_group("required named arguments")
required_discover_parser.add_argument(
"--config", type=str, required=True, help="path to the json configuration file"
)

# write
write_parser = subparsers.add_parser(
"write", help="Writes data to the destination", parents=[parent_parser]
)
write_required = write_parser.add_argument_group("required named arguments")
write_required.add_argument(
"--config", type=str, required=True, help="path to the JSON configuration file"
)
write_required.add_argument(
"--catalog", type=str, required=True, help="path to the configured catalog JSON file"
)

parsed_args = main_parser.parse_args(args)
cmd = parsed_args.command
if not cmd:
raise Exception("No command entered. ")
elif cmd not in ["spec", "check", "discover", "write"]:
# This is technically dead code since parse_args() would fail if this was the case
# But it's non-obvious enough to warrant placing it here anyways
raise Exception(f"Unknown command entered: {cmd}")

return parsed_args


class Destination(Connector, ABC):
VALID_CMDS = {"spec", "check", "write"}
VALID_CMDS = {"spec", "check", "discover", "write"}

def discover(self) -> DestinationCatalog:
"""Implement to define what objects are available in the destination"""
raise NotImplementedError("Discover method is not implemented")

@abstractmethod
def write(
@@ -68,52 +137,9 @@ def _run_write(
)
logger.info("Writing complete.")

def parse_args(self, args: List[str]) -> argparse.Namespace:
"""
:param args: commandline arguments
:return:
"""

parent_parser = argparse.ArgumentParser(add_help=False)
main_parser = argparse.ArgumentParser()
subparsers = main_parser.add_subparsers(title="commands", dest="command")

# spec
subparsers.add_parser(
"spec", help="outputs the json configuration specification", parents=[parent_parser]
)

# check
check_parser = subparsers.add_parser(
"check", help="checks the config can be used to connect", parents=[parent_parser]
)
required_check_parser = check_parser.add_argument_group("required named arguments")
required_check_parser.add_argument(
"--config", type=str, required=True, help="path to the json configuration file"
)

# write
write_parser = subparsers.add_parser(
"write", help="Writes data to the destination", parents=[parent_parser]
)
write_required = write_parser.add_argument_group("required named arguments")
write_required.add_argument(
"--config", type=str, required=True, help="path to the JSON configuration file"
)
write_required.add_argument(
"--catalog", type=str, required=True, help="path to the configured catalog JSON file"
)

parsed_args = main_parser.parse_args(args)
cmd = parsed_args.command
if not cmd:
raise Exception("No command entered. ")
elif cmd not in ["spec", "check", "write"]:
# This is technically dead code since parse_args() would fail if this was the case
# But it's non-obvious enough to warrant placing it here anyways
raise Exception(f"Unknown command entered: {cmd}")

return parsed_args
@staticmethod
def parse_args(args: List[str]) -> argparse.Namespace:
return parse_args(args)
Comment on lines +140 to +142
Contributor:
In terms of patterns, I think my preference here, and proposed best practice (eventually), is that we make this a class method on the base connector class. I would call it "launch", and then every connector class could invoke itself. In theory, all sources and destinations could share the same implementation across connectors of the same type, and unless the connector needs something special (a code smell anyway), the "cls" input arg should be all you need in order to instantiate a connector using CLI args.

I don't think we need to tackle all of that scope in this PR, but your implementation is already very close to what I think is the ideal state, so I'll mention it here as a non-blocking proposal.

Contributor Author:

That makes sense to me. I started diverging a bit from the current solution because run_cmd is not static, so we need to instantiate the object before calling it, but the way we instantiate the destination depends on which protocol method is called (see this piece of code for an example).

So do we agree that the launch method would be static? If so, I think we are moving in the right direction and I can start adding this in later changes. If not, I'll need more information about what you had in mind.

Contributor (@aaronsteers, May 6, 2025):

@maxi297 - The difference between a static method and a class method for this use case is just that a class method knows what class it is (static methods don't get a cls input), and that class method implementations can be inherited by subclasses. So, yes, agreed the method is static in the sense that it doesn't need the object, but in order not to have to implement it on each class, I think making it a class method is slightly cleaner in the long run. Sorry if I'm not good at explaining this, and please don't block on my comment - either way, it's a step in the right direction, I think, if the class knows how to instantiate itself, without needing an external class or function in order to be instantiated. 👍
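
For illustration, that inheritance point could be sketched like this (hypothetical BaseConnector and DestinationFoo names, not CDK classes):

    from typing import List


    class BaseConnector:
        @classmethod
        def launch(cls, args: List[str]) -> None:
            # cls resolves to whichever subclass was invoked, so one shared
            # implementation can instantiate any connector type from CLI args.
            connector = cls()
            print(f"launching {type(connector).__name__} with {args}")


    class DestinationFoo(BaseConnector):
        pass


    # DestinationFoo.launch(["spec"]) instantiates DestinationFoo, not
    # BaseConnector, without DestinationFoo implementing anything itself.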

Contributor:

@maxi297 - Here's the early implementation I wrote for S3 a while back...

    @classmethod
    def launch(cls, args: list[str] | None = None) -> None:
        """Launch the source using the provided CLI args.

        If no args are provided, the launch args will be inferred automatically.

        In the future, we should consider moving this method to the Connector base class,
        so that all sources and destinations can launch themselves and so none of this
        code needs to live in the connector itself.
        """
        args = args or sys.argv[1:]
        catalog_path = AirbyteEntrypoint.extract_catalog(args)
        # TODO: Delete if not needed:
        # config_path = AirbyteEntrypoint.extract_config(args)
        # state_path = AirbyteEntrypoint.extract_state(args)

        source = cls.create(
            configured_catalog_path=Path(catalog_path) if catalog_path else None,
        )
        # The following function will wrap the execution in proper error handling.
        # Failures prior to here may not emit proper Airbyte TRACE or CONNECTION_STATUS messages.
        launch(
            source=source,
            args=args,
        )

This wasn't a very good implementation, but you can see the generic cls.create() ref.

Where cls.create() was defined as:

    @classmethod
    def create(
        cls,
        *,
        configured_catalog_path: Path | str | None = None,
    ) -> SourceS3:
        """Create a new instance of the source.
        ...
        # A bunch of hacky stuff here.
        ...
        return cls(
            # These are the defaults for the source. No need for a caller to change them:
            stream_reader=SourceS3StreamReader(),
            spec_class=Config,
            cursor_cls=Cursor,
            # This is needed early. (We also will provide it again later.)
            catalog=configured_catalog,
            # These will be provided later, after we have wrapped proper error handling.
            config=None,
            state=None,
        )

Contributor:

Another problem with our current design is that we ask for config, catalog, etc. in the constructor, but then pass them again during invocation of 'check', 'discover', etc. I think it would be better to not need these in the constructor at all, in which case we greatly simplify the process of creating a connector class, and we put all code that can fail in a code path that can properly message about any failures.

Again - hopefully this is helpful for long-term thinking, but I don't think it needs to block the current PR.

Contributor Author:

classmethod makes sense, yes!

And I would personally prefer to have these as part of the constructor: if we don't, each method will need to instantiate the objects it needs every time it is called, or have some kind of intelligent accessor that checks whether the field has already been instantiated and, if not, instantiates it. That feels complex compared to just instantiating the connector properly in launch (see the sketch below). WDYT?
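
For illustration, the two options being weighed might look like this (a rough sketch; ApiClient and the destination names are hypothetical):

    from functools import cached_property
    from typing import Any, Mapping


    class ApiClient:  # hypothetical stand-in for a destination's real client
        def __init__(self, config: Mapping[str, Any]) -> None:
            self._config = config


    # Option A: dependencies built once in the constructor (the preference above)
    class DestinationFoo:
        def __init__(self, config: Mapping[str, Any]) -> None:
            self._client = ApiClient(config)


    # Option B: argument-free constructor plus a lazy, cached accessor
    class DestinationBar:
        config: Mapping[str, Any] = {}

        @cached_property
        def _client(self) -> ApiClient:
            # built on first access; every method must go through this accessor
            return ApiClient(self.config)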


def run_cmd(self, parsed_args: argparse.Namespace) -> Iterable[AirbyteMessage]:
cmd = parsed_args.command
@@ -137,6 +163,8 @@ def run_cmd(self, parsed_args: argparse.Namespace) -> Iterable[AirbyteMessage]:

if cmd == "check":
yield self._run_check(config=config)
elif cmd == "discover":
yield AirbyteMessage(type=Type.DESTINATION_CATALOG, destination_catalog=self.discover())
elif cmd == "write":
# Wrap in UTF-8 to override any other input encodings
wrapped_stdin = io.TextIOWrapper(sys.stdin.buffer, encoding="utf-8")
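For context, a destination opting into the new command would override discover() along these lines. This is a hedged sketch: the diff does not show DestinationOperation's fields, so the object_name, sync_mode, and json_schema names below are assumptions to verify against airbyte_protocol_dataclasses.

    from airbyte_cdk.destinations import Destination
    from airbyte_cdk.models import DestinationCatalog, DestinationOperation


    class MyDestination(Destination):
        # (the abstract write() method is omitted for brevity)

        def discover(self) -> DestinationCatalog:
            # Advertise the objects that can be written to in this destination.
            # DestinationOperation field names are assumed, not confirmed.
            return DestinationCatalog(
                operations=[
                    DestinationOperation(
                        object_name="users",
                        sync_mode="append",
                        json_schema={"type": "object", "properties": {}},
                    )
                ]
            )

run_cmd then wraps the returned catalog in an AirbyteMessage of type DESTINATION_CATALOG, and the command is reachable as "discover --config config.json" per the new parser.
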
6 changes: 3 additions & 3 deletions airbyte_cdk/logger.py
@@ -8,14 +8,14 @@
from typing import Any, Callable, Mapping, Optional, Tuple

import orjson

from airbyte_cdk.models import (
from airbyte_protocol_dataclasses.models import (
AirbyteLogMessage,
AirbyteMessage,
AirbyteMessageSerializer,
Level,
Type,
)

from airbyte_cdk.models import AirbyteMessageSerializer
from airbyte_cdk.utils import PrintBuffer
from airbyte_cdk.utils.airbyte_secrets_utils import filter_secrets

2 changes: 2 additions & 0 deletions airbyte_cdk/models/__init__.py
@@ -35,6 +35,8 @@
ConfiguredAirbyteCatalog,
ConfiguredAirbyteStream,
ConnectorSpecification,
DestinationCatalog,
DestinationOperation,
DestinationSyncMode,
EstimateType,
FailureType,
5 changes: 5 additions & 0 deletions airbyte_cdk/models/airbyte_protocol.py
@@ -2,6 +2,10 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

"""
This file is necessary because the `AirbyteStateBlob` implementation in the protocol lib is incomplete and given we use the incomplete implementation, we will get `TypeError: AirbyteStateBlob.__init__() takes 1 positional argument but 2 were given`. Hence, we need to redefine all the classes that could serialize AirbyteStateBlob to use the CDK implementation, not the protocol lib one.
"""

from dataclasses import InitVar, dataclass
from typing import Annotated, Any, Dict, List, Mapping, Optional, Union

@@ -86,3 +90,4 @@ class AirbyteMessage:
state: Optional[AirbyteStateMessage] = None
trace: Optional[AirbyteTraceMessage] = None # type: ignore [name-defined]
control: Optional[AirbyteControlMessage] = None # type: ignore [name-defined]
destination_catalog: Optional[DestinationCatalog] = None # type: ignore [name-defined]
61 changes: 49 additions & 12 deletions airbyte_cdk/sources/declarative/schema/dynamic_schema_loader.py
@@ -1,11 +1,10 @@
#
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
#


from abc import ABC, abstractmethod
from copy import deepcopy
from dataclasses import InitVar, dataclass, field
from typing import Any, List, Mapping, MutableMapping, Optional, Union
from typing import Any, Dict, List, Mapping, MutableMapping, Optional, Union

import dpath
from typing_extensions import deprecated
@@ -16,7 +15,7 @@
from airbyte_cdk.sources.declarative.schema.schema_loader import SchemaLoader
from airbyte_cdk.sources.declarative.transformations import RecordTransformation
from airbyte_cdk.sources.source import ExperimentalClassWarning
from airbyte_cdk.sources.types import Config, StreamSlice, StreamState
from airbyte_cdk.sources.types import Config

AIRBYTE_DATA_TYPES: Mapping[str, MutableMapping[str, Any]] = {
"string": {"type": ["null", "string"]},
@@ -114,6 +113,38 @@ def _update_pointer(
)


@deprecated("This class is experimental. Use at your own risk.", category=ExperimentalClassWarning)
class AdditionalPropertyFieldsInferrer(ABC):
"""
Infers additional fields to be added to each property. For example, if this inferrer returns {"toto": "tata"}, a property that would have looked like this:
```
"properties": {
"Id": {
"type": ["null", "string"],
},
<...>
}
```
... will look like this:
```
"properties": {
"Id": {
"type": ["null", "string"],
"toto": "tata"
},
<...>
}
```
"""

@abstractmethod
def infer(self, property_definition: MutableMapping[str, Any]) -> MutableMapping[str, Any]:
"""
Infers additional property fields from the given property definition.
"""
pass

Comment on lines +116 to +146
Contributor:

🛠️ Refactor suggestion

Consider making the inferrer contract read-only

infer() receives a mutable property_definition. Accidental in-place edits by an inferrer could leak into the final schema.
Would switching the parameter type from MutableMapping to Mapping (and passing a copy) help protect against that?

-    def infer(self, property_definition: MutableMapping[str, Any]) -> MutableMapping[str, Any]:
+    def infer(self, property_definition: Mapping[str, Any]) -> Mapping[str, Any]:

That way implementers must return a new dict, reducing side-effects – wdyt?
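
As a usage sketch of that read-only contract, a concrete inferrer (hypothetical name and annotation, assuming the Mapping-based signature suggested above) would build a new mapping rather than editing its input:

    from typing import Any, Mapping


    class StaticHintInferrer(AdditionalPropertyFieldsInferrer):
        """Hypothetical inferrer adding a fixed annotation to every property."""

        def infer(self, property_definition: Mapping[str, Any]) -> Mapping[str, Any]:
            # Return a fresh dict and leave the input untouched.
            return {"x-discovered": True}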

@deprecated("This class is experimental. Use at your own risk.", category=ExperimentalClassWarning)
@dataclass
class DynamicSchemaLoader(SchemaLoader):
@@ -126,6 +157,8 @@ class DynamicSchemaLoader(SchemaLoader):
parameters: InitVar[Mapping[str, Any]]
schema_type_identifier: SchemaTypeIdentifier
schema_transformations: List[RecordTransformation] = field(default_factory=lambda: [])
additional_property_fields_inferrer: Optional[AdditionalPropertyFieldsInferrer] = None
allow_additional_properties: bool = True

def get_json_schema(self) -> Mapping[str, Any]:
"""
@@ -149,22 +182,26 @@ def get_json_schema(self) -> Mapping[str, Any]:
property_definition,
self.schema_type_identifier.type_pointer,
)

value.update(
self.additional_property_fields_inferrer.infer(property_definition)
if self.additional_property_fields_inferrer
else {}
)
properties[key] = value

transformed_properties = self._transform(properties, {})
transformed_properties = self._transform(properties)

return {
"$schema": "https://json-schema.org/draft-07/schema#",
"type": "object",
"additionalProperties": True,
Contributor:
💎 Nice. Appreciate that we're following JSON Schema standards for communicating this. 👍

"additionalProperties": self.allow_additional_properties,
"properties": transformed_properties,
}

def _transform(
self,
properties: Mapping[str, Any],
stream_state: StreamState,
stream_slice: Optional[StreamSlice] = None,
) -> Mapping[str, Any]:
for transformation in self.schema_transformations:
transformation.transform(
@@ -190,7 +227,7 @@ def _get_type(
self,
raw_schema: MutableMapping[str, Any],
field_type_path: Optional[List[Union[InterpolatedString, str]]],
) -> Union[Mapping[str, Any], List[Mapping[str, Any]]]:
) -> Dict[str, Any]:
"""
Determines the JSON Schema type for a field, supporting nullable and combined types.
"""
@@ -220,7 +257,7 @@
f"Invalid data type. Available string or two items list of string. Got {mapped_field_type}."
)

def _resolve_complex_type(self, complex_type: ComplexFieldType) -> Mapping[str, Any]:
def _resolve_complex_type(self, complex_type: ComplexFieldType) -> Dict[str, Any]:
if not complex_type.items:
return self._get_airbyte_type(complex_type.field_type)

@@ -255,14 +292,14 @@ def _replace_type_if_not_valid(
return field_type

@staticmethod
def _get_airbyte_type(field_type: str) -> MutableMapping[str, Any]:
def _get_airbyte_type(field_type: str) -> Dict[str, Any]:
"""
Maps a field type to its corresponding Airbyte type definition.
"""
if field_type not in AIRBYTE_DATA_TYPES:
raise ValueError(f"Invalid Airbyte data type: {field_type}")

return deepcopy(AIRBYTE_DATA_TYPES[field_type])
return deepcopy(AIRBYTE_DATA_TYPES[field_type]) # type: ignore # a copy of a dict should be a dict, not a MutableMapping

def _extract_data(
self,
16 changes: 14 additions & 2 deletions airbyte_cdk/test/catalog_builder.py
@@ -2,6 +2,8 @@

from typing import Any, Dict, List, Union, overload

from airbyte_protocol_dataclasses.models import DestinationSyncMode

from airbyte_cdk.models import (
ConfiguredAirbyteCatalog,
ConfiguredAirbyteStream,
@@ -19,7 +21,7 @@ def __init__(self) -> None:
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_primary_key": [["id"]],
},
"primary_key": [["id"]],
"primary_key": None,
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite",
}
@@ -32,6 +34,16 @@ def with_sync_mode(self, sync_mode: SyncMode) -> "ConfiguredAirbyteStreamBuilder":
self._stream["sync_mode"] = sync_mode.name
return self

def with_destination_sync_mode(
self, sync_mode: DestinationSyncMode
) -> "ConfiguredAirbyteStreamBuilder":
self._stream["destination_sync_mode"] = sync_mode.name
return self

def with_destination_object_name(self, name: str) -> "ConfiguredAirbyteStreamBuilder":
self._stream["destination_object_name"] = name
return self

def with_primary_key(self, pk: List[List[str]]) -> "ConfiguredAirbyteStreamBuilder":
self._stream["primary_key"] = pk
self._stream["stream"]["source_defined_primary_key"] = pk # type: ignore # we assume that self._stream["stream"] is a Dict[str, Any]
@@ -58,7 +70,7 @@ def with_stream(self, name: str, sync_mode: SyncMode) -> "CatalogBuilder": ...
def with_stream(
self,
name: Union[str, ConfiguredAirbyteStreamBuilder],
sync_mode: Union[SyncMode, None] = None,
sync_mode: SyncMode = SyncMode.full_refresh,
) -> "CatalogBuilder":
# As we are introducing a fully fledge ConfiguredAirbyteStreamBuilder, we would like to deprecate the previous interface
# with_stream(str, SyncMode)
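For reference, a usage sketch of the new builder methods (stream and object names are made up; with_name is assumed from the existing builder helpers):

    from airbyte_cdk.models import DestinationSyncMode, SyncMode
    from airbyte_cdk.test.catalog_builder import CatalogBuilder, ConfiguredAirbyteStreamBuilder

    catalog = (
        CatalogBuilder()
        .with_stream(
            ConfiguredAirbyteStreamBuilder()
            .with_name("users")
            .with_sync_mode(SyncMode.full_refresh)
            .with_destination_sync_mode(DestinationSyncMode.append)
            .with_destination_object_name("users_table")
        )
        .build()
    )
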
6 changes: 5 additions & 1 deletion airbyte_cdk/test/mock_http/request.py
@@ -72,7 +72,11 @@ def _to_mapping(
elif isinstance(body, bytes):
return json.loads(body.decode()) # type: ignore # assumes return type of Mapping[str, Any]
elif isinstance(body, str):
return json.loads(body) # type: ignore # assumes return type of Mapping[str, Any]
try:
Contributor Author:
Without this addition, depending on the order of evaluation, the test test_given_on_match_is_mapping_but_not_input_when_matches_then_return_false would fail.

return json.loads(body) # type: ignore # assumes return type of Mapping[str, Any]
except json.JSONDecodeError:
# one of the bodies is a mapping while the other isn't, so the comparison should fail anyway
return None
return None

@staticmethod