refactor(ingest): Refactoring container creation to common place #6877

Merged 49 commits on Jan 20, 2023

Commits (49)
58ab3cf  Refactoring container creation (treff7es, Dec 28, 2022)
c809aa8  Adding back accidentally removed imports (treff7es, Dec 28, 2022)
fae7ec1  Adding back line breaks as well (treff7es, Dec 28, 2022)
a71f8dc  Merge branch 'master' into sql_common_refactor (treff7es, Dec 28, 2022)
cb219bd  Remove unneeded line (treff7es, Dec 28, 2022)
d3e6407  Black formatting (treff7es, Dec 28, 2022)
93df8fc  Isorting (treff7es, Dec 28, 2022)
cf074f0  Fixing return types (treff7es, Dec 28, 2022)
42004e5  Black formatting (treff7es, Dec 28, 2022)
8fcdb7a  Add option to set container name differently than the key name (treff7es, Dec 28, 2022)
e985a20  Fix snowflake container generation (treff7es, Dec 29, 2022)
9af84c9  Fixing 2 tier container generation (treff7es, Dec 29, 2022)
be580ee  Merge branch 'master' into sql_common_refactor (treff7es, Dec 29, 2022)
a626a9e  isorting (treff7es, Dec 29, 2022)
ade9fa2  Removing unused import (treff7es, Dec 29, 2022)
0da0c55  Fixing presto container generation (treff7es, Dec 29, 2022)
2268cad  Athena inherits from two tier db (treff7es, Dec 29, 2022)
480497f  Reverting Athena changes (treff7es, Dec 29, 2022)
e017f44  Merge branch 'master' into sql_common_refactor (treff7es, Dec 29, 2022)
d6c1b7d  Merge branch 'master' into sql_common_refactor (treff7es, Dec 30, 2022)
f3f0ffb  Merge branch 'master' into sql_common_refactor (treff7es, Jan 13, 2023)
6e5debd  Fixing merge conflict issues (treff7es, Jan 13, 2023)
f873c46  Blacking (treff7es, Jan 13, 2023)
2b85722  Simplifying a bit the container creation calls (treff7es, Jan 19, 2023)
8a8d2c5  Merge branch 'master' into sql_common_refactor (treff7es, Jan 19, 2023)
47edb2f  Merge branch 'master' into sql_common_refactor (treff7es, Jan 19, 2023)
e337e4b  addressing pr comment (treff7es, Jan 19, 2023)
8143042  Making defensive if domain registry is not set (treff7es, Jan 19, 2023)
8765898  Adding platform to config (treff7es, Jan 19, 2023)
33a69d3  Merge branch 'master' into sql_common_refactor (treff7es, Jan 19, 2023)
9d55f72  Fixing linter issues (treff7es, Jan 19, 2023)
68f82d0  Merge branch 'master' into sql_common_refactor (treff7es, Jan 19, 2023)
55eebd5  Removing unneeded import (treff7es, Jan 19, 2023)
804bdcc  Checking domain registry existence (treff7es, Jan 20, 2023)
1529528  Removing the need for config (treff7es, Jan 20, 2023)
8d8c3eb  fix dbname in Snowflake containers (treff7es, Jan 20, 2023)
3835f91  Updating golden files (treff7es, Jan 20, 2023)
5a23742  Removing unneeded state add (treff7es, Jan 20, 2023)
b9dc0c3  Updating config (treff7es, Jan 20, 2023)
bc32fda  Merge branch 'master' into sql_common_refactor (treff7es, Jan 20, 2023)
31d1efb  Removing unneded config platform set (treff7es, Jan 20, 2023)
ff45fbf  More fixes (treff7es, Jan 20, 2023)
10c4ca9  Fixing parameters (treff7es, Jan 20, 2023)
88a88f7  fixing snowflake test (treff7es, Jan 20, 2023)
c92559f  Adding exception for debugging (treff7es, Jan 20, 2023)
2b7763b  Reverting exception raise (treff7es, Jan 20, 2023)
5e6f909  Extracting out container assignment in presto on hive (treff7es, Jan 20, 2023)
018a04d  Fixing container generation in presto_on_hive_view (treff7es, Jan 20, 2023)
df78829  Merge branch 'master' into sql_common_refactor (treff7es, Jan 20, 2023)
metadata-ingestion/src/datahub/emitter/mcp_builder.py (1 addition, 1 deletion)

@@ -202,7 +202,7 @@ def gen_containers(
         )
     yield MetadataChangeProposalWrapper(
         entityUrn=f"{container_urn}",
-        # entityKeyAspect=ContainerKeyClass(guid=schema_container_key.guid()),
+        # entityKeyAspect=ContainerKeyClass(guid=parent_container_key.guid()),
         aspect=ContainerProperties(
             name=name,
             description=description,
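The hunk above only corrects a variable name in a commented-out line inside gen_containers, the shared primitive that the rest of this PR builds on. For orientation, here is a minimal sketch of the emission pattern visible in that hunk; it mirrors the diff's own MetadataChangeProposalWrapper call, while the wrapper function and the ContainerPropertiesClass import path are illustrative assumptions added here:

```python
# Sketch of the pattern shown in the hunk above: gen_containers yields a
# MetadataChangeProposalWrapper carrying a ContainerProperties aspect for the
# container URN. The wrapper function below is illustrative, not DataHub code.
from typing import Iterable

from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.metadata.schema_classes import ContainerPropertiesClass


def emit_container_properties(
    container_urn: str, name: str, description: str
) -> Iterable[MetadataChangeProposalWrapper]:
    yield MetadataChangeProposalWrapper(
        entityUrn=container_urn,
        aspect=ContainerPropertiesClass(
            name=name,
            description=description,
        ),
    )
```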
BigQuery v2 source module (file name not shown in this capture)

@@ -15,17 +15,13 @@
     make_data_platform_urn,
     make_dataplatform_instance_urn,
     make_dataset_urn_with_platform_instance,
-    make_domain_urn,
     make_tag_urn,
     set_dataset_urn_to_lower,
 )
 from datahub.emitter.mcp_builder import (
     BigQueryDatasetKey,
     PlatformKey,
     ProjectIdKey,
-    add_dataset_to_container,
-    add_domain_to_entity_wu,
-    gen_containers,
     wrap_aspect_as_workunit,
 )
 from datahub.ingestion.api.common import PipelineContext

@@ -62,6 +58,12 @@
 from datahub.ingestion.source.bigquery_v2.lineage import BigqueryLineageExtractor
 from datahub.ingestion.source.bigquery_v2.profiler import BigqueryProfiler
 from datahub.ingestion.source.bigquery_v2.usage import BigQueryUsageExtractor
+from datahub.ingestion.source.sql.sql_utils import (
+    add_table_to_schema_container,
+    gen_database_container,
+    gen_schema_container,
+    get_domain_wu,
+)
 from datahub.ingestion.source.state.profiling_state_handler import ProfilingHandler
 from datahub.ingestion.source.state.redundant_run_skip_handler import (
     RedundantRunSkipHandler,
@@ -201,6 +203,7 @@ def __init__(self, ctx: PipelineContext, config: BigQueryV2Config):
         self.config: BigQueryV2Config = config
         self.report: BigQueryV2Report = BigQueryV2Report()
         self.platform: str = "bigquery"
+
         BigqueryTableIdentifier._BIGQUERY_DEFAULT_SHARDED_TABLE_REGEX = (
             self.config.sharded_table_pattern
         )

@@ -231,6 +234,7 @@ def __init__(self, ctx: PipelineContext, config: BigQueryV2Config):
             run_id=self.ctx.run_id,
         )

+        self.domain_registry: Optional[DomainRegistry] = None
         if self.config.domain:
             self.domain_registry = DomainRegistry(
                 cached_domains=[k for k in self.config.domain], graph=self.ctx.graph
@@ -447,27 +451,17 @@ def gen_project_id_key(self, database: str) -> PlatformKey:
             backcompat_instance_for_guid=self.config.env,
         )

-    def _gen_domain_urn(self, dataset_name: str) -> Optional[str]:
-        domain_urn: Optional[str] = None
-
-        for domain, pattern in self.config.domain.items():
-            if pattern.allowed(dataset_name):
-                domain_urn = make_domain_urn(
-                    self.domain_registry.get_domain_urn(domain)
-                )
-
-        return domain_urn
-
     def gen_project_id_containers(self, database: str) -> Iterable[MetadataWorkUnit]:
-        domain_urn = self._gen_domain_urn(database)
-
         database_container_key = self.gen_project_id_key(database)

-        yield from gen_containers(
-            container_key=database_container_key,
+        yield from gen_database_container(
+            database=database,
             name=database,
             sub_types=["Project"],
-            domain_urn=domain_urn,
+            domain_registry=self.domain_registry,
+            domain_config=self.config.domain,
+            report=self.report,
+            database_container_key=database_container_key,
         )

     def gen_dataset_containers(
@@ -477,28 +471,22 @@ def gen_dataset_containers(

         database_container_key = self.gen_project_id_key(database=project_id)

-        yield from gen_containers(
-            schema_container_key,
-            dataset,
-            ["Dataset"],
-            database_container_key,
+        yield from gen_schema_container(
+            database=project_id,
+            schema=dataset,
+            sub_types=["Dataset"],
+            domain_registry=self.domain_registry,
+            domain_config=self.config.domain,
+            report=self.report,
+            schema_container_key=schema_container_key,
+            database_container_key=database_container_key,
             external_url=BQ_EXTERNAL_DATASET_URL_TEMPLATE.format(
                 project=project_id, dataset=dataset
             )
             if self.config.include_external_url
             else None,
         )

-    def add_table_to_dataset_container(
-        self, dataset_urn: str, db_name: str, schema: str
-    ) -> Iterable[MetadataWorkUnit]:
-        schema_container_key = self.gen_dataset_key(db_name, schema)
-
-        yield from add_dataset_to_container(
-            container_key=schema_container_key,
-            dataset_urn=dataset_urn,
-        )
-
     def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]:
         logger.info("Getting projects")
         conn: bigquery.Client = get_bigquery_client(self.config)
@@ -772,18 +760,6 @@ def _process_view(

         yield from self.gen_view_dataset_workunits(view, project_id, dataset_name)

-    def _get_domain_wu(
-        self,
-        dataset_name: str,
-        entity_urn: str,
-    ) -> Iterable[MetadataWorkUnit]:
-        domain_urn = self._gen_domain_urn(dataset_name)
-        if domain_urn:
-            yield from add_domain_to_entity_wu(
-                entity_urn=entity_urn,
-                domain_urn=domain_urn,
-            )
-
     def gen_table_dataset_workunits(
         self,
         table: BigqueryTable,
@@ -792,10 +768,10 @@
     ) -> Iterable[MetadataWorkUnit]:
         custom_properties: Dict[str, str] = {}
         if table.expires:
-            custom_properties["expiration_date"] = str(str(table.expires))
+            custom_properties["expiration_date"] = str(table.expires)

         if table.time_partitioning:
-            custom_properties["time_partitioning"] = str(str(table.time_partitioning))
+            custom_properties["time_partitioning"] = str(table.time_partitioning)

         if table.size_in_bytes:
             custom_properties["size_in_bytes"] = str(table.size_in_bytes)
@@ -910,10 +886,10 @@ def gen_dataset_workunits(
         if tags_to_add:
             yield self.gen_tags_aspect_workunit(dataset_urn, tags_to_add)

-        yield from self.add_table_to_dataset_container(
-            dataset_urn,
-            project_id,
-            dataset_name,
+        yield from add_table_to_schema_container(
+            dataset_urn=dataset_urn,
+            report=self.report,
+            parent_container_key=self.gen_dataset_key(project_id, dataset_name),
         )
         dpi_aspect = self.get_dataplatform_instance_aspect(dataset_urn=dataset_urn)
         if dpi_aspect:
@@ -922,10 +898,14 @@
         subTypes = SubTypes(typeNames=sub_types)
         yield wrap_aspect_as_workunit("dataset", dataset_urn, "subTypes", subTypes)

-        yield from self._get_domain_wu(
-            dataset_name=str(datahub_dataset_name),
-            entity_urn=dataset_urn,
-        )
+        if self.domain_registry:
+            yield from get_domain_wu(
+                dataset_name=str(datahub_dataset_name),
+                entity_urn=dataset_urn,
+                domain_registry=self.domain_registry,
+                domain_config=self.config.domain,
+                report=self.report,
+            )

     def gen_lineage(
         self,
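Taken together, the BigQuery hunks above swap the source-local helpers (_gen_domain_urn, _get_domain_wu, add_table_to_dataset_container, and direct gen_containers calls) for the shared ones in datahub.ingestion.source.sql.sql_utils. Below is a minimal sketch of the resulting call pattern, with keyword arguments copied from the call sites above; the helper signatures are inferred from those call sites rather than from sql_utils itself, so treat them as assumptions:

```python
# Sketch only: mirrors the post-refactor BigQuery call sites in this diff.
# `source` stands in for the BigQuery source instance; helper signatures are
# inferred from the calls above, not from the sql_utils implementation.
from typing import Iterable

from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.ingestion.source.sql.sql_utils import (
    gen_database_container,
    gen_schema_container,
)


def emit_bigquery_containers(
    source, project_id: str, dataset: str
) -> Iterable[MetadataWorkUnit]:
    # Keys identify the containers; the source already knows how to build them.
    database_container_key = source.gen_project_id_key(project_id)
    schema_container_key = source.gen_dataset_key(project_id, dataset)

    # One container per BigQuery project ("database" in the shared vocabulary),
    # with domain resolution handled inside the helper.
    yield from gen_database_container(
        database=project_id,
        name=project_id,
        sub_types=["Project"],
        domain_registry=source.domain_registry,
        domain_config=source.config.domain,
        report=source.report,
        database_container_key=database_container_key,
    )

    # One container per dataset, parented to the project container via the keys.
    yield from gen_schema_container(
        database=project_id,
        schema=dataset,
        sub_types=["Dataset"],
        domain_registry=source.domain_registry,
        domain_config=source.config.domain,
        report=source.report,
        schema_container_key=schema_container_key,
        database_container_key=database_container_key,
    )
```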
Snowflake source config module (file name not shown in this capture)

@@ -6,7 +6,7 @@
 from pydantic import Field, PositiveInt, PrivateAttr, root_validator, validator

 from datahub.configuration.common import AllowDenyPattern, ConfigurationError
-from datahub.ingestion.source.sql.sql_common import SQLAlchemyConfig
+from datahub.ingestion.source.sql.sql_config import SQLAlchemyConfig
 from datahub.ingestion.source.state.stateful_ingestion_base import (
     LineageStatefulIngestionConfig,
     ProfilingStatefulIngestionConfig,
Snowflake profiler module (file name not shown in this capture)

@@ -9,7 +9,7 @@

 from datahub.configuration.pattern_utils import is_schema_allowed
 from datahub.emitter.mce_builder import make_dataset_urn_with_platform_instance
-from datahub.ingestion.api.common import WorkUnit
+from datahub.ingestion.api.workunit import MetadataWorkUnit
 from datahub.ingestion.source.ge_data_profiler import (
     DatahubGEProfiler,
     GEProfilerRequest,
@@ -51,7 +51,9 @@ def __init__(
         self.report: SnowflakeV2Report = report
         self.logger = logger

-    def get_workunits(self, databases: List[SnowflakeDatabase]) -> Iterable[WorkUnit]:
+    def get_workunits(
+        self, databases: List[SnowflakeDatabase]
+    ) -> Iterable[MetadataWorkUnit]:
         # Extra default SQLAlchemy option for better connection pooling and threading.
         # https://docs.sqlalchemy.org/en/14/core/pooling.html#sqlalchemy.pool.QueuePool.params.max_overflow
         if self.config.profiling.enabled:
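The last hunk narrows the profiler's return annotation from Iterable[WorkUnit] to Iterable[MetadataWorkUnit]. A small sketch of why the narrower type helps downstream code; the consumer function is hypothetical, and the assumption that work units expose an id field is flagged in the comment:

```python
# Hypothetical consumer illustrating the benefit of the narrower annotation:
# code fed by get_workunits() can rely on MetadataWorkUnit statically, with no
# isinstance() narrowing from the WorkUnit base class.
from typing import Iterable

from datahub.ingestion.api.workunit import MetadataWorkUnit


def emit_all(workunits: Iterable[MetadataWorkUnit]) -> None:
    for wu in workunits:
        # `id` is assumed here to be the work-unit identifier field.
        print(wu.id)
```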