Skip to content

Commit

Permalink
SAT: DX improvements, better error handling and more (#4260)
Browse files Browse the repository at this point in the history
 small fixes for SAT for better DX:

- better stack trace in case of error inside the connector, print only relevant information with proper formatting (multiline stack trace instead of single string)
- better logging - print message about image pulling only when it actually happens, stop tests if image not found
- using discovery command for json_schema, when configured_catalog will be loaded we populate `json_schema` from a schema that we get from discovery command, the result is cached for all session duration.
- better record comparison, takes care of lists inside dicts - because lists are unordered we will have false negatives when compare serialized records.
- copied pytest config to airbyte root folder, so when pytest runs tests locally it can find it, this will affect all local execution of pytest
- add IPython as a standard debugger

Co-authored-by: Eugene Kulak <[email protected]>
  • Loading branch information
keu and eugene-kulak authored Jun 22, 2021
1 parent 39fe5fe commit cef8b80
Show file tree
Hide file tree
Showing 9 changed files with 67 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ COPY setup.py ./
COPY pytest.ini ./
RUN pip install .

LABEL io.airbyte.version=0.1.3
LABEL io.airbyte.version=0.1.4
LABEL io.airbyte.name=airbyte/source-acceptance-test

ENTRYPOINT ["python", "-m", "pytest", "-p", "source_acceptance_test.plugin"]
4 changes: 2 additions & 2 deletions airbyte-integrations/bases/source-acceptance-test/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,9 +29,9 @@
"airbyte-cdk~=0.1",
"docker~=4.4",
"PyYAML~=5.4",
"inflection~=0.5",
"icdiff~=1.9",
"pendulum~=1.2",
"inflection~=0.5",
"pdbpp~=0.10",
"pydantic~=1.6",
"pytest~=6.1",
"pytest-sugar~=0.9",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@
from typing import Any, List, MutableMapping, Optional

import pytest
from airbyte_cdk.models import AirbyteCatalog, AirbyteRecordMessage, ConfiguredAirbyteCatalog, ConnectorSpecification
from airbyte_cdk.models import AirbyteCatalog, AirbyteRecordMessage, ConfiguredAirbyteCatalog, ConnectorSpecification, Type
from docker import errors
from source_acceptance_test.config import Config
from source_acceptance_test.utils import ConnectorRunner, SecretDict, load_config

Expand Down Expand Up @@ -75,9 +76,12 @@ def configured_catalog_path_fixture(inputs, base_path) -> Optional[str]:


@pytest.fixture(name="configured_catalog")
def configured_catalog_fixture(configured_catalog_path) -> Optional[ConfiguredAirbyteCatalog]:
def configured_catalog_fixture(configured_catalog_path, catalog_schemas) -> Optional[ConfiguredAirbyteCatalog]:
if configured_catalog_path:
return ConfiguredAirbyteCatalog.parse_file(configured_catalog_path)
catalog = ConfiguredAirbyteCatalog.parse_file(configured_catalog_path)
for configured_stream in catalog.streams:
configured_stream.stream.json_schema = catalog_schemas.get(configured_stream.stream.name, {})
return catalog
return None


Expand Down Expand Up @@ -128,9 +132,12 @@ def docker_runner_fixture(image_tag, tmp_path) -> ConnectorRunner:
@pytest.fixture(scope="session", autouse=True)
def pull_docker_image(acceptance_test_config) -> None:
"""Startup fixture to pull docker image"""
print("Pulling docker image", acceptance_test_config.connector_image)
ConnectorRunner(image_name=acceptance_test_config.connector_image, volume=Path("."))
print("Pulling completed")
image_name = acceptance_test_config.connector_image
config_filename = "acceptance-test-config.yml"
try:
ConnectorRunner(image_name=image_name, volume=Path("."))
except errors.ImageNotFound:
pytest.exit(f"Docker image `{image_name}` not found, please check your {config_filename} file", returncode=1)


@pytest.fixture(name="expected_records")
Expand All @@ -141,3 +148,21 @@ def expected_records_fixture(inputs, base_path) -> List[AirbyteRecordMessage]:

with open(str(base_path / getattr(expect_records, "path"))) as f:
return [AirbyteRecordMessage.parse_raw(line) for line in f]


@pytest.fixture(name="cached_schemas", scope="session")
def cached_schemas_fixture() -> MutableMapping[str, Any]:
"""Simple cache for discovered catalog: stream_name -> json_schema"""
return {}


@pytest.fixture(name="catalog_schemas")
def catalog_schemas_fixture(connector_config, docker_runner: ConnectorRunner, cached_schemas) -> MutableMapping[str, Any]:
"""JSON schemas for each stream"""
if not cached_schemas:
output = docker_runner.call_discover(config=connector_config)
catalogs = [message.catalog for message in output if message.type == Type.CATALOG]
for stream in catalogs[-1].streams:
cached_schemas[stream.name] = stream.json_schema

return cached_schemas
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@
#


import json
from collections import Counter, defaultdict
from typing import Any, List, Mapping, MutableMapping

Expand All @@ -32,7 +31,7 @@
from docker.errors import ContainerError
from source_acceptance_test.base import BaseTest
from source_acceptance_test.config import BasicReadTestConfig, ConnectionTestConfig
from source_acceptance_test.utils import ConnectorRunner
from source_acceptance_test.utils import ConnectorRunner, serialize


@pytest.mark.timeout(10)
Expand Down Expand Up @@ -152,8 +151,8 @@ def compare_records(stream_name, actual, expected, extra_fields, exact_order, ex
r2 = TestBasicRead.remove_extra_fields(r2, r1)
assert r1 == r2, f"Stream {stream_name}: Mismatch of record order or values"
else:
expected = set(map(TestBasicRead.serialize_record_for_comparison, expected))
actual = set(map(TestBasicRead.serialize_record_for_comparison, actual))
expected = set(map(serialize, expected))
actual = set(map(serialize, actual))
missing_expected = set(expected) - set(actual)

assert not missing_expected, f"Stream {stream_name}: All expected records must be produced"
Expand All @@ -170,7 +169,3 @@ def group_by_stream(records) -> MutableMapping[str, List[MutableMapping]]:
result[record.stream].append(record.data)

return result

@staticmethod
def serialize_record_for_comparison(record: Mapping) -> str:
return json.dumps(record, sort_keys=True)
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,10 @@
#


import json
from functools import partial

import pytest
from airbyte_cdk.models import Type
from source_acceptance_test.base import BaseTest
from source_acceptance_test.utils import ConnectorRunner, full_refresh_only_catalog
from source_acceptance_test.utils import ConnectorRunner, full_refresh_only_catalog, serialize


@pytest.mark.timeout(20 * 60)
Expand All @@ -41,7 +38,6 @@ def test_sequential_reads(self, connector_config, configured_catalog, docker_run

output = docker_runner.call_read(connector_config, configured_catalog)
records_2 = [message.record.data for message in output if message.type == Type.RECORD]
serialize = partial(json.dumps, sort_keys=True)

assert not (
set(map(serialize, records_1)) - set(map(serialize, records_2))
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
from .common import SecretDict, filter_output, full_refresh_only_catalog, incremental_only_catalog, load_config
from .compare import diff_dicts
from .compare import diff_dicts, serialize
from .connector_runner import ConnectorRunner
from .json_schema_helper import JsonSchemaHelper

Expand All @@ -12,4 +12,5 @@
"SecretDict",
"ConnectorRunner",
"diff_dicts",
"serialize",
]
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,8 @@
#


from typing import List, Optional
import json
from typing import List, Mapping, Optional

import icdiff
import py
Expand Down Expand Up @@ -67,3 +68,12 @@ def diff_dicts(left, right, use_markup) -> Optional[List[str]]:
icdiff_lines = list(differ.make_table(pretty_left, pretty_right, context=True))

return ["equals failed"] + [color_off + line for line in icdiff_lines]


def serialize(value) -> str:
"""Simplify comparison of nested dicts/lists"""
if isinstance(value, Mapping):
return json.dumps({k: serialize(v) for k, v in value.items()}, sort_keys=True)
if isinstance(value, List):
return sorted([serialize(v) for v in value])
return str(value)
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@

import docker
from airbyte_cdk.models import AirbyteMessage, ConfiguredAirbyteCatalog
from docker.errors import ContainerError
from pydantic import ValidationError


Expand All @@ -39,7 +40,9 @@ def __init__(self, image_name: str, volume: Path):
try:
self._image = self._client.images.get(image_name)
except docker.errors.ImageNotFound:
print("Pulling docker image", image_name)
self._image = self._client.images.pull(image_name)
print("Pulling completed")
self._runs = 0
self._volume_base = volume

Expand Down Expand Up @@ -107,10 +110,17 @@ def call_read_with_state(self, config, catalog, state, **kwargs) -> List[Airbyte
def run(self, cmd, config=None, state=None, catalog=None, **kwargs) -> Iterable[AirbyteMessage]:
self._runs += 1
volumes = self._prepare_volumes(config, state, catalog)
logs = self._client.containers.run(
image=self._image, command=cmd, working_dir="/data", volumes=volumes, network="host", stdout=True, stderr=True, **kwargs
)
logging.info("Docker run: \n%s\ninput: %s\noutput: %s", cmd, self.input_folder, self.output_folder)
try:
logs = self._client.containers.run(
image=self._image, command=cmd, working_dir="/data", volumes=volumes, network="host", stdout=True, stderr=True, **kwargs
)
except ContainerError as err:
# beautify error from container
patched_error = ContainerError(
container=err.container, exit_status=err.exit_status, command=err.command, image=err.image, stderr=err.stderr.decode()
)
raise patched_error from None # get rid of any previous exception stack

with open(str(self.output_folder / "raw"), "wb+") as f:
f.write(logs)
Expand Down
3 changes: 3 additions & 0 deletions pytest.ini
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
[pytest]

addopts = -r a --capture=no -vv --log-level=INFO --color=yes

0 comments on commit cef8b80

Please sign in to comment.