From 5f342365e4051f675f24d27da567ff19a6e212d3 Mon Sep 17 00:00:00 2001 From: Dushyant Bhalgami Date: Tue, 18 Jun 2024 13:10:28 +0200 Subject: [PATCH 1/5] fix(ingestion/airflow-plugin): emit browsePathV2 --- .../datahub_listener.py | 12 + .../integration/goldens/v2_basic_iolets.json | 21 +- .../v2_basic_iolets_no_dag_listener.json | 15 + .../integration/goldens/v2_simple_dag.json | 27 +- .../v2_simple_dag_no_dag_listener.json | 15 + .../goldens/v2_sqlite_operator.json | 395 ++---------------- .../v2_sqlite_operator_no_dag_listener.json | 154 ++++--- 7 files changed, 181 insertions(+), 458 deletions(-) diff --git a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py index 53d735f6c6ebb7..53f1dde48cb3fd 100644 --- a/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py +++ b/metadata-ingestion-modules/airflow-plugin/src/datahub_airflow_plugin/datahub_listener.py @@ -16,6 +16,8 @@ from datahub.emitter.rest_emitter import DatahubRestEmitter from datahub.ingestion.graph.client import DataHubGraph from datahub.metadata.schema_classes import ( + BrowsePathEntryClass, + BrowsePathsV2Class, DataFlowKeyClass, DataJobKeyClass, FineGrainedLineageClass, @@ -544,6 +546,16 @@ def on_dag_start(self, dag_run: "DagRun") -> None: self.emitter.emit(event) + browse_path_v2_event: MetadataChangeProposalWrapper = ( + MetadataChangeProposalWrapper( + entityUrn=str(dataflow.urn), + aspect=BrowsePathsV2Class( + path=[BrowsePathEntryClass(str(dag.dag_id))], + ), + ) + ) + self.emitter.emit(browse_path_v2_event) + if dag.dag_id == _DATAHUB_CLEANUP_DAG: assert self.graph diff --git a/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_basic_iolets.json b/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_basic_iolets.json index 128881b1299e1d..43168efc8fb2e4 100644 --- a/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_basic_iolets.json +++ b/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_basic_iolets.json @@ -57,6 +57,21 @@ } } }, +{ + "entityType": "dataFlow", + "entityUrn": "urn:li:dataFlow:(airflow,basic_iolets,prod)", + "changeType": "UPSERT", + "aspectName": "browsePathsV2", + "aspect": { + "json": { + "path": [ + { + "id": "basic_iolets" + } + ] + } + } +}, { "entityType": "dataJob", "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,basic_iolets,prod),run_data_task)", @@ -242,16 +257,16 @@ "state": "running", "operator": "BashOperator", "priority_weight": "1", - "log_url": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=run_data_task&dag_id=basic_iolets&map_index=-1", + "log_url": "http://airflow.example.com/dags/basic_iolets/grid?dag_run_id=manual_run_test&task_id=run_data_task&map_index=-1&tab=logs", "orchestrator": "airflow", "dag_id": "basic_iolets", "task_id": "run_data_task" }, - "externalUrl": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=run_data_task&dag_id=basic_iolets&map_index=-1", + "externalUrl": "http://airflow.example.com/dags/basic_iolets/grid?dag_run_id=manual_run_test&task_id=run_data_task&map_index=-1&tab=logs", "name": "basic_iolets_run_data_task_manual_run_test", "type": "BATCH_AD_HOC", "created": { - "time": 1717179624988, + "time": 1718708732325, "actor": "urn:li:corpuser:datahub" } } diff --git a/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_basic_iolets_no_dag_listener.json b/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_basic_iolets_no_dag_listener.json index 2645fb82ca023f..ef55850894bf61 100644 --- a/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_basic_iolets_no_dag_listener.json +++ b/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_basic_iolets_no_dag_listener.json @@ -57,6 +57,21 @@ } } }, +{ + "entityType": "dataFlow", + "entityUrn": "urn:li:dataFlow:(airflow,basic_iolets,prod)", + "changeType": "UPSERT", + "aspectName": "browsePathsV2", + "aspect": { + "json": { + "path": [ + { + "id": "basic_iolets" + } + ] + } + } +}, { "entityType": "dataJob", "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,basic_iolets,prod),run_data_task)", diff --git a/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_simple_dag.json b/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_simple_dag.json index 67b6b9500b6c59..9c33d9c043ac95 100644 --- a/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_simple_dag.json +++ b/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_simple_dag.json @@ -58,6 +58,21 @@ } } }, +{ + "entityType": "dataFlow", + "entityUrn": "urn:li:dataFlow:(airflow,simple_dag,prod)", + "changeType": "UPSERT", + "aspectName": "browsePathsV2", + "aspect": { + "json": { + "path": [ + { + "id": "simple_dag" + } + ] + } + } +}, { "entityType": "dataJob", "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,simple_dag,prod),task_1)", @@ -201,16 +216,16 @@ "state": "running", "operator": "BashOperator", "priority_weight": "2", - "log_url": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=task_1&dag_id=simple_dag&map_index=-1", + "log_url": "http://airflow.example.com/dags/simple_dag/grid?dag_run_id=manual_run_test&task_id=task_1&map_index=-1&tab=logs", "orchestrator": "airflow", "dag_id": "simple_dag", "task_id": "task_1" }, - "externalUrl": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=task_1&dag_id=simple_dag&map_index=-1", + "externalUrl": "http://airflow.example.com/dags/simple_dag/grid?dag_run_id=manual_run_test&task_id=task_1&map_index=-1&tab=logs", "name": "simple_dag_task_1_manual_run_test", "type": "BATCH_AD_HOC", "created": { - "time": 1717179559032, + "time": 1718708686979, "actor": "urn:li:corpuser:datahub" } } @@ -572,16 +587,16 @@ "state": "running", "operator": "BashOperator", "priority_weight": "1", - "log_url": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=run_another_data_task&dag_id=simple_dag&map_index=-1", + "log_url": "http://airflow.example.com/dags/simple_dag/grid?dag_run_id=manual_run_test&task_id=run_another_data_task&map_index=-1&tab=logs", "orchestrator": "airflow", "dag_id": "simple_dag", "task_id": "run_another_data_task" }, - "externalUrl": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=run_another_data_task&dag_id=simple_dag&map_index=-1", + "externalUrl": "http://airflow.example.com/dags/simple_dag/grid?dag_run_id=manual_run_test&task_id=run_another_data_task&map_index=-1&tab=logs", "name": "simple_dag_run_another_data_task_manual_run_test", "type": "BATCH_AD_HOC", "created": { - "time": 1717179564453, + "time": 1718708691148, "actor": "urn:li:corpuser:datahub" } } diff --git a/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_simple_dag_no_dag_listener.json b/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_simple_dag_no_dag_listener.json index 7b6df6e157f1df..434f4b6eb7a09c 100644 --- a/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_simple_dag_no_dag_listener.json +++ b/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_simple_dag_no_dag_listener.json @@ -58,6 +58,21 @@ } } }, +{ + "entityType": "dataFlow", + "entityUrn": "urn:li:dataFlow:(airflow,simple_dag,prod)", + "changeType": "UPSERT", + "aspectName": "browsePathsV2", + "aspect": { + "json": { + "path": [ + { + "id": "simple_dag" + } + ] + } + } +}, { "entityType": "dataJob", "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,simple_dag,prod),task_1)", diff --git a/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_sqlite_operator.json b/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_sqlite_operator.json index dc6eb20773b998..4451bc205f2690 100644 --- a/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_sqlite_operator.json +++ b/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_sqlite_operator.json @@ -57,6 +57,21 @@ } } }, +{ + "entityType": "dataFlow", + "entityUrn": "urn:li:dataFlow:(airflow,sqlite_operator,prod)", + "changeType": "UPSERT", + "aspectName": "browsePathsV2", + "aspect": { + "json": { + "path": [ + { + "id": "sqlite_operator" + } + ] + } + } +}, { "entityType": "dataJob", "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,sqlite_operator,prod),create_cost_table)", @@ -207,16 +222,16 @@ "state": "running", "operator": "SqliteOperator", "priority_weight": "5", - "log_url": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=create_cost_table&dag_id=sqlite_operator&map_index=-1", + "log_url": "http://airflow.example.com/dags/sqlite_operator/grid?dag_run_id=manual_run_test&task_id=create_cost_table&map_index=-1&tab=logs", "orchestrator": "airflow", "dag_id": "sqlite_operator", "task_id": "create_cost_table" }, - "externalUrl": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=create_cost_table&dag_id=sqlite_operator&map_index=-1", + "externalUrl": "http://airflow.example.com/dags/sqlite_operator/grid?dag_run_id=manual_run_test&task_id=create_cost_table&map_index=-1&tab=logs", "name": "sqlite_operator_create_cost_table_manual_run_test", "type": "BATCH_AD_HOC", "created": { - "time": 1717179743558, + "time": 1718708812962, "actor": "urn:li:corpuser:datahub" } } @@ -599,16 +614,16 @@ "state": "running", "operator": "SqliteOperator", "priority_weight": "4", - "log_url": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=populate_cost_table&dag_id=sqlite_operator&map_index=-1", + "log_url": "http://airflow.example.com/dags/sqlite_operator/grid?dag_run_id=manual_run_test&task_id=populate_cost_table&map_index=-1&tab=logs", "orchestrator": "airflow", "dag_id": "sqlite_operator", "task_id": "populate_cost_table" }, - "externalUrl": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=populate_cost_table&dag_id=sqlite_operator&map_index=-1", + "externalUrl": "http://airflow.example.com/dags/sqlite_operator/grid?dag_run_id=manual_run_test&task_id=populate_cost_table&map_index=-1&tab=logs", "name": "sqlite_operator_populate_cost_table_manual_run_test", "type": "BATCH_AD_HOC", "created": { - "time": 1717179748679, + "time": 1718708817141, "actor": "urn:li:corpuser:datahub" } } @@ -992,16 +1007,16 @@ "state": "running", "operator": "SqliteOperator", "priority_weight": "3", - "log_url": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=transform_cost_table&dag_id=sqlite_operator&map_index=-1", + "log_url": "http://airflow.example.com/dags/sqlite_operator/grid?dag_run_id=manual_run_test&task_id=transform_cost_table&map_index=-1&tab=logs", "orchestrator": "airflow", "dag_id": "sqlite_operator", "task_id": "transform_cost_table" }, - "externalUrl": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=transform_cost_table&dag_id=sqlite_operator&map_index=-1", + "externalUrl": "http://airflow.example.com/dags/sqlite_operator/grid?dag_run_id=manual_run_test&task_id=transform_cost_table&map_index=-1&tab=logs", "name": "sqlite_operator_transform_cost_table_manual_run_test", "type": "BATCH_AD_HOC", "created": { - "time": 1717179757397, + "time": 1718708824107, "actor": "urn:li:corpuser:datahub" } } @@ -1350,56 +1365,6 @@ } } }, -{ - "entityType": "dataJob", - "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,sqlite_operator,prod),cleanup_costs)", - "changeType": "UPSERT", - "aspectName": "dataJobInfo", - "aspect": { - "json": { - "customProperties": { - "depends_on_past": "False", - "email": "None", - "label": "'cleanup_costs'", - "execution_timeout": "None", - "sla": "None", - "sql": "'\\n DROP TABLE costs\\n '", - "task_id": "'cleanup_costs'", - "trigger_rule": "", - "wait_for_downstream": "False", - "downstream_task_ids": "[]", - "inlets": "[]", - "outlets": "[]", - "datahub_sql_parser_error": "UnsupportedStatementTypeError: Can only generate column-level lineage for select-like inner statements, not (outer statement type: )", - "openlineage_job_facet_sql": "{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.12.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/SqlJobFacet\", \"query\": \"\\n DROP TABLE costs\\n \"}", - "openlineage_run_facet_extractionError": "{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.12.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/ExtractionErrorRunFacet\", \"errors\": [{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.12.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/BaseFacet\", \"errorMessage\": \"Can only generate column-level lineage for select-like inner statements, not (outer statement type: )\", \"task\": \"datahub_sql_parser\"}], \"failedTasks\": 1, \"totalTasks\": 1}" - }, - "externalUrl": "http://airflow.example.com/taskinstance/list/?flt1_dag_id_equals=sqlite_operator&_flt_3_task_id=cleanup_costs", - "name": "cleanup_costs", - "type": { - "string": "COMMAND" - } - } - } -}, -{ - "entityType": "dataJob", - "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,sqlite_operator,prod),cleanup_costs)", - "changeType": "UPSERT", - "aspectName": "dataJobInputOutput", - "aspect": { - "json": { - "inputDatasets": [ - "urn:li:dataset:(urn:li:dataPlatform:sqlite,public.costs,PROD)" - ], - "outputDatasets": [], - "inputDatajobs": [ - "urn:li:dataJob:(urn:li:dataFlow:(airflow,sqlite_operator,prod),transform_cost_table)" - ], - "fineGrainedLineages": [] - } - } -}, { "entityType": "dataset", "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:sqlite,public.costs,PROD)", @@ -1413,100 +1378,6 @@ } } }, -{ - "entityType": "dataJob", - "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,sqlite_operator,prod),cleanup_costs)", - "changeType": "UPSERT", - "aspectName": "ownership", - "aspect": { - "json": { - "owners": [ - { - "owner": "urn:li:corpuser:airflow", - "type": "DEVELOPER", - "source": { - "type": "SERVICE" - } - } - ], - "ownerTypes": {}, - "lastModified": { - "time": 0, - "actor": "urn:li:corpuser:airflow" - } - } - } -}, -{ - "entityType": "dataJob", - "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,sqlite_operator,prod),cleanup_costs)", - "changeType": "UPSERT", - "aspectName": "globalTags", - "aspect": { - "json": { - "tags": [] - } - } -}, -{ - "entityType": "dataProcessInstance", - "entityUrn": "urn:li:dataProcessInstance:07285de22276959612189d51336cc21a", - "changeType": "UPSERT", - "aspectName": "dataProcessInstanceProperties", - "aspect": { - "json": { - "customProperties": { - "run_id": "manual_run_test", - "duration": "", - "start_date": "", - "end_date": "", - "execution_date": "2023-09-27 21:34:38+00:00", - "try_number": "0", - "max_tries": "0", - "external_executor_id": "None", - "state": "running", - "operator": "SqliteOperator", - "priority_weight": "1", - "log_url": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=cleanup_costs&dag_id=sqlite_operator&map_index=-1", - "orchestrator": "airflow", - "dag_id": "sqlite_operator", - "task_id": "cleanup_costs" - }, - "externalUrl": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=cleanup_costs&dag_id=sqlite_operator&map_index=-1", - "name": "sqlite_operator_cleanup_costs_manual_run_test", - "type": "BATCH_AD_HOC", - "created": { - "time": 1717179766820, - "actor": "urn:li:corpuser:datahub" - } - } - } -}, -{ - "entityType": "dataProcessInstance", - "entityUrn": "urn:li:dataProcessInstance:07285de22276959612189d51336cc21a", - "changeType": "UPSERT", - "aspectName": "dataProcessInstanceRelationships", - "aspect": { - "json": { - "parentTemplate": "urn:li:dataJob:(urn:li:dataFlow:(airflow,sqlite_operator,prod),cleanup_costs)", - "upstreamInstances": [] - } - } -}, -{ - "entityType": "dataProcessInstance", - "entityUrn": "urn:li:dataProcessInstance:07285de22276959612189d51336cc21a", - "changeType": "UPSERT", - "aspectName": "dataProcessInstanceInput", - "aspect": { - "json": { - "inputs": [ - "urn:li:dataset:(urn:li:dataPlatform:sqlite,public.costs,PROD)" - ] - } - } -}, { "entityType": "dataset", "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:sqlite,public.costs,PROD)", @@ -1520,23 +1391,6 @@ } } }, -{ - "entityType": "dataProcessInstance", - "entityUrn": "urn:li:dataProcessInstance:07285de22276959612189d51336cc21a", - "changeType": "UPSERT", - "aspectName": "dataProcessInstanceRunEvent", - "aspect": { - "json": { - "timestampMillis": 1717179766820, - "partitionSpec": { - "type": "FULL_TABLE", - "partition": "FULL_TABLE_SNAPSHOT" - }, - "status": "STARTED", - "attempt": 1 - } - } -}, { "entityType": "dataJob", "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,sqlite_operator,prod),cleanup_costs)", @@ -1635,76 +1489,6 @@ } } }, -{ - "entityType": "dataProcessInstance", - "entityUrn": "urn:li:dataProcessInstance:07285de22276959612189d51336cc21a", - "changeType": "UPSERT", - "aspectName": "dataProcessInstanceRunEvent", - "aspect": { - "json": { - "timestampMillis": 1717179767882, - "partitionSpec": { - "type": "FULL_TABLE", - "partition": "FULL_TABLE_SNAPSHOT" - }, - "status": "COMPLETE", - "result": { - "type": "SUCCESS", - "nativeResultType": "airflow" - } - } - } -}, -{ - "entityType": "dataJob", - "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,sqlite_operator,prod),cleanup_processed_costs)", - "changeType": "UPSERT", - "aspectName": "dataJobInfo", - "aspect": { - "json": { - "customProperties": { - "depends_on_past": "False", - "email": "None", - "label": "'cleanup_processed_costs'", - "execution_timeout": "None", - "sla": "None", - "sql": "'\\n DROP TABLE processed_costs\\n '", - "task_id": "'cleanup_processed_costs'", - "trigger_rule": "", - "wait_for_downstream": "False", - "downstream_task_ids": "[]", - "inlets": "[]", - "outlets": "[]", - "datahub_sql_parser_error": "UnsupportedStatementTypeError: Can only generate column-level lineage for select-like inner statements, not (outer statement type: )", - "openlineage_job_facet_sql": "{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.12.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/SqlJobFacet\", \"query\": \"\\n DROP TABLE processed_costs\\n \"}", - "openlineage_run_facet_extractionError": "{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.12.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/ExtractionErrorRunFacet\", \"errors\": [{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.12.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/BaseFacet\", \"errorMessage\": \"Can only generate column-level lineage for select-like inner statements, not (outer statement type: )\", \"task\": \"datahub_sql_parser\"}], \"failedTasks\": 1, \"totalTasks\": 1}" - }, - "externalUrl": "http://airflow.example.com/taskinstance/list/?flt1_dag_id_equals=sqlite_operator&_flt_3_task_id=cleanup_processed_costs", - "name": "cleanup_processed_costs", - "type": { - "string": "COMMAND" - } - } - } -}, -{ - "entityType": "dataJob", - "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,sqlite_operator,prod),cleanup_processed_costs)", - "changeType": "UPSERT", - "aspectName": "dataJobInputOutput", - "aspect": { - "json": { - "inputDatasets": [ - "urn:li:dataset:(urn:li:dataPlatform:sqlite,public.processed_costs,PROD)" - ], - "outputDatasets": [], - "inputDatajobs": [ - "urn:li:dataJob:(urn:li:dataFlow:(airflow,sqlite_operator,prod),transform_cost_table)" - ], - "fineGrainedLineages": [] - } - } -}, { "entityType": "dataset", "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:sqlite,public.processed_costs,PROD)", @@ -1718,100 +1502,6 @@ } } }, -{ - "entityType": "dataJob", - "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,sqlite_operator,prod),cleanup_processed_costs)", - "changeType": "UPSERT", - "aspectName": "ownership", - "aspect": { - "json": { - "owners": [ - { - "owner": "urn:li:corpuser:airflow", - "type": "DEVELOPER", - "source": { - "type": "SERVICE" - } - } - ], - "ownerTypes": {}, - "lastModified": { - "time": 0, - "actor": "urn:li:corpuser:airflow" - } - } - } -}, -{ - "entityType": "dataJob", - "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,sqlite_operator,prod),cleanup_processed_costs)", - "changeType": "UPSERT", - "aspectName": "globalTags", - "aspect": { - "json": { - "tags": [] - } - } -}, -{ - "entityType": "dataProcessInstance", - "entityUrn": "urn:li:dataProcessInstance:bab908abccf3cd6607b50fdaf3003372", - "changeType": "UPSERT", - "aspectName": "dataProcessInstanceProperties", - "aspect": { - "json": { - "customProperties": { - "run_id": "manual_run_test", - "duration": "", - "start_date": "", - "end_date": "", - "execution_date": "2023-09-27 21:34:38+00:00", - "try_number": "0", - "max_tries": "0", - "external_executor_id": "None", - "state": "running", - "operator": "SqliteOperator", - "priority_weight": "1", - "log_url": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=cleanup_processed_costs&dag_id=sqlite_operator&map_index=-1", - "orchestrator": "airflow", - "dag_id": "sqlite_operator", - "task_id": "cleanup_processed_costs" - }, - "externalUrl": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=cleanup_processed_costs&dag_id=sqlite_operator&map_index=-1", - "name": "sqlite_operator_cleanup_processed_costs_manual_run_test", - "type": "BATCH_AD_HOC", - "created": { - "time": 1717179773312, - "actor": "urn:li:corpuser:datahub" - } - } - } -}, -{ - "entityType": "dataProcessInstance", - "entityUrn": "urn:li:dataProcessInstance:bab908abccf3cd6607b50fdaf3003372", - "changeType": "UPSERT", - "aspectName": "dataProcessInstanceRelationships", - "aspect": { - "json": { - "parentTemplate": "urn:li:dataJob:(urn:li:dataFlow:(airflow,sqlite_operator,prod),cleanup_processed_costs)", - "upstreamInstances": [] - } - } -}, -{ - "entityType": "dataProcessInstance", - "entityUrn": "urn:li:dataProcessInstance:bab908abccf3cd6607b50fdaf3003372", - "changeType": "UPSERT", - "aspectName": "dataProcessInstanceInput", - "aspect": { - "json": { - "inputs": [ - "urn:li:dataset:(urn:li:dataPlatform:sqlite,public.processed_costs,PROD)" - ] - } - } -}, { "entityType": "dataset", "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:sqlite,public.processed_costs,PROD)", @@ -1825,23 +1515,6 @@ } } }, -{ - "entityType": "dataProcessInstance", - "entityUrn": "urn:li:dataProcessInstance:bab908abccf3cd6607b50fdaf3003372", - "changeType": "UPSERT", - "aspectName": "dataProcessInstanceRunEvent", - "aspect": { - "json": { - "timestampMillis": 1717179773312, - "partitionSpec": { - "type": "FULL_TABLE", - "partition": "FULL_TABLE_SNAPSHOT" - }, - "status": "STARTED", - "attempt": 1 - } - } -}, { "entityType": "dataJob", "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,sqlite_operator,prod),cleanup_processed_costs)", @@ -1939,25 +1612,5 @@ "tags": [] } } -}, -{ - "entityType": "dataProcessInstance", - "entityUrn": "urn:li:dataProcessInstance:bab908abccf3cd6607b50fdaf3003372", - "changeType": "UPSERT", - "aspectName": "dataProcessInstanceRunEvent", - "aspect": { - "json": { - "timestampMillis": 1717179774628, - "partitionSpec": { - "type": "FULL_TABLE", - "partition": "FULL_TABLE_SNAPSHOT" - }, - "status": "COMPLETE", - "result": { - "type": "SUCCESS", - "nativeResultType": "airflow" - } - } - } } ] \ No newline at end of file diff --git a/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_sqlite_operator_no_dag_listener.json b/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_sqlite_operator_no_dag_listener.json index adedfeca1fd370..f3c6df9b24da08 100644 --- a/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_sqlite_operator_no_dag_listener.json +++ b/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_sqlite_operator_no_dag_listener.json @@ -57,6 +57,21 @@ } } }, +{ + "entityType": "dataFlow", + "entityUrn": "urn:li:dataFlow:(airflow,sqlite_operator,prod)", + "changeType": "UPSERT", + "aspectName": "browsePathsV2", + "aspect": { + "json": { + "path": [ + { + "id": "sqlite_operator" + } + ] + } + } +}, { "entityType": "dataJob", "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,sqlite_operator,prod),create_cost_table)", @@ -176,6 +191,19 @@ } } }, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:sqlite,public.costs,PROD)", + "changeType": "UPSERT", + "aspectName": "datasetKey", + "aspect": { + "json": { + "platform": "urn:li:dataPlatform:sqlite", + "name": "public.costs", + "origin": "PROD" + } + } +}, { "entityType": "dataJob", "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,sqlite_operator,prod),create_cost_table)", @@ -296,17 +324,13 @@ "entityType": "dataset", "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:sqlite,public.costs,PROD)", "changeType": "UPSERT", - "aspectName": "operation", + "aspectName": "dataJobInputOutput", "aspect": { "json": { - "timestampMillis": 1718701132691, - "partitionSpec": { - "type": "FULL_TABLE", - "partition": "FULL_TABLE_SNAPSHOT" - }, - "actor": "urn:li:corpuser:airflow", - "operationType": "CREATE", - "lastUpdatedTimestamp": 1718701132691 + "inputDatasets": [], + "outputDatasets": [], + "inputDatajobs": [], + "fineGrainedLineages": [] } } }, @@ -598,15 +622,18 @@ } }, { - "entityType": "dataProcessInstance", - "entityUrn": "urn:li:dataProcessInstance:04e1badac1eacd1c41123d07f579fa92", + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,sqlite_operator,prod),populate_cost_table)", "changeType": "UPSERT", - "aspectName": "dataProcessInstanceOutput", + "aspectName": "dataJobInputOutput", "aspect": { "json": { - "outputs": [ - "urn:li:dataset:(urn:li:dataPlatform:sqlite,public.costs,PROD)" - ] + "inputDatasets": [], + "outputDatasets": [], + "inputDatajobs": [ + "urn:li:dataJob:(urn:li:dataFlow:(airflow,sqlite_operator,prod),create_cost_table)" + ], + "fineGrainedLineages": [] } } }, @@ -1118,9 +1145,7 @@ "aspectName": "dataJobInputOutput", "aspect": { "json": { - "inputDatasets": [ - "urn:li:dataset:(urn:li:dataPlatform:sqlite,public.costs,PROD)" - ], + "inputDatasets": [], "outputDatasets": [], "inputDatajobs": [ "urn:li:dataJob:(urn:li:dataFlow:(airflow,sqlite_operator,prod),transform_cost_table)" @@ -1129,32 +1154,6 @@ } } }, -{ - "entityType": "dataProcessInstance", - "entityUrn": "urn:li:dataProcessInstance:64e5ff8f552e857b607832731e09808b", - "changeType": "UPSERT", - "aspectName": "dataProcessInstanceInput", - "aspect": { - "json": { - "inputs": [ - "urn:li:dataset:(urn:li:dataPlatform:sqlite,public.costs,PROD)" - ] - } - } -}, -{ - "entityType": "dataProcessInstance", - "entityUrn": "urn:li:dataProcessInstance:64e5ff8f552e857b607832731e09808b", - "changeType": "UPSERT", - "aspectName": "dataProcessInstanceOutput", - "aspect": { - "json": { - "outputs": [ - "urn:li:dataset:(urn:li:dataPlatform:sqlite,public.processed_costs,PROD)" - ] - } - } -}, { "entityType": "dataJob", "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,sqlite_operator,prod),cleanup_costs)", @@ -1224,24 +1223,6 @@ } } }, -{ - "entityType": "dataset", - "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:sqlite,public.processed_costs,PROD)", - "changeType": "UPSERT", - "aspectName": "operation", - "aspect": { - "json": { - "timestampMillis": 1718701139266, - "partitionSpec": { - "type": "FULL_TABLE", - "partition": "FULL_TABLE_SNAPSHOT" - }, - "actor": "urn:li:corpuser:airflow", - "operationType": "CREATE", - "lastUpdatedTimestamp": 1718701139266 - } - } -}, { "entityType": "dataProcessInstance", "entityUrn": "urn:li:dataProcessInstance:07285de22276959612189d51336cc21a", @@ -1300,6 +1281,22 @@ } } }, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,sqlite_operator,prod),cleanup_costs)", + "changeType": "UPSERT", + "aspectName": "dataJobInputOutput", + "aspect": { + "json": { + "inputDatasets": [], + "outputDatasets": [], + "inputDatajobs": [ + "urn:li:dataJob:(urn:li:dataFlow:(airflow,sqlite_operator,prod),transform_cost_table)" + ], + "fineGrainedLineages": [] + } + } +}, { "entityType": "dataJob", "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,sqlite_operator,prod),cleanup_costs)", @@ -1449,9 +1446,7 @@ "aspectName": "dataJobInputOutput", "aspect": { "json": { - "inputDatasets": [ - "urn:li:dataset:(urn:li:dataPlatform:sqlite,public.processed_costs,PROD)" - ], + "inputDatasets": [], "outputDatasets": [], "inputDatajobs": [ "urn:li:dataJob:(urn:li:dataFlow:(airflow,sqlite_operator,prod),transform_cost_table)" @@ -1587,6 +1582,22 @@ } } }, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,sqlite_operator,prod),cleanup_processed_costs)", + "changeType": "UPSERT", + "aspectName": "dataJobInputOutput", + "aspect": { + "json": { + "inputDatasets": [], + "outputDatasets": [], + "inputDatajobs": [ + "urn:li:dataJob:(urn:li:dataFlow:(airflow,sqlite_operator,prod),transform_cost_table)" + ], + "fineGrainedLineages": [] + } + } +}, { "entityType": "dataJob", "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,sqlite_operator,prod),cleanup_processed_costs)", @@ -1622,19 +1633,6 @@ } } }, -{ - "entityType": "dataProcessInstance", - "entityUrn": "urn:li:dataProcessInstance:07285de22276959612189d51336cc21a", - "changeType": "UPSERT", - "aspectName": "dataProcessInstanceInput", - "aspect": { - "json": { - "inputs": [ - "urn:li:dataset:(urn:li:dataPlatform:sqlite,public.costs,PROD)" - ] - } - } -}, { "entityType": "dataProcessInstance", "entityUrn": "urn:li:dataProcessInstance:bab908abccf3cd6607b50fdaf3003372", From 7f4d6a108800ba9bd2bc52fdef97f90a7e6a5412 Mon Sep 17 00:00:00 2001 From: Dushyant Bhalgami Date: Tue, 18 Jun 2024 14:34:18 +0200 Subject: [PATCH 2/5] fix(ingestion/airflow-plugin): updated golden files --- .../integration/goldens/v2_basic_iolets.json | 6 +- .../integration/goldens/v2_simple_dag.json | 12 +- .../goldens/v2_snowflake_operator.json | 15 ++ .../goldens/v2_sqlite_operator.json | 188 +----------------- 4 files changed, 33 insertions(+), 188 deletions(-) diff --git a/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_basic_iolets.json b/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_basic_iolets.json index 43168efc8fb2e4..298ee4c9dddc49 100644 --- a/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_basic_iolets.json +++ b/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_basic_iolets.json @@ -257,16 +257,16 @@ "state": "running", "operator": "BashOperator", "priority_weight": "1", - "log_url": "http://airflow.example.com/dags/basic_iolets/grid?dag_run_id=manual_run_test&task_id=run_data_task&map_index=-1&tab=logs", + "log_url": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=run_data_task&dag_id=basic_iolets&map_index=-1", "orchestrator": "airflow", "dag_id": "basic_iolets", "task_id": "run_data_task" }, - "externalUrl": "http://airflow.example.com/dags/basic_iolets/grid?dag_run_id=manual_run_test&task_id=run_data_task&map_index=-1&tab=logs", + "externalUrl": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=run_data_task&dag_id=basic_iolets&map_index=-1", "name": "basic_iolets_run_data_task_manual_run_test", "type": "BATCH_AD_HOC", "created": { - "time": 1718708732325, + "time": 1718712738744, "actor": "urn:li:corpuser:datahub" } } diff --git a/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_simple_dag.json b/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_simple_dag.json index 9c33d9c043ac95..5e20063f0e6f7b 100644 --- a/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_simple_dag.json +++ b/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_simple_dag.json @@ -216,16 +216,16 @@ "state": "running", "operator": "BashOperator", "priority_weight": "2", - "log_url": "http://airflow.example.com/dags/simple_dag/grid?dag_run_id=manual_run_test&task_id=task_1&map_index=-1&tab=logs", + "log_url": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=task_1&dag_id=simple_dag&map_index=-1", "orchestrator": "airflow", "dag_id": "simple_dag", "task_id": "task_1" }, - "externalUrl": "http://airflow.example.com/dags/simple_dag/grid?dag_run_id=manual_run_test&task_id=task_1&map_index=-1&tab=logs", + "externalUrl": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=task_1&dag_id=simple_dag&map_index=-1", "name": "simple_dag_task_1_manual_run_test", "type": "BATCH_AD_HOC", "created": { - "time": 1718708686979, + "time": 1718712697855, "actor": "urn:li:corpuser:datahub" } } @@ -587,16 +587,16 @@ "state": "running", "operator": "BashOperator", "priority_weight": "1", - "log_url": "http://airflow.example.com/dags/simple_dag/grid?dag_run_id=manual_run_test&task_id=run_another_data_task&map_index=-1&tab=logs", + "log_url": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=run_another_data_task&dag_id=simple_dag&map_index=-1", "orchestrator": "airflow", "dag_id": "simple_dag", "task_id": "run_another_data_task" }, - "externalUrl": "http://airflow.example.com/dags/simple_dag/grid?dag_run_id=manual_run_test&task_id=run_another_data_task&map_index=-1&tab=logs", + "externalUrl": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=run_another_data_task&dag_id=simple_dag&map_index=-1", "name": "simple_dag_run_another_data_task_manual_run_test", "type": "BATCH_AD_HOC", "created": { - "time": 1718708691148, + "time": 1718712701742, "actor": "urn:li:corpuser:datahub" } } diff --git a/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_snowflake_operator.json b/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_snowflake_operator.json index 41afe54d9a022c..e60fcd8db0affa 100644 --- a/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_snowflake_operator.json +++ b/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_snowflake_operator.json @@ -57,6 +57,21 @@ } } }, +{ + "entityType": "dataFlow", + "entityUrn": "urn:li:dataFlow:(airflow,snowflake_operator,prod)", + "changeType": "UPSERT", + "aspectName": "browsePathsV2", + "aspect": { + "json": { + "path": [ + { + "id": "snowflake_operator" + } + ] + } + } +}, { "entityType": "dataJob", "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,snowflake_operator,prod),transform_cost_table)", diff --git a/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_sqlite_operator.json b/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_sqlite_operator.json index 4451bc205f2690..8c4d64d30db728 100644 --- a/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_sqlite_operator.json +++ b/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_sqlite_operator.json @@ -222,16 +222,16 @@ "state": "running", "operator": "SqliteOperator", "priority_weight": "5", - "log_url": "http://airflow.example.com/dags/sqlite_operator/grid?dag_run_id=manual_run_test&task_id=create_cost_table&map_index=-1&tab=logs", + "log_url": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=create_cost_table&dag_id=sqlite_operator&map_index=-1", "orchestrator": "airflow", "dag_id": "sqlite_operator", "task_id": "create_cost_table" }, - "externalUrl": "http://airflow.example.com/dags/sqlite_operator/grid?dag_run_id=manual_run_test&task_id=create_cost_table&map_index=-1&tab=logs", + "externalUrl": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=create_cost_table&dag_id=sqlite_operator&map_index=-1", "name": "sqlite_operator_create_cost_table_manual_run_test", "type": "BATCH_AD_HOC", "created": { - "time": 1718708812962, + "time": 1718712819047, "actor": "urn:li:corpuser:datahub" } } @@ -614,16 +614,16 @@ "state": "running", "operator": "SqliteOperator", "priority_weight": "4", - "log_url": "http://airflow.example.com/dags/sqlite_operator/grid?dag_run_id=manual_run_test&task_id=populate_cost_table&map_index=-1&tab=logs", + "log_url": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=populate_cost_table&dag_id=sqlite_operator&map_index=-1", "orchestrator": "airflow", "dag_id": "sqlite_operator", "task_id": "populate_cost_table" }, - "externalUrl": "http://airflow.example.com/dags/sqlite_operator/grid?dag_run_id=manual_run_test&task_id=populate_cost_table&map_index=-1&tab=logs", + "externalUrl": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=populate_cost_table&dag_id=sqlite_operator&map_index=-1", "name": "sqlite_operator_populate_cost_table_manual_run_test", "type": "BATCH_AD_HOC", "created": { - "time": 1718708817141, + "time": 1718712822925, "actor": "urn:li:corpuser:datahub" } } @@ -1007,16 +1007,16 @@ "state": "running", "operator": "SqliteOperator", "priority_weight": "3", - "log_url": "http://airflow.example.com/dags/sqlite_operator/grid?dag_run_id=manual_run_test&task_id=transform_cost_table&map_index=-1&tab=logs", + "log_url": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=transform_cost_table&dag_id=sqlite_operator&map_index=-1", "orchestrator": "airflow", "dag_id": "sqlite_operator", "task_id": "transform_cost_table" }, - "externalUrl": "http://airflow.example.com/dags/sqlite_operator/grid?dag_run_id=manual_run_test&task_id=transform_cost_table&map_index=-1&tab=logs", + "externalUrl": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=transform_cost_table&dag_id=sqlite_operator&map_index=-1", "name": "sqlite_operator_transform_cost_table_manual_run_test", "type": "BATCH_AD_HOC", "created": { - "time": 1718708824107, + "time": 1718712827615, "actor": "urn:li:corpuser:datahub" } } @@ -1391,56 +1391,6 @@ } } }, -{ - "entityType": "dataJob", - "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,sqlite_operator,prod),cleanup_costs)", - "changeType": "UPSERT", - "aspectName": "dataJobInfo", - "aspect": { - "json": { - "customProperties": { - "depends_on_past": "False", - "email": "None", - "label": "'cleanup_costs'", - "execution_timeout": "None", - "sla": "None", - "sql": "'\\n DROP TABLE costs\\n '", - "task_id": "'cleanup_costs'", - "trigger_rule": "", - "wait_for_downstream": "False", - "downstream_task_ids": "[]", - "inlets": "[]", - "outlets": "[]", - "datahub_sql_parser_error": "UnsupportedStatementTypeError: Can only generate column-level lineage for select-like inner statements, not (outer statement type: )", - "openlineage_job_facet_sql": "{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.12.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/SqlJobFacet\", \"query\": \"\\n DROP TABLE costs\\n \"}", - "openlineage_run_facet_extractionError": "{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.12.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/ExtractionErrorRunFacet\", \"errors\": [{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.12.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/BaseFacet\", \"errorMessage\": \"Can only generate column-level lineage for select-like inner statements, not (outer statement type: )\", \"task\": \"datahub_sql_parser\"}], \"failedTasks\": 1, \"totalTasks\": 1}" - }, - "externalUrl": "http://airflow.example.com/taskinstance/list/?flt1_dag_id_equals=sqlite_operator&_flt_3_task_id=cleanup_costs", - "name": "cleanup_costs", - "type": { - "string": "COMMAND" - } - } - } -}, -{ - "entityType": "dataJob", - "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,sqlite_operator,prod),cleanup_costs)", - "changeType": "UPSERT", - "aspectName": "dataJobInputOutput", - "aspect": { - "json": { - "inputDatasets": [ - "urn:li:dataset:(urn:li:dataPlatform:sqlite,public.costs,PROD)" - ], - "outputDatasets": [], - "inputDatajobs": [ - "urn:li:dataJob:(urn:li:dataFlow:(airflow,sqlite_operator,prod),transform_cost_table)" - ], - "fineGrainedLineages": [] - } - } -}, { "entityType": "dataset", "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:sqlite,public.costs,PROD)", @@ -1454,41 +1404,6 @@ } } }, -{ - "entityType": "dataJob", - "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,sqlite_operator,prod),cleanup_costs)", - "changeType": "UPSERT", - "aspectName": "ownership", - "aspect": { - "json": { - "owners": [ - { - "owner": "urn:li:corpuser:airflow", - "type": "DEVELOPER", - "source": { - "type": "SERVICE" - } - } - ], - "ownerTypes": {}, - "lastModified": { - "time": 0, - "actor": "urn:li:corpuser:airflow" - } - } - } -}, -{ - "entityType": "dataJob", - "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,sqlite_operator,prod),cleanup_costs)", - "changeType": "UPSERT", - "aspectName": "globalTags", - "aspect": { - "json": { - "tags": [] - } - } -}, { "entityType": "dataset", "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:sqlite,public.processed_costs,PROD)", @@ -1515,56 +1430,6 @@ } } }, -{ - "entityType": "dataJob", - "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,sqlite_operator,prod),cleanup_processed_costs)", - "changeType": "UPSERT", - "aspectName": "dataJobInfo", - "aspect": { - "json": { - "customProperties": { - "depends_on_past": "False", - "email": "None", - "label": "'cleanup_processed_costs'", - "execution_timeout": "None", - "sla": "None", - "sql": "'\\n DROP TABLE processed_costs\\n '", - "task_id": "'cleanup_processed_costs'", - "trigger_rule": "", - "wait_for_downstream": "False", - "downstream_task_ids": "[]", - "inlets": "[]", - "outlets": "[]", - "datahub_sql_parser_error": "UnsupportedStatementTypeError: Can only generate column-level lineage for select-like inner statements, not (outer statement type: )", - "openlineage_job_facet_sql": "{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.12.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/SqlJobFacet\", \"query\": \"\\n DROP TABLE processed_costs\\n \"}", - "openlineage_run_facet_extractionError": "{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.12.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/ExtractionErrorRunFacet\", \"errors\": [{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.12.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/BaseFacet\", \"errorMessage\": \"Can only generate column-level lineage for select-like inner statements, not (outer statement type: )\", \"task\": \"datahub_sql_parser\"}], \"failedTasks\": 1, \"totalTasks\": 1}" - }, - "externalUrl": "http://airflow.example.com/taskinstance/list/?flt1_dag_id_equals=sqlite_operator&_flt_3_task_id=cleanup_processed_costs", - "name": "cleanup_processed_costs", - "type": { - "string": "COMMAND" - } - } - } -}, -{ - "entityType": "dataJob", - "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,sqlite_operator,prod),cleanup_processed_costs)", - "changeType": "UPSERT", - "aspectName": "dataJobInputOutput", - "aspect": { - "json": { - "inputDatasets": [ - "urn:li:dataset:(urn:li:dataPlatform:sqlite,public.processed_costs,PROD)" - ], - "outputDatasets": [], - "inputDatajobs": [ - "urn:li:dataJob:(urn:li:dataFlow:(airflow,sqlite_operator,prod),transform_cost_table)" - ], - "fineGrainedLineages": [] - } - } -}, { "entityType": "dataset", "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:sqlite,public.processed_costs,PROD)", @@ -1577,40 +1442,5 @@ "origin": "PROD" } } -}, -{ - "entityType": "dataJob", - "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,sqlite_operator,prod),cleanup_processed_costs)", - "changeType": "UPSERT", - "aspectName": "ownership", - "aspect": { - "json": { - "owners": [ - { - "owner": "urn:li:corpuser:airflow", - "type": "DEVELOPER", - "source": { - "type": "SERVICE" - } - } - ], - "ownerTypes": {}, - "lastModified": { - "time": 0, - "actor": "urn:li:corpuser:airflow" - } - } - } -}, -{ - "entityType": "dataJob", - "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,sqlite_operator,prod),cleanup_processed_costs)", - "changeType": "UPSERT", - "aspectName": "globalTags", - "aspect": { - "json": { - "tags": [] - } - } } ] \ No newline at end of file From f5fa6b97bb7babf1b3dfdabbcd4b21a43550f632 Mon Sep 17 00:00:00 2001 From: Dushyant Bhalgami Date: Tue, 18 Jun 2024 19:20:29 +0200 Subject: [PATCH 3/5] fix(ingestion/airflow-plugin): emit browsePathV2 --- .../integration/goldens/v2_basic_iolets.json | 6 +++--- .../integration/goldens/v2_simple_dag.json | 12 ++++++------ .../goldens/v2_sqlite_operator.json | 18 +++++++++--------- 3 files changed, 18 insertions(+), 18 deletions(-) diff --git a/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_basic_iolets.json b/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_basic_iolets.json index 298ee4c9dddc49..2c050b77206a74 100644 --- a/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_basic_iolets.json +++ b/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_basic_iolets.json @@ -257,16 +257,16 @@ "state": "running", "operator": "BashOperator", "priority_weight": "1", - "log_url": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=run_data_task&dag_id=basic_iolets&map_index=-1", + "log_url": "http://airflow.example.com/dags/basic_iolets/grid?dag_run_id=manual_run_test&task_id=run_data_task&map_index=-1&tab=logs", "orchestrator": "airflow", "dag_id": "basic_iolets", "task_id": "run_data_task" }, - "externalUrl": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=run_data_task&dag_id=basic_iolets&map_index=-1", + "externalUrl": "http://airflow.example.com/dags/basic_iolets/grid?dag_run_id=manual_run_test&task_id=run_data_task&map_index=-1&tab=logs", "name": "basic_iolets_run_data_task_manual_run_test", "type": "BATCH_AD_HOC", "created": { - "time": 1718712738744, + "time": 1718715802611, "actor": "urn:li:corpuser:datahub" } } diff --git a/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_simple_dag.json b/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_simple_dag.json index 5e20063f0e6f7b..ccce8119db9c8d 100644 --- a/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_simple_dag.json +++ b/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_simple_dag.json @@ -216,16 +216,16 @@ "state": "running", "operator": "BashOperator", "priority_weight": "2", - "log_url": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=task_1&dag_id=simple_dag&map_index=-1", + "log_url": "http://airflow.example.com/dags/simple_dag/grid?dag_run_id=manual_run_test&task_id=task_1&map_index=-1&tab=logs", "orchestrator": "airflow", "dag_id": "simple_dag", "task_id": "task_1" }, - "externalUrl": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=task_1&dag_id=simple_dag&map_index=-1", + "externalUrl": "http://airflow.example.com/dags/simple_dag/grid?dag_run_id=manual_run_test&task_id=task_1&map_index=-1&tab=logs", "name": "simple_dag_task_1_manual_run_test", "type": "BATCH_AD_HOC", "created": { - "time": 1718712697855, + "time": 1718715756430, "actor": "urn:li:corpuser:datahub" } } @@ -587,16 +587,16 @@ "state": "running", "operator": "BashOperator", "priority_weight": "1", - "log_url": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=run_another_data_task&dag_id=simple_dag&map_index=-1", + "log_url": "http://airflow.example.com/dags/simple_dag/grid?dag_run_id=manual_run_test&task_id=run_another_data_task&map_index=-1&tab=logs", "orchestrator": "airflow", "dag_id": "simple_dag", "task_id": "run_another_data_task" }, - "externalUrl": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=run_another_data_task&dag_id=simple_dag&map_index=-1", + "externalUrl": "http://airflow.example.com/dags/simple_dag/grid?dag_run_id=manual_run_test&task_id=run_another_data_task&map_index=-1&tab=logs", "name": "simple_dag_run_another_data_task_manual_run_test", "type": "BATCH_AD_HOC", "created": { - "time": 1718712701742, + "time": 1718715760616, "actor": "urn:li:corpuser:datahub" } } diff --git a/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_sqlite_operator.json b/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_sqlite_operator.json index 8c4d64d30db728..8937c4667a9abd 100644 --- a/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_sqlite_operator.json +++ b/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_sqlite_operator.json @@ -222,16 +222,16 @@ "state": "running", "operator": "SqliteOperator", "priority_weight": "5", - "log_url": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=create_cost_table&dag_id=sqlite_operator&map_index=-1", + "log_url": "http://airflow.example.com/dags/sqlite_operator/grid?dag_run_id=manual_run_test&task_id=create_cost_table&map_index=-1&tab=logs", "orchestrator": "airflow", "dag_id": "sqlite_operator", "task_id": "create_cost_table" }, - "externalUrl": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=create_cost_table&dag_id=sqlite_operator&map_index=-1", + "externalUrl": "http://airflow.example.com/dags/sqlite_operator/grid?dag_run_id=manual_run_test&task_id=create_cost_table&map_index=-1&tab=logs", "name": "sqlite_operator_create_cost_table_manual_run_test", "type": "BATCH_AD_HOC", "created": { - "time": 1718712819047, + "time": 1718715882139, "actor": "urn:li:corpuser:datahub" } } @@ -614,16 +614,16 @@ "state": "running", "operator": "SqliteOperator", "priority_weight": "4", - "log_url": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=populate_cost_table&dag_id=sqlite_operator&map_index=-1", + "log_url": "http://airflow.example.com/dags/sqlite_operator/grid?dag_run_id=manual_run_test&task_id=populate_cost_table&map_index=-1&tab=logs", "orchestrator": "airflow", "dag_id": "sqlite_operator", "task_id": "populate_cost_table" }, - "externalUrl": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=populate_cost_table&dag_id=sqlite_operator&map_index=-1", + "externalUrl": "http://airflow.example.com/dags/sqlite_operator/grid?dag_run_id=manual_run_test&task_id=populate_cost_table&map_index=-1&tab=logs", "name": "sqlite_operator_populate_cost_table_manual_run_test", "type": "BATCH_AD_HOC", "created": { - "time": 1718712822925, + "time": 1718715886259, "actor": "urn:li:corpuser:datahub" } } @@ -1007,16 +1007,16 @@ "state": "running", "operator": "SqliteOperator", "priority_weight": "3", - "log_url": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=transform_cost_table&dag_id=sqlite_operator&map_index=-1", + "log_url": "http://airflow.example.com/dags/sqlite_operator/grid?dag_run_id=manual_run_test&task_id=transform_cost_table&map_index=-1&tab=logs", "orchestrator": "airflow", "dag_id": "sqlite_operator", "task_id": "transform_cost_table" }, - "externalUrl": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=transform_cost_table&dag_id=sqlite_operator&map_index=-1", + "externalUrl": "http://airflow.example.com/dags/sqlite_operator/grid?dag_run_id=manual_run_test&task_id=transform_cost_table&map_index=-1&tab=logs", "name": "sqlite_operator_transform_cost_table_manual_run_test", "type": "BATCH_AD_HOC", "created": { - "time": 1718712827615, + "time": 1718715894274, "actor": "urn:li:corpuser:datahub" } } From 1cb8918fe96ab47df622c27e613c1cd41751a5fb Mon Sep 17 00:00:00 2001 From: Dushyant Bhalgami Date: Wed, 19 Jun 2024 08:44:15 +0200 Subject: [PATCH 4/5] fix(ingestion/airflow-plugin): emit browsePathV2 --- .../integration/goldens/v2_basic_iolets.json | 2 +- .../integration/goldens/v2_simple_dag.json | 4 +- .../goldens/v2_snowflake_operator.json | 6 +- .../goldens/v2_sqlite_operator.json | 366 +++++++++++++++++- .../tests/integration/test_plugin.py | 2 + 5 files changed, 372 insertions(+), 8 deletions(-) diff --git a/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_basic_iolets.json b/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_basic_iolets.json index 2c050b77206a74..1ff53a45abf399 100644 --- a/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_basic_iolets.json +++ b/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_basic_iolets.json @@ -266,7 +266,7 @@ "name": "basic_iolets_run_data_task_manual_run_test", "type": "BATCH_AD_HOC", "created": { - "time": 1718715802611, + "time": 1718733614956, "actor": "urn:li:corpuser:datahub" } } diff --git a/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_simple_dag.json b/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_simple_dag.json index ccce8119db9c8d..48f8872d3831fa 100644 --- a/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_simple_dag.json +++ b/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_simple_dag.json @@ -225,7 +225,7 @@ "name": "simple_dag_task_1_manual_run_test", "type": "BATCH_AD_HOC", "created": { - "time": 1718715756430, + "time": 1718733547259, "actor": "urn:li:corpuser:datahub" } } @@ -596,7 +596,7 @@ "name": "simple_dag_run_another_data_task_manual_run_test", "type": "BATCH_AD_HOC", "created": { - "time": 1718715760616, + "time": 1718733551439, "actor": "urn:li:corpuser:datahub" } } diff --git a/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_snowflake_operator.json b/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_snowflake_operator.json index e60fcd8db0affa..89464448032b12 100644 --- a/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_snowflake_operator.json +++ b/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_snowflake_operator.json @@ -257,16 +257,16 @@ "state": "running", "operator": "SnowflakeOperator", "priority_weight": "1", - "log_url": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=transform_cost_table&dag_id=snowflake_operator&map_index=-1", + "log_url": "http://airflow.example.com/dags/snowflake_operator/grid?dag_run_id=manual_run_test&task_id=transform_cost_table&map_index=-1&tab=logs", "orchestrator": "airflow", "dag_id": "snowflake_operator", "task_id": "transform_cost_table" }, - "externalUrl": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=transform_cost_table&dag_id=snowflake_operator&map_index=-1", + "externalUrl": "http://airflow.example.com/dags/snowflake_operator/grid?dag_run_id=manual_run_test&task_id=transform_cost_table&map_index=-1&tab=logs", "name": "snowflake_operator_transform_cost_table_manual_run_test", "type": "BATCH_AD_HOC", "created": { - "time": 1717179684292, + "time": 1718733682840, "actor": "urn:li:corpuser:datahub" } } diff --git a/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_sqlite_operator.json b/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_sqlite_operator.json index 8937c4667a9abd..b4b97bbb74fae8 100644 --- a/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_sqlite_operator.json +++ b/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_sqlite_operator.json @@ -614,12 +614,12 @@ "state": "running", "operator": "SqliteOperator", "priority_weight": "4", - "log_url": "http://airflow.example.com/dags/sqlite_operator/grid?dag_run_id=manual_run_test&task_id=populate_cost_table&map_index=-1&tab=logs", + "log_url": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=populate_cost_table&dag_id=sqlite_operator&map_index=-1", "orchestrator": "airflow", "dag_id": "sqlite_operator", "task_id": "populate_cost_table" }, - "externalUrl": "http://airflow.example.com/dags/sqlite_operator/grid?dag_run_id=manual_run_test&task_id=populate_cost_table&map_index=-1&tab=logs", + "externalUrl": "http://airflow.example.com/log?execution_date=2023-09-27T21%3A34%3A38%2B00%3A00&task_id=populate_cost_table&dag_id=sqlite_operator&map_index=-1", "name": "sqlite_operator_populate_cost_table_manual_run_test", "type": "BATCH_AD_HOC", "created": { @@ -1365,6 +1365,38 @@ } } }, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,sqlite_operator,prod),cleanup_costs)", + "changeType": "UPSERT", + "aspectName": "dataJobInfo", + "aspect": { + "json": { + "customProperties": { + "depends_on_past": "False", + "email": "None", + "label": "'cleanup_costs'", + "execution_timeout": "None", + "sla": "None", + "sql": "'\\n DROP TABLE costs\\n '", + "task_id": "'cleanup_costs'", + "trigger_rule": "", + "wait_for_downstream": "False", + "downstream_task_ids": "[]", + "inlets": "[]", + "outlets": "[]", + "datahub_sql_parser_error": "UnsupportedStatementTypeError: Can only generate column-level lineage for select-like inner statements, not (outer statement type: )", + "openlineage_job_facet_sql": "{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.12.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/SqlJobFacet\", \"query\": \"\\n DROP TABLE costs\\n \"}", + "openlineage_run_facet_extractionError": "{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.12.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/ExtractionErrorRunFacet\", \"errors\": [{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.12.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/BaseFacet\", \"errorMessage\": \"Can only generate column-level lineage for select-like inner statements, not (outer statement type: )\", \"task\": \"datahub_sql_parser\"}], \"failedTasks\": 1, \"totalTasks\": 1}" + }, + "externalUrl": "http://airflow.example.com/taskinstance/list/?flt1_dag_id_equals=sqlite_operator&_flt_3_task_id=cleanup_costs", + "name": "cleanup_costs", + "type": { + "string": "COMMAND" + } + } + } +}, { "entityType": "dataset", "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:sqlite,public.costs,PROD)", @@ -1378,6 +1410,59 @@ } } }, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,sqlite_operator,prod),cleanup_costs)", + "changeType": "UPSERT", + "aspectName": "dataJobInputOutput", + "aspect": { + "json": { + "inputDatasets": [ + "urn:li:dataset:(urn:li:dataPlatform:sqlite,public.costs,PROD)" + ], + "outputDatasets": [], + "inputDatajobs": [ + "urn:li:dataJob:(urn:li:dataFlow:(airflow,sqlite_operator,prod),transform_cost_table)" + ], + "fineGrainedLineages": [] + } + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,sqlite_operator,prod),cleanup_costs)", + "changeType": "UPSERT", + "aspectName": "ownership", + "aspect": { + "json": { + "owners": [ + { + "owner": "urn:li:corpuser:airflow", + "type": "DEVELOPER", + "source": { + "type": "SERVICE" + } + } + ], + "ownerTypes": {}, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:airflow" + } + } + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,sqlite_operator,prod),cleanup_costs)", + "changeType": "UPSERT", + "aspectName": "globalTags", + "aspect": { + "json": { + "tags": [] + } + } +}, { "entityType": "dataset", "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:sqlite,public.costs,PROD)", @@ -1430,6 +1515,52 @@ } } }, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:07285de22276959612189d51336cc21a", + "changeType": "UPSERT", + "aspectName": "dataProcessInstanceProperties", + "aspect": { + "json": { + "customProperties": { + "run_id": "manual_run_test", + "duration": "", + "start_date": "", + "end_date": "", + "execution_date": "2023-09-27 21:34:38+00:00", + "try_number": "0", + "max_tries": "0", + "external_executor_id": "None", + "state": "running", + "operator": "SqliteOperator", + "priority_weight": "1", + "log_url": "http://airflow.example.com/dags/sqlite_operator/grid?dag_run_id=manual_run_test&task_id=cleanup_costs&map_index=-1&tab=logs", + "orchestrator": "airflow", + "dag_id": "sqlite_operator", + "task_id": "cleanup_costs" + }, + "externalUrl": "http://airflow.example.com/dags/sqlite_operator/grid?dag_run_id=manual_run_test&task_id=cleanup_costs&map_index=-1&tab=logs", + "name": "sqlite_operator_cleanup_costs_manual_run_test", + "type": "BATCH_AD_HOC", + "created": { + "time": 1718733767964, + "actor": "urn:li:corpuser:datahub" + } + } + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:07285de22276959612189d51336cc21a", + "changeType": "UPSERT", + "aspectName": "dataProcessInstanceRelationships", + "aspect": { + "json": { + "parentTemplate": "urn:li:dataJob:(urn:li:dataFlow:(airflow,sqlite_operator,prod),cleanup_costs)", + "upstreamInstances": [] + } + } +}, { "entityType": "dataset", "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:sqlite,public.processed_costs,PROD)", @@ -1442,5 +1573,236 @@ "origin": "PROD" } } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:07285de22276959612189d51336cc21a", + "changeType": "UPSERT", + "aspectName": "dataProcessInstanceInput", + "aspect": { + "json": { + "inputs": [ + "urn:li:dataset:(urn:li:dataPlatform:sqlite,public.costs,PROD)" + ] + } + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:07285de22276959612189d51336cc21a", + "changeType": "UPSERT", + "aspectName": "dataProcessInstanceRunEvent", + "aspect": { + "json": { + "timestampMillis": 1718733767964, + "partitionSpec": { + "type": "FULL_TABLE", + "partition": "FULL_TABLE_SNAPSHOT" + }, + "status": "STARTED", + "attempt": 1 + } + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:07285de22276959612189d51336cc21a", + "changeType": "UPSERT", + "aspectName": "dataProcessInstanceRunEvent", + "aspect": { + "json": { + "timestampMillis": 1718733768638, + "partitionSpec": { + "type": "FULL_TABLE", + "partition": "FULL_TABLE_SNAPSHOT" + }, + "status": "COMPLETE", + "result": { + "type": "SUCCESS", + "nativeResultType": "airflow" + } + } + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,sqlite_operator,prod),cleanup_processed_costs)", + "changeType": "UPSERT", + "aspectName": "dataJobInputOutput", + "aspect": { + "json": { + "inputDatasets": [ + "urn:li:dataset:(urn:li:dataPlatform:sqlite,public.processed_costs,PROD)" + ], + "outputDatasets": [], + "inputDatajobs": [ + "urn:li:dataJob:(urn:li:dataFlow:(airflow,sqlite_operator,prod),transform_cost_table)" + ], + "fineGrainedLineages": [] + } + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,sqlite_operator,prod),cleanup_processed_costs)", + "changeType": "UPSERT", + "aspectName": "dataJobInfo", + "aspect": { + "json": { + "customProperties": { + "depends_on_past": "False", + "email": "None", + "label": "'cleanup_processed_costs'", + "execution_timeout": "None", + "sla": "None", + "sql": "'\\n DROP TABLE processed_costs\\n '", + "task_id": "'cleanup_processed_costs'", + "trigger_rule": "", + "wait_for_downstream": "False", + "downstream_task_ids": "[]", + "inlets": "[]", + "outlets": "[]", + "datahub_sql_parser_error": "UnsupportedStatementTypeError: Can only generate column-level lineage for select-like inner statements, not (outer statement type: )", + "openlineage_job_facet_sql": "{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.12.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/SqlJobFacet\", \"query\": \"\\n DROP TABLE processed_costs\\n \"}", + "openlineage_run_facet_extractionError": "{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.12.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/ExtractionErrorRunFacet\", \"errors\": [{\"_producer\": \"https://github.com/OpenLineage/OpenLineage/tree/1.12.0/integration/airflow\", \"_schemaURL\": \"https://raw.githubusercontent.com/OpenLineage/OpenLineage/main/spec/OpenLineage.json#/definitions/BaseFacet\", \"errorMessage\": \"Can only generate column-level lineage for select-like inner statements, not (outer statement type: )\", \"task\": \"datahub_sql_parser\"}], \"failedTasks\": 1, \"totalTasks\": 1}" + }, + "externalUrl": "http://airflow.example.com/taskinstance/list/?flt1_dag_id_equals=sqlite_operator&_flt_3_task_id=cleanup_processed_costs", + "name": "cleanup_processed_costs", + "type": { + "string": "COMMAND" + } + } + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,sqlite_operator,prod),cleanup_processed_costs)", + "changeType": "UPSERT", + "aspectName": "ownership", + "aspect": { + "json": { + "owners": [ + { + "owner": "urn:li:corpuser:airflow", + "type": "DEVELOPER", + "source": { + "type": "SERVICE" + } + } + ], + "ownerTypes": {}, + "lastModified": { + "time": 0, + "actor": "urn:li:corpuser:airflow" + } + } + } +}, +{ + "entityType": "dataJob", + "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,sqlite_operator,prod),cleanup_processed_costs)", + "changeType": "UPSERT", + "aspectName": "globalTags", + "aspect": { + "json": { + "tags": [] + } + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:bab908abccf3cd6607b50fdaf3003372", + "changeType": "UPSERT", + "aspectName": "dataProcessInstanceProperties", + "aspect": { + "json": { + "customProperties": { + "run_id": "manual_run_test", + "duration": "", + "start_date": "", + "end_date": "", + "execution_date": "2023-09-27 21:34:38+00:00", + "try_number": "0", + "max_tries": "0", + "external_executor_id": "None", + "state": "running", + "operator": "SqliteOperator", + "priority_weight": "1", + "log_url": "http://airflow.example.com/dags/sqlite_operator/grid?dag_run_id=manual_run_test&task_id=cleanup_processed_costs&map_index=-1&tab=logs", + "orchestrator": "airflow", + "dag_id": "sqlite_operator", + "task_id": "cleanup_processed_costs" + }, + "externalUrl": "http://airflow.example.com/dags/sqlite_operator/grid?dag_run_id=manual_run_test&task_id=cleanup_processed_costs&map_index=-1&tab=logs", + "name": "sqlite_operator_cleanup_processed_costs_manual_run_test", + "type": "BATCH_AD_HOC", + "created": { + "time": 1718733773354, + "actor": "urn:li:corpuser:datahub" + } + } + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:bab908abccf3cd6607b50fdaf3003372", + "changeType": "UPSERT", + "aspectName": "dataProcessInstanceInput", + "aspect": { + "json": { + "inputs": [ + "urn:li:dataset:(urn:li:dataPlatform:sqlite,public.processed_costs,PROD)" + ] + } + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:bab908abccf3cd6607b50fdaf3003372", + "changeType": "UPSERT", + "aspectName": "dataProcessInstanceRelationships", + "aspect": { + "json": { + "parentTemplate": "urn:li:dataJob:(urn:li:dataFlow:(airflow,sqlite_operator,prod),cleanup_processed_costs)", + "upstreamInstances": [] + } + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:bab908abccf3cd6607b50fdaf3003372", + "changeType": "UPSERT", + "aspectName": "dataProcessInstanceRunEvent", + "aspect": { + "json": { + "timestampMillis": 1718733773354, + "partitionSpec": { + "type": "FULL_TABLE", + "partition": "FULL_TABLE_SNAPSHOT" + }, + "status": "STARTED", + "attempt": 1 + } + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:bab908abccf3cd6607b50fdaf3003372", + "changeType": "UPSERT", + "aspectName": "dataProcessInstanceRunEvent", + "aspect": { + "json": { + "timestampMillis": 1718733774147, + "partitionSpec": { + "type": "FULL_TABLE", + "partition": "FULL_TABLE_SNAPSHOT" + }, + "status": "COMPLETE", + "result": { + "type": "SUCCESS", + "nativeResultType": "airflow" + } + } + } } ] \ No newline at end of file diff --git a/metadata-ingestion-modules/airflow-plugin/tests/integration/test_plugin.py b/metadata-ingestion-modules/airflow-plugin/tests/integration/test_plugin.py index 70581fc49ba900..9ea822edeef81f 100644 --- a/metadata-ingestion-modules/airflow-plugin/tests/integration/test_plugin.py +++ b/metadata-ingestion-modules/airflow-plugin/tests/integration/test_plugin.py @@ -375,6 +375,8 @@ def test_airflow_plugin( # TODO: If we switched to Git urls, maybe we could get this to work consistently. r"root\[\d+\]\['aspect'\]\['json'\]\['customProperties'\]\['datahub_sql_parser_error'\]", r"root\[\d+\]\['aspect'\]\['json'\]\['customProperties'\]\['openlineage_.*'\]", + r"root\[\d+\]\['aspect'\]\['json'\]\['customProperties'\]\['log_url'\]", + r"root\[\d+\]\['aspect'\]\['json'\]\['externalUrl'\]", ], ) From 01e28dedb19e64887cf39881b426c2c6a0df8313 Mon Sep 17 00:00:00 2001 From: Dushyant Bhalgami Date: Wed, 19 Jun 2024 09:20:03 +0200 Subject: [PATCH 5/5] fix(ingestion/airflow-plugin): updated golden files --- .../v2_sqlite_operator_no_dag_listener.json | 158 +++++++++++------- 1 file changed, 94 insertions(+), 64 deletions(-) diff --git a/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_sqlite_operator_no_dag_listener.json b/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_sqlite_operator_no_dag_listener.json index f3c6df9b24da08..d09d4f76e3f02d 100644 --- a/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_sqlite_operator_no_dag_listener.json +++ b/metadata-ingestion-modules/airflow-plugin/tests/integration/goldens/v2_sqlite_operator_no_dag_listener.json @@ -291,6 +291,24 @@ } } }, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:sqlite,public.costs,PROD)", + "changeType": "UPSERT", + "aspectName": "operation", + "aspect": { + "json": { + "timestampMillis": 1718780495946, + "partitionSpec": { + "type": "FULL_TABLE", + "partition": "FULL_TABLE_SNAPSHOT" + }, + "actor": "urn:li:corpuser:airflow", + "operationType": "CREATE", + "lastUpdatedTimestamp": 1718780495946 + } + } +}, { "entityType": "dataJob", "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,sqlite_operator,prod),create_cost_table)", @@ -320,20 +338,6 @@ } } }, -{ - "entityType": "dataset", - "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:sqlite,public.costs,PROD)", - "changeType": "UPSERT", - "aspectName": "dataJobInputOutput", - "aspect": { - "json": { - "inputDatasets": [], - "outputDatasets": [], - "inputDatajobs": [], - "fineGrainedLineages": [] - } - } -}, { "entityType": "dataJob", "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,sqlite_operator,prod),create_cost_table)", @@ -621,22 +625,6 @@ } } }, -{ - "entityType": "dataJob", - "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,sqlite_operator,prod),populate_cost_table)", - "changeType": "UPSERT", - "aspectName": "dataJobInputOutput", - "aspect": { - "json": { - "inputDatasets": [], - "outputDatasets": [], - "inputDatajobs": [ - "urn:li:dataJob:(urn:li:dataFlow:(airflow,sqlite_operator,prod),create_cost_table)" - ], - "fineGrainedLineages": [] - } - } -}, { "entityType": "dataJob", "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,sqlite_operator,prod),populate_cost_table)", @@ -692,6 +680,19 @@ } } }, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:04e1badac1eacd1c41123d07f579fa92", + "changeType": "UPSERT", + "aspectName": "dataProcessInstanceOutput", + "aspect": { + "json": { + "outputs": [ + "urn:li:dataset:(urn:li:dataPlatform:sqlite,public.costs,PROD)" + ] + } + } +}, { "entityType": "dataFlow", "entityUrn": "urn:li:dataFlow:(airflow,sqlite_operator,prod)", @@ -1145,7 +1146,9 @@ "aspectName": "dataJobInputOutput", "aspect": { "json": { - "inputDatasets": [], + "inputDatasets": [ + "urn:li:dataset:(urn:li:dataPlatform:sqlite,public.costs,PROD)" + ], "outputDatasets": [], "inputDatajobs": [ "urn:li:dataJob:(urn:li:dataFlow:(airflow,sqlite_operator,prod),transform_cost_table)" @@ -1189,6 +1192,32 @@ } } }, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:64e5ff8f552e857b607832731e09808b", + "changeType": "UPSERT", + "aspectName": "dataProcessInstanceInput", + "aspect": { + "json": { + "inputs": [ + "urn:li:dataset:(urn:li:dataPlatform:sqlite,public.costs,PROD)" + ] + } + } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:64e5ff8f552e857b607832731e09808b", + "changeType": "UPSERT", + "aspectName": "dataProcessInstanceOutput", + "aspect": { + "json": { + "outputs": [ + "urn:li:dataset:(urn:li:dataPlatform:sqlite,public.processed_costs,PROD)" + ] + } + } +}, { "entityType": "dataProcessInstance", "entityUrn": "urn:li:dataProcessInstance:07285de22276959612189d51336cc21a", @@ -1252,6 +1281,24 @@ } } }, +{ + "entityType": "dataset", + "entityUrn": "urn:li:dataset:(urn:li:dataPlatform:sqlite,public.processed_costs,PROD)", + "changeType": "UPSERT", + "aspectName": "operation", + "aspect": { + "json": { + "timestampMillis": 1718780501750, + "partitionSpec": { + "type": "FULL_TABLE", + "partition": "FULL_TABLE_SNAPSHOT" + }, + "actor": "urn:li:corpuser:airflow", + "operationType": "CREATE", + "lastUpdatedTimestamp": 1718780501750 + } + } +}, { "entityType": "dataJob", "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,sqlite_operator,prod),cleanup_costs)", @@ -1281,22 +1328,6 @@ } } }, -{ - "entityType": "dataJob", - "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,sqlite_operator,prod),cleanup_costs)", - "changeType": "UPSERT", - "aspectName": "dataJobInputOutput", - "aspect": { - "json": { - "inputDatasets": [], - "outputDatasets": [], - "inputDatajobs": [ - "urn:li:dataJob:(urn:li:dataFlow:(airflow,sqlite_operator,prod),transform_cost_table)" - ], - "fineGrainedLineages": [] - } - } -}, { "entityType": "dataJob", "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,sqlite_operator,prod),cleanup_costs)", @@ -1446,7 +1477,9 @@ "aspectName": "dataJobInputOutput", "aspect": { "json": { - "inputDatasets": [], + "inputDatasets": [ + "urn:li:dataset:(urn:li:dataPlatform:sqlite,public.processed_costs,PROD)" + ], "outputDatasets": [], "inputDatajobs": [ "urn:li:dataJob:(urn:li:dataFlow:(airflow,sqlite_operator,prod),transform_cost_table)" @@ -1582,22 +1615,6 @@ } } }, -{ - "entityType": "dataJob", - "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,sqlite_operator,prod),cleanup_processed_costs)", - "changeType": "UPSERT", - "aspectName": "dataJobInputOutput", - "aspect": { - "json": { - "inputDatasets": [], - "outputDatasets": [], - "inputDatajobs": [ - "urn:li:dataJob:(urn:li:dataFlow:(airflow,sqlite_operator,prod),transform_cost_table)" - ], - "fineGrainedLineages": [] - } - } -}, { "entityType": "dataJob", "entityUrn": "urn:li:dataJob:(urn:li:dataFlow:(airflow,sqlite_operator,prod),cleanup_processed_costs)", @@ -1665,5 +1682,18 @@ ] } } +}, +{ + "entityType": "dataProcessInstance", + "entityUrn": "urn:li:dataProcessInstance:07285de22276959612189d51336cc21a", + "changeType": "UPSERT", + "aspectName": "dataProcessInstanceInput", + "aspect": { + "json": { + "inputs": [ + "urn:li:dataset:(urn:li:dataPlatform:sqlite,public.costs,PROD)" + ] + } + } } ] \ No newline at end of file