-
Notifications
You must be signed in to change notification settings - Fork 3k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(ingest): add --log-file option and show CLI logs in UI report
#7118
Changes from 7 commits
f5595a0
308428f
3954c51
b1c3d60
4e36436
efb4d24
3e22c5e
62a890d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -393,5 +393,5 @@ def rollback( | |
writer.writerow([row.get("urn")]) | ||
|
||
except IOError as e: | ||
print(e) | ||
logger.exception(f"Unable to save rollback failure report: {e}") | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Thank you! |
||
sys.exit(f"Unable to write reports to {report_dir}") |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -262,7 +262,7 @@ def dataplatform2instance_func( | |
delete_cli._delete_one_urn(src_entity_urn, soft=not hard, run_id=run_id) | ||
migration_report.on_entity_migrated(src_entity_urn, "status") # type: ignore | ||
|
||
print(f"{migration_report}") | ||
click.echo(f"{migration_report}") | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. OMG, thank you! |
||
migrate_containers( | ||
dry_run=dry_run, | ||
env=env, | ||
|
@@ -372,7 +372,7 @@ def migrate_containers( | |
delete_cli._delete_one_urn(src_urn, soft=not hard, run_id=run_id) | ||
migration_report.on_entity_migrated(src_urn, "status") # type: ignore | ||
|
||
print(f"{migration_report}") | ||
click.echo(f"{migration_report}") | ||
|
||
|
||
def get_containers_for_migration(env: str) -> List[Any]: | ||
|
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -24,8 +24,10 @@ | |
ExecutionRequestInputClass, | ||
ExecutionRequestResultClass, | ||
ExecutionRequestSourceClass, | ||
StructuredExecutionReportClass, | ||
_Aspect, | ||
) | ||
from datahub.utilities.logging_manager import get_log_buffer | ||
from datahub.utilities.urns.urn import Urn | ||
|
||
logger = logging.getLogger(__name__) | ||
|
@@ -143,7 +145,6 @@ def __init__(self, sink: Sink, report_recipe: bool, ctx: PipelineContext) -> Non | |
# Emit the dataHubIngestionSourceInfo aspect | ||
self._emit_aspect( | ||
entity_urn=self.ingestion_source_urn, | ||
aspect_name="dataHubIngestionSourceInfo", | ||
aspect_value=source_info_aspect, | ||
) | ||
|
||
|
@@ -154,17 +155,12 @@ def _get_recipe_to_report(self, ctx: PipelineContext) -> str: | |
else: | ||
return json.dumps(redact_raw_config(ctx.pipeline_config._raw_dict)) | ||
|
||
def _emit_aspect( | ||
self, entity_urn: Urn, aspect_name: str, aspect_value: _Aspect | ||
) -> None: | ||
def _emit_aspect(self, entity_urn: Urn, aspect_value: _Aspect) -> None: | ||
self.sink.write_record_async( | ||
RecordEnvelope( | ||
record=MetadataChangeProposalWrapper( | ||
entityType=entity_urn.get_type(), | ||
entityUrn=str(entity_urn), | ||
aspectName=aspect_name, | ||
aspect=aspect_value, | ||
changeType="UPSERT", | ||
), | ||
metadata={}, | ||
), | ||
|
@@ -190,7 +186,6 @@ def on_start(self, ctx: PipelineContext) -> None: | |
# Emit the dataHubExecutionRequestInput aspect | ||
self._emit_aspect( | ||
entity_urn=self.execution_request_input_urn, | ||
aspect_name="dataHubExecutionRequestInput", | ||
aspect_value=execution_input_aspect, | ||
) | ||
|
||
|
@@ -200,18 +195,29 @@ def on_completion( | |
report: Dict[str, Any], | ||
ctx: PipelineContext, | ||
) -> None: | ||
# Prepare a nicely formatted summary | ||
structured_report_str = json.dumps(report, indent=2) | ||
summary = f"~~~~ Ingestion Report ~~~~\n{structured_report_str}\n\n" | ||
summary += "~~~~ Ingestion Logs ~~~~\n" | ||
summary += get_log_buffer().format_lines() | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Ah, so here we leverage the in-memory log buffer. Nice! |
||
|
||
# Construct the dataHubExecutionRequestResult aspect | ||
structured_report = StructuredExecutionReportClass( | ||
type="CLI_INGEST", | ||
serializedValue=structured_report_str, | ||
contentType="application/json", | ||
) | ||
execution_result_aspect = ExecutionRequestResultClass( | ||
status=status, | ||
startTimeMs=self.start_time_ms, | ||
durationMs=self.get_cur_time_in_ms() - self.start_time_ms, | ||
report=json.dumps(report, indent=2), | ||
report=summary, | ||
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Nice! So in this case, we are now sending the structured report to the correct field. From here, we can continue to add to that report and get it into our result aspect. There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. (Sets ourselves up for much richer reports.) There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Yup, exactly - this way the structured report shows up in the correct field for both managed ingestion and CLI ingestion. Note that the structured report field is basically a string right now - we may need to rethink that in the future. |
||
structuredReport=structured_report, | ||
) | ||
|
||
# Emit the dataHubExecutionRequestResult aspect | ||
self._emit_aspect( | ||
entity_urn=self.execution_request_input_urn, | ||
aspect_name="dataHubExecutionRequestResult", | ||
aspect_value=execution_result_aspect, | ||
) | ||
self.sink.close() |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nice refactor!