From 8cebd5e63a10895f95c2b359c59259ecff8a7c7c Mon Sep 17 00:00:00 2001 From: Harshal Sheth Date: Wed, 18 Jan 2023 13:34:32 -0800 Subject: [PATCH] feat(ingest): reporting revamp, part 1 (#7031) --- .../src/datahub/ingestion/api/report.py | 144 +++++++++++++----- .../datahub/ingestion/api/report_helpers.py | 23 +++ .../src/datahub/ingestion/api/source.py | 36 ++++- .../src/datahub/ingestion/api/workunit.py | 2 +- .../src/datahub/ingestion/run/pipeline.py | 2 +- .../source/bigquery_v2/bigquery_report.py | 1 - .../src/datahub/ingestion/source/nifi.py | 10 +- .../datahub/ingestion/source/unity/proxy.py | 9 -- .../datahub/ingestion/source/unity/report.py | 46 +----- .../datahub/ingestion/source/unity/source.py | 122 ++++++++------- .../tests/unit/config/test_config_model.py | 10 ++ metadata-ingestion/tests/unit/test_report.py | 26 ++++ 12 files changed, 267 insertions(+), 164 deletions(-) create mode 100644 metadata-ingestion/src/datahub/ingestion/api/report_helpers.py create mode 100644 metadata-ingestion/tests/unit/test_report.py diff --git a/metadata-ingestion/src/datahub/ingestion/api/report.py b/metadata-ingestion/src/datahub/ingestion/api/report.py index e7432d0d98768a..fcca7675917746 100644 --- a/metadata-ingestion/src/datahub/ingestion/api/report.py +++ b/metadata-ingestion/src/datahub/ingestion/api/report.py @@ -1,12 +1,23 @@ +import dataclasses import json +import logging import pprint import sys from dataclasses import dataclass -from datetime import datetime, timedelta, timezone +from datetime import datetime, timedelta from enum import Enum -from typing import Any, Dict +from typing import Any, Dict, Optional import humanfriendly +import pydantic +from pydantic import BaseModel +from typing_extensions import Literal, Protocol, runtime_checkable + +from datahub.ingestion.api.report_helpers import format_datetime_relative +from datahub.utilities.lossy_collections import LossyList + +logger = logging.getLogger(__name__) +LogLevel = Literal["ERROR", "WARNING", "INFO", "DEBUG"] # The sort_dicts option was added in Python 3.8. if sys.version_info >= (3, 8): @@ -15,58 +26,60 @@ PPRINT_OPTIONS: Dict = {} +@runtime_checkable +class SupportsAsObj(Protocol): + def as_obj(self) -> dict: + ... + + +def _stacklevel_if_supported(level: int) -> dict: + # The logging module added support for stacklevel in Python 3.8. + if sys.version_info >= (3, 8): + return {"stacklevel": level} + else: + return {} + + @dataclass -class Report: +class Report(SupportsAsObj): @staticmethod def to_str(some_val: Any) -> str: if isinstance(some_val, Enum): return some_val.name - elif isinstance(some_val, timedelta): - return humanfriendly.format_timespan(some_val) - elif isinstance(some_val, datetime): - try: - # check if we have a tz_aware object or not (https://stackoverflow.com/questions/5802108/how-to-check-if-a-datetime-object-is-localized-with-pytz) - tz_aware = ( - some_val.tzinfo is not None - and some_val.tzinfo.utcoffset(some_val) is not None - ) - now = datetime.now(timezone.utc) if tz_aware else datetime.now() - diff = now - some_val - if abs(diff) < timedelta(seconds=1): - # the timestamps are close enough that printing a duration isn't useful - return f"{some_val} (now)." - elif diff > timedelta(seconds=0): - # timestamp is in the past - return f"{some_val} ({humanfriendly.format_timespan(diff)} ago)." - else: - # timestamp is in the future - return f"{some_val} (in {humanfriendly.format_timespan(some_val - now)})." 
- except Exception: - # we don't want to fail reporting because we were unable to pretty print a timestamp - return str(datetime) else: return str(some_val) @staticmethod - def to_dict(some_val: Any) -> Any: + def to_pure_python_obj(some_val: Any) -> Any: """A cheap way to generate a dictionary.""" - if hasattr(some_val, "as_obj"): + + if isinstance(some_val, SupportsAsObj): return some_val.as_obj() - if hasattr(some_val, "dict"): # pydantic models + elif isinstance(some_val, pydantic.BaseModel): return some_val.dict() - if hasattr(some_val, "asdict"): # dataclasses - return some_val.asdict() - if isinstance(some_val, list): - return [Report.to_dict(v) for v in some_val if v is not None] - if isinstance(some_val, dict): + elif dataclasses.is_dataclass(some_val): + return dataclasses.asdict(some_val) + elif isinstance(some_val, list): + return [Report.to_pure_python_obj(v) for v in some_val if v is not None] + elif isinstance(some_val, timedelta): + return humanfriendly.format_timespan(some_val) + elif isinstance(some_val, datetime): + try: + return format_datetime_relative(some_val) + except Exception: + # we don't want to fail reporting because we were unable to pretty print a timestamp + return str(datetime) + elif isinstance(some_val, dict): return { - Report.to_str(k): Report.to_dict(v) + Report.to_str(k): Report.to_pure_python_obj(v) for k, v in some_val.items() if v is not None } - - # fall through option - return Report.to_str(some_val) + elif isinstance(some_val, (int, float, bool)): + return some_val + else: + # fall through option + return Report.to_str(some_val) def compute_stats(self) -> None: """A hook to compute derived stats""" @@ -75,10 +88,10 @@ def compute_stats(self) -> None: def as_obj(self) -> dict: self.compute_stats() return { - str(key): Report.to_dict(value) + str(key): Report.to_pure_python_obj(value) for (key, value) in self.__dict__.items() - if value is not None - and not str(key).startswith("_") # ignore nulls and fields starting with _ + # ignore nulls and fields starting with _ + if value is not None and not str(key).startswith("_") } def as_string(self) -> str: @@ -86,3 +99,52 @@ def as_string(self) -> str: def as_json(self) -> str: return json.dumps(self.as_obj()) + + # TODO add helper method for warning / failure status + counts? 
+ + +class ReportAttribute(BaseModel): + severity: LogLevel = "DEBUG" + help: Optional[str] = None + + @property + def logger_sev(self) -> int: + log_levels = { + "DEBUG": logging.DEBUG, + "INFO": logging.INFO, + "WARNING": logging.WARNING, + "ERROR": logging.ERROR, + } + return log_levels[self.severity] + + def log(self, msg: str) -> None: + logger.log(level=self.logger_sev, msg=msg, **_stacklevel_if_supported(3)) + + +class EntityFilterReport(ReportAttribute): + type: str + + processed_entities: LossyList[str] = pydantic.Field(default_factory=LossyList) + dropped_entities: LossyList[str] = pydantic.Field(default_factory=LossyList) + + def processed(self, entity: str, type: Optional[str] = None) -> None: + self.log(f"Processed {type or self.type} {entity}") + self.processed_entities.append(entity) + + def dropped(self, entity: str, type: Optional[str] = None) -> None: + self.log(f"Filtered {type or self.type} {entity}") + self.dropped_entities.append(entity) + + def as_obj(self) -> dict: + return { + "filtered": self.dropped_entities.as_obj(), + "processed": self.processed_entities.as_obj(), + } + + @staticmethod + def field(type: str, severity: LogLevel = "DEBUG") -> "EntityFilterReport": + """A helper to create a dataclass field.""" + + return dataclasses.field( + default_factory=lambda: EntityFilterReport(type=type, severity=severity) + ) diff --git a/metadata-ingestion/src/datahub/ingestion/api/report_helpers.py b/metadata-ingestion/src/datahub/ingestion/api/report_helpers.py new file mode 100644 index 00000000000000..270dc98f158038 --- /dev/null +++ b/metadata-ingestion/src/datahub/ingestion/api/report_helpers.py @@ -0,0 +1,23 @@ +from datetime import datetime, timedelta, timezone + +import humanfriendly + + +def format_datetime_relative(some_val: datetime) -> str: + """Formats a datetime as a human-readable string relative to now.""" + + # check if we have a tz_aware object or not (https://stackoverflow.com/questions/5802108/how-to-check-if-a-datetime-object-is-localized-with-pytz) + tz_aware = ( + some_val.tzinfo is not None and some_val.tzinfo.utcoffset(some_val) is not None + ) + now = datetime.now(timezone.utc) if tz_aware else datetime.now() + diff = now - some_val + if abs(diff) < timedelta(seconds=1): + # the timestamps are close enough that printing a duration isn't useful + return f"{some_val} (now)" + elif diff > timedelta(seconds=0): + # timestamp is in the past + return f"{some_val} ({humanfriendly.format_timespan(diff)} ago)" + else: + # timestamp is in the future + return f"{some_val} (in {humanfriendly.format_timespan(some_val - now)})" diff --git a/metadata-ingestion/src/datahub/ingestion/api/source.py b/metadata-ingestion/src/datahub/ingestion/api/source.py index 9f3740aa9f3eea..5131734216d5c6 100644 --- a/metadata-ingestion/src/datahub/ingestion/api/source.py +++ b/metadata-ingestion/src/datahub/ingestion/api/source.py @@ -1,15 +1,19 @@ import datetime from abc import ABCMeta, abstractmethod +from collections import defaultdict from dataclasses import dataclass, field from enum import Enum -from typing import Dict, Generic, Iterable, List, Optional, Type, TypeVar, Union, cast +from typing import Dict, Generic, Iterable, Optional, Set, Type, TypeVar, Union, cast from pydantic import BaseModel from datahub.configuration.common import ConfigModel +from datahub.emitter.mcp_builder import mcps_from_mce from datahub.ingestion.api.closeable import Closeable from datahub.ingestion.api.common import PipelineContext, RecordEnvelope, WorkUnit from datahub.ingestion.api.report 
import Report +from datahub.ingestion.api.workunit import MetadataWorkUnit +from datahub.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent from datahub.utilities.lossy_collections import LossyDict, LossyList from datahub.utilities.type_annotations import get_class_from_annotation @@ -34,14 +38,40 @@ class SourceCapability(Enum): class SourceReport(Report): events_produced: int = 0 events_produced_per_sec: int = 0 - event_ids: List[str] = field(default_factory=LossyList) + + _urns_seen: Set[str] = field(default_factory=set) + entities: Dict[str, list] = field(default_factory=lambda: defaultdict(LossyList)) + aspects: Dict[str, Dict[str, int]] = field( + default_factory=lambda: defaultdict(lambda: defaultdict(int)) + ) warnings: LossyDict[str, LossyList[str]] = field(default_factory=LossyDict) failures: LossyDict[str, LossyList[str]] = field(default_factory=LossyDict) def report_workunit(self, wu: WorkUnit) -> None: self.events_produced += 1 - self.event_ids.append(wu.id) + + if isinstance(wu, MetadataWorkUnit): + # Specialized entity reporting. + if not isinstance(wu.metadata, MetadataChangeEvent): + mcps = [wu.metadata] + else: + mcps = list(mcps_from_mce(wu.metadata)) + + for mcp in mcps: + urn = mcp.entityUrn + if not urn: # this case is rare + continue + + entityType = mcp.entityType + aspectName = mcp.aspectName + + if urn not in self._urns_seen: + self._urns_seen.add(urn) + self.entities[entityType].append(urn) + + if aspectName: # usually true + self.aspects[entityType][aspectName] += 1 def report_warning(self, key: str, reason: str) -> None: warnings = self.warnings.get(key, LossyList()) diff --git a/metadata-ingestion/src/datahub/ingestion/api/workunit.py b/metadata-ingestion/src/datahub/ingestion/api/workunit.py index 53a77798f756c8..d77c7454302569 100644 --- a/metadata-ingestion/src/datahub/ingestion/api/workunit.py +++ b/metadata-ingestion/src/datahub/ingestion/api/workunit.py @@ -2,7 +2,7 @@ from typing import Iterable, Optional, Union, overload from datahub.emitter.mcp import MetadataChangeProposalWrapper -from datahub.ingestion.api.source import WorkUnit +from datahub.ingestion.api.common import WorkUnit from datahub.metadata.com.linkedin.pegasus2avro.mxe import ( MetadataChangeEvent, MetadataChangeProposal, diff --git a/metadata-ingestion/src/datahub/ingestion/run/pipeline.py b/metadata-ingestion/src/datahub/ingestion/run/pipeline.py index cf91109b2b2110..4a149441aaa364 100644 --- a/metadata-ingestion/src/datahub/ingestion/run/pipeline.py +++ b/metadata-ingestion/src/datahub/ingestion/run/pipeline.py @@ -506,7 +506,7 @@ def pretty_print_summary( click.echo(self.sink.get_report().as_string()) click.echo() workunits_produced = self.source.get_report().events_produced - duration_message = f"in {Report.to_str(self.source.get_report().running_time)}." + duration_message = f"in {humanfriendly.format_timespan(self.source.get_report().running_time)}." 
if self.source.get_report().failures or self.sink.get_report().failures: num_failures_source = self._approx_all_vals( diff --git a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_report.py b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_report.py index 152394b4ebda81..9790bb5fd57399 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_report.py +++ b/metadata-ingestion/src/datahub/ingestion/source/bigquery_v2/bigquery_report.py @@ -45,7 +45,6 @@ class BigQueryV2Report(ProfilingSqlReport): include_table_lineage: Optional[bool] = None use_date_sharded_audit_log_tables: Optional[bool] = None log_page_size: Optional[pydantic.PositiveInt] = None - use_v2_audit_metadata: Optional[bool] = None use_exported_bigquery_audit_metadata: Optional[bool] = None end_time: Optional[datetime] = None log_entry_start_time: Optional[str] = None diff --git a/metadata-ingestion/src/datahub/ingestion/source/nifi.py b/metadata-ingestion/src/datahub/ingestion/source/nifi.py index aa6d8feee3b265..7b320861b8e1e2 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/nifi.py +++ b/metadata-ingestion/src/datahub/ingestion/source/nifi.py @@ -1074,9 +1074,8 @@ def construct_dataset_workunits( else dataset_platform ) wu = MetadataWorkUnit(id=f"{platform}.{dataset_name}.{mcp.aspectName}", mcp=mcp) - if wu.id not in self.report.event_ids: - self.report.report_workunit(wu) - yield wu + self.report.report_workunit(wu) + yield wu mcp = MetadataChangeProposalWrapper( entityUrn=dataset_urn, @@ -1086,6 +1085,5 @@ def construct_dataset_workunits( ) wu = MetadataWorkUnit(id=f"{platform}.{dataset_name}.{mcp.aspectName}", mcp=mcp) - if wu.id not in self.report.event_ids: - self.report.report_workunit(wu) - yield wu + self.report.report_workunit(wu) + yield wu diff --git a/metadata-ingestion/src/datahub/ingestion/source/unity/proxy.py b/metadata-ingestion/src/datahub/ingestion/source/unity/proxy.py index c130ae1b3ece39..ef56e90c329754 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/unity/proxy.py +++ b/metadata-ingestion/src/datahub/ingestion/source/unity/proxy.py @@ -150,8 +150,6 @@ def catalogs(self, metastore: Metastore) -> Iterable[Catalog]: logger.info(f"Catalogs not found for metastore {metastore.name}") return [] - self.report.num_catalogs_to_scan[metastore.id] = len(response["catalogs"]) - for obj in response["catalogs"]: if obj["metastore_id"] == metastore.metastore_id: yield self._create_catalog(metastore, obj) @@ -170,10 +168,6 @@ def schemas(self, catalog: Catalog) -> Iterable[Schema]: logger.info(f"Schemas not found for catalog {catalog.name}") return [] - self.report.num_schemas_to_scan[ - f"{catalog.metastore.metastore_id}.{catalog.name}" - ] = len(response["schemas"]) - for schema in response["schemas"]: yield self._create_schema(catalog, schema) @@ -188,9 +182,6 @@ def tables(self, schema: Schema) -> Iterable[Table]: logger.info(f"Tables not found for schema {schema.name}") return [] - self.report.num_tables_to_scan[ - f"{schema.catalog.metastore.metastore_id}.{schema.catalog.name}.{schema.name}" - ] = len(response["tables"]) for table in response["tables"]: yield self._create_table(schema=schema, obj=table) diff --git a/metadata-ingestion/src/datahub/ingestion/source/unity/report.py b/metadata-ingestion/src/datahub/ingestion/source/unity/report.py index 6872bd45d7512e..48affb14a5dfaf 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/unity/report.py +++ b/metadata-ingestion/src/datahub/ingestion/source/unity/report.py 
@@ -1,48 +1,14 @@ -from dataclasses import dataclass, field -from typing import Dict +from dataclasses import dataclass +from datahub.ingestion.api.report import EntityFilterReport from datahub.ingestion.source.state.stale_entity_removal_handler import ( StaleEntityRemovalSourceReport, ) -from datahub.utilities.lossy_collections import LossyList -from datahub.utilities.stats_collections import TopKDict @dataclass class UnityCatalogReport(StaleEntityRemovalSourceReport): - scanned_metastore: int = 0 - scanned_catalog: int = 0 - scanned_schema: int = 0 - scanned_table: int = 0 - num_catalogs_to_scan: Dict[str, int] = field(default_factory=TopKDict) - num_schemas_to_scan: Dict[str, int] = field(default_factory=TopKDict) - num_tables_to_scan: Dict[str, int] = field(default_factory=TopKDict) - tables_scanned: int = 0 - views_scanned: int = 0 - filtered: LossyList[str] = field(default_factory=LossyList) - - def increment_scanned_metastore(self, count: int = 1) -> None: - self.scanned_metastore = self.scanned_metastore + count - - def increment_scanned_catalog(self, count: int = 1) -> None: - self.scanned_catalog = self.scanned_catalog + count - - def increment_scanned_schema(self, count: int = 1) -> None: - self.scanned_schema = self.scanned_schema + count - - def increment_scanned_table(self, count: int = 1) -> None: - self.scanned_table = self.scanned_table + count - - def report_dropped(self, ent_name: str) -> None: - self.filtered.append(ent_name) - - def report_entity_scanned(self, name: str, ent_type: str = "table") -> None: - """ - Entity could be a view or a table - """ - if ent_type == "table": - self.tables_scanned += 1 - elif ent_type == "view": - self.views_scanned += 1 - else: - raise KeyError(f"Unknown entity {ent_type}.") + metastores: EntityFilterReport = EntityFilterReport.field(type="metastore") + catalogs: EntityFilterReport = EntityFilterReport.field(type="catalog") + schemas: EntityFilterReport = EntityFilterReport.field(type="schema") + tables: EntityFilterReport = EntityFilterReport.field(type="table/view") diff --git a/metadata-ingestion/src/datahub/ingestion/source/unity/source.py b/metadata-ingestion/src/datahub/ingestion/source/unity/source.py index 3c9f3011204d47..521ce6c547acf1 100644 --- a/metadata-ingestion/src/datahub/ingestion/source/unity/source.py +++ b/metadata-ingestion/src/datahub/ingestion/source/unity/source.py @@ -28,7 +28,6 @@ from datahub.ingestion.api.source import ( CapabilityReport, SourceCapability, - SourceReport, TestableSource, TestConnectionReport, ) @@ -99,7 +98,7 @@ class UnityCatalogSource(StatefulIngestionSourceBase, TestableSource): platform: str = "databricks" platform_instance_name: str - def get_report(self) -> SourceReport: + def get_report(self) -> UnityCatalogReport: return self.report def __init__(self, ctx: PipelineContext, config: UnityCatalogSourceConfig): @@ -175,100 +174,99 @@ def get_workunits_internal(self) -> Iterable[MetadataWorkUnit]: def process_metastores(self) -> Iterable[MetadataWorkUnit]: for metastore in self.unity_catalog_api_proxy.metastores(): if not self.config.metastore_id_pattern.allowed(metastore.metastore_id): - self.report.report_dropped(f"{metastore.metastore_id}.*.*.*") + self.report.metastores.dropped(metastore.metastore_id) continue - logger.info( - f"Started to process metastore: {metastore.metastore_id} ({metastore.name})" - ) + + logger.info(f"Started to process metastore: {metastore.metastore_id}") yield from self.gen_metastore_containers(metastore) yield from self.process_catalogs(metastore) - 
self.report.increment_scanned_metastore(1) - logger.info( - f"Finished to process metastore: {metastore.metastore_id} ({metastore.name})" - ) + + self.report.metastores.processed(metastore.metastore_id) def process_catalogs( self, metastore: proxy.Metastore ) -> Iterable[MetadataWorkUnit]: for catalog in self.unity_catalog_api_proxy.catalogs(metastore=metastore): if not self.config.catalog_pattern.allowed(catalog.name): - self.report.report_dropped(f"{catalog.name}.*.*") + self.report.catalogs.dropped(catalog.id) continue + yield from self.gen_catalog_containers(catalog) - self.report.increment_scanned_catalog(1) yield from self.process_schemas(catalog) + self.report.catalogs.processed(catalog.id) + def process_schemas(self, catalog: proxy.Catalog) -> Iterable[MetadataWorkUnit]: for schema in self.unity_catalog_api_proxy.schemas(catalog=catalog): if not self.config.schema_pattern.allowed(schema.name): - self.report.report_dropped(f"{catalog.name}.{schema.name}.*") + self.report.schemas.dropped(schema.id) continue yield from self.gen_schema_containers(schema) - self.report.increment_scanned_schema(1) - yield from self.process_tables(schema) + self.report.schemas.processed(schema.id) + def process_tables(self, schema: proxy.Schema) -> Iterable[MetadataWorkUnit]: for table in self.unity_catalog_api_proxy.tables(schema=schema): - if not self.config.table_pattern.allowed( + filter_table_name = ( f"{table.schema.catalog.name}.{table.schema.name}.{table.name}" - ): - self.report.report_dropped( - f"{schema.catalog.name}.{schema.name}.{table.name}" - ) + ) + + if not self.config.table_pattern.allowed(filter_table_name): + self.report.tables.dropped(table.id, type=table.type) continue - dataset_urn: str = make_dataset_urn_with_platform_instance( - platform=self.platform, - platform_instance=self.platform_instance_name, - name=table.id, - ) - yield from self.add_table_to_dataset_container(dataset_urn, schema) + yield from self.process_table(table, schema) - table_props = self._create_table_property_aspect(table) + self.report.tables.processed(table.id, type=table.type) - view_props = None - if table.view_definition: - view_props = self._create_view_property_aspect(table) + def process_table( + self, table: proxy.Table, schema: proxy.Schema + ) -> Iterable[MetadataWorkUnit]: + dataset_urn: str = make_dataset_urn_with_platform_instance( + platform=self.platform, + platform_instance=self.platform_instance_name, + name=table.id, + ) + yield from self.add_table_to_dataset_container(dataset_urn, schema) - sub_type = self._create_table_sub_type_aspect(table) - schema_metadata = self._create_schema_metadata_aspect(table) + table_props = self._create_table_property_aspect(table) - domain = self._get_domain_aspect( - dataset_name=str( - f"{table.schema.catalog.name}.{table.schema.name}.{table.name}" - ) - ) + view_props = None + if table.view_definition: + view_props = self._create_view_property_aspect(table) - if self.config.include_column_lineage: - self.unity_catalog_api_proxy.get_column_lineage(table) - lineage = self._generate_column_lineage_aspect(dataset_urn, table) - else: - self.unity_catalog_api_proxy.table_lineage(table) - lineage = self._generate_lineage_aspect(dataset_urn, table) - - yield from [ - mcp.as_workunit() - for mcp in MetadataChangeProposalWrapper.construct_many( - entityUrn=dataset_urn, - aspects=[ - table_props, - view_props, - sub_type, - schema_metadata, - domain, - lineage, - ], - ) - ] + sub_type = self._create_table_sub_type_aspect(table) + schema_metadata = 
self._create_schema_metadata_aspect(table) - self.report.report_entity_scanned( - f"{table.schema.catalog.name}.{table.schema.name}.{table.name}", - table.type, + domain = self._get_domain_aspect( + dataset_name=str( + f"{table.schema.catalog.name}.{table.schema.name}.{table.name}" ) + ) - self.report.increment_scanned_table(1) + if self.config.include_column_lineage: + self.unity_catalog_api_proxy.get_column_lineage(table) + lineage = self._generate_column_lineage_aspect(dataset_urn, table) + else: + self.unity_catalog_api_proxy.table_lineage(table) + lineage = self._generate_lineage_aspect(dataset_urn, table) + + yield from [ + mcp.as_workunit() + for mcp in MetadataChangeProposalWrapper.construct_many( + entityUrn=dataset_urn, + aspects=[ + table_props, + view_props, + sub_type, + schema_metadata, + domain, + lineage, + ], + ) + ] def _generate_column_lineage_aspect( self, dataset_urn: str, table: proxy.Table diff --git a/metadata-ingestion/tests/unit/config/test_config_model.py b/metadata-ingestion/tests/unit/config/test_config_model.py index 4b6e17df67497f..2bd3009c783887 100644 --- a/metadata-ingestion/tests/unit/config/test_config_model.py +++ b/metadata-ingestion/tests/unit/config/test_config_model.py @@ -4,6 +4,7 @@ import pytest from datahub.configuration.common import ConfigModel, redact_raw_config +from datahub.ingestion.source.unity.config import UnityCatalogSourceConfig def test_extras_not_allowed(): @@ -72,3 +73,12 @@ def test_config_redaction(): }, "options": "********", } + + +def test_shared_defaults(): + c1 = UnityCatalogSourceConfig(token="s", workspace_url="s") + c2 = UnityCatalogSourceConfig(token="s", workspace_url="s") + + assert c2.catalog_pattern.allow == [".*"] + c1.catalog_pattern.allow += ["foo"] + assert c2.catalog_pattern.allow == [".*"] diff --git a/metadata-ingestion/tests/unit/test_report.py b/metadata-ingestion/tests/unit/test_report.py new file mode 100644 index 00000000000000..4ff1fd97457df7 --- /dev/null +++ b/metadata-ingestion/tests/unit/test_report.py @@ -0,0 +1,26 @@ +import dataclasses + +from datahub.ingestion.api.report import EntityFilterReport, Report, SupportsAsObj + + +@dataclasses.dataclass +class MyReport(Report): + views: EntityFilterReport = EntityFilterReport.field(type="view") + + +def test_entity_filter_report(): + report = MyReport() + assert report.views.type == "view" + assert isinstance(report, SupportsAsObj) + + report2 = MyReport() + + report.views.processed(entity="foo") + report.views.dropped(entity="bar") + + assert ( + report.as_string() == "{'views': {'filtered': ['bar'], 'processed': ['foo']}}" + ) + + # Verify that the reports don't accidentally share any state. + assert report2.as_string() == "{'views': {'filtered': [], 'processed': []}}"
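
Usage sketch (illustrative, not part of the patch): how a source report might adopt the new
EntityFilterReport field added in api/report.py, mirroring the UnityCatalogReport and unity
source changes above. MySourceReport, process_tables, the table names, and the deny pattern
are hypothetical stand-ins; AllowDenyPattern is the existing datahub config pattern class.

import dataclasses

from datahub.configuration.common import AllowDenyPattern
from datahub.ingestion.api.report import EntityFilterReport, Report


@dataclasses.dataclass
class MySourceReport(Report):
    # EntityFilterReport.field() wraps dataclasses.field() with a default_factory,
    # so each report instance gets its own EntityFilterReport (no shared state).
    tables: EntityFilterReport = EntityFilterReport.field(type="table")


def process_tables(report: MySourceReport, table_pattern: AllowDenyPattern) -> None:
    for table in ["db.sales", "db.tmp_scratch"]:  # hypothetical table names
        if not table_pattern.allowed(table):
            # Logs "Filtered table db.tmp_scratch" at the field's severity (DEBUG by default)
            # and records the entity in the lossy dropped_entities list.
            report.tables.dropped(table)
            continue
        # ... emit work units for the table here ...
        report.tables.processed(table)


report = MySourceReport()
process_tables(report, AllowDenyPattern(deny=["db\\.tmp_.*"]))
print(report.as_json())  # {"tables": {"filtered": ["db.tmp_scratch"], "processed": ["db.sales"]}}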