From 96efcfb8301ed938ce62d46a12b429b724e7cf0f Mon Sep 17 00:00:00 2001
From: Jarek Potiuk
Date: Thu, 24 Aug 2023 12:07:34 +0200
Subject: [PATCH] Further improvements for provider verification (#33670)

This one completes the review and check of provider.yaml verifications.

There was one more check - for duplicates - that did not work as
expected: it was hiding the errors it detected. This PR fixes it and
also adds more diagnostic messages about what is actually being
checked, to give a better clue when some check is not doing what it is
supposed to be doing.

Follow-up after #33662 and #33640
---
 airflow/providers/google/provider.yaml        |   3 -
 .../providers/microsoft/azure/provider.yaml   |   3 -
 .../run_provider_yaml_files_check.py          | 257 ++++++++++++------
 3 files changed, 180 insertions(+), 83 deletions(-)
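Note for reviewers: the sketch below is not part of the patch - `buggy_check`,
`fixed_check`, and the sample data are invented for illustration. It shows the
failure mode being fixed (a check that binds a *local* `errors` list, shadowing
the module-level accumulator, so everything it finds is silently dropped) and
the shape of the `run_check` decorator this patch introduces:

    from __future__ import annotations

    import functools
    from typing import Callable

    errors: list[str] = []  # module-level accumulator, reported once at exit


    def run_check(title: str):
        """Print Starting/Finished diagnostics around a check returning (checked, errors)."""

        def inner(func: Callable[..., tuple[int, int]]):
            @functools.wraps(func)
            def wrapper(*args, **kwargs):
                print(f"Starting check: {title}")
                check_num, error_num = func(*args, **kwargs)
                print(
                    f"Finished check: Found {error_num} "
                    f"error{'' if error_num == 1 else 's'} in {check_num} checked items"
                )
                return check_num, error_num

            return wrapper

        return inner


    def buggy_check(items: list[str]) -> None:
        errors = []  # BUG: local name shadows the global list...
        for item in set(items):
            if items.count(item) > 1:
                errors.append(f"duplicate: {item}")
        # ...so the findings vanish when the function returns


    @run_check("duplicates must not occur")
    def fixed_check(items: list[str]) -> tuple[int, int]:
        num_errors = 0
        for item in set(items):
            if items.count(item) > 1:
                errors.append(f"duplicate: {item}")  # appends to the global list
                num_errors += 1
        return len(set(items)), num_errors


    buggy_check(["a", "a", "b"])
    print(errors)  # [] - the duplicate was hidden
    fixed_check(["a", "a", "b"])
    print(errors)  # ['duplicate: a']

The real decorator in the patch prints through Rich's `console.print` with
markup; plain `print` is used here only to keep the sketch dependency-free.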
diff --git a/airflow/providers/google/provider.yaml b/airflow/providers/google/provider.yaml
index 29fd2c8073a76..94f9353eb0a5a 100644
--- a/airflow/providers/google/provider.yaml
+++ b/airflow/providers/google/provider.yaml
@@ -928,9 +928,6 @@ transfers:
     target-integration-name: Google Cloud Storage (GCS)
     how-to-guide: /docs/apache-airflow-providers-google/operators/transfer/gdrive_to_gcs.rst
     python-module: airflow.providers.google.cloud.transfers.gdrive_to_gcs
-  - source-integration-name: Microsoft SQL Server (MSSQL)
-    target-integration-name: Google Cloud Storage (GCS)
-    python-module: airflow.providers.google.cloud.transfers.mssql_to_gcs
   - source-integration-name: Microsoft Azure FileShare
     target-integration-name: Google Cloud Storage (GCS)
     how-to-guide: /docs/apache-airflow-providers-google/operators/transfer/azure_fileshare_to_gcs.rst
diff --git a/airflow/providers/microsoft/azure/provider.yaml b/airflow/providers/microsoft/azure/provider.yaml
index 9719b03d8900f..a0ed5fbdf2599 100644
--- a/airflow/providers/microsoft/azure/provider.yaml
+++ b/airflow/providers/microsoft/azure/provider.yaml
@@ -245,9 +245,6 @@ transfers:
     target-integration-name: Microsoft Azure Blob Storage
     how-to-guide: /docs/apache-airflow-providers-microsoft-azure/transfer/local_to_wasb.rst
     python-module: airflow.providers.microsoft.azure.transfers.local_to_wasb
-  - source-integration-name: Microsoft Azure Blob Storage
-    target-integration-name: Google Cloud Storage (GCS)
-    python-module: airflow.providers.microsoft.azure.transfers.azure_blob_to_gcs
   - source-integration-name: SSH File Transfer Protocol (SFTP)
    target-integration-name: Microsoft Azure Blob Storage
     how-to-guide: /docs/apache-airflow-providers-microsoft-azure/transfer/sftp_to_wasb.rst
diff --git a/scripts/in_container/run_provider_yaml_files_check.py b/scripts/in_container/run_provider_yaml_files_check.py
index a300bb8be6bbd..956adc515f816 100755
--- a/scripts/in_container/run_provider_yaml_files_check.py
+++ b/scripts/in_container/run_provider_yaml_files_check.py
@@ -17,6 +17,7 @@
 # under the License.
 from __future__ import annotations

+import functools
 import importlib
 import inspect
 import itertools
@@ -29,7 +30,7 @@
 import warnings
 from collections import Counter
 from enum import Enum
-from typing import Any, Iterable
+from typing import Any, Callable, Iterable

 import jsonschema
 import yaml
@@ -132,22 +133,6 @@ def get_all_integration_names(yaml_files) -> list[str]:
     return all_integrations


-def check_integration_duplicates(yaml_files: dict[str, dict]):
-    """Integration names must be globally unique."""
-    print("Checking integration duplicates")
-    all_integrations = get_all_integration_names(yaml_files)
-
-    duplicates = [(k, v) for (k, v) in Counter(all_integrations).items() if v > 1]
-
-    if duplicates:
-        print(
-            "Duplicate integration names found. Integration names must be globally unique. "
-            "Please delete duplicates."
-        )
-        print(tabulate(duplicates, headers=["Integration name", "Number of occurrences"]))
-        sys.exit(3)
-
-
 def assert_sets_equal(
     set1: set[str],
     set_name_1: str,
@@ -175,11 +160,11 @@ def assert_sets_equal(
         lines.append(f"  Left set:{set_name_1}")
         lines.append(f"  Right set:{set_name_2}")
         if difference1:
-            lines.append(" -- Items in the left set but not the right:")
+            lines.append("    Items in the left set but not the right:")
             for item in sorted(difference1):
                 lines.append(f"    {item!r}")
         if difference2 and not allow_extra_in_set2:
-            lines.append(" -- Items in the right set but not the left:")
+            lines.append("    Items in the right set but not the left:")
             for item in sorted(difference2):
                 lines.append(f"    {item!r}")

@@ -194,7 +179,10 @@ class ObjectType(Enum):
     CLASS = "class"


-def check_if_object_exist(object_name: str, resource_type: str, yaml_file_path: str, object_type: ObjectType):
+def check_if_object_exist(
+    object_name: str, resource_type: str, yaml_file_path: str, object_type: ObjectType
+) -> int:
+    num_errors = 0
     try:
         if object_type == ObjectType.CLASS:
             module_name, class_name = object_name.rsplit(".", maxsplit=1)
@@ -214,8 +202,9 @@ def check_if_object_exist(object_name: str, resource_type: str, yaml_file_path:
                     f"[yellow]How to fix it[/]: Please remove it from provider.yaml and replace with "
                     f"the new class."
                 )
+                num_errors += 1
         if the_class and inspect.isclass(the_class):
-            return
+            return num_errors
         elif object_type == ObjectType.MODULE:
             with warnings.catch_warnings(record=True) as w:
                 module = importlib.import_module(object_name)
@@ -228,8 +217,9 @@ def check_if_object_exist(object_name: str, resource_type: str, yaml_file_path:
                     f"with the new module. If you see warnings in classes - fix the classes so that "
                     f"they are not raising Deprecation Warnings when module is imported."
                 )
+                num_errors += 1
            if inspect.ismodule(module):
-                return
+                return num_errors
         else:
             raise RuntimeError(f"Wrong enum {object_type}???")
     except Exception as e:
@@ -237,11 +227,14 @@ def check_if_object_exist(object_name: str, resource_type: str, yaml_file_path:
             f"The `{object_name}` object in {resource_type} list in {yaml_file_path} does not exist "
             f"or is not a {object_type.value}: {e}"
         )
+        num_errors += 1
     else:
         errors.append(
             f"The `{object_name}` object in {resource_type} list in {yaml_file_path} does not exist "
             f"or is not a {object_type.value}."
         )
+        num_errors += 1
+    return num_errors


 def check_if_objects_exist_and_belong_to_package(
@@ -250,7 +243,8 @@
     yaml_file_path: str,
     resource_type: str,
     object_type: ObjectType,
-):
+) -> int:
+    num_errors = 0
     for object_name in object_names:
         if os.environ.get("VERBOSE"):
             console.print(
@@ -262,7 +256,9 @@
                 f"The `{object_name}` object in {resource_type} list in {yaml_file_path} does not start"
                 f" with the expected {provider_package}."
             )
-        check_if_object_exist(object_name, resource_type, yaml_file_path, object_type)
+            num_errors += 1
+        num_errors += check_if_object_exist(object_name, resource_type, yaml_file_path, object_type)
+    return num_errors


 def parse_module_data(provider_data, resource_type, yaml_file_path):
@@ -279,8 +275,48 @@ def parse_module_data(provider_data, resource_type, yaml_file_path):
     return expected_modules, provider_package, resource_data


-def check_correctness_of_list_of_sensors_operators_hook_trigger_modules(yaml_files: dict[str, dict]):
-    print(" -- Checking completeness of list of {sensors, hooks, operators, triggers}")
+def run_check(title: str):
+    def inner(func: Callable[..., tuple[int, int]]):
+        @functools.wraps(func)
+        def wrapper(*args, **kwargs):
+            console.print(f"\n[magenta]Starting check:[/] {title}\n")
+            check_num, error_num = func(*args, **kwargs)
+            console.print(
+                f"\n[magenta]Finished check:[/] Found {error_num} "
+                f"error{'' if error_num == 1 else 's'} in {check_num} "
+                f"checked items\n"
+            )
+            return check_num, error_num
+
+        return wrapper
+
+    return inner
+
+
+@run_check("Checking integration duplicates")
+def check_integration_duplicates(yaml_files: dict[str, dict]) -> tuple[int, int]:
+    """Integration names must be globally unique."""
+    num_errors = 0
+    all_integrations = get_all_integration_names(yaml_files)
+    num_integrations = len(all_integrations)
+    duplicates = [(k, v) for (k, v) in Counter(all_integrations).items() if v > 1]
+
+    if duplicates:
+        console.print(
+            "Duplicate integration names found. Integration names must be globally unique. "
+            "Please delete duplicates."
+        )
+        errors.append(tabulate(duplicates, headers=["Integration name", "Number of occurrences"]))
+        num_errors += 1
+    return num_integrations, num_errors
+
+
+@run_check("Checking completeness of list of {sensors, hooks, operators, triggers}")
+def check_correctness_of_list_of_sensors_operators_hook_trigger_modules(
+    yaml_files: dict[str, dict]
+) -> tuple[int, int]:
+    num_errors = 0
+    num_modules = 0
     for (yaml_file_path, provider_data), resource_type in itertools.product(
         yaml_files.items(), ["sensors", "operators", "hooks", "triggers"]
     ):
@@ -289,8 +325,8 @@ def check_correctness_of_list_of_sensors_operators_hook_trigger_modules(yaml_fil
         )
         expected_modules = {module for module in expected_modules if module not in DEPRECATED_MODULES}
         current_modules = {str(i) for r in resource_data for i in r.get("python-modules", [])}
-
-        check_if_objects_exist_and_belong_to_package(
+        num_modules += len(current_modules)
+        num_errors += check_if_objects_exist_and_belong_to_package(
             current_modules, provider_package, yaml_file_path, resource_type, ObjectType.MODULE
         )
         try:
@@ -302,8 +338,9 @@ def check_correctness_of_list_of_sensors_operators_hook_trigger_modules(yaml_fil
                 f"Found list of {resource_type} modules in provider package: {package_name}",
                 set(current_modules),
                 f"Currently configured list of {resource_type} modules in {yaml_file_path}",
-                extra_message="[yellow]If there are deprecated modules in the list, please add them to "
-                f"DEPRECATED_MODULES in {pathlib.Path(__file__).relative_to(ROOT_DIR)}[/]",
+                extra_message="[yellow]Additional check[/]: If there are deprecated modules in the list, "
+                "please add them to DEPRECATED_MODULES in "
+                f"{pathlib.Path(__file__).relative_to(ROOT_DIR)}[/]",
             )
         except AssertionError as ex:
             nested_error = textwrap.indent(str(ex), "  ")
@@ -311,36 +348,45 @@ def check_correctness_of_list_of_sensors_operators_hook_trigger_modules(yaml_fil
                 f"Incorrect content of key '{resource_type}/python-modules' "
                 f"in file: {yaml_file_path}\n{nested_error}"
             )
+            num_errors += 1
+    return num_modules, num_errors


-def check_duplicates_in_integrations_names_of_hooks_sensors_operators(yaml_files: dict[str, dict]):
-    print("Checking for duplicates in list of {sensors, hooks, operators, triggers}")
+@run_check("Checking for duplicates in list of {sensors, hooks, operators, triggers}")
+def check_duplicates_in_integrations_names_of_hooks_sensors_operators(
+    yaml_files: dict[str, dict]
+) -> tuple[int, int]:
+    num_errors = 0
+    num_integrations = 0
     for (yaml_file_path, provider_data), resource_type in itertools.product(
         yaml_files.items(), ["sensors", "operators", "hooks", "triggers"]
     ):
         resource_data = provider_data.get(resource_type, [])
         count_integrations = Counter(r.get("integration-name", "") for r in resource_data)
+        num_integrations += len(count_integrations)
         for integration, count in count_integrations.items():
             if count > 1:
                 errors.append(
                     f"Duplicated content of '{resource_type}/integration-name/{integration}' "
                     f"in file: {yaml_file_path}"
                 )
+                num_errors += 1
+    return num_integrations, num_errors


-def check_completeness_of_list_of_transfers(yaml_files: dict[str, dict]):
-    print("Checking completeness of list of transfers")
+@run_check("Checking completeness of list of transfers")
+def check_completeness_of_list_of_transfers(yaml_files: dict[str, dict]) -> tuple[int, int]:
     resource_type = "transfers"
-
-    print(" -- Checking transfers modules")
+    num_errors = 0
+    num_transfers = 0
     for yaml_file_path, provider_data in yaml_files.items():
         expected_modules, provider_package, resource_data = parse_module_data(
             provider_data, resource_type, yaml_file_path
         )
         expected_modules = {module for module in expected_modules if module not in DEPRECATED_MODULES}
         current_modules = {r.get("python-module") for r in resource_data}
-
-        check_if_objects_exist_and_belong_to_package(
+        num_transfers += len(current_modules)
+        num_errors += check_if_objects_exist_and_belong_to_package(
             current_modules, provider_package, yaml_file_path, resource_type, ObjectType.MODULE
         )
         try:
@@ -359,72 +405,91 @@ def check_completeness_of_list_of_transfers(yaml_files: dict[str, dict]):
                 f"Incorrect content of key '{resource_type}/python-module' "
                 f"in file: {yaml_file_path}\n{nested_error}"
             )
+            num_errors += 1
+    return num_transfers, num_errors


-def check_hook_class_names(yaml_files: dict[str, dict]):
-    print("Checking connection classes belong to package, exist and are classes")
+@run_check("Checking if hook classes specified by hook-class-name in connection types are importable")
+def check_hook_class_name_entries_in_connection_types(yaml_files: dict[str, dict]) -> tuple[int, int]:
     resource_type = "connection-types"
+    num_errors = 0
+    num_connection_types = 0
     for yaml_file_path, provider_data in yaml_files.items():
         provider_package = pathlib.Path(yaml_file_path).parent.as_posix().replace("/", ".")
         connection_types = provider_data.get(resource_type)
         if connection_types:
+            num_connection_types += len(connection_types)
             hook_class_names = {connection_type["hook-class-name"] for connection_type in connection_types}
-            check_if_objects_exist_and_belong_to_package(
+            num_errors += check_if_objects_exist_and_belong_to_package(
                 hook_class_names, provider_package, yaml_file_path, resource_type, ObjectType.CLASS
             )
+    return num_connection_types, num_errors


-def check_plugin_classes(yaml_files: dict[str, dict]):
-    print("Checking plugin classes belong to package, exist and are classes")
+@run_check("Checking plugin classes are importable and belong to package")
+def check_plugin_classes(yaml_files: dict[str, dict]) -> tuple[int, int]:
     resource_type = "plugins"
+    num_errors = 0
+    num_plugins = 0
     for yaml_file_path, provider_data in yaml_files.items():
         provider_package = pathlib.Path(yaml_file_path).parent.as_posix().replace("/", ".")
         plugins = provider_data.get(resource_type)
         if plugins:
-            check_if_objects_exist_and_belong_to_package(
+            num_plugins += len(plugins)
+            num_errors += check_if_objects_exist_and_belong_to_package(
                 {plugin["plugin-class"] for plugin in plugins},
                 provider_package,
                 yaml_file_path,
                 resource_type,
                 ObjectType.CLASS,
             )
+    return num_plugins, num_errors


-def check_extra_link_classes(yaml_files: dict[str, dict]):
-    print("Checking extra-links belong to package, exist and are classes")
+@run_check("Checking extra-links belong to package, exist and are classes")
+def check_extra_link_classes(yaml_files: dict[str, dict]) -> tuple[int, int]:
     resource_type = "extra-links"
+    num_errors = 0
+    num_extra_links = 0
     for yaml_file_path, provider_data in yaml_files.items():
         provider_package = pathlib.Path(yaml_file_path).parent.as_posix().replace("/", ".")
         extra_links = provider_data.get(resource_type)
         if extra_links:
-            check_if_objects_exist_and_belong_to_package(
+            num_extra_links += len(extra_links)
+            num_errors += check_if_objects_exist_and_belong_to_package(
                 extra_links, provider_package, yaml_file_path, resource_type, ObjectType.CLASS
             )
+    return num_extra_links, num_errors


-def check_notification_classes(yaml_files: dict[str, dict]):
-    print("Checking notifications belong to package, exist and are classes")
exist and are classes") +@run_check("Checking notifications belong to package, exist and are classes") +def check_notification_classes(yaml_files: dict[str, dict]) -> tuple[int, int]: resource_type = "notifications" + num_errors = 0 + num_notifications = 0 for yaml_file_path, provider_data in yaml_files.items(): provider_package = pathlib.Path(yaml_file_path).parent.as_posix().replace("/", ".") notifications = provider_data.get(resource_type) if notifications: - check_if_objects_exist_and_belong_to_package( + num_notifications += len(notifications) + num_errors += check_if_objects_exist_and_belong_to_package( notifications, provider_package, yaml_file_path, resource_type, ObjectType.CLASS ) + return num_notifications, num_errors -def check_duplicates_in_list_of_transfers(yaml_files: dict[str, dict]): - print("Checking for duplicates in list of transfers") - errors = [] +@run_check("Checking for duplicates in list of transfers") +def check_duplicates_in_list_of_transfers(yaml_files: dict[str, dict]) -> tuple[int, int]: resource_type = "transfers" + num_errors = 0 + num_integrations = 0 for yaml_file_path, provider_data in yaml_files.items(): resource_data = provider_data.get(resource_type, []) - count_integrations = Counter( (r.get("source-integration-name", ""), r.get("target-integration-name", "")) for r in resource_data ) + num_integrations += len(count_integrations) for (source, target), count in count_integrations.items(): if count > 1: errors.append( @@ -433,12 +498,15 @@ def check_duplicates_in_list_of_transfers(yaml_files: dict[str, dict]): f" '{resource_type}/target-integration-name/{target}' " f"in file: {yaml_file_path}" ) + num_errors += 1 + return num_integrations, num_errors -def check_invalid_integration(yaml_files: dict[str, dict]): - print("Detect unregistered integrations") +@run_check("Detect unregistered integrations") +def check_invalid_integration(yaml_files: dict[str, dict]) -> tuple[int, int]: all_integration_names = set(get_all_integration_names(yaml_files)) - + num_errors = 0 + num_integrations = len(all_integration_names) for (yaml_file_path, provider_data), resource_type in itertools.product( yaml_files.items(), ["sensors", "operators", "hooks", "triggers"] ): @@ -450,6 +518,7 @@ def check_invalid_integration(yaml_files: dict[str, dict]): f"Incorrect content of key '{resource_type}/integration-name' in file: {yaml_file_path}. " f"Invalid values: {invalid_names}" ) + num_errors += 1 for (yaml_file_path, provider_data), key in itertools.product( yaml_files.items(), ["source-integration-name", "target-integration-name"] @@ -462,10 +531,14 @@ def check_invalid_integration(yaml_files: dict[str, dict]): f"Incorrect content of key 'transfers/{key}' in file: {yaml_file_path}. 
" f"Invalid values: {invalid_names}" ) + num_errors += 1 + return num_integrations, num_errors -def check_doc_files(yaml_files: dict[str, dict]): - print("Checking doc files") +@run_check("Checking doc files") +def check_doc_files(yaml_files: dict[str, dict]) -> tuple[int, int]: + num_docs = 0 + num_errors = 0 current_doc_urls: list[str] = [] current_logo_urls: list[str] = [] for provider in yaml_files.values(): @@ -483,8 +556,9 @@ def check_doc_files(yaml_files: dict[str, dict]): current_doc_urls.extend( op["how-to-guide"] for op in provider["transfers"] if "how-to-guide" in op ) - console.print("[yellow]Suspended providers:[/]") - console.print(suspended_providers) + if suspended_providers: + console.print("[yellow]Suspended providers:[/]") + console.print(suspended_providers) expected_doc_files = itertools.chain( DOCS_DIR.glob("apache-airflow-providers-*/operators/**/*.rst"), @@ -502,8 +576,10 @@ def check_doc_files(yaml_files: dict[str, dict]): for f in DOCS_DIR.glob("apache-airflow-providers-*/operators.rst") if not f.relative_to(DOCS_DIR).as_posix().startswith(tuple(suspended_providers)) } - console.print("[yellow]Suspended logos:[/]") - console.print(suspended_logos) + if suspended_logos: + console.print("[yellow]Suspended logos:[/]") + console.print(suspended_logos) + console.print() expected_logo_urls = { f"/{f.relative_to(DOCS_DIR).as_posix()}" for f in DOCS_DIR.glob("integration-logos/**/*") @@ -511,46 +587,61 @@ def check_doc_files(yaml_files: dict[str, dict]): } try: - print(" -- Checking document urls") + console.print("Checking document urls") assert_sets_equal( set(expected_doc_urls), "Document urls found in airflow/docs", set(current_doc_urls), "Document urls configured in provider.yaml files", ) - print(" -- Checking logo urls") + console.print(f"Checked {len(current_doc_urls)} doc urls") + console.print() + console.print("Checking logo urls") assert_sets_equal( set(expected_logo_urls), "Logo urls found in airflow/docs/integration-logos", set(current_logo_urls), "Logo urls configured in provider.yaml files", ) + console.print(f"Checked {len(current_logo_urls)} logo urls") + console.print() except AssertionError as ex: - print(ex) - sys.exit(1) + nested_error = textwrap.indent(str(ex), " ") + errors.append( + f"Discrepancies between documentation/logos for providers and provider.yaml files " + f"[yellow]How to fix it[/]: Please synchronize the docs/logs.\n{nested_error}" + ) + num_errors += 1 + return num_docs, num_errors -def check_unique_provider_name(yaml_files: dict[str, dict]): +@run_check("Checking if provider names are unique") +def check_unique_provider_name(yaml_files: dict[str, dict]) -> tuple[int, int]: + num_errors = 0 name_counter = Counter(d["name"] for d in yaml_files.values()) duplicates = {k for k, v in name_counter.items() if v > 1} if duplicates: errors.append(f"Provider name must be unique. 
Duplicates: {duplicates}") + num_errors += 1 + return len(name_counter.items()), num_errors +@run_check(f"Checking providers are mentioned in {PROVIDER_ISSUE_TEMPLATE_PATH}") def check_providers_are_mentioned_in_issue_template(yaml_files: dict[str, dict]): - print("Checking providers are mentioned in issue template") + num_errors = 0 + num_providers = 0 prefix_len = len("apache-airflow-providers-") short_provider_names = [d["package-name"][prefix_len:] for d in yaml_files.values()] # exclude deprecated provider that shouldn't be in issue template deprecated_providers: list[str] = [] for item in deprecated_providers: short_provider_names.remove(item) + num_providers += len(short_provider_names) jsonpath_expr = parse('$.body[?(@.attributes.label == "Apache Airflow Provider(s)")]..options[*]') with PROVIDER_ISSUE_TEMPLATE_PATH.open() as issue_file: issue_template = yaml.safe_load(issue_file) all_mentioned_providers = [match.value for match in jsonpath_expr.find(issue_template)] try: - print(f" -- Checking providers are mentioned in {PROVIDER_ISSUE_TEMPLATE_PATH}") # in case of suspended providers, we still want to have them in the issue template assert_sets_equal( set(short_provider_names), @@ -560,21 +651,33 @@ def check_providers_are_mentioned_in_issue_template(yaml_files: dict[str, dict]) allow_extra_in_set2=True, ) except AssertionError as ex: - print(ex) - sys.exit(1) + nested_error = textwrap.indent(str(ex), " ") + errors.append( + f"Discrepancies between providers available in `airflow/providers` and providers " + f"in {PROVIDER_ISSUE_TEMPLATE_PATH}.\n" + f"[yellow]How to fix it[/]: Please synchronize the list.\n{nested_error}" + ) + num_errors += 1 + return num_providers, num_errors +@run_check("Checking providers have all documentation files") def check_providers_have_all_documentation_files(yaml_files: dict[str, dict]): + num_errors = 0 + num_providers = 0 expected_files = ["commits.rst", "index.rst", "installing-providers-from-sources.rst"] for package_info in yaml_files.values(): + num_providers += 1 package_name = package_info["package-name"] provider_dir = DOCS_DIR.joinpath(package_name) for file in expected_files: if not provider_dir.joinpath(file).is_file(): errors.append( f"The provider {package_name} misses `{file}` in documentation. 
" - f"Please add the file to {provider_dir}" + f"[yellow]How to fix it[/]: Please add the file to {provider_dir}" ) + num_errors += 1 + return num_providers, num_errors if __name__ == "__main__": @@ -592,12 +695,11 @@ def check_providers_have_all_documentation_files(yaml_files: dict[str, dict]): all_files_loaded = len(all_provider_files) == len(paths) check_integration_duplicates(all_parsed_yaml_files) - + check_duplicates_in_list_of_transfers(all_parsed_yaml_files) check_duplicates_in_integrations_names_of_hooks_sensors_operators(all_parsed_yaml_files) check_completeness_of_list_of_transfers(all_parsed_yaml_files) - check_duplicates_in_list_of_transfers(all_parsed_yaml_files) - check_hook_class_names(all_parsed_yaml_files) + check_hook_class_name_entries_in_connection_types(all_parsed_yaml_files) check_plugin_classes(all_parsed_yaml_files) check_extra_link_classes(all_parsed_yaml_files) check_correctness_of_list_of_sensors_operators_hook_trigger_modules(all_parsed_yaml_files) @@ -612,7 +714,8 @@ def check_providers_have_all_documentation_files(yaml_files: dict[str, dict]): check_providers_are_mentioned_in_issue_template(all_parsed_yaml_files) if errors: - console.print(f"[red]Found {len(errors)} errors in providers[/]") + error_num = len(errors) + console.print(f"[red]Found {error_num} error{'' if error_num == 1 else 's'} in providers[/]") for error in errors: console.print(f"[red]Error:[/] {error}") sys.exit(1)