diff --git a/.circleci/config.yml b/.circleci/config.yml index df6d9b280..fce6131ce 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -9,7 +9,7 @@ parameters: default: "3.9" jobs: - kolena-ci: + unit-test: parameters: python-version: type: string @@ -22,13 +22,13 @@ jobs: steps: - checkout - restore_cache: - key: &kolena-ci-cache kolena-ci-cache-<< parameters.python-version >>-{{ checksum "pyproject.toml" }} + key: &ci-base-cache ci-cache-<< parameters.python-version >>-{{ checksum "pyproject.toml" }} - run: | poetry config experimental.new-installer false poetry config installer.max-workers 10 poetry install --no-ansi - save_cache: - key: *kolena-ci-cache + key: *ci-base-cache paths: - /home/circleci/project/.poetry/virtualenvs - poetry.lock @@ -44,12 +44,51 @@ jobs: name: Run unit tests command: | poetry run pytest -vv --cov=kolena --cov-branch tests/unit + - when: + # Generate coverage only from one python version + condition: + equal: [ "3.9", << parameters.python-version >> ] + steps: + - run: + name: Coverage + command: | + poetry run coverage xml --data-file .coverage + - codecov/upload: + file: coverage.xml + + integration-test: + parameters: + python-version: + type: string + default: "3.9" + pytest-group: + type: string + default: "generic" + docker: + - image: cimg/python:<< parameters.python-version >> + resource_class: small + environment: + POETRY_CACHE_DIR: /home/circleci/project/.poetry + steps: + - checkout + - restore_cache: + key: ci-cache-<< parameters.python-version >>-{{ checksum "pyproject.toml" }} - run: - name: Run integration tests + name: Run << parameters.pytest-group >> integration tests command: | export KOLENA_TOKEN=${KOLENA_TOKEN} export KOLENA_CLIENT_BASE_URL=${KOLENA_CLIENT_BASE_URL} - poetry run pytest -vv --cov-append --durations=0 --cov=kolena --cov-branch tests/integration + TEST_GROUP="<< parameters.pytest-group >>" + if [ "$TEST_GROUP" = "misc" ]; then + poetry run pytest -vv --durations=0 --cov=kolena --cov-branch \ + --ignore=tests/integration/classification \ + --ignore=tests/integration/detection \ + --ignore=tests/integration/generic \ + --ignore=tests/integration/fr \ + tests/integration + else + poetry run pytest -vv --durations=0 --cov=kolena --cov-branch tests/integration/$TEST_GROUP + fi - when: # Generate coverage only from one python version condition: @@ -65,7 +104,15 @@ jobs: workflows: ci: jobs: - - kolena-ci: + - unit-test: + name: unit-test-<< matrix.python-version >> matrix: parameters: python-version: [ "3.7", "3.8", "3.9", "3.10" ] + - integration-test: + matrix: + parameters: + python-version: [ "3.9" ] + pytest-group: [ "detection", "fr", "generic", "misc" ] + requires: + - unit-test-<< matrix.python-version >> diff --git a/tests/integration/detection/conftest.py b/tests/integration/detection/conftest.py new file mode 100644 index 000000000..829001c0a --- /dev/null +++ b/tests/integration/detection/conftest.py @@ -0,0 +1,146 @@ +# Copyright 2021-2023 Kolena Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
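+#
+# Shared fixtures for the detection integration tests. The session-scoped
+# detection_test_data fixture below registers a fixed family of test cases,
+# test suites, and models once per pytest session so that every module in this
+# group can reuse them. To reproduce the CI "detection" group locally, a sketch
+# of the invocation (assuming KOLENA_TOKEN and KOLENA_CLIENT_BASE_URL are
+# exported as in .circleci/config.yml) would be:
+#
+#   poetry run pytest -vv tests/integration/detection
+#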
+from dataclasses import dataclass +from typing import List + +import pytest + +from kolena.detection import Model +from kolena.detection import TestCase +from kolena.detection import TestImage +from kolena.detection import TestSuite +from kolena.detection.ground_truth import BoundingBox +from kolena.detection.ground_truth import ClassificationLabel +from kolena.detection.ground_truth import SegmentationMask +from tests.integration.helper import fake_locator +from tests.integration.helper import with_test_prefix + + +@dataclass(frozen=True) +class TestData: + test_cases: List[TestCase] + test_suites: List[TestSuite] + models: List[Model] + locators: List[str] + + +@pytest.fixture(scope="session") +def detection_test_data() -> TestData: + ground_truths = [ + ClassificationLabel("car"), + ClassificationLabel("bike"), + BoundingBox("boat", top_left=(0.0, 1.5), bottom_right=(0.3, 3.4)), + SegmentationMask("van", [(4.0, 1.5), (0.9, 3.4), (19.5, 17.6), (8, 8)]), + BoundingBox("boat", top_left=(50, 60), bottom_right=(60, 100)), + BoundingBox("pedestrian", top_left=(120, 70), bottom_right=(190, 100)), + SegmentationMask("truck", [(0, 15), (0.9, 3.4), (19.5, 17.6), (0, 15)]), + SegmentationMask("airplane", [(4.0, 1.5), (0.9, 3.4), (19.5, 17.6), (8, 8)]), + ] + dataset = with_test_prefix("fake-data-set") + images = [(fake_locator(i, "detection/base"), {"example": "metadata", "i": i}) for i in range(5)] + + test_case_a = TestCase( + with_test_prefix("A"), + description="filler", + images=[ + TestImage(locator=images[0][0], dataset=dataset, metadata=images[0][1], ground_truths=[ground_truths[0]]), + TestImage(locator=images[1][0], dataset=dataset, metadata=images[1][1]), + ], + ) + test_case_a_updated = TestCase( + with_test_prefix("A"), + description="description", + images=[ + TestImage(locator=images[0][0], dataset=dataset, metadata=images[0][1], ground_truths=[ground_truths[0]]), + TestImage(locator=images[1][0], dataset=dataset, metadata=images[1][1]), + TestImage(locator=images[2][0], dataset=dataset, metadata=images[2][1], ground_truths=[ground_truths[2]]), + TestImage(locator=images[4][0], dataset=dataset, metadata=images[4][1]), + ], + reset=True, + ) + test_case_b = TestCase( + with_test_prefix("B"), + description="fields", + images=[ + TestImage( + locator=images[2][0], + dataset=dataset, + metadata=images[2][1], + ground_truths=[ground_truths[1], ground_truths[2]], + ), + TestImage(locator=images[3][0], dataset=dataset, metadata=images[3][1], ground_truths=[ground_truths[4]]), + ], + ) + test_case_b_updated = TestCase( + with_test_prefix("B"), + description="etc", + images=[ + TestImage(locator=images[1][0], dataset=dataset, metadata=images[1][1]), + TestImage( + locator=images[2][0], + dataset=dataset, + metadata=images[2][1], + ground_truths=[ + ground_truths[2], + ground_truths[3], + ], + ), + TestImage( + locator=images[3][0], + dataset=dataset, + metadata=images[3][1], + ground_truths=[ + ground_truths[5], + ground_truths[7], + ], + ), + ], + reset=True, + ) + test_case_b_subset = TestCase( + with_test_prefix("B_subset"), + description="and more!", + images=[ + TestImage(locator=images[3][0], dataset=dataset, metadata=images[3][1], ground_truths=[ground_truths[6]]), + ], + ) + + test_cases = [test_case_a, test_case_a_updated, test_case_b, test_case_b_updated, test_case_b_subset] + + test_suite_name_a = with_test_prefix("A") + test_suite_a = TestSuite(test_suite_name_a, description="filler", test_cases=[test_case_a, test_case_b]) + test_suite_a_updated = TestSuite( + test_suite_name_a, 
+ description="description", + test_cases=[test_case_a_updated, test_case_b], + reset=True, + ) + test_suite_b = TestSuite(with_test_prefix("B"), description="fields", test_cases=[test_case_b_updated]) + test_suite_a_subset = TestSuite( + with_test_prefix("A_subset"), + description="etc", + test_cases=[test_case_b_subset], + ) + + test_suites = [test_suite_a, test_suite_a_updated, test_suite_b, test_suite_a_subset] + + models = [ + Model(with_test_prefix("a"), metadata={"some": "metadata"}), + Model(with_test_prefix("b"), metadata={"one": 1, "false": False}), + ] + + return TestData(test_cases=test_cases, test_suites=test_suites, models=models, locators=[img[0] for img in images]) + + +pytest.register_assert_rewrite("tests.integration.detection.helper") diff --git a/tests/integration/detection/helper.py b/tests/integration/detection/helper.py new file mode 100644 index 000000000..54fcfec06 --- /dev/null +++ b/tests/integration/detection/helper.py @@ -0,0 +1,103 @@ +# Copyright 2021-2023 Kolena Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import json +import random +from typing import List +from typing import Tuple + +from kolena.detection import ground_truth +from kolena.detection import inference +from kolena.detection import TestImage + +fake_labels = [ + "car", + "bike", + "house", + "airplane", + "boat", + "bus", + "animal", + "person", + "cow", + "cat", + "dog", + "parakeet", + "weasel", + "rabbit", + "mouse", + "rat", + "anteater", + "aardvark", + "whale", + "seal", + "walrus", + "butterfly", + "hawk", + "pigeon", + "goose", +] + + +def fake_label() -> str: + return random.choice(fake_labels) + + +def fake_points(n: int) -> List[Tuple[float, float]]: + return [(round(random.random() * 300, 3), round(random.random(), 3)) for _ in range(n)] + + +def fake_gt_classification_label() -> ground_truth.ClassificationLabel: + return ground_truth.ClassificationLabel(fake_label()) + + +def fake_gt_bounding_box() -> ground_truth.BoundingBox: + return ground_truth.BoundingBox(fake_label(), *fake_points(2)) + + +def fake_gt_segmentation_mask() -> ground_truth.SegmentationMask: + return ground_truth.SegmentationMask(fake_label(), fake_points(random.randint(3, 15))) + + +def fake_confidence() -> float: + return round(random.random(), 3) + + +def fake_inference_classification_label() -> inference.ClassificationLabel: + return inference.ClassificationLabel(fake_label(), fake_confidence()) + + +def fake_inference_bounding_box() -> inference.BoundingBox: + return inference.BoundingBox(fake_label(), fake_confidence(), *fake_points(2)) + + +def fake_inference_segmentation_mask() -> inference.SegmentationMask: + return inference.SegmentationMask(fake_label(), fake_confidence(), fake_points(random.randint(3, 15))) + + +def assert_test_image_equal(a: TestImage, b: TestImage) -> None: + assert a.locator == b.locator + assert a.dataset == b.dataset + assert a.metadata == b.metadata + assert sorted(a.ground_truths, key=lambda x: json.dumps(x._to_dict(), sort_keys=True)) == sorted( + b.ground_truths, + 
key=lambda x: json.dumps(x._to_dict(), sort_keys=True), + ) + + +def assert_test_images_equal(actual: List[TestImage], expected: List[TestImage]) -> None: + assert len(actual) == len(expected) + actual = sorted(actual, key=lambda x: x.locator) + expected = sorted(expected, key=lambda x: x.locator) + for a, b in zip(actual, expected): + assert_test_image_equal(a, b) diff --git a/tests/integration/detection/test_model.py b/tests/integration/detection/test_model.py new file mode 100644 index 000000000..1fdea1d80 --- /dev/null +++ b/tests/integration/detection/test_model.py @@ -0,0 +1,193 @@ +# Copyright 2021-2023 Kolena Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from typing import List + +import pytest +from pydantic import ValidationError + +from kolena._api.v1.workflow import WorkflowType +from kolena.detection import Model +from kolena.detection import TestCase +from kolena.detection import TestImage +from kolena.detection import TestRun +from kolena.detection import TestSuite +from kolena.detection.ground_truth import BoundingBox as GTBoundingBox +from kolena.detection.ground_truth import GroundTruth +from kolena.detection.ground_truth import SegmentationMask as GTSegmentationMask +from kolena.detection.inference import BoundingBox as InfBoundingBox +from kolena.detection.inference import Inference +from kolena.detection.inference import SegmentationMask as InfSegmentationMask +from tests.integration.detection.conftest import TestData +from tests.integration.detection.helper import assert_test_images_equal +from tests.integration.detection.helper import fake_confidence +from tests.integration.helper import fake_random_locator +from tests.integration.helper import with_test_prefix + + +def test_init() -> None: + name = with_test_prefix(f"{__file__}::test_init model") + metadata = dict(a="A", b=1, c=None, d=True, e=[1, "test", None], f=dict(g="test"), h=None) + model = Model(name, metadata=metadata) + assert model.name == name + assert model.metadata == metadata + assert model._workflow == WorkflowType.DETECTION + assert model == Model(name) + + +def test_init_changed_metadata() -> None: + name = with_test_prefix(f"{__file__}::test_init_changed_metadata model") + metadata = dict(one="two", three="four") + model = Model(name, metadata=metadata) + assert model == Model(name, metadata=dict(changed="metadata")) # changed metadata is ignored + + +def test_create_bad_metadata() -> None: + with pytest.raises(ValidationError): + Model(with_test_prefix(f"{__file__}::test_create_bad_metadata model"), "not a dict") # type: ignore + with pytest.raises(ValidationError): + Model( + with_test_prefix(f"{__file__}::test_create_bad_metadata model 2"), + ["also", "not", "a", "dict"], + ) # type: ignore + + +def test_load_inferences_with_no_inferences(detection_test_data: TestData) -> None: + model = detection_test_data.models[0] + test_suite = detection_test_data.test_suites[1] + test_cases = test_suite.test_cases + test_case_id_0 = test_suite.test_cases[0]._id + test_case_id_1 = 
test_suite.test_cases[1]._id
+
+    inferences = model.load_inferences(test_suite)
+    assert len(inferences) == 5
+
+    inferences_0 = model.load_inferences(test_cases[0])
+    assert len(inferences_0) == 4
+
+    inferences_1 = model.load_inferences(test_cases[1])
+    assert len(inferences_1) == 2
+
+    inferences = model.load_inferences_by_test_case(test_suite)
+    assert [(test_case_id, len(infer)) for test_case_id, infer in inferences.items()] == [
+        (test_case_id_0, 4),
+        (test_case_id_1, 2),
+    ]
+    # verify ground_truths are properly scoped per test-case:
+    # test_cases[0] has samples (sample_0, sample_1, sample_2, sample_4) with gts (gt_0, None, gt_2, None)
+    # test_cases[1] has samples (sample_2, sample_3) with gts ([gt_1, gt_2], [gt_4])
+    # sample_2 appears in both test cases and should have different ground truths in each
+    sample, _ = inferences[test_case_id_0][2]
+    assert len(sample.ground_truths) == 1
+    sample, _ = inferences[test_case_id_1][0]
+    assert len(sample.ground_truths) == 2
+
+    # load_inferences_by_test_case should agree with per-test-case load_inferences
+    assert inferences[test_case_id_0] == inferences_0
+    assert inferences[test_case_id_1] == inferences_1
+
+
+def _test_load_inferences(test_name: str, n_images: int, gts: List[GroundTruth], infs: List[Inference]) -> None:
+    model = Model(with_test_prefix(f"{test_name} model"))
+    images = [
+        TestImage(
+            fake_random_locator("detection/test-model"),
+            ground_truths=gts,
+        )
+        for _ in range(n_images)
+    ]
+    test_case = TestCase(with_test_prefix(f"{__file__}::{test_name} test_case"), images=images)
+    test_suite = TestSuite(with_test_prefix(f"{__file__}::{test_name} test_suite"), test_cases=[test_case])
+    fake_inferences = []
+    with TestRun(model, test_suite) as test_run:
+        for image in test_run.iter_images():
+            fake_inferences.append(infs)
+            test_run.add_inferences(image, inferences=infs)
+
+    # fetch inferences to make sure all of them are present
+    inferences = model.load_inferences(test_suite)
+    actual_images = [test_image for test_image, _ in inferences]
+    actual_inferences = [infer for _, infer in inferences]
+    assert_test_images_equal(images, actual_images)
+    assert fake_inferences == actual_inferences
+
+    # fetch inferences via load_inferences_by_test_case to make sure all of them are present
+    inferences_by_test_case = model.load_inferences_by_test_case(test_suite)
+    assert len(inferences_by_test_case) == 1
+
+    actual_images = [test_image for _, infer in inferences_by_test_case.items() for test_image, _ in infer]
+    actual_inferences = [inf for _, infer in inferences_by_test_case.items() for _, inf in infer]
+    assert_test_images_equal(images, actual_images)
+    assert fake_inferences == actual_inferences
+
+
+def test_load_inferences_bounding_box() -> None:
+    _test_load_inferences(
+        test_name="test_load_inferences_bounding_box",
+        n_images=5,
+        gts=[
+            GTBoundingBox(
+                label="car",
+                top_left=(0, 0),
+                bottom_right=(50, 50),
+            ),
+            GTBoundingBox(
+                label="bus",
+                top_left=(20, 20),
+                bottom_right=(100, 100),
+            ),
+        ],
+        infs=[
+            InfBoundingBox(
+                label="car",
+                confidence=fake_confidence(),
+                top_left=(0, 0),
+                bottom_right=(30, 50),
+            ),
+            InfBoundingBox(
+                label="bus",
+                confidence=fake_confidence(),
+                top_left=(20, 10),
+                bottom_right=(80, 80),
+            ),
+        ],
+    )
+
+
+def test_load_inferences_segmentation_mask() -> None:
+    _test_load_inferences(
+        test_name="test_load_inferences_segmentation_mask",
+        n_images=5,
+        gts=[
+            GTSegmentationMask(
+                label="car",
+                points=[(0, 0), (1, 1), (2, 2)],
+            ),
+            GTSegmentationMask(
+                label="bus",
+                points=[(0, 0), (1, 1), (2, 2)],
+            ),
+        ],
+        infs=[
+            
InfSegmentationMask( + label="car", + confidence=fake_confidence(), + points=[(0, 0), (1, 1), (2, 2)], + ), + InfSegmentationMask( + label="bus", + confidence=fake_confidence(), + points=[(0, 0), (1, 1), (2, 2)], + ), + ], + ) diff --git a/tests/integration/detection/test_test_case.py b/tests/integration/detection/test_test_case.py new file mode 100644 index 000000000..353f55ccd --- /dev/null +++ b/tests/integration/detection/test_test_case.py @@ -0,0 +1,456 @@ +# Copyright 2021-2023 Kolena Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +from copy import deepcopy +from typing import List + +import pytest + +import kolena.detection.metadata +from kolena._api.v1.workflow import WorkflowType +from kolena.classification import TestCase as ClassificationTestCase +from kolena.detection import TestCase +from kolena.detection import TestImage +from kolena.detection.ground_truth import BoundingBox +from kolena.detection.ground_truth import ClassificationLabel +from kolena.detection.ground_truth import SegmentationMask +from kolena.detection.metadata import Asset +from kolena.errors import NameConflictError +from kolena.errors import NotFoundError +from kolena.errors import WorkflowMismatchError +from kolena.workflow.annotation import BoundingBox as GenericBoundingBox +from tests.integration.detection.helper import assert_test_images_equal +from tests.integration.generic.dummy import DummyGroundTruth +from tests.integration.generic.dummy import DummyTestSample +from tests.integration.generic.dummy import TestCase as GenericTestCase +from tests.integration.helper import fake_random_locator +from tests.integration.helper import with_test_prefix + + +@pytest.fixture +def test_dataset() -> List[TestImage]: + sample_dir = "detection/test-case" + dataset = with_test_prefix(f"{__file__}::test_dataset fixture dataset") + return [ + TestImage(fake_random_locator(sample_dir), dataset=dataset), + TestImage(fake_random_locator(sample_dir), dataset=dataset, ground_truths=[ClassificationLabel("car")]), + TestImage(fake_random_locator(sample_dir), dataset=dataset, ground_truths=[ClassificationLabel("bike")]), + TestImage(fake_random_locator(sample_dir), dataset=dataset, ground_truths=[BoundingBox("car", (0, 0), (1, 1))]), + TestImage( + fake_random_locator(sample_dir), + dataset=dataset, + ground_truths=[ + BoundingBox("car", (0, 0), (1, 1)), + BoundingBox("car", (2, 2), (3, 4)), + BoundingBox("bike", (3, 3), (9, 9)), + BoundingBox("car", (4, 2), (7, 8)), + ], + ), + TestImage( + fake_random_locator(sample_dir), + dataset=dataset, + ground_truths=[ + SegmentationMask( + "bike", + [ + (0, 0), + (1, 2), + (2, 1), + ], + ), + ], + ), + ] + + +def test__init() -> None: + name = with_test_prefix(f"{__file__}::test__init test case") + description = "some\ndescription\n\twith punctuation!" 
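+    # TestCase is load-or-create: constructing one with an existing name below is
+    # expected to load the existing test case rather than create a new version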
+ test_case = TestCase(name, description=description) + assert test_case.name == name + assert test_case.version == 0 + assert test_case.description == description + assert test_case._workflow == WorkflowType.DETECTION + + test_case2 = TestCase(name) # should re-load previously created + assert test_case == test_case2 + + test_case3 = TestCase(name, description="a different description") # different description is ignored + assert test_case == test_case3 + + +def test__init__with_version(test_dataset: List[TestImage]) -> None: + name = with_test_prefix(f"{__file__}::test__init__with_version test case") + test_case = TestCase(name, description="test") + test_case0 = TestCase(name, version=test_case.version) # reload with matching version + assert test_case == test_case0 + + with pytest.raises(NameConflictError): + TestCase(name, version=123) # invalid version throws + + with test_case.edit() as editor: + editor.add(test_dataset[0]) + + assert test_case.version == 1 + assert test_case == TestCase(name, version=test_case.version) + assert test_case0 == TestCase(name, version=test_case0.version) + + +def test__init__with_images(test_dataset: List[TestImage]) -> None: + name = with_test_prefix(f"{__file__}::test__init__with_images test case") + images = [test_dataset[0], test_dataset[3], test_dataset[4]] + test_case = TestCase(name, images=images) + assert test_case.version == 1 + assert_test_images_equal(test_case.load_images(), images) + assert test_case._workflow == WorkflowType.DETECTION + + +def test__init__reset(test_dataset: List[TestImage]) -> None: + name = with_test_prefix(f"{__file__}::test__init__reset test case") + description = f"{name} (description)" + images = [test_dataset[3]] + TestCase(name, description=description, images=images) + + new_images = [test_dataset[4]] + test_case = TestCase(name, images=new_images, reset=True) + assert test_case.version == 2 + assert test_case.description == description # not updated or cleared + assert_test_images_equal(test_case.load_images(), new_images) + + +def test__init__with_images_reset(test_dataset: List[TestImage]) -> None: + name = with_test_prefix(f"{__file__}::test__init__with_images_reset test case") + images = [test_dataset[0], test_dataset[3], test_dataset[4]] + test_case = TestCase(name, images=images, reset=True) + assert test_case.version == 1 + assert_test_images_equal(test_case.load_images(), images) + assert test_case._workflow == WorkflowType.DETECTION + + +def test__init__reset_with_overlap(test_dataset: List[TestImage]) -> None: + name = with_test_prefix(f"{__file__}::test__init__reset_with_overlap test case") + description = f"{name} (description)" + images_1 = [test_dataset[0], test_dataset[3]] + images_2 = [test_dataset[0], test_dataset[4]] + TestCase(name, description=description, images=images_1) + + test_case = TestCase(name, images=images_2, reset=True) + assert test_case.version == 2 + assert test_case.description == description # not updated or cleared + assert_test_images_equal(test_case.load_images(), images_2) # overlapping should be preserved + + +def test__init__reset_with_other_test_case(test_dataset: List[TestImage]) -> None: + name = with_test_prefix(f"{__file__}::test__init__reset_with_other_test_case test case") + name_other = with_test_prefix(f"{__file__}::test__init__reset_with_other_test_case test case (other)") + description = f"{name} (description)" + images_1 = [test_dataset[0], test_dataset[3]] + images_2 = [test_dataset[0], test_dataset[3], test_dataset[4]] + images_3 = [test_dataset[0]] + + # 
Create and update test case + TestCase(name, description=description, images=images_1) + test_case_other = TestCase(name_other, images=images_2) + + test_case = TestCase(name, images=images_3, reset=True) + assert test_case.version == 2 + assert test_case.description == description # not updated or cleared + assert_test_images_equal(test_case_other.load_images(), images_2) # images_2 should be untouched + assert_test_images_equal(test_case.load_images(), images_3) # images_1 should be cleared + + +def test__init__reset_resets_all_past_samples(test_dataset: List[TestImage]) -> None: + name = with_test_prefix(f"{__file__}::test__init__reset_resets_all_past_samples test case") + description = f"{name} (description)" + images_1 = [test_dataset[0], test_dataset[3]] + images_2 = [test_dataset[0], test_dataset[3], test_dataset[4], test_dataset[5]] + images_3 = [test_dataset[1], test_dataset[2]] + + # Create and update test case + initial_test_case = TestCase(name, description=description, images=images_1) + with initial_test_case.edit() as editor: + for image in images_2: + editor.add(image) + + test_case = TestCase(name, images=images_3, reset=True) + assert test_case.version == 3 + assert test_case.description == description # not updated or cleared + assert_test_images_equal(test_case.load_images(), images_3) # both images_1 and images_2 should be cleared + + +def test__edit(test_dataset: List[TestImage]) -> None: + name = with_test_prefix(f"{__file__}::test__edit test case") + test_case = TestCase(name) + assert test_case.version == 0 + + new_description = "updated description" + with test_case.edit() as editor: + editor.description(new_description) + for image in test_dataset: + editor.add(image) + editor.remove(test_dataset[-1]) + + assert test_case.version == 1 + assert test_case.description == new_description + images_loaded = test_case.load_images() + remaining_images = test_dataset[:-1] + assert_test_images_equal(images_loaded, remaining_images) + + with test_case.edit() as editor: + editor.remove(remaining_images[0]) + editor.remove(remaining_images[1]) + + assert test_case.version == 2 + images_loaded = test_case.load_images() + remaining_images = remaining_images[2:] + assert_test_images_equal(images_loaded, remaining_images) + + +def test__edit__reset(test_dataset: List[TestImage]) -> None: + name = with_test_prefix(f"{__file__}::test__edit__reset test case") + description = f"{name} (description)" + images_1 = [test_dataset[0], test_dataset[3], test_dataset[4], test_dataset[5]] + images_2 = [test_dataset[1], test_dataset[2], test_dataset[3]] + test_case = TestCase(name, description=description, images=images_1) + + # no op + with test_case.edit(reset=True) as editor: + editor.description(description) + assert test_case.version == 1 + assert test_case.description == description + assert_test_images_equal(test_case.load_images(), images_1) + + with test_case.edit(reset=True) as editor: + for image in images_2: + editor.add(image) + + assert test_case.version == 2 + assert test_case.description == description + assert_test_images_equal(test_case.load_images(), images_2) + + +def test__edit__empty(test_dataset: List[TestImage]) -> None: + test_case = TestCase(with_test_prefix(f"{__file__}::test__edit__empty test case")) + + with test_case.edit() as editor: + editor.description("description") + assert test_case.version == 1 + assert test_case.description == "description" + + # add a sample to the test case for later removal + with test_case.edit() as editor: + 
editor.add(test_dataset[0]) + assert test_case.version == 2 + assert len(test_case.load_images()) == 1 + + # empty the test case + with test_case.edit() as editor: + editor.remove(test_dataset[0]) + assert test_case.version == 3 + assert len(test_case.load_images()) == 0 + + +def test__edit__no_ground_truths(test_dataset: List[TestImage]) -> None: + name = with_test_prefix(f"{__file__}::test__edit__no_ground_truths test case") + test_case = TestCase(name) + assert test_case.version == 0 + + images_no_gt = [image.filter(lambda _: False) for image in test_dataset] + with test_case.edit() as editor: + for image in images_no_gt: + editor.add(image) + + assert_test_images_equal(test_case.load_images(), images_no_gt) + + +def test__edit__specific_ground_truths(test_dataset: List[TestImage]) -> None: + name = with_test_prefix(f"{__file__}::test__edit__specific_ground_truths test case") + test_case = TestCase(name) + + images_car_only = [image.filter(lambda gt: gt.label == "car") for image in test_dataset] + images_car_only = [image for image in images_car_only if len(image.ground_truths) > 0] + with test_case.edit() as editor: + for image in images_car_only: + editor.add(image) + + assert test_case.version == 1 + assert_test_images_equal(test_case.load_images(), images_car_only) + + +def test__edit__no_op() -> None: + test_case = TestCase(with_test_prefix(f"{__file__}::test__edit__no_op test case")) + with test_case.edit(): + ... + assert test_case.version == 0 + + +def test__edit__updated(test_dataset: List[TestImage]) -> None: + test_case_name = with_test_prefix(f"{__file__} test__edit__updated test case") + images = [test_dataset[4]] + test_case = TestCase(test_case_name, images=images) + assert test_case.version == 1 + + # no op + with test_case.edit() as editor: + editor.add(images[0]) + assert test_case.version == 1 + + updated_image_0 = deepcopy(images[0]) + updated_label = "new label" + updated_image_0.ground_truths[0].label = updated_label + loaded_images_before = test_case.load_images() + # update the existing test sample + with test_case.edit() as editor: + editor.add(updated_image_0) + loaded_images_after = test_case.load_images() + assert test_case.version == 2 + assert len(loaded_images_before) == len(loaded_images_after) == 1 + assert loaded_images_before != loaded_images_after + assert updated_label not in [gt.label for gt in loaded_images_before[0].ground_truths] + assert updated_label in [gt.label for gt in loaded_images_after[0].ground_truths] + + +def test_update_dataset() -> None: + dataset = with_test_prefix("test") + locator = fake_random_locator() + name_prefix = with_test_prefix(f"{__file__}::test_update_dataset") + TestCase(f"{name_prefix} test case", images=[TestImage(locator, dataset=dataset)]) + + # shouldn't override previously set dataset + test_case0 = TestCase(f"{name_prefix} test case 0", images=[TestImage(locator)]) + test_case0_images = test_case0.load_images() + assert len(test_case0_images) == 1 + assert test_case0_images[0].dataset == dataset + + # shouldn't override previously set dataset + test_case1 = TestCase(f"{name_prefix} test case 1", images=[TestImage(locator, dataset="")]) + test_case1_images = test_case1.load_images() + assert len(test_case1_images) == 1 + assert test_case1_images[0].dataset == dataset + + # should override + test_case2 = TestCase(f"{name_prefix} test case 2", images=[TestImage(locator, dataset="new")]) + test_case2_images = test_case2.load_images() + assert len(test_case2_images) == 1 + assert test_case2_images[0].dataset == 
"new" + + +def test_update_metadata() -> None: + bbox = kolena.detection.metadata.BoundingBox(top_left=(0, 1), bottom_right=(2, 3)) + asset = Asset(locator=fake_random_locator()) + metadata = dict(a="a", b=True, c=3, d=asset, e=bbox) + test_image0 = TestImage(fake_random_locator(), dataset="test", metadata=metadata) + TestCase(with_test_prefix(f"{__file__}::test_update_metadata test case 0"), images=[test_image0]) + + metadata_updated = {**metadata, **dict(c=4.3, d=False)} + test_image1 = TestImage(test_image0.locator, dataset=test_image0.dataset, metadata=metadata_updated) + test_case1 = TestCase(with_test_prefix(f"{__file__}::test_update_metadata test case 1"), images=[test_image1]) + test_case1_images = test_case1.load_images() + assert len(test_case1_images) == 1 + assert test_case1_images[0].metadata == metadata_updated + + +def test__create() -> None: + test_case_name = with_test_prefix(f"{__file__} test__create test case") + description = f"{test_case_name} (description)" + test_case = TestCase.create(test_case_name, description) + assert test_case.version == 0 + assert test_case.name == test_case_name + assert test_case.description == description + assert test_case._workflow == WorkflowType.DETECTION + + +def test__create__with_images(test_dataset: List[TestImage]) -> None: + name = with_test_prefix(f"{__file__}::test__create__with_images test case") + description = f"{name} (description)" + images = [test_dataset[0], test_dataset[3], test_dataset[4]] + test_case = TestCase.create(name, description, images) + assert test_case.version == 1 + assert_test_images_equal(test_case.load_images(), images) + assert test_case._workflow == WorkflowType.DETECTION + + +def test__load() -> None: + test_case_name = with_test_prefix(f"{__file__} test__load test case") + test_case = TestCase(test_case_name) + loaded_test_case = TestCase.load(test_case_name) + assert test_case == loaded_test_case + + +def test__load__with_version() -> None: + test_case_name = with_test_prefix(f"{__file__} test__load__with_version test case") + test_case = TestCase(test_case_name) + new_description = f"{__file__} test__load__version new description" + with test_case.edit() as editor: + editor.description(new_description) + + loaded_test_case_default = TestCase.load(test_case_name) + loaded_test_case_v0 = TestCase.load(test_case_name, 0) + loaded_test_case_v1 = TestCase.load(test_case_name, 1) + + assert loaded_test_case_default == loaded_test_case_v1 + + assert loaded_test_case_default.version == 1 + assert loaded_test_case_default.description == new_description + + assert loaded_test_case_v0.version == 0 + assert loaded_test_case_v0.description == "" + + assert loaded_test_case_v1.version == 1 + assert loaded_test_case_v1.description == new_description + + +def test__load__mismatch() -> None: + test_case_name = with_test_prefix(f"{__file__} test__load__mismatch test case") + ClassificationTestCase(test_case_name) + with pytest.raises(WorkflowMismatchError) as exc_info: + TestCase.load(test_case_name) + + exc_info_value = str(exc_info.value) + assert ClassificationTestCase._workflow.value in exc_info_value + assert TestCase._workflow.value in exc_info_value + + +def test__load__with_version_mismatch() -> None: + test_case_name = with_test_prefix(f"{__file__} test__load__with_version_mismatch test case") + TestCase(test_case_name) + mismatch_version = 42 + with pytest.raises(NotFoundError) as exc_info: + TestCase.load(test_case_name, mismatch_version) + + exc_info_value = str(exc_info.value) + assert f"(version 
{mismatch_version})" in exc_info_value + + +def test__create__with_locator_collision() -> None: + test_case_name = with_test_prefix(f"{__file__} test__create__with_locator_collision test case") + locator = fake_random_locator() + + generic_sample = DummyTestSample( # type: ignore + locator=locator, + value=0, + bbox=GenericBoundingBox(top_left=(0, 0), bottom_right=(0, 0)), + ) + generic_ground_truth = DummyGroundTruth(label="dummy", value=0) + GenericTestCase( + with_test_prefix(f"{__file__}::{test_case_name} generic"), + test_samples=[ + ( + generic_sample, + generic_ground_truth, + ), + ], + ) + test_case = TestCase(test_case_name, images=[TestImage(locator)]) + images = test_case.load_images() + assert len(images) == 1 diff --git a/tests/integration/detection/test_test_image.py b/tests/integration/detection/test_test_image.py new file mode 100644 index 000000000..700344b7b --- /dev/null +++ b/tests/integration/detection/test_test_image.py @@ -0,0 +1,220 @@ +# Copyright 2021-2023 Kolena Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +import random +import uuid +from typing import cast +from typing import List + +import pytest + +import kolena.detection.ground_truth as ground_truth +from kolena.detection import iter_images +from kolena.detection import load_images +from kolena.detection import TestCase +from kolena.detection import TestImage +from kolena.detection.metadata import Asset +from kolena.detection.metadata import BoundingBox +from kolena.detection.metadata import Landmarks +from tests.integration.detection.helper import assert_test_images_equal +from tests.integration.detection.helper import fake_gt_bounding_box +from tests.integration.detection.helper import fake_gt_classification_label +from tests.integration.detection.helper import fake_gt_segmentation_mask +from tests.integration.detection.helper import fake_label +from tests.integration.detection.helper import fake_points +from tests.integration.helper import fake_random_locator +from tests.integration.helper import with_test_prefix + + +@pytest.fixture(scope="module") +def test_images() -> List[TestImage]: + images = fake_test_images(15) + # images dataset is generated by with_test_prefix + TestCase(f"{images[0].dataset} test case for registration purposes", images=images) + return images + + +def test__load_images__all(test_images: List[TestImage]) -> None: + result = load_images() + assert len(result) >= len(test_images) + + result_locators = {image.locator for image in result} + test_image_locators = {image.locator for image in test_images} + # regardless of whatever other images have been registered, at least the expected test_images are present + assert len(test_image_locators - result_locators) == 0 + + +def test__load_images__with_dataset(test_images: List[TestImage]) -> None: + result = cast(List[TestImage], load_images(test_images[0].dataset)) + assert_test_images_equal(result, test_images) + + +def test__iter_images__with_dataset(test_images: List[TestImage]) -> None: + result = 
list(iter_images(test_images[0].dataset)) + assert_test_images_equal(result, test_images) + + +def test__resolve_existing() -> None: + def register(images: List[TestImage]) -> None: + TestCase(str(uuid.uuid4()), images=images) + + dataset = with_test_prefix(str(uuid.uuid4())) + image_a0 = TestImage( + fake_random_locator(), + dataset=dataset, + ground_truths=[fake_gt_classification_label(), fake_gt_segmentation_mask()], + ) + image_b0 = TestImage( + fake_random_locator(), + dataset=dataset, + ground_truths=[fake_gt_bounding_box(), fake_gt_bounding_box()], + ) + image_c0 = TestImage(fake_random_locator(), dataset=dataset, ground_truths=[]) + register([image_a0, image_b0, image_c0]) + + image_a1 = TestImage(image_a0.locator, dataset=dataset) + image_b1 = TestImage( + image_b0.locator, + dataset=dataset, + ground_truths=[image_b0.ground_truths[1], fake_gt_bounding_box()], + ) + image_c1 = TestImage(image_c0.locator, dataset=dataset, ground_truths=[fake_gt_segmentation_mask()]) + register([image_a1, image_b1, image_c1]) + + assert_test_images_equal( + load_images(dataset), + [ + image_a0, + TestImage( + image_b0.locator, + dataset=dataset, + ground_truths=[image_b0.ground_truths[0], *image_b1.ground_truths], + ), + image_c1, + ], + ) + + +def test__load_images__metadata() -> None: + metadata = dict( + example_str="some example string with\narbitrary\tcharacters 😁", + example_float=1.2, # relatively round; no guarantee of exactness + example_int=-3, + example_bool=True, + example_bounding_box=BoundingBox((1, 2), (3, 4)), + example_landmarks=Landmarks([(1, 2), (3, 4), (5, 6), (7, 8), (9, 0)]), + example_asset=Asset("s3://path/to/example/asset.jpg"), + ) + dataset = with_test_prefix(str(uuid.uuid4())) + image = TestImage(fake_random_locator(), dataset=dataset, ground_truths=[fake_gt_bounding_box()], metadata=metadata) + TestCase(with_test_prefix(str(uuid.uuid4())), images=[image]) + assert load_images(dataset) == [image] + + +def test__load_difficult_ground_truth() -> None: + difficult_classification = ground_truth.ClassificationLabel(fake_label(), difficult=True) + non_difficult_classification = ground_truth.ClassificationLabel(fake_label(), difficult=False) + difficult_bbox = ground_truth.BoundingBox(fake_label(), *fake_points(2), difficult=True) + non_difficult_bbox = ground_truth.BoundingBox(fake_label(), *fake_points(2), difficult=False) + difficult_seg_mask = ground_truth.SegmentationMask(fake_label(), fake_points(4), difficult=True) + non_difficult_seg_mask = ground_truth.SegmentationMask(fake_label(), fake_points(4), difficult=False) + + for gts in ( + [difficult_classification, non_difficult_classification], + [difficult_bbox, non_difficult_bbox], + [difficult_seg_mask, non_difficult_seg_mask], + ): + dataset = with_test_prefix(str(uuid.uuid4())) + image = TestImage(fake_random_locator(), dataset=dataset, ground_truths=gts) + TestCase(with_test_prefix(str(uuid.uuid4())), images=[image]) + # note: this method is deprecated + loaded_images = load_images(dataset) + assert_test_images_equal(cast(List[TestImage], loaded_images), [image]) + + +def test__load_duplicated_ground_truth() -> None: + locator = fake_random_locator() + label = fake_label() + points = fake_points(2) + + difficult_bbox = ground_truth.BoundingBox(label=label, top_left=points[0], bottom_right=points[1], difficult=True) + non_difficult_bbox = ground_truth.BoundingBox( + label=label, + top_left=points[0], + bottom_right=points[1], + difficult=False, + ) + + # Register single ground truth, get single ground truth + test_case 
= TestCase( + with_test_prefix(str(uuid.uuid4())), + images=[ + TestImage( + locator, + ground_truths=[ + non_difficult_bbox, + ], + ), + ], + ) + got_images = test_case.load_images() + assert len(got_images) == 1 + assert len(got_images[0].ground_truths) == 1 + assert got_images[0].ground_truths[0] == non_difficult_bbox + + # Register single ground truth with difficult, get single ground truth + test_case = TestCase( + with_test_prefix(str(uuid.uuid4())), + images=[ + TestImage( + locator, + ground_truths=[ + difficult_bbox, + ], + ), + ], + ) + got_images = test_case.load_images() + assert len(got_images) == 1 + assert len(got_images[0].ground_truths) == 1 + assert got_images[0].ground_truths[0] == difficult_bbox + + # Check that nothing changed with non-difficult case + test_case = TestCase( + with_test_prefix(str(uuid.uuid4())), + images=[ + TestImage( + locator, + ground_truths=[ + non_difficult_bbox, + ], + ), + ], + ) + got_images = test_case.load_images() + assert len(got_images) == 1 + assert len(got_images[0].ground_truths) == 1 + assert got_images[0].ground_truths[0] == non_difficult_bbox + + +def fake_test_images(n: int) -> List[TestImage]: + dataset = with_test_prefix(f"dataset-{str(uuid.uuid4())}") + gt_choices = [fake_gt_classification_label, fake_gt_bounding_box, fake_gt_segmentation_mask] + return [ + TestImage( + fake_random_locator(dataset), + dataset=dataset, + ground_truths=[random.choice(gt_choices)() for _ in range(random.randint(0, 3))], + ) + for _ in range(n) + ] diff --git a/tests/integration/detection/test_test_run.py b/tests/integration/detection/test_test_run.py new file mode 100644 index 000000000..d2168f23d --- /dev/null +++ b/tests/integration/detection/test_test_run.py @@ -0,0 +1,441 @@ +# Copyright 2021-2023 Kolena Inc. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. 
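+#
+# The tests in this module form a sequence: a TestRun accumulates uploaded
+# inferences, so later tests consume state produced by earlier ones. Ordering is
+# declared with @pytest.mark.depends(on=[...]), which is assumed here to come
+# from the pytest-depends plugin.
+#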
+import random
+from typing import List
+from typing import Optional
+from typing import Tuple
+from unittest.mock import patch
+
+import pytest
+
+import kolena
+from kolena._api.v1.detection import CustomMetrics
+from kolena._api.v1.detection import TestRun as TestRunAPI
+from kolena.detection import InferenceModel
+from kolena.detection import Model
+from kolena.detection import test
+from kolena.detection import TestCase
+from kolena.detection import TestImage
+from kolena.detection import TestSuite
+from kolena.detection.ground_truth import BoundingBox as GTBoundingBox
+from kolena.detection.inference import BoundingBox
+from kolena.detection.inference import ClassificationLabel
+from kolena.detection.inference import Inference
+from kolena.detection.inference import SegmentationMask
+from kolena.detection.test_run import TestRun
+from kolena.errors import CustomMetricsException
+from kolena.errors import InputValidationError
+from kolena.errors import RemoteError
+from tests.integration.detection.conftest import TestData
+from tests.integration.helper import fake_random_locator
+from tests.integration.helper import with_test_prefix
+
+
+@pytest.fixture(scope="module")
+def detection_model(detection_test_data: TestData) -> Model:
+    return detection_test_data.models[0]
+
+
+@pytest.fixture(scope="module")
+def detection_test_suites(detection_test_data: TestData) -> List[TestSuite]:
+    return [detection_test_data.test_suites[0], detection_test_data.test_suites[2]]
+
+
+@pytest.fixture
+def detection_test_image_locators(detection_test_data: TestData) -> List[str]:
+    # all image locators in test suite A
+    return sorted(detection_test_data.locators[:-1])
+
+
+def generate_image_results(images: List[TestImage]) -> List[Tuple[TestImage, Optional[List[Inference]]]]:
+    return [(image, generate_single_image_inferences(image)) for image in images]
+
+
+def generate_single_image_inferences(image: TestImage) -> List[Inference]:
+    # deterministically generate inferences by seeding on the image locator
+    random.seed(hash(image.locator))
+    return_switch = random.random()
+    if return_switch < 0.2:
+        return []
+    class_inf = ClassificationLabel(label="car", confidence=random.random())
+    bb_inf = BoundingBox(
+        confidence=random.random(),
+        label="bike",
+        top_left=(random.random() * 300, random.random() * 300),
+        bottom_right=(random.random() * 300, random.random() * 300),
+    )
+    seg_inf = SegmentationMask(
+        confidence=random.random(),
+        label="pedestrian",
+        points=[(random.random() * 300, random.random() * 300) for _ in range(5)],
+    )
+    if return_switch < 0.4:
+        return [class_inf]
+    if return_switch < 0.6:
+        return [bb_inf]
+    if return_switch < 0.8:
+        return [seg_inf]
+    return [class_inf, bb_inf, seg_inf]
+
+
+#
+# Interacting with a TestRun is naturally a sequenced operation -- here each test depends on the previous test and
+# likely uses some of the same functionality
+#
+
+
+def test__create_or_retrieve(detection_test_data: TestData) -> None:
+    model = detection_test_data.models[0]
+    test_suite = detection_test_data.test_suites[0]
+
+    with TestRun(model, test_suite) as test_run_created:
+        created_id = test_run_created._id
+
+    with TestRun(model, test_suite) as test_run_retrieved:
+        retrieved_id = test_run_retrieved._id
+
+    assert retrieved_id == created_id
+
+
+def test__create_or_retrieve__with_params(detection_test_data: TestData) -> None:
+    model = Model(with_test_prefix(f"{__file__}::test_create_or_retrieve_test_run_with_params model"))
+    test_suite = detection_test_data.test_suites[0]
+
+    # Check invalid test configurations
+    with 
pytest.raises(InputValidationError): + TestRun(model, test_suite, test_config=kolena.detection.test_config.FixedGlobalThreshold(-0.5)) + with pytest.raises(InputValidationError): + TestRun(model, test_suite, test_config=kolena.detection.test_config.FixedGlobalThreshold(1.5)) + with pytest.raises(InputValidationError): + TestRun(model, test_suite, test_config=kolena.detection.test_config.F1Optimal(-0.5)) + with pytest.raises(InputValidationError): + TestRun(model, test_suite, test_config=kolena.detection.test_config.F1Optimal(1.5)) + + with TestRun( + model, + test_suite, + test_config=kolena.detection.test_config.FixedGlobalThreshold(0.5), + ) as test_run_created: + created_id = test_run_created._id + + with TestRun( + model, + test_suite, + test_config=kolena.detection.test_config.FixedGlobalThreshold(0.5), + ) as test_run_retrieved: + retrieved_id = test_run_retrieved._id + + assert retrieved_id == created_id + + +@pytest.mark.depends(on=["test__create_or_retrieve"]) +def test__load_images( + detection_model: Model, + detection_test_suites: List[TestSuite], + detection_test_image_locators: List[str], +) -> None: + with TestRun(detection_model, detection_test_suites[0]) as test_run: + remaining_images_actual = test_run.load_images() + assert sorted(image.locator for image in remaining_images_actual) == detection_test_image_locators + assert sorted(image.metadata["i"] for image in remaining_images_actual) == list(range(4)) + + # fetching again should retrieve the same data if no results were uploaded + with TestRun(detection_model, detection_test_suites[0]) as test_run: + remaining_images_actual = test_run.load_images(batch_size=500) + assert sorted(image.locator for image in remaining_images_actual) == detection_test_image_locators + + batch_size = 2 + with TestRun(detection_model, detection_test_suites[0]) as test_run: + remaining_images_actual = test_run.load_images(batch_size=2) + assert len(remaining_images_actual) == batch_size + + # zero-size batches are not allowed + with pytest.raises(InputValidationError): + with TestRun(detection_model, detection_test_suites[0]) as test_run: + test_run.load_images(batch_size=0) + + +@pytest.mark.depends(on=["test__create_or_retrieve"]) +def test__iter_images( + detection_model: Model, + detection_test_suites: List[TestSuite], + detection_test_image_locators: List[str], +) -> None: + with TestRun(detection_model, detection_test_suites[0]) as test_run: + remaining_images_actual = list(test_run.iter_images()) + assert sorted(image.locator for image in remaining_images_actual) == detection_test_image_locators + + # fetching again should retrieve the same data if no results were uploaded + with TestRun(detection_model, detection_test_suites[0]) as test_run: + remaining_images_actual = list(test_run.iter_images()) + assert sorted(image.locator for image in remaining_images_actual) == detection_test_image_locators + + +@pytest.mark.depends(on=["test__load_images"]) +def test__add_inferences__validation(detection_model: Model, detection_test_suites: List[TestSuite]) -> None: + fake_image = TestImage(fake_random_locator()) + fake_inference = ClassificationLabel(label="car", confidence=0.5) + + with pytest.raises(InputValidationError): + # assert that we guard against images from outside the test suite + with TestRun(detection_model, detection_test_suites[0]) as test_run: + test_run.add_inferences(fake_image, [fake_inference]) + + +@pytest.mark.depends(on=["test__load_images"]) +def test__add_inferences__validation__invalid_confidence( + detection_model: 
Model, + detection_test_suites: List[TestSuite], +) -> None: + with pytest.raises(RemoteError): + with TestRun(detection_model, detection_test_suites[0]) as test_run: + [image] = test_run.load_images(batch_size=1) + bad_inference = ClassificationLabel("car", 0) + bad_inference.confidence = float("nan") # bypass validation on constructor + test_run.add_inferences(image, [bad_inference]) + + +@pytest.mark.depends(on=["test__load_images"]) +def test_add_inferences__validation__ignored_sample() -> None: + test_name = with_test_prefix(f"{__file__}::test_add_inferences__validation__ignored_sample") + model = Model(f"{test_name} model") + images = [ + TestImage( + fake_random_locator(), + ground_truths=[ + kolena.detection.ground_truth.BoundingBox(label="car", top_left=(0, 0), bottom_right=(100, 100)), + ], + ) + for _ in range(5) + ] + test_case = TestCase(f"{test_name} test_case", images=images) + test_suite = TestSuite(f"{test_name} test_suite", test_cases=[test_case]) + + with pytest.raises(RemoteError): + with TestRun(model, test_suite) as test_run: + for i, image in enumerate(test_run.iter_images()): + test_run.add_inferences( + image, + inferences=[BoundingBox(label="car", confidence=0.5, top_left=(0, 0), bottom_right=(100, 100))], + ) + if i == 0: + test_run.add_inferences(image, inferences=None) + + +@pytest.mark.depends(on=["test__load_images"]) +def test__add_inferences__validation__all_ignore() -> None: + test_name = with_test_prefix(f"{__file__}::test__add_inferences__validation__all_ignore") + model = Model(f"{test_name} model") + images = [ + TestImage( + fake_random_locator(), + ground_truths=[ + kolena.detection.ground_truth.BoundingBox(label="car", top_left=(0, 0), bottom_right=(100, 100)), + ], + ) + for _ in range(5) + ] + test_case = TestCase(f"{test_name} test_case", images=images) + test_suite = TestSuite(f"{test_name} test_suite", test_cases=[test_case]) + + with pytest.raises(RemoteError): + with TestRun(model, test_suite) as test_run: + for image in test_run.iter_images(): + test_run.add_inferences(image, inferences=None) + + +@pytest.mark.depends(on=["test__add_inferences__validation"]) +def test__add_inferences(detection_model: Model, detection_test_suites: List[TestSuite]) -> None: + with TestRun(detection_model, detection_test_suites[0]) as test_run: + remaining_images = test_run.load_images(batch_size=2) + image_results = generate_image_results(remaining_images) + for image, inferences in image_results: + test_run.add_inferences(image, inferences) + + with pytest.raises(InputValidationError): + # shouldn't be able to upload duplicate entries + with TestRun(detection_model, detection_test_suites[0]) as test_run: + for image, inferences in image_results: + test_run.add_inferences(image, inferences) + + # test no inferences and iter_images + with TestRun(detection_model, detection_test_suites[0]) as test_run: + for image in test_run.iter_images(): + test_run.add_inferences(image, []) + break # break to process only one image + + with TestRun(detection_model, detection_test_suites[0]) as test_run: + remaining_images = test_run.load_images() + image_results = generate_image_results(remaining_images) + # load images that have not been processed such that results can be uploaded + for _image, _inferences in image_results: + test_run.add_inferences(_image, _inferences) + + # results have been received for the entire suite, no more images to fetch + with TestRun(detection_model, detection_test_suites[0]) as test_run: + remaining_images = test_run.load_images() + assert 
len(remaining_images) == 0 + + +@pytest.mark.depends(on=["test__add_inferences"]) +def test__noop(detection_test_data: TestData) -> None: + model = detection_test_data.models[0] + # test-suite "A_subset" + with TestRun(model, detection_test_data.test_suites[3]) as test_run: + remaining_images = test_run.load_images() + assert len(remaining_images) == 0 + + +def test__test(detection_test_data: TestData) -> None: + model = InferenceModel(with_test_prefix(f"{__file__}::test__test inference model"), infer=lambda _image: []) + test_suite = detection_test_data.test_suites[3] + + with TestRun(model, test_suite) as test_run: + test_run_id = test_run._id + assert len(test_run.load_images()) > 0 + + # should complete all tests + test(model, test_suite) + + with TestRun(model, test_suite) as test_run: + assert test_run_id == test_run._id + assert len(test_run.load_images()) == 0 + + +def test__test__reset() -> None: + test_name = with_test_prefix(f"{__file__}::test__test__reset") + n_images = 5 + images = [ + TestImage( + fake_random_locator(), + dataset=test_name, + ground_truths=[ + GTBoundingBox( + "bike", + (0.0 + idx, 0.0 + idx), + (100.0 + idx, 100.0 + idx), + ), + GTBoundingBox( + "pedestrian", + (0.0 + idx * 10, 0.0 + idx * 10), + (100.0 + idx * 10, 100.0 + idx * 10), + ), + ], + ) + for idx in range(n_images) + ] + test_case = TestCase(f"{test_name} test_case", images=images) + test_suite = TestSuite(name=f"{test_name} test suite", test_cases=[test_case]) + + bb_bike = BoundingBox( + label="bike", + confidence=0.89, + top_left=(42.0, 42.0), + bottom_right=(420.0, 420.0), + ) + + def infer_bike(_: TestImage) -> List[BoundingBox]: + return [bb_bike] + + bb_pedestrian = BoundingBox( + label="pedestrian", + confidence=0.79, + top_left=(42.0, 42.0), + bottom_right=(420.0, 420.0), + ) + + def infer_pedestrian(_: TestImage) -> List[BoundingBox]: + return [bb_pedestrian] + + model_bike = InferenceModel(f"{test_name} inference model", infer=infer_bike) + model_pedestrian = InferenceModel(f"{test_name} inference model", infer=infer_pedestrian) + assert model_bike._id == model_pedestrian._id + + with TestRun(model_bike, test_suite) as test_run: + assert len(test_run.load_images()) > 0 + + test(model_bike, test_suite) + assert [inf for _, inf in model_bike.load_inferences(test_suite)] == [[bb_bike] for _ in range(n_images)] + + with TestRun(model_bike, test_suite) as test_run: + assert len(test_run.load_images()) == 0 + + test(model_pedestrian, test_suite, reset=True) + assert [inf for _, inf in model_pedestrian.load_inferences(test_suite)] == [ + [bb_pedestrian] for _ in range(n_images) + ] + + +def test__custom_metrics(detection_test_data: TestData) -> None: + def custom_metrics(inferences: List[Tuple[TestImage, Optional[List[Inference]]]]) -> CustomMetrics: + num_infers = sum(len(infer) if infer else 0 for sample, infer in inferences) + return {"foo": num_infers} + + model = InferenceModel( + with_test_prefix(f"{__file__}::test_test_run_custom_metrics"), + infer=lambda _image: [ + BoundingBox( + confidence=random.random(), + label="car", + top_left=(random.random() * 300, random.random() * 300), + bottom_right=(random.random() * 300, random.random() * 300), + ), + ], + ) + test_suite = detection_test_data.test_suites[1] + test(model, test_suite, custom_metrics_callback=custom_metrics) + + +def test__custom_metrics__error(detection_test_data: TestData) -> None: + def bad_custom_metrics(_: List[Tuple[TestImage, Optional[List[Inference]]]]) -> CustomMetrics: + raise KeyError("dumb error") + + model = 
+def test__custom_metrics__error(detection_test_data: TestData) -> None:
+    def bad_custom_metrics(_: List[Tuple[TestImage, Optional[List[Inference]]]]) -> CustomMetrics:
+        raise KeyError("dumb error")
+
+    model = InferenceModel(
+        with_test_prefix(f"{__file__}::test_test_run_custom_metrics_error"),
+        infer=lambda _image: [
+            BoundingBox(
+                confidence=random.random(),
+                label="car",
+                top_left=(random.random() * 300, random.random() * 300),
+                bottom_right=(random.random() * 300, random.random() * 300),
+            ),
+        ],
+    )
+    test_suite = detection_test_data.test_suites[0]
+
+    with pytest.raises(CustomMetricsException):
+        test(model, test_suite, custom_metrics_callback=bad_custom_metrics)
+
+
+def test__mark_crashed(detection_test_data: TestData) -> None:
+    def infer(_: TestImage) -> Optional[List[BoundingBox]]:
+        raise RuntimeError
+
+    model = InferenceModel(with_test_prefix(f"{__file__}::test_mark_crashed inference model"), infer=infer)
+    test_suite = detection_test_data.test_suites[1]
+
+    test_run = TestRun(model, test_suite)
+
+    with patch("kolena.detection._internal.test_run.report_crash") as patched:
+        with pytest.raises(RuntimeError):
+            with test_run:
+                raise RuntimeError
+
+        patched.assert_called_once_with(test_run._id, TestRunAPI.Path.MARK_CRASHED)
+
+    with patch("kolena.detection._internal.test_run.report_crash") as patched:
+        with pytest.raises(RuntimeError):
+            test(model, test_suite)
+
+        patched.assert_called_once_with(test_run._id, TestRunAPI.Path.MARK_CRASHED)
diff --git a/tests/integration/detection/test_test_suite.py b/tests/integration/detection/test_test_suite.py
new file mode 100644
index 000000000..acf930aaa
--- /dev/null
+++ b/tests/integration/detection/test_test_suite.py
@@ -0,0 +1,325 @@
+# Copyright 2021-2023 Kolena Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
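+# Integration tests for the detection TestSuite API: construction, versioned
+# loading, and editor semantics (add / merge / remove / reset).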
+from typing import List
+
+import pytest
+
+from kolena._api.v1.workflow import WorkflowType
+from kolena.classification import TestCase as ClassificationTestCase
+from kolena.classification import TestSuite as ClassificationTestSuite
+from kolena.detection import TestCase
+from kolena.detection import TestImage
+from kolena.detection import TestSuite
+from kolena.detection.ground_truth import ClassificationLabel
+from kolena.errors import NameConflictError
+from kolena.errors import NotFoundError
+from kolena.errors import WorkflowMismatchError
+from tests.integration.helper import fake_random_locator
+from tests.integration.helper import with_test_prefix
+
+
+@pytest.fixture(scope="module")
+def test_case() -> TestCase:
+    name = with_test_prefix(f"{__file__}::test_case fixture test case")
+    return TestCase(name, description="test case description")
+
+
+@pytest.fixture(scope="module")
+def test_dataset() -> List[TestImage]:
+    name = with_test_prefix(f"{__file__}::test_dataset fixture dataset")
+    return [
+        TestImage(fake_random_locator(), dataset=name),
+        TestImage(fake_random_locator(), dataset=name, ground_truths=[ClassificationLabel("car")]),
+    ]
+
+
+@pytest.fixture(scope="module")
+def test_case_versions(test_dataset: List[TestImage]) -> List[TestCase]:
+    name = with_test_prefix(f"{__file__}::test_case_versions fixture test case")
+    test_case = TestCase(name, description="test case description")
+    # load a copy at each version such that it is not modified by later edits
+    test_case_v0 = TestCase(test_case.name, version=test_case.version)
+    with test_case.edit() as editor:
+        editor.add(test_dataset[0])
+    test_case_v1 = TestCase(test_case.name, version=test_case.version)
+    with test_case.edit() as editor:
+        editor.add(test_dataset[1])
+    test_case_v2 = TestCase(test_case.name, version=test_case.version)
+    return [test_case_v0, test_case_v1, test_case_v2]
+
+
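+# Version mapping for the fixture above: v0 is empty, v1 contains
+# test_dataset[0], and v2 contains both test_dataset[0] and test_dataset[1].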
+def test__init() -> None:
+    name = with_test_prefix(f"{__file__}::test__init test suite")
+    description = "A\n\tlong\ndescription including special characters! 🎉"
+    test_suite = TestSuite(name, description=description)
+    assert test_suite.name == name
+    assert test_suite.version == 0
+    assert test_suite.description == description
+    assert test_suite.test_cases == []
+    assert test_suite._workflow == WorkflowType.DETECTION
+
+    test_suite2 = TestSuite(name)
+    assert test_suite == test_suite2
+
+    test_suite3 = TestSuite(name, description="different description should be ignored")
+    assert test_suite == test_suite3
+
+
+def test__init__reset(test_case: TestCase, test_case_versions: List[TestCase]) -> None:
+    name = with_test_prefix(f"{__file__}::test__init__reset test suite")
+    description = f"{name} (description)"
+    TestSuite(name, description=description, test_cases=[test_case, test_case_versions[0]])
+
+    new_test_cases = [test_case_versions[1]]
+    test_suite = TestSuite(name, test_cases=new_test_cases, reset=True)
+    assert test_suite.version == 2
+    assert test_suite.description == description  # not updated or cleared
+    assert test_suite.test_cases == new_test_cases
+
+
+def test__init__with_version(test_case_versions: List[TestCase]) -> None:
+    name = with_test_prefix(f"{__file__}::test__init__with_version test suite")
+    description = "test suite description"
+    test_suite = TestSuite(name, description=description)
+
+    test_suite0 = TestSuite(name, version=test_suite.version)
+    assert test_suite == test_suite0
+
+    with pytest.raises(NameConflictError):
+        TestSuite(name, version=123)
+
+    with test_suite.edit() as editor:
+        new_description = "new description"
+        editor.description(new_description)
+        editor.add(test_case_versions[0])
+
+    assert test_suite.description == new_description
+    assert test_suite == TestSuite(name, version=test_suite.version)
+    assert test_suite == TestSuite(name)
+    assert test_suite.test_cases == [test_case_versions[0]]
+
+    test_suite0_reloaded = TestSuite(name, version=test_suite0.version)
+    assert test_suite0.test_cases == test_suite0_reloaded.test_cases
+    assert test_suite0_reloaded.description == new_description
+    assert test_suite0_reloaded.test_cases == []
+
+
+def test__edit(test_case: TestCase) -> None:
+    name = with_test_prefix(f"{__file__}::test__edit test suite")
+    description = "test__edit test suite description"
+    test_suite = TestSuite(name, description=description)
+    with test_suite.edit() as editor:
+        editor.add(test_case)
+    assert test_suite.name == name
+    assert test_suite.version == 1
+    assert test_suite.description == description
+    assert test_suite.test_cases == [test_case]
+    assert test_suite._workflow == WorkflowType.DETECTION
+    assert all(tc._workflow == WorkflowType.DETECTION for tc in test_suite.test_cases)
+
+    test_case0 = TestCase(with_test_prefix(f"{__file__}::test__edit test suite test case"))
+    with test_suite.edit() as editor:
+        editor.add(test_case0)
+    assert test_suite.version == 2
+    assert test_suite.test_cases == [test_case, test_case0]  # note that ordering matters
+
+
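+# Edits that leave a suite unchanged must not bump its version. Expected
+# versioning, sketched with a hypothetical suite name:
+#     suite = TestSuite("example")    # version 0 on creation
+#     with suite.edit() as editor:
+#         editor.add(some_test_case)  # version 1 once the editor exits
+#     with suite.edit():
+#         ...                         # no changes: version stays 1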
+def test__edit__no_op(test_case: TestCase) -> None:
+    test_suite = TestSuite(with_test_prefix(f"{__file__}::test__edit__no_op test suite"))
+    with test_suite.edit():
+        ...
+    assert test_suite.version == 0
+
+    with test_suite.edit() as editor:
+        editor.add(test_case)
+        editor.remove(test_case)
+    assert test_suite.version == 0
+    assert test_suite.test_cases == []
+
+
+def test__edit__idempotent(test_case: TestCase, test_case_versions: List[TestCase]) -> None:
+    test_cases = [test_case, test_case_versions[0]]
+    test_suite = TestSuite(with_test_prefix(f"{__file__}::test__edit__idempotent test suite"), test_cases=test_cases)
+    assert test_suite.version == 1
+
+    # adding the same test cases in the same order doesn't edit the suite, no-op
+    with test_suite.edit() as editor:
+        for tc in test_cases:
+            editor.add(tc)
+    assert test_suite.version == 1
+    assert test_suite.test_cases == test_cases
+
+
+def test__edit__same_name_test_case(test_case_versions: List[TestCase]) -> None:
+    test_suite = TestSuite(with_test_prefix(f"{__file__}::test__edit__same_name_test_case test suite"))
+    with test_suite.edit() as editor:
+        editor.add(test_case_versions[2])
+
+    # a version is already in the test suite, 'add' should replace the existing version
+    for test_case in test_case_versions:
+        with test_suite.edit() as editor:
+            editor.add(test_case)
+        assert test_suite.test_cases[0].version == test_case.version
+
+
+def test__edit__add(test_case: TestCase, test_case_versions: List[TestCase]) -> None:
+    test_suite = TestSuite(with_test_prefix(f"{__file__}::test__edit__add test suite"))
+    with test_suite.edit() as editor:
+        editor.add(test_case)
+        editor.add(test_case_versions[0])
+    assert test_suite.version == 1
+    assert test_suite.test_cases == [test_case, test_case_versions[0]]
+    previous_test_cases = test_suite.test_cases
+
+    with test_suite.edit() as editor:
+        editor.add(test_case)  # no-op
+        editor.add(test_case_versions[1])  # should replace the existing test_case_version
+        editor.add(test_case_versions[2])  # should replace the test_case_version added in the above line
+    assert test_suite.version == 2
+    assert test_suite.test_cases == [test_case, test_case_versions[2]]
+    assert test_suite.test_cases != previous_test_cases
+
+
+def test__edit__add_mismatch_workflow() -> None:
+    test_suite_name = with_test_prefix(f"{__file__}::test__edit__add_mismatch_workflow test suite")
+    test_suite = TestSuite(test_suite_name)
+    classification_test_case = ClassificationTestCase(f"{test_suite_name}::classification_test_case")
+    with test_suite.edit() as editor:
+        with pytest.raises(ValueError) as exc_info:
+            editor.add(classification_test_case)
+    exc_info_value = str(exc_info.value)
+    assert WorkflowType.CLASSIFICATION.value in exc_info_value
+    assert WorkflowType.DETECTION.value in exc_info_value
+
+
+def test__edit__merge(test_case: TestCase, test_case_versions: List[TestCase]) -> None:
+    test_suite = TestSuite(with_test_prefix(f"{__file__}::test__edit__merge test suite"))
+    with test_suite.edit() as editor:
+        editor.add(test_case)
+        editor.merge(test_case_versions[0])  # same as add when the test case isn't already present
+    assert test_suite.version == 1
+    assert test_suite.test_cases == [test_case, test_case_versions[0]]
+    previous_test_cases = test_suite.test_cases
+
+    with test_suite.edit() as editor:
+        editor.merge(test_case)  # no-op
+        editor.merge(test_case_versions[1])  # should replace the existing test_case_version
+        editor.merge(test_case_versions[2])  # should replace the test_case_version merged in the above line
+    assert test_suite.version == 2
+    assert test_suite.test_cases == [test_case, test_case_versions[2]]
+    assert test_suite.test_cases != previous_test_cases
+
+
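+# edit(reset=True) drops all previously-attached test cases; only test cases
+# added within the editor session are retained.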
+def test__edit__reset(test_case: TestCase, test_case_versions: List[TestCase]) -> None:
+    test_suite = TestSuite(
+        with_test_prefix(f"{__file__}::test__edit__reset test suite"),
+        test_cases=[
+            test_case,
+            test_case_versions[0],
+        ],
+    )
+    new_description = "new description"
+
+    with test_suite.edit(reset=True) as editor:
+        editor.description(new_description)
+        editor.add(test_case_versions[1])
+    assert test_suite.version == 2
+    assert test_suite.description == new_description
+    assert test_suite.test_cases == [test_case_versions[1]]
+
+    with test_suite.edit(reset=True) as editor:  # no-op
+        editor.add(test_case_versions[1])
+    assert test_suite.version == 2
+    assert test_suite.description == new_description
+    assert test_suite.test_cases == [test_case_versions[1]]
+
+
+def test__create() -> None:
+    test_suite_name = with_test_prefix(f"{__file__} test__create test suite")
+    description = "A\n\tlong\ndescription including special characters! 🎉"
+    test_suite = TestSuite.create(test_suite_name, description=description)
+    assert test_suite.name == test_suite_name
+    assert test_suite.version == 0
+    assert test_suite.description == description
+    assert test_suite.test_cases == []
+    assert test_suite._workflow == WorkflowType.DETECTION
+
+
+def test__create__with_test_cases(test_case: TestCase, test_case_versions: List[TestCase]) -> None:
+    test_suite_name = with_test_prefix(f"{__file__} test__create__with_test_cases test suite")
+    description = "A\n\tlong\ndescription including special characters! 🎉"
+    test_cases = [test_case, test_case_versions[0]]
+    test_suite = TestSuite.create(test_suite_name, description=description, test_cases=test_cases)
+    assert test_suite.name == test_suite_name
+    assert test_suite.version == 1
+    assert test_suite.description == description
+    assert test_suite.test_cases == test_cases
+    assert test_suite._workflow == WorkflowType.DETECTION
+
+
+def test__load() -> None:
+    test_suite_name = with_test_prefix(f"{__file__} test__load test suite")
+    test_suite = TestSuite(test_suite_name)
+    loaded_test_suite = TestSuite.load(test_suite_name)
+    for key in ["name", "version", "description", "test_cases", "_id", "_workflow"]:
+        assert getattr(test_suite, key) == getattr(loaded_test_suite, key)
+
+
+def test__load__with_version(test_case_versions: List[TestCase]) -> None:
+    test_suite_name = with_test_prefix(f"{__file__} test__load__version test suite")
+    test_suite = TestSuite(test_suite_name)
+    new_description = f"{__file__} test__load__version new description"
+    with test_suite.edit() as editor:
+        editor.description(new_description)
+        editor.add(test_case_versions[0])
+
+    loaded_test_suite_default = TestSuite.load(test_suite_name)
+    loaded_test_suite_v0 = TestSuite.load(test_suite_name, 0)
+    loaded_test_suite_v1 = TestSuite.load(test_suite_name, 1)
+
+    assert loaded_test_suite_default == loaded_test_suite_v1
+
+    assert loaded_test_suite_default.version == 1
+    assert loaded_test_suite_default.description == new_description
+    assert loaded_test_suite_default.test_cases == [test_case_versions[0]]
+
+    assert loaded_test_suite_v0.version == 0
+    assert loaded_test_suite_v0.description == new_description
+    assert loaded_test_suite_v0.test_cases == []
+
+    assert loaded_test_suite_v1.version == 1
+    assert loaded_test_suite_v1.description == new_description
+    assert loaded_test_suite_v1.test_cases == [test_case_versions[0]]
+
+
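+# A suite created under one workflow (classification) cannot be loaded under
+# another (detection).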
+def test__load__mismatch() -> None:
+    test_suite_name = with_test_prefix(f"{__file__} test__load__mismatch test suite")
+    ClassificationTestSuite(test_suite_name)
+    with pytest.raises(WorkflowMismatchError) as exc_info:
+        TestSuite.load(test_suite_name)
+
+    exc_info_value = str(exc_info.value)
+    assert ClassificationTestSuite._workflow.value in exc_info_value
+    assert TestSuite._workflow.value in exc_info_value
+
+
+def test__load__with_version_mismatch() -> None:
+    test_suite_name = with_test_prefix(f"{__file__} test__load__with_version_mismatch test suite")
+    TestSuite(test_suite_name)
+    mismatch_version = 42
+    with pytest.raises(NotFoundError) as exc_info:
+        TestSuite.load(test_suite_name, mismatch_version)
+
+    exc_info_value = str(exc_info.value)
+    assert f"(version {mismatch_version})" in exc_info_value
diff --git a/tests/integration/helper.py b/tests/integration/helper.py
index f489951e5..b817e565d 100644
--- a/tests/integration/helper.py
+++ b/tests/integration/helper.py
@@ -11,6 +11,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
+import uuid
 from typing import Any
 from typing import Iterable
@@ -21,6 +22,10 @@ def fake_locator(index: int, directory: str = "default") -> str:
     return f"https://fake-locator/{TEST_PREFIX}/{directory}/{index}.png"
 
 
+def fake_random_locator(directory: str = "default") -> str:
+    return f"https://fake-locator/{TEST_PREFIX}/{directory}/{uuid.uuid4()}.png"
+
+
 def with_test_prefix(value: str) -> str:
     return f"{TEST_PREFIX} {value}"