Skip to content

Commit

Permalink
chore: Refactor Python Codebase (datahub-project#5113)
Browse files Browse the repository at this point in the history
  • Loading branch information
Vincent Koc authored and maggiehays committed Aug 1, 2022
1 parent b67aacd commit 78d8a0c
Show file tree
Hide file tree
Showing 71 changed files with 577 additions and 741 deletions.
8 changes: 3 additions & 5 deletions metadata-ingestion-modules/airflow-plugin/setup.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import os
import pathlib
import sys
from typing import Dict, Set

Expand All @@ -14,14 +15,11 @@

def get_long_description():
root = os.path.dirname(__file__)
with open(os.path.join(root, "README.md")) as f:
description = f.read()

return description
return pathlib.Path(os.path.join(root, "README.md")).read_text()


base_requirements = {
# Compatability.
# Compatibility.
"dataclasses>=0.6; python_version < '3.7'",
"typing_extensions>=3.10.0.2",
"mypy_extensions>=0.4.3",
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import contextlib
import traceback
from typing import Any, Iterable

Expand Down Expand Up @@ -215,7 +216,7 @@ def datahub_on_success_callback(context, *args, **kwargs):
for inlet in inlets:
datajob.inlets.append(inlet.urn)

# We have to use _oulets because outlets is empty
# We have to use _outlets because outlets is empty
for outlet in task._outlets:
datajob.outlets.append(outlet.urn)

Expand Down Expand Up @@ -389,13 +390,10 @@ def _patch_policy(settings):


def _patch_datahub_policy():
try:
with contextlib.suppress(ImportError):
import airflow_local_settings

_patch_policy(airflow_local_settings)
except ImportError:
pass

from airflow.models.dagbag import settings

_patch_policy(settings)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
def test_dummy():
assert True
pass
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
def test_dummy():
assert True
pass
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ def emitAssertionResult(assertionResult: AssertionRunEvent) -> None:
aspect=assertionResult,
)

# Emit BatchAssertion Result! (timseries aspect)
# Emit BatchAssertion Result! (timeseries aspect)
emitter.emit_mcp(dataset_assertionRunEvent_mcp)


Expand Down
17 changes: 7 additions & 10 deletions metadata-ingestion/examples/library/dataset_add_column_tag.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,15 @@

def get_simple_field_path_from_v2_field_path(field_path: str) -> str:
"""A helper function to extract simple . path notation from the v2 field path"""
if field_path.startswith("[version=2.0]"):
# this is a v2 field path
tokens = [
t
for t in field_path.split(".")
if not (t.startswith("[") or t.endswith("]"))
]
path = ".".join(tokens)
return path
else:
if not field_path.startswith("[version=2.0]"):
# not a v2, we assume this is a simple path
return field_path
# this is a v2 field path
tokens = [
t for t in field_path.split(".") if not (t.startswith("[") or t.endswith("]"))
]

return ".".join(tokens)


# Inputs -> the column, dataset and the tag to set
Expand Down
17 changes: 7 additions & 10 deletions metadata-ingestion/examples/library/dataset_add_column_term.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,18 +23,15 @@

def get_simple_field_path_from_v2_field_path(field_path: str) -> str:
"""A helper function to extract simple . path notation from the v2 field path"""
if field_path.startswith("[version=2.0]"):
# this is a v2 field path
tokens = [
t
for t in field_path.split(".")
if not (t.startswith("[") or t.endswith("]"))
]
path = ".".join(tokens)
return path
else:
if not field_path.startswith("[version=2.0]"):
# not a v2, we assume this is a simple path
return field_path
# this is a v2 field path
tokens = [
t for t in field_path.split(".") if not (t.startswith("[") or t.endswith("]"))
]

return ".".join(tokens)


# Inputs -> the column, dataset and the term to set
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,11 @@
)
from datahub.metadata.schema_classes import ChangeTypeClass

# Construct upstream tables.
upstream_tables: List[UpstreamClass] = []
upstream_table_1 = UpstreamClass(
dataset=builder.make_dataset_urn("bigquery", "upstream_table_1", "PROD"),
type=DatasetLineageTypeClass.TRANSFORMED,
)
upstream_tables.append(upstream_table_1)
upstream_tables: List[UpstreamClass] = [upstream_table_1]
upstream_table_2 = UpstreamClass(
dataset=builder.make_dataset_urn("bigquery", "upstream_table_2", "PROD"),
type=DatasetLineageTypeClass.TRANSFORMED,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import uuid
from datetime import datetime
from datetime import datetime, timezone

from datahub.api.entities.datajob import DataFlow, DataJob
from datahub.api.entities.dataprocess.dataprocess_instance import (
Expand Down Expand Up @@ -36,40 +36,61 @@
jobFlowRun = DataProcessInstance.from_dataflow(
dataflow=jobFlow, id=f"{jobFlow.id}-{uuid.uuid4()}"
)
jobFlowRun.emit_process_start(emitter, int(datetime.utcnow().timestamp() * 1000))
jobFlowRun.emit_process_start(
emitter, int(datetime.now(timezone.utc).timestamp() * 1000)
)


jobRun = DataProcessInstance.from_datajob(
datajob=dataJob, id=f"{jobFlow.id}-{uuid.uuid4()}"
)
jobRun.emit_process_start(emitter, int(datetime.utcnow().timestamp() * 1000))
jobRun.emit_process_start(emitter, int(datetime.now(timezone.utc).timestamp() * 1000))

jobRun.emit_process_end(
emitter, int(datetime.utcnow().timestamp() * 1000), result=InstanceRunResult.SUCCESS
emitter,
int(datetime.now(timezone.utc).timestamp() * 1000),
result=InstanceRunResult.SUCCESS,
)


job2Run = DataProcessInstance.from_datajob(
datajob=dataJob2, id=f"{jobFlow.id}-{uuid.uuid4()}"
)
job2Run.emit_process_start(emitter, int(datetime.utcnow().timestamp() * 1000))
job2Run.emit_process_start(emitter, int(datetime.now(timezone.utc).timestamp() * 1000))

job2Run.emit_process_end(
emitter, int(datetime.utcnow().timestamp() * 1000), result=InstanceRunResult.SUCCESS
emitter,
int(datetime.now(timezone.utc).timestamp() * 1000),
result=InstanceRunResult.SUCCESS,
)


job3Run = DataProcessInstance.from_datajob(
datajob=dataJob3, id=f"{jobFlow.id}-{uuid.uuid4()}"
)
job3Run.emit_process_start(emitter, int(datetime.utcnow().timestamp() * 1000))
job3Run.emit_process_start(emitter, int(datetime.now(timezone.utc).timestamp() * 1000))

job3Run.emit_process_end(
emitter, int(datetime.utcnow().timestamp() * 1000), result=InstanceRunResult.SUCCESS
emitter,
int(datetime.now(timezone.utc).timestamp() * 1000),
result=InstanceRunResult.SUCCESS,
)


job4Run = DataProcessInstance.from_datajob(
datajob=dataJob4, id=f"{jobFlow.id}-{uuid.uuid4()}"
)
job4Run.emit_process_start(emitter, int(datetime.utcnow().timestamp() * 1000))
job4Run.emit_process_start(emitter, int(datetime.now(timezone.utc).timestamp() * 1000))

job4Run.emit_process_end(
emitter, int(datetime.utcnow().timestamp() * 1000), result=InstanceRunResult.SUCCESS
emitter,
int(datetime.now(timezone.utc).timestamp() * 1000),
result=InstanceRunResult.SUCCESS,
)


jobFlowRun.emit_process_end(
emitter, int(datetime.utcnow().timestamp() * 1000), result=InstanceRunResult.SUCCESS
emitter,
int(datetime.now(timezone.utc).timestamp() * 1000),
result=InstanceRunResult.SUCCESS,
)
Original file line number Diff line number Diff line change
Expand Up @@ -61,13 +61,10 @@ def transform_aspect( # type: ignore
assert aspect is None or isinstance(aspect, OwnershipClass)

if owners_to_add:
ownership = (
aspect
if aspect
else OwnershipClass(
owners=[],
)
ownership = aspect or OwnershipClass(
owners=[],
)

ownership.owners.extend(owners_to_add)

return ownership
7 changes: 2 additions & 5 deletions metadata-ingestion/scripts/avro_codegen.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,11 +10,8 @@


def load_schema_file(schema_file: str) -> str:
with open(schema_file) as f:
raw_schema_text = f.read()

redo_spaces = json.dumps(json.loads(raw_schema_text), indent=2)
return redo_spaces
raw_schema_text = Path(schema_file).read_text()
return json.dumps(json.loads(raw_schema_text), indent=2)


def merge_schemas(schemas: List[str]) -> str:
Expand Down
23 changes: 15 additions & 8 deletions metadata-ingestion/scripts/docgen.py
Original file line number Diff line number Diff line change
Expand Up @@ -605,9 +605,10 @@ def generate(
os.makedirs(config_dir, exist_ok=True)
with open(f"{config_dir}/{plugin_name}_config.json", "w") as f:
f.write(source_config_class.schema_json(indent=2))

create_or_update(source_documentation,
[platform_id, "plugins", plugin_name, "config_schema"],

create_or_update(
source_documentation,
[platform_id, "plugins", plugin_name, "config_schema"],
source_config_class.schema_json(indent=2) or "",
)

Expand Down Expand Up @@ -649,7 +650,9 @@ def generate(

with open(platform_doc_file, "w") as f:
if "name" in platform_docs:
f.write(f"import Tabs from '@theme/Tabs';\nimport TabItem from '@theme/TabItem';\n\n")
f.write(
f"import Tabs from '@theme/Tabs';\nimport TabItem from '@theme/TabItem';\n\n"
)
f.write(f"# {platform_docs['name']}\n")
if len(platform_docs["plugins"].keys()) > 1:
# More than one plugin used to provide integration with this platform
Expand Down Expand Up @@ -722,8 +725,10 @@ def generate(
f.write("\n```\n")
if "config" in plugin_docs:
f.write("\n### Config Details\n")
f.write("""<Tabs>
<TabItem value="options" label="Options" default>\n\n""")
f.write(
"""<Tabs>
<TabItem value="options" label="Options" default>\n\n"""
)
f.write(
"Note that a `.` is used to denote nested fields in the YAML recipe.\n\n"
)
Expand All @@ -733,15 +738,17 @@ def generate(
for doc in plugin_docs["config"]:
f.write(doc)
f.write("\n</details>\n\n")
f.write(f"""</TabItem>
f.write(
f"""</TabItem>
<TabItem value="schema" label="Schema">
The [JSONSchema](https://json-schema.org/) for this configuration is inlined below.\n\n
```javascript
{plugin_docs['config_schema']}
```\n\n
</TabItem>
</Tabs>\n\n""")
</Tabs>\n\n"""
)
# insert custom plugin docs after config details
f.write(plugin_docs.get("custom_docs", ""))
if "classname" in plugin_docs:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -142,7 +142,7 @@ def emit(
"""
Emit the DataFlow entity to Datahub
:param emitter: Datahub Emitter to emit the proccess event
:param emitter: Datahub Emitter to emit the process event
:param callback: (Optional[Callable[[Exception, str], None]]) the callback method for KafkaEmitter if it is used
"""
for mcp in self.generate_mcp():
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class DataJob:
properties (Dict[str, str]): Custom properties to set for the DataProcessInstance
url (Optional[str]): Url which points to the DataJob at the orchestrator
inlets (List[str]): List of urns the DataProcessInstance consumes
outlest (List[str]): List of urns the DataProcessInstance produces
outlets (List[str]): List of urns the DataProcessInstance produces
input_datajob_urns: List[DataJobUrn] = field(default_factory=list)
"""

Expand Down Expand Up @@ -179,7 +179,7 @@ def emit(
"""
Emit the DataJob entity to Datahub
:param emitter: Datahub Emitter to emit the proccess event
:param emitter: Datahub Emitter to emit the process event
:param callback: (Optional[Callable[[Exception, str], None]]) the callback method for KafkaEmitter if it is used
:rtype: None
"""
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ class DataProcessInstance:
template_urn (Optional[Union[DataJobUrn, DataFlowUrn]]): The parent DataJob or DataFlow which was instantiated if applicable
parent_instance (Optional[DataProcessInstanceUrn]): The parent execution's urn if applicable
properties (Dict[str, str]): Custom properties to set for the DataProcessInstance
url (Optional[str]): Url which points to the exection at the orchestrator
url (Optional[str]): Url which points to the execution at the orchestrator
inlets (List[str]): List of entities the DataProcessInstance consumes
outlets (List[str]): List of entities the DataProcessInstance produces
"""
Expand Down Expand Up @@ -118,10 +118,10 @@ def emit_process_start(
"""
:rtype: None
:param emitter: Datahub Emitter to emit the proccess event
:param emitter: Datahub Emitter to emit the process event
:param start_timestamp_millis: (int) the execution start time in milliseconds
:param attempt: the attempt number of the execution with the same execution id
:param emit_template: (bool) If it is set the template of the execution (datajob, datflow) will be emitted as well.
:param emit_template: (bool) If it is set the template of the execution (datajob, dataflow) will be emitted as well.
:param callback: (Optional[Callable[[Exception, str], None]]) the callback method for KafkaEmitter if it is used
"""
if emit_template and self.template_urn is not None:
Expand Down Expand Up @@ -312,8 +312,8 @@ def from_datajob(
:param datajob: (DataJob) the datajob from which to generate the DataProcessInstance
:param id: (str) the id for the DataProcessInstance
:param clone_inlets: (bool) wheather to clone datajob's inlets
:param clone_outlets: (bool) wheather to clone datajob's outlets
:param clone_inlets: (bool) whether to clone datajob's inlets
:param clone_outlets: (bool) whether to clone datajob's outlets
:return: DataProcessInstance
"""
dpi: DataProcessInstance = DataProcessInstance(
Expand Down
Loading

0 comments on commit 78d8a0c

Please sign in to comment.