diff --git a/providers/openlineage/src/airflow/providers/openlineage/plugins/adapter.py b/providers/openlineage/src/airflow/providers/openlineage/plugins/adapter.py index 02f7a50c64d28..b0d0259e3d867 100644 --- a/providers/openlineage/src/airflow/providers/openlineage/plugins/adapter.py +++ b/providers/openlineage/src/airflow/providers/openlineage/plugins/adapter.py @@ -251,6 +251,7 @@ def complete_task( run_facets = run_facets or {} if task: run_facets = {**task.run_facets, **run_facets} + run_facets = {**run_facets, **get_processing_engine_facet()} # type: ignore event = RunEvent( eventType=RunState.COMPLETE, eventTime=end_time, @@ -296,6 +297,7 @@ def fail_task( run_facets = run_facets or {} if task: run_facets = {**task.run_facets, **run_facets} + run_facets = {**run_facets, **get_processing_engine_facet()} # type: ignore if error: stack_trace = None @@ -391,6 +393,7 @@ def dag_success( facets={ **get_airflow_state_run_facet(dag_id, run_id, task_ids, dag_run_state), **get_airflow_debug_facet(), + **get_processing_engine_facet(), **run_facets, }, ), @@ -434,6 +437,7 @@ def dag_failed( ), **get_airflow_state_run_facet(dag_id, run_id, task_ids, dag_run_state), **get_airflow_debug_facet(), + **get_processing_engine_facet(), **run_facets, }, ), diff --git a/providers/openlineage/tests/provider_tests/openlineage/plugins/test_adapter.py b/providers/openlineage/tests/provider_tests/openlineage/plugins/test_adapter.py index 9e19437142b10..3acdacee09f44 100644 --- a/providers/openlineage/tests/provider_tests/openlineage/plugins/test_adapter.py +++ b/providers/openlineage/tests/provider_tests/openlineage/plugins/test_adapter.py @@ -302,7 +302,14 @@ def test_emit_complete_event(mock_stats_incr, mock_stats_timer): RunEvent( eventType=RunState.COMPLETE, eventTime=event_time, - run=Run(runId=run_id, facets={}), + run=Run( + runId=run_id, + facets={ + "processing_engine": processing_engine_run.ProcessingEngineRunFacet( + version=ANY, name="Airflow", openlineageAdapterVersion=ANY + ) + }, + ), job=Job( namespace=namespace(), name="job", @@ -366,6 +373,9 @@ def test_emit_complete_event_with_additional_information(mock_stats_incr, mock_s run=parent_run.Run(runId=parent_run_id), job=parent_run.Job(namespace=namespace(), name="parent_job_name"), ), + "processing_engine": processing_engine_run.ProcessingEngineRunFacet( + version=ANY, name="Airflow", openlineageAdapterVersion=ANY + ), "externalQuery": external_query_run.ExternalQueryRunFacet( externalQueryId="123", source="source" ), @@ -421,7 +431,14 @@ def test_emit_failed_event(mock_stats_incr, mock_stats_timer): RunEvent( eventType=RunState.FAIL, eventTime=event_time, - run=Run(runId=run_id, facets={}), + run=Run( + runId=run_id, + facets={ + "processing_engine": processing_engine_run.ProcessingEngineRunFacet( + version=ANY, name="Airflow", openlineageAdapterVersion=ANY + ) + }, + ), job=Job( namespace=namespace(), name="job", @@ -485,6 +502,9 @@ def test_emit_failed_event_with_additional_information(mock_stats_incr, mock_sta run=parent_run.Run(runId=parent_run_id), job=parent_run.Job(namespace=namespace(), name="parent_job_name"), ), + "processing_engine": processing_engine_run.ProcessingEngineRunFacet( + version=ANY, name="Airflow", openlineageAdapterVersion=ANY + ), "errorMessage": error_message_run.ErrorMessageRunFacet( message="Error message", programmingLanguage="python", stackTrace=None ), @@ -731,6 +751,9 @@ def test_emit_dag_complete_event( task_2.task_id: TaskInstanceState.FAILED, }, ), + "processing_engine": processing_engine_run.ProcessingEngineRunFacet( + version=ANY, name="Airflow", openlineageAdapterVersion=ANY + ), "debug": AirflowDebugRunFacet(packages=ANY), "airflowDagRun": AirflowDagRunFacet(dag={"description": "dag desc"}, dagRun=dag_run), }, @@ -827,6 +850,9 @@ def test_emit_dag_failed_event( task_2.task_id: TaskInstanceState.FAILED, }, ), + "processing_engine": processing_engine_run.ProcessingEngineRunFacet( + version=ANY, name="Airflow", openlineageAdapterVersion=ANY + ), "debug": AirflowDebugRunFacet(packages=ANY), "airflowDagRun": AirflowDagRunFacet(dag={"description": "dag desc"}, dagRun=dag_run), },