Skip to content

Commit

Permalink
Introduce AirflowJobFacet and AirflowStateRunFacet (#39520)
Browse files Browse the repository at this point in the history
Signed-off-by: Kacper Muda <[email protected]>
  • Loading branch information
kacpermuda authored Jun 5, 2024
1 parent c6f85f0 commit c202c07
Show file tree
Hide file tree
Showing 11 changed files with 1,115 additions and 68 deletions.
40 changes: 40 additions & 0 deletions airflow/providers/openlineage/facets/AirflowJobFacet.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
{
"$schema": "https://json-schema.org/draft/2020-12/schema",
"$defs": {
"AirflowJobFacet": {
"allOf": [
{
"$ref": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunFacet"
},
{
"type": "object",
"properties": {
"taskTree": {
"description": "The hierarchical structure of tasks in the DAG.",
"type": "object",
"additionalProperties": true
},
"taskGroups": {
"description": "Information about all task groups within the DAG.",
"type": "object",
"additionalProperties": true
},
"tasks": {
"description": "Details of all individual tasks within the DAG.",
"type": "object",
"additionalProperties": true
}
},
"required": ["taskTree", "taskGroups", "tasks"]
}
],
"type": "object"
}
},
"type": "object",
"properties": {
"airflow": {
"$ref": "#/$defs/AirflowJobFacet"
}
}
}
254 changes: 254 additions & 0 deletions airflow/providers/openlineage/facets/AirflowRunFacet.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,254 @@
{
"$schema": "https://json-schema.org/draft/2020-12/schema",
"$defs": {
"AirflowRunFacet": {
"allOf": [
{
"$ref": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunFacet"
},
{
"type": "object",
"properties": {
"dag": {
"$ref": "#/$defs/DAG"
},
"dagRun": {
"$ref": "#/$defs/DagRun"
},
"taskInstance": {
"$ref": "#/$defs/TaskInstance"
},
"task": {
"$ref": "#/$defs/Task"
},
"taskUuid": {
"type": "string"
}
},
"required": [
"dag",
"dagRun",
"taskInstance",
"task",
"taskUuid"
]
}
]
},
"Task": {
"type": "object",
"properties": {
"depends_on_past": {
"type": "boolean"
},
"downstream_task_ids": {
"type": "string"
},
"execution_timeout": {
"type": "string"
},
"executor_config": {
"type": "object",
"additionalProperties": true
},
"ignore_first_depends_on_past": {
"type": "boolean"
},
"is_setup": {
"type": "boolean"
},
"is_teardown": {
"type": "boolean"
},
"mapped": {
"type": "boolean"
},
"max_active_tis_per_dag": {
"type": "integer"
},
"max_active_tis_per_dagrun": {
"type": "integer"
},
"max_retry_delay": {
"type": "string"
},
"multiple_outputs": {
"type": "boolean"
},
"operator_class": {
"description": "Module + class name of the operator",
"type": "string"
},
"owner": {
"type": "string"
},
"priority_weight": {
"type": "integer"
},
"queue": {
"type": "string"
},
"retries": {
"type": "integer"
},
"retry_exponential_backoff": {
"type": "boolean"
},
"run_as_user": {
"type": "string"
},
"task_id": {
"type": "string"
},
"trigger_rule": {
"type": "string"
},
"upstream_task_ids": {
"type": "string"
},
"wait_for_downstream": {
"type": "boolean"
},
"wait_for_past_depends_before_skipping": {
"type": "boolean"
},
"weight_rule": {
"type": "string"
},
"task_group": {
"description": "Task group related information",
"type": "object",
"properties": {
"group_id": {
"type": "string"
},
"downstream_group_ids": {
"type": "string"
},
"downstream_task_ids": {
"type": "string"
},
"prefix_group_id": {
"type": "boolean"
},
"tooltip": {
"type": "string"
},
"upstream_group_ids": {
"type": "string"
},
"upstream_task_ids": {
"type": "string"
}
},
"additionalProperties": true,
"required": ["group_id"]
}
},
"additionalProperties": true,
"required": [
"task_id"
]
},
"DAG": {
"type": "object",
"properties": {
"dag_id": {
"type": "string"
},
"description": {
"type": "string"
},
"owner": {
"type": "string"
},
"schedule_interval": {
"type": "string"
},
"start_date": {
"type": "string",
"format": "date-time"
},
"tags": {
"type": "string"
},
"timetable": {
"description": "Describes timetable (successor of schedule_interval)",
"type": "object",
"additionalProperties": true
}
},
"additionalProperties": true,
"required": [
"dag_id",
"start_date"
]
},
"TaskInstance": {
"type": "object",
"properties": {
"duration": {
"type": "number"
},
"map_index": {
"type": "integer"
},
"pool": {
"type": "string"
},
"try_number": {
"type": "integer"
}
},
"additionalProperties": true,
"required": [
"pool",
"try_number"
]
},
"DagRun": {
"type": "object",
"properties": {
"conf": {
"type": "object",
"additionalProperties": true
},
"dag_id": {
"type": "string"
},
"data_interval_start": {
"type": "string",
"format": "date-time"
},
"data_interval_end": {
"type": "string",
"format": "date-time"
},
"external_trigger": {
"type": "boolean"
},
"run_id": {
"type": "string"
},
"run_type": {
"type": "string"
},
"start_date": {
"type": "string",
"format": "date-time"
}
},
"additionalProperties": true,
"required": [
"dag_id",
"run_id"
]
}
},
"type": "object",
"properties": {
"airflow": {
"$ref": "#/$defs/AirflowRunFacet"
}
}
}
34 changes: 34 additions & 0 deletions airflow/providers/openlineage/facets/AirflowStateRunFacet.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
{
"$schema": "https://json-schema.org/draft/2020-12/schema",
"$defs": {
"AirflowStateRunFacet": {
"allOf": [
{
"$ref": "https://openlineage.io/spec/2-0-2/OpenLineage.json#/$defs/RunFacet"
},
{
"type": "object",
"properties": {
"dagRunState": {
"description": "The final status of the entire DagRun",
"type": "string"
},
"tasksState": {
"description": "Mapping of task IDs to their respective states",
"type": "object",
"additionalProperties": true
}
},
"required": ["dagRunState", "tasksState"]
}
],
"type": "object"
}
},
"type": "object",
"properties": {
"airflowState": {
"$ref": "#/$defs/AirflowStateRunFacet"
}
}
}
16 changes: 16 additions & 0 deletions airflow/providers/openlineage/facets/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
Loading

0 comments on commit c202c07

Please sign in to comment.