From 94bcb79b8462738b3144d3800731c0c7066bad4c Mon Sep 17 00:00:00 2001
From: noggi
Date: Thu, 31 Oct 2024 10:59:21 -0700
Subject: [PATCH] feat(ingestion): Add execution request cleanup job (#11765)

---
 .../bootstrapmcp/datahub-test-mcp.yaml        |   8 +-
 .../docs/sources/gc/gc_recipe.dhub.yml        |  11 +
 .../datahub/ingestion/source/gc/datahub_gc.py |  25 +-
 .../source/gc/execution_request_cleanup.py    | 242 ++++++++++++++++++
 .../src/main/resources/bootstrap_mcps.yaml    |   4 +-
 .../bootstrap_mcps/ingestion-datahub-gc.yaml  |   8 +-
 6 files changed, 293 insertions(+), 5 deletions(-)
 create mode 100644 metadata-ingestion/src/datahub/ingestion/source/gc/execution_request_cleanup.py

diff --git a/datahub-upgrade/src/test/resources/bootstrapmcp/datahub-test-mcp.yaml b/datahub-upgrade/src/test/resources/bootstrapmcp/datahub-test-mcp.yaml
index d049a807ac1d8..233db06d61c3f 100644
--- a/datahub-upgrade/src/test/resources/bootstrapmcp/datahub-test-mcp.yaml
+++ b/datahub-upgrade/src/test/resources/bootstrapmcp/datahub-test-mcp.yaml
@@ -23,7 +23,13 @@
             keep_last_n: {{dataprocess_cleanup.keep_last_n}}{{^dataprocess_cleanup.keep_last_n}}5{{/dataprocess_cleanup.keep_last_n}}
           soft_deleted_entities_cleanup:
             retention_days: {{soft_deleted_entities_cleanup.retention_days}}{{^soft_deleted_entities_cleanup.retention_days}}10{{/soft_deleted_entities_cleanup.retention_days}}
+          execution_request_cleanup:
+            keep_history_min_count: {{execution_request_cleanup.keep_history_min_count}}{{^execution_request_cleanup.keep_history_min_count}}10{{/execution_request_cleanup.keep_history_min_count}}
+            keep_history_max_count: {{execution_request_cleanup.keep_history_max_count}}{{^execution_request_cleanup.keep_history_max_count}}1000{{/execution_request_cleanup.keep_history_max_count}}
+            keep_history_max_days: {{execution_request_cleanup.keep_history_max_days}}{{^execution_request_cleanup.keep_history_max_days}}30{{/execution_request_cleanup.keep_history_max_days}}
+            batch_read_size: {{execution_request_cleanup.batch_read_size}}{{^execution_request_cleanup.batch_read_size}}100{{/execution_request_cleanup.batch_read_size}}
+            enabled: {{execution_request_cleanup.enabled}}{{^execution_request_cleanup.enabled}}false{{/execution_request_cleanup.enabled}}
         extraArgs: {}
         debugMode: false
         executorId: default
-        headers: {}
\ No newline at end of file
+        headers: {}
diff --git a/metadata-ingestion/docs/sources/gc/gc_recipe.dhub.yml b/metadata-ingestion/docs/sources/gc/gc_recipe.dhub.yml
index 21734cd4e03fa..05e5205f7da41 100644
--- a/metadata-ingestion/docs/sources/gc/gc_recipe.dhub.yml
+++ b/metadata-ingestion/docs/sources/gc/gc_recipe.dhub.yml
@@ -22,3 +22,14 @@
     soft_deleted_entities_cleanup:
       # Delete soft deleted entities which were deleted 10 days ago
       retention_days: 10
+    execution_request_cleanup:
+      # Minimum number of execution requests to keep, per ingestion source
+      keep_history_min_count: 10
+      # Maximum number of execution requests to keep, per ingestion source
+      keep_history_max_count: 1000
+      # Maximum number of days to keep execution requests for, per ingestion source
+      keep_history_max_days: 30
+      # Number of records per read operation
+      batch_read_size: 100
+      # Global switch for this cleanup task
+      enabled: true
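The recipe above is the documented, user-facing way to enable the new task. Assuming a configured DataHub CLI, such a recipe runs like any other ingestion recipe:

    datahub ingest -c gc_recipe.dhub.yml

Note that keep_history_max_days is applied per ingestion source, relative to that source's newest execution request, as the implementation below makes explicit.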
diff --git a/metadata-ingestion/src/datahub/ingestion/source/gc/datahub_gc.py b/metadata-ingestion/src/datahub/ingestion/source/gc/datahub_gc.py
index 1897f3f288ec0..c4b4186f45fc3 100644
--- a/metadata-ingestion/src/datahub/ingestion/source/gc/datahub_gc.py
+++ b/metadata-ingestion/src/datahub/ingestion/source/gc/datahub_gc.py
@@ -24,6 +24,11 @@
     DataProcessCleanupConfig,
     DataProcessCleanupReport,
 )
+from datahub.ingestion.source.gc.execution_request_cleanup import (
+    DatahubExecutionRequestCleanup,
+    DatahubExecutionRequestCleanupConfig,
+    DatahubExecutionRequestCleanupReport,
+)
 from datahub.ingestion.source.gc.soft_deleted_entity_cleanup import (
     SoftDeletedEntitiesCleanup,
     SoftDeletedEntitiesCleanupConfig,
@@ -70,9 +75,18 @@ class DataHubGcSourceConfig(ConfigModel):
         description="Configuration for soft deleted entities cleanup",
     )
 
+    execution_request_cleanup: Optional[DatahubExecutionRequestCleanupConfig] = Field(
+        default=None,
+        description="Configuration for execution request cleanup",
+    )
+
 
 @dataclass
-class DataHubGcSourceReport(DataProcessCleanupReport, SoftDeletedEntitiesReport):
+class DataHubGcSourceReport(
+    DataProcessCleanupReport,
+    SoftDeletedEntitiesReport,
+    DatahubExecutionRequestCleanupReport,
+):
     expired_tokens_revoked: int = 0
 
 
@@ -97,6 +111,7 @@ def __init__(self, ctx: PipelineContext, config: DataHubGcSourceConfig):
         self.graph = ctx.require_graph("The DataHubGc source")
         self.dataprocess_cleanup: Optional[DataProcessCleanup] = None
         self.soft_deleted_entities_cleanup: Optional[SoftDeletedEntitiesCleanup] = None
+        self.execution_request_cleanup: Optional[DatahubExecutionRequestCleanup] = None
 
         if self.config.dataprocess_cleanup:
             self.dataprocess_cleanup = DataProcessCleanup(
@@ -109,6 +124,12 @@ def __init__(self, ctx: PipelineContext, config: DataHubGcSourceConfig):
                 self.report,
                 self.config.dry_run,
             )
+        if self.config.execution_request_cleanup:
+            self.execution_request_cleanup = DatahubExecutionRequestCleanup(
+                config=self.config.execution_request_cleanup,
+                graph=self.graph,
+                report=self.report,
+            )
 
     @classmethod
     def create(cls, config_dict, ctx):
@@ -130,6 +151,8 @@ def get_workunits_internal(
             yield from self.dataprocess_cleanup.get_workunits_internal()
         if self.soft_deleted_entities_cleanup:
             self.soft_deleted_entities_cleanup.cleanup_soft_deleted_entities()
+        if self.execution_request_cleanup:
+            self.execution_request_cleanup.run()
         yield from []
 
     def truncate_indices(self) -> None:
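The wiring above follows the pattern of the existing cleanup tasks: the config is optional, the task is only constructed when configured, and get_workunits_internal() invokes it purely for its side effects. A minimal sketch of driving the new task directly against a graph client, assuming a reachable GMS; the server URL and the 14-day override are illustrative, not defaults:

    from datahub.ingestion.graph.client import DataHubGraph, DatahubClientConfig
    from datahub.ingestion.source.gc.execution_request_cleanup import (
        DatahubExecutionRequestCleanup,
        DatahubExecutionRequestCleanupConfig,
        DatahubExecutionRequestCleanupReport,
    )

    # Illustrative host; point this at your GMS instance.
    graph = DataHubGraph(DatahubClientConfig(server="http://localhost:8080"))
    report = DatahubExecutionRequestCleanupReport()
    cleanup = DatahubExecutionRequestCleanup(
        graph=graph,
        report=report,
        config=DatahubExecutionRequestCleanupConfig(keep_history_max_days=14),
    )
    cleanup.run()
    print(report.execution_request_cleanup_records_deleted)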
ingestion source", + ) + + batch_read_size: int = Field( + 100, + description="Number of records per read operation", + ) + + enabled: bool = Field( + True, + description="Global switch for this cleanup task", + ) + + def keep_history_max_milliseconds(self): + return self.keep_history_max_days * 24 * 3600 * 1000 + + +class DatahubExecutionRequestCleanupReport(SourceReport): + execution_request_cleanup_records_read: int = 0 + execution_request_cleanup_records_preserved: int = 0 + execution_request_cleanup_records_deleted: int = 0 + execution_request_cleanup_read_errors: int = 0 + execution_request_cleanup_delete_errors: int = 0 + + +class CleanupRecord(BaseModel): + urn: str + request_id: str + status: str + ingestion_source: str + requested_at: int + + +class DatahubExecutionRequestCleanup: + def __init__( + self, + graph: DataHubGraph, + report: DatahubExecutionRequestCleanupReport, + config: Optional[DatahubExecutionRequestCleanupConfig] = None, + ) -> None: + + self.graph = graph + self.report = report + self.instance_id = int(time.time()) + + if config is not None: + self.config = config + else: + self.config = DatahubExecutionRequestCleanupConfig() + + def _to_cleanup_record(self, entry: Dict) -> CleanupRecord: + input_aspect = ( + entry.get("aspects", {}) + .get(DATAHUB_EXECUTION_REQUEST_INPUT_ASPECT_NAME, {}) + .get("value", {}) + ) + result_aspect = ( + entry.get("aspects", {}) + .get(DATAHUB_EXECUTION_REQUEST_RESULT_ASPECT_NAME, {}) + .get("value", {}) + ) + key_aspect = ( + entry.get("aspects", {}) + .get(DATAHUB_EXECUTION_REQUEST_KEY_ASPECT_NAME, {}) + .get("value", {}) + ) + return CleanupRecord( + urn=entry.get("urn"), + request_id=key_aspect.get("id"), + requested_at=input_aspect.get("requestedAt", 0), + status=result_aspect.get("status", "PENDING"), + ingestion_source=input_aspect.get("source", {}).get("ingestionSource", ""), + ) + + def _scroll_execution_requests( + self, overrides: Dict[str, Any] = {} + ) -> Iterator[CleanupRecord]: + headers: Dict[str, Any] = { + "Accept": "application/json", + "Content-Type": "application/json", + } + params = { + "aspectNames": [ + DATAHUB_EXECUTION_REQUEST_KEY_ASPECT_NAME, + DATAHUB_EXECUTION_REQUEST_INPUT_ASPECT_NAME, + DATAHUB_EXECUTION_REQUEST_RESULT_ASPECT_NAME, + ], + "count": str(self.config.batch_read_size), + "sort": "requestTimeMs", + "sortOrder": "DESCENDING", + "systemMetadata": "false", + "skipCache": "true", + } + params.update(overrides) + + while True: + try: + url = f"{self.graph.config.server}/openapi/v2/entity/{DATAHUB_EXECUTION_REQUEST_ENTITY_NAME}" + response = self.graph._session.get(url, headers=headers, params=params) + response.raise_for_status() + document = response.json() + + entries = document.get("results", []) + for entry in entries: + yield self._to_cleanup_record(entry) + + if "scrollId" not in document: + break + params["scrollId"] = document["scrollId"] + except Exception as e: + logger.error( + f"ergc({self.instance_id}): failed to fetch next batch of execution requests: {e}" + ) + self.report.execution_request_cleanup_read_errors += 1 + + def _scroll_garbage_records(self): + state: Dict[str, Dict] = {} + + now_ms = int(time.time()) * 1000 + running_guard_timeout = now_ms - 30 * 24 * 3600 * 1000 + + for entry in self._scroll_execution_requests(): + self.report.execution_request_cleanup_records_read += 1 + key = entry.ingestion_source + + # Always delete corrupted records + if not key: + logger.warning( + f"ergc({self.instance_id}): will delete corrupted entry with missing source key: {entry}" + ) + 
+                yield entry
+                continue
+
+            if key not in state:
+                state[key] = {}
+                state[key]["cutoffTimestamp"] = (
+                    entry.requested_at - self.config.keep_history_max_milliseconds()
+                )
+
+            state[key]["count"] = state[key].get("count", 0) + 1
+
+            # Do not delete if number of requests is below minimum
+            if state[key]["count"] < self.config.keep_history_min_count:
+                self.report.execution_request_cleanup_records_preserved += 1
+                continue
+
+            # Do not delete if the number of requests does not exceed the allowed
+            # maximum and the record is newer than the cutoff timestamp.
+            if (state[key]["count"] < self.config.keep_history_max_count) and (
+                entry.requested_at > state[key]["cutoffTimestamp"]
+            ):
+                self.report.execution_request_cleanup_records_preserved += 1
+                continue
+
+            # Do not delete if status is RUNNING or PENDING and the request was created within the last month.
+            # If a record more than a month old never reached a final state, it likely has no value.
+            if entry.requested_at > running_guard_timeout and entry.status in [
+                "RUNNING",
+                "PENDING",
+            ]:
+                self.report.execution_request_cleanup_records_preserved += 1
+                continue
+
+            # Otherwise delete current record
+            logger.info(
+                (
+                    f"ergc({self.instance_id}): going to delete {entry.request_id} in source {key}; "
+                    f"source count: {state[key]['count']}; "
+                    f"source cutoff: {state[key]['cutoffTimestamp']}; "
+                    f"record timestamp: {entry.requested_at}."
+                )
+            )
+            self.report.execution_request_cleanup_records_deleted += 1
+            yield entry
+
+    def _delete_entry(self, entry: CleanupRecord) -> None:
+        try:
+            logger.info(
+                f"ergc({self.instance_id}): going to delete ExecutionRequest {entry.request_id}"
+            )
+            self.graph.delete_entity(entry.urn, True)
+        except Exception as e:
+            self.report.execution_request_cleanup_delete_errors += 1
+            logger.error(
+                f"ergc({self.instance_id}): failed to delete ExecutionRequest {entry.request_id}: {e}"
+            )
+
+    def run(self) -> None:
+        if not self.config.enabled:
+            logger.info(
+                f"ergc({self.instance_id}): ExecutionRequest cleaner is disabled."
+            )
+            return
+
+        logger.info(
+            (
+                f"ergc({self.instance_id}): Starting cleanup of ExecutionRequest records; "
+                f"max days: {self.config.keep_history_max_days}, "
+                f"min records: {self.config.keep_history_min_count}, "
+                f"max records: {self.config.keep_history_max_count}."
+            )
+        )
+
+        for entry in self._scroll_garbage_records():
+            self._delete_entry(entry)
+
+        logger.info(
+            f"ergc({self.instance_id}): Finished cleanup of ExecutionRequest records."
+        )
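The deletion policy in _scroll_garbage_records above scans records newest-first (the scroll is sorted by requestTimeMs descending) and tracks a per-source count and age cutoff. Distilled into a standalone predicate — a sketch for illustration only, with hypothetical names; the module itself streams records and keeps state per source:

    DAY_MS = 24 * 3600 * 1000

    def should_delete(
        position: int,         # 1-based rank within one source, newest first
        requested_at_ms: int,  # when the request was created
        status: str,           # e.g. "SUCCESS", "RUNNING", "PENDING"
        newest_ms: int,        # creation time of the source's newest request
        now_ms: int,
        min_count: int = 10,
        max_count: int = 1000,
        max_age_days: int = 30,
    ) -> bool:
        cutoff_ms = newest_ms - max_age_days * DAY_MS
        if position < min_count:
            return False  # the newest few are always kept
        if position < max_count and requested_at_ms > cutoff_ms:
            return False  # within both the count budget and the age window
        if status in ("RUNNING", "PENDING") and requested_at_ms > now_ms - 30 * DAY_MS:
            return False  # never delete recent in-flight requests
        return True

With the defaults, the nine newest requests per source are always preserved; from the tenth onward, a request is deleted once the source exceeds the count budget or the request falls more than 30 days behind that source's newest request, unless it is still RUNNING or PENDING and less than a month old.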
diff --git a/metadata-service/configuration/src/main/resources/bootstrap_mcps.yaml b/metadata-service/configuration/src/main/resources/bootstrap_mcps.yaml
index 10ae176b2c31e..dda79120118e5 100644
--- a/metadata-service/configuration/src/main/resources/bootstrap_mcps.yaml
+++ b/metadata-service/configuration/src/main/resources/bootstrap_mcps.yaml
@@ -38,7 +38,7 @@ bootstrap:
 
   # Ingestion Recipes
   - name: ingestion-datahub-gc
-    version: v1
+    version: v2
     optional: true
     mcps_location: "bootstrap_mcps/ingestion-datahub-gc.yaml"
-    values_env: "DATAHUB_GC_BOOTSTRAP_VALUES"
\ No newline at end of file
+    values_env: "DATAHUB_GC_BOOTSTRAP_VALUES"
diff --git a/metadata-service/configuration/src/main/resources/bootstrap_mcps/ingestion-datahub-gc.yaml b/metadata-service/configuration/src/main/resources/bootstrap_mcps/ingestion-datahub-gc.yaml
index e70ab1162a381..f30ce148ec6cb 100644
--- a/metadata-service/configuration/src/main/resources/bootstrap_mcps/ingestion-datahub-gc.yaml
+++ b/metadata-service/configuration/src/main/resources/bootstrap_mcps/ingestion-datahub-gc.yaml
@@ -27,7 +27,13 @@
             keep_last_n: {{dataprocess_cleanup.keep_last_n}}{{^dataprocess_cleanup.keep_last_n}}5{{/dataprocess_cleanup.keep_last_n}}
           soft_deleted_entities_cleanup:
             retention_days: {{soft_deleted_entities_cleanup.retention_days}}{{^soft_deleted_entities_cleanup.retention_days}}10{{/soft_deleted_entities_cleanup.retention_days}}
-          extraArgs: {}
+          execution_request_cleanup:
+            keep_history_min_count: {{execution_request_cleanup.keep_history_min_count}}{{^execution_request_cleanup.keep_history_min_count}}10{{/execution_request_cleanup.keep_history_min_count}}
+            keep_history_max_count: {{execution_request_cleanup.keep_history_max_count}}{{^execution_request_cleanup.keep_history_max_count}}1000{{/execution_request_cleanup.keep_history_max_count}}
+            keep_history_max_days: {{execution_request_cleanup.keep_history_max_days}}{{^execution_request_cleanup.keep_history_max_days}}30{{/execution_request_cleanup.keep_history_max_days}}
+            batch_read_size: {{execution_request_cleanup.batch_read_size}}{{^execution_request_cleanup.batch_read_size}}100{{/execution_request_cleanup.batch_read_size}}
+            enabled: {{execution_request_cleanup.enabled}}{{^execution_request_cleanup.enabled}}false{{/execution_request_cleanup.enabled}}
+          extraArgs: {}
         debugMode: false
         executorId: default
         source:
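Bumping the bootstrap template from v1 to v2 signals that the managed ingestion-datahub-gc recipe changed, so existing deployments pick up the new execution_request_cleanup block. Note the deliberate asymmetry in defaults: the Python config defaults enabled to true, while the bootstrap template defaults it to false, making the task opt-in for the built-in datahub-gc source. Overrides flow in through the DATAHUB_GC_BOOTSTRAP_VALUES environment variable; assuming it accepts a JSON object keyed by the template variables, an override might look like the following (values shown are hypothetical):

    DATAHUB_GC_BOOTSTRAP_VALUES='{"execution_request_cleanup": {"enabled": true, "keep_history_max_days": 14}}'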