Skip to content

Commit

Permalink
feat: Add ProcessingEngineRunFacet to all OL events (#46283)
Browse files Browse the repository at this point in the history
Signed-off-by: Kacper Muda <[email protected]>
  • Loading branch information
kacpermuda authored Feb 3, 2025
1 parent ef66a9f commit a252a98
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,7 @@ def complete_task(
run_facets = run_facets or {}
if task:
run_facets = {**task.run_facets, **run_facets}
run_facets = {**run_facets, **get_processing_engine_facet()} # type: ignore
event = RunEvent(
eventType=RunState.COMPLETE,
eventTime=end_time,
Expand Down Expand Up @@ -296,6 +297,7 @@ def fail_task(
run_facets = run_facets or {}
if task:
run_facets = {**task.run_facets, **run_facets}
run_facets = {**run_facets, **get_processing_engine_facet()} # type: ignore

if error:
stack_trace = None
Expand Down Expand Up @@ -391,6 +393,7 @@ def dag_success(
facets={
**get_airflow_state_run_facet(dag_id, run_id, task_ids, dag_run_state),
**get_airflow_debug_facet(),
**get_processing_engine_facet(),
**run_facets,
},
),
Expand Down Expand Up @@ -434,6 +437,7 @@ def dag_failed(
),
**get_airflow_state_run_facet(dag_id, run_id, task_ids, dag_run_state),
**get_airflow_debug_facet(),
**get_processing_engine_facet(),
**run_facets,
},
),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,14 @@ def test_emit_complete_event(mock_stats_incr, mock_stats_timer):
RunEvent(
eventType=RunState.COMPLETE,
eventTime=event_time,
run=Run(runId=run_id, facets={}),
run=Run(
runId=run_id,
facets={
"processing_engine": processing_engine_run.ProcessingEngineRunFacet(
version=ANY, name="Airflow", openlineageAdapterVersion=ANY
)
},
),
job=Job(
namespace=namespace(),
name="job",
Expand Down Expand Up @@ -366,6 +373,9 @@ def test_emit_complete_event_with_additional_information(mock_stats_incr, mock_s
run=parent_run.Run(runId=parent_run_id),
job=parent_run.Job(namespace=namespace(), name="parent_job_name"),
),
"processing_engine": processing_engine_run.ProcessingEngineRunFacet(
version=ANY, name="Airflow", openlineageAdapterVersion=ANY
),
"externalQuery": external_query_run.ExternalQueryRunFacet(
externalQueryId="123", source="source"
),
Expand Down Expand Up @@ -421,7 +431,14 @@ def test_emit_failed_event(mock_stats_incr, mock_stats_timer):
RunEvent(
eventType=RunState.FAIL,
eventTime=event_time,
run=Run(runId=run_id, facets={}),
run=Run(
runId=run_id,
facets={
"processing_engine": processing_engine_run.ProcessingEngineRunFacet(
version=ANY, name="Airflow", openlineageAdapterVersion=ANY
)
},
),
job=Job(
namespace=namespace(),
name="job",
Expand Down Expand Up @@ -485,6 +502,9 @@ def test_emit_failed_event_with_additional_information(mock_stats_incr, mock_sta
run=parent_run.Run(runId=parent_run_id),
job=parent_run.Job(namespace=namespace(), name="parent_job_name"),
),
"processing_engine": processing_engine_run.ProcessingEngineRunFacet(
version=ANY, name="Airflow", openlineageAdapterVersion=ANY
),
"errorMessage": error_message_run.ErrorMessageRunFacet(
message="Error message", programmingLanguage="python", stackTrace=None
),
Expand Down Expand Up @@ -731,6 +751,9 @@ def test_emit_dag_complete_event(
task_2.task_id: TaskInstanceState.FAILED,
},
),
"processing_engine": processing_engine_run.ProcessingEngineRunFacet(
version=ANY, name="Airflow", openlineageAdapterVersion=ANY
),
"debug": AirflowDebugRunFacet(packages=ANY),
"airflowDagRun": AirflowDagRunFacet(dag={"description": "dag desc"}, dagRun=dag_run),
},
Expand Down Expand Up @@ -827,6 +850,9 @@ def test_emit_dag_failed_event(
task_2.task_id: TaskInstanceState.FAILED,
},
),
"processing_engine": processing_engine_run.ProcessingEngineRunFacet(
version=ANY, name="Airflow", openlineageAdapterVersion=ANY
),
"debug": AirflowDebugRunFacet(packages=ANY),
"airflowDagRun": AirflowDagRunFacet(dag={"description": "dag desc"}, dagRun=dag_run),
},
Expand Down

0 comments on commit a252a98

Please sign in to comment.