Skip to content

feat: skip config validation during discovery for sources with DynamicSchemaLoader #467

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
21 commits
Select commit Hold shift + click to select a range
cdd1ac9
feat: skip config validation during discovery for sources with Dynami…
devin-ai-integration[bot] Apr 8, 2025
7490ad1
style: fix formatting issues
devin-ai-integration[bot] Apr 8, 2025
9e84e1c
fix: update entrypoint to make --config optional for discovery
devin-ai-integration[bot] Apr 8, 2025
47bd67c
style: fix formatting issues
devin-ai-integration[bot] Apr 8, 2025
36d7f1f
fix: add type annotation for empty_config
devin-ai-integration[bot] Apr 8, 2025
b002218
refactor: use generator comprehension instead of list comprehension
devin-ai-integration[bot] Apr 8, 2025
acbab7e
Update airbyte_cdk/entrypoint.py
aaronsteers Apr 8, 2025
d33dcdd
Update CHANGELOG.md
aaronsteers Apr 8, 2025
4253f28
Update airbyte_cdk/sources/declarative/manifest_declarative_source.py
aaronsteers Apr 8, 2025
64610b9
feat: add check_config_during_discover flag for targeted config valid…
devin-ai-integration[bot] Apr 8, 2025
b228857
style: fix formatting issues
devin-ai-integration[bot] Apr 8, 2025
77772c3
Update CHANGELOG.md
aaronsteers Apr 8, 2025
6ca213c
refactor: push check_config_during_discover flag into connector base …
devin-ai-integration[bot] Apr 8, 2025
dce4f8c
style: fix formatting issues
devin-ai-integration[bot] Apr 8, 2025
24a0919
fix: resolve MyPy type checking issues with check_config_during_disco…
devin-ai-integration[bot] Apr 9, 2025
f920f04
refactor: move check_config_during_discover to BaseConnector class
devin-ai-integration[bot] Apr 9, 2025
f01525f
Update airbyte_cdk/connector.py
aaronsteers Apr 9, 2025
769d361
Update airbyte_cdk/connector.py
aaronsteers Apr 9, 2025
c3cbad8
Update airbyte_cdk/sources/declarative/manifest_declarative_source.py
aaronsteers Apr 9, 2025
3cb8faf
Auto-fix lint and format issues
Apr 9, 2025
08397ad
fix condition direction
aaronsteers Apr 9, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion airbyte_cdk/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,15 @@ def load_optional_package_file(package: str, filename: str) -> Optional[bytes]:


class BaseConnector(ABC, Generic[TConfig]):
# configure whether the `check_config_against_spec_or_exit()` needs to be called
check_config_against_spec: bool = True
"""Configure whether `check_config_against_spec_or_exit()` needs to be called."""

check_config_during_discover: bool = False
"""Determines whether config validation should be skipped during discovery.

Note that, despite the attribute's name, a value of `True` means validation is skipped.
By default (`False`), config validation is not skipped during discovery. This can be
overridden by sources that can provide catalog information without requiring authentication.
"""

@abstractmethod
def configure(self, config: Mapping[str, Any], temp_dir: str) -> TConfig:
Expand Down
27 changes: 21 additions & 6 deletions airbyte_cdk/entrypoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ def parse_args(args: List[str]) -> argparse.Namespace:
)
required_discover_parser = discover_parser.add_argument_group("required named arguments")
required_discover_parser.add_argument(
"--config", type=str, required=True, help="path to the json configuration file"
"--config", type=str, required=False, help="path to the json configuration file"
)

# read
Expand Down Expand Up @@ -141,19 +141,34 @@ def run(self, parsed_args: argparse.Namespace) -> Iterable[str]:
)
if cmd == "spec":
message = AirbyteMessage(type=Type.SPEC, spec=source_spec)
yield from [
yield from (
self.airbyte_message_to_string(queued_message)
for queued_message in self._emit_queued_messages(self.source)
]
)
yield self.airbyte_message_to_string(message)
elif (
cmd == "discover"
and not parsed_args.config
and not self.source.check_config_during_discover
):
# Connector supports unprivileged discover
empty_config: dict[str, Any] = {}
yield from (
self.airbyte_message_to_string(queued_message)
for queued_message in self._emit_queued_messages(self.source)
)
yield from map(
AirbyteEntrypoint.airbyte_message_to_string,
self.discover(source_spec, empty_config),
)
else:
raw_config = self.source.read_config(parsed_args.config)
config = self.source.configure(raw_config, temp_dir)

yield from [
yield from (
self.airbyte_message_to_string(queued_message)
for queued_message in self._emit_queued_messages(self.source)
]
)
if cmd == "check":
yield from map(
AirbyteEntrypoint.airbyte_message_to_string,
Expand Down Expand Up @@ -225,7 +240,7 @@ def discover(
self, source_spec: ConnectorSpecification, config: TConfig
) -> Iterable[AirbyteMessage]:
self.set_up_secret_filter(config, source_spec.connectionSpecification)
if self.source.check_config_against_spec:
if not self.source.check_config_during_discover:
self.validate_connection(source_spec, config)
catalog = self.source.discover(self.logger, config)

Expand Down
35 changes: 35 additions & 0 deletions airbyte_cdk/sources/declarative/manifest_declarative_source.py
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
)
from airbyte_cdk.sources.declarative.resolvers import COMPONENTS_RESOLVER_TYPE_MAPPING
from airbyte_cdk.sources.message import MessageRepository
from airbyte_cdk.sources.source import Source
from airbyte_cdk.sources.streams.core import Stream
from airbyte_cdk.sources.types import ConnectionDefinition
from airbyte_cdk.sources.utils.slice_logger import (
Expand Down Expand Up @@ -109,6 +110,8 @@ def __init__(
self._config = config or {}
self._validate_source()

self.check_config_during_discover = self._uses_dynamic_schema_loader()

@property
def resolved_manifest(self) -> Mapping[str, Any]:
return self._source_config
Expand Down Expand Up @@ -440,3 +443,35 @@ def _dynamic_stream_configs(

def _emit_manifest_debug_message(self, extra_args: dict[str, Any]) -> None:
self.logger.debug("declarative source created from manifest", extra=extra_args)

def _uses_dynamic_schema_loader(self) -> bool:
"""
Determines if any stream in the source uses a DynamicSchemaLoader.

DynamicSchemaLoader makes a separate call to retrieve schema information,
which might not require authentication, so we can skip config validation
during discovery when it's used.

Returns:
bool: True if any stream uses a DynamicSchemaLoader, False otherwise.
"""
for stream_config in self._stream_configs(self._source_config):
schema_loader = stream_config.get("schema_loader", {})
if (
isinstance(schema_loader, dict)
and schema_loader.get("type") == "DynamicSchemaLoader"
):
return True

dynamic_streams = self._source_config.get("dynamic_streams", [])
if dynamic_streams:
for dynamic_stream in dynamic_streams:
stream_template = dynamic_stream.get("stream_template", {})
schema_loader = stream_template.get("schema_loader", {})
if (
isinstance(schema_loader, dict)
and schema_loader.get("type") == "DynamicSchemaLoader"
):
return True

return False
Original file line number Diff line number Diff line change
@@ -0,0 +1,174 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

"""Tests for the ManifestDeclarativeSource with DynamicSchemaLoader."""

from unittest.mock import MagicMock, patch

import pytest

from airbyte_cdk.models import AirbyteCatalog
from airbyte_cdk.sources.declarative.manifest_declarative_source import ManifestDeclarativeSource
from airbyte_cdk.sources.utils.schema_helpers import check_config_against_spec_or_exit


def test_check_config_during_discover_with_dynamic_schema_loader():
    """A manifest with a DynamicSchemaLoader stream yields check_config_during_discover=True."""

    def _retriever():
        # Build a fresh dict per use so the two retrievers never share state.
        return {
            "type": "SimpleRetriever",
            "requester": {"url_base": "https://example.com", "http_method": "GET"},
            "record_selector": {"extractor": {"field_path": []}},
        }

    dynamic_loader = {
        "type": "DynamicSchemaLoader",
        "retriever": _retriever(),
        "schema_type_identifier": {
            "key_pointer": ["name"],
        },
    }
    manifest = {
        "type": "DeclarativeSource",
        "check": {"type": "CheckStream"},
        "streams": [
            {
                "name": "test_stream",
                "schema_loader": dynamic_loader,
                "retriever": _retriever(),
            }
        ],
        "version": "0.1.0",
    }

    source = ManifestDeclarativeSource(source_config=manifest)

    assert source.check_config_during_discover is True
    assert source.check_config_against_spec is True


def test_check_config_during_discover_without_dynamic_schema_loader():
    """A manifest with only an InlineSchemaLoader yields check_config_during_discover=False."""
    static_stream = {
        "name": "test_stream",
        "schema_loader": {"type": "InlineSchemaLoader", "schema": {}},
        "retriever": {
            "type": "SimpleRetriever",
            "requester": {"url_base": "https://example.com", "http_method": "GET"},
            "record_selector": {"extractor": {"field_path": []}},
        },
    }
    manifest = {
        "type": "DeclarativeSource",
        "check": {"type": "CheckStream"},
        "streams": [static_stream],
        "version": "0.1.0",
    }

    source = ManifestDeclarativeSource(source_config=manifest)

    assert source.check_config_during_discover is False
    assert source.check_config_against_spec is True


@patch(
    "airbyte_cdk.sources.declarative.manifest_declarative_source.ManifestDeclarativeSource.streams"
)
def test_discover_with_dynamic_schema_loader_no_config(mock_streams):
    """Discover with an empty config returns the patched stream for a DynamicSchemaLoader manifest.

    `ManifestDeclarativeSource.streams` is patched, so the catalog is built from mocks only.
    """
    airbyte_stream = MagicMock()
    type(airbyte_stream).name = "test_dynamic_stream"

    stream = MagicMock()
    stream.name = "test_dynamic_stream"
    stream.as_airbyte_stream.return_value = airbyte_stream

    mock_streams.return_value = [stream]

    def _retriever():
        # Build a fresh dict per use so the two retrievers never share state.
        return {
            "type": "SimpleRetriever",
            "requester": {"url_base": "https://example.com", "http_method": "GET"},
            "record_selector": {"extractor": {"field_path": []}},
        }

    manifest = {
        "type": "DeclarativeSource",
        "check": {"type": "CheckStream"},
        "streams": [
            {
                "name": "test_dynamic_stream",
                "schema_loader": {
                    "type": "DynamicSchemaLoader",
                    "retriever": _retriever(),
                    "schema_type_identifier": {
                        "key_pointer": ["name"],
                    },
                },
                "retriever": _retriever(),
            }
        ],
        "version": "0.1.0",
    }

    source = ManifestDeclarativeSource(source_config=manifest)

    assert source.check_config_during_discover is True
    assert source.check_config_against_spec is True

    catalog = source.discover(MagicMock(), {})

    assert isinstance(catalog, AirbyteCatalog)
    assert len(catalog.streams) == 1
    assert catalog.streams[0].name == "test_dynamic_stream"
Comment on lines +125 to +127
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should be failing. Probably failing to fail because we're mocking.



@patch(
    "airbyte_cdk.sources.declarative.manifest_declarative_source.ManifestDeclarativeSource.streams"
)
def test_discover_without_dynamic_schema_loader_no_config(mock_streams):
    """Discover with an empty config returns the patched stream for an InlineSchemaLoader manifest.

    Also checks that both config-check flags hold the same values before and after discover.
    `ManifestDeclarativeSource.streams` is patched, so the catalog is built from mocks only.
    """
    airbyte_stream = MagicMock()
    type(airbyte_stream).name = "test_static_stream"

    stream = MagicMock()
    stream.name = "test_static_stream"
    stream.as_airbyte_stream.return_value = airbyte_stream

    mock_streams.return_value = [stream]

    static_stream = {
        "name": "test_static_stream",
        "schema_loader": {"type": "InlineSchemaLoader", "schema": {}},
        "retriever": {
            "type": "SimpleRetriever",
            "requester": {"url_base": "https://example.com", "http_method": "GET"},
            "record_selector": {"extractor": {"field_path": []}},
        },
    }
    manifest = {
        "type": "DeclarativeSource",
        "check": {"type": "CheckStream"},
        "streams": [static_stream],
        "version": "0.1.0",
    }

    source = ManifestDeclarativeSource(source_config=manifest)

    assert source.check_config_during_discover is False
    assert source.check_config_against_spec is True

    catalog = source.discover(MagicMock(), {})

    assert isinstance(catalog, AirbyteCatalog)
    assert len(catalog.streams) == 1
    assert catalog.streams[0].name == "test_static_stream"

    # Re-check the flags after discover has run.
    assert source.check_config_during_discover is False
    assert source.check_config_against_spec is True
3 changes: 1 addition & 2 deletions unit_tests/test_entrypoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,14 +172,13 @@ def test_parse_valid_args(
["cmd", "args"],
[
("check", {"config": "config_path"}),
("discover", {"config": "config_path"}),
("read", {"config": "config_path", "catalog": "catalog_path"}),
],
)
def test_parse_missing_required_args(
cmd: str, args: MutableMapping[str, Any], entrypoint: AirbyteEntrypoint
):
required_args = {"check": ["config"], "discover": ["config"], "read": ["config", "catalog"]}
required_args = {"check": ["config"], "read": ["config", "catalog"]}
for required_arg in required_args[cmd]:
argcopy = deepcopy(args)
del argcopy[required_arg]
Expand Down
Loading