
Commit

Merge remote-tracking branch 'acryl/jj--add-structured-logging-to-ingestion' into jj--add-structured-logging-to-ingestion
John Joyce authored and John Joyce committed Jul 2, 2024
2 parents 8d5f0f3 + 4a637b0 commit 856731c
Showing 4 changed files with 198 additions and 36 deletions.
75 changes: 75 additions & 0 deletions metadata-ingestion/src/datahub/ingestion/api/exception.py
@@ -0,0 +1,75 @@
from enum import Enum


class ScanUnauthorizedException(Exception):
    pass


class LineageUnauthorizedException(Exception):
    pass


class UsageUnauthorizedException(Exception):
    pass


class ProfilingUnauthorizedException(Exception):
    pass


class LineageQueryParsingFailedException(Exception):
    pass


class UsageQueryParsingFailedException(Exception):
    pass


class ConnectionFailedCoordinatesException(Exception):
    pass


class ConnectionFailedCredentialsException(Exception):
    pass


class ConnectionFailedServiceUnavailableException(Exception):
    pass


class ConnectionFailedServiceTimeoutException(Exception):
    pass


class ConnectionFailedUnknownException(Exception):
    pass


class StructuredReportLogType(Enum):
    SCAN_UNAUTHORIZED = "SCAN_UNAUTHORIZED"
    LINEAGE_UNAUTHORIZED = "LINEAGE_UNAUTHORIZED"
    USAGE_UNAUTHORIZED = "USAGE_UNAUTHORIZED"
    PROFILING_UNAUTHORIZED = "PROFILING_UNAUTHORIZED"
    LINEAGE_QUERY_PARSING_FAILED = "LINEAGE_QUERY_PARSING_FAILED"
    USAGE_QUERY_PARSING_FAILED = "USAGE_QUERY_PARSING_FAILED"
    CONNECTION_FAILED_COORDINATES = "CONNECTION_FAILED_COORDINATES"
    CONNECTION_FAILED_CREDENTIALS = "CONNECTION_FAILED_CREDENTIALS"
    CONNECTION_FAILED_SERVICE_UNAVAILABLE = "CONNECTION_FAILED_SERVICE_UNAVAILABLE"
    CONNECTION_FAILED_SERVICE_TIMEOUT = "CONNECTION_FAILED_SERVICE_TIMEOUT"
    CONNECTION_FAILED_UNKNOWN = "CONNECTION_FAILED_UNKNOWN"
    UNKNOWN = "UNKNOWN"


EXCEPTION_TO_REPORT_TYPE = {
    ScanUnauthorizedException: StructuredReportLogType.SCAN_UNAUTHORIZED,
    LineageUnauthorizedException: StructuredReportLogType.LINEAGE_UNAUTHORIZED,
    UsageUnauthorizedException: StructuredReportLogType.USAGE_UNAUTHORIZED,
    ProfilingUnauthorizedException: StructuredReportLogType.PROFILING_UNAUTHORIZED,
    LineageQueryParsingFailedException: StructuredReportLogType.LINEAGE_QUERY_PARSING_FAILED,
    UsageQueryParsingFailedException: StructuredReportLogType.USAGE_QUERY_PARSING_FAILED,
    ConnectionFailedCoordinatesException: StructuredReportLogType.CONNECTION_FAILED_COORDINATES,
    ConnectionFailedCredentialsException: StructuredReportLogType.CONNECTION_FAILED_CREDENTIALS,
    ConnectionFailedServiceUnavailableException: StructuredReportLogType.CONNECTION_FAILED_SERVICE_UNAVAILABLE,
    ConnectionFailedServiceTimeoutException: StructuredReportLogType.CONNECTION_FAILED_SERVICE_TIMEOUT,
    ConnectionFailedUnknownException: StructuredReportLogType.CONNECTION_FAILED_UNKNOWN,
}
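Reviewer note: a minimal sketch of how this mapping can be consumed. The classify helper below is hypothetical, not part of the commit, but the exact-type dictionary lookup mirrors how pipeline.py uses EXCEPTION_TO_REPORT_TYPE further down.

from datahub.ingestion.api.exception import (
    EXCEPTION_TO_REPORT_TYPE,
    ConnectionFailedCredentialsException,
    StructuredReportLogType,
)


def classify(exc: Exception) -> StructuredReportLogType:
    # Exact-type lookup (no isinstance walk); unmapped types fall back to UNKNOWN.
    return EXCEPTION_TO_REPORT_TYPE.get(type(exc), StructuredReportLogType.UNKNOWN)


assert (
    classify(ConnectionFailedCredentialsException("login rejected"))
    is StructuredReportLogType.CONNECTION_FAILED_CREDENTIALS
)
assert classify(ValueError("boom")) is StructuredReportLogType.UNKNOWN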
118 changes: 100 additions & 18 deletions metadata-ingestion/src/datahub/ingestion/api/source.py
@@ -62,6 +62,20 @@ class SourceCapability(Enum):
     CLASSIFICATION = "Classification"
 
 
+class StructuredLogLevel(Enum):
+    INFO = "INFO"
+    WARN = "WARN"
+    ERROR = "ERROR"
+
+
+@dataclass
+class StructuredLog(Report):
+    level: StructuredLogLevel
+    type: str
+    message: Optional[str]
+    context: LossyList[str]
+
+
 @dataclass
 class SourceReport(Report):
     events_produced: int = 0
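Reviewer note: for orientation, a StructuredLog is a plain dataclass record; a hand-built instance might look like this (hypothetical values, relying only on the imports already present in source.py).

from datahub.utilities.lossy_collections import LossyList

ctx: LossyList[str] = LossyList()
ctx.append("urn:li:dataset:(urn:li:dataPlatform:snowflake,db.schema.table_a,PROD)")
log = StructuredLog(
    level=StructuredLogLevel.WARN,
    type="SCAN_UNAUTHORIZED",
    message="Table scan not authorized",
    context=ctx,
)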
@@ -76,8 +90,29 @@ class SourceReport(Report):
         default_factory=lambda: defaultdict(lambda: defaultdict(LossyList))
     )
 
-    warnings: LossyDict[str, LossyList[str]] = field(default_factory=LossyDict)
-    failures: LossyDict[str, LossyList[str]] = field(default_factory=LossyDict)
+    # Legacy objects which will be converted to structured logs prior to reporting.
+    # In the future, we will remove this in favor of the consolidated logging framework.
+    warnings: LossyDict[str, StructuredLog] = field(default_factory=LossyDict)
+    failures: LossyDict[str, StructuredLog] = field(default_factory=LossyDict)
+    infos: LossyDict[str, StructuredLog] = field(default_factory=LossyDict)
+
+    @property
+    def structured_logs(self) -> List[StructuredLog]:
+        consolidated_logs = []
+
+        # Append all warning logs to the new consolidated_logs field
+        for log in self.warnings.values():
+            consolidated_logs.append(log)
+
+        # Append all failure logs to the new consolidated_logs field
+        for log in self.failures.values():
+            consolidated_logs.append(log)
+
+        # Append all information logs to the new consolidated_logs field
+        for log in self.infos.values():
+            consolidated_logs.append(log)
+
+        return consolidated_logs
 
     def report_workunit(self, wu: WorkUnit) -> None:
         self.events_produced += 1
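Reviewer note: the property simply flattens the three per-level dicts into one list, ordered warnings, then failures, then infos. A consumer sketch (hypothetical type/message/context values, using the warning and failure helpers added further down in this diff):

from datahub.ingestion.api.source import SourceReport

report = SourceReport()
report.warning("SCAN_UNAUTHORIZED", "Skipped unauthorized table", context="db.schema.table_a")
report.failure("CONNECTION_FAILED_CREDENTIALS", "Login rejected", context="svc_datahub")

for log in report.structured_logs:
    # e.g. "WARN SCAN_UNAUTHORIZED Skipped unauthorized table ['db.schema.table_a']"
    print(log.level.value, log.type, log.message, list(log.context))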
@@ -109,23 +144,70 @@ def report_workunit(self, wu: WorkUnit) -> None:
"fineGrainedLineages"
].append(urn)

def report_warning(self, key: str, reason: str) -> None:
warnings = self.warnings.get(key, LossyList())
warnings.append(reason)
self.warnings[key] = warnings

def warning(self, key: str, reason: str) -> None:
self.report_warning(key, reason)
logger.warning(f"{key} => {reason}", stacklevel=2)

def report_failure(self, key: str, reason: str) -> None:
failures = self.failures.get(key, LossyList())
failures.append(reason)
self.failures[key] = failures
def report_warning(
self, type: str, message: str, context: Optional[str] = None
) -> None:
key = f"{type}-{message}"
if key not in self.warnings:
context_list: LossyList[str] = LossyList()
if context is not None:
context_list.append(context)
self.warnings[key] = StructuredLog(
level=StructuredLogLevel.WARN,
type=type,
message=message,
context=context_list,
)
else:
if context is not None:
self.warnings[key].context.append(context)

def warning(self, type: str, message: str, context: Optional[str] = None) -> None:
self.report_warning(type, message, context)
logger.warning(f"{type} => {message}", stacklevel=2)

def report_failure(
self, type: str, message: str, context: Optional[str] = None
) -> None:
key = f"{type}-{message}"
if key not in self.failures:
context_list: LossyList[str] = LossyList()
if context is not None:
context_list.append(context)
self.failures[key] = StructuredLog(
level=StructuredLogLevel.ERROR,
type=type,
message=message,
context=context_list,
)
else:
if context is not None:
self.failures[key].context.append(context)

def failure(self, type: str, message: str, context: Optional[str] = None) -> None:
self.report_failure(type, message, context)

def report_info(
self, type: str, message: str, context: Optional[str] = None
) -> None:
key = f"{type}-{message}"
if key not in self.infos:
context_list: LossyList[str] = LossyList()
if context is not None:
context_list.append(context)
self.infos[key] = StructuredLog(
level=StructuredLogLevel.INFO,
type=type,
message=message,
context=context_list,
)
else:
if context is not None:
self.infos[key].context.append(context)

def failure(self, key: str, reason: str) -> None:
self.report_failure(key, reason)
logger.error(f"{key} => {reason}", stacklevel=2)
def info(self, type: str, message: str, context: Optional[str] = None) -> None:
self.report_info(type, message, context)
logger.info(f"{type} => {message}", stacklevel=2)

def __post_init__(self) -> None:
self.start_time = datetime.datetime.now()
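Reviewer note: a usage sketch of the new reporting API (hypothetical type, message, and context values). Repeated calls sharing a type and message collapse into a single StructuredLog keyed by "{type}-{message}", with each distinct context string appended to its LossyList.

from datahub.ingestion.api.source import SourceReport

report = SourceReport()
report.warning("SCAN_UNAUTHORIZED", "Table scan not authorized", context="db.schema.table_a")
report.warning("SCAN_UNAUTHORIZED", "Table scan not authorized", context="db.schema.table_b")

# Deduplicated: one entry, two accumulated context strings.
assert len(report.warnings) == 1
log = report.warnings["SCAN_UNAUTHORIZED-Table scan not authorized"]
assert list(log.context) == ["db.schema.table_a", "db.schema.table_b"]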
33 changes: 19 additions & 14 deletions metadata-ingestion/src/datahub/ingestion/run/pipeline.py
@@ -23,6 +23,7 @@
 )
 from datahub.ingestion.api.committable import CommitPolicy
 from datahub.ingestion.api.common import EndOfStream, PipelineContext, RecordEnvelope
+from datahub.ingestion.api.exception import EXCEPTION_TO_REPORT_TYPE
 from datahub.ingestion.api.global_context import set_graph_context
 from datahub.ingestion.api.pipeline_run_listener import PipelineRunListener
 from datahub.ingestion.api.report import Report
@@ -50,7 +51,7 @@
     clear_global_warnings,
     get_global_warnings,
 )
-from datahub.utilities.lossy_collections import LossyDict, LossyList
+from datahub.utilities.lossy_collections import LossyDict
 
 logger = logging.getLogger(__name__)
 _REPORT_PRINT_INTERVAL_SECONDS = 60
@@ -124,7 +125,7 @@ class PipelineInitError(Exception):
 class PipelineStatus(enum.Enum):
     UNKNOWN = enum.auto()
     COMPLETED = enum.auto()
-    PIPELINE_ERROR = enum.auto()
+    ERROR = enum.auto()
     CANCELLED = enum.auto()
 
 
@@ -508,16 +509,8 @@ def run(self) -> None:
logger.error("Caught error", exc_info=e)
raise
except Exception as exc:
self.final_status = PipelineStatus.PIPELINE_ERROR
logger.exception("Ingestion pipeline threw an uncaught exception")

# HACK: We'll report this as a source error, since we don't have a great place to put it.
# It theoretically could've come from any part of the pipeline, but usually it's from the source.
# This ensures that it is included in the report, and that the run is marked as failed.
self.source.get_report().report_failure(
"pipeline_error",
f"Ingestion pipeline threw an uncaught exception: {exc}",
)
self.final_status = PipelineStatus.ERROR
self._handle_uncaught_pipeline_exception(exc)
finally:
clear_global_warnings()

@@ -627,7 +620,7 @@ def log_ingestion_stats(self) -> None:
                 self.ctx.graph,
             )
 
-    def _approx_all_vals(self, d: LossyDict[str, LossyList]) -> int:
+    def _approx_all_vals(self, d: LossyDict[str, Any]) -> int:
         result = d.dropped_keys_count()
         for k in d:
             result += len(d[k])
@@ -657,7 +650,7 @@ def pretty_print_summary(
         if (
             not workunits_produced
             and not currently_running
-            and self.final_status == PipelineStatus.PIPELINE_ERROR
+            and self.final_status == PipelineStatus.ERROR
         ):
             # If the pipeline threw an uncaught exception before doing anything, printing
             # out the report would just be annoying.
@@ -725,6 +718,18 @@ def pretty_print_summary(
             )
             return 0
 
+    def _handle_uncaught_pipeline_exception(self, exc: Exception) -> None:
+        exception_type = type(exc)
+        if exception_type in EXCEPTION_TO_REPORT_TYPE:
+            report_type = EXCEPTION_TO_REPORT_TYPE[exception_type]
+            self.source.get_report().report_failure(report_type.value, str(exc))
+        else:
+            logger.exception("Ingestion pipeline threw an uncaught exception")
+            self.source.get_report().report_failure(
+                "pipeline_error",
+                f"Ingestion pipeline threw an uncaught exception: {exc}",
+            )
+
     def _get_structured_report(self) -> Dict[str, Any]:
         return {
             "cli": self.cli_report.as_obj(),
8 changes: 4 additions & 4 deletions metadata-ingestion/tests/unit/test_nifi_source.py
@@ -334,7 +334,7 @@ def test_single_user_auth_failed_to_get_token():

     assert source.get_report().failures
     assert "Failed to authenticate" in list(
-        source.get_report().failures[config.site_url]
+        source.get_report().failures[config.site_url].context
     )
 
 
@@ -353,7 +353,7 @@ def test_kerberos_auth_failed_to_get_token():

     assert source.get_report().failures
     assert "Failed to authenticate" in list(
-        source.get_report().failures[config.site_url]
+        source.get_report().failures[config.site_url].context
     )
 
 
@@ -373,7 +373,7 @@ def test_client_cert_auth_failed():

     assert source.get_report().failures
     assert "Failed to authenticate" in list(
-        source.get_report().failures[config.site_url]
+        source.get_report().failures[config.site_url].context
    )
 
 
@@ -393,7 +393,7 @@ def test_failure_to_create_nifi_flow():

     assert source.get_report().failures
     assert "Failed to get root process group flow" in list(
-        source.get_report().failures[config.site_url]
+        source.get_report().failures[config.site_url].context
     )
 
 
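Reviewer note: these assertions change because each failures entry is now a StructuredLog rather than a LossyList of reason strings, so the strings live in its .context. Self-contained, the new shape looks like this (hypothetical type, message, and context values):

from datahub.ingestion.api.source import SourceReport, StructuredLogLevel

report = SourceReport()
report.report_failure("https://localhost:8443", "Failed to authenticate", context="invalid credentials")

# Keyed by "{type}-{message}"; the reason strings accumulate in .context.
failure = report.failures["https://localhost:8443-Failed to authenticate"]
assert failure.level is StructuredLogLevel.ERROR
assert "invalid credentials" in list(failure.context)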
