diff --git a/airflow/config_templates/config.yml b/airflow/config_templates/config.yml index 1a2d664955ddd..5890e43de6c87 100644 --- a/airflow/config_templates/config.yml +++ b/airflow/config_templates/config.yml @@ -2676,30 +2676,38 @@ dag_bundles: description: | Configuration for the DAG bundles. This allows Airflow to load DAGs from different sources. - Airflow will consume all options added to this section. Below you will see only the default, - ``dags_folder``. The option name is the bundle name and the value is a json object with the following - keys: - - * classpath: The classpath of the bundle class - * kwargs: The keyword arguments to pass to the bundle class - * refresh_interval: The interval in seconds to refresh the bundle from its source. + options: + backends: + description: | + List of backend configs. Must supply name, classpath, and kwargs for each backend. - For example, to add a new bundle named ``hello`` to my Airflow instance, add the following to your - airflow.cfg (this is just an example, the classpath and kwargs are not real): + By default, ``refresh_interval`` is set to ``[scheduler] dag_dir_list_interval``, but that can + also be overridden in kwargs if desired. - .. code-block:: ini + The default is the dags folder dag bundle. - [dag_bundles] - hello: {classpath: "airflow.some.classpath", kwargs: {"hello": "world"}, refresh_interval: 60} - options: - dags_folder: - description: | - This is the default DAG bundle that loads DAGs from the traditional ``[core] dags_folder``. - By default, ``refresh_interval`` is set to ``[scheduler] dag_dir_list_interval``, but that can be - overridden here if desired. - Parsing DAGs from the DAG folder can be disabled by setting this option to an empty string. - version_added: ~ + Note: As shown below, you can split your json config over multiple lines by indenting. + See configparser documentation for an example: + https://docs.python.org/3/library/configparser.html#supported-ini-file-structure. 
+ version_added: 3.0.0 type: string - example: ~ - default: '{{"classpath": "airflow.dag_processing.bundles.dagfolder.DagsFolderDagBundle", - "kwargs": {{}}}}' + example: > + [ + { + "name": "my-git-repo", + "classpath": "airflow.dag_processing.bundles.git.GitDagBundle", + "kwargs": { + "subdir": "dags", + "repo_url": "git@github.com:example.com/my-dags.git", + "tracking_ref": "main", + "refresh_interval": 0 + } + } + ] + default: > + [ + {{ + "name": "dags-folder", + "classpath": "airflow.dag_processing.bundles.dagfolder.DagsFolderDagBundle", + "kwargs": {{}} + }} + ] diff --git a/airflow/configuration.py b/airflow/configuration.py index f5b2f8f7d5328..1ea8948483175 100644 --- a/airflow/configuration.py +++ b/airflow/configuration.py @@ -515,6 +515,8 @@ def _write_option_header( if example is not None and include_examples: if extra_spacing: file.write("#\n") + example_lines = example.splitlines() + example = "\n# ".join(example_lines) file.write(f"# Example: {option} = {example}\n") needs_separation = True if include_sources and sources_dict: @@ -553,6 +555,8 @@ def _write_value( file.write(f"# {option} = \n") else: if comment_out_everything: + value_lines = value.splitlines() + value = "\n# ".join(value_lines) file.write(f"# {option} = {value}\n") else: file.write(f"{option} = {value}\n") diff --git a/airflow/dag_processing/bundles/manager.py b/airflow/dag_processing/bundles/manager.py index 4f8b59b956e18..2eaba73148571 100644 --- a/airflow/dag_processing/bundles/manager.py +++ b/airflow/dag_processing/bundles/manager.py @@ -26,6 +26,8 @@ from airflow.utils.session import NEW_SESSION, provide_session if TYPE_CHECKING: + from collections.abc import Iterable + from sqlalchemy.orm import Session from airflow.dag_processing.bundles.base import BaseDagBundle @@ -34,52 +36,57 @@ class DagBundlesManager(LoggingMixin): """Manager for DAG bundles.""" - @property - def bundle_configs(self) -> dict[str, dict]: - """Get all DAG bundle configurations.""" - configured_bundles = 
conf.getsection("dag_bundles") + def __init__(self, *args, **kwargs): + super().__init__(*args, **kwargs) + self._bundle_config = {} + self.parse_config() - if not configured_bundles: - return {} + def parse_config(self) -> None: + """ + Get all DAG bundle configurations and store in instance variable. - # If dags_folder is empty string, we remove it. This allows the default dags_folder bundle to be disabled. - if not configured_bundles["dags_folder"]: - del configured_bundles["dags_folder"] + If a bundle class for a given name has already been imported, it will not be imported again. - dict_bundles: dict[str, dict] = {} - for key in configured_bundles.keys(): - config = conf.getjson("dag_bundles", key) - if not isinstance(config, dict): - raise AirflowConfigException(f"Bundle config for {key} is not a dict: {config}") - dict_bundles[key] = config + todo (AIP-66): proper validation of the bundle configuration so we have better error messages - return dict_bundles + :meta private: + """ + if self._bundle_config: + return + + backends = conf.getjson("dag_bundles", "backends") + + if not backends: + return + + if not isinstance(backends, list): + raise AirflowConfigException( + "Bundle config is not a list. Check config value" + " for section `dag_bundles` and key `backends`." 
+ ) + seen = set() + for cfg in backends: + name = cfg["name"] + if name in seen: + raise ValueError(f"Dag bundle {name} is configured twice.") + seen.add(name) + class_ = import_string(cfg["classpath"]) + kwargs = cfg["kwargs"] + self._bundle_config[name] = (class_, kwargs) @provide_session def sync_bundles_to_db(self, *, session: Session = NEW_SESSION) -> None: - known_bundles = {b.name: b for b in session.query(DagBundleModel).all()} - - for name in self.bundle_configs.keys(): - if bundle := known_bundles.get(name): + stored = {b.name: b for b in session.query(DagBundleModel).all()} + for name in self._bundle_config.keys(): + if bundle := stored.pop(name, None): bundle.active = True else: session.add(DagBundleModel(name=name)) self.log.info("Added new DAG bundle %s to the database", name) - for name, bundle in known_bundles.items(): - if name not in self.bundle_configs: - bundle.active = False - self.log.warning("DAG bundle %s is no longer found in config and has been disabled", name) - - def get_all_dag_bundles(self) -> list[BaseDagBundle]: - """ - Get all DAG bundles. - - :param session: A database session. - - :return: list of DAG bundles. - """ - return [self.get_bundle(name, version=None) for name in self.bundle_configs.keys()] + for name, bundle in stored.items(): + bundle.active = False + self.log.warning("DAG bundle %s is no longer found in config and has been disabled", name) def get_bundle(self, name: str, version: str | None = None) -> BaseDagBundle: """ @@ -90,7 +97,17 @@ def get_bundle(self, name: str, version: str | None = None) -> BaseDagBundle: :return: The DAG bundle. 
""" - # TODO: proper validation of the bundle configuration so we have better error messages - bundle_config = self.bundle_configs[name] - bundle_class = import_string(bundle_config["classpath"]) - return bundle_class(name=name, version=version, **bundle_config["kwargs"]) + cfg_tuple = self._bundle_config.get(name) + if not cfg_tuple: + raise ValueError(f"Requested bundle '{name}' is not configured.") + class_, kwargs = cfg_tuple + return class_(name=name, version=version, **kwargs) + + def get_all_dag_bundles(self) -> Iterable[BaseDagBundle]: + """ + Get all DAG bundles. + + :return: list of DAG bundles. + """ + for name, (class_, kwargs) in self._bundle_config.items(): + yield class_(name=name, version=None, **kwargs) diff --git a/docs/exts/includes/sections-and-options.rst b/docs/exts/includes/sections-and-options.rst index f191cf10d5579..f9bbf5c83d5fe 100644 --- a/docs/exts/includes/sections-and-options.rst +++ b/docs/exts/includes/sections-and-options.rst @@ -60,7 +60,15 @@ {% endif %} :Type: {{ option["type"] }} - :Default: ``{{ "''" if option["default"] == "" else option["default"] }}`` + :Default: + {% set default = option["default"] %} + {% if default and "\n" in default %} + .. code-block:: + + {{ default }} + {% else %} + ``{{ "''" if default == "" else default }}`` + {% endif %} {% if option.get("sensitive") %} :Environment Variables: ``AIRFLOW__{{ section_name | replace(".", "_") | upper }}__{{ option_name | upper }}`` @@ -71,9 +79,16 @@ {% else %} :Environment Variable: ``AIRFLOW__{{ section_name | replace(".", "_") | upper }}__{{ option_name | upper }}`` {% endif %} - {% if option["example"] %} + {% set example = option["example"] %} + {% if example %} :Example: - ``{{ option["example"] }}`` + {% if "\n" in example %} + .. 
code-block:: + + {{ example }} + {% else %} + ``{{ example }}`` + {% endif %} {% endif %} {% endfor %} diff --git a/tests/cli/commands/remote_commands/test_config_command.py b/tests/cli/commands/remote_commands/test_config_command.py index f932b1851d227..b697f6d91cfc0 100644 --- a/tests/cli/commands/remote_commands/test_config_command.py +++ b/tests/cli/commands/remote_commands/test_config_command.py @@ -206,7 +206,8 @@ def test_cli_comment_out_everything(self): ) output = temp_stdout.getvalue() lines = output.splitlines() - assert all(not line.strip() or line.startswith(("#", "[")) for line in lines if line) + bad_lines = [l for l in lines if l and not (not l.strip() or l.startswith(("#", "[")))] # noqa: E741 + assert bad_lines == [] class TestCliConfigGetValue: diff --git a/tests/dag_processing/bundles/test_dag_bundle_manager.py b/tests/dag_processing/bundles/test_dag_bundle_manager.py index c79acfc2ed2a5..d9bc286181f28 100644 --- a/tests/dag_processing/bundles/test_dag_bundle_manager.py +++ b/tests/dag_processing/bundles/test_dag_bundle_manager.py @@ -19,6 +19,7 @@ import json import os +from contextlib import nullcontext from unittest.mock import patch import pytest @@ -33,41 +34,52 @@ @pytest.mark.parametrize( - "envs,expected_names", + "value, expected", [ - pytest.param({}, {"dags_folder"}, id="no_config"), + pytest.param(None, {"dags-folder"}, id="default"), + pytest.param("{}", set(), id="empty dict"), pytest.param( - {"AIRFLOW__DAG_BUNDLES__TESTBUNDLE": "{}"}, {"testbundle", "dags_folder"}, id="add_bundle" + "[]", + set(), + id="empty list", ), - pytest.param({"AIRFLOW__DAG_BUNDLES__DAGS_FOLDER": ""}, set(), id="remove_dags_folder_default"), pytest.param( - {"AIRFLOW__DAG_BUNDLES__DAGS_FOLDER": "", "AIRFLOW__DAG_BUNDLES__TESTBUNDLE": "{}"}, - {"testbundle"}, + json.dumps( + [ + { + "name": "my-bundle", + "classpath": "airflow.dag_processing.bundles.local.LocalDagBundle", + "kwargs": {"local_folder": "/tmp/hihi", "refresh_interval": 1}, + } + ] + ), + 
{"my-bundle"}, id="remove_dags_folder_default_add_bundle", ), + pytest.param( + "[]", + set(), + id="remove_dags_folder_default", + ), + pytest.param("1", "Bundle config is not a list", id="int"), + pytest.param("abc", "Unable to parse .* as valid json", id="not_json"), ], ) -def test_bundle_configs_property(envs, expected_names): +def test_parse_bundle_config(value, expected): """Test that bundle_configs are read from configuration.""" - bundle_manager = DagBundlesManager() - with patch.dict(os.environ, envs): - names = set(bundle_manager.bundle_configs.keys()) - assert names == expected_names + envs = {"AIRFLOW__DAG_BUNDLES__BACKENDS": value} if value else {} + cm = nullcontext() + exp_fail = False + if isinstance(expected, str): + exp_fail = True + cm = pytest.raises(AirflowConfigException, match=expected) + with patch.dict(os.environ, envs), cm: + bundle_manager = DagBundlesManager() + names = set(x.name for x in bundle_manager.get_all_dag_bundles()) -@pytest.mark.parametrize( - "config,message", - [ - pytest.param("1", "Bundle config for testbundle is not a dict: 1", id="int"), - pytest.param("[]", r"Bundle config for testbundle is not a dict: \[\]", id="list"), - pytest.param("abc", r"Unable to parse .* as valid json", id="not_json"), - ], -) -def test_bundle_configs_property_raises(config, message): - bundle_manager = DagBundlesManager() - with patch.dict(os.environ, {"AIRFLOW__DAG_BUNDLES__TESTBUNDLE": config}): - with pytest.raises(AirflowConfigException, match=message): - bundle_manager.bundle_configs + if not exp_fail: + assert names == expected class BasicBundle(BaseDagBundle): @@ -81,46 +93,37 @@ def path(self): pass -BASIC_BUNDLE_CONFIG = { - "classpath": "tests.dag_processing.bundles.test_dag_bundle_manager.BasicBundle", - "kwargs": {"refresh_interval": 1}, -} +BASIC_BUNDLE_CONFIG = [ + { + "name": "my-test-bundle", + "classpath": "tests.dag_processing.bundles.test_dag_bundle_manager.BasicBundle", + "kwargs": {"refresh_interval": 1}, + } +] def 
test_get_bundle(): """Test that get_bundle builds and returns a bundle.""" - bundle_manager = DagBundlesManager() + with patch.dict(os.environ, {"AIRFLOW__DAG_BUNDLES__BACKENDS": json.dumps(BASIC_BUNDLE_CONFIG)}): + bundle_manager = DagBundlesManager() - with patch.dict(os.environ, {"AIRFLOW__DAG_BUNDLES__TESTBUNDLE": json.dumps(BASIC_BUNDLE_CONFIG)}): - bundle = bundle_manager.get_bundle(name="testbundle", version="hello") + with pytest.raises(ValueError, match="'bundle-that-doesn't-exist' is not configured"): + bundle_manager.get_bundle(name="bundle-that-doesn't-exist", version="hello") + bundle = bundle_manager.get_bundle(name="my-test-bundle", version="hello") assert isinstance(bundle, BasicBundle) - assert bundle.name == "testbundle" + assert bundle.name == "my-test-bundle" assert bundle.version == "hello" assert bundle.refresh_interval == 1 # And none for version also works! - with patch.dict(os.environ, {"AIRFLOW__DAG_BUNDLES__TESTBUNDLE": json.dumps(BASIC_BUNDLE_CONFIG)}): - bundle = bundle_manager.get_bundle(name="testbundle") + with patch.dict(os.environ, {"AIRFLOW__DAG_BUNDLES__BACKENDS": json.dumps(BASIC_BUNDLE_CONFIG)}): + bundle = bundle_manager.get_bundle(name="my-test-bundle") assert isinstance(bundle, BasicBundle) - assert bundle.name == "testbundle" + assert bundle.name == "my-test-bundle" assert bundle.version is None -def test_get_all_dag_bundles(): - """Test that get_all_dag_bundles returns all bundles.""" - - bundle_manager = DagBundlesManager() - - with patch.dict(os.environ, {"AIRFLOW__DAG_BUNDLES__TESTBUNDLE": json.dumps(BASIC_BUNDLE_CONFIG)}): - bundles = bundle_manager.get_all_dag_bundles() - assert len(bundles) == 2 - assert all(isinstance(x, BaseDagBundle) for x in bundles) - - bundle_names = {x.name for x in bundles} - assert bundle_names == {"testbundle", "dags_folder"} - - @pytest.fixture def clear_db(): clear_db_dag_bundles() @@ -130,8 +133,6 @@ def clear_db(): @pytest.mark.db_test def test_sync_bundles_to_db(clear_db): - 
bundle_manager = DagBundlesManager() - def _get_bundle_names_and_active(): with create_session() as session: return ( @@ -139,15 +140,19 @@ def _get_bundle_names_and_active(): ) # Initial add - with patch.dict(os.environ, {"AIRFLOW__DAG_BUNDLES__TESTBUNDLE": json.dumps(BASIC_BUNDLE_CONFIG)}): - bundle_manager.sync_bundles_to_db() - assert _get_bundle_names_and_active() == [("dags_folder", True), ("testbundle", True)] + with patch.dict(os.environ, {"AIRFLOW__DAG_BUNDLES__BACKENDS": json.dumps(BASIC_BUNDLE_CONFIG)}): + manager = DagBundlesManager() + manager.sync_bundles_to_db() + assert _get_bundle_names_and_active() == [("my-test-bundle", True)] - # Disable ones that disappear from config - bundle_manager.sync_bundles_to_db() - assert _get_bundle_names_and_active() == [("dags_folder", True), ("testbundle", False)] + # simulate bundle config change + # note: airflow will detect config changes when they are in env vars + manager = DagBundlesManager() + manager.sync_bundles_to_db() + assert _get_bundle_names_and_active() == [("dags-folder", True), ("my-test-bundle", False)] # Re-enable one that reappears in config - with patch.dict(os.environ, {"AIRFLOW__DAG_BUNDLES__TESTBUNDLE": json.dumps(BASIC_BUNDLE_CONFIG)}): - bundle_manager.sync_bundles_to_db() - assert _get_bundle_names_and_active() == [("dags_folder", True), ("testbundle", True)] + with patch.dict(os.environ, {"AIRFLOW__DAG_BUNDLES__BACKENDS": json.dumps(BASIC_BUNDLE_CONFIG)}): + manager = DagBundlesManager() + manager.sync_bundles_to_db() + assert _get_bundle_names_and_active() == [("dags-folder", False), ("my-test-bundle", True)]