Skip to content

Commit 21b6413

Browse files
aaronsteersoctavia-squidington-iiidevin-ai-integration[bot]
authored
Feat: New CDK-Native FAST Standard tests, replaces CAT (#349)
Co-authored-by: octavia-squidington-iii <[email protected]> Co-authored-by: Devin AI <158243242+devin-ai-integration[bot]@users.noreply.github.com>
1 parent bf998bd commit 21b6413

32 files changed

+879
-27
lines changed

airbyte_cdk/test/entrypoint_wrapper.py

+4
Original file line numberDiff line numberDiff line change
@@ -82,6 +82,10 @@ def records(self) -> List[AirbyteMessage]:
8282
def state_messages(self) -> List[AirbyteMessage]:
8383
return self._get_message_by_types([Type.STATE])
8484

85+
@property
86+
def connection_status_messages(self) -> List[AirbyteMessage]:
87+
return self._get_message_by_types([Type.CONNECTION_STATUS])
88+
8589
@property
8690
def most_recent_state(self) -> Any:
8791
state_messages = self._get_message_by_types([Type.STATE])
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
2+
'''FAST Airbyte Standard Tests
3+
4+
This module provides a set of base classes for declarative connector test suites.
5+
The goal of this module is to provide a robust and extensible framework for testing Airbyte
6+
connectors.
7+
8+
Example usage:
9+
10+
```python
11+
# `test_airbyte_standards.py`
12+
from airbyte_cdk.test import standard_tests
13+
14+
pytest_plugins = [
15+
"airbyte_cdk.test.standard_tests.pytest_hooks",
16+
]
17+
18+
19+
class TestSuiteSourcePokeAPI(standard_tests.DeclarativeSourceTestSuite):
20+
"""Test suite for the source."""
21+
```
22+
23+
Available test suites base classes:
24+
- `DeclarativeSourceTestSuite`: A test suite for declarative sources.
25+
- `SourceTestSuiteBase`: A test suite for sources.
26+
- `DestinationTestSuiteBase`: A test suite for destinations.
27+
28+
'''
29+
30+
from airbyte_cdk.test.standard_tests.connector_base import (
31+
ConnectorTestScenario,
32+
ConnectorTestSuiteBase,
33+
)
34+
from airbyte_cdk.test.standard_tests.declarative_sources import (
35+
DeclarativeSourceTestSuite,
36+
)
37+
from airbyte_cdk.test.standard_tests.destination_base import DestinationTestSuiteBase
38+
from airbyte_cdk.test.standard_tests.source_base import SourceTestSuiteBase
39+
40+
__all__ = [
41+
"ConnectorTestScenario",
42+
"ConnectorTestSuiteBase",
43+
"DeclarativeSourceTestSuite",
44+
"DestinationTestSuiteBase",
45+
"SourceTestSuiteBase",
46+
]
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,159 @@
1+
# Copyright (c) 2025 Airbyte, Inc., all rights reserved.
2+
"""Job runner for Airbyte Standard Tests."""
3+
4+
import logging
5+
import tempfile
6+
import uuid
7+
from dataclasses import asdict
8+
from pathlib import Path
9+
from typing import Any, Callable, Literal
10+
11+
import orjson
12+
from typing_extensions import Protocol, runtime_checkable
13+
14+
from airbyte_cdk.models import (
15+
ConfiguredAirbyteCatalog,
16+
Status,
17+
)
18+
from airbyte_cdk.test import entrypoint_wrapper
19+
from airbyte_cdk.test.standard_tests.models import (
20+
ConnectorTestScenario,
21+
)
22+
23+
24+
def _errors_to_str(
25+
entrypoint_output: entrypoint_wrapper.EntrypointOutput,
26+
) -> str:
27+
"""Convert errors from entrypoint output to a string."""
28+
if not entrypoint_output.errors:
29+
# If there are no errors, return an empty string.
30+
return ""
31+
32+
return "\n" + "\n".join(
33+
[
34+
str(error.trace.error).replace(
35+
"\\n",
36+
"\n",
37+
)
38+
for error in entrypoint_output.errors
39+
if error.trace
40+
],
41+
)
42+
43+
44+
@runtime_checkable
45+
class IConnector(Protocol):
46+
"""A connector that can be run in a test scenario.
47+
48+
Note: We currently use 'spec' to determine if we have a connector object.
49+
In the future, it would be preferred to leverage a 'launch' method instead,
50+
directly on the connector (which doesn't yet exist).
51+
"""
52+
53+
def spec(self, logger: logging.Logger) -> Any:
54+
"""Connectors should have a `spec` method."""
55+
56+
57+
def run_test_job(
58+
connector: IConnector | type[IConnector] | Callable[[], IConnector],
59+
verb: Literal["read", "check", "discover"],
60+
test_scenario: ConnectorTestScenario,
61+
*,
62+
catalog: ConfiguredAirbyteCatalog | dict[str, Any] | None = None,
63+
) -> entrypoint_wrapper.EntrypointOutput:
64+
"""Run a test scenario from provided CLI args and return the result."""
65+
if not connector:
66+
raise ValueError("Connector is required")
67+
68+
if catalog and isinstance(catalog, ConfiguredAirbyteCatalog):
69+
# Convert the catalog to a dict if it's already a ConfiguredAirbyteCatalog.
70+
catalog = asdict(catalog)
71+
72+
connector_obj: IConnector
73+
if isinstance(connector, type) or callable(connector):
74+
# If the connector is a class or a factory lambda, instantiate it.
75+
connector_obj = connector()
76+
elif isinstance(connector, IConnector):
77+
connector_obj = connector
78+
else:
79+
raise ValueError(
80+
f"Invalid connector input: {type(connector)}",
81+
)
82+
83+
args: list[str] = [verb]
84+
if test_scenario.config_path:
85+
args += ["--config", str(test_scenario.config_path)]
86+
elif test_scenario.config_dict:
87+
config_path = (
88+
Path(tempfile.gettempdir()) / "airbyte-test" / f"temp_config_{uuid.uuid4().hex}.json"
89+
)
90+
config_path.parent.mkdir(parents=True, exist_ok=True)
91+
config_path.write_text(orjson.dumps(test_scenario.config_dict).decode())
92+
args += ["--config", str(config_path)]
93+
94+
catalog_path: Path | None = None
95+
if verb not in ["discover", "check"]:
96+
# We need a catalog for read.
97+
if catalog:
98+
# Write the catalog to a temp json file and pass the path to the file as an argument.
99+
catalog_path = (
100+
Path(tempfile.gettempdir())
101+
/ "airbyte-test"
102+
/ f"temp_catalog_{uuid.uuid4().hex}.json"
103+
)
104+
catalog_path.parent.mkdir(parents=True, exist_ok=True)
105+
catalog_path.write_text(orjson.dumps(catalog).decode())
106+
elif test_scenario.configured_catalog_path:
107+
catalog_path = Path(test_scenario.configured_catalog_path)
108+
109+
if catalog_path:
110+
args += ["--catalog", str(catalog_path)]
111+
112+
# This is a bit of a hack because the source needs the catalog early.
113+
# Because it *also* can fail, we have to redundantly wrap it in a try/except block.
114+
115+
result: entrypoint_wrapper.EntrypointOutput = entrypoint_wrapper._run_command( # noqa: SLF001 # Non-public API
116+
source=connector_obj, # type: ignore [arg-type]
117+
args=args,
118+
expecting_exception=test_scenario.expect_exception,
119+
)
120+
if result.errors and not test_scenario.expect_exception:
121+
raise AssertionError(
122+
f"Expected no errors but got {len(result.errors)}: \n" + _errors_to_str(result)
123+
)
124+
125+
if verb == "check":
126+
# Check is expected to fail gracefully without an exception.
127+
# Instead, we assert that we have a CONNECTION_STATUS message with
128+
# a failure status.
129+
assert len(result.connection_status_messages) == 1, (
130+
"Expected exactly one CONNECTION_STATUS message. Got "
131+
f"{len(result.connection_status_messages)}:\n"
132+
+ "\n".join([str(msg) for msg in result.connection_status_messages])
133+
+ _errors_to_str(result)
134+
)
135+
if test_scenario.expect_exception:
136+
conn_status = result.connection_status_messages[0].connectionStatus
137+
assert conn_status, (
138+
"Expected CONNECTION_STATUS message to be present. Got: \n"
139+
+ "\n".join([str(msg) for msg in result.connection_status_messages])
140+
)
141+
assert conn_status.status == Status.FAILED, (
142+
"Expected CONNECTION_STATUS message to be FAILED. Got: \n"
143+
+ "\n".join([str(msg) for msg in result.connection_status_messages])
144+
)
145+
146+
return result
147+
148+
# For all other verbs, we assert check that an exception is raised (or not).
149+
if test_scenario.expect_exception:
150+
if not result.errors:
151+
raise AssertionError("Expected exception but got none.")
152+
153+
return result
154+
155+
assert not result.errors, (
156+
f"Expected no errors but got {len(result.errors)}: \n" + _errors_to_str(result)
157+
)
158+
159+
return result
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
# Copyright (c) 2024 Airbyte, Inc., all rights reserved.
2+
"""Base class for connector test suites."""
3+
4+
from __future__ import annotations
5+
6+
import abc
7+
import inspect
8+
import sys
9+
from collections.abc import Callable
10+
from pathlib import Path
11+
from typing import cast
12+
13+
import yaml
14+
from boltons.typeutils import classproperty
15+
16+
from airbyte_cdk.models import (
17+
AirbyteMessage,
18+
Type,
19+
)
20+
from airbyte_cdk.test import entrypoint_wrapper
21+
from airbyte_cdk.test.standard_tests._job_runner import IConnector, run_test_job
22+
from airbyte_cdk.test.standard_tests.models import (
23+
ConnectorTestScenario,
24+
)
25+
26+
ACCEPTANCE_TEST_CONFIG = "acceptance-test-config.yml"
27+
MANIFEST_YAML = "manifest.yaml"
28+
29+
30+
class ConnectorTestSuiteBase(abc.ABC):
31+
"""Base class for connector test suites."""
32+
33+
connector: type[IConnector] | Callable[[], IConnector] | None = None
34+
"""The connector class or a factory function that returns an scenario of IConnector."""
35+
36+
@classmethod
37+
def get_test_class_dir(cls) -> Path:
38+
"""Get the file path that contains the class."""
39+
module = sys.modules[cls.__module__]
40+
# Get the directory containing the test file
41+
return Path(inspect.getfile(module)).parent
42+
43+
@classmethod
44+
def create_connector(
45+
cls,
46+
scenario: ConnectorTestScenario,
47+
) -> IConnector:
48+
"""Instantiate the connector class."""
49+
connector = cls.connector # type: ignore
50+
if connector:
51+
if callable(connector) or isinstance(connector, type):
52+
# If the connector is a class or factory function, instantiate it:
53+
return cast(IConnector, connector()) # type: ignore [redundant-cast]
54+
55+
# Otherwise, we can't instantiate the connector. Fail with a clear error message.
56+
raise NotImplementedError(
57+
"No connector class or connector factory function provided. "
58+
"Please provide a class or factory function in `cls.connector`, or "
59+
"override `cls.create_connector()` to define a custom initialization process."
60+
)
61+
62+
# Test Definitions
63+
64+
def test_check(
65+
self,
66+
scenario: ConnectorTestScenario,
67+
) -> None:
68+
"""Run `connection` acceptance tests."""
69+
result: entrypoint_wrapper.EntrypointOutput = run_test_job(
70+
self.create_connector(scenario),
71+
"check",
72+
test_scenario=scenario,
73+
)
74+
conn_status_messages: list[AirbyteMessage] = [
75+
msg for msg in result._messages if msg.type == Type.CONNECTION_STATUS
76+
] # noqa: SLF001 # Non-public API
77+
assert len(conn_status_messages) == 1, (
78+
f"Expected exactly one CONNECTION_STATUS message. Got: {result._messages}"
79+
)
80+
81+
@classmethod
82+
def get_connector_root_dir(cls) -> Path:
83+
"""Get the root directory of the connector."""
84+
for parent in cls.get_test_class_dir().parents:
85+
if (parent / MANIFEST_YAML).exists():
86+
return parent
87+
if (parent / ACCEPTANCE_TEST_CONFIG).exists():
88+
return parent
89+
if parent.name == "airbyte_cdk":
90+
break
91+
# If we reach here, we didn't find the manifest file in any parent directory
92+
# Check if the manifest file exists in the current directory
93+
for parent in Path.cwd().parents:
94+
if (parent / MANIFEST_YAML).exists():
95+
return parent
96+
if (parent / ACCEPTANCE_TEST_CONFIG).exists():
97+
return parent
98+
if parent.name == "airbyte_cdk":
99+
break
100+
101+
raise FileNotFoundError(
102+
"Could not find connector root directory relative to "
103+
f"'{str(cls.get_test_class_dir())}' or '{str(Path.cwd())}'."
104+
)
105+
106+
@classproperty
107+
def acceptance_test_config_path(cls) -> Path:
108+
"""Get the path to the acceptance test config file."""
109+
result = cls.get_connector_root_dir() / ACCEPTANCE_TEST_CONFIG
110+
if result.exists():
111+
return result
112+
113+
raise FileNotFoundError(f"Acceptance test config file not found at: {str(result)}")
114+
115+
@classmethod
116+
def get_scenarios(
117+
cls,
118+
) -> list[ConnectorTestScenario]:
119+
"""Get acceptance tests for a given category.
120+
121+
This has to be a separate function because pytest does not allow
122+
parametrization of fixtures with arguments from the test class itself.
123+
"""
124+
category = "connection"
125+
all_tests_config = yaml.safe_load(cls.acceptance_test_config_path.read_text())
126+
if "acceptance_tests" not in all_tests_config:
127+
raise ValueError(
128+
f"Acceptance tests config not found in {cls.acceptance_test_config_path}."
129+
f" Found only: {str(all_tests_config)}."
130+
)
131+
if category not in all_tests_config["acceptance_tests"]:
132+
return []
133+
if "tests" not in all_tests_config["acceptance_tests"][category]:
134+
raise ValueError(f"No tests found for category {category}")
135+
136+
tests_scenarios = [
137+
ConnectorTestScenario.model_validate(test)
138+
for test in all_tests_config["acceptance_tests"][category]["tests"]
139+
if "iam_role" not in test["config_path"]
140+
]
141+
connector_root = cls.get_connector_root_dir().absolute()
142+
for test in tests_scenarios:
143+
if test.config_path:
144+
test.config_path = connector_root / test.config_path
145+
if test.configured_catalog_path:
146+
test.configured_catalog_path = connector_root / test.configured_catalog_path
147+
148+
return tests_scenarios

0 commit comments

Comments
 (0)