From ee962ef99a7faf3a4a0fe6de5d84bcbdd8b020e9 Mon Sep 17 00:00:00 2001 From: abhishekbhakat Date: Wed, 27 Nov 2024 08:10:23 +0000 Subject: [PATCH] deprecated subdagoperator --- .../resources/test/fixtures/airflow/AIR304.py | 30 ++++++++ .../src/checkers/ast/analyze/expression.rs | 3 + crates/ruff_linter/src/codes.rs | 1 + crates/ruff_linter/src/rules/airflow/mod.rs | 1 + .../src/rules/airflow/rules/mod.rs | 2 + .../rules/airflow/rules/subdag_operator.rs | 70 +++++++++++++++++++ ...les__airflow__tests__AIR304_AIR304.py.snap | 22 ++++++ ruff.schema.json | 1 + 8 files changed, 130 insertions(+) create mode 100644 crates/ruff_linter/resources/test/fixtures/airflow/AIR304.py create mode 100644 crates/ruff_linter/src/rules/airflow/rules/subdag_operator.rs create mode 100644 crates/ruff_linter/src/rules/airflow/snapshots/ruff_linter__rules__airflow__tests__AIR304_AIR304.py.snap diff --git a/crates/ruff_linter/resources/test/fixtures/airflow/AIR304.py b/crates/ruff_linter/resources/test/fixtures/airflow/AIR304.py new file mode 100644 index 0000000000000..2fa9c14d2cdb6 --- /dev/null +++ b/crates/ruff_linter/resources/test/fixtures/airflow/AIR304.py @@ -0,0 +1,30 @@ +from airflow.operators.subdag import SubDagOperator +from airflow.operators.python import PythonOperator +from airflow.utils.task_group import TaskGroup # This is the recommended alternative + +# This should trigger AIR304 +task1 = SubDagOperator( + task_id="subdag_task", + subdag=some_subdag, +) + +# This should be fine +task2 = PythonOperator( + task_id="python_task", + python_callable=lambda: None, +) + +# This is the recommended way using TaskGroup +with TaskGroup(group_id="my_task_group") as task_group: + task4 = PythonOperator( + task_id="task_in_group", + python_callable=lambda: None, + ) + +# Direct import should also trigger AIR304 +from airflow.operators.subdag_operator import SubDagOperator + +task3 = SubDagOperator( + task_id="another_subdag", + subdag=another_subdag, +) diff --git a/crates/ruff_linter/src/checkers/ast/analyze/expression.rs b/crates/ruff_linter/src/checkers/ast/analyze/expression.rs index 0c2f4e235c087..fe1cfe1f0dc1a 100644 --- a/crates/ruff_linter/src/checkers/ast/analyze/expression.rs +++ b/crates/ruff_linter/src/checkers/ast/analyze/expression.rs @@ -1076,6 +1076,9 @@ pub(crate) fn expression(expr: &Expr, checker: &mut Checker) { if checker.enabled(Rule::AirflowDagNoScheduleArgument) { airflow::rules::dag_no_schedule_argument(checker, expr); } + if checker.enabled(Rule::AirflowDeprecatedSubDag) { + airflow::rules::subdag_check(checker, expr); + } } Expr::Dict(dict) => { if checker.any_enabled(&[ diff --git a/crates/ruff_linter/src/codes.rs b/crates/ruff_linter/src/codes.rs index 1c0fb68f73279..7306833fa2f9b 100644 --- a/crates/ruff_linter/src/codes.rs +++ b/crates/ruff_linter/src/codes.rs @@ -1035,6 +1035,7 @@ pub fn code_to_rule(linter: Linter, code: &str) -> Option<(RuleGroup, Rule)> { // airflow (Airflow, "001") => (RuleGroup::Stable, rules::airflow::rules::AirflowVariableNameTaskIdMismatch), (Airflow, "301") => (RuleGroup::Preview, rules::airflow::rules::AirflowDagNoScheduleArgument), + (Airflow, "304") => (RuleGroup::Preview, rules::airflow::rules::AirflowDeprecatedSubDag), // perflint (Perflint, "101") => (RuleGroup::Stable, rules::perflint::rules::UnnecessaryListCast), diff --git a/crates/ruff_linter/src/rules/airflow/mod.rs b/crates/ruff_linter/src/rules/airflow/mod.rs index 4e4130766022f..a321c31daeffe 100644 --- a/crates/ruff_linter/src/rules/airflow/mod.rs +++ b/crates/ruff_linter/src/rules/airflow/mod.rs @@ -14,6 +14,7 @@ mod tests { #[test_case(Rule::AirflowVariableNameTaskIdMismatch, Path::new("AIR001.py"))] #[test_case(Rule::AirflowDagNoScheduleArgument, Path::new("AIR301.py"))] + #[test_case(Rule::AirflowDeprecatedSubDag, Path::new("AIR304.py"))] fn rules(rule_code: Rule, path: &Path) -> Result<()> { let snapshot = format!("{}_{}", rule_code.noqa_code(), path.to_string_lossy()); let diagnostics = test_path( diff --git a/crates/ruff_linter/src/rules/airflow/rules/mod.rs b/crates/ruff_linter/src/rules/airflow/rules/mod.rs index b01391092ca77..12b5d180305d0 100644 --- a/crates/ruff_linter/src/rules/airflow/rules/mod.rs +++ b/crates/ruff_linter/src/rules/airflow/rules/mod.rs @@ -1,5 +1,7 @@ pub(crate) use dag_schedule_argument::*; +pub(crate) use subdag_operator::*; pub(crate) use task_variable_name::*; mod dag_schedule_argument; +mod subdag_operator; mod task_variable_name; diff --git a/crates/ruff_linter/src/rules/airflow/rules/subdag_operator.rs b/crates/ruff_linter/src/rules/airflow/rules/subdag_operator.rs new file mode 100644 index 0000000000000..9be4bf87da933 --- /dev/null +++ b/crates/ruff_linter/src/rules/airflow/rules/subdag_operator.rs @@ -0,0 +1,70 @@ +use ruff_diagnostics::{Diagnostic, Violation}; +use ruff_macros::{derive_message_formats, violation}; +use ruff_python_ast as ast; +use ruff_python_ast::Expr; +use ruff_python_semantic::Modules; +use ruff_text_size::Ranged; + +use crate::checkers::ast::Checker; + +/// ## What it does +/// Checks for usage of deprecated SubDag functionality in Airflow. +/// +/// ## Why is this bad? +/// SubDags have been removed in Airflow 3.0 in favor of TaskGroups. +/// Code using SubDags needs to be updated to use TaskGroups instead. +/// +/// ## Example +/// ```python +/// from airflow.operators.subdag import SubDagOperator +/// +/// # Invalid: using deprecated SubDag +/// task = SubDagOperator( +/// task_id="subdag_task", +/// subdag=some_subdag, +/// ) +/// ``` +/// +/// Use instead: +/// ```python +/// from airflow.utils.task_group import TaskGroup +/// +/// # Valid: using TaskGroup +/// with TaskGroup(group_id="my_task_group") as task_group: +/// task = SomeOperator(...) +/// ``` +#[violation] +pub struct AirflowDeprecatedSubDag; + +impl Violation for AirflowDeprecatedSubDag { + #[derive_message_formats] + fn message(&self) -> String { + "SubDags have been removed in Airflow 3.0, use TaskGroups instead".to_string() + } +} + +/// AIR304 +pub(crate) fn subdag_check(checker: &mut Checker, value: &Expr) { + // If the value is not a call, we can't do anything. + let ast::Expr::Call(ast::ExprCall { func, .. }) = value else { + return; + }; + + // If we haven't seen any airflow imports, we can't do anything. + if !checker.semantic().seen_module(Modules::AIRFLOW) { + return; + } + + // Check if it's a SubDagOperator import or usage + if let Some(qualified_name) = checker.semantic().resolve_qualified_name(func) { + let segments = qualified_name.segments(); + if segments.contains(&"operators") && segments.contains(&"subdag") + // Also check for direct subdag imports + || segments.contains(&"SubDagOperator") + { + checker + .diagnostics + .push(Diagnostic::new(AirflowDeprecatedSubDag, func.range())); + } + } +} diff --git a/crates/ruff_linter/src/rules/airflow/snapshots/ruff_linter__rules__airflow__tests__AIR304_AIR304.py.snap b/crates/ruff_linter/src/rules/airflow/snapshots/ruff_linter__rules__airflow__tests__AIR304_AIR304.py.snap new file mode 100644 index 0000000000000..7522821090f78 --- /dev/null +++ b/crates/ruff_linter/src/rules/airflow/snapshots/ruff_linter__rules__airflow__tests__AIR304_AIR304.py.snap @@ -0,0 +1,22 @@ +--- +source: crates/ruff_linter/src/rules/airflow/mod.rs +snapshot_kind: text +--- +AIR304.py:6:9: AIR304 SubDags have been removed in Airflow 3.0, use TaskGroups instead + | +5 | # This should trigger AIR304 +6 | task1 = SubDagOperator( + | ^^^^^^^^^^^^^^ AIR304 +7 | task_id="subdag_task", +8 | subdag=some_subdag, + | + +AIR304.py:27:9: AIR304 SubDags have been removed in Airflow 3.0, use TaskGroups instead + | +25 | from airflow.operators.subdag_operator import SubDagOperator +26 | +27 | task3 = SubDagOperator( + | ^^^^^^^^^^^^^^ AIR304 +28 | task_id="another_subdag", +29 | subdag=another_subdag, + | diff --git a/ruff.schema.json b/ruff.schema.json index ff9810758261b..d095b7ade1c7b 100644 --- a/ruff.schema.json +++ b/ruff.schema.json @@ -2797,6 +2797,7 @@ "AIR3", "AIR30", "AIR301", + "AIR304", "ALL", "ANN", "ANN0",