diff --git a/metadata-ingestion-modules/airflow-plugin/setup.py b/metadata-ingestion-modules/airflow-plugin/setup.py
index 8ca4cad470f67e..6d26cf4498bdf9 100644
--- a/metadata-ingestion-modules/airflow-plugin/setup.py
+++ b/metadata-ingestion-modules/airflow-plugin/setup.py
@@ -1,4 +1,5 @@
import os
+import pathlib
import sys
from typing import Dict, Set
@@ -14,14 +15,11 @@
def get_long_description():
root = os.path.dirname(__file__)
- with open(os.path.join(root, "README.md")) as f:
- description = f.read()
-
- return description
+ return pathlib.Path(os.path.join(root, "README.md")).read_text()
base_requirements = {
- # Compatability.
+ # Compatibility.
"dataclasses>=0.6; python_version < '3.7'",
"typing_extensions>=3.10.0.2",
"mypy_extensions>=0.4.3",
diff --git a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_plugin.py b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_plugin.py
index c893ff61cb9ec3..f7b384c1d32390 100644
--- a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_plugin.py
+++ b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_plugin.py
@@ -1,3 +1,4 @@
+import contextlib
import traceback
from typing import Any, Iterable
@@ -215,7 +216,7 @@ def datahub_on_success_callback(context, *args, **kwargs):
for inlet in inlets:
datajob.inlets.append(inlet.urn)
- # We have to use _oulets because outlets is empty
+ # We have to use _outlets because outlets is empty
for outlet in task._outlets:
datajob.outlets.append(outlet.urn)
@@ -389,13 +390,10 @@ def _patch_policy(settings):
def _patch_datahub_policy():
- try:
+ with contextlib.suppress(ImportError):
import airflow_local_settings
_patch_policy(airflow_local_settings)
- except ImportError:
- pass
-
from airflow.models.dagbag import settings
_patch_policy(settings)
diff --git a/metadata-ingestion-modules/airflow-plugin/tests/integration/integration_test_dummy.py b/metadata-ingestion-modules/airflow-plugin/tests/integration/integration_test_dummy.py
index f4f53619168f89..10cf3ad0a608ae 100644
--- a/metadata-ingestion-modules/airflow-plugin/tests/integration/integration_test_dummy.py
+++ b/metadata-ingestion-modules/airflow-plugin/tests/integration/integration_test_dummy.py
@@ -1,2 +1,2 @@
def test_dummy():
- assert True
+ pass
diff --git a/metadata-ingestion-modules/airflow-plugin/tests/unit/test_dummy.py b/metadata-ingestion-modules/airflow-plugin/tests/unit/test_dummy.py
index f4f53619168f89..10cf3ad0a608ae 100644
--- a/metadata-ingestion-modules/airflow-plugin/tests/unit/test_dummy.py
+++ b/metadata-ingestion-modules/airflow-plugin/tests/unit/test_dummy.py
@@ -1,2 +1,2 @@
def test_dummy():
- assert True
+ pass
diff --git a/metadata-ingestion/examples/library/data_quality_mcpw_rest.py b/metadata-ingestion/examples/library/data_quality_mcpw_rest.py
index 7672d634f58468..077ca550e880eb 100644
--- a/metadata-ingestion/examples/library/data_quality_mcpw_rest.py
+++ b/metadata-ingestion/examples/library/data_quality_mcpw_rest.py
@@ -47,7 +47,7 @@ def emitAssertionResult(assertionResult: AssertionRunEvent) -> None:
aspect=assertionResult,
)
- # Emit BatchAssertion Result! (timseries aspect)
+ # Emit BatchAssertion Result! (timeseries aspect)
emitter.emit_mcp(dataset_assertionRunEvent_mcp)
diff --git a/metadata-ingestion/examples/library/dataset_add_column_tag.py b/metadata-ingestion/examples/library/dataset_add_column_tag.py
index f5243ce28a5f01..8a15d33ff78779 100644
--- a/metadata-ingestion/examples/library/dataset_add_column_tag.py
+++ b/metadata-ingestion/examples/library/dataset_add_column_tag.py
@@ -23,18 +23,15 @@
def get_simple_field_path_from_v2_field_path(field_path: str) -> str:
"""A helper function to extract simple . path notation from the v2 field path"""
- if field_path.startswith("[version=2.0]"):
- # this is a v2 field path
- tokens = [
- t
- for t in field_path.split(".")
- if not (t.startswith("[") or t.endswith("]"))
- ]
- path = ".".join(tokens)
- return path
- else:
+ if not field_path.startswith("[version=2.0]"):
# not a v2, we assume this is a simple path
return field_path
+ # this is a v2 field path
+ tokens = [
+ t for t in field_path.split(".") if not (t.startswith("[") or t.endswith("]"))
+ ]
+
+ return ".".join(tokens)
# Inputs -> the column, dataset and the tag to set
diff --git a/metadata-ingestion/examples/library/dataset_add_column_term.py b/metadata-ingestion/examples/library/dataset_add_column_term.py
index ff1cad48a9f0c0..d656b5bd4502e7 100644
--- a/metadata-ingestion/examples/library/dataset_add_column_term.py
+++ b/metadata-ingestion/examples/library/dataset_add_column_term.py
@@ -23,18 +23,15 @@
def get_simple_field_path_from_v2_field_path(field_path: str) -> str:
"""A helper function to extract simple . path notation from the v2 field path"""
- if field_path.startswith("[version=2.0]"):
- # this is a v2 field path
- tokens = [
- t
- for t in field_path.split(".")
- if not (t.startswith("[") or t.endswith("]"))
- ]
- path = ".".join(tokens)
- return path
- else:
+ if not field_path.startswith("[version=2.0]"):
# not a v2, we assume this is a simple path
return field_path
+ # this is a v2 field path
+ tokens = [
+ t for t in field_path.split(".") if not (t.startswith("[") or t.endswith("]"))
+ ]
+
+ return ".".join(tokens)
# Inputs -> the column, dataset and the term to set
diff --git a/metadata-ingestion/examples/library/lineage_emitter_mcpw_rest.py b/metadata-ingestion/examples/library/lineage_emitter_mcpw_rest.py
index 11f73d36cb29d9..d1c934cba40409 100644
--- a/metadata-ingestion/examples/library/lineage_emitter_mcpw_rest.py
+++ b/metadata-ingestion/examples/library/lineage_emitter_mcpw_rest.py
@@ -10,13 +10,11 @@
)
from datahub.metadata.schema_classes import ChangeTypeClass
-# Construct upstream tables.
-upstream_tables: List[UpstreamClass] = []
upstream_table_1 = UpstreamClass(
dataset=builder.make_dataset_urn("bigquery", "upstream_table_1", "PROD"),
type=DatasetLineageTypeClass.TRANSFORMED,
)
-upstream_tables.append(upstream_table_1)
+upstream_tables: List[UpstreamClass] = [upstream_table_1]
upstream_table_2 = UpstreamClass(
dataset=builder.make_dataset_urn("bigquery", "upstream_table_2", "PROD"),
type=DatasetLineageTypeClass.TRANSFORMED,
diff --git a/metadata-ingestion/examples/library/lineage_job_dataflow_new_api_simple.py b/metadata-ingestion/examples/library/lineage_job_dataflow_new_api_simple.py
index 7212282156d8b9..1871a8af09e50c 100644
--- a/metadata-ingestion/examples/library/lineage_job_dataflow_new_api_simple.py
+++ b/metadata-ingestion/examples/library/lineage_job_dataflow_new_api_simple.py
@@ -1,5 +1,5 @@
import uuid
-from datetime import datetime
+from datetime import datetime, timezone
from datahub.api.entities.datajob import DataFlow, DataJob
from datahub.api.entities.dataprocess.dataprocess_instance import (
@@ -36,40 +36,61 @@
jobFlowRun = DataProcessInstance.from_dataflow(
dataflow=jobFlow, id=f"{jobFlow.id}-{uuid.uuid4()}"
)
-jobFlowRun.emit_process_start(emitter, int(datetime.utcnow().timestamp() * 1000))
+jobFlowRun.emit_process_start(
+ emitter, int(datetime.now(timezone.utc).timestamp() * 1000)
+)
+
jobRun = DataProcessInstance.from_datajob(
datajob=dataJob, id=f"{jobFlow.id}-{uuid.uuid4()}"
)
-jobRun.emit_process_start(emitter, int(datetime.utcnow().timestamp() * 1000))
+jobRun.emit_process_start(emitter, int(datetime.now(timezone.utc).timestamp() * 1000))
+
jobRun.emit_process_end(
- emitter, int(datetime.utcnow().timestamp() * 1000), result=InstanceRunResult.SUCCESS
+ emitter,
+ int(datetime.now(timezone.utc).timestamp() * 1000),
+ result=InstanceRunResult.SUCCESS,
)
+
job2Run = DataProcessInstance.from_datajob(
datajob=dataJob2, id=f"{jobFlow.id}-{uuid.uuid4()}"
)
-job2Run.emit_process_start(emitter, int(datetime.utcnow().timestamp() * 1000))
+job2Run.emit_process_start(emitter, int(datetime.now(timezone.utc).timestamp() * 1000))
+
job2Run.emit_process_end(
- emitter, int(datetime.utcnow().timestamp() * 1000), result=InstanceRunResult.SUCCESS
+ emitter,
+ int(datetime.now(timezone.utc).timestamp() * 1000),
+ result=InstanceRunResult.SUCCESS,
)
+
job3Run = DataProcessInstance.from_datajob(
datajob=dataJob3, id=f"{jobFlow.id}-{uuid.uuid4()}"
)
-job3Run.emit_process_start(emitter, int(datetime.utcnow().timestamp() * 1000))
+job3Run.emit_process_start(emitter, int(datetime.now(timezone.utc).timestamp() * 1000))
+
job3Run.emit_process_end(
- emitter, int(datetime.utcnow().timestamp() * 1000), result=InstanceRunResult.SUCCESS
+ emitter,
+ int(datetime.now(timezone.utc).timestamp() * 1000),
+ result=InstanceRunResult.SUCCESS,
)
+
job4Run = DataProcessInstance.from_datajob(
datajob=dataJob4, id=f"{jobFlow.id}-{uuid.uuid4()}"
)
-job4Run.emit_process_start(emitter, int(datetime.utcnow().timestamp() * 1000))
+job4Run.emit_process_start(emitter, int(datetime.now(timezone.utc).timestamp() * 1000))
+
job4Run.emit_process_end(
- emitter, int(datetime.utcnow().timestamp() * 1000), result=InstanceRunResult.SUCCESS
+ emitter,
+ int(datetime.now(timezone.utc).timestamp() * 1000),
+ result=InstanceRunResult.SUCCESS,
)
+
jobFlowRun.emit_process_end(
- emitter, int(datetime.utcnow().timestamp() * 1000), result=InstanceRunResult.SUCCESS
+ emitter,
+ int(datetime.now(timezone.utc).timestamp() * 1000),
+ result=InstanceRunResult.SUCCESS,
)
diff --git a/metadata-ingestion/examples/transforms/custom_transform_example.py b/metadata-ingestion/examples/transforms/custom_transform_example.py
index 85663d971092b5..57560e75cf7e92 100644
--- a/metadata-ingestion/examples/transforms/custom_transform_example.py
+++ b/metadata-ingestion/examples/transforms/custom_transform_example.py
@@ -61,13 +61,10 @@ def transform_aspect( # type: ignore
assert aspect is None or isinstance(aspect, OwnershipClass)
if owners_to_add:
- ownership = (
- aspect
- if aspect
- else OwnershipClass(
- owners=[],
- )
+ ownership = aspect or OwnershipClass(
+ owners=[],
)
+
ownership.owners.extend(owners_to_add)
return ownership
diff --git a/metadata-ingestion/scripts/avro_codegen.py b/metadata-ingestion/scripts/avro_codegen.py
index 05d8fb1c5804c6..e5758d159bdee5 100644
--- a/metadata-ingestion/scripts/avro_codegen.py
+++ b/metadata-ingestion/scripts/avro_codegen.py
@@ -10,11 +10,8 @@
def load_schema_file(schema_file: str) -> str:
- with open(schema_file) as f:
- raw_schema_text = f.read()
-
- redo_spaces = json.dumps(json.loads(raw_schema_text), indent=2)
- return redo_spaces
+ raw_schema_text = Path(schema_file).read_text()
+ return json.dumps(json.loads(raw_schema_text), indent=2)
def merge_schemas(schemas: List[str]) -> str:
diff --git a/metadata-ingestion/scripts/docgen.py b/metadata-ingestion/scripts/docgen.py
index 03434d291d3024..fa59cd20dda1c0 100644
--- a/metadata-ingestion/scripts/docgen.py
+++ b/metadata-ingestion/scripts/docgen.py
@@ -605,9 +605,10 @@ def generate(
os.makedirs(config_dir, exist_ok=True)
with open(f"{config_dir}/{plugin_name}_config.json", "w") as f:
f.write(source_config_class.schema_json(indent=2))
-
- create_or_update(source_documentation,
- [platform_id, "plugins", plugin_name, "config_schema"],
+
+ create_or_update(
+ source_documentation,
+ [platform_id, "plugins", plugin_name, "config_schema"],
source_config_class.schema_json(indent=2) or "",
)
@@ -649,7 +650,9 @@ def generate(
with open(platform_doc_file, "w") as f:
if "name" in platform_docs:
- f.write(f"import Tabs from '@theme/Tabs';\nimport TabItem from '@theme/TabItem';\n\n")
+ f.write(
+ f"import Tabs from '@theme/Tabs';\nimport TabItem from '@theme/TabItem';\n\n"
+ )
f.write(f"# {platform_docs['name']}\n")
if len(platform_docs["plugins"].keys()) > 1:
# More than one plugin used to provide integration with this platform
@@ -722,8 +725,10 @@ def generate(
f.write("\n```\n")
if "config" in plugin_docs:
f.write("\n### Config Details\n")
- f.write("""
- \n\n""")
+ f.write(
+ """
+ \n\n"""
+ )
f.write(
"Note that a `.` is used to denote nested fields in the YAML recipe.\n\n"
)
@@ -733,7 +738,8 @@ def generate(
for doc in plugin_docs["config"]:
f.write(doc)
f.write("\n\n\n")
- f.write(f"""
+ f.write(
+ f"""
The [JSONSchema](https://json-schema.org/) for this configuration is inlined below.\n\n
@@ -741,7 +747,8 @@ def generate(
{plugin_docs['config_schema']}
```\n\n
-\n\n""")
+\n\n"""
+ )
# insert custom plugin docs after config details
f.write(plugin_docs.get("custom_docs", ""))
if "classname" in plugin_docs:
diff --git a/metadata-ingestion/src/datahub/api/entities/datajob/dataflow.py b/metadata-ingestion/src/datahub/api/entities/datajob/dataflow.py
index c0378d554d5d31..588e66f19a0ef1 100644
--- a/metadata-ingestion/src/datahub/api/entities/datajob/dataflow.py
+++ b/metadata-ingestion/src/datahub/api/entities/datajob/dataflow.py
@@ -142,7 +142,7 @@ def emit(
"""
Emit the DataFlow entity to Datahub
- :param emitter: Datahub Emitter to emit the proccess event
+ :param emitter: Datahub Emitter to emit the process event
:param callback: (Optional[Callable[[Exception, str], None]]) the callback method for KafkaEmitter if it is used
"""
for mcp in self.generate_mcp():
diff --git a/metadata-ingestion/src/datahub/api/entities/datajob/datajob.py b/metadata-ingestion/src/datahub/api/entities/datajob/datajob.py
index 329eca7d9cd44b..ec86ad80226312 100644
--- a/metadata-ingestion/src/datahub/api/entities/datajob/datajob.py
+++ b/metadata-ingestion/src/datahub/api/entities/datajob/datajob.py
@@ -52,7 +52,7 @@ class DataJob:
properties Dict[str, str]: Custom properties to set for the DataProcessInstance
url (Optional[str]): Url which points to the DataJob at the orchestrator
inlets (List[str]): List of urns the DataProcessInstance consumes
- outlest (List[str]): List of urns the DataProcessInstance produces
+ outlets (List[str]): List of urns the DataProcessInstance produces
input_datajob_urns: List[DataJobUrn] = field(default_factory=list)
"""
@@ -179,7 +179,7 @@ def emit(
"""
Emit the DataJob entity to Datahub
- :param emitter: Datahub Emitter to emit the proccess event
+ :param emitter: Datahub Emitter to emit the process event
:param callback: (Optional[Callable[[Exception, str], None]]) the callback method for KafkaEmitter if it is used
:rtype: None
"""
diff --git a/metadata-ingestion/src/datahub/api/entities/dataprocess/dataprocess_instance.py b/metadata-ingestion/src/datahub/api/entities/dataprocess/dataprocess_instance.py
index 859e5700a51c3b..9b107d701ab02d 100644
--- a/metadata-ingestion/src/datahub/api/entities/dataprocess/dataprocess_instance.py
+++ b/metadata-ingestion/src/datahub/api/entities/dataprocess/dataprocess_instance.py
@@ -55,7 +55,7 @@ class DataProcessInstance:
template_urn (Optional[Union[DataJobUrn, DataFlowUrn]]): The parent DataJob or DataFlow which was instantiated if applicable
parent_instance (Optional[DataProcessInstanceUrn]): The parent execution's urn if applicable
properties Dict[str, str]: Custom properties to set for the DataProcessInstance
- url (Optional[str]): Url which points to the exection at the orchestrator
+ url (Optional[str]): Url which points to the execution at the orchestrator
inlets (List[str]): List of entities the DataProcessInstance consumes
outlets (List[str]): List of entities the DataProcessInstance produces
"""
@@ -118,10 +118,10 @@ def emit_process_start(
"""
:rtype: None
- :param emitter: Datahub Emitter to emit the proccess event
+ :param emitter: Datahub Emitter to emit the process event
:param start_timestamp_millis: (int) the execution start time in milliseconds
:param attempt: the number of attempt of the execution with the same execution id
- :param emit_template: (bool) If it is set the template of the execution (datajob, datflow) will be emitted as well.
+ :param emit_template: (bool) If it is set, the template of the execution (datajob, dataflow) will be emitted as well.
:param callback: (Optional[Callable[[Exception, str], None]]) the callback method for KafkaEmitter if it is used
"""
if emit_template and self.template_urn is not None:
@@ -312,8 +312,8 @@ def from_datajob(
:param datajob: (DataJob) the datajob from generate the DataProcessInstance
:param id: (str) the id for the DataProcessInstance
- :param clone_inlets: (bool) wheather to clone datajob's inlets
- :param clone_outlets: (bool) wheather to clone datajob's outlets
+ :param clone_inlets: (bool) whether to clone datajob's inlets
+ :param clone_outlets: (bool) whether to clone datajob's outlets
:return: DataProcessInstance
"""
dpi: DataProcessInstance = DataProcessInstance(
diff --git a/metadata-ingestion/src/datahub/cli/cli_utils.py b/metadata-ingestion/src/datahub/cli/cli_utils.py
index eebc548b9ab976..3e1a401db6b36c 100644
--- a/metadata-ingestion/src/datahub/cli/cli_utils.py
+++ b/metadata-ingestion/src/datahub/cli/cli_utils.py
@@ -137,7 +137,7 @@ def get_details_from_config():
gms_token = gms_config.token
return gms_host, gms_token
except yaml.YAMLError as exc:
- click.secho(f"{DATAHUB_CONFIG_PATH} malformatted, error: {exc}", bold=True)
+ click.secho(f"{DATAHUB_CONFIG_PATH} malformed, error: {exc}", bold=True)
return None, None
@@ -227,13 +227,13 @@ def get_session_and_host():
def test_connection():
(session, host) = get_session_and_host()
- url = host + "/config"
+ url = f"{host}/config"
response = session.get(url)
response.raise_for_status()
def test_connectivity_complain_exit(operation_name: str) -> None:
- """Test connectivty to metadata-service, log operation name and exit"""
+ """Test connectivity to metadata-service, log operation name and exit"""
# First test connectivity
try:
test_connection()
@@ -330,10 +330,7 @@ def post_delete_references_endpoint(
path: str,
cached_session_host: Optional[Tuple[Session, str]] = None,
) -> Tuple[int, List[Dict]]:
- if not cached_session_host:
- session, gms_host = get_session_and_host()
- else:
- session, gms_host = cached_session_host
+ session, gms_host = cached_session_host or get_session_and_host()
url = gms_host + path
payload = json.dumps(payload_obj)
@@ -349,10 +346,7 @@ def post_delete_endpoint(
path: str,
cached_session_host: Optional[Tuple[Session, str]] = None,
) -> typing.Tuple[str, int]:
- if not cached_session_host:
- session, gms_host = get_session_and_host()
- else:
- session, gms_host = cached_session_host
+ session, gms_host = cached_session_host or get_session_and_host()
url = gms_host + path
return post_delete_endpoint_with_session_and_url(session, url, payload_obj)
@@ -402,9 +396,7 @@ def get_urns_by_filter(
"condition": "EQUAL",
}
)
- if platform is not None and (
- entity_type_lower == "chart" or entity_type_lower == "dashboard"
- ):
+ if platform is not None and entity_type_lower in {"chart", "dashboard"}:
filter_criteria.append(
{
"field": "tool",
@@ -512,10 +504,7 @@ def batch_get_ids(
session, gms_host = get_session_and_host()
endpoint: str = "/entitiesV2"
url = gms_host + endpoint
- ids_to_get = []
- for id in ids:
- ids_to_get.append(Urn.url_encode(id))
-
+ ids_to_get = [Urn.url_encode(id) for id in ids]
response = session.get(
f"{url}?ids=List({','.join(ids_to_get)})",
)
@@ -572,11 +561,7 @@ def get_entity(
aspect: Optional[List] = None,
cached_session_host: Optional[Tuple[Session, str]] = None,
) -> Dict:
- if not cached_session_host:
- session, gms_host = get_session_and_host()
- else:
- session, gms_host = cached_session_host
-
+ session, gms_host = cached_session_host or get_session_and_host()
if urn.startswith("urn%3A"):
# we assume the urn is already encoded
encoded_urn: str = urn
@@ -589,7 +574,7 @@ def get_entity(
endpoint: str = f"/entitiesV2/{encoded_urn}"
if aspect and len(aspect):
- endpoint = endpoint + "?aspects=List(" + ",".join(aspect) + ")"
+ endpoint = f"{endpoint}?aspects=List(" + ",".join(aspect) + ")"
response = session.get(gms_host + endpoint)
response.raise_for_status()
@@ -602,12 +587,8 @@ def post_entity(
aspect_name: str,
aspect_value: Dict,
cached_session_host: Optional[Tuple[Session, str]] = None,
-) -> Dict:
- if not cached_session_host:
- session, gms_host = get_session_and_host()
- else:
- session, gms_host = cached_session_host
-
+) -> int:
+ session, gms_host = cached_session_host or get_session_and_host()
endpoint: str = "/aspects/?action=ingestProposal"
proposal = {
@@ -704,10 +685,7 @@ def get_latest_timeseries_aspect_values(
timeseries_aspect_name: str,
cached_session_host: Optional[Tuple[Session, str]],
) -> Dict:
- if not cached_session_host:
- session, gms_host = get_session_and_host()
- else:
- session, gms_host = cached_session_host
+ session, gms_host = cached_session_host or get_session_and_host()
query_body = {
"urn": entity_urn,
"entity": guess_entity_type(entity_urn),
@@ -758,14 +736,9 @@ def get_aspects_for_entity(
aspect_value["aspect"]["value"] = json.loads(
aspect_value["aspect"]["value"]
)
- aspect_list.update(
- # Follow the convention used for non-timeseries aspects.
- {
- aspect_cls.RECORD_SCHEMA.fullname.replace(
- "pegasus2avro.", ""
- ): aspect_value
- }
- )
+ aspect_list[
+ aspect_cls.RECORD_SCHEMA.fullname.replace("pegasus2avro.", "")
+ ] = aspect_value
aspect_map: Dict[str, Union[dict, DictWrapper]] = {}
for a in aspect_list.values():
@@ -789,4 +762,4 @@ def get_aspects_for_entity(
if aspects:
return {k: v for (k, v) in aspect_map.items() if k in aspects}
else:
- return {k: v for (k, v) in aspect_map.items()}
+ return dict(aspect_map)
diff --git a/metadata-ingestion/src/datahub/cli/delete_cli.py b/metadata-ingestion/src/datahub/cli/delete_cli.py
index 1ca0ac864693b1..a11fa3e6703520 100644
--- a/metadata-ingestion/src/datahub/cli/delete_cli.py
+++ b/metadata-ingestion/src/datahub/cli/delete_cli.py
@@ -182,7 +182,7 @@ def delete(
else:
# log warn include_removed + hard is the only way to work
if include_removed and soft:
- logger.warn(
+ logger.warning(
"A filtered delete including soft deleted entities is redundant, because it is a soft delete by default. Please use --include-removed in conjunction with --hard"
)
# Filter based delete
@@ -234,16 +234,16 @@ def delete_with_filters(
logger.info(f"datahub configured with {gms_host}")
emitter = rest_emitter.DatahubRestEmitter(gms_server=gms_host, token=token)
batch_deletion_result = DeletionResult()
- urns = [
- u
- for u in cli_utils.get_urns_by_filter(
+ urns = list(
+ cli_utils.get_urns_by_filter(
env=env,
platform=platform,
search_query=search_query,
entity_type=entity_type,
include_removed=include_removed,
)
- ]
+ )
+
logger.info(
f"Filter matched {len(urns)} entities. Sample: {choices(urns, k=min(5, len(urns)))}"
)
@@ -284,12 +284,12 @@ def _delete_one_urn(
if soft:
# Add removed aspect
- if not cached_emitter:
+ if cached_emitter:
+ emitter = cached_emitter
+ else:
_, gms_host = cli_utils.get_session_and_host()
token = cli_utils.get_token()
emitter = rest_emitter.DatahubRestEmitter(gms_server=gms_host, token=token)
- else:
- emitter = cached_emitter
if not dry_run:
emitter.emit_mcp(
MetadataChangeProposalWrapper(
@@ -305,18 +305,19 @@ def _delete_one_urn(
)
else:
logger.info(f"[Dry-run] Would soft-delete {urn}")
+ elif not dry_run:
+ payload_obj = {"urn": urn}
+ urn, rows_affected = cli_utils.post_delete_endpoint(
+ payload_obj,
+ "/entities?action=delete",
+ cached_session_host=cached_session_host,
+ )
+ deletion_result.num_records = rows_affected
else:
- if not dry_run:
- payload_obj = {"urn": urn}
- urn, rows_affected = cli_utils.post_delete_endpoint(
- payload_obj,
- "/entities?action=delete",
- cached_session_host=cached_session_host,
- )
- deletion_result.num_records = rows_affected
- else:
- logger.info(f"[Dry-run] Would hard-delete {urn}")
- deletion_result.num_records = UNKNOWN_NUM_RECORDS # since we don't know how many rows will be affected
+ logger.info(f"[Dry-run] Would hard-delete {urn}")
+ deletion_result.num_records = (
+ UNKNOWN_NUM_RECORDS # since we don't know how many rows will be affected
+ )
deletion_result.end()
return deletion_result
diff --git a/metadata-ingestion/src/datahub/cli/docker_check.py b/metadata-ingestion/src/datahub/cli/docker_check.py
index e530f4d19616f3..25719cef2334d9 100644
--- a/metadata-ingestion/src/datahub/cli/docker_check.py
+++ b/metadata-ingestion/src/datahub/cli/docker_check.py
@@ -88,10 +88,11 @@ def check_local_docker_containers(preflight_only: bool = False) -> List[str]:
if len(containers) == 0:
issues.append("quickstart.sh or dev.sh is not running")
else:
- existing_containers = set(container.name for container in containers)
+ existing_containers = {container.name for container in containers}
missing_containers = set(REQUIRED_CONTAINERS) - existing_containers
- for missing in missing_containers:
- issues.append(f"{missing} container is not present")
+ issues.extend(
+ f"{missing} container is not present" for missing in missing_containers
+ )
# Check that the containers are running and healthy.
for container in containers:
diff --git a/metadata-ingestion/src/datahub/cli/ingest_cli.py b/metadata-ingestion/src/datahub/cli/ingest_cli.py
index 55d3a0cdc04760..ecb7d80e30fcfa 100644
--- a/metadata-ingestion/src/datahub/cli/ingest_cli.py
+++ b/metadata-ingestion/src/datahub/cli/ingest_cli.py
@@ -79,7 +79,7 @@ def ingest() -> None:
type=bool,
is_flag=True,
default=False,
- help="Supress display of variable values in logs by supressing elaborae stacktrace (stackprinter) during ingestion failures",
+ help="Suppress display of variable values in logs by suppressing elaborate stacktrace (stackprinter) during ingestion failures",
)
@click.pass_context
@upgrade.check_upgrade
@@ -313,14 +313,14 @@ def rollback(
current_time = now.strftime("%Y-%m-%d %H:%M:%S")
try:
- folder_name = report_dir + "/" + current_time
+ folder_name = f"{report_dir}/{current_time}"
- ingestion_config_file_name = folder_name + "/config.json"
+ ingestion_config_file_name = f"{folder_name}/config.json"
os.makedirs(os.path.dirname(ingestion_config_file_name), exist_ok=True)
with open(ingestion_config_file_name, "w") as file_handle:
json.dump({"run_id": run_id}, file_handle)
- csv_file_name = folder_name + "/unsafe_entities.csv"
+ csv_file_name = f"{folder_name}/unsafe_entities.csv"
with open(csv_file_name, "w") as file_handle:
writer = csv.writer(file_handle)
writer.writerow(["urn"])
@@ -329,4 +329,4 @@ def rollback(
except IOError as e:
print(e)
- sys.exit("Unable to write reports to " + report_dir)
+ sys.exit(f"Unable to write reports to {report_dir}")
diff --git a/metadata-ingestion/src/datahub/cli/migration_utils.py b/metadata-ingestion/src/datahub/cli/migration_utils.py
index b383e2849b6b9b..79546e07ac056a 100644
--- a/metadata-ingestion/src/datahub/cli/migration_utils.py
+++ b/metadata-ingestion/src/datahub/cli/migration_utils.py
@@ -218,8 +218,8 @@ def modify_urn_list_for_aspect(
new_urn: str,
) -> DictWrapper:
- if hasattr(UrnListModifier, aspect_name + "_modifier"):
- modifier = getattr(UrnListModifier, aspect_name + "_modifier")
+ if hasattr(UrnListModifier, f"{aspect_name}_modifier"):
+ modifier = getattr(UrnListModifier, f"{aspect_name}_modifier")
return modifier(
aspect=aspect,
relationship_type=relationship_type,
diff --git a/metadata-ingestion/src/datahub/cli/timeline_cli.py b/metadata-ingestion/src/datahub/cli/timeline_cli.py
index 40c5af4e1e78a0..579dff5425a112 100644
--- a/metadata-ingestion/src/datahub/cli/timeline_cli.py
+++ b/metadata-ingestion/src/datahub/cli/timeline_cli.py
@@ -19,46 +19,40 @@
def pretty_field_path(field_path: str) -> str:
- if field_path.startswith("[version=2.0]"):
+ if not field_path.startswith("[version=2.0]"):
+ return field_path
# breakpoint()
# parse schema field
- tokens = [
- t
- for t in field_path.split(".")
- if not (t.startswith("[") or t.endswith("]"))
- ]
- path = ".".join(tokens)
- return path
- else:
- return field_path
+ tokens = [
+ t
+ for t in field_path.split(".")
+ if not t.startswith("[") and not t.endswith("]")
+ ]
+
+ return ".".join(tokens)
def pretty_id(id: Optional[str]) -> str:
if not id:
return ""
- else:
- # breakpoint()
- assert id is not None
- if id.startswith("urn:li:datasetField:") or id.startswith(
- "urn:li:schemaField:"
- ):
- # parse schema field
- schema_field_key = schema_field_urn_to_key(
- id.replace("urn:li:datasetField", "urn:li:schemaField")
- )
- if schema_field_key:
- assert schema_field_key is not None
- field_path = schema_field_key.fieldPath
-
- return f"{colored('field','cyan')}:{colored(pretty_field_path(field_path),'white')}"
- if id.startswith("[version=2.0]"):
- return f"{colored('field','cyan')}:{colored(pretty_field_path(id),'white')}"
-
- if id.startswith("urn:li:dataset"):
- # parse dataset urn
- dataset_key = dataset_urn_to_key(id)
- if dataset_key:
- return f"{colored('dataset','cyan')}:{colored(dataset_key.platform,'white')}:{colored(dataset_key.name,'white')}"
+ # breakpoint()
+ assert id is not None
+ if id.startswith("urn:li:datasetField:") or id.startswith("urn:li:schemaField:"):
+ schema_field_key = schema_field_urn_to_key(
+ id.replace("urn:li:datasetField", "urn:li:schemaField")
+ )
+ if schema_field_key:
+ assert schema_field_key is not None
+ field_path = schema_field_key.fieldPath
+
+ return f"{colored('field','cyan')}:{colored(pretty_field_path(field_path),'white')}"
+ if id.startswith("[version=2.0]"):
+ return f"{colored('field','cyan')}:{colored(pretty_field_path(id),'white')}"
+
+ if id.startswith("urn:li:dataset"):
+ dataset_key = dataset_urn_to_key(id)
+ if dataset_key:
+ return f"{colored('dataset','cyan')}:{colored(dataset_key.platform,'white')}:{colored(dataset_key.name,'white')}"
# failed to prettify, return original
return id
@@ -196,10 +190,10 @@ def timeline(
)
change_color = (
"green"
- if change_txn.get("semVerChange") == "MINOR"
- or change_txn.get("semVerChange") == "PATCH"
+ if change_txn.get("semVerChange") in ["MINOR", "PATCH"]
else "red"
)
+
print(
f"{colored(change_instant,'cyan')} - {colored(change_txn['semVer'],change_color)}"
)
diff --git a/metadata-ingestion/src/datahub/configuration/common.py b/metadata-ingestion/src/datahub/configuration/common.py
index 716572babc1e41..be3c6a13d3599b 100644
--- a/metadata-ingestion/src/datahub/configuration/common.py
+++ b/metadata-ingestion/src/datahub/configuration/common.py
@@ -39,10 +39,7 @@ class OperationalError(PipelineExecutionError):
def __init__(self, message: str, info: dict = None):
self.message = message
- if info:
- self.info = info
- else:
- self.info = {}
+ self.info = info or {}
class ConfigurationError(MetaError):
@@ -128,10 +125,7 @@ def alphabet_pattern(self) -> Pattern:
@property
def regex_flags(self) -> int:
- if self.ignoreCase:
- return re.IGNORECASE
- else:
- return 0
+ return re.IGNORECASE if self.ignoreCase else 0
@classmethod
def allow_all(cls) -> "AllowDenyPattern":
@@ -142,11 +136,10 @@ def allowed(self, string: str) -> bool:
if re.match(deny_pattern, string, self.regex_flags):
return False
- for allow_pattern in self.allow:
- if re.match(allow_pattern, string, self.regex_flags):
- return True
-
- return False
+ return any(
+ re.match(allow_pattern, string, self.regex_flags)
+ for allow_pattern in self.allow
+ )
def is_fully_specified_allow_list(self) -> bool:
"""
@@ -155,10 +148,9 @@ def is_fully_specified_allow_list(self) -> bool:
pattern into a 'search for the ones that are allowed' pattern, which can be
much more efficient in some cases.
"""
- for allow_pattern in self.allow:
- if not self.alphabet_pattern.match(allow_pattern):
- return False
- return True
+ return all(
+ self.alphabet_pattern.match(allow_pattern) for allow_pattern in self.allow
+ )
def get_allowed_list(self) -> List[str]:
"""Return the list of allowed strings as a list, after taking into account deny patterns, if possible"""
@@ -181,16 +173,12 @@ def all(cls) -> "KeyValuePattern":
return KeyValuePattern()
def value(self, string: str) -> List[str]:
- for key in self.rules.keys():
- if re.match(key, string):
- return self.rules[key]
- return []
+ return next(
+ (self.rules[key] for key in self.rules.keys() if re.match(key, string)), []
+ )
def matched(self, string: str) -> bool:
- for key in self.rules.keys():
- if re.match(key, string):
- return True
- return False
+ return any(re.match(key, string) for key in self.rules.keys())
def is_fully_specified_key(self) -> bool:
"""
@@ -199,10 +187,7 @@ def is_fully_specified_key(self) -> bool:
pattern into a 'search for the ones that are allowed' pattern, which can be
much more efficient in some cases.
"""
- for key in self.rules.keys():
- if not self.alphabet_pattern.match(key):
- return True
- return False
+ return any(not self.alphabet_pattern.match(key) for key in self.rules.keys())
def get(self) -> Dict[str, List[str]]:
"""Return the list of allowed strings as a list, after taking into account deny patterns, if possible"""
diff --git a/metadata-ingestion/src/datahub/configuration/import_resolver.py b/metadata-ingestion/src/datahub/configuration/import_resolver.py
index 56e232d0403241..19627c7b8c9569 100644
--- a/metadata-ingestion/src/datahub/configuration/import_resolver.py
+++ b/metadata-ingestion/src/datahub/configuration/import_resolver.py
@@ -8,9 +8,7 @@
def _pydantic_resolver(v: Union[T, str]) -> T:
- if isinstance(v, str):
- return import_path(v)
- return v
+ return import_path(v) if isinstance(v, str) else v
def pydantic_resolve_key(field: str) -> classmethod:
diff --git a/metadata-ingestion/src/datahub/configuration/kafka.py b/metadata-ingestion/src/datahub/configuration/kafka.py
index 876db21086e88e..197322a2e566e1 100644
--- a/metadata-ingestion/src/datahub/configuration/kafka.py
+++ b/metadata-ingestion/src/datahub/configuration/kafka.py
@@ -27,7 +27,7 @@ def bootstrap_host_colon_port_comma(cls, val: str) -> str:
else:
host = entry
assert re.match(
- # This regex is quite loose. Many invalid hostnames or IPs will slip through,
+ # This regex is quite loose. Many invalid hostnames or IPs will slip through,
# but it serves as a good first line of validation. We defer to Kafka for the
# remaining validation.
r"^[\w\-\.\:]+$",
diff --git a/metadata-ingestion/src/datahub/configuration/yaml.py b/metadata-ingestion/src/datahub/configuration/yaml.py
index ee710b07bab3d2..1f1172836f7448 100644
--- a/metadata-ingestion/src/datahub/configuration/yaml.py
+++ b/metadata-ingestion/src/datahub/configuration/yaml.py
@@ -9,5 +9,4 @@ class YamlConfigurationMechanism(ConfigurationMechanism):
"""Ability to load configuration from yaml files"""
def load_config(self, config_fp: IO) -> dict:
- config = yaml.safe_load(config_fp)
- return config
+ return yaml.safe_load(config_fp)
diff --git a/metadata-ingestion/src/datahub/emitter/kafka_emitter.py b/metadata-ingestion/src/datahub/emitter/kafka_emitter.py
index f2dc663cf0677a..001097a2e42f5b 100644
--- a/metadata-ingestion/src/datahub/emitter/kafka_emitter.py
+++ b/metadata-ingestion/src/datahub/emitter/kafka_emitter.py
@@ -49,12 +49,11 @@ def validate_topic_routes(cls: "KafkaEmitterConfig", values: dict) -> dict:
raise ConfigurationError(
"Using both topic and topic_routes configuration for Kafka is not supported. Use only topic_routes"
)
- else:
- logger.warning(
- "Looks like you're using the deprecated `topic` configuration. Please migrate to `topic_routes`."
- )
- # upgrade topic provided to topic_routes mce entry
- values["topic_routes"][MCE_KEY] = values["topic"]
+ logger.warning(
+ "Looks like you're using the deprecated `topic` configuration. Please migrate to `topic_routes`."
+ )
+ # upgrade topic provided to topic_routes mce entry
+ values["topic_routes"][MCE_KEY] = values["topic"]
return values
@@ -70,8 +69,7 @@ def __init__(self, config: KafkaEmitterConfig):
def convert_mce_to_dict(
mce: MetadataChangeEvent, ctx: SerializationContext
) -> dict:
- tuple_encoding = mce.to_obj(tuples=True)
- return tuple_encoding
+ return mce.to_obj(tuples=True)
mce_avro_serializer = AvroSerializer(
schema_str=getMetadataChangeEventSchema(),
@@ -83,8 +81,7 @@ def convert_mcp_to_dict(
mcp: Union[MetadataChangeProposal, MetadataChangeProposalWrapper],
ctx: SerializationContext,
) -> dict:
- tuple_encoding = mcp.to_obj(tuples=True)
- return tuple_encoding
+ return mcp.to_obj(tuples=True)
mcp_avro_serializer = AvroSerializer(
schema_str=getMetadataChangeProposalSchema(),
diff --git a/metadata-ingestion/src/datahub/emitter/mce_builder.py b/metadata-ingestion/src/datahub/emitter/mce_builder.py
index bf295c3e367931..5936546090acbc 100644
--- a/metadata-ingestion/src/datahub/emitter/mce_builder.py
+++ b/metadata-ingestion/src/datahub/emitter/mce_builder.py
@@ -104,8 +104,8 @@ def schema_field_urn_to_key(schema_field_urn: str) -> Optional[SchemaFieldKeyCla
pattern = r"urn:li:schemaField:\((.*),(.*)\)"
results = re.search(pattern, schema_field_urn)
if results is not None:
- dataset_urn: str = results.group(1)
- field_path: str = results.group(2)
+ dataset_urn: str = results[1]
+ field_path: str = results[2]
return SchemaFieldKeyClass(parent=dataset_urn, fieldPath=field_path)
return None
@@ -114,9 +114,7 @@ def dataset_urn_to_key(dataset_urn: str) -> Optional[DatasetKeyClass]:
pattern = r"urn:li:dataset:\(urn:li:dataPlatform:(.*),(.*),(.*)\)"
results = re.search(pattern, dataset_urn)
if results is not None:
- return DatasetKeyClass(
- platform=results.group(1), name=results.group(2), origin=results.group(3)
- )
+ return DatasetKeyClass(platform=results[1], name=results[2], origin=results[3])
return None
@@ -128,9 +126,7 @@ def container_new_urn_to_key(dataset_urn: str) -> Optional[ContainerKeyClass]:
pattern = r"urn:dh:container:0:\((.*)\)"
results = re.search(pattern, dataset_urn)
if results is not None:
- return ContainerKeyClass(
- guid=results.group(1),
- )
+ return ContainerKeyClass(guid=results[1])
return None
@@ -146,9 +142,7 @@ def container_urn_to_key(guid: str) -> Optional[ContainerKeyClass]:
pattern = r"urn:li:container:(.*)"
results = re.search(pattern, guid)
if results is not None:
- return ContainerKeyClass(
- guid=results.group(1),
- )
+ return ContainerKeyClass(guid=results[1])
return None
@@ -156,8 +150,7 @@ def datahub_guid(obj: dict) -> str:
obj_str = json.dumps(
pre_json_transform(obj), separators=(",", ":"), sort_keys=True
).encode("utf-8")
- datahub_guid = md5(obj_str).hexdigest()
- return datahub_guid
+ return md5(obj_str).hexdigest()
def make_assertion_urn(assertion_id: str) -> str:
diff --git a/metadata-ingestion/src/datahub/emitter/mcp_builder.py b/metadata-ingestion/src/datahub/emitter/mcp_builder.py
index 7aed2e29137492..055db3c6a4ad61 100644
--- a/metadata-ingestion/src/datahub/emitter/mcp_builder.py
+++ b/metadata-ingestion/src/datahub/emitter/mcp_builder.py
@@ -233,7 +233,9 @@ def gen_containers(
def add_dataset_to_container(
- container_key: KeyType, dataset_urn: str
+ # FIXME: Union requires two or more type arguments
+ container_key: KeyType,
+ dataset_urn: str,
) -> Iterable[Union[MetadataWorkUnit]]:
container_urn = make_container_urn(
guid=container_key.guid(),
diff --git a/metadata-ingestion/src/datahub/emitter/serialization_helper.py b/metadata-ingestion/src/datahub/emitter/serialization_helper.py
index 5a348ce267b10f..cad4e9dd3270fc 100644
--- a/metadata-ingestion/src/datahub/emitter/serialization_helper.py
+++ b/metadata-ingestion/src/datahub/emitter/serialization_helper.py
@@ -16,10 +16,12 @@ def _json_transform(obj: Any, from_pattern: str, to_pattern: str) -> Any:
field = obj["fieldDiscriminator"]
return {field: _json_transform(obj[field], from_pattern, to_pattern)}
- new_obj: Any = {}
- for key, value in obj.items():
- if value is not None:
- new_obj[key] = _json_transform(value, from_pattern, to_pattern)
+ new_obj: Any = {
+ key: _json_transform(value, from_pattern, to_pattern)
+ for key, value in obj.items()
+ if value is not None
+ }
+
return new_obj
elif isinstance(obj, list):
new_obj = [_json_transform(item, from_pattern, to_pattern) for item in obj]
diff --git a/metadata-ingestion/src/datahub/ingestion/api/committable.py b/metadata-ingestion/src/datahub/ingestion/api/committable.py
index f1aada4477f1ab..e41eb24abc2d96 100644
--- a/metadata-ingestion/src/datahub/ingestion/api/committable.py
+++ b/metadata-ingestion/src/datahub/ingestion/api/committable.py
@@ -55,7 +55,7 @@ def __init__(
super(_CommittableConcrete, self).__init__(state_to_commit=state_to_commit)
def has_successfully_committed(self) -> bool:
- return True if not self.state_to_commit or self.committed else False
+ return bool(not self.state_to_commit or self.committed)
@abstractmethod
def get_previous_states(
diff --git a/metadata-ingestion/src/datahub/ingestion/api/common.py b/metadata-ingestion/src/datahub/ingestion/api/common.py
index 56c21a7f39c627..fd458f9b4fd980 100644
--- a/metadata-ingestion/src/datahub/ingestion/api/common.py
+++ b/metadata-ingestion/src/datahub/ingestion/api/common.py
@@ -55,8 +55,8 @@ def __init__(
self.pipeline_name = pipeline_name
self.dry_run_mode = dry_run
self.preview_mode = preview_mode
- self.reporters: Dict[str, Committable] = dict()
- self.checkpointers: Dict[str, Committable] = dict()
+ self.reporters: Dict[str, Committable] = {}
+ self.checkpointers: Dict[str, Committable] = {}
self._set_dataset_urn_to_lower_if_needed()
def _set_dataset_urn_to_lower_if_needed(self) -> None:
@@ -81,11 +81,8 @@ def register_reporter(self, committable: Committable) -> None:
self.reporters[committable.name] = committable
def get_reporters(self) -> Iterable[Committable]:
- for committable in self.reporters.values():
- yield committable
+ yield from self.reporters.values()
def get_committables(self) -> Iterable[Tuple[str, Committable]]:
- for reporting_item_commitable in self.reporters.items():
- yield reporting_item_commitable
- for checkpointing_item_commitable in self.checkpointers.items():
- yield checkpointing_item_commitable
+ yield from self.reporters.items()
+ yield from self.checkpointers.items()
diff --git a/metadata-ingestion/src/datahub/ingestion/api/decorators.py b/metadata-ingestion/src/datahub/ingestion/api/decorators.py
index 9b3f35ae9d8116..20867a8571b24c 100644
--- a/metadata-ingestion/src/datahub/ingestion/api/decorators.py
+++ b/metadata-ingestion/src/datahub/ingestion/api/decorators.py
@@ -37,8 +37,9 @@ def wrapper(cls: Type) -> Type:
setattr(
cls,
"get_platform_id",
- lambda: id if id else platform_name.lower().replace(" ", "-"),
+ lambda: id or platform_name.lower().replace(" ", "-"),
)
+
return cls
if id and " " in id:
@@ -56,7 +57,7 @@ class SupportStatus(Enum):
"""
INCUBATING = auto()
"""
- Incubating Sources are ready for DataHub Community adoption but have not been tested for a wide variety of edge-cases. We eagerly solicit feedback from the Community to streghten the connector; minor version changes may arise in future releases.
+ Incubating Sources are ready for DataHub Community adoption but have not been tested for a wide variety of edge-cases. We eagerly solicit feedback from the Community to strengthen the connector; minor version changes may arise in future releases.
"""
TESTING = auto()
"""
diff --git a/metadata-ingestion/src/datahub/ingestion/api/registry.py b/metadata-ingestion/src/datahub/ingestion/api/registry.py
index f83921639c227e..a8529817e2500f 100644
--- a/metadata-ingestion/src/datahub/ingestion/api/registry.py
+++ b/metadata-ingestion/src/datahub/ingestion/api/registry.py
@@ -79,16 +79,15 @@ def register_disabled(
def _ensure_not_lazy(self, key: str) -> Union[Type[T], Exception]:
path = self._mapping[key]
- if isinstance(path, str):
- try:
- plugin_class = import_path(path)
- self.register(key, plugin_class, override=True)
- return plugin_class
- except (AssertionError, ModuleNotFoundError, ImportError) as e:
- self.register_disabled(key, e, override=True)
- return e
- else:
+ if not isinstance(path, str):
return path
+ try:
+ plugin_class = import_path(path)
+ self.register(key, plugin_class, override=True)
+ return plugin_class
+ except (AssertionError, ImportError) as e:
+ self.register_disabled(key, e, override=True)
+ return e
def is_enabled(self, key: str) -> bool:
tp = self._mapping[key]
diff --git a/metadata-ingestion/src/datahub/ingestion/extractor/protobuf_util.py b/metadata-ingestion/src/datahub/ingestion/extractor/protobuf_util.py
index e5f976ff88dd56..51fdbd8fbdb680 100644
--- a/metadata-ingestion/src/datahub/ingestion/extractor/protobuf_util.py
+++ b/metadata-ingestion/src/datahub/ingestion/extractor/protobuf_util.py
@@ -365,11 +365,7 @@ def _schema_fields_from_dag(
if generations and generations[0]:
roots = generations[0]
- leafs: List = []
- for node in graph:
- if graph.out_degree(node) == 0:
- leafs.append(node)
-
+ leafs: List = [node for node in graph if graph.out_degree(node) == 0]
type_of_nodes: Dict = nx.get_node_attributes(graph, "node_type")
for root in roots:
diff --git a/metadata-ingestion/src/datahub/ingestion/reporting/datahub_ingestion_reporting_provider.py b/metadata-ingestion/src/datahub/ingestion/reporting/datahub_ingestion_reporting_provider.py
index 568c41aac9dbd9..1bb89236cc51ad 100644
--- a/metadata-ingestion/src/datahub/ingestion/reporting/datahub_ingestion_reporting_provider.py
+++ b/metadata-ingestion/src/datahub/ingestion/reporting/datahub_ingestion_reporting_provider.py
@@ -115,7 +115,7 @@ def get_previous_states(
) -> List[ReportingJobStatesMap]:
if not last_only:
raise NotImplementedError(
- "Currently supports retrieving only the last commited state."
+ "Currently supports retrieving only the last committed state."
)
if filter_opt is not None:
raise NotImplementedError(
diff --git a/metadata-ingestion/src/datahub/ingestion/run/pipeline.py b/metadata-ingestion/src/datahub/ingestion/run/pipeline.py
index 969634e3163910..d0b1e300b9184b 100644
--- a/metadata-ingestion/src/datahub/ingestion/run/pipeline.py
+++ b/metadata-ingestion/src/datahub/ingestion/run/pipeline.py
@@ -54,11 +54,10 @@ def run_id_should_be_semantic(
cls, v: Optional[str], values: Dict[str, Any], **kwargs: Any
) -> str:
if v == "__DEFAULT_RUN_ID":
- if "source" in values:
- if hasattr(values["source"], "type"):
- source_type = values["source"].type
- current_time = datetime.datetime.now().strftime("%Y_%m_%d-%H_%M_%S")
- return f"{source_type}-{current_time}"
+ if "source" in values and hasattr(values["source"], "type"):
+ source_type = values["source"].type
+ current_time = datetime.datetime.now().strftime("%Y_%m_%d-%H_%M_%S")
+ return f"{source_type}-{current_time}"
return str(uuid.uuid1()) # default run_id if we cannot infer a source type
else:
@@ -88,12 +87,11 @@ def default_sink_is_datahub_rest(cls, values: Dict[str, Any]) -> Any:
def datahub_api_should_use_rest_sink_as_default(
cls, v: Optional[DatahubClientConfig], values: Dict[str, Any], **kwargs: Any
) -> Optional[DatahubClientConfig]:
- if v is None:
- if "sink" in values and hasattr(values["sink"], "type"):
- sink_type = values["sink"].type
- if sink_type == "datahub-rest":
- sink_config = values["sink"].config
- v = DatahubClientConfig.parse_obj(sink_config)
+ if v is None and "sink" in values and hasattr(values["sink"], "type"):
+ sink_type = values["sink"].type
+ if sink_type == "datahub-rest":
+ sink_config = values["sink"].config
+ v = DatahubClientConfig.parse_obj(sink_config)
return v
@@ -268,11 +266,10 @@ def process_commits(self) -> None:
if self.source.get_report().failures or self.sink.get_report().failures
else False
)
- has_warnings: bool = (
- True
- if self.source.get_report().warnings or self.sink.get_report().warnings
- else False
+ has_warnings: bool = bool(
+ self.source.get_report().warnings or self.sink.get_report().warnings
)
+
for name, committable in self.ctx.get_committables():
commit_policy: CommitPolicy = committable.commit_policy
diff --git a/metadata-ingestion/src/datahub/ingestion/sink/datahub_kafka.py b/metadata-ingestion/src/datahub/ingestion/sink/datahub_kafka.py
index f931b9039303a0..93d3aa5f6c85d3 100644
--- a/metadata-ingestion/src/datahub/ingestion/sink/datahub_kafka.py
+++ b/metadata-ingestion/src/datahub/ingestion/sink/datahub_kafka.py
@@ -77,8 +77,8 @@ def write_record_async(
self.report, record_envelope, write_callback
).kafka_callback,
)
- elif isinstance(record, MetadataChangeProposalWrapper) or isinstance(
- record, MetadataChangeProposalClass
+ elif isinstance(
+ record, (MetadataChangeProposalWrapper, MetadataChangeProposalClass)
):
self.emitter.emit_mcp_async(
record,
diff --git a/metadata-ingestion/src/datahub/ingestion/sink/datahub_rest.py b/metadata-ingestion/src/datahub/ingestion/sink/datahub_rest.py
index 74e536350457b5..d95eb245ccfb50 100644
--- a/metadata-ingestion/src/datahub/ingestion/sink/datahub_rest.py
+++ b/metadata-ingestion/src/datahub/ingestion/sink/datahub_rest.py
@@ -1,4 +1,5 @@
import concurrent.futures
+import contextlib
import functools
import logging
from dataclasses import dataclass
@@ -111,13 +112,10 @@ def _write_done_callback(
else:
# trim exception stacktraces when reporting warnings
if "stackTrace" in e.info:
- try:
+ with contextlib.suppress(Exception):
e.info["stackTrace"] = "\n".join(
- e.info["stackTrace"].split("\n")[0:2]
+ e.info["stackTrace"].split("\n")[:2]
)
- except Exception:
- # ignore failures in trimming
- pass
record = record_envelope.record
if isinstance(record, MetadataChangeProposalWrapper):
# include information about the entity that failed
diff --git a/metadata-ingestion/src/datahub/ingestion/source/aws/glue.py b/metadata-ingestion/src/datahub/ingestion/source/aws/glue.py
index 93768a1aa7e73b..4ad1fd7a836c5b 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/aws/glue.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/aws/glue.py
@@ -1059,7 +1059,7 @@ def get_s3_tags() -> Optional[GlobalTagsClass]:
]
)
except self.s3_client.exceptions.ClientError:
- logger.warn(f"No tags found for bucket={bucket_name}")
+ logger.warning(f"No tags found for bucket={bucket_name}")
if self.source_config.use_s3_object_tags:
key_prefix = s3_util.get_key_prefix(
table["StorageDescriptor"]["Location"]
@@ -1078,7 +1078,7 @@ def get_s3_tags() -> Optional[GlobalTagsClass]:
else:
# Unlike bucket tags, if an object does not have tags, it will just return an empty array
# as opposed to an exception.
- logger.warn(
+ logger.warning(
f"No tags found for bucket={bucket_name} key={key_prefix}"
)
if len(tags_to_add) == 0:
@@ -1097,7 +1097,7 @@ def get_s3_tags() -> Optional[GlobalTagsClass]:
[current_tag.tag for current_tag in current_tags.tags]
)
else:
- logger.warn(
+ logger.warning(
"Could not connect to DatahubApi. No current tags to maintain"
)
diff --git a/metadata-ingestion/src/datahub/ingestion/source/elastic_search.py b/metadata-ingestion/src/datahub/ingestion/source/elastic_search.py
index c5a6c13bf695df..74fca0d2654519 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/elastic_search.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/elastic_search.py
@@ -255,9 +255,7 @@ def host_colon_port_comma(cls, host_val: str) -> str:
@property
def http_auth(self) -> Optional[Tuple[str, str]]:
- if self.username is None:
- return None
- return self.username, self.password or ""
+ return None if self.username is None else (self.username, self.password or "")
@platform_name("Elastic Search")
diff --git a/metadata-ingestion/src/datahub/ingestion/source/feast.py b/metadata-ingestion/src/datahub/ingestion/source/feast.py
index c17ae5c14a85fd..7d4f5360ad4202 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/feast.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/feast.py
@@ -4,23 +4,23 @@
from pydantic import Field
-if sys.version_info >= (3, 7):
- from feast import (
- BigQuerySource,
- Entity,
- Feature,
- FeatureStore,
- FeatureView,
- FileSource,
- KafkaSource,
- KinesisSource,
- OnDemandFeatureView,
- ValueType,
- )
- from feast.data_source import DataSource, RequestDataSource
-else:
+if sys.version_info < (3, 7):
raise ModuleNotFoundError("The feast plugin requires Python 3.7 or newer.")
+from feast import (
+ BigQuerySource,
+ Entity,
+ Feature,
+ FeatureStore,
+ FeatureView,
+ FileSource,
+ KafkaSource,
+ KinesisSource,
+ OnDemandFeatureView,
+ ValueType,
+)
+from feast.data_source import DataSource, RequestDataSource
+
import datahub.emitter.mce_builder as builder
from datahub.configuration.common import ConfigModel
from datahub.emitter.mce_builder import DEFAULT_ENV
@@ -52,6 +52,7 @@
assert sys.version_info >= (3, 7) # needed for mypy
+# FIXME: ValueType module cannot be used as a type
_field_type_mapping: Dict[ValueType, str] = {
ValueType.UNKNOWN: MLFeatureDataType.UNKNOWN,
ValueType.BYTES: MLFeatureDataType.BYTE,
@@ -218,6 +219,7 @@ def _get_entity_workunit(
def _get_feature_workunit(
self,
+ # FIXME: FeatureView and OnDemandFeatureView cannot be used as a type
feature_view: Union[FeatureView, OnDemandFeatureView],
feature: Feature,
) -> MetadataWorkUnit:
diff --git a/metadata-ingestion/src/datahub/ingestion/source/ldap.py b/metadata-ingestion/src/datahub/ingestion/source/ldap.py
index cf751de06d1e7e..5e2d31a8031683 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/ldap.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/ldap.py
@@ -205,9 +205,7 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]:
)
_rtype, rdata, _rmsgid, serverctrls = self.ldap_client.result3(msgid)
except ldap.LDAPError as e:
- self.report.report_failure(
- "ldap-control", "LDAP search failed: {}".format(e)
- )
+ self.report.report_failure("ldap-control", f"LDAP search failed: {e}")
break
for dn, attrs in rdata:
@@ -265,10 +263,7 @@ def handle_user(self, dn: str, attrs: Dict[str, Any]) -> Iterable[MetadataWorkUn
_m_dn, m_attrs = result[1][0]
manager_ldap = guess_person_ldap(m_attrs, self.config, self.report)
except ldap.LDAPError as e:
- self.report.report_warning(
- dn, "manager LDAP search failed: {}".format(e)
- )
-
+ self.report.report_warning(dn, f"manager LDAP search failed: {e}")
mce = self.build_corp_user_mce(dn, attrs, manager_ldap)
if mce:
wu = MetadataWorkUnit(dn, mce)
diff --git a/metadata-ingestion/src/datahub/ingestion/source/looker.py b/metadata-ingestion/src/datahub/ingestion/source/looker.py
index 252d9f553e0572..7c1368f814e4e0 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/looker.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/looker.py
@@ -216,11 +216,11 @@ def url(self, base_url: str) -> str:
# If the base_url contains a port number (like https://company.looker.com:19999) remove the port number
m = re.match("^(.*):([0-9]+)$", base_url)
if m is not None:
- base_url = m.group(1)
+ base_url = m[1]
if self.look_id is not None:
- return base_url + "/looks/" + self.look_id
+ return f"{base_url}/looks/{self.look_id}"
else:
- return base_url + "/x/" + self.query_slug
+ return f"{base_url}/x/{self.query_slug}"
def get_urn_element_id(self):
# A dashboard element can use a look or just a raw query against an explore
@@ -270,23 +270,22 @@ def __init__(self, client: Looker31SDK):
def get_by_id(
self, id: int, transport_options: Optional[TransportOptions]
) -> Optional[LookerUser]:
- logger.debug("Will get user {}".format(id))
+ logger.debug(f"Will get user {id}")
if id in self.user_map:
return self.user_map[id]
- else:
- try:
- raw_user: User = self.client.user(
- id,
- fields=self.fields,
- transport_options=transport_options,
- )
- looker_user = LookerUser._from_user(raw_user)
- self.user_map[id] = looker_user
- return looker_user
- except SDKError as e:
- logger.warn("Could not find user with id {}".format(id))
- logger.warn("Failure was {}".format(e))
- return None
+ try:
+ raw_user: User = self.client.user(
+ id,
+ fields=self.fields,
+ transport_options=transport_options,
+ )
+ looker_user = LookerUser._from_user(raw_user)
+ self.user_map[id] = looker_user
+ return looker_user
+ except SDKError as e:
+ logger.warning(f"Could not find user with id {id}")
+ logger.warning(f"Failure was {e}")
+ return None
@dataclass
@@ -306,8 +305,8 @@ def url(self, base_url):
# If the base_url contains a port number (like https://company.looker.com:19999) remove the port number
m = re.match("^(.*):([0-9]+)$", base_url)
if m is not None:
- base_url = m.group(1)
- return base_url + "/dashboards/" + self.id
+ base_url = m[1]
+ return f"{base_url}/dashboards/{self.id}"
def get_urn_dashboard_id(self):
return f"dashboards.{self.id}"
@@ -350,8 +349,7 @@ def _extract_view_from_field(field: str) -> str:
assert (
field.count(".") == 1
), f"Error: A field must be prefixed by a view name, field is: {field}"
- view_name = field.split(".")[0]
- return view_name
+ return field.split(".")[0]
def _get_views_from_fields(self, fields: List[str]) -> List[str]:
field_set = set(fields)
@@ -449,12 +447,9 @@ def _get_looker_dashboard_element( # noqa: C901
raise ValueError("Element ID can't be None")
if element.query is not None:
- explores = []
fields = self._get_fields_from_query(element.query)
- if element.query.view is not None:
- # Get the explore from the view directly
- explores = [element.query.view]
-
+ # Get the explore from the view directly
+ explores = [element.query.view] if element.query.view is not None else []
logger.debug(
"Element {}: Explores added: {}".format(element.title, explores)
)
@@ -957,7 +952,7 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]:
else False,
)
else:
- raise Exception("Unexpected type of event {}".format(event))
+ raise Exception(f"Unexpected type of event {event}")
self.reporter.report_workunit(workunit)
yield workunit
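The `url` helpers above strip an explicit port with `re.match("^(.*):([0-9]+)$", ...)` and now index the match as `m[1]`, which is equivalent to `m.group(1)`. A minimal standalone sketch of that behavior, using made-up URLs:

```python
import re

def strip_port(base_url: str) -> str:
    # Drop a trailing ":<digits>" if present; otherwise return the URL unchanged.
    m = re.match("^(.*):([0-9]+)$", base_url)
    return m[1] if m is not None else base_url

assert strip_port("https://company.looker.com:19999") == "https://company.looker.com"
assert strip_port("https://company.looker.com") == "https://company.looker.com"
```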
diff --git a/metadata-ingestion/src/datahub/ingestion/source/looker_common.py b/metadata-ingestion/src/datahub/ingestion/source/looker_common.py
index 0c8c496a9041c0..28f2b23c1a258a 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/looker_common.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/looker_common.py
@@ -110,12 +110,8 @@ class LookerExploreNamingConfig(ConfigModel):
def init_naming_pattern(cls, v):
if isinstance(v, NamingPattern):
return v
- else:
- assert isinstance(v, str), "pattern must be a string"
- naming_pattern = NamingPattern(
- allowed_vars=naming_pattern_variables, pattern=v
- )
- return naming_pattern
+ assert isinstance(v, str), "pattern must be a string"
+ return NamingPattern(allowed_vars=naming_pattern_variables, pattern=v)
@validator("explore_naming_pattern", "explore_browse_pattern", always=True)
def validate_naming_pattern(cls, v):
@@ -143,12 +139,8 @@ class LookerViewNamingConfig(ConfigModel):
def init_naming_pattern(cls, v):
if isinstance(v, NamingPattern):
return v
- else:
- assert isinstance(v, str), "pattern must be a string"
- naming_pattern = NamingPattern(
- allowed_vars=naming_pattern_variables, pattern=v
- )
- return naming_pattern
+ assert isinstance(v, str), "pattern must be a string"
+ return NamingPattern(allowed_vars=naming_pattern_variables, pattern=v)
@validator("view_naming_pattern", "view_browse_pattern", always=True)
def validate_naming_pattern(cls, v):
@@ -314,8 +306,7 @@ def _extract_view_from_field(field: str) -> str:
assert (
field.count(".") == 1
), f"Error: A field must be prefixed by a view name, field is: {field}"
- view_name = field.split(".")[0]
- return view_name
+ return field.split(".")[0]
@staticmethod
def _get_field_type(
@@ -336,8 +327,7 @@ def _get_field_type(
)
type_class = NullTypeClass
- data_type = SchemaFieldDataType(type=type_class())
- return data_type
+ return SchemaFieldDataType(type=type_class())
@staticmethod
def _get_schema(
@@ -346,7 +336,7 @@ def _get_schema(
view_fields: List[ViewField],
reporter: SourceReport,
) -> Optional[SchemaMetadataClass]:
- if view_fields == []:
+ if not view_fields:
return None
fields, primary_keys = LookerUtil._get_fields_and_primary_keys(
view_fields=view_fields, reporter=reporter
@@ -633,16 +623,13 @@ def from_api( # noqa: C901
source_file=explore.source_file,
)
except SDKError as e:
- logger.warn(
- "Failed to extract explore {} from model {}.".format(
- explore_name, model
- )
+ logger.warning(
+ f"Failed to extract explore {explore_name} from model {model}."
)
logger.debug(
- "Failed to extract explore {} from model {} with {}".format(
- explore_name, model, e
- )
+ f"Failed to extract explore {explore_name} from model {model} with {e}"
)
+
except AssertionError:
reporter.report_warning(
key="chart-",
@@ -693,7 +680,7 @@ def _get_url(self, base_url):
# If the base_url contains a port number (like https://company.looker.com:19999) remove the port number
m = re.match("^(.*):([0-9]+)$", base_url)
if m is not None:
- base_url = m.group(1)
+ base_url = m[1]
return f"{base_url}/explore/{self.model_name}/{self.name}"
def _to_metadata_events( # noqa: C901
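The two `init_naming_pattern` validators above now return the constructed pattern directly. A self-contained sketch of the same normalization, with `Pattern` as an illustrative stand-in for `NamingPattern`:

```python
from dataclasses import dataclass
from typing import List, Union

@dataclass
class Pattern:
    # Illustrative stand-in for the NamingPattern model used in looker_common.py.
    allowed_vars: List[str]
    pattern: str

def init_naming_pattern(v: Union[Pattern, str], allowed_vars: List[str]) -> Pattern:
    # Pass pattern objects through untouched, wrap plain strings, reject anything else.
    if isinstance(v, Pattern):
        return v
    assert isinstance(v, str), "pattern must be a string"
    return Pattern(allowed_vars=allowed_vars, pattern=v)

assert init_naming_pattern("{model}.{name}", ["model", "name"]).pattern == "{model}.{name}"
```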
diff --git a/metadata-ingestion/src/datahub/ingestion/source/lookml.py b/metadata-ingestion/src/datahub/ingestion/source/lookml.py
index a08b0bf30bb100..aee876da2f7bac 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/lookml.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/lookml.py
@@ -130,18 +130,17 @@ def from_looker_connection(
".*": _get_generic_definition,
}
- if looker_connection.dialect_name is not None:
- for extractor_pattern, extracting_function in extractors.items():
- if re.match(extractor_pattern, looker_connection.dialect_name):
- (platform, db, schema) = extracting_function(looker_connection)
- return cls(platform=platform, default_db=db, default_schema=schema)
- raise ConfigurationError(
- f"Could not find an appropriate platform for looker_connection: {looker_connection.name} with dialect: {looker_connection.dialect_name}"
- )
- else:
+ if looker_connection.dialect_name is None:
raise ConfigurationError(
f"Unable to fetch a fully filled out connection for {looker_connection.name}. Please check your API permissions."
)
+ for extractor_pattern, extracting_function in extractors.items():
+ if re.match(extractor_pattern, looker_connection.dialect_name):
+ (platform, db, schema) = extracting_function(looker_connection)
+ return cls(platform=platform, default_db=db, default_schema=schema)
+ raise ConfigurationError(
+ f"Could not find an appropriate platform for looker_connection: {looker_connection.name} with dialect: {looker_connection.dialect_name}"
+ )
class LookMLSourceConfig(LookerCommonConfig):
@@ -591,7 +590,7 @@ def from_looker_dict(
if sql_table_name is not None
else None
)
- derived_table = looker_view.get("derived_table", None)
+ derived_table = looker_view.get("derived_table")
dimensions = cls._get_fields(
looker_view.get("dimensions", []), ViewFieldType.DIMENSION
@@ -605,7 +604,7 @@ def from_looker_dict(
fields: List[ViewField] = dimensions + dimension_groups + measures
# also store the view logic and materialization
- view_logic = looker_viewfile.raw_file_content[0:max_file_snippet_length]
+ view_logic = looker_viewfile.raw_file_content[:max_file_snippet_length]
# Parse SQL from derived tables to extract dependencies
if derived_table is not None:
@@ -630,9 +629,7 @@ def from_looker_dict(
if k in ["datagroup_trigger", "sql_trigger_value", "persist_for"]:
materialized = True
if "materialized_view" in derived_table:
- materialized = (
- True if derived_table["materialized_view"] == "yes" else False
- )
+ materialized = derived_table["materialized_view"] == "yes"
view_details = ViewProperties(
materialized=materialized, viewLogic=view_logic, viewLanguage=view_lang
@@ -653,15 +650,11 @@ def from_looker_dict(
)
# If not a derived table, then this view essentially wraps an existing
- # object in the database.
- if sql_table_name is not None:
- # If sql_table_name is set, there is a single dependency in the view, on the sql_table_name.
- sql_table_names = [sql_table_name]
- else:
- # Otherwise, default to the view name as per the docs:
- # https://docs.looker.com/reference/view-params/sql_table_name-for-view
- sql_table_names = [view_name]
-
+ # object in the database. If sql_table_name is set, there is a single
+ # dependency in the view, on the sql_table_name.
+ # Otherwise, default to the view name as per the docs:
+ # https://docs.looker.com/reference/view-params/sql_table_name-for-view
+ sql_table_names = [view_name] if sql_table_name is None else [sql_table_name]
output_looker_view = LookerView(
id=LookerViewId(
project_name=project_name, model_name=model_name, view_name=view_name
@@ -705,7 +698,7 @@ def _extract_metadata_from_sql_query(
# Add those in if we detect that it is missing
if not re.search(r"SELECT\s", sql_query, flags=re.I):
# add a SELECT clause at the beginning
- sql_query = "SELECT " + sql_query
+ sql_query = f"SELECT {sql_query}"
if not re.search(r"FROM\s", sql_query, flags=re.I):
# add a FROM clause at the end
sql_query = f"{sql_query} FROM {sql_table_name if sql_table_name is not None else view_name}"
@@ -714,7 +707,7 @@ def _extract_metadata_from_sql_query(
sql_info = cls._get_sql_info(sql_query, sql_parser_path)
sql_table_names = sql_info.table_names
column_names = sql_info.column_names
- if fields == []:
+ if not fields:
# it seems like the view is defined purely as sql, let's try using the column names to populate the schema
fields = [
# set types to unknown for now as our sql parser doesn't give us column types yet
@@ -843,10 +836,7 @@ def _load_model(self, path: str) -> LookerModel:
return looker_model
def _platform_names_have_2_parts(self, platform: str) -> bool:
- if platform in ["hive", "mysql", "athena"]:
- return True
- else:
- return False
+ return platform in {"hive", "mysql", "athena"}
def _generate_fully_qualified_name(
self, sql_table_name: str, connection_def: LookerConnectionDefinition
@@ -999,7 +989,6 @@ def _get_custom_properties(self, looker_view: LookerView) -> DatasetPropertiesCl
def _build_dataset_mcps(
self, looker_view: LookerView
) -> List[MetadataChangeProposalWrapper]:
- events = []
subTypeEvent = MetadataChangeProposalWrapper(
entityType="dataset",
changeType=ChangeTypeClass.UPSERT,
@@ -1007,7 +996,7 @@ def _build_dataset_mcps(
aspectName="subTypes",
aspect=SubTypesClass(typeNames=["view"]),
)
- events.append(subTypeEvent)
+ events = [subTypeEvent]
if looker_view.view_details is not None:
viewEvent = MetadataChangeProposalWrapper(
entityType="dataset",
@@ -1048,9 +1037,7 @@ def _build_dataset_mce(self, looker_view: LookerView) -> MetadataChangeEvent:
dataset_snapshot.aspects.append(schema_metadata)
dataset_snapshot.aspects.append(self._get_custom_properties(looker_view))
- mce = MetadataChangeEvent(proposedSnapshot=dataset_snapshot)
-
- return mce
+ return MetadataChangeEvent(proposedSnapshot=dataset_snapshot)
def get_project_name(self, model_name: str) -> str:
if self.source_config.project_name is not None:
@@ -1092,7 +1079,7 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]: # noqa: C901
for file_path in model_files:
self.reporter.report_models_scanned()
- model_name = file_path.stem[0:-model_suffix_len]
+ model_name = file_path.stem[:-model_suffix_len]
if not self.source_config.model_pattern.allowed(model_name):
self.reporter.report_models_dropped(model_name)
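`from_looker_connection` is rewritten in guard-clause style: raise on the missing `dialect_name` first, then run the extractor loop at one indentation level. A minimal sketch of that shape with made-up extractor values:

```python
import re
from typing import Dict, Optional, Tuple

def resolve_platform(
    dialect_name: Optional[str], extractors: Dict[str, Tuple[str, str, str]]
) -> Tuple[str, str, str]:
    # Fail fast on the missing value, then keep the main loop unindented.
    if dialect_name is None:
        raise ValueError("connection has no dialect_name; check API permissions")
    for pattern, result in extractors.items():
        if re.match(pattern, dialect_name):
            return result
    raise ValueError(f"no platform found for dialect {dialect_name}")

assert resolve_platform("snowflake", {"^snowflake$": ("snowflake", "db", "schema")}) == (
    "snowflake",
    "db",
    "schema",
)
```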
diff --git a/metadata-ingestion/src/datahub/ingestion/source/metabase.py b/metadata-ingestion/src/datahub/ingestion/source/metabase.py
index 93308ff93b3226..98bcbcba591ebd 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/metabase.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/metabase.py
@@ -1,4 +1,4 @@
-from datetime import datetime
+from datetime import datetime, timezone
from functools import lru_cache
from typing import Dict, Iterable, Optional
@@ -199,7 +199,7 @@ def get_timestamp_millis_from_ts_string(ts_str: str) -> int:
try:
return int(dp.parse(ts_str).timestamp() * 1000)
except (dp.ParserError, OverflowError):
- return int(datetime.utcnow().timestamp() * 1000)
+ return int(datetime.now(timezone.utc).timestamp() * 1000)
def construct_dashboard_from_api_data(
self, dashboard_info: dict
@@ -449,7 +449,7 @@ def get_datasource_urn(self, card_details):
schema_name, table_name = self.get_source_table_from_id(source_table_id)
if table_name:
source_paths.add(
- f"{schema_name + '.' if schema_name else ''}{table_name}"
+ f"{f'{schema_name}.' if schema_name else ''}{table_name}"
)
else:
try:
@@ -478,7 +478,7 @@ def get_datasource_urn(self, card_details):
# Create dataset URNs
dataset_urn = []
- dbname = f"{database_name + '.' if database_name else ''}"
+ dbname = f"{f'{database_name}.' if database_name else ''}"
source_tables = list(map(lambda tbl: f"{dbname}{tbl}", source_paths))
dataset_urn = [
builder.make_dataset_urn_with_platform_instance(
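The timestamp fallback above now builds an aware UTC datetime instead of the naive `datetime.utcnow()`. The distinction matters because `.timestamp()` interprets a naive datetime as local time; a minimal sketch:

```python
from datetime import datetime, timezone

def now_millis() -> int:
    # Aware UTC datetime: .timestamp() is unambiguous on any machine.
    # With the naive datetime.utcnow(), .timestamp() would assume local time
    # and skew the result by the machine's UTC offset.
    return int(datetime.now(timezone.utc).timestamp() * 1000)
```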
diff --git a/metadata-ingestion/src/datahub/ingestion/source/mongodb.py b/metadata-ingestion/src/datahub/ingestion/source/mongodb.py
index 8d6201867dd8b3..9710fe023a2560 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/mongodb.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/mongodb.py
@@ -172,9 +172,9 @@ def construct_schema_pymongo(
maximum size of the document that will be considered for generating the schema.
"""
- doc_size_field = "temporary_doc_size_field"
aggregations: List[Dict] = []
if is_version_gte_4_4:
+ doc_size_field = "temporary_doc_size_field"
# create a temporary field to store the size of the document. filter on it and then remove it.
aggregations = [
{"$addFields": {doc_size_field: {"$bsonSize": "$$ROOT"}}},
diff --git a/metadata-ingestion/src/datahub/ingestion/source/nifi.py b/metadata-ingestion/src/datahub/ingestion/source/nifi.py
index bb8ac443555252..9677d5cbd3b5cb 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/nifi.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/nifi.py
@@ -338,8 +338,8 @@ def __init__(self, config: NifiSourceConfig, ctx: PipelineContext) -> None:
if self.config.site_url_to_site_name is None:
self.config.site_url_to_site_name = {}
if (
- not urljoin(self.config.site_url, "/nifi/")
- in self.config.site_url_to_site_name
+ urljoin(self.config.site_url, "/nifi/")
+ not in self.config.site_url_to_site_name
):
self.config.site_url_to_site_name[
urljoin(self.config.site_url, "/nifi/")
@@ -620,7 +620,7 @@ def create_nifi_flow(self):
if about_response.ok:
nifi_version = about_response.json().get("about", {}).get("version")
else:
- logger.warn("Failed to fetch version for nifi")
+ logger.warning("Failed to fetch version for nifi")
cluster_response = self.session.get(
url=urljoin(self.config.site_url, CLUSTER_ENDPOINT)
)
@@ -630,7 +630,7 @@ def create_nifi_flow(self):
cluster_response.json().get("clusterSummary", {}).get("clustered")
)
else:
- logger.warn("Failed to fetch cluster summary for flow")
+ logger.warning("Failed to fetch cluster summary for flow")
pg_response = self.session.get(
url=urljoin(self.config.site_url, PG_ENDPOINT) + "root"
)
@@ -715,7 +715,7 @@ def fetch_provenance_events(
attempts = 5 # wait for at most 5 attempts 5*1= 5 seconds
while (not provenance.get("finished", False)) and attempts > 0:
- logger.warn(
+ logger.warning(
f"Provenance query not completed, attempts left : {attempts}"
)
# wait until the uri returns percentcomplete 100
@@ -757,7 +757,7 @@ def fetch_provenance_events(
f"provenance events could not be fetched for processor \
{processor.id} of type {processor.name}",
)
- logger.warn(provenance_response.text)
+ logger.warning(provenance_response.text)
return
def report_warning(self, key: str, reason: str) -> None:
@@ -774,7 +774,7 @@ def construct_workunits(self) -> Iterable[MetadataWorkUnit]: # noqa: C901
rootpg = self.nifi_flow.root_process_group
flow_name = rootpg.name # self.config.site_name
flow_urn = builder.make_data_flow_urn(NIFI, rootpg.id, self.config.env)
- flow_properties = dict()
+ flow_properties = {}
if self.nifi_flow.clustered is not None:
flow_properties["clustered"] = str(self.nifi_flow.clustered)
if self.nifi_flow.version is not None:
diff --git a/metadata-ingestion/src/datahub/ingestion/source/openapi.py b/metadata-ingestion/src/datahub/ingestion/source/openapi.py
index b71cb363b96e46..9548677e1cdc11 100755
--- a/metadata-ingestion/src/datahub/ingestion/source/openapi.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/openapi.py
@@ -118,7 +118,7 @@ class ApiWorkUnit(MetadataWorkUnit):
class APISource(Source, ABC):
"""
- This plugin is meant to gather dataset-like informations about OpenApi Endpoints.
+ This plugin is meant to gather dataset-like information about OpenApi Endpoints.
As example, if by calling GET at the endpoint at `https://test_endpoint.com/api/users/` you obtain as result:
```JSON
diff --git a/metadata-ingestion/src/datahub/ingestion/source/openapi_parser.py b/metadata-ingestion/src/datahub/ingestion/source/openapi_parser.py
index 830b6562755eb7..f33654daa15595 100755
--- a/metadata-ingestion/src/datahub/ingestion/source/openapi_parser.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/openapi_parser.py
@@ -20,9 +20,9 @@
def flatten(d: dict, prefix: str = "") -> Generator:
for k, v in d.items():
if isinstance(v, dict):
- yield from flatten(v, prefix + "." + k)
+ yield from flatten(v, f"{prefix}.{k}")
else:
- yield (prefix + "-" + k).strip(".")
+ yield f"{prefix}-{k}".strip(".")
def flatten2list(d: dict) -> list:
@@ -53,15 +53,15 @@ def request_call(
headers = {"accept": "application/json"}
if username is not None and password is not None:
- response = requests.get(
+ return requests.get(
url, headers=headers, auth=HTTPBasicAuth(username, password)
)
+
elif token is not None:
- headers["Authorization"] = "Bearer " + token
- response = requests.get(url, headers=headers)
+ headers["Authorization"] = f"Bearer {token}"
+ return requests.get(url, headers=headers)
else:
- response = requests.get(url, headers=headers)
- return response
+ return requests.get(url, headers=headers)
def get_swag_json(
@@ -77,14 +77,13 @@ def get_swag_json(
else:
response = request_call(url=tot_url, username=username, password=password)
- if response.status_code == 200:
- try:
- dict_data = json.loads(response.content)
- except json.JSONDecodeError: # it's not a JSON!
- dict_data = yaml.safe_load(response.content)
- return dict_data
- else:
+ if response.status_code != 200:
raise Exception(f"Unable to retrieve {tot_url}, error {response.status_code}")
+ try:
+ dict_data = json.loads(response.content)
+ except json.JSONDecodeError: # it's not a JSON!
+ dict_data = yaml.safe_load(response.content)
+ return dict_data
def get_url_basepath(sw_dict: dict) -> str:
@@ -95,7 +94,7 @@ def get_url_basepath(sw_dict: dict) -> str:
def check_sw_version(sw_dict: dict) -> None:
- if "swagger" in sw_dict.keys():
+ if "swagger" in sw_dict:
v_split = sw_dict["swagger"].split(".")
else:
v_split = sw_dict["openapi"].split(".")
@@ -176,8 +175,7 @@ def get_endpoints(sw_dict: dict) -> dict: # noqa: C901
if "parameters" in p_o["get"].keys():
url_details[p_k]["parameters"] = p_o["get"]["parameters"]
- ord_d = dict(sorted(url_details.items())) # sorting for convenience
- return ord_d
+ return dict(sorted(url_details.items()))
def guessing_url_name(url: str, examples: dict) -> str:
@@ -187,10 +185,7 @@ def guessing_url_name(url: str, examples: dict) -> str:
extr_data = {"advancedcomputersearches": {'id': 202, 'name': '_unmanaged'}}
-->> guessed_url = /advancedcomputersearches/name/_unmanaged/id/202'
"""
- if url[0] == "/":
- url2op = url[1:] # operational url does not need the very first /
- else:
- url2op = url
+ url2op = url[1:] if url[0] == "/" else url
divisions = url2op.split("/")
# the very first part of the url should stay the same.
@@ -211,14 +206,14 @@ def guessing_url_name(url: str, examples: dict) -> str:
if div_pos > 0:
root = root[: div_pos - 1] # like "base/field" should become "base"
- if root in examples.keys():
+ if root in examples:
# if our root is contained in our samples examples...
ex2use = root
- elif root[:-1] in examples.keys():
+ elif root[:-1] in examples:
ex2use = root[:-1]
- elif root.replace("/", ".") in examples.keys():
+ elif root.replace("/", ".") in examples:
ex2use = root.replace("/", ".")
- elif root[:-1].replace("/", ".") in examples.keys():
+ elif root[:-1].replace("/", ".") in examples:
ex2use = root[:-1].replace("/", ".")
else:
return url
@@ -277,8 +272,7 @@ def try_guessing(url: str, examples: dict) -> str:
Any non-guessed name will stay as it was (with parenthesis{})
"""
url_guess = guessing_url_name(url, examples) # try to fill with known informations
- url_guess_id = maybe_theres_simple_id(url_guess) # try to fill IDs with "1"s...
- return url_guess_id
+ return maybe_theres_simple_id(url_guess)
def clean_url(url: str) -> str:
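`get_swag_json` now checks the status code up front and keeps the JSON-then-YAML fallback unindented. The fallback works because JSON is, for practical purposes, a subset of YAML; a self-contained sketch (spec bytes are made up):

```python
import json

import yaml  # PyYAML, already used by this OpenAPI parser module

def parse_spec(raw: bytes) -> dict:
    # Try JSON first; anything that fails to decode is handed to the YAML parser.
    try:
        return json.loads(raw)
    except json.JSONDecodeError:
        return yaml.safe_load(raw)

assert parse_spec(b'{"openapi": "3.0.0"}') == {"openapi": "3.0.0"}
assert parse_spec(b"openapi: 3.0.0") == {"openapi": "3.0.0"}
```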
diff --git a/metadata-ingestion/src/datahub/ingestion/source/powerbi.py b/metadata-ingestion/src/datahub/ingestion/source/powerbi.py
index 5cfba5fa2ec14b..ff0924349c0e91 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/powerbi.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/powerbi.py
@@ -131,7 +131,7 @@ class PowerBiAPIConfig(EnvBasedSourceConfigBase):
authority = "https://login.microsoftonline.com/"
def get_authority_url(self):
- return "{}{}".format(self.authority, self.tenant_id)
+ return f"{self.authority}{self.tenant_id}"
class PowerBiDashboardSourceConfig(PowerBiAPIConfig):
@@ -216,7 +216,7 @@ class Table:
tables: List[Any]
def get_urn_part(self):
- return "datasets.{}".format(self.id)
+ return f"datasets.{self.id}"
def __members(self):
return (self.id,)
@@ -239,7 +239,7 @@ class Report:
dataset: Any
def get_urn_part(self):
- return "reports.{}".format(self.id)
+ return f"reports.{self.id}"
@dataclass
class Tile:
@@ -257,7 +257,7 @@ class CreatedFrom(Enum):
createdFrom: CreatedFrom
def get_urn_part(self):
- return "charts.{}".format(self.id)
+ return f"charts.{self.id}"
@dataclass
class User:
@@ -269,7 +269,7 @@ class User:
principalType: str
def get_urn_part(self):
- return "users.{}".format(self.id)
+ return f"users.{self.id}"
def __members(self):
return (self.id,)
@@ -296,7 +296,7 @@ class Dashboard:
users: List[Any]
def get_urn_part(self):
- return "dashboards.{}".format(self.id)
+ return f"dashboards.{self.id}"
def __members(self):
return (self.id,)
@@ -322,9 +322,9 @@ def __init__(self, config: PowerBiAPIConfig) -> None:
)
# Test connection by generating a access token
- LOGGER.info("Trying to connect to {}".format(self.__config.get_authority_url()))
+ LOGGER.info(f"Trying to connect to {self.__config.get_authority_url()}")
self.get_access_token()
- LOGGER.info("Able to connect to {}".format(self.__config.get_authority_url()))
+ LOGGER.info(f"Able to connect to {self.__config.get_authority_url()}")
def __get_users(self, workspace_id: str, entity: str, id: str) -> List[User]:
"""
@@ -338,7 +338,7 @@ def __get_users(self, workspace_id: str, entity: str, id: str) -> List[User]:
ENTITY_ID=id,
)
# Hit PowerBi
- LOGGER.info("Request to URL={}".format(user_list_endpoint))
+ LOGGER.info(f"Request to URL={user_list_endpoint}")
response = requests.get(
url=user_list_endpoint,
headers={Constant.Authorization: self.get_access_token()},
@@ -347,13 +347,12 @@ def __get_users(self, workspace_id: str, entity: str, id: str) -> List[User]:
# Check if we got response from PowerBi
if response.status_code != 200:
LOGGER.warning(
- "Failed to fetch user list from power-bi for, http_status={}, message={}".format(
- response.status_code, response.text
- )
+ f"Failed to fetch user list from power-bi for, http_status={response.status_code}, message={response.text}"
)
- LOGGER.info("{}={}".format(Constant.WorkspaceId, workspace_id))
- LOGGER.info("{}={}".format(Constant.ENTITY, entity))
- LOGGER.info("{}={}".format(Constant.ID, id))
+
+ LOGGER.info(f"{Constant.WorkspaceId}={workspace_id}")
+ LOGGER.info(f"{Constant.ENTITY}={entity}")
+ LOGGER.info(f"{Constant.ID}={id}")
raise ConnectionError("Failed to fetch the user list from the power-bi")
users_dict: List[Any] = response.json()[Constant.VALUE]
@@ -379,8 +378,8 @@ def __get_report(self, workspace_id: str, report_id: str) -> Any:
"""
if workspace_id is None or report_id is None:
LOGGER.info("Input values are None")
- LOGGER.info("{}={}".format(Constant.WorkspaceId, workspace_id))
- LOGGER.info("{}={}".format(Constant.ReportId, report_id))
+ LOGGER.info(f"{Constant.WorkspaceId}={workspace_id}")
+ LOGGER.info(f"{Constant.ReportId}={report_id}")
return None
report_get_endpoint: str = PowerBiAPI.API_ENDPOINTS[Constant.REPORT_GET]
@@ -391,7 +390,7 @@ def __get_report(self, workspace_id: str, report_id: str) -> Any:
REPORT_ID=report_id,
)
# Hit PowerBi
- LOGGER.info("Request to report URL={}".format(report_get_endpoint))
+ LOGGER.info(f"Request to report URL={report_get_endpoint}")
response = requests.get(
url=report_get_endpoint,
headers={Constant.Authorization: self.get_access_token()},
@@ -401,8 +400,8 @@ def __get_report(self, workspace_id: str, report_id: str) -> Any:
if response.status_code != 200:
message: str = "Failed to fetch report from power-bi for"
LOGGER.warning(message)
- LOGGER.warning("{}={}".format(Constant.WorkspaceId, workspace_id))
- LOGGER.warning("{}={}".format(Constant.ReportId, report_id))
+ LOGGER.warning(f"{Constant.WorkspaceId}={workspace_id}")
+ LOGGER.warning(f"{Constant.ReportId}={report_id}")
raise ConnectionError(message)
response_dict = response.json()
@@ -440,7 +439,7 @@ def get_access_token(self):
self.__access_token = "Bearer {}".format(auth_response.get("access_token"))
- LOGGER.debug("{}={}".format(Constant.PBIAccessToken, self.__access_token))
+ LOGGER.debug(f"{Constant.PBIAccessToken}={self.__access_token}")
return self.__access_token
@@ -464,7 +463,7 @@ def get_dashboards(self, workspace: Workspace) -> List[Dashboard]:
POWERBI_BASE_URL=self.__config.base_url, WORKSPACE_ID=workspace.id
)
# Hit PowerBi
- LOGGER.info("Request to URL={}".format(dashboard_list_endpoint))
+ LOGGER.info(f"Request to URL={dashboard_list_endpoint}")
response = requests.get(
url=dashboard_list_endpoint,
headers={Constant.Authorization: self.get_access_token()},
@@ -473,7 +472,7 @@ def get_dashboards(self, workspace: Workspace) -> List[Dashboard]:
# Check if we got response from PowerBi
if response.status_code != 200:
LOGGER.warning("Failed to fetch dashboard list from power-bi for")
- LOGGER.warning("{}={}".format(Constant.WorkspaceId, workspace.id))
+ LOGGER.warning(f"{Constant.WorkspaceId}={workspace.id}")
raise ConnectionError(
"Failed to fetch the dashboard list from the power-bi"
)
@@ -505,8 +504,8 @@ def get_dataset(self, workspace_id: str, dataset_id: str) -> Any:
"""
if workspace_id is None or dataset_id is None:
LOGGER.info("Input values are None")
- LOGGER.info("{}={}".format(Constant.WorkspaceId, workspace_id))
- LOGGER.info("{}={}".format(Constant.DatasetId, dataset_id))
+ LOGGER.info(f"{Constant.WorkspaceId}={workspace_id}")
+ LOGGER.info(f"{Constant.DatasetId}={dataset_id}")
return None
dataset_get_endpoint: str = PowerBiAPI.API_ENDPOINTS[Constant.DATASET_GET]
@@ -517,7 +516,7 @@ def get_dataset(self, workspace_id: str, dataset_id: str) -> Any:
DATASET_ID=dataset_id,
)
# Hit PowerBi
- LOGGER.info("Request to dataset URL={}".format(dataset_get_endpoint))
+ LOGGER.info(f"Request to dataset URL={dataset_get_endpoint}")
response = requests.get(
url=dataset_get_endpoint,
headers={Constant.Authorization: self.get_access_token()},
@@ -527,8 +526,8 @@ def get_dataset(self, workspace_id: str, dataset_id: str) -> Any:
if response.status_code != 200:
message: str = "Failed to fetch dataset from power-bi for"
LOGGER.warning(message)
- LOGGER.warning("{}={}".format(Constant.WorkspaceId, workspace_id))
- LOGGER.warning("{}={}".format(Constant.DatasetId, dataset_id))
+ LOGGER.warning(f"{Constant.WorkspaceId}={workspace_id}")
+ LOGGER.warning(f"{Constant.DatasetId}={dataset_id}")
raise ConnectionError(message)
response_dict = response.json()
@@ -558,7 +557,7 @@ def get_data_source(self, dataset: Dataset) -> Any:
DATASET_ID=dataset.id,
)
# Hit PowerBi
- LOGGER.info("Request to datasource URL={}".format(datasource_get_endpoint))
+ LOGGER.info(f"Request to datasource URL={datasource_get_endpoint}")
response = requests.get(
url=datasource_get_endpoint,
headers={Constant.Authorization: self.get_access_token()},
@@ -568,18 +567,17 @@ def get_data_source(self, dataset: Dataset) -> Any:
if response.status_code != 200:
message: str = "Failed to fetch datasource from power-bi for"
LOGGER.warning(message)
- LOGGER.warning("{}={}".format(Constant.WorkspaceId, dataset.workspace_id))
- LOGGER.warning("{}={}".format(Constant.DatasetId, dataset.id))
+ LOGGER.warning(f"{Constant.WorkspaceId}={dataset.workspace_id}")
+ LOGGER.warning(f"{Constant.DatasetId}={dataset.id}")
raise ConnectionError(message)
res = response.json()
value = res["value"]
if len(value) == 0:
LOGGER.info(
- "datasource is not found for dataset {}({})".format(
- dataset.name, dataset.id
- )
+ f"datasource is not found for dataset {dataset.name}({dataset.id})"
)
+
return None
# Consider only zero index datasource
datasource_dict = value[0]
@@ -646,11 +644,7 @@ def new_dataset_or_report(tile_instance: Any) -> dict:
report_fields["createdFrom"] = PowerBiAPI.Tile.CreatedFrom.VISUALIZATION
LOGGER.info(
- "Tile {}({}) is created from {}".format(
- tile_instance.get("title"),
- tile_instance.get("id"),
- report_fields["createdFrom"],
- )
+ f'Tile {tile_instance.get("title")}({tile_instance.get("id")}) is created from {report_fields["createdFrom"]}'
)
return report_fields
@@ -721,9 +715,7 @@ def create_scan_job():
)
if res.status_code not in (200, 202):
- message = "API({}) return error code {} for workpace id({})".format(
- scan_create_endpoint, res.status_code, workspace_id
- )
+ message = f"API({scan_create_endpoint}) return error code {res.status_code} for workspace id({workspace_id})"
LOGGER.warning(message)
@@ -740,46 +732,40 @@ def wait_for_scan_to_complete(scan_id: str, timeout: int) -> Boolean:
minimum_sleep = 3
if timeout < minimum_sleep:
LOGGER.info(
- "Setting timeout to minimum_sleep time {} seconds".format(
- minimum_sleep
- )
+ f"Setting timeout to minimum_sleep time {minimum_sleep} seconds"
)
timeout = minimum_sleep
- max_trial = int(timeout / minimum_sleep)
- LOGGER.info("Max trial {}".format(max_trial))
+ max_trial = timeout // minimum_sleep
+ LOGGER.info(f"Max trial {max_trial}")
scan_get_endpoint = PowerBiAPI.API_ENDPOINTS[Constant.SCAN_GET]
scan_get_endpoint = scan_get_endpoint.format(
POWERBI_ADMIN_BASE_URL=self.__config.admin_base_url, SCAN_ID=scan_id
)
- LOGGER.info("Hitting URL={}".format(scan_get_endpoint))
+ LOGGER.info(f"Hitting URL={scan_get_endpoint}")
trail = 1
while True:
- LOGGER.info("Trial = {}".format(trail))
+ LOGGER.info(f"Trial = {trail}")
res = requests.get(
scan_get_endpoint,
headers={Constant.Authorization: self.get_access_token()},
)
if res.status_code != 200:
- message = "API({}) return error code {} for scan id({})".format(
- scan_get_endpoint, res.status_code, scan_id
- )
+ message = f"API({scan_get_endpoint}) return error code {res.status_code} for scan id({scan_id})"
LOGGER.warning(message)
raise ConnectionError(message)
if res.json()["status"].upper() == "Succeeded".upper():
- LOGGER.info(
- "Scan result is available for scan id({})".format(scan_id)
- )
+ LOGGER.info(f"Scan result is available for scan id({scan_id})")
return True
if trail == max_trial:
break
- LOGGER.info("Sleeping for {} seconds".format(minimum_sleep))
+ LOGGER.info(f"Sleeping for {minimum_sleep} seconds")
sleep(minimum_sleep)
trail += 1
@@ -788,7 +774,7 @@ def wait_for_scan_to_complete(scan_id: str, timeout: int) -> Boolean:
def get_scan_result(scan_id: str) -> dict:
LOGGER.info("Fetching scan result")
- LOGGER.info("{}={}".format(Constant.SCAN_ID, scan_id))
+ LOGGER.info(f"{Constant.SCAN_ID}={scan_id}")
scan_result_get_endpoint = PowerBiAPI.API_ENDPOINTS[
Constant.SCAN_RESULT_GET
]
@@ -796,15 +782,13 @@ def get_scan_result(scan_id: str) -> dict:
POWERBI_ADMIN_BASE_URL=self.__config.admin_base_url, SCAN_ID=scan_id
)
- LOGGER.info("Hittin URL={}".format(scan_result_get_endpoint))
+ LOGGER.info(f"Hitting URL={scan_result_get_endpoint}")
res = requests.get(
scan_result_get_endpoint,
headers={Constant.Authorization: self.get_access_token()},
)
if res.status_code != 200:
- message = "API({}) return error code {} for scan id({})".format(
- scan_result_get_endpoint, res.status_code, scan_id
- )
+ message = f"API({scan_result_get_endpoint}) return error code {res.status_code} for scan id({scan_id})"
LOGGER.warning(message)
@@ -821,10 +805,9 @@ def json_to_dataset_map(scan_result: dict) -> dict:
if datasets is None or len(datasets) == 0:
LOGGER.warning(
- "Workspace {}({}) does not have datasets".format(
- scan_result["name"], scan_result["id"]
- )
+ f'Workspace {scan_result["name"]}({scan_result["id"]}) does not have datasets'
)
+
LOGGER.info("Returning empty datasets")
return dataset_map
@@ -844,18 +827,15 @@ def json_to_dataset_map(scan_result: dict) -> dict:
and dataset_instance.datasource.metadata.is_relational is True
):
LOGGER.info(
- "Processing tables attribute for dataset {}({})".format(
- dataset_instance.name, dataset_instance.id
- )
+ f"Processing tables attribute for dataset {dataset_instance.name}({dataset_instance.id})"
)
for table in dataset_dict["tables"]:
if "Value.NativeQuery(" in table["source"][0]["expression"]:
LOGGER.warning(
- "Table {} is created from Custom SQL. Ignoring in processing".format(
- table["name"]
- )
+ f'Table {table["name"]} is created from Custom SQL. Ignoring in processing'
)
+
continue
# PowerBi table name contains schema name and table name. Format is
@@ -976,28 +956,24 @@ def __to_datahub_dataset(
or dataset.datasource.metadata.is_relational is False
):
LOGGER.warning(
- "Dataset {}({}) is not created from relational datasource".format(
- dataset.name, dataset.id
- )
+ f"Dataset {dataset.name}({dataset.id}) is not created from relational datasource"
)
+
return dataset_mcps
LOGGER.info(
- "Converting dataset={}(id={}) to datahub dataset".format(
- dataset.name, dataset.id
- )
+ f"Converting dataset={dataset.name}(id={dataset.id}) to datahub dataset"
)
for table in dataset.tables:
# Create an URN for dataset
ds_urn = builder.make_dataset_urn(
platform=self.__config.dataset_type_mapping[dataset.datasource.type],
- name="{}.{}.{}".format(
- dataset.datasource.database, table.schema_name, table.name
- ),
+ name=f"{dataset.datasource.database}.{table.schema_name}.{table.name}",
env=self.__config.env,
)
- LOGGER.info("{}={}".format(Constant.Dataset_URN, ds_urn))
+
+ LOGGER.info(f"{Constant.Dataset_URN}={ds_urn}")
# Create datasetProperties mcp
ds_properties = DatasetPropertiesClass(description=table.name)
@@ -1206,9 +1182,7 @@ def to_datahub_user(
"""
LOGGER.info(
- "Converting user {}(id={}) to datahub's user".format(
- user.displayName, user.id
- )
+ f"Converting user {user.displayName}(id={user.id}) to datahub's user"
)
# Create an URN for user
@@ -1266,10 +1240,10 @@ def to_datahub_chart(
chart_mcps = []
# Return empty list if input list is empty
- if len(tiles) == 0:
+ if not tiles:
return [], []
- LOGGER.info("Converting tiles(count={}) to charts".format(len(tiles)))
+ LOGGER.info(f"Converting tiles(count={len(tiles)}) to charts")
for tile in tiles:
if tile is None:
@@ -1292,7 +1266,7 @@ def to_datahub_work_units(
mcps = []
LOGGER.info(
- "Converting dashboard={} to datahub dashboard".format(dashboard.displayName)
+ f"Converting dashboard={dashboard.displayName} to datahub dashboard"
)
# Convert user to CorpUser
@@ -1391,12 +1365,10 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]:
self.reporter.report_dashboards_scanned()
self.reporter.report_charts_scanned(count=len(dashboard.tiles))
except Exception as e:
- message = "Error ({}) occurred while loading dashboard {}(id={}) tiles.".format(
- e, dashboard.displayName, dashboard.id
- )
+ message = f"Error ({e}) occurred while loading dashboard {dashboard.displayName}(id={dashboard.id}) tiles."
+
LOGGER.exception(message, e)
self.reporter.report_warning(dashboard.id, message)
-
# Convert PowerBi Dashboard and child entities to Datahub work unit to ingest into Datahub
workunits = self.mapper.to_datahub_work_units(dashboard)
for workunit in workunits:
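`wait_for_scan_to_complete` now derives the attempt budget with integer division (`timeout // minimum_sleep`). A minimal sketch of that polling shape, with `check` as an illustrative callable rather than the real scan-status request:

```python
import time
from typing import Callable

def wait_until(check: Callable[[], bool], timeout: int, minimum_sleep: int = 3) -> bool:
    # Clamp the timeout to at least one sleep interval, derive the number of
    # attempts by integer division, then poll until success or exhaustion.
    timeout = max(timeout, minimum_sleep)
    max_trial = timeout // minimum_sleep
    for trial in range(1, max_trial + 1):
        if check():
            return True
        if trial < max_trial:
            time.sleep(minimum_sleep)
    return False

assert wait_until(lambda: True, timeout=10)
```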
diff --git a/metadata-ingestion/src/datahub/ingestion/source/pulsar.py b/metadata-ingestion/src/datahub/ingestion/source/pulsar.py
index e4d9a505ea7210..ffc0253a070b2d 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/pulsar.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/pulsar.py
@@ -98,7 +98,7 @@ def __init__(self, config: PulsarSourceConfig, ctx: PipelineContext):
self.platform: str = "pulsar"
self.config: PulsarSourceConfig = config
self.report: PulsarSourceReport = PulsarSourceReport()
- self.base_url: str = self.config.web_service_url + "/admin/v2"
+ self.base_url: str = f"{self.config.web_service_url}/admin/v2"
self.tenants: List[str] = config.tenants
if (
@@ -120,7 +120,7 @@ def __init__(self, config: PulsarSourceConfig, ctx: PipelineContext):
if self._is_oauth_authentication_configured():
# Get OpenId configuration from issuer, e.g. token_endpoint
oid_config_url = (
- "%s/.well-known/openid-configuration" % self.config.issuer_url
+ f"{self.config.issuer_url}/.well-known/openid-configuration"
)
oid_config_response = requests.get(
oid_config_url, verify=False, allow_redirects=False
@@ -130,8 +130,7 @@ def __init__(self, config: PulsarSourceConfig, ctx: PipelineContext):
self.config.oid_config.update(oid_config_response.json())
else:
logger.error(
- "Unexpected response while getting discovery document using %s : %s"
- % (oid_config_url, oid_config_response)
+ f"Unexpected response while getting discovery document using {oid_config_url} : {oid_config_response}"
)
if "token_endpoint" not in self.config.oid_config:
@@ -325,15 +324,14 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]:
# Report the Pulsar broker version we are communicating with
self.report.report_pulsar_version(
self.session.get(
- "%s/brokers/version" % self.base_url,
- timeout=self.config.timeout,
+ f"{self.base_url}/brokers/version", timeout=self.config.timeout
).text
)
# If no tenants are provided, request all tenants from cluster using /admin/v2/tenants endpoint.
# Requesting cluster tenant information requires superuser privileges
if not self.tenants:
- self.tenants = self._get_pulsar_metadata(self.base_url + "/tenants") or []
+ self.tenants = self._get_pulsar_metadata(f"{self.base_url}/tenants") or []
# Initialize counters
self.report.tenants_scanned = 0
@@ -346,9 +344,10 @@ def get_workunits(self) -> Iterable[MetadataWorkUnit]:
# Get namespaces belonging to a tenant, /admin/v2/%s/namespaces
# A tenant admin role has sufficient privileges to perform this action
namespaces = (
- self._get_pulsar_metadata(self.base_url + "/namespaces/%s" % tenant)
+ self._get_pulsar_metadata(f"{self.base_url}/namespaces/{tenant}")
or []
)
+
for namespace in namespaces:
self.report.namespaces_scanned += 1
if self.config.namespace_patterns.allowed(namespace):
@@ -406,14 +405,10 @@ def _add_topic_to_checkpoint(self, topic: str) -> None:
)
def _is_token_authentication_configured(self) -> bool:
- if self.config.token is not None:
- return True
- return False
+ return self.config.token is not None
def _is_oauth_authentication_configured(self) -> bool:
- if self.config.issuer_url is not None:
- return True
- return False
+ return self.config.issuer_url is not None
def _get_schema_and_fields(
self, pulsar_topic: PulsarTopic, is_key_schema: bool
@@ -421,10 +416,9 @@ def _get_schema_and_fields(
pulsar_schema: Optional[PulsarSchema] = None
- schema_url = self.base_url + "/schemas/%s/%s/%s/schema" % (
- pulsar_topic.tenant,
- pulsar_topic.namespace,
- pulsar_topic.topic,
+ schema_url = (
+ self.base_url
+ + f"/schemas/{pulsar_topic.tenant}/{pulsar_topic.namespace}/{pulsar_topic.topic}/schema"
)
schema_payload = self._get_pulsar_metadata(schema_url)
@@ -449,7 +443,7 @@ def _get_schema_fields(
) -> List[SchemaField]:
# Parse the schema and convert it to SchemaFields.
fields: List[SchemaField] = []
- if schema.schema_type == "AVRO" or schema.schema_type == "JSON":
+ if schema.schema_type in ["AVRO", "JSON"]:
# Extract fields from schema and get the FQN for the schema
fields = schema_util.avro_schema_to_mce_fields(
schema.schema_str, is_key_schema=is_key_schema
@@ -465,6 +459,7 @@ def _get_schema_metadata(
self, pulsar_topic: PulsarTopic, platform_urn: str
) -> Tuple[Optional[PulsarSchema], Optional[SchemaMetadata]]:
+ # FIXME: Type annotations are not working for this function.
schema, fields = self._get_schema_and_fields(
pulsar_topic=pulsar_topic, is_key_schema=False
) # type: Tuple[Optional[PulsarSchema], List[SchemaField]]
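`_is_token_authentication_configured` and `_is_oauth_authentication_configured` now return the comparison itself, since `is not None` already yields a bool. A tiny sketch with an illustrative config object:

```python
from typing import Optional

class AuthConfig:
    # Illustrative stand-in for the Pulsar source config.
    def __init__(self, token: Optional[str] = None, issuer_url: Optional[str] = None):
        self.token = token
        self.issuer_url = issuer_url

def is_token_auth(config: AuthConfig) -> bool:
    # The comparison is already a bool, so no if/return True/return False ladder is needed.
    return config.token is not None

assert is_token_auth(AuthConfig(token="abc"))
assert not is_token_auth(AuthConfig())
```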
diff --git a/metadata-ingestion/src/datahub/ingestion/source/redash.py b/metadata-ingestion/src/datahub/ingestion/source/redash.py
index 2abd61849ac260..aa1d093e02e85f 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/redash.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/redash.py
@@ -203,29 +203,33 @@ def get_full_qualified_name(self, database_name: str, table_name: str) -> str:
def get_full_qualified_name(platform: str, database_name: str, table_name: str) -> str:
- if platform == "postgres":
- full_qualified_name = PostgresQualifiedNameParser().get_full_qualified_name(
+ if platform == "athena":
+ return AthenaQualifiedNameParser().get_full_qualified_name(
database_name, table_name
)
- elif platform == "mysql":
- full_qualified_name = MysqlQualifiedNameParser().get_full_qualified_name(
+
+ elif platform == "bigquery":
+ return BigqueryQualifiedNameParser().get_full_qualified_name(
database_name, table_name
)
+
elif platform == "mssql":
- full_qualified_name = MssqlQualifiedNameParser().get_full_qualified_name(
+ return MssqlQualifiedNameParser().get_full_qualified_name(
database_name, table_name
)
- elif platform == "athena":
- full_qualified_name = AthenaQualifiedNameParser().get_full_qualified_name(
+
+ elif platform == "mysql":
+ return MysqlQualifiedNameParser().get_full_qualified_name(
database_name, table_name
)
- elif platform == "bigquery":
- full_qualified_name = BigqueryQualifiedNameParser().get_full_qualified_name(
+
+ elif platform == "postgres":
+ return PostgresQualifiedNameParser().get_full_qualified_name(
database_name, table_name
)
+
else:
- full_qualified_name = f"{database_name}.{table_name}"
- return full_qualified_name
+ return f"{database_name}.{table_name}"
class RedashConfig(ConfigModel):
@@ -405,8 +409,7 @@ def _get_platform_based_on_datasource(self, data_source: Dict) -> str:
map = REDASH_DATA_SOURCE_TO_DATAHUB_MAP.get(
data_source_type, {"platform": DEFAULT_DATA_SOURCE_PLATFORM}
)
- platform = map.get("platform", DEFAULT_DATA_SOURCE_PLATFORM)
- return platform
+ return map.get("platform", DEFAULT_DATA_SOURCE_PLATFORM)
return DEFAULT_DATA_SOURCE_PLATFORM
def _get_database_name_based_on_datasource(
@@ -597,7 +600,7 @@ def _process_dashboard_response(
# Tested the same with a Redash instance
dashboard_id = dashboard_response["id"]
dashboard_data = self.client._get(
- "api/dashboards/{}".format(dashboard_id)
+ f"api/dashboards/{dashboard_id}"
).json()
except Exception:
# This does not work in our testing but keeping for now because
@@ -686,9 +689,7 @@ def _get_chart_snapshot(self, query_data: Dict, viz_data: Dict) -> ChartSnapshot
chart_type = self._get_chart_type_from_viz_data(viz_data)
query_id = query_data.get("id")
chart_url = f"{self.config.connect_uri}/queries/{query_id}#{viz_id}"
- description = (
- viz_data.get("description", "") if viz_data.get("description", "") else ""
- )
+ description = viz_data.get("description", "") or ""
data_source_id = query_data.get("data_source_id")
data_source = self._get_chart_data_source(data_source_id)
data_source_type = data_source.get("type")
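`get_full_qualified_name` now returns from each branch directly. An equivalent way to keep that dispatch flat, should it grow further, is a platform-to-callable mapping; the sketch below is a hypothetical alternative with placeholder formatters, not the module's real parser classes:

```python
from typing import Callable, Dict

# Placeholder formatters standing in for the *QualifiedNameParser classes;
# the real ones apply platform-specific splitting rules.
FORMATTERS: Dict[str, Callable[[str, str], str]] = {
    "athena": lambda db, tbl: f"{db}.{tbl}",
    "bigquery": lambda db, tbl: f"{db}.{tbl}",
}

def get_full_qualified_name(platform: str, database_name: str, table_name: str) -> str:
    # Unknown platforms fall back to the plain "database.table" form,
    # mirroring the final else branch above.
    formatter = FORMATTERS.get(platform, lambda db, tbl: f"{db}.{tbl}")
    return formatter(database_name, table_name)

assert get_full_qualified_name("bigquery", "proj", "tbl") == "proj.tbl"
```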
diff --git a/metadata-ingestion/src/datahub/ingestion/source/tableau.py b/metadata-ingestion/src/datahub/ingestion/source/tableau.py
index 5fed3898df322e..ab979512524ac2 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/tableau.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/tableau.py
@@ -423,22 +423,20 @@ def _create_upstream_table_lineage(
table_path = None
if project and datasource.get("name"):
- table_name = table.get("name") if table.get("name") else table["id"]
+ table_name = table.get("name") or table["id"]
table_path = f"{project.replace('/', REPLACE_SLASH_CHAR)}/{datasource['name']}/{table_name}"
self.upstream_tables[table_urn] = (
table.get("columns", []),
table_path,
- table.get("isEmbedded") if table.get("isEmbedded") else False,
+ table.get("isEmbedded") or False,
)
return upstream_tables
def emit_custom_sql_datasources(self) -> Iterable[MetadataWorkUnit]:
- count_on_query = self.config.page_size
- custom_sql_filter = "idWithin: {}".format(
- json.dumps(self.custom_sql_ids_being_used)
- )
+ count_on_query = len(self.custom_sql_ids_being_used)
+ custom_sql_filter = f"idWithin: {json.dumps(self.custom_sql_ids_being_used)}"
custom_sql_connection, total_count, has_next_page = self.get_connection_object(
custom_sql_graphql_query, "customSQLTablesConnection", custom_sql_filter
)
@@ -516,7 +514,7 @@ def emit_custom_sql_datasources(self) -> Iterable[MetadataWorkUnit]:
dataset_snapshot.aspects.append(schema_metadata)
# Browse path
- csql_name = csql.get("name") if csql.get("name") else csql_id
+ csql_name = csql.get("name") or csql_id
if project and datasource_name:
browse_paths = BrowsePathsClass(
@@ -630,7 +628,6 @@ def _get_schema_metadata_for_datasource(
self, datasource_fields: List[dict]
) -> Optional[SchemaMetadata]:
fields = []
- schema_metadata = None
for field in datasource_fields:
# check datasource - custom sql relations from a field being referenced
self._track_custom_sql_ids(field)
@@ -657,8 +654,8 @@ def _get_schema_metadata_for_datasource(
)
fields.append(schema_field)
- if fields:
- schema_metadata = SchemaMetadata(
+ return (
+ SchemaMetadata(
schemaName="test",
platform=f"urn:li:dataPlatform:{self.platform}",
version=0,
@@ -666,8 +663,9 @@ def _get_schema_metadata_for_datasource(
hash="",
platformSchema=OtherSchema(rawSchema=""),
)
-
- return schema_metadata
+ if fields
+ else None
+ )
def get_metadata_change_event(
self, snap_shot: Union["DatasetSnapshot", "DashboardSnapshot", "ChartSnapshot"]
@@ -722,9 +720,7 @@ def emit_datasource(
aspects=[],
)
- datasource_name = (
- datasource.get("name") if datasource.get("name") else datasource_id
- )
+ datasource_name = datasource.get("name") or datasource_id
if is_embedded_ds and workbook and workbook.get("name"):
datasource_name = f"{workbook['name']}/{datasource_name}"
# Browse path
@@ -804,10 +800,8 @@ def emit_datasource(
)
def emit_published_datasources(self) -> Iterable[MetadataWorkUnit]:
- count_on_query = self.config.page_size
- datasource_filter = "idWithin: {}".format(
- json.dumps(self.datasource_ids_being_used)
- )
+ count_on_query = len(self.datasource_ids_being_used)
+ datasource_filter = f"idWithin: {json.dumps(self.datasource_ids_being_used)}"
(
published_datasource_conn,
total_count,
@@ -958,7 +952,7 @@ def emit_sheets_as_charts(self, workbook: Dict) -> Iterable[MetadataWorkUnit]:
chart_snapshot.aspects.append(chart_info)
if workbook.get("projectName") and workbook.get("name"):
- sheet_name = sheet.get("name") if sheet.get("name") else sheet["id"]
+ sheet_name = sheet.get("name") or sheet["id"]
# Browse path
browse_path = BrowsePathsClass(
paths=[
@@ -1075,7 +1069,7 @@ def emit_dashboards(self, workbook: Dict) -> Iterable[MetadataWorkUnit]:
dashboard_snapshot.aspects.append(dashboard_info_class)
if workbook.get("projectName") and workbook.get("name"):
- dashboard_name = title if title else dashboard["id"]
+ dashboard_name = title or dashboard["id"]
# browse path
browse_paths = BrowsePathsClass(
paths=[
@@ -1129,7 +1123,7 @@ def _get_schema(self, schema_provided: str, database: str, fullName: str) -> str
def _extract_schema_from_fullName(self, fullName: str) -> str:
# fullName is observed to be in format [schemaName].[tableName]
# OR simply tableName OR [tableName]
- if fullName.startswith("[") and fullName.find("].[") >= 0:
+ if fullName.startswith("[") and "].[" in fullName:
return fullName[1 : fullName.index("]")]
return ""
diff --git a/metadata-ingestion/src/datahub/ingestion/source/tableau_common.py b/metadata-ingestion/src/datahub/ingestion/source/tableau_common.py
index 941cd9a9ab8934..6296af05d14616 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/tableau_common.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/tableau_common.py
@@ -433,8 +433,7 @@ def make_table_urn(
# if there are more than 3 tokens, just take the final 3
fully_qualified_table_name = ".".join(fully_qualified_table_name.split(".")[-3:])
- urn = builder.make_dataset_urn(platform, fully_qualified_table_name, env)
- return urn
+ return builder.make_dataset_urn(platform, fully_qualified_table_name, env)
def make_description_from_params(description, formula):
@@ -451,10 +450,9 @@ def make_description_from_params(description, formula):
def get_field_value_in_sheet(field, field_name):
if field.get("__typename", "") == "DatasourceField":
- field = field.get("remoteField") if field.get("remoteField") else {}
+ field = field.get("remoteField") or {}
- field_value = field.get(field_name, "")
- return field_value
+ return field.get(field_name, "")
def get_unique_custom_sql(custom_sql_list: List[dict]) -> List[dict]:
@@ -506,6 +504,4 @@ def query_metadata(server, main_query, connection_name, first, offset, qry_filte
filter=qry_filter,
main_query=main_query,
)
- query_result = server.metadata.query(query)
-
- return query_result
+ return server.metadata.query(query)
diff --git a/metadata-ingestion/src/datahub/ingestion/source_config/pulsar.py b/metadata-ingestion/src/datahub/ingestion/source_config/pulsar.py
index e21c6fc3ea42ba..e1bdf072787cd5 100644
--- a/metadata-ingestion/src/datahub/ingestion/source_config/pulsar.py
+++ b/metadata-ingestion/src/datahub/ingestion/source_config/pulsar.py
@@ -82,7 +82,7 @@ class PulsarSourceConfig(StatefulIngestionConfigBase, DatasetSourceConfigBase):
)
exclude_individual_partitions: bool = Field(
default=True,
- description="Extract each individual partitioned topic. e.g. when turned off a topic with 100 partitions will result in 100 Datesets.",
+ description="Extract each individual partitioned topic. e.g. when turned off a topic with 100 partitions will result in 100 Datasets.",
)
tenants: List[str] = Field(
diff --git a/metadata-ingestion/src/datahub/ingestion/source_config/sql/snowflake.py b/metadata-ingestion/src/datahub/ingestion/source_config/sql/snowflake.py
index b00ac9cdfb41b8..984b8fa917d647 100644
--- a/metadata-ingestion/src/datahub/ingestion/source_config/sql/snowflake.py
+++ b/metadata-ingestion/src/datahub/ingestion/source_config/sql/snowflake.py
@@ -90,7 +90,7 @@ class SnowflakeProvisionRoleConfig(ConfigModel):
@pydantic.validator("admin_username", always=True)
def username_not_empty(cls, v, values, **kwargs):
v_str: str = str(v)
- if v_str.strip() == "":
+ if not v_str.strip():
raise ValueError("username is empty")
return v
@@ -180,60 +180,55 @@ def authenticator_type_is_valid(cls, v, values, field):
f"unsupported authenticator type '{v}' was provided,"
f" use one of {list(VALID_AUTH_TYPES.keys())}"
)
- else:
- if v == "KEY_PAIR_AUTHENTICATOR":
- # If we are using key pair auth, we need the private key path and password to be set
- if values.get("private_key_path") is None:
- raise ValueError(
- f"'private_key_path' was none "
- f"but should be set when using {v} authentication"
- )
- elif v == "OAUTH_AUTHENTICATOR":
- if values.get("oauth_config") is None:
- raise ValueError(
- f"'oauth_config' is none but should be set when using {v} authentication"
- )
- if values.get("oauth_config").provider is None:
- raise ValueError(
- f"'oauth_config.provider' is none "
- f"but should be set when using {v} authentication"
- )
- if values.get("oauth_config").client_id is None:
- raise ValueError(
- f"'oauth_config.client_id' is none "
- f"but should be set when using {v} authentication"
- )
- if values.get("oauth_config").scopes is None:
+ if v == "KEY_PAIR_AUTHENTICATOR":
+ # If we are using key pair auth, we need the private key path and password to be set
+ if values.get("private_key_path") is None:
+ raise ValueError(
+ f"'private_key_path' was none "
+ f"but should be set when using {v} authentication"
+ )
+ elif v == "OAUTH_AUTHENTICATOR":
+ if values.get("oauth_config") is None:
+ raise ValueError(
+ f"'oauth_config' is none but should be set when using {v} authentication"
+ )
+ if values.get("oauth_config").provider is None:
+ raise ValueError(
+ f"'oauth_config.provider' is none "
+ f"but should be set when using {v} authentication"
+ )
+ if values.get("oauth_config").client_id is None:
+ raise ValueError(
+ f"'oauth_config.client_id' is none "
+ f"but should be set when using {v} authentication"
+ )
+ if values.get("oauth_config").scopes is None:
+ raise ValueError(
+ f"'oauth_config.scopes' was none "
+ f"but should be set when using {v} authentication"
+ )
+ if values.get("oauth_config").authority_url is None:
+ raise ValueError(
+ f"'oauth_config.authority_url' was none "
+ f"but should be set when using {v} authentication"
+ )
+ if values.get("oauth_config").use_certificate is True:
+ if values.get("oauth_config").base64_encoded_oauth_private_key is None:
raise ValueError(
- f"'oauth_config.scopes' was none "
- f"but should be set when using {v} authentication"
+ "'base64_encoded_oauth_private_key' was none "
+ "but should be set when using certificate for oauth_config"
)
- if values.get("oauth_config").authority_url is None:
+ if values.get("oauth").base64_encoded_oauth_public_key is None:
raise ValueError(
- f"'oauth_config.authority_url' was none "
- f"but should be set when using {v} authentication"
+ "'base64_encoded_oauth_public_key' was none"
+ "but should be set when using use_certificate true for oauth_config"
)
- if values.get("oauth_config").use_certificate is True:
- if (
- values.get("oauth_config").base64_encoded_oauth_private_key
- is None
- ):
- raise ValueError(
- "'base64_encoded_oauth_private_key' was none "
- "but should be set when using certificate for oauth_config"
- )
- if values.get("oauth").base64_encoded_oauth_public_key is None:
- raise ValueError(
- "'base64_encoded_oauth_public_key' was none"
- "but should be set when using use_certificate true for oauth_config"
- )
- else:
- if values.get("oauth_config").client_secret is None:
- raise ValueError(
- "'oauth_config.client_secret' was none "
- "but should be set when using use_certificate false for oauth_config"
- )
- logger.info(f"using authenticator type '{v}'")
+ elif values.get("oauth_config").client_secret is None:
+ raise ValueError(
+ "'oauth_config.client_secret' was none "
+ "but should be set when using use_certificate false for oauth_config"
+ )
+ logger.info(f"using authenticator type '{v}'")
return v
@pydantic.validator("include_view_lineage")
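The Snowflake validator is flattened: unsupported authenticator types are rejected by the earlier `raise`, so each branch can sit at the top level instead of nesting under an `else`. A minimal standalone sketch of one branch, with a made-up key path:

```python
def check_key_pair_auth(authenticator: str, values: dict) -> None:
    # With invalid types rejected earlier, the key-pair check needs no enclosing else.
    if authenticator == "KEY_PAIR_AUTHENTICATOR" and values.get("private_key_path") is None:
        raise ValueError(
            "'private_key_path' was none but should be set when using "
            f"{authenticator} authentication"
        )

check_key_pair_auth("KEY_PAIR_AUTHENTICATOR", {"private_key_path": "/keys/rsa_key.p8"})
```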
diff --git a/metadata-ingestion/src/datahub/ingestion/source_config/usage/bigquery_usage.py b/metadata-ingestion/src/datahub/ingestion/source_config/usage/bigquery_usage.py
index 05dc636d312c2e..9abee691ca9bf8 100644
--- a/metadata-ingestion/src/datahub/ingestion/source_config/usage/bigquery_usage.py
+++ b/metadata-ingestion/src/datahub/ingestion/source_config/usage/bigquery_usage.py
@@ -114,7 +114,7 @@ class BigQueryUsageConfig(BigQueryBaseConfig, DatasetSourceConfigBase, BaseUsage
credential: Optional[BigQueryCredential] = pydantic.Field(
default=None,
- description="Bigquery credential. Required if GOOGLE_APPLICATION_CREDENTIALS enviroment variable is not set. See this example recipe for details",
+ description="Bigquery credential. Required if GOOGLE_APPLICATION_CREDENTIALS environment variable is not set. See this example recipe for details",
)
_credentials_path: Optional[str] = pydantic.PrivateAttr(None)
temp_table_dataset_prefix: str = pydantic.Field(
diff --git a/metadata-ingestion/src/datahub/ingestion/transformer/base_transformer.py b/metadata-ingestion/src/datahub/ingestion/transformer/base_transformer.py
index ecc1dcfc5fd31f..82cfecbddfed39 100644
--- a/metadata-ingestion/src/datahub/ingestion/transformer/base_transformer.py
+++ b/metadata-ingestion/src/datahub/ingestion/transformer/base_transformer.py
@@ -132,8 +132,8 @@ def _should_process(
return True
# fall through, no entity type matched
return False
- elif isinstance(record, MetadataChangeProposalWrapper) or isinstance(
- record, MetadataChangeProposalClass
+ elif isinstance(
+ record, (MetadataChangeProposalWrapper, MetadataChangeProposalClass)
):
return record.entityType in entity_types
diff --git a/metadata-ingestion/src/datahub/ingestion/transformer/mark_dataset_status.py b/metadata-ingestion/src/datahub/ingestion/transformer/mark_dataset_status.py
index bae8d0e07a80ab..d833e9bcc75a64 100644
--- a/metadata-ingestion/src/datahub/ingestion/transformer/mark_dataset_status.py
+++ b/metadata-ingestion/src/datahub/ingestion/transformer/mark_dataset_status.py
@@ -40,6 +40,6 @@ def transform_aspect(
self, entity_urn: str, aspect_name: str, aspect: Optional[builder.Aspect]
) -> Optional[builder.Aspect]:
assert aspect is None or isinstance(aspect, StatusClass)
- status_aspect: StatusClass = aspect if aspect else StatusClass(removed=None)
+ status_aspect: StatusClass = aspect or StatusClass(removed=None)
status_aspect.removed = self.config.removed
return status_aspect # type: ignore
diff --git a/metadata-ingestion/src/datahub/integrations/great_expectations/action.py b/metadata-ingestion/src/datahub/integrations/great_expectations/action.py
index 98b344c0a06cc7..5dca3541493156 100644
--- a/metadata-ingestion/src/datahub/integrations/great_expectations/action.py
+++ b/metadata-ingestion/src/datahub/integrations/great_expectations/action.py
@@ -190,12 +190,11 @@ def _run(
result = "DataHub notification succeeded"
except Exception as e:
result = "DataHub notification failed"
- if self.graceful_exceptions:
- logger.error(e)
- logger.info("Supressing error because graceful_exceptions is set")
- else:
+ if not self.graceful_exceptions:
raise
+ logger.error(e)
+ logger.info("Suppressing error because graceful_exceptions is set")
return {"datahub_notification_result": result}
def get_assertions_with_results(
@@ -224,7 +223,7 @@ def get_assertions_with_results(
for result in validation_result_suite.results:
expectation_config = result["expectation_config"]
expectation_type = expectation_config["expectation_type"]
- success = True if result["success"] else False
+ success = bool(result["success"])
kwargs = {
k: v for k, v in expectation_config["kwargs"].items() if k != "batch_id"
}
@@ -271,8 +270,6 @@ def get_assertions_with_results(
# TODO: Understand why their run time is incorrect.
run_time = run_id.run_time.astimezone(timezone.utc)
- assertionResults = []
-
evaluation_parameters = (
{
k: convert_to_string(v)
@@ -328,8 +325,7 @@ def get_assertions_with_results(
)
if ds.get("partitionSpec") is not None:
assertionResult.partitionSpec = ds.get("partitionSpec")
- assertionResults.append(assertionResult)
-
+ assertionResults = [assertionResult]
assertions_with_results.append(
{
"assertionUrn": assertionUrn,
@@ -631,8 +627,9 @@ def get_dataset_partitions(self, batch_identifier, data_asset):
].batch_request.runtime_parameters["query"]
partitionSpec = PartitionSpecClass(
type=PartitionTypeClass.QUERY,
- partition="Query_" + builder.datahub_guid(query),
+ partition=f"Query_{builder.datahub_guid(query)}",
)
+
batchSpec = BatchSpec(
nativeBatchId=batch_identifier,
query=query,
@@ -678,9 +675,9 @@ def get_dataset_partitions(self, batch_identifier, data_asset):
return dataset_partitions
def get_platform_instance(self, datasource_name):
- if self.platform_instance_map and datasource_name in self.platform_instance_map:
- return self.platform_instance_map[datasource_name]
if self.platform_instance_map:
+ if datasource_name in self.platform_instance_map:
+ return self.platform_instance_map[datasource_name]
warn(
f"Datasource {datasource_name} is not present in platform_instance_map"
)
@@ -698,21 +695,21 @@ def make_dataset_urn_from_sqlalchemy_uri(
schema_name, table_name = table_name.split(".")[-2:]
if data_platform in ["redshift", "postgres"]:
- schema_name = schema_name if schema_name else "public"
+ schema_name = schema_name or "public"
if url_instance.database is None:
warn(
f"DataHubValidationAction failed to locate database name for {data_platform}."
)
return None
- schema_name = "{}.{}".format(url_instance.database, schema_name)
+ schema_name = f"{url_instance.database}.{schema_name}"
elif data_platform == "mssql":
- schema_name = schema_name if schema_name else "dbo"
+ schema_name = schema_name or "dbo"
if url_instance.database is None:
warn(
f"DataHubValidationAction failed to locate database name for {data_platform}."
)
return None
- schema_name = "{}.{}".format(url_instance.database, schema_name)
+ schema_name = f"{url_instance.database}.{schema_name}"
elif data_platform in ["trino", "snowflake"]:
if schema_name is None or url_instance.database is None:
warn(
@@ -738,16 +735,16 @@ def make_dataset_urn_from_sqlalchemy_uri(
)
)
return None
- schema_name = "{}.{}".format(url_instance.host, url_instance.database)
+ schema_name = f"{url_instance.host}.{url_instance.database}"
- schema_name = schema_name if schema_name else url_instance.database
+ schema_name = schema_name or url_instance.database
if schema_name is None:
warn(
f"DataHubValidationAction failed to locate schema name for {data_platform}."
)
return None
- dataset_name = "{}.{}".format(schema_name, table_name)
+ dataset_name = f"{schema_name}.{table_name}"
dataset_urn = builder.make_dataset_urn_with_platform_instance(
platform=data_platform,
diff --git a/metadata-ingestion/src/datahub/telemetry/stats.py b/metadata-ingestion/src/datahub/telemetry/stats.py
index ea48aab14c77db..e76580d677588c 100644
--- a/metadata-ingestion/src/datahub/telemetry/stats.py
+++ b/metadata-ingestion/src/datahub/telemetry/stats.py
@@ -27,9 +27,7 @@ def calculate_percentiles(
min(i, size - 1) for i in percentile_indices
] # in case of rounding errors
- values = {p: data_sorted[i] for p, i in zip(percentiles, percentile_indices)}
-
- return values
+ return {p: data_sorted[i] for p, i in zip(percentiles, percentile_indices)}
def discretize(statistic: Union[float, int]) -> int:
diff --git a/metadata-ingestion/src/datahub/telemetry/telemetry.py b/metadata-ingestion/src/datahub/telemetry/telemetry.py
index b95df169414320..0a346d09373850 100644
--- a/metadata-ingestion/src/datahub/telemetry/telemetry.py
+++ b/metadata-ingestion/src/datahub/telemetry/telemetry.py
@@ -273,7 +273,7 @@ def get_full_class_name(obj):
module = obj.__class__.__module__
if module is None or module == str.__class__.__module__:
return obj.__class__.__name__
- return module + "." + obj.__class__.__name__
+ return f"{module}.{obj.__class__.__name__}"
def with_telemetry(func: Callable[..., T]) -> Callable[..., T]:
diff --git a/metadata-ingestion/src/datahub/upgrade/upgrade.py b/metadata-ingestion/src/datahub/upgrade/upgrade.py
index 32b56c1cb20218..b6ca0486c3a2cf 100644
--- a/metadata-ingestion/src/datahub/upgrade/upgrade.py
+++ b/metadata-ingestion/src/datahub/upgrade/upgrade.py
@@ -1,3 +1,4 @@
+import contextlib
import logging
from datetime import datetime, timedelta, timezone
from functools import wraps
@@ -221,7 +222,7 @@ def maybe_print_upgrade_message( # noqa: C901
encourage_cli_upgrade = False
client_server_compat = 0
encourage_quickstart_upgrade = False
- try:
+ with contextlib.suppress(Exception):
version_stats = retrieve_versions(server)
if not version_stats:
return
@@ -261,12 +262,9 @@ def maybe_print_upgrade_message( # noqa: C901
):
encourage_quickstart_upgrade = True
- except Exception:
- pass
-
# Compute recommendations and print one
if client_server_compat < 0:
- try:
+ with contextlib.suppress(Exception):
assert version_stats
print(
colored("❗Client-Server Incompatible❗", "yellow"),
@@ -279,10 +277,8 @@ def maybe_print_upgrade_message( # noqa: C901
"cyan",
),
)
- except Exception:
- pass
elif client_server_compat > 0:
- try:
+ with contextlib.suppress(Exception):
assert version_stats
print(
colored("❗Client-Server Incompatible❗", "red"),
@@ -295,12 +291,8 @@ def maybe_print_upgrade_message( # noqa: C901
"cyan",
),
)
- except Exception:
- pass
-
- # we only encourage upgrades if we think client_server is currently compatible
elif client_server_compat == 0 and encourage_cli_upgrade:
- try:
+ with contextlib.suppress(Exception):
print(
colored("💡 Upgrade cli!", "yellow"),
colored(
@@ -308,9 +300,6 @@ def maybe_print_upgrade_message( # noqa: C901
"cyan",
),
)
- except Exception:
- pass
-
elif encourage_quickstart_upgrade:
try:
assert version_stats
diff --git a/metadata-ingestion/src/datahub/utilities/hive_schema_to_avro.py b/metadata-ingestion/src/datahub/utilities/hive_schema_to_avro.py
index fc9680ba642d42..6e8d8da5f3fb82 100644
--- a/metadata-ingestion/src/datahub/utilities/hive_schema_to_avro.py
+++ b/metadata-ingestion/src/datahub/utilities/hive_schema_to_avro.py
@@ -53,9 +53,12 @@ def _parse_datatype_string(
parts = HiveColumnToAvroConverter._ignore_brackets_split(s[4:-1], ",")
if len(parts) != 2:
raise ValueError(
- "The map type string format is: 'map', "
- + "but got: %s" % s
+ (
+ "The map type string format is: 'map', "
+ + f"but got: {s}"
+ )
)
+
kt = HiveColumnToAvroConverter._parse_datatype_string(parts[0])
vt = HiveColumnToAvroConverter._parse_datatype_string(parts[1])
# keys are assumed to be strings in avro map
@@ -103,9 +106,12 @@ def _parse_struct_fields_string(s: str, **kwargs: Any) -> Dict[str, object]:
name_and_type = HiveColumnToAvroConverter._ignore_brackets_split(part, ":")
if len(name_and_type) != 2:
raise ValueError(
- "The struct field string format is: 'field_name:field_type', "
- + "but got: %s" % part
+ (
+ "The struct field string format is: 'field_name:field_type', "
+ + f"but got: {part}"
+ )
)
+
field_name = name_and_type[0].strip()
if field_name.startswith("`"):
if field_name[-1] != "`":
@@ -117,16 +123,15 @@ def _parse_struct_fields_string(s: str, **kwargs: Any) -> Dict[str, object]:
fields.append({"name": field_name, "type": field_type})
if kwargs.get("ustruct_seqn") is not None:
- struct_name = "__structn_{}_{}".format(
- kwargs["ustruct_seqn"], str(uuid.uuid4()).replace("-", "")
- )
+ struct_name = f'__structn_{kwargs["ustruct_seqn"]}_{str(uuid.uuid4()).replace("-", "")}'
+
else:
- struct_name = "__struct_{}".format(str(uuid.uuid4()).replace("-", ""))
+ struct_name = f'__struct_{str(uuid.uuid4()).replace("-", "")}'
return {
"type": "record",
"name": struct_name,
"fields": fields,
- "native_data_type": "struct<{}>".format(s),
+ "native_data_type": f"struct<{s}>",
}
@staticmethod
@@ -193,7 +198,7 @@ def _ignore_brackets_split(s: str, separator: str) -> List[str]:
buf += c
elif c in HiveColumnToAvroConverter._BRACKETS.values():
if level == 0:
- raise ValueError("Brackets are not correctly paired: %s" % s)
+ raise ValueError(f"Brackets are not correctly paired: {s}")
level -= 1
buf += c
elif c == separator and level > 0:
@@ -205,7 +210,7 @@ def _ignore_brackets_split(s: str, separator: str) -> List[str]:
buf += c
if len(buf) == 0:
- raise ValueError("The %s cannot be the last char: %s" % (separator, s))
+ raise ValueError(f"The {separator} cannot be the last char: {s}")
parts.append(buf)
return parts
diff --git a/metadata-ingestion/src/datahub/utilities/memory_leak_detector.py b/metadata-ingestion/src/datahub/utilities/memory_leak_detector.py
index b5fa3c3a723ea4..ef0db205b72ac9 100644
--- a/metadata-ingestion/src/datahub/utilities/memory_leak_detector.py
+++ b/metadata-ingestion/src/datahub/utilities/memory_leak_detector.py
@@ -12,7 +12,7 @@
def _trace_has_file(trace: tracemalloc.Traceback, file_pattern: str) -> bool:
- for frame_index in range(0, len(trace)):
+ for frame_index in range(len(trace)):
cur_frame = trace[frame_index]
if fnmatch.fnmatch(cur_frame.filename, file_pattern):
return True
@@ -99,8 +99,7 @@ def wrapper(*args: Any, **kwargs: Any) -> Any:
_init_leak_detection()
try:
- res = func(*args, **kwargs)
- return res
+ return func(*args, **kwargs)
finally:
if detect_leaks:
_perform_leak_detection()
diff --git a/metadata-ingestion/src/datahub/utilities/server_config_util.py b/metadata-ingestion/src/datahub/utilities/server_config_util.py
index c919a1356f2642..40841321ad2778 100644
--- a/metadata-ingestion/src/datahub/utilities/server_config_util.py
+++ b/metadata-ingestion/src/datahub/utilities/server_config_util.py
@@ -3,7 +3,7 @@
from datahub.telemetry.telemetry import set_telemetry_enable
# Only to be written to for logging server related information
-global_debug: Dict[str, Any] = dict()
+global_debug: Dict[str, Any] = {}
def set_gms_config(config: Dict) -> Any:
diff --git a/metadata-ingestion/src/datahub/utilities/sql_lineage_parser_impl.py b/metadata-ingestion/src/datahub/utilities/sql_lineage_parser_impl.py
index 80ea7cc31455cd..6fe57b297d4528 100644
--- a/metadata-ingestion/src/datahub/utilities/sql_lineage_parser_impl.py
+++ b/metadata-ingestion/src/datahub/utilities/sql_lineage_parser_impl.py
@@ -1,3 +1,4 @@
+import contextlib
import logging
import re
import unittest
@@ -7,15 +8,12 @@
from sqllineage.core.holders import Column, SQLLineageHolder
from sqllineage.exceptions import SQLLineageException
-try:
+with contextlib.suppress(ImportError):
import sqlparse
from networkx import DiGraph
from sqllineage.core import LineageAnalyzer
import datahub.utilities.sqllineage_patch
-except ImportError:
- pass
-
logger = logging.getLogger(__name__)
@@ -97,7 +95,7 @@ def __init__(self, sql_query: str) -> None:
logger.error(f"SQL lineage analyzer error '{e}' for query: '{self._sql}")
def get_tables(self) -> List[str]:
- result: List[str] = list()
+ result: List[str] = []
if self._sql_holder is None:
logger.error("sql holder not present so cannot get tables")
return result
@@ -135,12 +133,10 @@ def get_columns(self) -> List[str]:
result.add(str(column.raw_name))
# Reverting back all the previously renamed words which confuses the parser
- result = set(["date" if c == self._DATE_SWAP_TOKEN else c for c in result])
- result = set(
- [
- "timestamp" if c == self._TIMESTAMP_SWAP_TOKEN else c
- for c in list(result)
- ]
- )
+ result = {"date" if c == self._DATE_SWAP_TOKEN else c for c in result}
+ result = {
+ "timestamp" if c == self._TIMESTAMP_SWAP_TOKEN else c for c in list(result)
+ }
+
# swap back renamed date column
return list(result)
diff --git a/metadata-ingestion/src/datahub/utilities/sql_parser.py b/metadata-ingestion/src/datahub/utilities/sql_parser.py
index eb0bc0ec8262f4..28b5082ccbb3b2 100644
--- a/metadata-ingestion/src/datahub/utilities/sql_parser.py
+++ b/metadata-ingestion/src/datahub/utilities/sql_parser.py
@@ -1,3 +1,4 @@
+import contextlib
import logging
import multiprocessing
import re
@@ -9,11 +10,8 @@
from datahub.utilities.sql_lineage_parser_impl import SqlLineageSQLParserImpl
-try:
+with contextlib.suppress(ImportError):
from sql_metadata import Parser as MetadataSQLParser
-except ImportError:
- pass
-
logger = logging.getLogger(__name__)
diff --git a/metadata-ingestion/src/datahub/utilities/sqlalchemy_query_combiner.py b/metadata-ingestion/src/datahub/utilities/sqlalchemy_query_combiner.py
index 0474f4ec7d3d68..947f5e30d62c89 100644
--- a/metadata-ingestion/src/datahub/utilities/sqlalchemy_query_combiner.py
+++ b/metadata-ingestion/src/datahub/utilities/sqlalchemy_query_combiner.py
@@ -108,8 +108,7 @@ def get_query_columns(query: Any) -> List[Any]:
try:
# inner_columns will be more accurate if the column names are unnamed,
# since .columns will remove the "duplicates".
- inner_columns = list(query.inner_columns)
- return inner_columns
+ return list(query.inner_columns)
except AttributeError:
return list(query.columns)
diff --git a/metadata-ingestion/src/datahub/utilities/urns/urn.py b/metadata-ingestion/src/datahub/utilities/urns/urn.py
index 7498cc1532c66e..479e74331fd9b3 100644
--- a/metadata-ingestion/src/datahub/utilities/urns/urn.py
+++ b/metadata-ingestion/src/datahub/utilities/urns/urn.py
@@ -21,7 +21,7 @@ class Urn:
def __init__(
self, entity_type: str, entity_id: List[str], urn_domain: str = LI_DOMAIN
):
- if len(entity_id) == 0:
+ if not entity_id:
raise InvalidUrnError("Empty entity id.")
self._validate_entity_type(entity_type)
self._validate_entity_id(entity_id)
@@ -122,9 +122,9 @@ def _get_entity_id_from_str(entity_id: str) -> List[str]:
part_start = i + 1
if start_paren_count != 0:
- raise InvalidUrnError(f"{entity_id}, mismtached paren nesting")
+ raise InvalidUrnError(f"{entity_id}, mismatched paren nesting")
- parts.append(entity_id[part_start : len(entity_id) - 1])
+ parts.append(entity_id[part_start:-1])
return parts
@@ -151,11 +151,12 @@ def __hash__(self) -> int:
return hash((self._domain, self._entity_type) + tuple(self._entity_id))
def __eq__(self, other: object) -> bool:
- if not isinstance(other, Urn):
- return False
-
return (
- self._entity_id == other._entity_id
- and self._domain == other._domain
- and self._entity_type == other._entity_type
+ (
+ self._entity_id == other._entity_id
+ and self._domain == other._domain
+ and self._entity_type == other._entity_type
+ )
+ if isinstance(other, Urn)
+ else False
)