chore: Refactor Python Codebase #5113

Merged
merged 98 commits on Jul 6, 2022

Changes from 90 commits

Commits
98 commits
d7df6e9
Spelling errors in code
Jun 7, 2022
bb26c5c
Use pathlib
Jun 7, 2022
0c72919
Spelling
Jun 7, 2022
7eb7aa7
Found bug in code
Jun 7, 2022
fb069ee
Spelling and declare as TODO
Jun 7, 2022
d4c36a0
Test assertions to pass
Jun 7, 2022
97617d3
use contextlib
Jun 7, 2022
2774b73
dataset add col if/else simplification
Jun 7, 2022
387b9d3
lineage emitter UTC timezone
Jun 7, 2022
de786c3
Update lineage_emitter_mcpw_rest.py
Jun 7, 2022
d95e8ba
Update custom_transform_example.py
Jun 7, 2022
20f770e
path read over open
Jun 7, 2022
2b1b6b9
spelling
Jun 7, 2022
5562bfe
escape strings
Jun 7, 2022
2952cef
Update cli_utils.py
Jun 8, 2022
ef6b5bc
Update delete_cli.py
Jun 8, 2022
0c92e2b
Update docker_check.py
Jun 8, 2022
62a6dd8
Escape
Jun 8, 2022
26f758f
Update migration_utils.py
Jun 8, 2022
03c4bf1
Update timeline_cli.py
Jun 8, 2022
5ee83d9
Update common.py
Jun 8, 2022
c78fdd8
Update import_resolver.py
Jun 8, 2022
1420932
Update yaml.py
Jun 8, 2022
16e2c12
Update kafka_emitter.py
Jun 8, 2022
2b0612f
Update mce_builder.py
Jun 8, 2022
85368b5
Update serialization_helper.py
Jun 8, 2022
af20f70
Update committable.py
Jun 8, 2022
9ab0d3d
Update common.py
Jun 8, 2022
daff0e4
Update decorators.py
Jun 8, 2022
6a79b7f
Update urn.py
Jun 8, 2022
3f48494
Update registry.py
Jun 8, 2022
d0b37ef
Update protobuf_util.py
Jun 8, 2022
a58dd74
Update datahub_ingestion_reporting_provider.py
Jun 8, 2022
5342aaf
Update pipeline.py
Jun 8, 2022
759bacd
Update datahub_kafka.py
Jun 8, 2022
b4275e1
Update datahub_rest.py
Jun 8, 2022
f12961d
Update pulsar.py
Jun 8, 2022
d193219
Update snowflake.py
Jun 8, 2022
e7dd056
Update bigquery_usage.py
Jun 8, 2022
6bf6136
Update base_transformer.py
Jun 8, 2022
0a1db39
Update mark_dataset_status.py
Jun 8, 2022
df463c4
Update action.py
Jun 8, 2022
2ab2fdd
Update stats.py
Jun 8, 2022
d188171
Update upgrade.py
Jun 8, 2022
5663dc9
Update telemetry.py
Jun 8, 2022
b57b58b
Update hive_schema_to_avro.py
Jun 8, 2022
d1dda5e
Update mapping.py
Jun 8, 2022
a3839e2
Update memory_leak_detector.py
Jun 8, 2022
1d180c1
Update server_config_util.py
Jun 8, 2022
ab48916
Update sql_lineage_parser_impl.py
Jun 8, 2022
bb37d4d
Update sql_parser.py
Jun 8, 2022
1574080
Update sqlalchemy_query_combiner.py
Jun 8, 2022
705050e
Update powerbi.py
Jun 8, 2022
b7df2f9
Update redash.py
Jun 8, 2022
2c51bc3
Update tableau.py
Jun 8, 2022
4c9705f
Update pulsar.py
Jun 8, 2022
d4afcb7
Update tableau_common.py
Jun 8, 2022
c4b15e9
Update powerbi.py
Jun 8, 2022
3cfd95a
Update openapi_parser.py
Jun 8, 2022
99823bb
Update openapi.py
Jun 8, 2022
1a85c8f
Update nifi.py
Jun 8, 2022
748d115
Update mongodb.py
Jun 8, 2022
23e4329
Update metabase.py
Jun 8, 2022
82509c7
Update lookml.py
Jun 8, 2022
3245d7b
Update looker_common.py
Jun 8, 2022
232dbda
Update looker.py
Jun 8, 2022
bc6e2b0
Update feast.py
Jun 8, 2022
77609ee
Update elastic_search.py
Jun 8, 2022
13f017b
Merge branch 'master' into master
Jun 9, 2022
0dedbc8
Running black
Jun 9, 2022
95d47db
running isort
Jun 9, 2022
4d708b5
lint fix
Jun 9, 2022
ee0bc05
Update timeline_cli.py
Jun 9, 2022
8010744
logger warn to logger warning
Jun 9, 2022
9cdf07c
Merge branch 'master' into master
Jun 9, 2022
07e3084
Merge branch 'master' of github.com:koconder/datahub
Jun 9, 2022
b168138
Update business_glossary.yml
Jun 11, 2022
980826c
Update ldap.py
Jun 11, 2022
b507c4b
Merge branch 'master' into master
Jun 11, 2022
6a1e1b2
Merge branch 'master' of github.com:koconder/datahub
Jun 12, 2022
9e50e6b
Merge branch 'master' into master
Jun 14, 2022
1e60f1b
lint
Jun 14, 2022
7c5e98d
Update cli_utils.py
Jun 20, 2022
b59d2ed
Update cli_utils.py
Jun 20, 2022
4820f62
Update cli_utils.py
Jun 20, 2022
978111f
updates as per request
Jun 24, 2022
391fa28
Update kafka.py
Jun 24, 2022
22611fc
Merge branch 'master' into master
Jun 24, 2022
e299618
Update powerbi.py
Jun 24, 2022
97337aa
Merge branch 'master' of github.com:koconder/datahub
Jun 24, 2022
76150f1
lint
Jun 24, 2022
da217c6
Update common.py
Jun 24, 2022
fb2d1dd
Merge branch 'master' into master
Jun 24, 2022
6ab23f2
Update mapping.py
Jun 27, 2022
c7f8106
Update mapping.py
Jun 27, 2022
93ed609
Merge branch 'master' into master
Jun 29, 2022
6fc0785
Update source.py
Jun 29, 2022
2de8aee
revert changes to functionality
anshbansal Jul 6, 2022
8 changes: 3 additions & 5 deletions metadata-ingestion-modules/airflow-plugin/setup.py
@@ -1,4 +1,5 @@
import os
import pathlib
import sys
from typing import Dict, Set

@@ -14,14 +15,11 @@

def get_long_description():
root = os.path.dirname(__file__)
with open(os.path.join(root, "README.md")) as f:
description = f.read()

return description
return pathlib.Path(os.path.join(root, "README.md")).read_text()


base_requirements = {
# Compatability.
# Compatibility.
"dataclasses>=0.6; python_version < '3.7'",
"typing_extensions>=3.10.0.2",
"mypy_extensions>=0.4.3",
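The change above is the recurring pattern in this PR: a hand-rolled open()/read() is replaced with pathlib's one-call read. A minimal before/after sketch of the same idiom (the README filename mirrors the diff and is only illustrative):

import os
import pathlib

def get_long_description_old() -> str:
    # Before: open a handle and read it, relying on the context manager to close it
    root = os.path.dirname(__file__)
    with open(os.path.join(root, "README.md")) as f:
        return f.read()

def get_long_description_new() -> str:
    # After: pathlib opens, reads, and closes the file in a single call
    root = os.path.dirname(__file__)
    return pathlib.Path(os.path.join(root, "README.md")).read_text()

pathlib.Path(root, "README.md") would also accept the path segments directly; the diff keeps os.path.join so the change stays minimal.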
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import contextlib
import traceback
from typing import Any, Iterable

@@ -215,7 +216,7 @@ def datahub_on_success_callback(context, *args, **kwargs):
for inlet in inlets:
datajob.inlets.append(inlet.urn)

# We have to use _oulets because outlets is empty
# We have to use _outlets because outlets is empty
for outlet in task._outlets:
datajob.outlets.append(outlet.urn)

@@ -389,13 +390,10 @@ def _patch_policy(settings):


def _patch_datahub_policy():
try:
with contextlib.suppress(ImportError):
import airflow_local_settings

_patch_policy(airflow_local_settings)
except ImportError:
pass

from airflow.models.dagbag import settings

_patch_policy(settings)
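For reference, a bare try/except ImportError/pass and contextlib.suppress(ImportError) are equivalent when the handler does nothing, which is what the diff above relies on. A self-contained sketch (the _patch_policy stub stands in for the plugin's real helper):

import contextlib

def _patch_policy(settings):
    # stand-in for the plugin's real patching helper
    print("patching", settings)

# Before: swallow a missing optional module by hand
try:
    import airflow_local_settings
    _patch_policy(airflow_local_settings)
except ImportError:
    pass

# After: contextlib.suppress skips the rest of the block if the import fails
with contextlib.suppress(ImportError):
    import airflow_local_settings
    _patch_policy(airflow_local_settings)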
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
def test_dummy():
assert True
pass
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
def test_dummy():
assert True
pass
Original file line number Diff line number Diff line change
@@ -47,7 +47,7 @@ def emitAssertionResult(assertionResult: AssertionRunEvent) -> None:
aspect=assertionResult,
)

# Emit BatchAssertion Result! (timseries aspect)
# Emit BatchAssertion Result! (timeseries aspect)
emitter.emit_mcp(dataset_assertionRunEvent_mcp)


17 changes: 7 additions & 10 deletions metadata-ingestion/examples/library/dataset_add_column_tag.py
@@ -23,18 +23,15 @@

def get_simple_field_path_from_v2_field_path(field_path: str) -> str:
"""A helper function to extract simple . path notation from the v2 field path"""
if field_path.startswith("[version=2.0]"):
# this is a v2 field path
tokens = [
t
for t in field_path.split(".")
if not (t.startswith("[") or t.endswith("]"))
]
path = ".".join(tokens)
return path
else:
if not field_path.startswith("[version=2.0]"):
# not a v2, we assume this is a simple path
return field_path
# this is a v2 field path
tokens = [
t for t in field_path.split(".") if not (t.startswith("[") or t.endswith("]"))
]

return ".".join(tokens)


# Inputs -> the column, dataset and the tag to set
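The same refactor is applied twice, to dataset_add_column_tag.py here and dataset_add_column_term.py just below: the if/else is inverted into an early-return guard so the v2 parsing logic is no longer nested. A self-contained sketch with illustrative field paths:

def get_simple_field_path_from_v2_field_path(field_path: str) -> str:
    """A helper function to extract simple . path notation from the v2 field path"""
    if not field_path.startswith("[version=2.0]"):
        # not a v2 path, assume it is already a simple path
        return field_path
    # v2 path: drop the bracketed version/type tokens
    tokens = [
        t for t in field_path.split(".") if not (t.startswith("[") or t.endswith("]"))
    ]
    return ".".join(tokens)

# Both illustrative inputs resolve to the same simple path, "user.name"
print(get_simple_field_path_from_v2_field_path("user.name"))
print(get_simple_field_path_from_v2_field_path("[version=2.0].[type=Record].user.[type=string].name"))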
17 changes: 7 additions & 10 deletions metadata-ingestion/examples/library/dataset_add_column_term.py
@@ -23,18 +23,15 @@

def get_simple_field_path_from_v2_field_path(field_path: str) -> str:
"""A helper function to extract simple . path notation from the v2 field path"""
if field_path.startswith("[version=2.0]"):
# this is a v2 field path
tokens = [
t
for t in field_path.split(".")
if not (t.startswith("[") or t.endswith("]"))
]
path = ".".join(tokens)
return path
else:
if not field_path.startswith("[version=2.0]"):
# not a v2, we assume this is a simple path
return field_path
# this is a v2 field path
tokens = [
t for t in field_path.split(".") if not (t.startswith("[") or t.endswith("]"))
]

return ".".join(tokens)


# Inputs -> the column, dataset and the term to set
Original file line number Diff line number Diff line change
@@ -10,13 +10,11 @@
)
from datahub.metadata.schema_classes import ChangeTypeClass

# Construct upstream tables.
upstream_tables: List[UpstreamClass] = []
upstream_table_1 = UpstreamClass(
dataset=builder.make_dataset_urn("bigquery", "upstream_table_1", "PROD"),
type=DatasetLineageTypeClass.TRANSFORMED,
)
upstream_tables.append(upstream_table_1)
upstream_tables: List[UpstreamClass] = [upstream_table_1]
upstream_table_2 = UpstreamClass(
dataset=builder.make_dataset_urn("bigquery", "upstream_table_2", "PROD"),
type=DatasetLineageTypeClass.TRANSFORMED,
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import uuid
from datetime import datetime
from datetime import datetime, timezone

from datahub.api.entities.datajob import DataFlow, DataJob
from datahub.api.entities.dataprocess.dataprocess_instance import (
@@ -36,40 +36,61 @@
jobFlowRun = DataProcessInstance.from_dataflow(
dataflow=jobFlow, id=f"{jobFlow.id}-{uuid.uuid4()}"
)
jobFlowRun.emit_process_start(emitter, int(datetime.utcnow().timestamp() * 1000))
jobFlowRun.emit_process_start(
emitter, int(datetime.now(timezone.utc).timestamp() * 1000)
)


jobRun = DataProcessInstance.from_datajob(
datajob=dataJob, id=f"{jobFlow.id}-{uuid.uuid4()}"
)
jobRun.emit_process_start(emitter, int(datetime.utcnow().timestamp() * 1000))
jobRun.emit_process_start(emitter, int(datetime.now(timezone.utc).timestamp() * 1000))

jobRun.emit_process_end(
emitter, int(datetime.utcnow().timestamp() * 1000), result=InstanceRunResult.SUCCESS
emitter,
int(datetime.now(timezone.utc).timestamp() * 1000),
result=InstanceRunResult.SUCCESS,
)


job2Run = DataProcessInstance.from_datajob(
datajob=dataJob2, id=f"{jobFlow.id}-{uuid.uuid4()}"
)
job2Run.emit_process_start(emitter, int(datetime.utcnow().timestamp() * 1000))
job2Run.emit_process_start(emitter, int(datetime.now(timezone.utc).timestamp() * 1000))

job2Run.emit_process_end(
emitter, int(datetime.utcnow().timestamp() * 1000), result=InstanceRunResult.SUCCESS
emitter,
int(datetime.now(timezone.utc).timestamp() * 1000),
result=InstanceRunResult.SUCCESS,
)


job3Run = DataProcessInstance.from_datajob(
datajob=dataJob3, id=f"{jobFlow.id}-{uuid.uuid4()}"
)
job3Run.emit_process_start(emitter, int(datetime.utcnow().timestamp() * 1000))
job3Run.emit_process_start(emitter, int(datetime.now(timezone.utc).timestamp() * 1000))

job3Run.emit_process_end(
emitter, int(datetime.utcnow().timestamp() * 1000), result=InstanceRunResult.SUCCESS
emitter,
int(datetime.now(timezone.utc).timestamp() * 1000),
result=InstanceRunResult.SUCCESS,
)


job4Run = DataProcessInstance.from_datajob(
datajob=dataJob4, id=f"{jobFlow.id}-{uuid.uuid4()}"
)
job4Run.emit_process_start(emitter, int(datetime.utcnow().timestamp() * 1000))
job4Run.emit_process_start(emitter, int(datetime.now(timezone.utc).timestamp() * 1000))

job4Run.emit_process_end(
emitter, int(datetime.utcnow().timestamp() * 1000), result=InstanceRunResult.SUCCESS
emitter,
int(datetime.now(timezone.utc).timestamp() * 1000),
result=InstanceRunResult.SUCCESS,
)


jobFlowRun.emit_process_end(
emitter, int(datetime.utcnow().timestamp() * 1000), result=InstanceRunResult.SUCCESS
emitter,
int(datetime.now(timezone.utc).timestamp() * 1000),
result=InstanceRunResult.SUCCESS,
)
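A note on why the timestamp change above is more than cosmetic: datetime.utcnow() returns a naive datetime, and .timestamp() on a naive value is interpreted as local time, so the emitted millisecond timestamps were skewed by the machine's UTC offset on hosts not running in UTC, which is presumably what the switch to an aware datetime fixes. A small sketch of the difference:

from datetime import datetime, timezone

# Naive: .timestamp() assumes the value is local time, so the result is off by the UTC offset
naive_ms = int(datetime.utcnow().timestamp() * 1000)

# Aware: an unambiguous UTC instant
aware_ms = int(datetime.now(timezone.utc).timestamp() * 1000)

# On a UTC host the two match; elsewhere they differ by the local offset
print(naive_ms, aware_ms, aware_ms - naive_ms)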
Original file line number Diff line number Diff line change
@@ -61,13 +61,10 @@ def transform_aspect( # type: ignore
assert aspect is None or isinstance(aspect, OwnershipClass)

if owners_to_add:
ownership = (
aspect
if aspect
else OwnershipClass(
owners=[],
)
ownership = aspect or OwnershipClass(
owners=[],
)

ownership.owners.extend(owners_to_add)

return ownership
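The ownership = aspect or OwnershipClass(owners=[]) form works because or returns its left operand when that operand is truthy, and the assert above guarantees aspect is either None or an OwnershipClass instance; assuming instances are always truthy (typical for classes that define neither __bool__ nor __len__), it behaves exactly like the old conditional. A sketch with a stand-in class:

class OwnershipStub:
    # stand-in for OwnershipClass; a plain class instance is always truthy
    def __init__(self, owners):
        self.owners = owners

def ensure_ownership(aspect):
    # Equivalent to: aspect if aspect else OwnershipStub(owners=[])
    return aspect or OwnershipStub(owners=[])

print(ensure_ownership(None).owners)  # []
print(ensure_ownership(OwnershipStub(["urn:li:corpuser:jdoe"])).owners)  # ['urn:li:corpuser:jdoe']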
7 changes: 2 additions & 5 deletions metadata-ingestion/scripts/avro_codegen.py
@@ -10,11 +10,8 @@


def load_schema_file(schema_file: str) -> str:
with open(schema_file) as f:
raw_schema_text = f.read()

redo_spaces = json.dumps(json.loads(raw_schema_text), indent=2)
return redo_spaces
raw_schema_text = Path(schema_file).read_text()
return json.dumps(json.loads(raw_schema_text), indent=2)


def merge_schemas(schemas: List[str]) -> str:
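The avro_codegen change above combines the same Path.read_text() idiom with a json.loads/json.dumps round-trip that rewrites the schema with two-space indentation. A runnable sketch (the file name is illustrative):

import json
from pathlib import Path

def load_schema_file(schema_file: str) -> str:
    # Read the whole file in one call, then normalise the JSON to 2-space indentation
    raw_schema_text = Path(schema_file).read_text()
    return json.dumps(json.loads(raw_schema_text), indent=2)

# Illustrative usage with a throwaway schema file
tmp = Path("example_schema.avsc")
tmp.write_text('{"type": "record", "name": "Foo", "fields": []}')
print(load_schema_file(str(tmp)))
tmp.unlink()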
Original file line number Diff line number Diff line change
@@ -142,7 +142,7 @@ def emit(
"""
Emit the DataFlow entity to Datahub

:param emitter: Datahub Emitter to emit the proccess event
:param emitter: Datahub Emitter to emit the process event
:param callback: (Optional[Callable[[Exception, str], None]]) the callback method for KafkaEmitter if it is used
"""
for mcp in self.generate_mcp():
Original file line number Diff line number Diff line change
@@ -52,7 +52,7 @@ class DataJob:
properties Dict[str, str]: Custom properties to set for the DataProcessInstance
url (Optional[str]): Url which points to the DataJob at the orchestrator
inlets (List[str]): List of urns the DataProcessInstance consumes
outlest (List[str]): List of urns the DataProcessInstance produces
outlets (List[str]): List of urns the DataProcessInstance produces
input_datajob_urns: List[DataJobUrn] = field(default_factory=list)
"""

@@ -179,7 +179,7 @@ def emit(
"""
Emit the DataJob entity to Datahub

:param emitter: Datahub Emitter to emit the proccess event
:param emitter: Datahub Emitter to emit the process event
:param callback: (Optional[Callable[[Exception, str], None]]) the callback method for KafkaEmitter if it is used
:rtype: None
"""
Original file line number Diff line number Diff line change
@@ -55,7 +55,7 @@ class DataProcessInstance:
template_urn (Optional[Union[DataJobUrn, DataFlowUrn]]): The parent DataJob or DataFlow which was instantiated if applicable
parent_instance (Optional[DataProcessInstanceUrn]): The parent execution's urn if applicable
properties Dict[str, str]: Custom properties to set for the DataProcessInstance
url (Optional[str]): Url which points to the exection at the orchestrator
url (Optional[str]): Url which points to the execution at the orchestrator
inlets (List[str]): List of entities the DataProcessInstance consumes
outlets (List[str]): List of entities the DataProcessInstance produces
"""
@@ -118,10 +118,10 @@ def emit_process_start(
"""

:rtype: None
:param emitter: Datahub Emitter to emit the proccess event
:param emitter: Datahub Emitter to emit the process event
:param start_timestamp_millis: (int) the execution start time in milliseconds
:param attempt: the number of attempt of the execution with the same execution id
:param emit_template: (bool) If it is set the template of the execution (datajob, datflow) will be emitted as well.
:param emit_template: (bool) If it is set the template of the execution (datajob, dataflow) will be emitted as well.
:param callback: (Optional[Callable[[Exception, str], None]]) the callback method for KafkaEmitter if it is used
"""
if emit_template and self.template_urn is not None:
@@ -312,8 +312,8 @@ def from_datajob(

:param datajob: (DataJob) the datajob from generate the DataProcessInstance
:param id: (str) the id for the DataProcessInstance
:param clone_inlets: (bool) wheather to clone datajob's inlets
:param clone_outlets: (bool) wheather to clone datajob's outlets
:param clone_inlets: (bool) whether to clone datajob's inlets
:param clone_outlets: (bool) whether to clone datajob's outlets
:return: DataProcessInstance
"""
dpi: DataProcessInstance = DataProcessInstance(