From bdb84d0b35ad7d7f4ccc6d4b52c029cb59b52682 Mon Sep 17 00:00:00 2001 From: pyalex Date: Fri, 3 Dec 2021 18:08:26 +0100 Subject: [PATCH 1/6] debug leaking dynamodb tables Signed-off-by: pyalex --- .../feast/infra/online_stores/dynamodb.py | 9 +++++++++ .../integration/e2e/test_universal_e2e.py | 3 +++ .../registration/test_universal_types.py | 20 +++++++++++++++---- 3 files changed, 28 insertions(+), 4 deletions(-) diff --git a/sdk/python/feast/infra/online_stores/dynamodb.py b/sdk/python/feast/infra/online_stores/dynamodb.py index 7ff6702dea..2634ed1f56 100644 --- a/sdk/python/feast/infra/online_stores/dynamodb.py +++ b/sdk/python/feast/infra/online_stores/dynamodb.py @@ -11,6 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. +import logging from datetime import datetime from typing import Any, Callable, Dict, List, Optional, Sequence, Tuple, Union @@ -34,6 +35,9 @@ raise FeastExtrasDependencyImportError("aws", str(e)) +logger = logging.getLogger(__name__) + + class DynamoDBOnlineStoreConfig(FeastConfigBaseModel): """Online store config for DynamoDB store""" @@ -185,3 +189,8 @@ def _delete_tables_idempotent( # Otherwise, re-raise the exception if ce.response["Error"]["Code"] != "ResourceNotFoundException": raise + else: + logger.warning( + f"Trying to delete table that doesn't exist:" + f" {config.project}.{table_instance.name}" + ) diff --git a/sdk/python/tests/integration/e2e/test_universal_e2e.py b/sdk/python/tests/integration/e2e/test_universal_e2e.py index fbbdd14f23..d299a34685 100644 --- a/sdk/python/tests/integration/e2e/test_universal_e2e.py +++ b/sdk/python/tests/integration/e2e/test_universal_e2e.py @@ -17,6 +17,9 @@ def test_e2e_consistency(environment, e2e_data_sources, infer_features): fs = environment.feature_store df, data_source = e2e_data_sources + + fs.config.project = fs.config.project + str(infer_features) + fv = driver_feature_view( name=f"test_consistency_{'with_inference' if infer_features else ''}", data_source=data_source, diff --git a/sdk/python/tests/integration/registration/test_universal_types.py b/sdk/python/tests/integration/registration/test_universal_types.py index 3716d8f870..0bc9828a55 100644 --- a/sdk/python/tests/integration/registration/test_universal_types.py +++ b/sdk/python/tests/integration/registration/test_universal_types.py @@ -106,6 +106,7 @@ def get_fixtures(request): field_mapping={"ts_1": "ts"}, ) fv = create_feature_view( + request.node.name, config.feature_dtype, config.feature_is_list, config.has_empty_list, @@ -151,7 +152,11 @@ def test_feature_get_historical_features_types_match(offline_types_test_fixtures environment, config, data_source, fv = offline_types_test_fixtures fs = environment.feature_store fv = create_feature_view( - config.feature_dtype, config.feature_is_list, config.has_empty_list, data_source + "get_historical_features_types_match", + config.feature_dtype, + config.feature_is_list, + config.has_empty_list, + data_source, ) entity = driver() fs.apply([fv, entity]) @@ -197,7 +202,11 @@ def test_feature_get_historical_features_types_match(offline_types_test_fixtures def test_feature_get_online_features_types_match(online_types_test_fixtures): environment, config, data_source, fv = online_types_test_fixtures fv = create_feature_view( - config.feature_dtype, config.feature_is_list, config.has_empty_list, data_source + "get_online_features_types_match", + config.feature_dtype, + config.feature_is_list, + config.has_empty_list, + data_source, ) fs = environment.feature_store features = [fv.name + ":value"] @@ -230,7 +239,9 @@ def test_feature_get_online_features_types_match(online_types_test_fixtures): assert isinstance(feature, expected_dtype) -def create_feature_view(feature_dtype, feature_is_list, has_empty_list, data_source): +def create_feature_view( + name, feature_dtype, feature_is_list, has_empty_list, data_source +): if feature_is_list is True: if feature_dtype == "int32": value_type = ValueType.INT32_LIST @@ -249,7 +260,8 @@ def create_feature_view(feature_dtype, feature_is_list, has_empty_list, data_sou value_type = ValueType.FLOAT elif feature_dtype == "bool": value_type = ValueType.BOOL - return driver_feature_view(data_source, value_type=value_type,) + + return driver_feature_view(data_source, name=name, value_type=value_type,) def assert_expected_historical_feature_types( From 585cf5be1758f51fe88e906e2a00a5fb09134427 Mon Sep 17 00:00:00 2001 From: pyalex Date: Fri, 3 Dec 2021 20:21:31 +0100 Subject: [PATCH 2/6] more logs Signed-off-by: pyalex --- sdk/python/feast/infra/online_stores/dynamodb.py | 1 + 1 file changed, 1 insertion(+) diff --git a/sdk/python/feast/infra/online_stores/dynamodb.py b/sdk/python/feast/infra/online_stores/dynamodb.py index 2634ed1f56..a5903989b1 100644 --- a/sdk/python/feast/infra/online_stores/dynamodb.py +++ b/sdk/python/feast/infra/online_stores/dynamodb.py @@ -183,6 +183,7 @@ def _delete_tables_idempotent( f"{config.project}.{table_instance.name}" ) table.delete() + logger.info(f"Dynamo table {config.project}.{table_instance.name} was deleted") except ClientError as ce: # If the table deletion fails with ResourceNotFoundException, # it means the table has already been deleted. From b2f6e7271de5ac1dbf5ba95246e6542295130ed7 Mon Sep 17 00:00:00 2001 From: pyalex Date: Fri, 3 Dec 2021 20:23:25 +0100 Subject: [PATCH 3/6] no project update Signed-off-by: pyalex --- sdk/python/tests/integration/e2e/test_universal_e2e.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sdk/python/tests/integration/e2e/test_universal_e2e.py b/sdk/python/tests/integration/e2e/test_universal_e2e.py index d299a34685..464722f507 100644 --- a/sdk/python/tests/integration/e2e/test_universal_e2e.py +++ b/sdk/python/tests/integration/e2e/test_universal_e2e.py @@ -18,7 +18,7 @@ def test_e2e_consistency(environment, e2e_data_sources, infer_features): fs = environment.feature_store df, data_source = e2e_data_sources - fs.config.project = fs.config.project + str(infer_features) + # fs.config.project = fs.config.project + str(infer_features) fv = driver_feature_view( name=f"test_consistency_{'with_inference' if infer_features else ''}", From cb2df66fccb83d99c4d48da65f530caf90ed462e Mon Sep 17 00:00:00 2001 From: pyalex Date: Fri, 3 Dec 2021 20:32:46 +0100 Subject: [PATCH 4/6] emulate redshift connection error Signed-off-by: pyalex --- sdk/python/feast/infra/online_stores/dynamodb.py | 4 +++- sdk/python/feast/infra/utils/aws_utils.py | 6 +++++- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/sdk/python/feast/infra/online_stores/dynamodb.py b/sdk/python/feast/infra/online_stores/dynamodb.py index a5903989b1..858103207a 100644 --- a/sdk/python/feast/infra/online_stores/dynamodb.py +++ b/sdk/python/feast/infra/online_stores/dynamodb.py @@ -183,7 +183,9 @@ def _delete_tables_idempotent( f"{config.project}.{table_instance.name}" ) table.delete() - logger.info(f"Dynamo table {config.project}.{table_instance.name} was deleted") + logger.info( + f"Dynamo table {config.project}.{table_instance.name} was deleted" + ) except ClientError as ce: # If the table deletion fails with ResourceNotFoundException, # it means the table has already been deleted. diff --git a/sdk/python/feast/infra/utils/aws_utils.py b/sdk/python/feast/infra/utils/aws_utils.py index 6211c75e37..4a4c9e0579 100644 --- a/sdk/python/feast/infra/utils/aws_utils.py +++ b/sdk/python/feast/infra/utils/aws_utils.py @@ -1,5 +1,6 @@ import contextlib import os +import random import tempfile import uuid from typing import Any, Dict, Iterator, Optional, Tuple @@ -59,7 +60,7 @@ def get_bucket_and_key(s3_path: str) -> Tuple[str, str]: @retry( wait=wait_exponential(multiplier=1, max=4), retry=retry_if_exception_type(ConnectionClosedError), - stop=stop_after_attempt(5), + stop=stop_after_attempt(2), reraise=True, ) def execute_redshift_statement_async( @@ -79,6 +80,9 @@ def execute_redshift_statement_async( Returns: JSON response """ + if random.random() < 0.5: + raise ConnectionClosedError() + try: return redshift_data_client.execute_statement( ClusterIdentifier=cluster_id, Database=database, DbUser=user, Sql=query, From 9f1d81b916a65d99e02f569caeed6308596455ee Mon Sep 17 00:00:00 2001 From: pyalex Date: Mon, 6 Dec 2021 14:04:16 +0100 Subject: [PATCH 5/6] more logs Signed-off-by: pyalex --- sdk/python/feast/infra/utils/aws_utils.py | 5 +---- .../tests/integration/e2e/test_universal_e2e.py | 2 -- .../integration/registration/test_universal_types.py | 11 +++++++++-- 3 files changed, 10 insertions(+), 8 deletions(-) diff --git a/sdk/python/feast/infra/utils/aws_utils.py b/sdk/python/feast/infra/utils/aws_utils.py index 4a4c9e0579..13cc93880a 100644 --- a/sdk/python/feast/infra/utils/aws_utils.py +++ b/sdk/python/feast/infra/utils/aws_utils.py @@ -1,6 +1,5 @@ import contextlib import os -import random import tempfile import uuid from typing import Any, Dict, Iterator, Optional, Tuple @@ -60,7 +59,7 @@ def get_bucket_and_key(s3_path: str) -> Tuple[str, str]: @retry( wait=wait_exponential(multiplier=1, max=4), retry=retry_if_exception_type(ConnectionClosedError), - stop=stop_after_attempt(2), + stop=stop_after_attempt(5), reraise=True, ) def execute_redshift_statement_async( @@ -80,8 +79,6 @@ def execute_redshift_statement_async( Returns: JSON response """ - if random.random() < 0.5: - raise ConnectionClosedError() try: return redshift_data_client.execute_statement( diff --git a/sdk/python/tests/integration/e2e/test_universal_e2e.py b/sdk/python/tests/integration/e2e/test_universal_e2e.py index 464722f507..84140b3797 100644 --- a/sdk/python/tests/integration/e2e/test_universal_e2e.py +++ b/sdk/python/tests/integration/e2e/test_universal_e2e.py @@ -18,8 +18,6 @@ def test_e2e_consistency(environment, e2e_data_sources, infer_features): fs = environment.feature_store df, data_source = e2e_data_sources - # fs.config.project = fs.config.project + str(infer_features) - fv = driver_feature_view( name=f"test_consistency_{'with_inference' if infer_features else ''}", data_source=data_source, diff --git a/sdk/python/tests/integration/registration/test_universal_types.py b/sdk/python/tests/integration/registration/test_universal_types.py index 0bc9828a55..c007d56c35 100644 --- a/sdk/python/tests/integration/registration/test_universal_types.py +++ b/sdk/python/tests/integration/registration/test_universal_types.py @@ -1,3 +1,4 @@ +import logging from dataclasses import dataclass from datetime import datetime, timedelta from typing import List @@ -18,6 +19,8 @@ from tests.integration.feature_repos.universal.entities import driver from tests.integration.feature_repos.universal.feature_views import driver_feature_view +logger = logging.getLogger(__name__) + def populate_test_configs(offline: bool): entity_type_feature_dtypes = [ @@ -106,7 +109,7 @@ def get_fixtures(request): field_mapping={"ts_1": "ts"}, ) fv = create_feature_view( - request.node.name, + request.fixturename, config.feature_dtype, config.feature_is_list, config.has_empty_list, @@ -114,7 +117,11 @@ def get_fixtures(request): ) def cleanup(): - type_test_environment.data_source_creator.teardown() + try: + type_test_environment.data_source_creator.teardown() + except Exception: # noqa + logger.exception("DataSourceCreator teardown has failed") + type_test_environment.feature_store.teardown() request.addfinalizer(cleanup) From 0b9b0fc411c14946a8a0cf08c3a1ba311bee072e Mon Sep 17 00:00:00 2001 From: pyalex Date: Mon, 6 Dec 2021 14:42:35 +0100 Subject: [PATCH 6/6] test step cant be cancelled Signed-off-by: pyalex --- .github/workflows/pr_integration_tests.yml | 1 + sdk/python/feast/infra/utils/aws_utils.py | 1 - sdk/python/tests/integration/e2e/test_universal_e2e.py | 1 - 3 files changed, 1 insertion(+), 2 deletions(-) diff --git a/.github/workflows/pr_integration_tests.yml b/.github/workflows/pr_integration_tests.yml index 6b7511a529..ebd7a54173 100644 --- a/.github/workflows/pr_integration_tests.yml +++ b/.github/workflows/pr_integration_tests.yml @@ -128,6 +128,7 @@ jobs: - name: Install dependencies run: make install-python-ci-dependencies - name: Test python + if: ${{ always() }} # this will guarantee that step won't be canceled and resources won't leak env: FEAST_SERVER_DOCKER_IMAGE_TAG: ${{ needs.build-docker-image.outputs.DOCKER_IMAGE_TAG }} FEAST_USAGE: "False" diff --git a/sdk/python/feast/infra/utils/aws_utils.py b/sdk/python/feast/infra/utils/aws_utils.py index 13cc93880a..6211c75e37 100644 --- a/sdk/python/feast/infra/utils/aws_utils.py +++ b/sdk/python/feast/infra/utils/aws_utils.py @@ -79,7 +79,6 @@ def execute_redshift_statement_async( Returns: JSON response """ - try: return redshift_data_client.execute_statement( ClusterIdentifier=cluster_id, Database=database, DbUser=user, Sql=query, diff --git a/sdk/python/tests/integration/e2e/test_universal_e2e.py b/sdk/python/tests/integration/e2e/test_universal_e2e.py index 84140b3797..fbbdd14f23 100644 --- a/sdk/python/tests/integration/e2e/test_universal_e2e.py +++ b/sdk/python/tests/integration/e2e/test_universal_e2e.py @@ -17,7 +17,6 @@ def test_e2e_consistency(environment, e2e_data_sources, infer_features): fs = environment.feature_store df, data_source = e2e_data_sources - fv = driver_feature_view( name=f"test_consistency_{'with_inference' if infer_features else ''}", data_source=data_source,