Skip to content

Commit

Permalink
feat(ingest):looker - reduce mem usage, misc reporting improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
shirshanka committed Sep 4, 2022
1 parent 3cd1feb commit 9448811
Show file tree
Hide file tree
Showing 11 changed files with 562 additions and 386 deletions.
6 changes: 5 additions & 1 deletion metadata-ingestion/src/datahub/cli/ingest_cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,9 @@ def ingest() -> None:
default=False,
help="Turn off default reporting of ingestion results to DataHub",
)
@click.option(
"--no-spinner", type=bool, is_flag=True, default=False, help="Turn off spinner"
)
@click.pass_context
@telemetry.with_telemetry
@memory_leak_detector.with_leak_detection
Expand All @@ -117,6 +120,7 @@ def run(
test_source_connection: bool,
report_to: str,
no_default_report: bool,
no_spinner: bool,
) -> None:
"""Ingest metadata into DataHub."""

Expand All @@ -125,7 +129,7 @@ def run_pipeline_to_completion(
) -> int:
logger.info("Starting metadata ingestion")
with click_spinner.spinner(
beep=False, disable=False, force=False, stream=sys.stdout
beep=False, disable=no_spinner, force=False, stream=sys.stdout
):
try:
pipeline.run()
Expand Down
35 changes: 29 additions & 6 deletions metadata-ingestion/src/datahub/ingestion/api/report.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,12 @@
import pprint
import sys
from dataclasses import dataclass
from datetime import datetime, timedelta
from enum import Enum
from typing import Any, Dict

import humanfriendly

# The sort_dicts option was added in Python 3.8.
if sys.version_info >= (3, 8):
PPRINT_OPTIONS = {"sort_dicts": False}
Expand All @@ -18,6 +21,22 @@ class Report:
def to_str(some_val: Any) -> str:
    """Render a report value as a human-readable string.

    * ``Enum`` members are rendered by their ``name``.
    * ``timedelta`` values are rendered with ``humanfriendly.format_timespan``.
    * ``datetime`` values are rendered as the timestamp followed by its
      distance from the current time ("now", "... ago" or "in ...").
    * Anything else falls back to ``str(some_val)``.
    """
    if isinstance(some_val, Enum):
        return some_val.name
    elif isinstance(some_val, timedelta):
        return humanfriendly.format_timespan(some_val)
    elif isinstance(some_val, datetime):
        # Build "now" with the same timezone-awareness as the value: Python
        # raises TypeError when subtracting a naive datetime from an aware
        # one. For naive values tzinfo is None, which yields naive local time.
        now = datetime.now(tz=some_val.tzinfo)
        diff = now - some_val
        if abs(diff) < timedelta(seconds=1):
            # the timestamps are close enough that printing a duration isn't useful
            return f"{some_val} (now)."
        elif diff > timedelta(seconds=0):
            # timestamp is in the past
            return f"{some_val} ({humanfriendly.format_timespan(diff)} ago)."
        else:
            # timestamp is in the future; -diff is exactly some_val - now
            return f"{some_val} (in {humanfriendly.format_timespan(-diff)})."
    else:
        return str(some_val)


Expand All @@ -26,18 +45,21 @@ def to_dict(some_val: Any) -> Any:
"""A cheap way to generate a dictionary."""
if hasattr(some_val, "as_obj"):
return some_val.as_obj()
if hasattr(some_val, "dict"):
if hasattr(some_val, "dict"): # pydantic models
return some_val.dict()
elif isinstance(some_val, list):
if hasattr(some_val, "asdict"): # dataclasses
return some_val.asdict()
if isinstance(some_val, list):
return [Report.to_dict(v) for v in some_val if v is not None]
elif isinstance(some_val, dict):
if isinstance(some_val, dict):
return {
Report.to_str(k): Report.to_dict(v)
for k, v in some_val.items()
if v is not None
}
else:
return Report.to_str(some_val)

# fall through option
return Report.to_str(some_val)

def compute_stats(self) -> None:
"""A hook to compute derived stats"""
Expand All @@ -48,7 +70,8 @@ def as_obj(self) -> dict:
return {
str(key): Report.to_dict(value)
for (key, value) in self.__dict__.items()
if value is not None # ignore nulls
if value is not None
and not str(key).startswith("_") # ignore nulls and fields starting with _
}

def as_string(self) -> str:
Expand Down
12 changes: 7 additions & 5 deletions metadata-ingestion/src/datahub/ingestion/api/source.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,14 +55,16 @@ def report_failure(self, key: str, reason: str) -> None:

def __post_init__(self) -> None:
self.start_time = datetime.datetime.now()
self.running_time_in_seconds = 0
self.running_time: datetime.timedelta = datetime.timedelta(seconds=0)

def compute_stats(self) -> None:
duration = int((datetime.datetime.now() - self.start_time).total_seconds())
duration = datetime.datetime.now() - self.start_time
workunits_produced = self.events_produced
if duration > 0:
self.events_produced_per_sec: int = int(workunits_produced / duration)
self.running_time_in_seconds = duration
if duration.total_seconds() > 0:
self.events_produced_per_sec: int = int(
workunits_produced / duration.total_seconds()
)
self.running_time = duration
else:
self.read_rate = 0

Expand Down
24 changes: 12 additions & 12 deletions metadata-ingestion/src/datahub/ingestion/run/pipeline.py
Original file line number Diff line number Diff line change
Expand Up @@ -315,13 +315,12 @@ def create(

def _time_to_print(self) -> bool:
self.num_intermediate_workunits += 1
if self.num_intermediate_workunits > 1000:
current_time = int(time.time())
if current_time - self.last_time_printed > 10:
# we print
self.num_intermediate_workunits = 0
self.last_time_printed = current_time
return True
current_time = int(time.time())
if current_time - self.last_time_printed > 10:
# we print
self.num_intermediate_workunits = 0
self.last_time_printed = current_time
return True
return False

def run(self) -> None:
Expand All @@ -340,8 +339,11 @@ def run(self) -> None:
self.source.get_workunits(),
self.preview_workunits if self.preview_mode else None,
):
if self._time_to_print():
self.pretty_print_summary(currently_running=True)
try:
if self._time_to_print():
self.pretty_print_summary(currently_running=True)
except Exception as e:
logger.warning("Failed to print summary", e)

if not self.dry_run:
self.sink.handle_work_unit_start(wu)
Expand Down Expand Up @@ -495,9 +497,7 @@ def pretty_print_summary(
click.echo(self.sink.get_report().as_string())
click.echo()
workunits_produced = self.source.get_report().events_produced
duration_message = (
f"in {self.source.get_report().running_time_in_seconds} seconds."
)
duration_message = f"in {Report.to_str(self.source.get_report().running_time)}."

if self.source.get_report().failures or self.sink.get_report().failures:
num_failures_source = self._approx_all_vals(
Expand Down
2 changes: 1 addition & 1 deletion metadata-ingestion/src/datahub/ingestion/source/file.py
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ def compute_stats(self) -> None:
if percentage_completion > 0:
self.estimated_time_to_completion_in_minutes = int(
(
self.running_time_in_seconds
self.running_time.total_seconds()
* (100 - percentage_completion)
/ percentage_completion
)
Expand Down
Loading

0 comments on commit 9448811

Please sign in to comment.