Skip to content

Commit

Permalink
feat(ingest): reporting revamp, part 1 (#7031)
Browse files Browse the repository at this point in the history
  • Loading branch information
hsheth2 authored Jan 18, 2023
1 parent d7aa612 commit e23eb71
Show file tree
Hide file tree
Showing 12 changed files with 267 additions and 164 deletions.
144 changes: 103 additions & 41 deletions metadata-ingestion/src/datahub/ingestion/api/report.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,23 @@
import dataclasses
import json
import logging
import pprint
import sys
from dataclasses import dataclass
from datetime import datetime, timedelta, timezone
from datetime import datetime, timedelta
from enum import Enum
from typing import Any, Dict
from typing import Any, Dict, Optional

import humanfriendly
import pydantic
from pydantic import BaseModel
from typing_extensions import Literal, Protocol, runtime_checkable

from datahub.ingestion.api.report_helpers import format_datetime_relative
from datahub.utilities.lossy_collections import LossyList

logger = logging.getLogger(__name__)
LogLevel = Literal["ERROR", "WARNING", "INFO", "DEBUG"]

# The sort_dicts option was added in Python 3.8.
if sys.version_info >= (3, 8):
Expand All @@ -15,58 +26,60 @@
PPRINT_OPTIONS: Dict = {}


@runtime_checkable
class SupportsAsObj(Protocol):
def as_obj(self) -> dict:
...


def _stacklevel_if_supported(level: int) -> dict:
# The logging module added support for stacklevel in Python 3.8.
if sys.version_info >= (3, 8):
return {"stacklevel": level}
else:
return {}


@dataclass
class Report:
class Report(SupportsAsObj):
@staticmethod
def to_str(some_val: Any) -> str:
if isinstance(some_val, Enum):
return some_val.name
elif isinstance(some_val, timedelta):
return humanfriendly.format_timespan(some_val)
elif isinstance(some_val, datetime):
try:
# check if we have a tz_aware object or not (https://stackoverflow.com/questions/5802108/how-to-check-if-a-datetime-object-is-localized-with-pytz)
tz_aware = (
some_val.tzinfo is not None
and some_val.tzinfo.utcoffset(some_val) is not None
)
now = datetime.now(timezone.utc) if tz_aware else datetime.now()
diff = now - some_val
if abs(diff) < timedelta(seconds=1):
# the timestamps are close enough that printing a duration isn't useful
return f"{some_val} (now)."
elif diff > timedelta(seconds=0):
# timestamp is in the past
return f"{some_val} ({humanfriendly.format_timespan(diff)} ago)."
else:
# timestamp is in the future
return f"{some_val} (in {humanfriendly.format_timespan(some_val - now)})."
except Exception:
# we don't want to fail reporting because we were unable to pretty print a timestamp
return str(datetime)
else:
return str(some_val)

@staticmethod
def to_dict(some_val: Any) -> Any:
def to_pure_python_obj(some_val: Any) -> Any:
"""A cheap way to generate a dictionary."""
if hasattr(some_val, "as_obj"):

if isinstance(some_val, SupportsAsObj):
return some_val.as_obj()
if hasattr(some_val, "dict"): # pydantic models
elif isinstance(some_val, pydantic.BaseModel):
return some_val.dict()
if hasattr(some_val, "asdict"): # dataclasses
return some_val.asdict()
if isinstance(some_val, list):
return [Report.to_dict(v) for v in some_val if v is not None]
if isinstance(some_val, dict):
elif dataclasses.is_dataclass(some_val):
return dataclasses.asdict(some_val)
elif isinstance(some_val, list):
return [Report.to_pure_python_obj(v) for v in some_val if v is not None]
elif isinstance(some_val, timedelta):
return humanfriendly.format_timespan(some_val)
elif isinstance(some_val, datetime):
try:
return format_datetime_relative(some_val)
except Exception:
# we don't want to fail reporting because we were unable to pretty print a timestamp
return str(datetime)
elif isinstance(some_val, dict):
return {
Report.to_str(k): Report.to_dict(v)
Report.to_str(k): Report.to_pure_python_obj(v)
for k, v in some_val.items()
if v is not None
}

# fall through option
return Report.to_str(some_val)
elif isinstance(some_val, (int, float, bool)):
return some_val
else:
# fall through option
return Report.to_str(some_val)

def compute_stats(self) -> None:
"""A hook to compute derived stats"""
Expand All @@ -75,14 +88,63 @@ def compute_stats(self) -> None:
def as_obj(self) -> dict:
self.compute_stats()
return {
str(key): Report.to_dict(value)
str(key): Report.to_pure_python_obj(value)
for (key, value) in self.__dict__.items()
if value is not None
and not str(key).startswith("_") # ignore nulls and fields starting with _
# ignore nulls and fields starting with _
if value is not None and not str(key).startswith("_")
}

def as_string(self) -> str:
return pprint.pformat(self.as_obj(), width=150, **PPRINT_OPTIONS)

def as_json(self) -> str:
return json.dumps(self.as_obj())

# TODO add helper method for warning / failure status + counts?


class ReportAttribute(BaseModel):
severity: LogLevel = "DEBUG"
help: Optional[str] = None

@property
def logger_sev(self) -> int:
log_levels = {
"DEBUG": logging.DEBUG,
"INFO": logging.INFO,
"WARNING": logging.WARNING,
"ERROR": logging.ERROR,
}
return log_levels[self.severity]

def log(self, msg: str) -> None:
logger.log(level=self.logger_sev, msg=msg, **_stacklevel_if_supported(3))


class EntityFilterReport(ReportAttribute):
type: str

processed_entities: LossyList[str] = pydantic.Field(default_factory=LossyList)
dropped_entities: LossyList[str] = pydantic.Field(default_factory=LossyList)

def processed(self, entity: str, type: Optional[str] = None) -> None:
self.log(f"Processed {type or self.type} {entity}")
self.processed_entities.append(entity)

def dropped(self, entity: str, type: Optional[str] = None) -> None:
self.log(f"Filtered {type or self.type} {entity}")
self.dropped_entities.append(entity)

def as_obj(self) -> dict:
return {
"filtered": self.dropped_entities.as_obj(),
"processed": self.processed_entities.as_obj(),
}

@staticmethod
def field(type: str, severity: LogLevel = "DEBUG") -> "EntityFilterReport":
"""A helper to create a dataclass field."""

return dataclasses.field(
default_factory=lambda: EntityFilterReport(type=type, severity=severity)
)
23 changes: 23 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/api/report_helpers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
from datetime import datetime, timedelta, timezone

import humanfriendly


def format_datetime_relative(some_val: datetime) -> str:
"""Formats a datetime as a human-readable string relative to now."""

# check if we have a tz_aware object or not (https://stackoverflow.com/questions/5802108/how-to-check-if-a-datetime-object-is-localized-with-pytz)
tz_aware = (
some_val.tzinfo is not None and some_val.tzinfo.utcoffset(some_val) is not None
)
now = datetime.now(timezone.utc) if tz_aware else datetime.now()
diff = now - some_val
if abs(diff) < timedelta(seconds=1):
# the timestamps are close enough that printing a duration isn't useful
return f"{some_val} (now)"
elif diff > timedelta(seconds=0):
# timestamp is in the past
return f"{some_val} ({humanfriendly.format_timespan(diff)} ago)"
else:
# timestamp is in the future
return f"{some_val} (in {humanfriendly.format_timespan(some_val - now)})"
36 changes: 33 additions & 3 deletions metadata-ingestion/src/datahub/ingestion/api/source.py
Original file line number Diff line number Diff line change
@@ -1,15 +1,19 @@
import datetime
from abc import ABCMeta, abstractmethod
from collections import defaultdict
from dataclasses import dataclass, field
from enum import Enum
from typing import Dict, Generic, Iterable, List, Optional, Type, TypeVar, Union, cast
from typing import Dict, Generic, Iterable, Optional, Set, Type, TypeVar, Union, cast

from pydantic import BaseModel

from datahub.configuration.common import ConfigModel
from datahub.emitter.mcp_builder import mcps_from_mce
from datahub.ingestion.api.closeable import Closeable
from datahub.ingestion.api.common import PipelineContext, RecordEnvelope, WorkUnit
from datahub.ingestion.api.report import Report
from datahub.ingestion.api.workunit import MetadataWorkUnit
from datahub.metadata.com.linkedin.pegasus2avro.mxe import MetadataChangeEvent
from datahub.utilities.lossy_collections import LossyDict, LossyList
from datahub.utilities.type_annotations import get_class_from_annotation

Expand All @@ -34,14 +38,40 @@ class SourceCapability(Enum):
class SourceReport(Report):
events_produced: int = 0
events_produced_per_sec: int = 0
event_ids: List[str] = field(default_factory=LossyList)

_urns_seen: Set[str] = field(default_factory=set)
entities: Dict[str, list] = field(default_factory=lambda: defaultdict(LossyList))
aspects: Dict[str, Dict[str, int]] = field(
default_factory=lambda: defaultdict(lambda: defaultdict(int))
)

warnings: LossyDict[str, LossyList[str]] = field(default_factory=LossyDict)
failures: LossyDict[str, LossyList[str]] = field(default_factory=LossyDict)

def report_workunit(self, wu: WorkUnit) -> None:
self.events_produced += 1
self.event_ids.append(wu.id)

if isinstance(wu, MetadataWorkUnit):
# Specialized entity reporting.
if not isinstance(wu.metadata, MetadataChangeEvent):
mcps = [wu.metadata]
else:
mcps = list(mcps_from_mce(wu.metadata))

for mcp in mcps:
urn = mcp.entityUrn
if not urn: # this case is rare
continue

entityType = mcp.entityType
aspectName = mcp.aspectName

if urn not in self._urns_seen:
self._urns_seen.add(urn)
self.entities[entityType].append(urn)

if aspectName: # usually true
self.aspects[entityType][aspectName] += 1

def report_warning(self, key: str, reason: str) -> None:
warnings = self.warnings.get(key, LossyList())
Expand Down
2 changes: 1 addition & 1 deletion metadata-ingestion/src/datahub/ingestion/api/workunit.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from typing import Iterable, Optional, Union, overload

from datahub.emitter.mcp import MetadataChangeProposalWrapper
from datahub.ingestion.api.source import WorkUnit
from datahub.ingestion.api.common import WorkUnit
from datahub.metadata.com.linkedin.pegasus2avro.mxe import (
MetadataChangeEvent,
MetadataChangeProposal,
Expand Down
2 changes: 1 addition & 1 deletion metadata-ingestion/src/datahub/ingestion/run/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -506,7 +506,7 @@ def pretty_print_summary(
click.echo(self.sink.get_report().as_string())
click.echo()
workunits_produced = self.source.get_report().events_produced
duration_message = f"in {Report.to_str(self.source.get_report().running_time)}."
duration_message = f"in {humanfriendly.format_timespan(self.source.get_report().running_time)}."

if self.source.get_report().failures or self.sink.get_report().failures:
num_failures_source = self._approx_all_vals(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ class BigQueryV2Report(ProfilingSqlReport):
include_table_lineage: Optional[bool] = None
use_date_sharded_audit_log_tables: Optional[bool] = None
log_page_size: Optional[pydantic.PositiveInt] = None
use_v2_audit_metadata: Optional[bool] = None
use_exported_bigquery_audit_metadata: Optional[bool] = None
end_time: Optional[datetime] = None
log_entry_start_time: Optional[str] = None
Expand Down
10 changes: 4 additions & 6 deletions metadata-ingestion/src/datahub/ingestion/source/nifi.py
Original file line number Diff line number Diff line change
Expand Up @@ -1074,9 +1074,8 @@ def construct_dataset_workunits(
else dataset_platform
)
wu = MetadataWorkUnit(id=f"{platform}.{dataset_name}.{mcp.aspectName}", mcp=mcp)
if wu.id not in self.report.event_ids:
self.report.report_workunit(wu)
yield wu
self.report.report_workunit(wu)
yield wu

mcp = MetadataChangeProposalWrapper(
entityUrn=dataset_urn,
Expand All @@ -1086,6 +1085,5 @@ def construct_dataset_workunits(
)

wu = MetadataWorkUnit(id=f"{platform}.{dataset_name}.{mcp.aspectName}", mcp=mcp)
if wu.id not in self.report.event_ids:
self.report.report_workunit(wu)
yield wu
self.report.report_workunit(wu)
yield wu
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,6 @@ def catalogs(self, metastore: Metastore) -> Iterable[Catalog]:
logger.info(f"Catalogs not found for metastore {metastore.name}")
return []

self.report.num_catalogs_to_scan[metastore.id] = len(response["catalogs"])

for obj in response["catalogs"]:
if obj["metastore_id"] == metastore.metastore_id:
yield self._create_catalog(metastore, obj)
Expand All @@ -170,10 +168,6 @@ def schemas(self, catalog: Catalog) -> Iterable[Schema]:
logger.info(f"Schemas not found for catalog {catalog.name}")
return []

self.report.num_schemas_to_scan[
f"{catalog.metastore.metastore_id}.{catalog.name}"
] = len(response["schemas"])

for schema in response["schemas"]:
yield self._create_schema(catalog, schema)

Expand All @@ -188,9 +182,6 @@ def tables(self, schema: Schema) -> Iterable[Table]:
logger.info(f"Tables not found for schema {schema.name}")
return []

self.report.num_tables_to_scan[
f"{schema.catalog.metastore.metastore_id}.{schema.catalog.name}.{schema.name}"
] = len(response["tables"])
for table in response["tables"]:
yield self._create_table(schema=schema, obj=table)

Expand Down
Loading

0 comments on commit e23eb71

Please sign in to comment.