SAT: compatibility tests for catalogs (#15486)
alafanechere authored Aug 10, 2022
1 parent 1ad5152 commit fd70913
Showing 8 changed files with 414 additions and 102 deletions.
@@ -1,5 +1,8 @@
# Changelog

## 0.1.62
Backward compatibility tests: add syntactic validation of catalogs [#15486](https://github.com/airbytehq/airbyte/pull/15486/)

## 0.1.61
Add unit tests coverage computation [#15443](https://github.com/airbytehq/airbyte/pull/15443/).

@@ -33,7 +33,7 @@ COPY pytest.ini setup.py ./
COPY source_acceptance_test ./source_acceptance_test
RUN pip install .

LABEL io.airbyte.version=0.1.61
LABEL io.airbyte.version=0.1.62
LABEL io.airbyte.name=airbyte/source-acceptance-test

ENTRYPOINT ["python", "-m", "pytest", "-p", "source_acceptance_test.plugin", "-r", "fEsx"]
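For local runs, the new checks can presumably be selected via the `backward_compatibility` pytest marker they are tagged with; a hypothetical sketch (not a documented SAT command), reusing the plugin and reporting flags from the ENTRYPOINT above:

```python
import pytest

# Hypothetical local invocation, mirroring the Dockerfile ENTRYPOINT above but
# selecting only the tests tagged with the `backward_compatibility` marker
# (used by the spec and discovery backward compatibility tests in this change).
exit_code = pytest.main(["-p", "source_acceptance_test.plugin", "-m", "backward_compatibility", "-r", "fEsx"])
```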
@@ -57,6 +57,9 @@ class Status(Enum):
class DiscoveryTestConfig(BaseConfig):
config_path: str = config_path
timeout_seconds: int = timeout_seconds
backward_compatibility_tests_config: BackwardCompatibilityTestsConfig = Field(
description="Configuration for the backward compatibility tests.", default=BackwardCompatibilityTestsConfig()
)


class ExpectedRecordsConfig(BaseModel):
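The new `backward_compatibility_tests_config` field refers to a config model that is not shown in this diff; judging from the `disable_for_version` lookup in the discovery skip fixture further down, it presumably looks roughly like this sketch (defaults and description are assumptions):

```python
from typing import Optional

from pydantic import BaseModel, Field


class BackwardCompatibilityTestsConfig(BaseModel):
    # Only `disable_for_version` is grounded in this diff: it is read by the
    # skip_backward_compatibility_tests fixture to skip the checks for one version.
    disable_for_version: Optional[str] = Field(
        default=None, description="Disable backward compatibility tests for a specific previous connector version."
    )
```

In an `acceptance-test-config.yml`, this would presumably appear under the discovery test inputs as `backward_compatibility_tests_config: {disable_for_version: "<version>"}`.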
@@ -178,6 +178,12 @@ def cached_schemas_fixture() -> MutableMapping[str, AirbyteStream]:
return {}


@pytest.fixture(name="previous_cached_schemas", scope="session")
def previous_cached_schemas_fixture() -> MutableMapping[str, AirbyteStream]:
"""Simple cache for discovered catalog of previous connector: stream_name -> json_schema"""
return {}


@pytest.fixture(name="discovered_catalog")
def discovered_catalog_fixture(connector_config, docker_runner: ConnectorRunner, cached_schemas) -> MutableMapping[str, AirbyteStream]:
"""JSON schemas for each stream"""
@@ -190,6 +196,19 @@ def discovered_catalog_fixture(connector_config, docker_runner: ConnectorRunner,
return cached_schemas


@pytest.fixture(name="previous_discovered_catalog")
def previous_discovered_catalog_fixture(
connector_config, previous_connector_docker_runner: ConnectorRunner, previous_cached_schemas
) -> MutableMapping[str, AirbyteStream]:
"""JSON schemas for each stream"""
if not previous_cached_schemas:
output = previous_connector_docker_runner.call_discover(config=connector_config)
catalogs = [message.catalog for message in output if message.type == Type.CATALOG]
for stream in catalogs[-1].streams:
previous_cached_schemas[stream.name] = stream
return previous_cached_schemas


@pytest.fixture
def detailed_logger() -> Logger:
"""
@@ -28,9 +28,9 @@
from docker.errors import ContainerError
from jsonschema._utils import flatten
from source_acceptance_test.base import BaseTest
from source_acceptance_test.config import BasicReadTestConfig, ConnectionTestConfig, SpecTestConfig
from source_acceptance_test.config import BasicReadTestConfig, ConnectionTestConfig, DiscoveryTestConfig, SpecTestConfig
from source_acceptance_test.utils import ConnectorRunner, SecretDict, filter_output, make_hashable, verify_records_schema
from source_acceptance_test.utils.backward_compatibility import SpecDiffChecker, validate_previous_configs
from source_acceptance_test.utils.backward_compatibility import CatalogDiffChecker, SpecDiffChecker, validate_previous_configs
from source_acceptance_test.utils.common import find_all_values_for_key_in_schema, find_keyword_schema
from source_acceptance_test.utils.json_schema_helper import JsonSchemaHelper, get_expected_schema_structure, get_object_structure

@@ -192,7 +192,7 @@ def test_backward_compatibility(
assert isinstance(actual_connector_spec, ConnectorSpecification) and isinstance(previous_connector_spec, ConnectorSpecification)
spec_diff = self.compute_spec_diff(actual_connector_spec, previous_connector_spec)
checker = SpecDiffChecker(spec_diff)
checker.assert_spec_is_backward_compatible()
checker.assert_is_backward_compatible()
validate_previous_configs(previous_connector_spec, actual_connector_spec, number_of_configs_to_generate)

def test_additional_properties_is_true(self, actual_connector_spec: ConnectorSpecification):
@@ -235,6 +235,31 @@ def test_check(self, connector_config, inputs: ConnectionTestConfig, docker_runn

@pytest.mark.default_timeout(30)
class TestDiscovery(BaseTest):
@staticmethod
def compute_discovered_catalog_diff(
discovered_catalog: MutableMapping[str, AirbyteStream], previous_discovered_catalog: MutableMapping[str, AirbyteStream]
):
return DeepDiff(
{stream_name: airbyte_stream.dict().pop("json_schema") for stream_name, airbyte_stream in previous_discovered_catalog.items()},
{stream_name: airbyte_stream.dict().pop("json_schema") for stream_name, airbyte_stream in discovered_catalog.items()},
view="tree",
ignore_order=True,
)
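`compute_discovered_catalog_diff` hands DeepDiff two `stream_name -> json_schema` mappings and keeps the tree view so the checkers can walk change paths. A minimal standalone illustration of that tree view (hypothetical stream and schema, not taken from this diff):

```python
from deepdiff import DeepDiff

previous = {"users": {"type": "object", "properties": {"id": {"type": ["integer"]}}}}
current = {"users": {"type": "object", "properties": {"id": {"type": ["string"]}}}}

diff = DeepDiff(previous, current, view="tree", ignore_order=True)
for change in diff.get("values_changed", []):
    # path(output_format="list") ends with [..., "type", 0] here, which is how the
    # checkers below recognize that a 'type' entry was altered.
    print(change.path(output_format="list"), change.t1, "->", change.t2)
```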

@pytest.fixture(name="skip_backward_compatibility_tests")
def skip_backward_compatibility_tests_fixture(
self, inputs: DiscoveryTestConfig, previous_connector_docker_runner: ConnectorRunner
) -> bool:
if previous_connector_docker_runner is None:
pytest.skip("The previous connector image could not be retrieved.")

# Get the real connector version in case 'latest' is used in the config:
previous_connector_version = previous_connector_docker_runner._image.labels.get("io.airbyte.version")

if previous_connector_version == inputs.backward_compatibility_tests_config.disable_for_version:
pytest.skip(f"Backward compatibility tests are disabled for version {previous_connector_version}.")
return False

def test_discover(self, connector_config, docker_runner: ConnectorRunner):
"""Verify that discover produce correct schema."""
output = docker_runner.call_discover(config=connector_config)
@@ -307,6 +332,23 @@ def test_additional_properties_is_true(self, discovered_catalog: Mapping[str, An
[additional_properties_value is True for additional_properties_value in additional_properties_values]
), "When set, additionalProperties field value must be true for backward compatibility."

@pytest.mark.default_timeout(60)
@pytest.mark.backward_compatibility
def test_backward_compatibility(
self,
skip_backward_compatibility_tests: bool,
discovered_catalog: MutableMapping[str, AirbyteStream],
previous_discovered_catalog: MutableMapping[str, AirbyteStream],
):
"""Check if the current spec is backward_compatible:
1. Perform multiple hardcoded syntactic checks with SpecDiffChecker.
2. Validate fake generated previous configs against the actual connector specification with validate_previous_configs.
"""
assert isinstance(discovered_catalog, MutableMapping) and isinstance(previous_discovered_catalog, MutableMapping)
catalog_diff = self.compute_discovered_catalog_diff(discovered_catalog, previous_discovered_catalog)
checker = CatalogDiffChecker(catalog_diff)
checker.assert_is_backward_compatible()


def primary_keys_for_records(streams, records):
streams_with_primary_key = [stream for stream in streams if stream.stream.source_defined_primary_key]
@@ -2,52 +2,36 @@
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#

from abc import ABC, abstractmethod

import jsonschema
from airbyte_cdk.models import ConnectorSpecification
from deepdiff import DeepDiff
from hypothesis import given, settings
from hypothesis import HealthCheck, Verbosity, given, settings
from hypothesis_jsonschema import from_schema
from source_acceptance_test.utils import SecretDict


class NonBackwardCompatibleSpecError(Exception):
class NonBackwardCompatibleError(Exception):
pass


class SpecDiffChecker:
"""A class to perform multiple backward compatible checks on a spec diff"""

class BaseDiffChecker(ABC):
def __init__(self, diff: DeepDiff) -> None:
self._diff = diff

def assert_spec_is_backward_compatible(self):
self.check_if_declared_new_required_field()
self.check_if_added_a_new_required_property()
self.check_if_value_of_type_field_changed()
# self.check_if_new_type_was_added() We want to allow type expansion atm
self.check_if_type_of_type_field_changed()
self.check_if_field_was_made_not_nullable()
self.check_if_enum_was_narrowed()
self.check_if_declared_new_enum_field()

def _raise_error(self, message: str):
raise NonBackwardCompatibleSpecError(f"{message}: {self._diff.pretty()}")
raise NonBackwardCompatibleError(f"{self.context} - {message}. Diff: {self._diff.pretty()}")

def check_if_declared_new_required_field(self):
"""Check if the new spec declared a 'required' field."""
added_required_fields = [
addition for addition in self._diff.get("dictionary_item_added", []) if addition.path(output_format="list")[-1] == "required"
]
if added_required_fields:
self._raise_error("The current spec declared a new 'required' field")
@property
@abstractmethod
def context(self): # pragma: no cover
pass

def check_if_added_a_new_required_property(self):
"""Check if the new spec added a property to the 'required' list."""
added_required_properties = [
addition for addition in self._diff.get("iterable_item_added", []) if addition.up.path(output_format="list")[-1] == "required"
]
if added_required_properties:
self._raise_error("A new property was added to 'required'")
@abstractmethod
def assert_is_backward_compatible(self): # pragma: no cover
pass

def check_if_value_of_type_field_changed(self):
"""Check if a type was changed"""
Expand All @@ -59,9 +43,9 @@ def check_if_value_of_type_field_changed(self):
change for change in self._diff.get("values_changed", []) if change.path(output_format="list")[-2] == "type"
]
if type_values_changed or type_values_changed_in_list:
self._raise_error("The current spec changed the value of a 'type' field")
self._raise_error("The'type' field value was changed.")

def check_if_new_type_was_added(self):
def check_if_new_type_was_added(self): # pragma: no cover
"""Detect type value added to type list if new type value is not None (e.g ["str"] -> ["str", "int"])"""
new_values_in_type_list = [
change
@@ -70,7 +54,7 @@ def check_if_new_type_was_added(self):
if change.t2 != "null"
]
if new_values_in_type_list:
self._raise_error("The current spec changed the value of a 'type' field")
self._raise_error("A new value was added to a 'type' field")

def check_if_type_of_type_field_changed(self):
"""
@@ -90,29 +74,57 @@ def check_if_type_of_type_field_changed(self):
# This might be something already guaranteed by JSON schema validation.
if isinstance(change.t1, str):
if not isinstance(change.t2, list):
self._raise_error("The current spec change a type field from string to an invalid value.")
if not 0 < len(change.t2) <= 2:
self._raise_error(
"The current spec change a type field from string to an invalid value. The type list length should not be empty and have a maximum of two items."
)
self._raise_error("A 'type' field was changed from string to an invalid value.")
# If the new type field is a list we want to make sure it only has the original type (t1) and null: e.g. "str" -> ["str", "null"]
# We want to raise an error otherwise.
t2_not_null_types = [_type for _type in change.t2 if _type != "null"]
if not (len(t2_not_null_types) == 1 and t2_not_null_types[0] == change.t1):
self._raise_error("The current spec change a type field to a list with multiple invalid values.")
self._raise_error("The 'type' field was changed to a list with multiple invalid values")
if isinstance(change.t1, list):
if not isinstance(change.t2, str):
self._raise_error("The current spec change a type field from list to an invalid value.")
self._raise_error("The 'type' field was changed from a list to an invalid value")
if not (len(change.t1) == 1 and change.t2 == change.t1[0]):
self._raise_error("The current spec narrowed a field type.")
self._raise_error("An element was removed from the list of 'type'")


class SpecDiffChecker(BaseDiffChecker):
"""A class to perform backward compatibility checks on a connector specification diff"""

context = "Specification"

def assert_is_backward_compatible(self):
self.check_if_declared_new_required_field()
self.check_if_added_a_new_required_property()
self.check_if_value_of_type_field_changed()
# self.check_if_new_type_was_added() We want to allow type expansion atm
self.check_if_type_of_type_field_changed()
self.check_if_field_was_made_not_nullable()
self.check_if_enum_was_narrowed()
self.check_if_declared_new_enum_field()

def check_if_declared_new_required_field(self):
"""Check if the new spec declared a 'required' field."""
added_required_fields = [
addition for addition in self._diff.get("dictionary_item_added", []) if addition.path(output_format="list")[-1] == "required"
]
if added_required_fields:
self._raise_error("A new 'required' field was declared.")

def check_if_added_a_new_required_property(self):
"""Check if the new spec added a property to the 'required' list"""
added_required_properties = [
addition for addition in self._diff.get("iterable_item_added", []) if addition.up.path(output_format="list")[-1] == "required"
]
if added_required_properties:
self._raise_error("A new property was added to 'required'")

def check_if_field_was_made_not_nullable(self):
"""Detect when field was made not nullable but is still a list: e.g ["string", "null"] -> ["string"]"""
removed_nullable = [
change for change in self._diff.get("iterable_item_removed", []) if change.path(output_format="list")[-2] == "type"
]
if removed_nullable:
self._raise_error("The current spec narrowed a field type or made a field not nullable.")
self._raise_error("A field type was narrowed or made a field not nullable")

def check_if_enum_was_narrowed(self):
"""Check if the list of values in a enum was shortened in a spec."""
@@ -122,7 +134,7 @@ def check_if_enum_was_narrowed(self):
if enum_removal.up.path(output_format="list")[-1] == "enum"
]
if enum_removals:
self._raise_error("The current spec narrowed an enum field.")
self._raise_error("An enum field was narrowed.")

def check_if_declared_new_enum_field(self):
"""Check if an 'enum' field was added to the spec."""
@@ -132,7 +144,7 @@ def check_if_declared_new_enum_field(self):
if enum_addition.path(output_format="list")[-1] == "enum"
]
if enum_additions:
self._raise_error("An 'enum' field was declared on an existing property of the spec.")
self._raise_error("An 'enum' field was declared on an existing property")


def validate_previous_configs(
@@ -143,13 +155,34 @@ def validate_previous_configs(
2. Validate a fake previous config against the actual connector specification json schema."""

@given(from_schema(previous_connector_spec.dict()["connectionSpecification"]))
@settings(max_examples=number_of_configs_to_generate)
@settings(max_examples=number_of_configs_to_generate, verbosity=Verbosity.quiet, suppress_health_check=(HealthCheck.too_slow,))
def check_fake_previous_config_against_actual_spec(fake_previous_config):
fake_previous_config = SecretDict(fake_previous_config)
filtered_fake_previous_config = {key: value for key, value in fake_previous_config.data.items() if not key.startswith("_")}
try:
jsonschema.validate(instance=filtered_fake_previous_config, schema=actual_connector_spec.connectionSpecification)
except jsonschema.exceptions.ValidationError as err:
raise NonBackwardCompatibleSpecError(err)
if isinstance(fake_previous_config, dict):  # hypothesis-jsonschema does not only generate dict objects, so skip anything else
fake_previous_config = SecretDict(fake_previous_config)
filtered_fake_previous_config = {key: value for key, value in fake_previous_config.data.items() if not key.startswith("_")}
try:
jsonschema.validate(instance=filtered_fake_previous_config, schema=actual_connector_spec.connectionSpecification)
except jsonschema.exceptions.ValidationError as err:
raise NonBackwardCompatibleError(err)

check_fake_previous_config_against_actual_spec()
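`validate_previous_configs` leans on hypothesis-jsonschema to fuzz configs that satisfied the previous spec and asserts they still validate against the new one. A rough standalone sketch of that idea (tiny hypothetical schemas, not SAT code):

```python
import jsonschema
from hypothesis import given, settings
from hypothesis_jsonschema import from_schema

previous_schema = {"type": "object", "properties": {"port": {"type": ["integer", "null"]}}}
current_schema = {"type": "object", "properties": {"port": {"type": "integer"}}}  # nullability dropped


@given(from_schema(previous_schema))
@settings(max_examples=100)
def check_old_configs_still_validate(config):
    # A config that satisfied the previous spec (e.g. {"port": None}) should still
    # satisfy the current one; here it eventually does not, which is the kind of
    # regression validate_previous_configs surfaces as NonBackwardCompatibleError.
    jsonschema.validate(instance=config, schema=current_schema)


check_old_configs_still_validate()
```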


class CatalogDiffChecker(BaseDiffChecker):
"""A class to perform backward compatibility checks on a discoverd catalog diff"""

context = "Catalog"

def assert_is_backward_compatible(self):
self.check_if_stream_was_removed()
self.check_if_value_of_type_field_changed()
self.check_if_type_of_type_field_changed()

def check_if_stream_was_removed(self):
"""Check if a stream was removed from the catalog."""
removed_streams = []
for removal in self._diff.get("dictionary_item_removed", []):
if removal.path() != "root" and removal.up.path() == "root":
removed_streams.append(removal.path(output_format="list")[0])
if removed_streams:
self._raise_error(f"The following streams were removed: {','.join(removed_streams)}")