diff --git a/metadata-ingestion-modules/airflow-plugin/setup.py b/metadata-ingestion-modules/airflow-plugin/setup.py index 8ca4cad470f67e..6d26cf4498bdf9 100644 --- a/metadata-ingestion-modules/airflow-plugin/setup.py +++ b/metadata-ingestion-modules/airflow-plugin/setup.py @@ -1,4 +1,5 @@ import os +import pathlib import sys from typing import Dict, Set @@ -14,14 +15,11 @@ def get_long_description(): root = os.path.dirname(__file__) - with open(os.path.join(root, "README.md")) as f: - description = f.read() - - return description + return pathlib.Path(os.path.join(root, "README.md")).read_text() base_requirements = { - # Compatability. + # Compatibility. "dataclasses>=0.6; python_version < '3.7'", "typing_extensions>=3.10.0.2", "mypy_extensions>=0.4.3", diff --git a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_plugin.py b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_plugin.py index c893ff61cb9ec3..f7b384c1d32390 100644 --- a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_plugin.py +++ b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_plugin.py @@ -1,3 +1,4 @@ +import contextlib import traceback from typing import Any, Iterable @@ -215,7 +216,7 @@ def datahub_on_success_callback(context, *args, **kwargs): for inlet in inlets: datajob.inlets.append(inlet.urn) - # We have to use _oulets because outlets is empty + # We have to use _outlets because outlets is empty for outlet in task._outlets: datajob.outlets.append(outlet.urn) @@ -389,13 +390,10 @@ def _patch_policy(settings): def _patch_datahub_policy(): - try: + with contextlib.suppress(ImportError): import airflow_local_settings _patch_policy(airflow_local_settings) - except ImportError: - pass - from airflow.models.dagbag import settings _patch_policy(settings) diff --git a/metadata-ingestion-modules/airflow-plugin/tests/integration/integration_test_dummy.py b/metadata-ingestion-modules/airflow-plugin/tests/integration/integration_test_dummy.py index f4f53619168f89..10cf3ad0a608ae 100644 --- a/metadata-ingestion-modules/airflow-plugin/tests/integration/integration_test_dummy.py +++ b/metadata-ingestion-modules/airflow-plugin/tests/integration/integration_test_dummy.py @@ -1,2 +1,2 @@ def test_dummy(): - assert True + pass diff --git a/metadata-ingestion-modules/airflow-plugin/tests/unit/test_dummy.py b/metadata-ingestion-modules/airflow-plugin/tests/unit/test_dummy.py index f4f53619168f89..10cf3ad0a608ae 100644 --- a/metadata-ingestion-modules/airflow-plugin/tests/unit/test_dummy.py +++ b/metadata-ingestion-modules/airflow-plugin/tests/unit/test_dummy.py @@ -1,2 +1,2 @@ def test_dummy(): - assert True + pass diff --git a/metadata-ingestion/examples/library/data_quality_mcpw_rest.py b/metadata-ingestion/examples/library/data_quality_mcpw_rest.py index 7672d634f58468..077ca550e880eb 100644 --- a/metadata-ingestion/examples/library/data_quality_mcpw_rest.py +++ b/metadata-ingestion/examples/library/data_quality_mcpw_rest.py @@ -47,7 +47,7 @@ def emitAssertionResult(assertionResult: AssertionRunEvent) -> None: aspect=assertionResult, ) - # Emit BatchAssertion Result! (timseries aspect) + # Emit BatchAssertion Result! 
(timeseries aspect) emitter.emit_mcp(dataset_assertionRunEvent_mcp) diff --git a/metadata-ingestion/examples/library/dataset_add_column_tag.py b/metadata-ingestion/examples/library/dataset_add_column_tag.py index f5243ce28a5f01..8a15d33ff78779 100644 --- a/metadata-ingestion/examples/library/dataset_add_column_tag.py +++ b/metadata-ingestion/examples/library/dataset_add_column_tag.py @@ -23,18 +23,15 @@ def get_simple_field_path_from_v2_field_path(field_path: str) -> str: """A helper function to extract simple . path notation from the v2 field path""" - if field_path.startswith("[version=2.0]"): - # this is a v2 field path - tokens = [ - t - for t in field_path.split(".") - if not (t.startswith("[") or t.endswith("]")) - ] - path = ".".join(tokens) - return path - else: + if not field_path.startswith("[version=2.0]"): # not a v2, we assume this is a simple path return field_path + # this is a v2 field path + tokens = [ + t for t in field_path.split(".") if not (t.startswith("[") or t.endswith("]")) + ] + + return ".".join(tokens) # Inputs -> the column, dataset and the tag to set diff --git a/metadata-ingestion/examples/library/dataset_add_column_term.py b/metadata-ingestion/examples/library/dataset_add_column_term.py index ff1cad48a9f0c0..d656b5bd4502e7 100644 --- a/metadata-ingestion/examples/library/dataset_add_column_term.py +++ b/metadata-ingestion/examples/library/dataset_add_column_term.py @@ -23,18 +23,15 @@ def get_simple_field_path_from_v2_field_path(field_path: str) -> str: """A helper function to extract simple . path notation from the v2 field path""" - if field_path.startswith("[version=2.0]"): - # this is a v2 field path - tokens = [ - t - for t in field_path.split(".") - if not (t.startswith("[") or t.endswith("]")) - ] - path = ".".join(tokens) - return path - else: + if not field_path.startswith("[version=2.0]"): # not a v2, we assume this is a simple path return field_path + # this is a v2 field path + tokens = [ + t for t in field_path.split(".") if not (t.startswith("[") or t.endswith("]")) + ] + + return ".".join(tokens) # Inputs -> the column, dataset and the term to set diff --git a/metadata-ingestion/examples/library/lineage_emitter_mcpw_rest.py b/metadata-ingestion/examples/library/lineage_emitter_mcpw_rest.py index 11f73d36cb29d9..d1c934cba40409 100644 --- a/metadata-ingestion/examples/library/lineage_emitter_mcpw_rest.py +++ b/metadata-ingestion/examples/library/lineage_emitter_mcpw_rest.py @@ -10,13 +10,11 @@ ) from datahub.metadata.schema_classes import ChangeTypeClass -# Construct upstream tables. 
-upstream_tables: List[UpstreamClass] = [] upstream_table_1 = UpstreamClass( dataset=builder.make_dataset_urn("bigquery", "upstream_table_1", "PROD"), type=DatasetLineageTypeClass.TRANSFORMED, ) -upstream_tables.append(upstream_table_1) +upstream_tables: List[UpstreamClass] = [upstream_table_1] upstream_table_2 = UpstreamClass( dataset=builder.make_dataset_urn("bigquery", "upstream_table_2", "PROD"), type=DatasetLineageTypeClass.TRANSFORMED, diff --git a/metadata-ingestion/examples/library/lineage_job_dataflow_new_api_simple.py b/metadata-ingestion/examples/library/lineage_job_dataflow_new_api_simple.py index 7212282156d8b9..1871a8af09e50c 100644 --- a/metadata-ingestion/examples/library/lineage_job_dataflow_new_api_simple.py +++ b/metadata-ingestion/examples/library/lineage_job_dataflow_new_api_simple.py @@ -1,5 +1,5 @@ import uuid -from datetime import datetime +from datetime import datetime, timezone from datahub.api.entities.datajob import DataFlow, DataJob from datahub.api.entities.dataprocess.dataprocess_instance import ( @@ -36,40 +36,61 @@ jobFlowRun = DataProcessInstance.from_dataflow( dataflow=jobFlow, id=f"{jobFlow.id}-{uuid.uuid4()}" ) -jobFlowRun.emit_process_start(emitter, int(datetime.utcnow().timestamp() * 1000)) +jobFlowRun.emit_process_start( + emitter, int(datetime.now(timezone.utc).timestamp() * 1000) +) + jobRun = DataProcessInstance.from_datajob( datajob=dataJob, id=f"{jobFlow.id}-{uuid.uuid4()}" ) -jobRun.emit_process_start(emitter, int(datetime.utcnow().timestamp() * 1000)) +jobRun.emit_process_start(emitter, int(datetime.now(timezone.utc).timestamp() * 1000)) + jobRun.emit_process_end( - emitter, int(datetime.utcnow().timestamp() * 1000), result=InstanceRunResult.SUCCESS + emitter, + int(datetime.now(timezone.utc).timestamp() * 1000), + result=InstanceRunResult.SUCCESS, ) + job2Run = DataProcessInstance.from_datajob( datajob=dataJob2, id=f"{jobFlow.id}-{uuid.uuid4()}" ) -job2Run.emit_process_start(emitter, int(datetime.utcnow().timestamp() * 1000)) +job2Run.emit_process_start(emitter, int(datetime.now(timezone.utc).timestamp() * 1000)) + job2Run.emit_process_end( - emitter, int(datetime.utcnow().timestamp() * 1000), result=InstanceRunResult.SUCCESS + emitter, + int(datetime.now(timezone.utc).timestamp() * 1000), + result=InstanceRunResult.SUCCESS, ) + job3Run = DataProcessInstance.from_datajob( datajob=dataJob3, id=f"{jobFlow.id}-{uuid.uuid4()}" ) -job3Run.emit_process_start(emitter, int(datetime.utcnow().timestamp() * 1000)) +job3Run.emit_process_start(emitter, int(datetime.now(timezone.utc).timestamp() * 1000)) + job3Run.emit_process_end( - emitter, int(datetime.utcnow().timestamp() * 1000), result=InstanceRunResult.SUCCESS + emitter, + int(datetime.now(timezone.utc).timestamp() * 1000), + result=InstanceRunResult.SUCCESS, ) + job4Run = DataProcessInstance.from_datajob( datajob=dataJob4, id=f"{jobFlow.id}-{uuid.uuid4()}" ) -job4Run.emit_process_start(emitter, int(datetime.utcnow().timestamp() * 1000)) +job4Run.emit_process_start(emitter, int(datetime.now(timezone.utc).timestamp() * 1000)) + job4Run.emit_process_end( - emitter, int(datetime.utcnow().timestamp() * 1000), result=InstanceRunResult.SUCCESS + emitter, + int(datetime.now(timezone.utc).timestamp() * 1000), + result=InstanceRunResult.SUCCESS, ) + jobFlowRun.emit_process_end( - emitter, int(datetime.utcnow().timestamp() * 1000), result=InstanceRunResult.SUCCESS + emitter, + int(datetime.now(timezone.utc).timestamp() * 1000), + result=InstanceRunResult.SUCCESS, ) diff --git 
a/metadata-ingestion/examples/transforms/custom_transform_example.py b/metadata-ingestion/examples/transforms/custom_transform_example.py index 85663d971092b5..57560e75cf7e92 100644 --- a/metadata-ingestion/examples/transforms/custom_transform_example.py +++ b/metadata-ingestion/examples/transforms/custom_transform_example.py @@ -61,13 +61,10 @@ def transform_aspect( # type: ignore assert aspect is None or isinstance(aspect, OwnershipClass) if owners_to_add: - ownership = ( - aspect - if aspect - else OwnershipClass( - owners=[], - ) + ownership = aspect or OwnershipClass( + owners=[], ) + ownership.owners.extend(owners_to_add) return ownership diff --git a/metadata-ingestion/scripts/avro_codegen.py b/metadata-ingestion/scripts/avro_codegen.py index 05d8fb1c5804c6..e5758d159bdee5 100644 --- a/metadata-ingestion/scripts/avro_codegen.py +++ b/metadata-ingestion/scripts/avro_codegen.py @@ -10,11 +10,8 @@ def load_schema_file(schema_file: str) -> str: - with open(schema_file) as f: - raw_schema_text = f.read() - - redo_spaces = json.dumps(json.loads(raw_schema_text), indent=2) - return redo_spaces + raw_schema_text = Path(schema_file).read_text() + return json.dumps(json.loads(raw_schema_text), indent=2) def merge_schemas(schemas: List[str]) -> str: diff --git a/metadata-ingestion/scripts/docgen.py b/metadata-ingestion/scripts/docgen.py index 03434d291d3024..fa59cd20dda1c0 100644 --- a/metadata-ingestion/scripts/docgen.py +++ b/metadata-ingestion/scripts/docgen.py @@ -605,9 +605,10 @@ def generate( os.makedirs(config_dir, exist_ok=True) with open(f"{config_dir}/{plugin_name}_config.json", "w") as f: f.write(source_config_class.schema_json(indent=2)) - - create_or_update(source_documentation, - [platform_id, "plugins", plugin_name, "config_schema"], + + create_or_update( + source_documentation, + [platform_id, "plugins", plugin_name, "config_schema"], source_config_class.schema_json(indent=2) or "", ) @@ -649,7 +650,9 @@ def generate( with open(platform_doc_file, "w") as f: if "name" in platform_docs: - f.write(f"import Tabs from '@theme/Tabs';\nimport TabItem from '@theme/TabItem';\n\n") + f.write( + f"import Tabs from '@theme/Tabs';\nimport TabItem from '@theme/TabItem';\n\n" + ) f.write(f"# {platform_docs['name']}\n") if len(platform_docs["plugins"].keys()) > 1: # More than one plugin used to provide integration with this platform @@ -722,8 +725,10 @@ def generate( f.write("\n```\n") if "config" in plugin_docs: f.write("\n### Config Details\n") - f.write(""" - \n\n""") + f.write( + """ + \n\n""" + ) f.write( "Note that a `.` is used to denote nested fields in the YAML recipe.\n\n" ) @@ -733,7 +738,8 @@ def generate( for doc in plugin_docs["config"]: f.write(doc) f.write("\n\n\n") - f.write(f""" + f.write( + f""" The [JSONSchema](https://json-schema.org/) for this configuration is inlined below.\n\n @@ -741,7 +747,8 @@ def generate( {plugin_docs['config_schema']} ```\n\n -\n\n""") +\n\n""" + ) # insert custom plugin docs after config details f.write(plugin_docs.get("custom_docs", "")) if "classname" in plugin_docs: diff --git a/metadata-ingestion/src/datahub/api/entities/datajob/dataflow.py b/metadata-ingestion/src/datahub/api/entities/datajob/dataflow.py index c0378d554d5d31..588e66f19a0ef1 100644 --- a/metadata-ingestion/src/datahub/api/entities/datajob/dataflow.py +++ b/metadata-ingestion/src/datahub/api/entities/datajob/dataflow.py @@ -142,7 +142,7 @@ def emit( """ Emit the DataFlow entity to Datahub - :param emitter: Datahub Emitter to emit the proccess event + :param emitter: Datahub 
Emitter to emit the process event :param callback: (Optional[Callable[[Exception, str], None]]) the callback method for KafkaEmitter if it is used """ for mcp in self.generate_mcp(): diff --git a/metadata-ingestion/src/datahub/api/entities/datajob/datajob.py b/metadata-ingestion/src/datahub/api/entities/datajob/datajob.py index 329eca7d9cd44b..ec86ad80226312 100644 --- a/metadata-ingestion/src/datahub/api/entities/datajob/datajob.py +++ b/metadata-ingestion/src/datahub/api/entities/datajob/datajob.py @@ -52,7 +52,7 @@ class DataJob: properties Dict[str, str]: Custom properties to set for the DataProcessInstance url (Optional[str]): Url which points to the DataJob at the orchestrator inlets (List[str]): List of urns the DataProcessInstance consumes - outlest (List[str]): List of urns the DataProcessInstance produces + outlets (List[str]): List of urns the DataProcessInstance produces input_datajob_urns: List[DataJobUrn] = field(default_factory=list) """ @@ -179,7 +179,7 @@ def emit( """ Emit the DataJob entity to Datahub - :param emitter: Datahub Emitter to emit the proccess event + :param emitter: Datahub Emitter to emit the process event :param callback: (Optional[Callable[[Exception, str], None]]) the callback method for KafkaEmitter if it is used :rtype: None """ diff --git a/metadata-ingestion/src/datahub/api/entities/dataprocess/dataprocess_instance.py b/metadata-ingestion/src/datahub/api/entities/dataprocess/dataprocess_instance.py index 859e5700a51c3b..9b107d701ab02d 100644 --- a/metadata-ingestion/src/datahub/api/entities/dataprocess/dataprocess_instance.py +++ b/metadata-ingestion/src/datahub/api/entities/dataprocess/dataprocess_instance.py @@ -55,7 +55,7 @@ class DataProcessInstance: template_urn (Optional[Union[DataJobUrn, DataFlowUrn]]): The parent DataJob or DataFlow which was instantiated if applicable parent_instance (Optional[DataProcessInstanceUrn]): The parent execution's urn if applicable properties Dict[str, str]: Custom properties to set for the DataProcessInstance - url (Optional[str]): Url which points to the exection at the orchestrator + url (Optional[str]): Url which points to the execution at the orchestrator inlets (List[str]): List of entities the DataProcessInstance consumes outlets (List[str]): List of entities the DataProcessInstance produces """ @@ -118,10 +118,10 @@ def emit_process_start( """ :rtype: None - :param emitter: Datahub Emitter to emit the proccess event + :param emitter: Datahub Emitter to emit the process event :param start_timestamp_millis: (int) the execution start time in milliseconds :param attempt: the number of attempt of the execution with the same execution id - :param emit_template: (bool) If it is set the template of the execution (datajob, datflow) will be emitted as well. + :param emit_template: (bool) If it is set the template of the execution (datajob, dataflow) will be emitted as well. 
:param callback: (Optional[Callable[[Exception, str], None]]) the callback method for KafkaEmitter if it is used """ if emit_template and self.template_urn is not None: @@ -312,8 +312,8 @@ def from_datajob( :param datajob: (DataJob) the datajob from generate the DataProcessInstance :param id: (str) the id for the DataProcessInstance - :param clone_inlets: (bool) wheather to clone datajob's inlets - :param clone_outlets: (bool) wheather to clone datajob's outlets + :param clone_inlets: (bool) whether to clone datajob's inlets + :param clone_outlets: (bool) whether to clone datajob's outlets :return: DataProcessInstance """ dpi: DataProcessInstance = DataProcessInstance( diff --git a/metadata-ingestion/src/datahub/cli/cli_utils.py b/metadata-ingestion/src/datahub/cli/cli_utils.py index eebc548b9ab976..3e1a401db6b36c 100644 --- a/metadata-ingestion/src/datahub/cli/cli_utils.py +++ b/metadata-ingestion/src/datahub/cli/cli_utils.py @@ -137,7 +137,7 @@ def get_details_from_config(): gms_token = gms_config.token return gms_host, gms_token except yaml.YAMLError as exc: - click.secho(f"{DATAHUB_CONFIG_PATH} malformatted, error: {exc}", bold=True) + click.secho(f"{DATAHUB_CONFIG_PATH} malformed, error: {exc}", bold=True) return None, None @@ -227,13 +227,13 @@ def get_session_and_host(): def test_connection(): (session, host) = get_session_and_host() - url = host + "/config" + url = f"{host}/config" response = session.get(url) response.raise_for_status() def test_connectivity_complain_exit(operation_name: str) -> None: - """Test connectivty to metadata-service, log operation name and exit""" + """Test connectivity to metadata-service, log operation name and exit""" # First test connectivity try: test_connection() @@ -330,10 +330,7 @@ def post_delete_references_endpoint( path: str, cached_session_host: Optional[Tuple[Session, str]] = None, ) -> Tuple[int, List[Dict]]: - if not cached_session_host: - session, gms_host = get_session_and_host() - else: - session, gms_host = cached_session_host + session, gms_host = cached_session_host or get_session_and_host() url = gms_host + path payload = json.dumps(payload_obj) @@ -349,10 +346,7 @@ def post_delete_endpoint( path: str, cached_session_host: Optional[Tuple[Session, str]] = None, ) -> typing.Tuple[str, int]: - if not cached_session_host: - session, gms_host = get_session_and_host() - else: - session, gms_host = cached_session_host + session, gms_host = cached_session_host or get_session_and_host() url = gms_host + path return post_delete_endpoint_with_session_and_url(session, url, payload_obj) @@ -402,9 +396,7 @@ def get_urns_by_filter( "condition": "EQUAL", } ) - if platform is not None and ( - entity_type_lower == "chart" or entity_type_lower == "dashboard" - ): + if platform is not None and entity_type_lower in {"chart", "dashboard"}: filter_criteria.append( { "field": "tool", @@ -512,10 +504,7 @@ def batch_get_ids( session, gms_host = get_session_and_host() endpoint: str = "/entitiesV2" url = gms_host + endpoint - ids_to_get = [] - for id in ids: - ids_to_get.append(Urn.url_encode(id)) - + ids_to_get = [Urn.url_encode(id) for id in ids] response = session.get( f"{url}?ids=List({','.join(ids_to_get)})", ) @@ -572,11 +561,7 @@ def get_entity( aspect: Optional[List] = None, cached_session_host: Optional[Tuple[Session, str]] = None, ) -> Dict: - if not cached_session_host: - session, gms_host = get_session_and_host() - else: - session, gms_host = cached_session_host - + session, gms_host = cached_session_host or get_session_and_host() if 
urn.startswith("urn%3A"): # we assume the urn is already encoded encoded_urn: str = urn @@ -589,7 +574,7 @@ def get_entity( endpoint: str = f"/entitiesV2/{encoded_urn}" if aspect and len(aspect): - endpoint = endpoint + "?aspects=List(" + ",".join(aspect) + ")" + endpoint = f"{endpoint}?aspects=List(" + ",".join(aspect) + ")" response = session.get(gms_host + endpoint) response.raise_for_status() @@ -602,12 +587,8 @@ def post_entity( aspect_name: str, aspect_value: Dict, cached_session_host: Optional[Tuple[Session, str]] = None, -) -> Dict: - if not cached_session_host: - session, gms_host = get_session_and_host() - else: - session, gms_host = cached_session_host - +) -> int: + session, gms_host = cached_session_host or get_session_and_host() endpoint: str = "/aspects/?action=ingestProposal" proposal = { @@ -704,10 +685,7 @@ def get_latest_timeseries_aspect_values( timeseries_aspect_name: str, cached_session_host: Optional[Tuple[Session, str]], ) -> Dict: - if not cached_session_host: - session, gms_host = get_session_and_host() - else: - session, gms_host = cached_session_host + session, gms_host = cached_session_host or get_session_and_host() query_body = { "urn": entity_urn, "entity": guess_entity_type(entity_urn), @@ -758,14 +736,9 @@ def get_aspects_for_entity( aspect_value["aspect"]["value"] = json.loads( aspect_value["aspect"]["value"] ) - aspect_list.update( - # Follow the convention used for non-timeseries aspects. - { - aspect_cls.RECORD_SCHEMA.fullname.replace( - "pegasus2avro.", "" - ): aspect_value - } - ) + aspect_list[ + aspect_cls.RECORD_SCHEMA.fullname.replace("pegasus2avro.", "") + ] = aspect_value aspect_map: Dict[str, Union[dict, DictWrapper]] = {} for a in aspect_list.values(): @@ -789,4 +762,4 @@ def get_aspects_for_entity( if aspects: return {k: v for (k, v) in aspect_map.items() if k in aspects} else: - return {k: v for (k, v) in aspect_map.items()} + return dict(aspect_map) diff --git a/metadata-ingestion/src/datahub/cli/delete_cli.py b/metadata-ingestion/src/datahub/cli/delete_cli.py index 1ca0ac864693b1..a11fa3e6703520 100644 --- a/metadata-ingestion/src/datahub/cli/delete_cli.py +++ b/metadata-ingestion/src/datahub/cli/delete_cli.py @@ -182,7 +182,7 @@ def delete( else: # log warn include_removed + hard is the only way to work if include_removed and soft: - logger.warn( + logger.warning( "A filtered delete including soft deleted entities is redundant, because it is a soft delete by default. Please use --include-removed in conjunction with --hard" ) # Filter based delete @@ -234,16 +234,16 @@ def delete_with_filters( logger.info(f"datahub configured with {gms_host}") emitter = rest_emitter.DatahubRestEmitter(gms_server=gms_host, token=token) batch_deletion_result = DeletionResult() - urns = [ - u - for u in cli_utils.get_urns_by_filter( + urns = list( + cli_utils.get_urns_by_filter( env=env, platform=platform, search_query=search_query, entity_type=entity_type, include_removed=include_removed, ) - ] + ) + logger.info( f"Filter matched {len(urns)} entities. 
Sample: {choices(urns, k=min(5, len(urns)))}" ) @@ -284,12 +284,12 @@ def _delete_one_urn( if soft: # Add removed aspect - if not cached_emitter: + if cached_emitter: + emitter = cached_emitter + else: _, gms_host = cli_utils.get_session_and_host() token = cli_utils.get_token() emitter = rest_emitter.DatahubRestEmitter(gms_server=gms_host, token=token) - else: - emitter = cached_emitter if not dry_run: emitter.emit_mcp( MetadataChangeProposalWrapper( @@ -305,18 +305,19 @@ def _delete_one_urn( ) else: logger.info(f"[Dry-run] Would soft-delete {urn}") + elif not dry_run: + payload_obj = {"urn": urn} + urn, rows_affected = cli_utils.post_delete_endpoint( + payload_obj, + "/entities?action=delete", + cached_session_host=cached_session_host, + ) + deletion_result.num_records = rows_affected else: - if not dry_run: - payload_obj = {"urn": urn} - urn, rows_affected = cli_utils.post_delete_endpoint( - payload_obj, - "/entities?action=delete", - cached_session_host=cached_session_host, - ) - deletion_result.num_records = rows_affected - else: - logger.info(f"[Dry-run] Would hard-delete {urn}") - deletion_result.num_records = UNKNOWN_NUM_RECORDS # since we don't know how many rows will be affected + logger.info(f"[Dry-run] Would hard-delete {urn}") + deletion_result.num_records = ( + UNKNOWN_NUM_RECORDS # since we don't know how many rows will be affected + ) deletion_result.end() return deletion_result diff --git a/metadata-ingestion/src/datahub/cli/docker_check.py b/metadata-ingestion/src/datahub/cli/docker_check.py index e530f4d19616f3..25719cef2334d9 100644 --- a/metadata-ingestion/src/datahub/cli/docker_check.py +++ b/metadata-ingestion/src/datahub/cli/docker_check.py @@ -88,10 +88,11 @@ def check_local_docker_containers(preflight_only: bool = False) -> List[str]: if len(containers) == 0: issues.append("quickstart.sh or dev.sh is not running") else: - existing_containers = set(container.name for container in containers) + existing_containers = {container.name for container in containers} missing_containers = set(REQUIRED_CONTAINERS) - existing_containers - for missing in missing_containers: - issues.append(f"{missing} container is not present") + issues.extend( + f"{missing} container is not present" for missing in missing_containers + ) # Check that the containers are running and healthy. 
for container in containers: diff --git a/metadata-ingestion/src/datahub/cli/ingest_cli.py b/metadata-ingestion/src/datahub/cli/ingest_cli.py index 55d3a0cdc04760..ecb7d80e30fcfa 100644 --- a/metadata-ingestion/src/datahub/cli/ingest_cli.py +++ b/metadata-ingestion/src/datahub/cli/ingest_cli.py @@ -79,7 +79,7 @@ def ingest() -> None: type=bool, is_flag=True, default=False, - help="Supress display of variable values in logs by supressing elaborae stacktrace (stackprinter) during ingestion failures", + help="Suppress display of variable values in logs by suppressing elaborate stacktrace (stackprinter) during ingestion failures", ) @click.pass_context @upgrade.check_upgrade @@ -313,14 +313,14 @@ def rollback( current_time = now.strftime("%Y-%m-%d %H:%M:%S") try: - folder_name = report_dir + "/" + current_time + folder_name = f"{report_dir}/{current_time}" - ingestion_config_file_name = folder_name + "/config.json" + ingestion_config_file_name = f"{folder_name}/config.json" os.makedirs(os.path.dirname(ingestion_config_file_name), exist_ok=True) with open(ingestion_config_file_name, "w") as file_handle: json.dump({"run_id": run_id}, file_handle) - csv_file_name = folder_name + "/unsafe_entities.csv" + csv_file_name = f"{folder_name}/unsafe_entities.csv" with open(csv_file_name, "w") as file_handle: writer = csv.writer(file_handle) writer.writerow(["urn"]) @@ -329,4 +329,4 @@ def rollback( except IOError as e: print(e) - sys.exit("Unable to write reports to " + report_dir) + sys.exit(f"Unable to write reports to {report_dir}") diff --git a/metadata-ingestion/src/datahub/cli/migration_utils.py b/metadata-ingestion/src/datahub/cli/migration_utils.py index b383e2849b6b9b..79546e07ac056a 100644 --- a/metadata-ingestion/src/datahub/cli/migration_utils.py +++ b/metadata-ingestion/src/datahub/cli/migration_utils.py @@ -218,8 +218,8 @@ def modify_urn_list_for_aspect( new_urn: str, ) -> DictWrapper: - if hasattr(UrnListModifier, aspect_name + "_modifier"): - modifier = getattr(UrnListModifier, aspect_name + "_modifier") + if hasattr(UrnListModifier, f"{aspect_name}_modifier"): + modifier = getattr(UrnListModifier, f"{aspect_name}_modifier") return modifier( aspect=aspect, relationship_type=relationship_type, diff --git a/metadata-ingestion/src/datahub/cli/timeline_cli.py b/metadata-ingestion/src/datahub/cli/timeline_cli.py index 40c5af4e1e78a0..579dff5425a112 100644 --- a/metadata-ingestion/src/datahub/cli/timeline_cli.py +++ b/metadata-ingestion/src/datahub/cli/timeline_cli.py @@ -19,46 +19,40 @@ def pretty_field_path(field_path: str) -> str: - if field_path.startswith("[version=2.0]"): + if not field_path.startswith("[version=2.0]"): + return field_path # breakpoint() # parse schema field - tokens = [ - t - for t in field_path.split(".") - if not (t.startswith("[") or t.endswith("]")) - ] - path = ".".join(tokens) - return path - else: - return field_path + tokens = [ + t + for t in field_path.split(".") + if not t.startswith("[") and not t.endswith("]") + ] + + return ".".join(tokens) def pretty_id(id: Optional[str]) -> str: if not id: return "" - else: - # breakpoint() - assert id is not None - if id.startswith("urn:li:datasetField:") or id.startswith( - "urn:li:schemaField:" - ): - # parse schema field - schema_field_key = schema_field_urn_to_key( - id.replace("urn:li:datasetField", "urn:li:schemaField") - ) - if schema_field_key: - assert schema_field_key is not None - field_path = schema_field_key.fieldPath - - return f"{colored('field','cyan')}:{colored(pretty_field_path(field_path),'white')}" - 
if id.startswith("[version=2.0]"): - return f"{colored('field','cyan')}:{colored(pretty_field_path(id),'white')}" - - if id.startswith("urn:li:dataset"): - # parse dataset urn - dataset_key = dataset_urn_to_key(id) - if dataset_key: - return f"{colored('dataset','cyan')}:{colored(dataset_key.platform,'white')}:{colored(dataset_key.name,'white')}" + # breakpoint() + assert id is not None + if id.startswith("urn:li:datasetField:") or id.startswith("urn:li:schemaField:"): + schema_field_key = schema_field_urn_to_key( + id.replace("urn:li:datasetField", "urn:li:schemaField") + ) + if schema_field_key: + assert schema_field_key is not None + field_path = schema_field_key.fieldPath + + return f"{colored('field','cyan')}:{colored(pretty_field_path(field_path),'white')}" + if id.startswith("[version=2.0]"): + return f"{colored('field','cyan')}:{colored(pretty_field_path(id),'white')}" + + if id.startswith("urn:li:dataset"): + dataset_key = dataset_urn_to_key(id) + if dataset_key: + return f"{colored('dataset','cyan')}:{colored(dataset_key.platform,'white')}:{colored(dataset_key.name,'white')}" # failed to prettify, return original return id @@ -196,10 +190,10 @@ def timeline( ) change_color = ( "green" - if change_txn.get("semVerChange") == "MINOR" - or change_txn.get("semVerChange") == "PATCH" + if change_txn.get("semVerChange") in ["MINOR", "PATCH"] else "red" ) + print( f"{colored(change_instant,'cyan')} - {colored(change_txn['semVer'],change_color)}" ) diff --git a/metadata-ingestion/src/datahub/configuration/common.py b/metadata-ingestion/src/datahub/configuration/common.py index 716572babc1e41..be3c6a13d3599b 100644 --- a/metadata-ingestion/src/datahub/configuration/common.py +++ b/metadata-ingestion/src/datahub/configuration/common.py @@ -39,10 +39,7 @@ class OperationalError(PipelineExecutionError): def __init__(self, message: str, info: dict = None): self.message = message - if info: - self.info = info - else: - self.info = {} + self.info = info or {} class ConfigurationError(MetaError): @@ -128,10 +125,7 @@ def alphabet_pattern(self) -> Pattern: @property def regex_flags(self) -> int: - if self.ignoreCase: - return re.IGNORECASE - else: - return 0 + return re.IGNORECASE if self.ignoreCase else 0 @classmethod def allow_all(cls) -> "AllowDenyPattern": @@ -142,11 +136,10 @@ def allowed(self, string: str) -> bool: if re.match(deny_pattern, string, self.regex_flags): return False - for allow_pattern in self.allow: - if re.match(allow_pattern, string, self.regex_flags): - return True - - return False + return any( + re.match(allow_pattern, string, self.regex_flags) + for allow_pattern in self.allow + ) def is_fully_specified_allow_list(self) -> bool: """ @@ -155,10 +148,9 @@ def is_fully_specified_allow_list(self) -> bool: pattern into a 'search for the ones that are allowed' pattern, which can be much more efficient in some cases. 
""" - for allow_pattern in self.allow: - if not self.alphabet_pattern.match(allow_pattern): - return False - return True + return all( + self.alphabet_pattern.match(allow_pattern) for allow_pattern in self.allow + ) def get_allowed_list(self) -> List[str]: """Return the list of allowed strings as a list, after taking into account deny patterns, if possible""" @@ -181,16 +173,12 @@ def all(cls) -> "KeyValuePattern": return KeyValuePattern() def value(self, string: str) -> List[str]: - for key in self.rules.keys(): - if re.match(key, string): - return self.rules[key] - return [] + return next( + (self.rules[key] for key in self.rules.keys() if re.match(key, string)), [] + ) def matched(self, string: str) -> bool: - for key in self.rules.keys(): - if re.match(key, string): - return True - return False + return any(re.match(key, string) for key in self.rules.keys()) def is_fully_specified_key(self) -> bool: """ @@ -199,10 +187,7 @@ def is_fully_specified_key(self) -> bool: pattern into a 'search for the ones that are allowed' pattern, which can be much more efficient in some cases. """ - for key in self.rules.keys(): - if not self.alphabet_pattern.match(key): - return True - return False + return any(not self.alphabet_pattern.match(key) for key in self.rules.keys()) def get(self) -> Dict[str, List[str]]: """Return the list of allowed strings as a list, after taking into account deny patterns, if possible""" diff --git a/metadata-ingestion/src/datahub/configuration/import_resolver.py b/metadata-ingestion/src/datahub/configuration/import_resolver.py index 56e232d0403241..19627c7b8c9569 100644 --- a/metadata-ingestion/src/datahub/configuration/import_resolver.py +++ b/metadata-ingestion/src/datahub/configuration/import_resolver.py @@ -8,9 +8,7 @@ def _pydantic_resolver(v: Union[T, str]) -> T: - if isinstance(v, str): - return import_path(v) - return v + return import_path(v) if isinstance(v, str) else v def pydantic_resolve_key(field: str) -> classmethod: diff --git a/metadata-ingestion/src/datahub/configuration/kafka.py b/metadata-ingestion/src/datahub/configuration/kafka.py index 876db21086e88e..197322a2e566e1 100644 --- a/metadata-ingestion/src/datahub/configuration/kafka.py +++ b/metadata-ingestion/src/datahub/configuration/kafka.py @@ -27,7 +27,7 @@ def bootstrap_host_colon_port_comma(cls, val: str) -> str: else: host = entry assert re.match( - # This regex is quite loose. Many invalid hostnames or IPs will slip through, + # This regex is quite loose. Many invalid hostname's or IPs will slip through, # but it serves as a good first line of validation. We defer to Kafka for the # remaining validation. 
r"^[\w\-\.\:]+$", diff --git a/metadata-ingestion/src/datahub/configuration/yaml.py b/metadata-ingestion/src/datahub/configuration/yaml.py index ee710b07bab3d2..1f1172836f7448 100644 --- a/metadata-ingestion/src/datahub/configuration/yaml.py +++ b/metadata-ingestion/src/datahub/configuration/yaml.py @@ -9,5 +9,4 @@ class YamlConfigurationMechanism(ConfigurationMechanism): """Ability to load configuration from yaml files""" def load_config(self, config_fp: IO) -> dict: - config = yaml.safe_load(config_fp) - return config + return yaml.safe_load(config_fp) diff --git a/metadata-ingestion/src/datahub/emitter/kafka_emitter.py b/metadata-ingestion/src/datahub/emitter/kafka_emitter.py index f2dc663cf0677a..001097a2e42f5b 100644 --- a/metadata-ingestion/src/datahub/emitter/kafka_emitter.py +++ b/metadata-ingestion/src/datahub/emitter/kafka_emitter.py @@ -49,12 +49,11 @@ def validate_topic_routes(cls: "KafkaEmitterConfig", values: dict) -> dict: raise ConfigurationError( "Using both topic and topic_routes configuration for Kafka is not supported. Use only topic_routes" ) - else: - logger.warning( - "Looks like you're using the deprecated `topic` configuration. Please migrate to `topic_routes`." - ) - # upgrade topic provided to topic_routes mce entry - values["topic_routes"][MCE_KEY] = values["topic"] + logger.warning( + "Looks like you're using the deprecated `topic` configuration. Please migrate to `topic_routes`." + ) + # upgrade topic provided to topic_routes mce entry + values["topic_routes"][MCE_KEY] = values["topic"] return values @@ -70,8 +69,7 @@ def __init__(self, config: KafkaEmitterConfig): def convert_mce_to_dict( mce: MetadataChangeEvent, ctx: SerializationContext ) -> dict: - tuple_encoding = mce.to_obj(tuples=True) - return tuple_encoding + return mce.to_obj(tuples=True) mce_avro_serializer = AvroSerializer( schema_str=getMetadataChangeEventSchema(), @@ -83,8 +81,7 @@ def convert_mcp_to_dict( mcp: Union[MetadataChangeProposal, MetadataChangeProposalWrapper], ctx: SerializationContext, ) -> dict: - tuple_encoding = mcp.to_obj(tuples=True) - return tuple_encoding + return mcp.to_obj(tuples=True) mcp_avro_serializer = AvroSerializer( schema_str=getMetadataChangeProposalSchema(), diff --git a/metadata-ingestion/src/datahub/emitter/mce_builder.py b/metadata-ingestion/src/datahub/emitter/mce_builder.py index bf295c3e367931..5936546090acbc 100644 --- a/metadata-ingestion/src/datahub/emitter/mce_builder.py +++ b/metadata-ingestion/src/datahub/emitter/mce_builder.py @@ -104,8 +104,8 @@ def schema_field_urn_to_key(schema_field_urn: str) -> Optional[SchemaFieldKeyCla pattern = r"urn:li:schemaField:\((.*),(.*)\)" results = re.search(pattern, schema_field_urn) if results is not None: - dataset_urn: str = results.group(1) - field_path: str = results.group(2) + dataset_urn: str = results[1] + field_path: str = results[2] return SchemaFieldKeyClass(parent=dataset_urn, fieldPath=field_path) return None @@ -114,9 +114,7 @@ def dataset_urn_to_key(dataset_urn: str) -> Optional[DatasetKeyClass]: pattern = r"urn:li:dataset:\(urn:li:dataPlatform:(.*),(.*),(.*)\)" results = re.search(pattern, dataset_urn) if results is not None: - return DatasetKeyClass( - platform=results.group(1), name=results.group(2), origin=results.group(3) - ) + return DatasetKeyClass(platform=results[1], name=results[2], origin=results[3]) return None @@ -128,9 +126,7 @@ def container_new_urn_to_key(dataset_urn: str) -> Optional[ContainerKeyClass]: pattern = r"urn:dh:container:0:\((.*)\)" results = re.search(pattern, dataset_urn) 
if results is not None: - return ContainerKeyClass( - guid=results.group(1), - ) + return ContainerKeyClass(guid=results[1]) return None @@ -146,9 +142,7 @@ def container_urn_to_key(guid: str) -> Optional[ContainerKeyClass]: pattern = r"urn:li:container:(.*)" results = re.search(pattern, guid) if results is not None: - return ContainerKeyClass( - guid=results.group(1), - ) + return ContainerKeyClass(guid=results[1]) return None @@ -156,8 +150,7 @@ def datahub_guid(obj: dict) -> str: obj_str = json.dumps( pre_json_transform(obj), separators=(",", ":"), sort_keys=True ).encode("utf-8") - datahub_guid = md5(obj_str).hexdigest() - return datahub_guid + return md5(obj_str).hexdigest() def make_assertion_urn(assertion_id: str) -> str: diff --git a/metadata-ingestion/src/datahub/emitter/mcp_builder.py b/metadata-ingestion/src/datahub/emitter/mcp_builder.py index 7aed2e29137492..055db3c6a4ad61 100644 --- a/metadata-ingestion/src/datahub/emitter/mcp_builder.py +++ b/metadata-ingestion/src/datahub/emitter/mcp_builder.py @@ -233,7 +233,9 @@ def gen_containers( def add_dataset_to_container( - container_key: KeyType, dataset_urn: str + # FIXME: Union requires two or more type arguments + container_key: KeyType, + dataset_urn: str, ) -> Iterable[Union[MetadataWorkUnit]]: container_urn = make_container_urn( guid=container_key.guid(), diff --git a/metadata-ingestion/src/datahub/emitter/serialization_helper.py b/metadata-ingestion/src/datahub/emitter/serialization_helper.py index 5a348ce267b10f..cad4e9dd3270fc 100644 --- a/metadata-ingestion/src/datahub/emitter/serialization_helper.py +++ b/metadata-ingestion/src/datahub/emitter/serialization_helper.py @@ -16,10 +16,12 @@ def _json_transform(obj: Any, from_pattern: str, to_pattern: str) -> Any: field = obj["fieldDiscriminator"] return {field: _json_transform(obj[field], from_pattern, to_pattern)} - new_obj: Any = {} - for key, value in obj.items(): - if value is not None: - new_obj[key] = _json_transform(value, from_pattern, to_pattern) + new_obj: Any = { + key: _json_transform(value, from_pattern, to_pattern) + for key, value in obj.items() + if value is not None + } + return new_obj elif isinstance(obj, list): new_obj = [_json_transform(item, from_pattern, to_pattern) for item in obj] diff --git a/metadata-ingestion/src/datahub/ingestion/api/committable.py b/metadata-ingestion/src/datahub/ingestion/api/committable.py index f1aada4477f1ab..e41eb24abc2d96 100644 --- a/metadata-ingestion/src/datahub/ingestion/api/committable.py +++ b/metadata-ingestion/src/datahub/ingestion/api/committable.py @@ -55,7 +55,7 @@ def __init__( super(_CommittableConcrete, self).__init__(state_to_commit=state_to_commit) def has_successfully_committed(self) -> bool: - return True if not self.state_to_commit or self.committed else False + return bool(not self.state_to_commit or self.committed) @abstractmethod def get_previous_states( diff --git a/metadata-ingestion/src/datahub/ingestion/api/common.py b/metadata-ingestion/src/datahub/ingestion/api/common.py index 56c21a7f39c627..fd458f9b4fd980 100644 --- a/metadata-ingestion/src/datahub/ingestion/api/common.py +++ b/metadata-ingestion/src/datahub/ingestion/api/common.py @@ -55,8 +55,8 @@ def __init__( self.pipeline_name = pipeline_name self.dry_run_mode = dry_run self.preview_mode = preview_mode - self.reporters: Dict[str, Committable] = dict() - self.checkpointers: Dict[str, Committable] = dict() + self.reporters: Dict[str, Committable] = {} + self.checkpointers: Dict[str, Committable] = {} 
self._set_dataset_urn_to_lower_if_needed() def _set_dataset_urn_to_lower_if_needed(self) -> None: @@ -81,11 +81,8 @@ def register_reporter(self, committable: Committable) -> None: self.reporters[committable.name] = committable def get_reporters(self) -> Iterable[Committable]: - for committable in self.reporters.values(): - yield committable + yield from self.reporters.values() def get_committables(self) -> Iterable[Tuple[str, Committable]]: - for reporting_item_commitable in self.reporters.items(): - yield reporting_item_commitable - for checkpointing_item_commitable in self.checkpointers.items(): - yield checkpointing_item_commitable + yield from self.reporters.items() + yield from self.checkpointers.items() diff --git a/metadata-ingestion/src/datahub/ingestion/api/decorators.py b/metadata-ingestion/src/datahub/ingestion/api/decorators.py index 9b3f35ae9d8116..20867a8571b24c 100644 --- a/metadata-ingestion/src/datahub/ingestion/api/decorators.py +++ b/metadata-ingestion/src/datahub/ingestion/api/decorators.py @@ -37,8 +37,9 @@ def wrapper(cls: Type) -> Type: setattr( cls, "get_platform_id", - lambda: id if id else platform_name.lower().replace(" ", "-"), + lambda: id or platform_name.lower().replace(" ", "-"), ) + return cls if id and " " in id: @@ -56,7 +57,7 @@ class SupportStatus(Enum): """ INCUBATING = auto() """ - Incubating Sources are ready for DataHub Community adoption but have not been tested for a wide variety of edge-cases. We eagerly solicit feedback from the Community to streghten the connector; minor version changes may arise in future releases. + Incubating Sources are ready for DataHub Community adoption but have not been tested for a wide variety of edge-cases. We eagerly solicit feedback from the Community to strengthen the connector; minor version changes may arise in future releases. 
""" TESTING = auto() """ diff --git a/metadata-ingestion/src/datahub/ingestion/api/registry.py b/metadata-ingestion/src/datahub/ingestion/api/registry.py index f83921639c227e..a8529817e2500f 100644 --- a/metadata-ingestion/src/datahub/ingestion/api/registry.py +++ b/metadata-ingestion/src/datahub/ingestion/api/registry.py @@ -79,16 +79,15 @@ def register_disabled( def _ensure_not_lazy(self, key: str) -> Union[Type[T], Exception]: path = self._mapping[key] - if isinstance(path, str): - try: - plugin_class = import_path(path) - self.register(key, plugin_class, override=True) - return plugin_class - except (AssertionError, ModuleNotFoundError, ImportError) as e: - self.register_disabled(key, e, override=True) - return e - else: + if not isinstance(path, str): return path + try: + plugin_class = import_path(path) + self.register(key, plugin_class, override=True) + return plugin_class + except (AssertionError, ImportError) as e: + self.register_disabled(key, e, override=True) + return e def is_enabled(self, key: str) -> bool: tp = self._mapping[key] diff --git a/metadata-ingestion/src/datahub/ingestion/extractor/protobuf_util.py b/metadata-ingestion/src/datahub/ingestion/extractor/protobuf_util.py index e5f976ff88dd56..51fdbd8fbdb680 100644 --- a/metadata-ingestion/src/datahub/ingestion/extractor/protobuf_util.py +++ b/metadata-ingestion/src/datahub/ingestion/extractor/protobuf_util.py @@ -365,11 +365,7 @@ def _schema_fields_from_dag( if generations and generations[0]: roots = generations[0] - leafs: List = [] - for node in graph: - if graph.out_degree(node) == 0: - leafs.append(node) - + leafs: List = [node for node in graph if graph.out_degree(node) == 0] type_of_nodes: Dict = nx.get_node_attributes(graph, "node_type") for root in roots: diff --git a/metadata-ingestion/src/datahub/ingestion/reporting/datahub_ingestion_reporting_provider.py b/metadata-ingestion/src/datahub/ingestion/reporting/datahub_ingestion_reporting_provider.py index 568c41aac9dbd9..1bb89236cc51ad 100644 --- a/metadata-ingestion/src/datahub/ingestion/reporting/datahub_ingestion_reporting_provider.py +++ b/metadata-ingestion/src/datahub/ingestion/reporting/datahub_ingestion_reporting_provider.py @@ -115,7 +115,7 @@ def get_previous_states( ) -> List[ReportingJobStatesMap]: if not last_only: raise NotImplementedError( - "Currently supports retrieving only the last commited state." + "Currently supports retrieving only the last committed state." 
) if filter_opt is not None: raise NotImplementedError( diff --git a/metadata-ingestion/src/datahub/ingestion/run/pipeline.py b/metadata-ingestion/src/datahub/ingestion/run/pipeline.py index 969634e3163910..d0b1e300b9184b 100644 --- a/metadata-ingestion/src/datahub/ingestion/run/pipeline.py +++ b/metadata-ingestion/src/datahub/ingestion/run/pipeline.py @@ -54,11 +54,10 @@ def run_id_should_be_semantic( cls, v: Optional[str], values: Dict[str, Any], **kwargs: Any ) -> str: if v == "__DEFAULT_RUN_ID": - if "source" in values: - if hasattr(values["source"], "type"): - source_type = values["source"].type - current_time = datetime.datetime.now().strftime("%Y_%m_%d-%H_%M_%S") - return f"{source_type}-{current_time}" + if "source" in values and hasattr(values["source"], "type"): + source_type = values["source"].type + current_time = datetime.datetime.now().strftime("%Y_%m_%d-%H_%M_%S") + return f"{source_type}-{current_time}" return str(uuid.uuid1()) # default run_id if we cannot infer a source type else: @@ -88,12 +87,11 @@ def default_sink_is_datahub_rest(cls, values: Dict[str, Any]) -> Any: def datahub_api_should_use_rest_sink_as_default( cls, v: Optional[DatahubClientConfig], values: Dict[str, Any], **kwargs: Any ) -> Optional[DatahubClientConfig]: - if v is None: - if "sink" in values and hasattr(values["sink"], "type"): - sink_type = values["sink"].type - if sink_type == "datahub-rest": - sink_config = values["sink"].config - v = DatahubClientConfig.parse_obj(sink_config) + if v is None and "sink" in values and hasattr(values["sink"], "type"): + sink_type = values["sink"].type + if sink_type == "datahub-rest": + sink_config = values["sink"].config + v = DatahubClientConfig.parse_obj(sink_config) return v @@ -268,11 +266,10 @@ def process_commits(self) -> None: if self.source.get_report().failures or self.sink.get_report().failures else False ) - has_warnings: bool = ( - True - if self.source.get_report().warnings or self.sink.get_report().warnings - else False + has_warnings: bool = bool( + self.source.get_report().warnings or self.sink.get_report().warnings ) + for name, committable in self.ctx.get_committables(): commit_policy: CommitPolicy = committable.commit_policy diff --git a/metadata-ingestion/src/datahub/ingestion/sink/datahub_kafka.py b/metadata-ingestion/src/datahub/ingestion/sink/datahub_kafka.py index f931b9039303a0..93d3aa5f6c85d3 100644 --- a/metadata-ingestion/src/datahub/ingestion/sink/datahub_kafka.py +++ b/metadata-ingestion/src/datahub/ingestion/sink/datahub_kafka.py @@ -77,8 +77,8 @@ def write_record_async( self.report, record_envelope, write_callback ).kafka_callback, ) - elif isinstance(record, MetadataChangeProposalWrapper) or isinstance( - record, MetadataChangeProposalClass + elif isinstance( + record, (MetadataChangeProposalWrapper, MetadataChangeProposalClass) ): self.emitter.emit_mcp_async( record, diff --git a/metadata-ingestion/src/datahub/ingestion/sink/datahub_rest.py b/metadata-ingestion/src/datahub/ingestion/sink/datahub_rest.py index 74e536350457b5..d95eb245ccfb50 100644 --- a/metadata-ingestion/src/datahub/ingestion/sink/datahub_rest.py +++ b/metadata-ingestion/src/datahub/ingestion/sink/datahub_rest.py @@ -1,4 +1,5 @@ import concurrent.futures +import contextlib import functools import logging from dataclasses import dataclass @@ -111,13 +112,10 @@ def _write_done_callback( else: # trim exception stacktraces when reporting warnings if "stackTrace" in e.info: - try: + with contextlib.suppress(Exception): e.info["stackTrace"] = "\n".join( - 
e.info["stackTrace"].split("\n")[0:2] + e.info["stackTrace"].split("\n")[:2] ) - except Exception: - # ignore failures in trimming - pass record = record_envelope.record if isinstance(record, MetadataChangeProposalWrapper): # include information about the entity that failed diff --git a/metadata-ingestion/src/datahub/ingestion/source/aws/glue.py b/metadata-ingestion/src/datahub/ingestion/source/aws/glue.py index 93768a1aa7e73b..4ad1fd7a836c5b 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/aws/glue.py +++ b/metadata-ingestion/src/datahub/ingestion/source/aws/glue.py @@ -1059,7 +1059,7 @@ def get_s3_tags() -> Optional[GlobalTagsClass]: ] ) except self.s3_client.exceptions.ClientError: - logger.warn(f"No tags found for bucket={bucket_name}") + logger.warning(f"No tags found for bucket={bucket_name}") if self.source_config.use_s3_object_tags: key_prefix = s3_util.get_key_prefix( table["StorageDescriptor"]["Location"] @@ -1078,7 +1078,7 @@ def get_s3_tags() -> Optional[GlobalTagsClass]: else: # Unlike bucket tags, if an object does not have tags, it will just return an empty array # as opposed to an exception. - logger.warn( + logger.warning( f"No tags found for bucket={bucket_name} key={key_prefix}" ) if len(tags_to_add) == 0: @@ -1097,7 +1097,7 @@ def get_s3_tags() -> Optional[GlobalTagsClass]: [current_tag.tag for current_tag in current_tags.tags] ) else: - logger.warn( + logger.warning( "Could not connect to DatahubApi. No current tags to maintain" ) diff --git a/metadata-ingestion/src/datahub/ingestion/source/elastic_search.py b/metadata-ingestion/src/datahub/ingestion/source/elastic_search.py index c5a6c13bf695df..74fca0d2654519 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/elastic_search.py +++ b/metadata-ingestion/src/datahub/ingestion/source/elastic_search.py @@ -255,9 +255,7 @@ def host_colon_port_comma(cls, host_val: str) -> str: @property def http_auth(self) -> Optional[Tuple[str, str]]: - if self.username is None: - return None - return self.username, self.password or "" + return None if self.username is None else (self.username, self.password or "") @platform_name("Elastic Search") diff --git a/metadata-ingestion/src/datahub/ingestion/source/feast.py b/metadata-ingestion/src/datahub/ingestion/source/feast.py index c17ae5c14a85fd..7d4f5360ad4202 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/feast.py +++ b/metadata-ingestion/src/datahub/ingestion/source/feast.py @@ -4,23 +4,23 @@ from pydantic import Field -if sys.version_info >= (3, 7): - from feast import ( - BigQuerySource, - Entity, - Feature, - FeatureStore, - FeatureView, - FileSource, - KafkaSource, - KinesisSource, - OnDemandFeatureView, - ValueType, - ) - from feast.data_source import DataSource, RequestDataSource -else: +if sys.version_info < (3, 7): raise ModuleNotFoundError("The feast plugin requires Python 3.7 or newer.") +from feast import ( + BigQuerySource, + Entity, + Feature, + FeatureStore, + FeatureView, + FileSource, + KafkaSource, + KinesisSource, + OnDemandFeatureView, + ValueType, +) +from feast.data_source import DataSource, RequestDataSource + import datahub.emitter.mce_builder as builder from datahub.configuration.common import ConfigModel from datahub.emitter.mce_builder import DEFAULT_ENV @@ -52,6 +52,7 @@ assert sys.version_info >= (3, 7) # needed for mypy +# FIXME: ValueType module cannot be used as a type _field_type_mapping: Dict[ValueType, str] = { ValueType.UNKNOWN: MLFeatureDataType.UNKNOWN, ValueType.BYTES: MLFeatureDataType.BYTE, @@ -218,6 +219,7 @@ 
def _get_entity_workunit( def _get_feature_workunit( self, + # FIXME: FeatureView and OnDemandFeatureView cannot be used as a type feature_view: Union[FeatureView, OnDemandFeatureView], feature: Feature, ) -> MetadataWorkUnit: diff --git a/metadata-ingestion/src/datahub/ingestion/source/ldap.py b/metadata-ingestion/src/datahub/ingestion/source/ldap.py index cf751de06d1e7e..5e2d31a8031683 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/ldap.py +++ b/metadata-ingestion/src/datahub/ingestion/source/ldap.py @@ -205,9 +205,7 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]: ) _rtype, rdata, _rmsgid, serverctrls = self.ldap_client.result3(msgid) except ldap.LDAPError as e: - self.report.report_failure( - "ldap-control", "LDAP search failed: {}".format(e) - ) + self.report.report_failure("ldap-control", f"LDAP search failed: {e}") break for dn, attrs in rdata: @@ -265,10 +263,7 @@ def handle_user(self, dn: str, attrs: Dict[str, Any]) -> Iterable[MetadataWorkUn _m_dn, m_attrs = result[1][0] manager_ldap = guess_person_ldap(m_attrs, self.config, self.report) except ldap.LDAPError as e: - self.report.report_warning( - dn, "manager LDAP search failed: {}".format(e) - ) - + self.report.report_warning(dn, f"manager LDAP search failed: {e}") mce = self.build_corp_user_mce(dn, attrs, manager_ldap) if mce: wu = MetadataWorkUnit(dn, mce) diff --git a/metadata-ingestion/src/datahub/ingestion/source/looker.py b/metadata-ingestion/src/datahub/ingestion/source/looker.py index 252d9f553e0572..7c1368f814e4e0 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/looker.py +++ b/metadata-ingestion/src/datahub/ingestion/source/looker.py @@ -216,11 +216,11 @@ def url(self, base_url: str) -> str: # If the base_url contains a port number (like https://company.looker.com:19999) remove the port number m = re.match("^(.*):([0-9]+)$", base_url) if m is not None: - base_url = m.group(1) + base_url = m[1] if self.look_id is not None: - return base_url + "/looks/" + self.look_id + return f"{base_url}/looks/{self.look_id}" else: - return base_url + "/x/" + self.query_slug + return f"{base_url}/x/{self.query_slug}" def get_urn_element_id(self): # A dashboard element can use a look or just a raw query against an explore @@ -270,23 +270,22 @@ def __init__(self, client: Looker31SDK): def get_by_id( self, id: int, transport_options: Optional[TransportOptions] ) -> Optional[LookerUser]: - logger.debug("Will get user {}".format(id)) + logger.debug(f"Will get user {id}") if id in self.user_map: return self.user_map[id] - else: - try: - raw_user: User = self.client.user( - id, - fields=self.fields, - transport_options=transport_options, - ) - looker_user = LookerUser._from_user(raw_user) - self.user_map[id] = looker_user - return looker_user - except SDKError as e: - logger.warn("Could not find user with id {}".format(id)) - logger.warn("Failure was {}".format(e)) - return None + try: + raw_user: User = self.client.user( + id, + fields=self.fields, + transport_options=transport_options, + ) + looker_user = LookerUser._from_user(raw_user) + self.user_map[id] = looker_user + return looker_user + except SDKError as e: + logger.warning(f"Could not find user with id {id}") + logger.warning(f"Failure was {e}") + return None @dataclass @@ -306,8 +305,8 @@ def url(self, base_url): # If the base_url contains a port number (like https://company.looker.com:19999) remove the port number m = re.match("^(.*):([0-9]+)$", base_url) if m is not None: - base_url = m.group(1) - return base_url + "/dashboards/" + self.id + 
base_url = m[1] + return f"{base_url}/dashboards/{self.id}" def get_urn_dashboard_id(self): return f"dashboards.{self.id}" @@ -350,8 +349,7 @@ def _extract_view_from_field(field: str) -> str: assert ( field.count(".") == 1 ), f"Error: A field must be prefixed by a view name, field is: {field}" - view_name = field.split(".")[0] - return view_name + return field.split(".")[0] def _get_views_from_fields(self, fields: List[str]) -> List[str]: field_set = set(fields) @@ -449,12 +447,9 @@ def _get_looker_dashboard_element( # noqa: C901 raise ValueError("Element ID can't be None") if element.query is not None: - explores = [] fields = self._get_fields_from_query(element.query) - if element.query.view is not None: - # Get the explore from the view directly - explores = [element.query.view] - + # Get the explore from the view directly + explores = [element.query.view] if element.query.view is not None else [] logger.debug( "Element {}: Explores added: {}".format(element.title, explores) ) @@ -957,7 +952,7 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]: else False, ) else: - raise Exception("Unexpected type of event {}".format(event)) + raise Exception(f"Unexpected type of event {event}") self.reporter.report_workunit(workunit) yield workunit diff --git a/metadata-ingestion/src/datahub/ingestion/source/looker_common.py b/metadata-ingestion/src/datahub/ingestion/source/looker_common.py index 0c8c496a9041c0..28f2b23c1a258a 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/looker_common.py +++ b/metadata-ingestion/src/datahub/ingestion/source/looker_common.py @@ -110,12 +110,8 @@ class LookerExploreNamingConfig(ConfigModel): def init_naming_pattern(cls, v): if isinstance(v, NamingPattern): return v - else: - assert isinstance(v, str), "pattern must be a string" - naming_pattern = NamingPattern( - allowed_vars=naming_pattern_variables, pattern=v - ) - return naming_pattern + assert isinstance(v, str), "pattern must be a string" + return NamingPattern(allowed_vars=naming_pattern_variables, pattern=v) @validator("explore_naming_pattern", "explore_browse_pattern", always=True) def validate_naming_pattern(cls, v): @@ -143,12 +139,8 @@ class LookerViewNamingConfig(ConfigModel): def init_naming_pattern(cls, v): if isinstance(v, NamingPattern): return v - else: - assert isinstance(v, str), "pattern must be a string" - naming_pattern = NamingPattern( - allowed_vars=naming_pattern_variables, pattern=v - ) - return naming_pattern + assert isinstance(v, str), "pattern must be a string" + return NamingPattern(allowed_vars=naming_pattern_variables, pattern=v) @validator("view_naming_pattern", "view_browse_pattern", always=True) def validate_naming_pattern(cls, v): @@ -314,8 +306,7 @@ def _extract_view_from_field(field: str) -> str: assert ( field.count(".") == 1 ), f"Error: A field must be prefixed by a view name, field is: {field}" - view_name = field.split(".")[0] - return view_name + return field.split(".")[0] @staticmethod def _get_field_type( @@ -336,8 +327,7 @@ def _get_field_type( ) type_class = NullTypeClass - data_type = SchemaFieldDataType(type=type_class()) - return data_type + return SchemaFieldDataType(type=type_class()) @staticmethod def _get_schema( @@ -346,7 +336,7 @@ def _get_schema( view_fields: List[ViewField], reporter: SourceReport, ) -> Optional[SchemaMetadataClass]: - if view_fields == []: + if not view_fields: return None fields, primary_keys = LookerUtil._get_fields_and_primary_keys( view_fields=view_fields, reporter=reporter @@ -633,16 +623,13 @@ def from_api( # noqa: 
C901 source_file=explore.source_file, ) except SDKError as e: - logger.warn( - "Failed to extract explore {} from model {}.".format( - explore_name, model - ) + logger.warning( + f"Failed to extract explore {explore_name} from model {model}." ) logger.debug( - "Failed to extract explore {} from model {} with {}".format( - explore_name, model, e - ) + f"Failed to extract explore {explore_name} from model {model} with {e}" ) + except AssertionError: reporter.report_warning( key="chart-", @@ -693,7 +680,7 @@ def _get_url(self, base_url): # If the base_url contains a port number (like https://company.looker.com:19999) remove the port number m = re.match("^(.*):([0-9]+)$", base_url) if m is not None: - base_url = m.group(1) + base_url = m[1] return f"{base_url}/explore/{self.model_name}/{self.name}" def _to_metadata_events( # noqa: C901 diff --git a/metadata-ingestion/src/datahub/ingestion/source/lookml.py b/metadata-ingestion/src/datahub/ingestion/source/lookml.py index a08b0bf30bb100..aee876da2f7bac 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/lookml.py +++ b/metadata-ingestion/src/datahub/ingestion/source/lookml.py @@ -130,18 +130,17 @@ def from_looker_connection( ".*": _get_generic_definition, } - if looker_connection.dialect_name is not None: - for extractor_pattern, extracting_function in extractors.items(): - if re.match(extractor_pattern, looker_connection.dialect_name): - (platform, db, schema) = extracting_function(looker_connection) - return cls(platform=platform, default_db=db, default_schema=schema) - raise ConfigurationError( - f"Could not find an appropriate platform for looker_connection: {looker_connection.name} with dialect: {looker_connection.dialect_name}" - ) - else: + if looker_connection.dialect_name is None: raise ConfigurationError( f"Unable to fetch a fully filled out connection for {looker_connection.name}. Please check your API permissions." 
) + for extractor_pattern, extracting_function in extractors.items(): + if re.match(extractor_pattern, looker_connection.dialect_name): + (platform, db, schema) = extracting_function(looker_connection) + return cls(platform=platform, default_db=db, default_schema=schema) + raise ConfigurationError( + f"Could not find an appropriate platform for looker_connection: {looker_connection.name} with dialect: {looker_connection.dialect_name}" + ) class LookMLSourceConfig(LookerCommonConfig): @@ -591,7 +590,7 @@ def from_looker_dict( if sql_table_name is not None else None ) - derived_table = looker_view.get("derived_table", None) + derived_table = looker_view.get("derived_table") dimensions = cls._get_fields( looker_view.get("dimensions", []), ViewFieldType.DIMENSION @@ -605,7 +604,7 @@ def from_looker_dict( fields: List[ViewField] = dimensions + dimension_groups + measures # also store the view logic and materialization - view_logic = looker_viewfile.raw_file_content[0:max_file_snippet_length] + view_logic = looker_viewfile.raw_file_content[:max_file_snippet_length] # Parse SQL from derived tables to extract dependencies if derived_table is not None: @@ -630,9 +629,7 @@ def from_looker_dict( if k in ["datagroup_trigger", "sql_trigger_value", "persist_for"]: materialized = True if "materialized_view" in derived_table: - materialized = ( - True if derived_table["materialized_view"] == "yes" else False - ) + materialized = derived_table["materialized_view"] == "yes" view_details = ViewProperties( materialized=materialized, viewLogic=view_logic, viewLanguage=view_lang @@ -653,15 +650,11 @@ def from_looker_dict( ) # If not a derived table, then this view essentially wraps an existing - # object in the database. - if sql_table_name is not None: - # If sql_table_name is set, there is a single dependency in the view, on the sql_table_name. - sql_table_names = [sql_table_name] - else: - # Otherwise, default to the view name as per the docs: - # https://docs.looker.com/reference/view-params/sql_table_name-for-view - sql_table_names = [view_name] - + # object in the database. If sql_table_name is set, there is a single + # dependency in the view, on the sql_table_name. 
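# Editor's sketch (illustrative only, not part of this patch): the
# from_looker_connection hunk earlier in this file's diff replaces a nested
# if/else with an early-return guard plus a regex-keyed dispatch table. A
# minimal standalone version of that pattern follows; resolve_platform and
# the extractor callables are hypothetical stand-ins for the real code.
import re
from typing import Callable, Dict, Optional, Tuple

Extractor = Callable[[str], Tuple[str, str, str]]

def resolve_platform(
    dialect_name: Optional[str], extractors: Dict[str, Extractor]
) -> Tuple[str, str, str]:
    if dialect_name is None:
        # Guard clause first, so the happy path below stays unindented.
        raise ValueError("connection has no dialect_name; check API permissions")
    for pattern, extract in extractors.items():
        # First matching pattern wins; a trailing ".*" entry acts as the fallback.
        if re.match(pattern, dialect_name):
            return extract(dialect_name)
    raise ValueError(f"no extractor matched dialect: {dialect_name}")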
+ # Otherwise, default to the view name as per the docs: + # https://docs.looker.com/reference/view-params/sql_table_name-for-view + sql_table_names = [view_name] if sql_table_name is None else [sql_table_name] output_looker_view = LookerView( id=LookerViewId( project_name=project_name, model_name=model_name, view_name=view_name @@ -705,7 +698,7 @@ def _extract_metadata_from_sql_query( # Add those in if we detect that it is missing if not re.search(r"SELECT\s", sql_query, flags=re.I): # add a SELECT clause at the beginning - sql_query = "SELECT " + sql_query + sql_query = f"SELECT {sql_query}" if not re.search(r"FROM\s", sql_query, flags=re.I): # add a FROM clause at the end sql_query = f"{sql_query} FROM {sql_table_name if sql_table_name is not None else view_name}" @@ -714,7 +707,7 @@ def _extract_metadata_from_sql_query( sql_info = cls._get_sql_info(sql_query, sql_parser_path) sql_table_names = sql_info.table_names column_names = sql_info.column_names - if fields == []: + if not fields: # it seems like the view is defined purely as sql, let's try using the column names to populate the schema fields = [ # set types to unknown for now as our sql parser doesn't give us column types yet @@ -843,10 +836,7 @@ def _load_model(self, path: str) -> LookerModel: return looker_model def _platform_names_have_2_parts(self, platform: str) -> bool: - if platform in ["hive", "mysql", "athena"]: - return True - else: - return False + return platform in {"hive", "mysql", "athena"} def _generate_fully_qualified_name( self, sql_table_name: str, connection_def: LookerConnectionDefinition @@ -999,7 +989,6 @@ def _get_custom_properties(self, looker_view: LookerView) -> DatasetPropertiesCl def _build_dataset_mcps( self, looker_view: LookerView ) -> List[MetadataChangeProposalWrapper]: - events = [] subTypeEvent = MetadataChangeProposalWrapper( entityType="dataset", changeType=ChangeTypeClass.UPSERT, @@ -1007,7 +996,7 @@ def _build_dataset_mcps( aspectName="subTypes", aspect=SubTypesClass(typeNames=["view"]), ) - events.append(subTypeEvent) + events = [subTypeEvent] if looker_view.view_details is not None: viewEvent = MetadataChangeProposalWrapper( entityType="dataset", @@ -1048,9 +1037,7 @@ def _build_dataset_mce(self, looker_view: LookerView) -> MetadataChangeEvent: dataset_snapshot.aspects.append(schema_metadata) dataset_snapshot.aspects.append(self._get_custom_properties(looker_view)) - mce = MetadataChangeEvent(proposedSnapshot=dataset_snapshot) - - return mce + return MetadataChangeEvent(proposedSnapshot=dataset_snapshot) def get_project_name(self, model_name: str) -> str: if self.source_config.project_name is not None: @@ -1092,7 +1079,7 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]: # noqa: C901 for file_path in model_files: self.reporter.report_models_scanned() - model_name = file_path.stem[0:-model_suffix_len] + model_name = file_path.stem[:-model_suffix_len] if not self.source_config.model_pattern.allowed(model_name): self.reporter.report_models_dropped(model_name) diff --git a/metadata-ingestion/src/datahub/ingestion/source/metabase.py b/metadata-ingestion/src/datahub/ingestion/source/metabase.py index 93308ff93b3226..98bcbcba591ebd 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/metabase.py +++ b/metadata-ingestion/src/datahub/ingestion/source/metabase.py @@ -1,4 +1,4 @@ -from datetime import datetime +from datetime import datetime, timezone from functools import lru_cache from typing import Dict, Iterable, Optional @@ -199,7 +199,7 @@ def 
get_timestamp_millis_from_ts_string(ts_str: str) -> int: try: return int(dp.parse(ts_str).timestamp() * 1000) except (dp.ParserError, OverflowError): - return int(datetime.utcnow().timestamp() * 1000) + return int(datetime.now(timezone.utc).timestamp() * 1000) def construct_dashboard_from_api_data( self, dashboard_info: dict @@ -449,7 +449,7 @@ def get_datasource_urn(self, card_details): schema_name, table_name = self.get_source_table_from_id(source_table_id) if table_name: source_paths.add( - f"{schema_name + '.' if schema_name else ''}{table_name}" + f"{f'{schema_name}.' if schema_name else ''}{table_name}" ) else: try: @@ -478,7 +478,7 @@ def get_datasource_urn(self, card_details): # Create dataset URNs dataset_urn = [] - dbname = f"{database_name + '.' if database_name else ''}" + dbname = f"{f'{database_name}.' if database_name else ''}" source_tables = list(map(lambda tbl: f"{dbname}{tbl}", source_paths)) dataset_urn = [ builder.make_dataset_urn_with_platform_instance( diff --git a/metadata-ingestion/src/datahub/ingestion/source/mongodb.py b/metadata-ingestion/src/datahub/ingestion/source/mongodb.py index 8d6201867dd8b3..9710fe023a2560 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/mongodb.py +++ b/metadata-ingestion/src/datahub/ingestion/source/mongodb.py @@ -172,9 +172,9 @@ def construct_schema_pymongo( maximum size of the document that will be considered for generating the schema. """ - doc_size_field = "temporary_doc_size_field" aggregations: List[Dict] = [] if is_version_gte_4_4: + doc_size_field = "temporary_doc_size_field" # create a temporary field to store the size of the document. filter on it and then remove it. aggregations = [ {"$addFields": {doc_size_field: {"$bsonSize": "$$ROOT"}}}, diff --git a/metadata-ingestion/src/datahub/ingestion/source/nifi.py b/metadata-ingestion/src/datahub/ingestion/source/nifi.py index bb8ac443555252..9677d5cbd3b5cb 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/nifi.py +++ b/metadata-ingestion/src/datahub/ingestion/source/nifi.py @@ -338,8 +338,8 @@ def __init__(self, config: NifiSourceConfig, ctx: PipelineContext) -> None: if self.config.site_url_to_site_name is None: self.config.site_url_to_site_name = {} if ( - not urljoin(self.config.site_url, "/nifi/") - in self.config.site_url_to_site_name + urljoin(self.config.site_url, "/nifi/") + not in self.config.site_url_to_site_name ): self.config.site_url_to_site_name[ urljoin(self.config.site_url, "/nifi/") @@ -620,7 +620,7 @@ def create_nifi_flow(self): if about_response.ok: nifi_version = about_response.json().get("about", {}).get("version") else: - logger.warn("Failed to fetch version for nifi") + logger.warning("Failed to fetch version for nifi") cluster_response = self.session.get( url=urljoin(self.config.site_url, CLUSTER_ENDPOINT) ) @@ -630,7 +630,7 @@ def create_nifi_flow(self): cluster_response.json().get("clusterSummary", {}).get("clustered") ) else: - logger.warn("Failed to fetch cluster summary for flow") + logger.warning("Failed to fetch cluster summary for flow") pg_response = self.session.get( url=urljoin(self.config.site_url, PG_ENDPOINT) + "root" ) @@ -715,7 +715,7 @@ def fetch_provenance_events( attempts = 5 # wait for at most 5 attempts 5*1= 5 seconds while (not provenance.get("finished", False)) and attempts > 0: - logger.warn( + logger.warning( f"Provenance query not completed, attempts left : {attempts}" ) # wait until the uri returns percentcomplete 100 @@ -757,7 +757,7 @@ def fetch_provenance_events( f"provenance events could not be fetched 
for processor \ {processor.id} of type {processor.name}", ) - logger.warn(provenance_response.text) + logger.warning(provenance_response.text) return def report_warning(self, key: str, reason: str) -> None: @@ -774,7 +774,7 @@ def construct_workunits(self) -> Iterable[MetadataWorkUnit]: # noqa: C901 rootpg = self.nifi_flow.root_process_group flow_name = rootpg.name # self.config.site_name flow_urn = builder.make_data_flow_urn(NIFI, rootpg.id, self.config.env) - flow_properties = dict() + flow_properties = {} if self.nifi_flow.clustered is not None: flow_properties["clustered"] = str(self.nifi_flow.clustered) if self.nifi_flow.version is not None: diff --git a/metadata-ingestion/src/datahub/ingestion/source/openapi.py b/metadata-ingestion/src/datahub/ingestion/source/openapi.py index b71cb363b96e46..9548677e1cdc11 100755 --- a/metadata-ingestion/src/datahub/ingestion/source/openapi.py +++ b/metadata-ingestion/src/datahub/ingestion/source/openapi.py @@ -118,7 +118,7 @@ class ApiWorkUnit(MetadataWorkUnit): class APISource(Source, ABC): """ - This plugin is meant to gather dataset-like informations about OpenApi Endpoints. + This plugin is meant to gather dataset-like information about OpenApi Endpoints. As example, if by calling GET at the endpoint at `https://test_endpoint.com/api/users/` you obtain as result: ```JSON diff --git a/metadata-ingestion/src/datahub/ingestion/source/openapi_parser.py b/metadata-ingestion/src/datahub/ingestion/source/openapi_parser.py index 830b6562755eb7..f33654daa15595 100755 --- a/metadata-ingestion/src/datahub/ingestion/source/openapi_parser.py +++ b/metadata-ingestion/src/datahub/ingestion/source/openapi_parser.py @@ -20,9 +20,9 @@ def flatten(d: dict, prefix: str = "") -> Generator: for k, v in d.items(): if isinstance(v, dict): - yield from flatten(v, prefix + "." + k) + yield from flatten(v, f"{prefix}.{k}") else: - yield (prefix + "-" + k).strip(".") + yield f"{prefix}-{k}".strip(".") def flatten2list(d: dict) -> list: @@ -53,15 +53,15 @@ def request_call( headers = {"accept": "application/json"} if username is not None and password is not None: - response = requests.get( + return requests.get( url, headers=headers, auth=HTTPBasicAuth(username, password) ) + elif token is not None: - headers["Authorization"] = "Bearer " + token - response = requests.get(url, headers=headers) + headers["Authorization"] = f"Bearer {token}" + return requests.get(url, headers=headers) else: - response = requests.get(url, headers=headers) - return response + return requests.get(url, headers=headers) def get_swag_json( @@ -77,14 +77,13 @@ def get_swag_json( else: response = request_call(url=tot_url, username=username, password=password) - if response.status_code == 200: - try: - dict_data = json.loads(response.content) - except json.JSONDecodeError: # it's not a JSON! - dict_data = yaml.safe_load(response.content) - return dict_data - else: + if response.status_code != 200: raise Exception(f"Unable to retrieve {tot_url}, error {response.status_code}") + try: + dict_data = json.loads(response.content) + except json.JSONDecodeError: # it's not a JSON! 
+ dict_data = yaml.safe_load(response.content) + return dict_data def get_url_basepath(sw_dict: dict) -> str: @@ -95,7 +94,7 @@ def get_url_basepath(sw_dict: dict) -> str: def check_sw_version(sw_dict: dict) -> None: - if "swagger" in sw_dict.keys(): + if "swagger" in sw_dict: v_split = sw_dict["swagger"].split(".") else: v_split = sw_dict["openapi"].split(".") @@ -176,8 +175,7 @@ def get_endpoints(sw_dict: dict) -> dict: # noqa: C901 if "parameters" in p_o["get"].keys(): url_details[p_k]["parameters"] = p_o["get"]["parameters"] - ord_d = dict(sorted(url_details.items())) # sorting for convenience - return ord_d + return dict(sorted(url_details.items())) def guessing_url_name(url: str, examples: dict) -> str: @@ -187,10 +185,7 @@ def guessing_url_name(url: str, examples: dict) -> str: extr_data = {"advancedcomputersearches": {'id': 202, 'name': '_unmanaged'}} -->> guessed_url = /advancedcomputersearches/name/_unmanaged/id/202' """ - if url[0] == "/": - url2op = url[1:] # operational url does not need the very first / - else: - url2op = url + url2op = url[1:] if url[0] == "/" else url divisions = url2op.split("/") # the very first part of the url should stay the same. @@ -211,14 +206,14 @@ def guessing_url_name(url: str, examples: dict) -> str: if div_pos > 0: root = root[: div_pos - 1] # like "base/field" should become "base" - if root in examples.keys(): + if root in examples: # if our root is contained in our samples examples... ex2use = root - elif root[:-1] in examples.keys(): + elif root[:-1] in examples: ex2use = root[:-1] - elif root.replace("/", ".") in examples.keys(): + elif root.replace("/", ".") in examples: ex2use = root.replace("/", ".") - elif root[:-1].replace("/", ".") in examples.keys(): + elif root[:-1].replace("/", ".") in examples: ex2use = root[:-1].replace("/", ".") else: return url @@ -277,8 +272,7 @@ def try_guessing(url: str, examples: dict) -> str: Any non-guessed name will stay as it was (with parenthesis{}) """ url_guess = guessing_url_name(url, examples) # try to fill with known informations - url_guess_id = maybe_theres_simple_id(url_guess) # try to fill IDs with "1"s... 
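# Editor's sketch (illustrative only, not part of this patch): the
# guessing_url_name hunk above drops the leading "/" with a conditional
# expression and tests membership directly against the dict rather than
# against dict.keys(). A standalone sketch of those two idioms; the function
# name and the examples mapping are hypothetical.
from typing import Optional

def pick_example_key(url: str, examples: dict) -> Optional[str]:
    # The "operational" path does not need the very first "/".
    path = url[1:] if url.startswith("/") else url
    root = path.split("/")[0]
    # "key in mapping" is equivalent to "key in mapping.keys()", just shorter.
    for candidate in (root, root[:-1], root.replace("/", ".")):
        if candidate in examples:
            return candidate
    return None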
- return url_guess_id + return maybe_theres_simple_id(url_guess) def clean_url(url: str) -> str: diff --git a/metadata-ingestion/src/datahub/ingestion/source/powerbi.py b/metadata-ingestion/src/datahub/ingestion/source/powerbi.py index 5cfba5fa2ec14b..ff0924349c0e91 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/powerbi.py +++ b/metadata-ingestion/src/datahub/ingestion/source/powerbi.py @@ -131,7 +131,7 @@ class PowerBiAPIConfig(EnvBasedSourceConfigBase): authority = "https://login.microsoftonline.com/" def get_authority_url(self): - return "{}{}".format(self.authority, self.tenant_id) + return f"{self.authority}{self.tenant_id}" class PowerBiDashboardSourceConfig(PowerBiAPIConfig): @@ -216,7 +216,7 @@ class Table: tables: List[Any] def get_urn_part(self): - return "datasets.{}".format(self.id) + return f"datasets.{self.id}" def __members(self): return (self.id,) @@ -239,7 +239,7 @@ class Report: dataset: Any def get_urn_part(self): - return "reports.{}".format(self.id) + return f"reports.{self.id}" @dataclass class Tile: @@ -257,7 +257,7 @@ class CreatedFrom(Enum): createdFrom: CreatedFrom def get_urn_part(self): - return "charts.{}".format(self.id) + return f"charts.{self.id}" @dataclass class User: @@ -269,7 +269,7 @@ class User: principalType: str def get_urn_part(self): - return "users.{}".format(self.id) + return f"users.{self.id}" def __members(self): return (self.id,) @@ -296,7 +296,7 @@ class Dashboard: users: List[Any] def get_urn_part(self): - return "dashboards.{}".format(self.id) + return f"dashboards.{self.id}" def __members(self): return (self.id,) @@ -322,9 +322,9 @@ def __init__(self, config: PowerBiAPIConfig) -> None: ) # Test connection by generating a access token - LOGGER.info("Trying to connect to {}".format(self.__config.get_authority_url())) + LOGGER.info(f"Trying to connect to {self.__config.get_authority_url()}") self.get_access_token() - LOGGER.info("Able to connect to {}".format(self.__config.get_authority_url())) + LOGGER.info(f"Able to connect to {self.__config.get_authority_url()}") def __get_users(self, workspace_id: str, entity: str, id: str) -> List[User]: """ @@ -338,7 +338,7 @@ def __get_users(self, workspace_id: str, entity: str, id: str) -> List[User]: ENTITY_ID=id, ) # Hit PowerBi - LOGGER.info("Request to URL={}".format(user_list_endpoint)) + LOGGER.info(f"Request to URL={user_list_endpoint}") response = requests.get( url=user_list_endpoint, headers={Constant.Authorization: self.get_access_token()}, @@ -347,13 +347,12 @@ def __get_users(self, workspace_id: str, entity: str, id: str) -> List[User]: # Check if we got response from PowerBi if response.status_code != 200: LOGGER.warning( - "Failed to fetch user list from power-bi for, http_status={}, message={}".format( - response.status_code, response.text - ) + f"Failed to fetch user list from power-bi for, http_status={response.status_code}, message={response.text}" ) - LOGGER.info("{}={}".format(Constant.WorkspaceId, workspace_id)) - LOGGER.info("{}={}".format(Constant.ENTITY, entity)) - LOGGER.info("{}={}".format(Constant.ID, id)) + + LOGGER.info(f"{Constant.WorkspaceId}={workspace_id}") + LOGGER.info(f"{Constant.ENTITY}={entity}") + LOGGER.info(f"{Constant.ID}={id}") raise ConnectionError("Failed to fetch the user list from the power-bi") users_dict: List[Any] = response.json()[Constant.VALUE] @@ -379,8 +378,8 @@ def __get_report(self, workspace_id: str, report_id: str) -> Any: """ if workspace_id is None or report_id is None: LOGGER.info("Input values are None") - 
LOGGER.info("{}={}".format(Constant.WorkspaceId, workspace_id)) - LOGGER.info("{}={}".format(Constant.ReportId, report_id)) + LOGGER.info(f"{Constant.WorkspaceId}={workspace_id}") + LOGGER.info(f"{Constant.ReportId}={report_id}") return None report_get_endpoint: str = PowerBiAPI.API_ENDPOINTS[Constant.REPORT_GET] @@ -391,7 +390,7 @@ def __get_report(self, workspace_id: str, report_id: str) -> Any: REPORT_ID=report_id, ) # Hit PowerBi - LOGGER.info("Request to report URL={}".format(report_get_endpoint)) + LOGGER.info(f"Request to report URL={report_get_endpoint}") response = requests.get( url=report_get_endpoint, headers={Constant.Authorization: self.get_access_token()}, @@ -401,8 +400,8 @@ def __get_report(self, workspace_id: str, report_id: str) -> Any: if response.status_code != 200: message: str = "Failed to fetch report from power-bi for" LOGGER.warning(message) - LOGGER.warning("{}={}".format(Constant.WorkspaceId, workspace_id)) - LOGGER.warning("{}={}".format(Constant.ReportId, report_id)) + LOGGER.warning(f"{Constant.WorkspaceId}={workspace_id}") + LOGGER.warning(f"{Constant.ReportId}={report_id}") raise ConnectionError(message) response_dict = response.json() @@ -440,7 +439,7 @@ def get_access_token(self): self.__access_token = "Bearer {}".format(auth_response.get("access_token")) - LOGGER.debug("{}={}".format(Constant.PBIAccessToken, self.__access_token)) + LOGGER.debug(f"{Constant.PBIAccessToken}={self.__access_token}") return self.__access_token @@ -464,7 +463,7 @@ def get_dashboards(self, workspace: Workspace) -> List[Dashboard]: POWERBI_BASE_URL=self.__config.base_url, WORKSPACE_ID=workspace.id ) # Hit PowerBi - LOGGER.info("Request to URL={}".format(dashboard_list_endpoint)) + LOGGER.info(f"Request to URL={dashboard_list_endpoint}") response = requests.get( url=dashboard_list_endpoint, headers={Constant.Authorization: self.get_access_token()}, @@ -473,7 +472,7 @@ def get_dashboards(self, workspace: Workspace) -> List[Dashboard]: # Check if we got response from PowerBi if response.status_code != 200: LOGGER.warning("Failed to fetch dashboard list from power-bi for") - LOGGER.warning("{}={}".format(Constant.WorkspaceId, workspace.id)) + LOGGER.warning(f"{Constant.WorkspaceId}={workspace.id}") raise ConnectionError( "Failed to fetch the dashboard list from the power-bi" ) @@ -505,8 +504,8 @@ def get_dataset(self, workspace_id: str, dataset_id: str) -> Any: """ if workspace_id is None or dataset_id is None: LOGGER.info("Input values are None") - LOGGER.info("{}={}".format(Constant.WorkspaceId, workspace_id)) - LOGGER.info("{}={}".format(Constant.DatasetId, dataset_id)) + LOGGER.info(f"{Constant.WorkspaceId}={workspace_id}") + LOGGER.info(f"{Constant.DatasetId}={dataset_id}") return None dataset_get_endpoint: str = PowerBiAPI.API_ENDPOINTS[Constant.DATASET_GET] @@ -517,7 +516,7 @@ def get_dataset(self, workspace_id: str, dataset_id: str) -> Any: DATASET_ID=dataset_id, ) # Hit PowerBi - LOGGER.info("Request to dataset URL={}".format(dataset_get_endpoint)) + LOGGER.info(f"Request to dataset URL={dataset_get_endpoint}") response = requests.get( url=dataset_get_endpoint, headers={Constant.Authorization: self.get_access_token()}, @@ -527,8 +526,8 @@ def get_dataset(self, workspace_id: str, dataset_id: str) -> Any: if response.status_code != 200: message: str = "Failed to fetch dataset from power-bi for" LOGGER.warning(message) - LOGGER.warning("{}={}".format(Constant.WorkspaceId, workspace_id)) - LOGGER.warning("{}={}".format(Constant.DatasetId, dataset_id)) + 
LOGGER.warning(f"{Constant.WorkspaceId}={workspace_id}") + LOGGER.warning(f"{Constant.DatasetId}={dataset_id}") raise ConnectionError(message) response_dict = response.json() @@ -558,7 +557,7 @@ def get_data_source(self, dataset: Dataset) -> Any: DATASET_ID=dataset.id, ) # Hit PowerBi - LOGGER.info("Request to datasource URL={}".format(datasource_get_endpoint)) + LOGGER.info(f"Request to datasource URL={datasource_get_endpoint}") response = requests.get( url=datasource_get_endpoint, headers={Constant.Authorization: self.get_access_token()}, @@ -568,18 +567,17 @@ def get_data_source(self, dataset: Dataset) -> Any: if response.status_code != 200: message: str = "Failed to fetch datasource from power-bi for" LOGGER.warning(message) - LOGGER.warning("{}={}".format(Constant.WorkspaceId, dataset.workspace_id)) - LOGGER.warning("{}={}".format(Constant.DatasetId, dataset.id)) + LOGGER.warning(f"{Constant.WorkspaceId}={dataset.workspace_id}") + LOGGER.warning(f"{Constant.DatasetId}={dataset.id}") raise ConnectionError(message) res = response.json() value = res["value"] if len(value) == 0: LOGGER.info( - "datasource is not found for dataset {}({})".format( - dataset.name, dataset.id - ) + f"datasource is not found for dataset {dataset.name}({dataset.id})" ) + return None # Consider only zero index datasource datasource_dict = value[0] @@ -646,11 +644,7 @@ def new_dataset_or_report(tile_instance: Any) -> dict: report_fields["createdFrom"] = PowerBiAPI.Tile.CreatedFrom.VISUALIZATION LOGGER.info( - "Tile {}({}) is created from {}".format( - tile_instance.get("title"), - tile_instance.get("id"), - report_fields["createdFrom"], - ) + f'Tile {tile_instance.get("title")}({tile_instance.get("id")}) is created from {report_fields["createdFrom"]}' ) return report_fields @@ -721,9 +715,7 @@ def create_scan_job(): ) if res.status_code not in (200, 202): - message = "API({}) return error code {} for workpace id({})".format( - scan_create_endpoint, res.status_code, workspace_id - ) + message = f"API({scan_create_endpoint}) return error code {res.status_code} for workspace id({workspace_id})" LOGGER.warning(message) @@ -740,46 +732,40 @@ def wait_for_scan_to_complete(scan_id: str, timeout: int) -> Boolean: minimum_sleep = 3 if timeout < minimum_sleep: LOGGER.info( - "Setting timeout to minimum_sleep time {} seconds".format( - minimum_sleep - ) + f"Setting timeout to minimum_sleep time {minimum_sleep} seconds" ) timeout = minimum_sleep - max_trial = int(timeout / minimum_sleep) - LOGGER.info("Max trial {}".format(max_trial)) + max_trial = timeout // minimum_sleep + LOGGER.info(f"Max trial {max_trial}") scan_get_endpoint = PowerBiAPI.API_ENDPOINTS[Constant.SCAN_GET] scan_get_endpoint = scan_get_endpoint.format( POWERBI_ADMIN_BASE_URL=self.__config.admin_base_url, SCAN_ID=scan_id ) - LOGGER.info("Hitting URL={}".format(scan_get_endpoint)) + LOGGER.info(f"Hitting URL={scan_get_endpoint}") trail = 1 while True: - LOGGER.info("Trial = {}".format(trail)) + LOGGER.info(f"Trial = {trail}") res = requests.get( scan_get_endpoint, headers={Constant.Authorization: self.get_access_token()}, ) if res.status_code != 200: - message = "API({}) return error code {} for scan id({})".format( - scan_get_endpoint, res.status_code, scan_id - ) + message = f"API({scan_get_endpoint}) return error code {res.status_code} for scan id({scan_id})" LOGGER.warning(message) raise ConnectionError(message) if res.json()["status"].upper() == "Succeeded".upper(): - LOGGER.info( - "Scan result is available for scan id({})".format(scan_id) - ) + 
LOGGER.info(f"Scan result is available for scan id({scan_id})") return True if trail == max_trial: break - LOGGER.info("Sleeping for {} seconds".format(minimum_sleep)) + LOGGER.info(f"Sleeping for {minimum_sleep} seconds") sleep(minimum_sleep) trail += 1 @@ -788,7 +774,7 @@ def wait_for_scan_to_complete(scan_id: str, timeout: int) -> Boolean: def get_scan_result(scan_id: str) -> dict: LOGGER.info("Fetching scan result") - LOGGER.info("{}={}".format(Constant.SCAN_ID, scan_id)) + LOGGER.info(f"{Constant.SCAN_ID}={scan_id}") scan_result_get_endpoint = PowerBiAPI.API_ENDPOINTS[ Constant.SCAN_RESULT_GET ] @@ -796,15 +782,13 @@ def get_scan_result(scan_id: str) -> dict: POWERBI_ADMIN_BASE_URL=self.__config.admin_base_url, SCAN_ID=scan_id ) - LOGGER.info("Hittin URL={}".format(scan_result_get_endpoint)) + LOGGER.info(f"Hitting URL={scan_result_get_endpoint}") res = requests.get( scan_result_get_endpoint, headers={Constant.Authorization: self.get_access_token()}, ) if res.status_code != 200: - message = "API({}) return error code {} for scan id({})".format( - scan_result_get_endpoint, res.status_code, scan_id - ) + message = f"API({scan_result_get_endpoint}) return error code {res.status_code} for scan id({scan_id})" LOGGER.warning(message) @@ -821,10 +805,9 @@ def json_to_dataset_map(scan_result: dict) -> dict: if datasets is None or len(datasets) == 0: LOGGER.warning( - "Workspace {}({}) does not have datasets".format( - scan_result["name"], scan_result["id"] - ) + f'Workspace {scan_result["name"]}({scan_result["id"]}) does not have datasets' ) + LOGGER.info("Returning empty datasets") return dataset_map @@ -844,18 +827,15 @@ def json_to_dataset_map(scan_result: dict) -> dict: and dataset_instance.datasource.metadata.is_relational is True ): LOGGER.info( - "Processing tables attribute for dataset {}({})".format( - dataset_instance.name, dataset_instance.id - ) + f"Processing tables attribute for dataset {dataset_instance.name}({dataset_instance.id})" ) for table in dataset_dict["tables"]: if "Value.NativeQuery(" in table["source"][0]["expression"]: LOGGER.warning( - "Table {} is created from Custom SQL. Ignoring in processing".format( - table["name"] - ) + f'Table {table["name"]} is created from Custom SQL. Ignoring in processing' ) + continue # PowerBi table name contains schema name and table name. 
Format is @@ -976,28 +956,24 @@ def __to_datahub_dataset( or dataset.datasource.metadata.is_relational is False ): LOGGER.warning( - "Dataset {}({}) is not created from relational datasource".format( - dataset.name, dataset.id - ) + f"Dataset {dataset.name}({dataset.id}) is not created from relational datasource" ) + return dataset_mcps LOGGER.info( - "Converting dataset={}(id={}) to datahub dataset".format( - dataset.name, dataset.id - ) + f"Converting dataset={dataset.name}(id={dataset.id}) to datahub dataset" ) for table in dataset.tables: # Create an URN for dataset ds_urn = builder.make_dataset_urn( platform=self.__config.dataset_type_mapping[dataset.datasource.type], - name="{}.{}.{}".format( - dataset.datasource.database, table.schema_name, table.name - ), + name=f"{dataset.datasource.database}.{table.schema_name}.{table.name}", env=self.__config.env, ) - LOGGER.info("{}={}".format(Constant.Dataset_URN, ds_urn)) + + LOGGER.info(f"{Constant.Dataset_URN}={ds_urn}") # Create datasetProperties mcp ds_properties = DatasetPropertiesClass(description=table.name) @@ -1206,9 +1182,7 @@ def to_datahub_user( """ LOGGER.info( - "Converting user {}(id={}) to datahub's user".format( - user.displayName, user.id - ) + f"Converting user {user.displayName}(id={user.id}) to datahub's user" ) # Create an URN for user @@ -1266,10 +1240,10 @@ def to_datahub_chart( chart_mcps = [] # Return empty list if input list is empty - if len(tiles) == 0: + if not tiles: return [], [] - LOGGER.info("Converting tiles(count={}) to charts".format(len(tiles))) + LOGGER.info(f"Converting tiles(count={len(tiles)}) to charts") for tile in tiles: if tile is None: @@ -1292,7 +1266,7 @@ def to_datahub_work_units( mcps = [] LOGGER.info( - "Converting dashboard={} to datahub dashboard".format(dashboard.displayName) + f"Converting dashboard={dashboard.displayName} to datahub dashboard" ) # Convert user to CorpUser @@ -1391,12 +1365,10 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]: self.reporter.report_dashboards_scanned() self.reporter.report_charts_scanned(count=len(dashboard.tiles)) except Exception as e: - message = "Error ({}) occurred while loading dashboard {}(id={}) tiles.".format( - e, dashboard.displayName, dashboard.id - ) + message = f"Error ({e}) occurred while loading dashboard {dashboard.displayName}(id={dashboard.id}) tiles." + LOGGER.exception(message, e) self.reporter.report_warning(dashboard.id, message) - # Convert PowerBi Dashboard and child entities to Datahub work unit to ingest into Datahub workunits = self.mapper.to_datahub_work_units(dashboard) for workunit in workunits: diff --git a/metadata-ingestion/src/datahub/ingestion/source/pulsar.py b/metadata-ingestion/src/datahub/ingestion/source/pulsar.py index e4d9a505ea7210..ffc0253a070b2d 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/pulsar.py +++ b/metadata-ingestion/src/datahub/ingestion/source/pulsar.py @@ -98,7 +98,7 @@ def __init__(self, config: PulsarSourceConfig, ctx: PipelineContext): self.platform: str = "pulsar" self.config: PulsarSourceConfig = config self.report: PulsarSourceReport = PulsarSourceReport() - self.base_url: str = self.config.web_service_url + "/admin/v2" + self.base_url: str = f"{self.config.web_service_url}/admin/v2" self.tenants: List[str] = config.tenants if ( @@ -120,7 +120,7 @@ def __init__(self, config: PulsarSourceConfig, ctx: PipelineContext): if self._is_oauth_authentication_configured(): # Get OpenId configuration from issuer, e.g. 
token_endpoint oid_config_url = ( - "%s/.well-known/openid-configuration" % self.config.issuer_url + f"{self.config.issuer_url}/.well-known/openid-configuration" ) oid_config_response = requests.get( oid_config_url, verify=False, allow_redirects=False @@ -130,8 +130,7 @@ def __init__(self, config: PulsarSourceConfig, ctx: PipelineContext): self.config.oid_config.update(oid_config_response.json()) else: logger.error( - "Unexpected response while getting discovery document using %s : %s" - % (oid_config_url, oid_config_response) + f"Unexpected response while getting discovery document using {oid_config_url} : {oid_config_response}" ) if "token_endpoint" not in self.config.oid_config: @@ -325,15 +324,14 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]: # Report the Pulsar broker version we are communicating with self.report.report_pulsar_version( self.session.get( - "%s/brokers/version" % self.base_url, - timeout=self.config.timeout, + f"{self.base_url}/brokers/version", timeout=self.config.timeout ).text ) # If no tenants are provided, request all tenants from cluster using /admin/v2/tenants endpoint. # Requesting cluster tenant information requires superuser privileges if not self.tenants: - self.tenants = self._get_pulsar_metadata(self.base_url + "/tenants") or [] + self.tenants = self._get_pulsar_metadata(f"{self.base_url}/tenants") or [] # Initialize counters self.report.tenants_scanned = 0 @@ -346,9 +344,10 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]: # Get namespaces belonging to a tenant, /admin/v2/%s/namespaces # A tenant admin role has sufficient privileges to perform this action namespaces = ( - self._get_pulsar_metadata(self.base_url + "/namespaces/%s" % tenant) + self._get_pulsar_metadata(f"{self.base_url}/namespaces/{tenant}") or [] ) + for namespace in namespaces: self.report.namespaces_scanned += 1 if self.config.namespace_patterns.allowed(namespace): @@ -406,14 +405,10 @@ def _add_topic_to_checkpoint(self, topic: str) -> None: ) def _is_token_authentication_configured(self) -> bool: - if self.config.token is not None: - return True - return False + return self.config.token is not None def _is_oauth_authentication_configured(self) -> bool: - if self.config.issuer_url is not None: - return True - return False + return self.config.issuer_url is not None def _get_schema_and_fields( self, pulsar_topic: PulsarTopic, is_key_schema: bool @@ -421,10 +416,9 @@ def _get_schema_and_fields( pulsar_schema: Optional[PulsarSchema] = None - schema_url = self.base_url + "/schemas/%s/%s/%s/schema" % ( - pulsar_topic.tenant, - pulsar_topic.namespace, - pulsar_topic.topic, + schema_url = ( + self.base_url + + f"/schemas/{pulsar_topic.tenant}/{pulsar_topic.namespace}/{pulsar_topic.topic}/schema" ) schema_payload = self._get_pulsar_metadata(schema_url) @@ -449,7 +443,7 @@ def _get_schema_fields( ) -> List[SchemaField]: # Parse the schema and convert it to SchemaFields. fields: List[SchemaField] = [] - if schema.schema_type == "AVRO" or schema.schema_type == "JSON": + if schema.schema_type in ["AVRO", "JSON"]: # Extract fields from schema and get the FQN for the schema fields = schema_util.avro_schema_to_mce_fields( schema.schema_str, is_key_schema=is_key_schema @@ -465,6 +459,7 @@ def _get_schema_metadata( self, pulsar_topic: PulsarTopic, platform_urn: str ) -> Tuple[Optional[PulsarSchema], Optional[SchemaMetadata]]: + # FIXME: Type annotations are not working for this function. 
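# Editor's sketch (illustrative only, not part of this patch): the pulsar
# hunks above collapse "if cond: return True / return False" into returning
# the comparison itself, and fold chained equality checks into a membership
# test. A standalone sketch of both; AuthConfig and is_structured_schema are
# hypothetical names.
from typing import Optional

class AuthConfig:
    def __init__(self, token: Optional[str] = None, issuer_url: Optional[str] = None):
        self.token = token
        self.issuer_url = issuer_url

    def is_token_auth(self) -> bool:
        # The comparison already yields a bool, so return it directly.
        return self.token is not None

    def is_oauth(self) -> bool:
        return self.issuer_url is not None

def is_structured_schema(schema_type: str) -> bool:
    # Equivalent to: schema_type == "AVRO" or schema_type == "JSON".
    return schema_type in ("AVRO", "JSON")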
schema, fields = self._get_schema_and_fields( pulsar_topic=pulsar_topic, is_key_schema=False ) # type: Tuple[Optional[PulsarSchema], List[SchemaField]] diff --git a/metadata-ingestion/src/datahub/ingestion/source/redash.py b/metadata-ingestion/src/datahub/ingestion/source/redash.py index 2abd61849ac260..aa1d093e02e85f 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/redash.py +++ b/metadata-ingestion/src/datahub/ingestion/source/redash.py @@ -203,29 +203,33 @@ def get_full_qualified_name(self, database_name: str, table_name: str) -> str: def get_full_qualified_name(platform: str, database_name: str, table_name: str) -> str: - if platform == "postgres": - full_qualified_name = PostgresQualifiedNameParser().get_full_qualified_name( + if platform == "athena": + return AthenaQualifiedNameParser().get_full_qualified_name( database_name, table_name ) - elif platform == "mysql": - full_qualified_name = MysqlQualifiedNameParser().get_full_qualified_name( + + elif platform == "bigquery": + return BigqueryQualifiedNameParser().get_full_qualified_name( database_name, table_name ) + elif platform == "mssql": - full_qualified_name = MssqlQualifiedNameParser().get_full_qualified_name( + return MssqlQualifiedNameParser().get_full_qualified_name( database_name, table_name ) - elif platform == "athena": - full_qualified_name = AthenaQualifiedNameParser().get_full_qualified_name( + + elif platform == "mysql": + return MysqlQualifiedNameParser().get_full_qualified_name( database_name, table_name ) - elif platform == "bigquery": - full_qualified_name = BigqueryQualifiedNameParser().get_full_qualified_name( + + elif platform == "postgres": + return PostgresQualifiedNameParser().get_full_qualified_name( database_name, table_name ) + else: - full_qualified_name = f"{database_name}.{table_name}" - return full_qualified_name + return f"{database_name}.{table_name}" class RedashConfig(ConfigModel): @@ -405,8 +409,7 @@ def _get_platform_based_on_datasource(self, data_source: Dict) -> str: map = REDASH_DATA_SOURCE_TO_DATAHUB_MAP.get( data_source_type, {"platform": DEFAULT_DATA_SOURCE_PLATFORM} ) - platform = map.get("platform", DEFAULT_DATA_SOURCE_PLATFORM) - return platform + return map.get("platform", DEFAULT_DATA_SOURCE_PLATFORM) return DEFAULT_DATA_SOURCE_PLATFORM def _get_database_name_based_on_datasource( @@ -597,7 +600,7 @@ def _process_dashboard_response( # Tested the same with a Redash instance dashboard_id = dashboard_response["id"] dashboard_data = self.client._get( - "api/dashboards/{}".format(dashboard_id) + f"api/dashboards/{dashboard_id}" ).json() except Exception: # This does not work in our testing but keeping for now because @@ -686,9 +689,7 @@ def _get_chart_snapshot(self, query_data: Dict, viz_data: Dict) -> ChartSnapshot chart_type = self._get_chart_type_from_viz_data(viz_data) query_id = query_data.get("id") chart_url = f"{self.config.connect_uri}/queries/{query_id}#{viz_id}" - description = ( - viz_data.get("description", "") if viz_data.get("description", "") else "" - ) + description = viz_data.get("description", "") or "" data_source_id = query_data.get("data_source_id") data_source = self._get_chart_data_source(data_source_id) data_source_type = data_source.get("type") diff --git a/metadata-ingestion/src/datahub/ingestion/source/tableau.py b/metadata-ingestion/src/datahub/ingestion/source/tableau.py index 5fed3898df322e..ab979512524ac2 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/tableau.py +++ b/metadata-ingestion/src/datahub/ingestion/source/tableau.py @@ 
-423,22 +423,20 @@ def _create_upstream_table_lineage( table_path = None if project and datasource.get("name"): - table_name = table.get("name") if table.get("name") else table["id"] + table_name = table.get("name") or table["id"] table_path = f"{project.replace('/', REPLACE_SLASH_CHAR)}/{datasource['name']}/{table_name}" self.upstream_tables[table_urn] = ( table.get("columns", []), table_path, - table.get("isEmbedded") if table.get("isEmbedded") else False, + table.get("isEmbedded") or False, ) return upstream_tables def emit_custom_sql_datasources(self) -> Iterable[MetadataWorkUnit]: - count_on_query = self.config.page_size - custom_sql_filter = "idWithin: {}".format( - json.dumps(self.custom_sql_ids_being_used) - ) + count_on_query = len(self.custom_sql_ids_being_used) + custom_sql_filter = f"idWithin: {json.dumps(self.custom_sql_ids_being_used)}" custom_sql_connection, total_count, has_next_page = self.get_connection_object( custom_sql_graphql_query, "customSQLTablesConnection", custom_sql_filter ) @@ -516,7 +514,7 @@ def emit_custom_sql_datasources(self) -> Iterable[MetadataWorkUnit]: dataset_snapshot.aspects.append(schema_metadata) # Browse path - csql_name = csql.get("name") if csql.get("name") else csql_id + csql_name = csql.get("name") or csql_id if project and datasource_name: browse_paths = BrowsePathsClass( @@ -630,7 +628,6 @@ def _get_schema_metadata_for_datasource( self, datasource_fields: List[dict] ) -> Optional[SchemaMetadata]: fields = [] - schema_metadata = None for field in datasource_fields: # check datasource - custom sql relations from a field being referenced self._track_custom_sql_ids(field) @@ -657,8 +654,8 @@ def _get_schema_metadata_for_datasource( ) fields.append(schema_field) - if fields: - schema_metadata = SchemaMetadata( + return ( + SchemaMetadata( schemaName="test", platform=f"urn:li:dataPlatform:{self.platform}", version=0, @@ -666,8 +663,9 @@ def _get_schema_metadata_for_datasource( hash="", platformSchema=OtherSchema(rawSchema=""), ) - - return schema_metadata + if fields + else None + ) def get_metadata_change_event( self, snap_shot: Union["DatasetSnapshot", "DashboardSnapshot", "ChartSnapshot"] @@ -722,9 +720,7 @@ def emit_datasource( aspects=[], ) - datasource_name = ( - datasource.get("name") if datasource.get("name") else datasource_id - ) + datasource_name = datasource.get("name") or datasource_id if is_embedded_ds and workbook and workbook.get("name"): datasource_name = f"{workbook['name']}/{datasource_name}" # Browse path @@ -804,10 +800,8 @@ def emit_datasource( ) def emit_published_datasources(self) -> Iterable[MetadataWorkUnit]: - count_on_query = self.config.page_size - datasource_filter = "idWithin: {}".format( - json.dumps(self.datasource_ids_being_used) - ) + count_on_query = len(self.datasource_ids_being_used) + datasource_filter = f"idWithin: {json.dumps(self.datasource_ids_being_used)}" ( published_datasource_conn, total_count, @@ -958,7 +952,7 @@ def emit_sheets_as_charts(self, workbook: Dict) -> Iterable[MetadataWorkUnit]: chart_snapshot.aspects.append(chart_info) if workbook.get("projectName") and workbook.get("name"): - sheet_name = sheet.get("name") if sheet.get("name") else sheet["id"] + sheet_name = sheet.get("name") or sheet["id"] # Browse path browse_path = BrowsePathsClass( paths=[ @@ -1075,7 +1069,7 @@ def emit_dashboards(self, workbook: Dict) -> Iterable[MetadataWorkUnit]: dashboard_snapshot.aspects.append(dashboard_info_class) if workbook.get("projectName") and workbook.get("name"): - dashboard_name = title if title else 
dashboard["id"] + dashboard_name = title or dashboard["id"] # browse path browse_paths = BrowsePathsClass( paths=[ @@ -1129,7 +1123,7 @@ def _get_schema(self, schema_provided: str, database: str, fullName: str) -> str def _extract_schema_from_fullName(self, fullName: str) -> str: # fullName is observed to be in format [schemaName].[tableName] # OR simply tableName OR [tableName] - if fullName.startswith("[") and fullName.find("].[") >= 0: + if fullName.startswith("[") and "].[" in fullName: return fullName[1 : fullName.index("]")] return "" diff --git a/metadata-ingestion/src/datahub/ingestion/source/tableau_common.py b/metadata-ingestion/src/datahub/ingestion/source/tableau_common.py index 941cd9a9ab8934..6296af05d14616 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/tableau_common.py +++ b/metadata-ingestion/src/datahub/ingestion/source/tableau_common.py @@ -433,8 +433,7 @@ def make_table_urn( # if there are more than 3 tokens, just take the final 3 fully_qualified_table_name = ".".join(fully_qualified_table_name.split(".")[-3:]) - urn = builder.make_dataset_urn(platform, fully_qualified_table_name, env) - return urn + return builder.make_dataset_urn(platform, fully_qualified_table_name, env) def make_description_from_params(description, formula): @@ -451,10 +450,9 @@ def make_description_from_params(description, formula): def get_field_value_in_sheet(field, field_name): if field.get("__typename", "") == "DatasourceField": - field = field.get("remoteField") if field.get("remoteField") else {} + field = field.get("remoteField") or {} - field_value = field.get(field_name, "") - return field_value + return field.get(field_name, "") def get_unique_custom_sql(custom_sql_list: List[dict]) -> List[dict]: @@ -506,6 +504,4 @@ def query_metadata(server, main_query, connection_name, first, offset, qry_filte filter=qry_filter, main_query=main_query, ) - query_result = server.metadata.query(query) - - return query_result + return server.metadata.query(query) diff --git a/metadata-ingestion/src/datahub/ingestion/source_config/pulsar.py b/metadata-ingestion/src/datahub/ingestion/source_config/pulsar.py index e21c6fc3ea42ba..e1bdf072787cd5 100644 --- a/metadata-ingestion/src/datahub/ingestion/source_config/pulsar.py +++ b/metadata-ingestion/src/datahub/ingestion/source_config/pulsar.py @@ -82,7 +82,7 @@ class PulsarSourceConfig(StatefulIngestionConfigBase, DatasetSourceConfigBase): ) exclude_individual_partitions: bool = Field( default=True, - description="Extract each individual partitioned topic. e.g. when turned off a topic with 100 partitions will result in 100 Datesets.", + description="Extract each individual partitioned topic. e.g. 
when turned off a topic with 100 partitions will result in 100 Datasets.", ) tenants: List[str] = Field( diff --git a/metadata-ingestion/src/datahub/ingestion/source_config/sql/snowflake.py b/metadata-ingestion/src/datahub/ingestion/source_config/sql/snowflake.py index b00ac9cdfb41b8..984b8fa917d647 100644 --- a/metadata-ingestion/src/datahub/ingestion/source_config/sql/snowflake.py +++ b/metadata-ingestion/src/datahub/ingestion/source_config/sql/snowflake.py @@ -90,7 +90,7 @@ class SnowflakeProvisionRoleConfig(ConfigModel): @pydantic.validator("admin_username", always=True) def username_not_empty(cls, v, values, **kwargs): v_str: str = str(v) - if v_str.strip() == "": + if not v_str.strip(): raise ValueError("username is empty") return v @@ -180,60 +180,55 @@ def authenticator_type_is_valid(cls, v, values, field): f"unsupported authenticator type '{v}' was provided," f" use one of {list(VALID_AUTH_TYPES.keys())}" ) - else: - if v == "KEY_PAIR_AUTHENTICATOR": - # If we are using key pair auth, we need the private key path and password to be set - if values.get("private_key_path") is None: - raise ValueError( - f"'private_key_path' was none " - f"but should be set when using {v} authentication" - ) - elif v == "OAUTH_AUTHENTICATOR": - if values.get("oauth_config") is None: - raise ValueError( - f"'oauth_config' is none but should be set when using {v} authentication" - ) - if values.get("oauth_config").provider is None: - raise ValueError( - f"'oauth_config.provider' is none " - f"but should be set when using {v} authentication" - ) - if values.get("oauth_config").client_id is None: - raise ValueError( - f"'oauth_config.client_id' is none " - f"but should be set when using {v} authentication" - ) - if values.get("oauth_config").scopes is None: + if v == "KEY_PAIR_AUTHENTICATOR": + # If we are using key pair auth, we need the private key path and password to be set + if values.get("private_key_path") is None: + raise ValueError( + f"'private_key_path' was none " + f"but should be set when using {v} authentication" + ) + elif v == "OAUTH_AUTHENTICATOR": + if values.get("oauth_config") is None: + raise ValueError( + f"'oauth_config' is none but should be set when using {v} authentication" + ) + if values.get("oauth_config").provider is None: + raise ValueError( + f"'oauth_config.provider' is none " + f"but should be set when using {v} authentication" + ) + if values.get("oauth_config").client_id is None: + raise ValueError( + f"'oauth_config.client_id' is none " + f"but should be set when using {v} authentication" + ) + if values.get("oauth_config").scopes is None: + raise ValueError( + f"'oauth_config.scopes' was none " + f"but should be set when using {v} authentication" + ) + if values.get("oauth_config").authority_url is None: + raise ValueError( + f"'oauth_config.authority_url' was none " + f"but should be set when using {v} authentication" + ) + if values.get("oauth_config").use_certificate is True: + if values.get("oauth_config").base64_encoded_oauth_private_key is None: raise ValueError( - f"'oauth_config.scopes' was none " - f"but should be set when using {v} authentication" + "'base64_encoded_oauth_private_key' was none " + "but should be set when using certificate for oauth_config" ) - if values.get("oauth_config").authority_url is None: + if values.get("oauth").base64_encoded_oauth_public_key is None: raise ValueError( - f"'oauth_config.authority_url' was none " - f"but should be set when using {v} authentication" + "'base64_encoded_oauth_public_key' was none" + "but should be set 
when using use_certificate true for oauth_config" ) - if values.get("oauth_config").use_certificate is True: - if ( - values.get("oauth_config").base64_encoded_oauth_private_key - is None - ): - raise ValueError( - "'base64_encoded_oauth_private_key' was none " - "but should be set when using certificate for oauth_config" - ) - if values.get("oauth").base64_encoded_oauth_public_key is None: - raise ValueError( - "'base64_encoded_oauth_public_key' was none" - "but should be set when using use_certificate true for oauth_config" - ) - else: - if values.get("oauth_config").client_secret is None: - raise ValueError( - "'oauth_config.client_secret' was none " - "but should be set when using use_certificate false for oauth_config" - ) - logger.info(f"using authenticator type '{v}'") + elif values.get("oauth_config").client_secret is None: + raise ValueError( + "'oauth_config.client_secret' was none " + "but should be set when using use_certificate false for oauth_config" + ) + logger.info(f"using authenticator type '{v}'") return v @pydantic.validator("include_view_lineage") diff --git a/metadata-ingestion/src/datahub/ingestion/source_config/usage/bigquery_usage.py b/metadata-ingestion/src/datahub/ingestion/source_config/usage/bigquery_usage.py index 05dc636d312c2e..9abee691ca9bf8 100644 --- a/metadata-ingestion/src/datahub/ingestion/source_config/usage/bigquery_usage.py +++ b/metadata-ingestion/src/datahub/ingestion/source_config/usage/bigquery_usage.py @@ -114,7 +114,7 @@ class BigQueryUsageConfig(BigQueryBaseConfig, DatasetSourceConfigBase, BaseUsage credential: Optional[BigQueryCredential] = pydantic.Field( default=None, - description="Bigquery credential. Required if GOOGLE_APPLICATION_CREDENTIALS enviroment variable is not set. See this example recipe for details", + description="Bigquery credential. Required if GOOGLE_APPLICATION_CREDENTIALS environment variable is not set. 
See this example recipe for details", ) _credentials_path: Optional[str] = pydantic.PrivateAttr(None) temp_table_dataset_prefix: str = pydantic.Field( diff --git a/metadata-ingestion/src/datahub/ingestion/transformer/base_transformer.py b/metadata-ingestion/src/datahub/ingestion/transformer/base_transformer.py index ecc1dcfc5fd31f..82cfecbddfed39 100644 --- a/metadata-ingestion/src/datahub/ingestion/transformer/base_transformer.py +++ b/metadata-ingestion/src/datahub/ingestion/transformer/base_transformer.py @@ -132,8 +132,8 @@ def _should_process( return True # fall through, no entity type matched return False - elif isinstance(record, MetadataChangeProposalWrapper) or isinstance( - record, MetadataChangeProposalClass + elif isinstance( + record, (MetadataChangeProposalWrapper, MetadataChangeProposalClass) ): return record.entityType in entity_types diff --git a/metadata-ingestion/src/datahub/ingestion/transformer/mark_dataset_status.py b/metadata-ingestion/src/datahub/ingestion/transformer/mark_dataset_status.py index bae8d0e07a80ab..d833e9bcc75a64 100644 --- a/metadata-ingestion/src/datahub/ingestion/transformer/mark_dataset_status.py +++ b/metadata-ingestion/src/datahub/ingestion/transformer/mark_dataset_status.py @@ -40,6 +40,6 @@ def transform_aspect( self, entity_urn: str, aspect_name: str, aspect: Optional[builder.Aspect] ) -> Optional[builder.Aspect]: assert aspect is None or isinstance(aspect, StatusClass) - status_aspect: StatusClass = aspect if aspect else StatusClass(removed=None) + status_aspect: StatusClass = aspect or StatusClass(removed=None) status_aspect.removed = self.config.removed return status_aspect # type: ignore diff --git a/metadata-ingestion/src/datahub/integrations/great_expectations/action.py b/metadata-ingestion/src/datahub/integrations/great_expectations/action.py index 98b344c0a06cc7..5dca3541493156 100644 --- a/metadata-ingestion/src/datahub/integrations/great_expectations/action.py +++ b/metadata-ingestion/src/datahub/integrations/great_expectations/action.py @@ -190,12 +190,11 @@ def _run( result = "DataHub notification succeeded" except Exception as e: result = "DataHub notification failed" - if self.graceful_exceptions: - logger.error(e) - logger.info("Supressing error because graceful_exceptions is set") - else: + if not self.graceful_exceptions: raise + logger.error(e) + logger.info("Suppressing error because graceful_exceptions is set") return {"datahub_notification_result": result} def get_assertions_with_results( @@ -224,7 +223,7 @@ def get_assertions_with_results( for result in validation_result_suite.results: expectation_config = result["expectation_config"] expectation_type = expectation_config["expectation_type"] - success = True if result["success"] else False + success = bool(result["success"]) kwargs = { k: v for k, v in expectation_config["kwargs"].items() if k != "batch_id" } @@ -271,8 +270,6 @@ def get_assertions_with_results( # TODO: Understand why their run time is incorrect. 
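# Editor's sketch (illustrative only, not part of this patch): the
# great_expectations hunk above inverts the graceful_exceptions branch so the
# re-raise happens first and the logging path is left unindented. A minimal
# standalone version; run_notification and notify are hypothetical names.
import logging

logger = logging.getLogger(__name__)

def run_notification(notify, graceful_exceptions: bool = True) -> str:
    try:
        notify()
        return "DataHub notification succeeded"
    except Exception as e:
        if not graceful_exceptions:
            raise
        logger.error(e)
        logger.info("Suppressing error because graceful_exceptions is set")
        return "DataHub notification failed"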
run_time = run_id.run_time.astimezone(timezone.utc) - assertionResults = [] - evaluation_parameters = ( { k: convert_to_string(v) @@ -328,8 +325,7 @@ def get_assertions_with_results( ) if ds.get("partitionSpec") is not None: assertionResult.partitionSpec = ds.get("partitionSpec") - assertionResults.append(assertionResult) - + assertionResults = [assertionResult] assertions_with_results.append( { "assertionUrn": assertionUrn, @@ -631,8 +627,9 @@ def get_dataset_partitions(self, batch_identifier, data_asset): ].batch_request.runtime_parameters["query"] partitionSpec = PartitionSpecClass( type=PartitionTypeClass.QUERY, - partition="Query_" + builder.datahub_guid(query), + partition=f"Query_{builder.datahub_guid(query)}", ) + batchSpec = BatchSpec( nativeBatchId=batch_identifier, query=query, @@ -678,9 +675,9 @@ def get_dataset_partitions(self, batch_identifier, data_asset): return dataset_partitions def get_platform_instance(self, datasource_name): - if self.platform_instance_map and datasource_name in self.platform_instance_map: - return self.platform_instance_map[datasource_name] if self.platform_instance_map: + if datasource_name in self.platform_instance_map: + return self.platform_instance_map[datasource_name] warn( f"Datasource {datasource_name} is not present in platform_instance_map" ) @@ -698,21 +695,21 @@ def make_dataset_urn_from_sqlalchemy_uri( schema_name, table_name = table_name.split(".")[-2:] if data_platform in ["redshift", "postgres"]: - schema_name = schema_name if schema_name else "public" + schema_name = schema_name or "public" if url_instance.database is None: warn( f"DataHubValidationAction failed to locate database name for {data_platform}." ) return None - schema_name = "{}.{}".format(url_instance.database, schema_name) + schema_name = f"{url_instance.database}.{schema_name}" elif data_platform == "mssql": - schema_name = schema_name if schema_name else "dbo" + schema_name = schema_name or "dbo" if url_instance.database is None: warn( f"DataHubValidationAction failed to locate database name for {data_platform}." ) return None - schema_name = "{}.{}".format(url_instance.database, schema_name) + schema_name = f"{url_instance.database}.{schema_name}" elif data_platform in ["trino", "snowflake"]: if schema_name is None or url_instance.database is None: warn( @@ -738,16 +735,16 @@ def make_dataset_urn_from_sqlalchemy_uri( ) ) return None - schema_name = "{}.{}".format(url_instance.host, url_instance.database) + schema_name = f"{url_instance.host}.{url_instance.database}" - schema_name = schema_name if schema_name else url_instance.database + schema_name = schema_name or url_instance.database if schema_name is None: warn( f"DataHubValidationAction failed to locate schema name for {data_platform}." 
) return None - dataset_name = "{}.{}".format(schema_name, table_name) + dataset_name = f"{schema_name}.{table_name}" dataset_urn = builder.make_dataset_urn_with_platform_instance( platform=data_platform, diff --git a/metadata-ingestion/src/datahub/telemetry/stats.py b/metadata-ingestion/src/datahub/telemetry/stats.py index ea48aab14c77db..e76580d677588c 100644 --- a/metadata-ingestion/src/datahub/telemetry/stats.py +++ b/metadata-ingestion/src/datahub/telemetry/stats.py @@ -27,9 +27,7 @@ def calculate_percentiles( min(i, size - 1) for i in percentile_indices ] # in case of rounding errors - values = {p: data_sorted[i] for p, i in zip(percentiles, percentile_indices)} - - return values + return {p: data_sorted[i] for p, i in zip(percentiles, percentile_indices)} def discretize(statistic: Union[float, int]) -> int: diff --git a/metadata-ingestion/src/datahub/telemetry/telemetry.py b/metadata-ingestion/src/datahub/telemetry/telemetry.py index b95df169414320..0a346d09373850 100644 --- a/metadata-ingestion/src/datahub/telemetry/telemetry.py +++ b/metadata-ingestion/src/datahub/telemetry/telemetry.py @@ -273,7 +273,7 @@ def get_full_class_name(obj): module = obj.__class__.__module__ if module is None or module == str.__class__.__module__: return obj.__class__.__name__ - return module + "." + obj.__class__.__name__ + return f"{module}.{obj.__class__.__name__}" def with_telemetry(func: Callable[..., T]) -> Callable[..., T]: diff --git a/metadata-ingestion/src/datahub/upgrade/upgrade.py b/metadata-ingestion/src/datahub/upgrade/upgrade.py index 32b56c1cb20218..b6ca0486c3a2cf 100644 --- a/metadata-ingestion/src/datahub/upgrade/upgrade.py +++ b/metadata-ingestion/src/datahub/upgrade/upgrade.py @@ -1,3 +1,4 @@ +import contextlib import logging from datetime import datetime, timedelta, timezone from functools import wraps @@ -221,7 +222,7 @@ def maybe_print_upgrade_message( # noqa: C901 encourage_cli_upgrade = False client_server_compat = 0 encourage_quickstart_upgrade = False - try: + with contextlib.suppress(Exception): version_stats = retrieve_versions(server) if not version_stats: return @@ -261,12 +262,9 @@ def maybe_print_upgrade_message( # noqa: C901 ): encourage_quickstart_upgrade = True - except Exception: - pass - # Compute recommendations and print one if client_server_compat < 0: - try: + with contextlib.suppress(Exception): assert version_stats print( colored("❗Client-Server Incompatible❗", "yellow"), @@ -279,10 +277,8 @@ def maybe_print_upgrade_message( # noqa: C901 "cyan", ), ) - except Exception: - pass elif client_server_compat > 0: - try: + with contextlib.suppress(Exception): assert version_stats print( colored("❗Client-Server Incompatible❗", "red"), @@ -295,12 +291,8 @@ def maybe_print_upgrade_message( # noqa: C901 "cyan", ), ) - except Exception: - pass - - # we only encourage upgrades if we think client_server is currently compatible elif client_server_compat == 0 and encourage_cli_upgrade: - try: + with contextlib.suppress(Exception): print( colored("💡 Upgrade cli!", "yellow"), colored( @@ -308,9 +300,6 @@ def maybe_print_upgrade_message( # noqa: C901 "cyan", ), ) - except Exception: - pass - elif encourage_quickstart_upgrade: try: assert version_stats diff --git a/metadata-ingestion/src/datahub/utilities/hive_schema_to_avro.py b/metadata-ingestion/src/datahub/utilities/hive_schema_to_avro.py index fc9680ba642d42..6e8d8da5f3fb82 100644 --- a/metadata-ingestion/src/datahub/utilities/hive_schema_to_avro.py +++ b/metadata-ingestion/src/datahub/utilities/hive_schema_to_avro.py @@ 
-53,9 +53,12 @@ def _parse_datatype_string( parts = HiveColumnToAvroConverter._ignore_brackets_split(s[4:-1], ",") if len(parts) != 2: raise ValueError( - "The map type string format is: 'map', " - + "but got: %s" % s + ( + "The map type string format is: 'map', " + + f"but got: {s}" + ) ) + kt = HiveColumnToAvroConverter._parse_datatype_string(parts[0]) vt = HiveColumnToAvroConverter._parse_datatype_string(parts[1]) # keys are assumed to be strings in avro map @@ -103,9 +106,12 @@ def _parse_struct_fields_string(s: str, **kwargs: Any) -> Dict[str, object]: name_and_type = HiveColumnToAvroConverter._ignore_brackets_split(part, ":") if len(name_and_type) != 2: raise ValueError( - "The struct field string format is: 'field_name:field_type', " - + "but got: %s" % part + ( + "The struct field string format is: 'field_name:field_type', " + + f"but got: {part}" + ) ) + field_name = name_and_type[0].strip() if field_name.startswith("`"): if field_name[-1] != "`": @@ -117,16 +123,15 @@ def _parse_struct_fields_string(s: str, **kwargs: Any) -> Dict[str, object]: fields.append({"name": field_name, "type": field_type}) if kwargs.get("ustruct_seqn") is not None: - struct_name = "__structn_{}_{}".format( - kwargs["ustruct_seqn"], str(uuid.uuid4()).replace("-", "") - ) + struct_name = f'__structn_{kwargs["ustruct_seqn"]}_{str(uuid.uuid4()).replace("-", "")}' + else: - struct_name = "__struct_{}".format(str(uuid.uuid4()).replace("-", "")) + struct_name = f'__struct_{str(uuid.uuid4()).replace("-", "")}' return { "type": "record", "name": struct_name, "fields": fields, - "native_data_type": "struct<{}>".format(s), + "native_data_type": f"struct<{s}>", } @staticmethod @@ -193,7 +198,7 @@ def _ignore_brackets_split(s: str, separator: str) -> List[str]: buf += c elif c in HiveColumnToAvroConverter._BRACKETS.values(): if level == 0: - raise ValueError("Brackets are not correctly paired: %s" % s) + raise ValueError(f"Brackets are not correctly paired: {s}") level -= 1 buf += c elif c == separator and level > 0: @@ -205,7 +210,7 @@ def _ignore_brackets_split(s: str, separator: str) -> List[str]: buf += c if len(buf) == 0: - raise ValueError("The %s cannot be the last char: %s" % (separator, s)) + raise ValueError(f"The {separator} cannot be the last char: {s}") parts.append(buf) return parts diff --git a/metadata-ingestion/src/datahub/utilities/memory_leak_detector.py b/metadata-ingestion/src/datahub/utilities/memory_leak_detector.py index b5fa3c3a723ea4..ef0db205b72ac9 100644 --- a/metadata-ingestion/src/datahub/utilities/memory_leak_detector.py +++ b/metadata-ingestion/src/datahub/utilities/memory_leak_detector.py @@ -12,7 +12,7 @@ def _trace_has_file(trace: tracemalloc.Traceback, file_pattern: str) -> bool: - for frame_index in range(0, len(trace)): + for frame_index in range(len(trace)): cur_frame = trace[frame_index] if fnmatch.fnmatch(cur_frame.filename, file_pattern): return True @@ -99,8 +99,7 @@ def wrapper(*args: Any, **kwargs: Any) -> Any: _init_leak_detection() try: - res = func(*args, **kwargs) - return res + return func(*args, **kwargs) finally: if detect_leaks: _perform_leak_detection() diff --git a/metadata-ingestion/src/datahub/utilities/server_config_util.py b/metadata-ingestion/src/datahub/utilities/server_config_util.py index c919a1356f2642..40841321ad2778 100644 --- a/metadata-ingestion/src/datahub/utilities/server_config_util.py +++ b/metadata-ingestion/src/datahub/utilities/server_config_util.py @@ -3,7 +3,7 @@ from datahub.telemetry.telemetry import set_telemetry_enable # Only to be 
written to for logging server related information -global_debug: Dict[str, Any] = dict() +global_debug: Dict[str, Any] = {} def set_gms_config(config: Dict) -> Any: diff --git a/metadata-ingestion/src/datahub/utilities/sql_lineage_parser_impl.py b/metadata-ingestion/src/datahub/utilities/sql_lineage_parser_impl.py index 80ea7cc31455cd..6fe57b297d4528 100644 --- a/metadata-ingestion/src/datahub/utilities/sql_lineage_parser_impl.py +++ b/metadata-ingestion/src/datahub/utilities/sql_lineage_parser_impl.py @@ -1,3 +1,4 @@ +import contextlib import logging import re import unittest @@ -7,15 +8,12 @@ from sqllineage.core.holders import Column, SQLLineageHolder from sqllineage.exceptions import SQLLineageException -try: +with contextlib.suppress(ImportError): import sqlparse from networkx import DiGraph from sqllineage.core import LineageAnalyzer import datahub.utilities.sqllineage_patch -except ImportError: - pass - logger = logging.getLogger(__name__) @@ -97,7 +95,7 @@ def __init__(self, sql_query: str) -> None: logger.error(f"SQL lineage analyzer error '{e}' for query: '{self._sql}") def get_tables(self) -> List[str]: - result: List[str] = list() + result: List[str] = [] if self._sql_holder is None: logger.error("sql holder not present so cannot get tables") return result @@ -135,12 +133,10 @@ def get_columns(self) -> List[str]: result.add(str(column.raw_name)) # Reverting back all the previously renamed words which confuses the parser - result = set(["date" if c == self._DATE_SWAP_TOKEN else c for c in result]) - result = set( - [ - "timestamp" if c == self._TIMESTAMP_SWAP_TOKEN else c - for c in list(result) - ] - ) + result = {"date" if c == self._DATE_SWAP_TOKEN else c for c in result} + result = { + "timestamp" if c == self._TIMESTAMP_SWAP_TOKEN else c for c in list(result) + } + # swap back renamed date column return list(result) diff --git a/metadata-ingestion/src/datahub/utilities/sql_parser.py b/metadata-ingestion/src/datahub/utilities/sql_parser.py index eb0bc0ec8262f4..28b5082ccbb3b2 100644 --- a/metadata-ingestion/src/datahub/utilities/sql_parser.py +++ b/metadata-ingestion/src/datahub/utilities/sql_parser.py @@ -1,3 +1,4 @@ +import contextlib import logging import multiprocessing import re @@ -9,11 +10,8 @@ from datahub.utilities.sql_lineage_parser_impl import SqlLineageSQLParserImpl -try: +with contextlib.suppress(ImportError): from sql_metadata import Parser as MetadataSQLParser -except ImportError: - pass - logger = logging.getLogger(__name__) diff --git a/metadata-ingestion/src/datahub/utilities/sqlalchemy_query_combiner.py b/metadata-ingestion/src/datahub/utilities/sqlalchemy_query_combiner.py index 0474f4ec7d3d68..947f5e30d62c89 100644 --- a/metadata-ingestion/src/datahub/utilities/sqlalchemy_query_combiner.py +++ b/metadata-ingestion/src/datahub/utilities/sqlalchemy_query_combiner.py @@ -108,8 +108,7 @@ def get_query_columns(query: Any) -> List[Any]: try: # inner_columns will be more accurate if the column names are unnamed, # since .columns will remove the "duplicates". 
-        inner_columns = list(query.inner_columns)
-        return inner_columns
+        return list(query.inner_columns)
     except AttributeError:
         return list(query.columns)

diff --git a/metadata-ingestion/src/datahub/utilities/urns/urn.py b/metadata-ingestion/src/datahub/utilities/urns/urn.py
index 7498cc1532c66e..479e74331fd9b3 100644
--- a/metadata-ingestion/src/datahub/utilities/urns/urn.py
+++ b/metadata-ingestion/src/datahub/utilities/urns/urn.py
@@ -21,7 +21,7 @@ class Urn:
     def __init__(
         self, entity_type: str, entity_id: List[str], urn_domain: str = LI_DOMAIN
     ):
-        if len(entity_id) == 0:
+        if not entity_id:
             raise InvalidUrnError("Empty entity id.")
         self._validate_entity_type(entity_type)
         self._validate_entity_id(entity_id)
@@ -122,9 +122,9 @@ def _get_entity_id_from_str(entity_id: str) -> List[str]:
                 part_start = i + 1

         if start_paren_count != 0:
-            raise InvalidUrnError(f"{entity_id}, mismtached paren nesting")
+            raise InvalidUrnError(f"{entity_id}, mismatched paren nesting")

-        parts.append(entity_id[part_start : len(entity_id) - 1])
+        parts.append(entity_id[part_start:-1])
         return parts

@@ -151,11 +151,13 @@ def __hash__(self) -> int:
         return hash((self._domain, self._entity_type) + tuple(self._entity_id))

     def __eq__(self, other: object) -> bool:
-        if not isinstance(other, Urn):
-            return False
-        return (
-            self._entity_id == other._entity_id
-            and self._domain == other._domain
-            and self._entity_type == other._entity_type
+        return (
+            (
+                self._entity_id == other._entity_id
+                and self._domain == other._domain
+                and self._entity_type == other._entity_type
+            )
+            if isinstance(other, Urn)
+            else False
         )
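For reviewers who want to sanity-check the `__eq__` rewrite above: the conditional-expression form is behaviorally equivalent to the early-return original, since the `isinstance` guard still runs first and non-`Urn` operands still compare unequal. The sketch below is a minimal, self-contained illustration only; it uses a hypothetical toy `Point` class rather than DataHub's `Urn` (whose constructor needs more setup), and `eq_early_return` is an invented helper added purely so both spellings can be compared side by side.

```python
class Point:
    """Toy stand-in (not from the patch) used only to compare the two __eq__ styles."""

    def __init__(self, x: int, y: int) -> None:
        self.x = x
        self.y = y

    # Early-return style, as in the removed lines of the hunk above.
    def eq_early_return(self, other: object) -> bool:
        if not isinstance(other, Point):
            return False
        return self.x == other.x and self.y == other.y

    # Conditional-expression style, as in the added lines of the hunk above.
    def __eq__(self, other: object) -> bool:
        return (
            (self.x == other.x and self.y == other.y)
            if isinstance(other, Point)
            else False
        )


if __name__ == "__main__":
    # Both styles agree for equal, unequal, and non-Point operands.
    assert Point(1, 2) == Point(1, 2)
    assert Point(1, 2) != Point(2, 1)
    assert Point(1, 2) != "not a point"
    assert Point(1, 2).eq_early_return(Point(1, 2)) is True
    assert Point(1, 2).eq_early_return("not a point") is False
```

The two spellings trade an early `return False` for a single expression; the refactor keeps the type guard, so hash/equality semantics of the class are unchanged.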