Skip to content

Commit

Permalink
Rename DagBag.store_serialized_dags to Dagbag.read_dags_from_db (apac…
Browse files Browse the repository at this point in the history
  • Loading branch information
kaxil authored and scrambldchannel committed Jul 17, 2020
1 parent f9ec7b6 commit c662962
Show file tree
Hide file tree
Showing 9 changed files with 76 additions and 23 deletions.
34 changes: 34 additions & 0 deletions UPDATING.md
Original file line number Diff line number Diff line change
Expand Up @@ -1442,6 +1442,40 @@ Now the `dag_id` will not appear repeated in the payload, and the response forma
}
```

### Change in DagBag signature

Passing the `store_serialized_dags` argument to `DagBag.__init__` and accessing the `DagBag.store_serialized_dags`
property are deprecated and will be removed in a future version. Use `read_dags_from_db` instead.


**Previous signature**:

```python
DagBag(
dag_folder=None,
include_examples=conf.getboolean('core', 'LOAD_EXAMPLES'),
safe_mode=conf.getboolean('core', 'DAG_DISCOVERY_SAFE_MODE'),
store_serialized_dags=False
):
```

**Current signature**:
```python
DagBag(
dag_folder=None,
include_examples=conf.getboolean('core', 'LOAD_EXAMPLES'),
safe_mode=conf.getboolean('core', 'DAG_DISCOVERY_SAFE_MODE'),
read_dags_from_db=False
):
```

If you were passing this argument positionally, no change is required. If you were passing it as a keyword
argument, change `store_serialized_dags` to `read_dags_from_db`.

Similarly, if you were accessing the `DagBag().store_serialized_dags` property, change it to
`DagBag().read_dags_from_db`.


## Airflow 1.10.11

### Use NULL as default value for dag.description
Expand Down
2 changes: 1 addition & 1 deletion airflow/api/common/experimental/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ def check_and_get_dag(dag_id: str, task_id: Optional[str] = None) -> DagModel:

dagbag = DagBag(
dag_folder=dag_model.fileloc,
store_serialized_dags=conf.getboolean('core', 'store_serialized_dags')
read_dags_from_db=conf.getboolean('core', 'store_serialized_dags')
)
dag = dagbag.get_dag(dag_id) # prefetch dag if it is stored serialized
if dag_id not in dagbag.dags:
Expand Down
2 changes: 1 addition & 1 deletion airflow/api/common/experimental/trigger_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ def read_store_serialized_dags():
return conf.getboolean('core', 'store_serialized_dags')
dagbag = DagBag(
dag_folder=dag_model.fileloc,
store_serialized_dags=read_store_serialized_dags()
read_dags_from_db=read_store_serialized_dags()
)
dag_run = DagRun()
triggers = _trigger_dag(
Expand Down
43 changes: 31 additions & 12 deletions airflow/models/dagbag.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import os
import sys
import textwrap
import warnings
import zipfile
from datetime import datetime, timedelta
from typing import Dict, List, NamedTuple, Optional
Expand Down Expand Up @@ -69,34 +70,43 @@ class DagBag(BaseDagBag, LoggingMixin):
:param include_examples: whether to include the examples that ship
with airflow or not
:type include_examples: bool
:param store_serialized_dags: Read DAGs from DB if store_serialized_dags is ``True``.
:param read_dags_from_db: Read DAGs from DB if ``True`` is passed.
If ``False`` DAGs are read from python files. This property is not used when
determining whether or not to write Serialized DAGs, that is done by checking
the config ``store_serialized_dags``.
:type store_serialized_dags: bool
:type read_dags_from_db: bool
"""

DAGBAG_IMPORT_TIMEOUT = conf.getint('core', 'DAGBAG_IMPORT_TIMEOUT')
SCHEDULER_ZOMBIE_TASK_THRESHOLD = conf.getint('scheduler', 'scheduler_zombie_task_threshold')

def __init__(
self,
dag_folder: Optional[str] = None,
include_examples: bool = conf.getboolean('core', 'LOAD_EXAMPLES'),
safe_mode: bool = conf.getboolean('core', 'DAG_DISCOVERY_SAFE_MODE'),
store_serialized_dags: bool = False,
self,
dag_folder: Optional[str] = None,
include_examples: bool = conf.getboolean('core', 'LOAD_EXAMPLES'),
safe_mode: bool = conf.getboolean('core', 'DAG_DISCOVERY_SAFE_MODE'),
read_dags_from_db: bool = False,
store_serialized_dags: Optional[bool] = None,
):
# Avoid circular import
from airflow.models.dag import DAG
super().__init__()

if store_serialized_dags:
warnings.warn(
"The store_serialized_dags parameter has been deprecated. "
"You should pass the read_dags_from_db parameter.",
DeprecationWarning, stacklevel=2)
read_dags_from_db = store_serialized_dags

dag_folder = dag_folder or settings.DAGS_FOLDER
self.dag_folder = dag_folder
self.dags: Dict[str, DAG] = {}
# the file's last modified timestamp when we last read it
self.file_last_changed: Dict[str, datetime] = {}
self.import_errors: Dict[str, str] = {}
self.has_logged = False
self.store_serialized_dags = store_serialized_dags
self.read_dags_from_db = read_dags_from_db

self.collect_dags(
dag_folder=dag_folder,
Expand All @@ -109,6 +119,15 @@ def size(self) -> int:
"""
return len(self.dags)

@property
def store_serialized_dags(self) -> bool:
    """
    Deprecated alias for ``read_dags_from_db``.

    Emits a ``DeprecationWarning`` and returns the current value of
    ``self.read_dags_from_db``.

    :return: the value of ``self.read_dags_from_db``
    :rtype: bool
    """
    # stacklevel=2 attributes the warning to the code that accessed the property
    warnings.warn(
        "The store_serialized_dags property has been deprecated. "
        "Use read_dags_from_db instead.", DeprecationWarning, stacklevel=2
    )
    return self.read_dags_from_db

@store_serialized_dags.setter
def store_serialized_dags(self, value: bool) -> None:
    """
    Deprecated setter kept so that existing code assigning to
    ``store_serialized_dags`` keeps working instead of raising ``AttributeError``.

    Emits a ``DeprecationWarning`` and stores ``value`` in ``self.read_dags_from_db``.

    :param value: new value for ``read_dags_from_db``
    :type value: bool
    """
    # stacklevel=2 attributes the warning to the code that performed the assignment
    warnings.warn(
        "The store_serialized_dags property has been deprecated. "
        "Use read_dags_from_db instead.", DeprecationWarning, stacklevel=2
    )
    self.read_dags_from_db = value

@property
def dag_ids(self) -> List[str]:
    """Return the keys of ``self.dags`` as a new list."""
    return list(self.dags)
Expand All @@ -123,8 +142,8 @@ def get_dag(self, dag_id):
# Avoid circular import
from airflow.models.dag import DagModel

# Only read DAGs from DB if this dagbag is store_serialized_dags.
if self.store_serialized_dags:
# Only read DAGs from DB if this dagbag is read_dags_from_db.
if self.read_dags_from_db:
# Import here so that serialized dag is only imported when serialization is enabled
from airflow.models.serialized_dag import SerializedDagModel
if dag_id not in self.dags:
Expand Down Expand Up @@ -363,7 +382,7 @@ def collect_dags(
**Note**: The patterns in .airflowignore are treated as
un-anchored regexes, not shell-like glob patterns.
"""
if self.store_serialized_dags:
if self.read_dags_from_db:
return

self.log.info("Filling up the DagBag from %s", dag_folder)
Expand Down Expand Up @@ -439,7 +458,7 @@ def sync_to_db(self):
self.log.debug("Calling the DAG.bulk_sync_to_db method")
DAG.bulk_sync_to_db(self.dags.values())
# Write Serialized DAGs to DB if DAG Serialization is turned on
# Even though self.store_serialized_dags is False
# Even though self.read_dags_from_db is False
if settings.STORE_SERIALIZED_DAGS:
self.log.debug("Calling the SerializedDagModel.bulk_sync_to_db method")
SerializedDagModel.bulk_sync_to_db(self.dags.values())
2 changes: 1 addition & 1 deletion airflow/www/extensions/init_dagbag.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,4 +29,4 @@ def init_dagbag(app):
if os.environ.get('SKIP_DAGS_PARSING') == 'True':
app.dag_bag = DagBag(os.devnull, include_examples=False)
else:
app.dag_bag = DagBag(DAGS_FOLDER, store_serialized_dags=STORE_SERIALIZED_DAGS)
app.dag_bag = DagBag(DAGS_FOLDER, read_dags_from_db=STORE_SERIALIZED_DAGS)
2 changes: 1 addition & 1 deletion tests/api_connexion/endpoints/test_dag_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ def test_should_response_200(self):
def test_should_response_200_serialized(self):
# Create empty app with empty dagbag to check if DAG is read from db
app_serialized = app.create_app(testing=True) # type:ignore
dag_bag = DagBag(os.devnull, include_examples=False, store_serialized_dags=True)
dag_bag = DagBag(os.devnull, include_examples=False, read_dags_from_db=True)
app_serialized.dag_bag = dag_bag # type:ignore
client = app_serialized.test_client()

Expand Down
2 changes: 1 addition & 1 deletion tests/api_connexion/endpoints/test_task_endpoint.py
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ def test_should_response_200(self):
def test_should_response_200_serialized(self):
# Create empty app with empty dagbag to check if DAG is read from db
app_serialized = app.create_app(testing=True) # type:ignore
dag_bag = DagBag(os.devnull, include_examples=False, store_serialized_dags=True)
dag_bag = DagBag(os.devnull, include_examples=False, read_dags_from_db=True)
app_serialized.dag_bag = dag_bag # type:ignore
client = app_serialized.test_client()

Expand Down
8 changes: 4 additions & 4 deletions tests/jobs/test_scheduler_job.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,9 @@ def create_test_dag(self, start_date=DEFAULT_DATE, end_date=DEFAULT_DATE + timed
@patch("airflow.models.dagbag.settings.STORE_SERIALIZED_DAGS", True)
def setUpClass(cls):
# Ensure the DAGs we are looking at from the DB are up-to-date
non_serialized_dagbag = DagBag(store_serialized_dags=False, include_examples=False)
non_serialized_dagbag = DagBag(read_dags_from_db=False, include_examples=False)
non_serialized_dagbag.sync_to_db()
cls.dagbag = DagBag(store_serialized_dags=True)
cls.dagbag = DagBag(read_dags_from_db=True)

def test_dag_file_processor_sla_miss_callback(self):
"""
Expand Down Expand Up @@ -1373,9 +1373,9 @@ def setUp(self):
@patch("airflow.models.dagbag.settings.STORE_SERIALIZED_DAGS", True)
def setUpClass(cls):
# Ensure the DAGs we are looking at from the DB are up-to-date
non_serialized_dagbag = DagBag(store_serialized_dags=False, include_examples=False)
non_serialized_dagbag = DagBag(read_dags_from_db=False, include_examples=False)
non_serialized_dagbag.sync_to_db()
cls.dagbag = DagBag(store_serialized_dags=True)
cls.dagbag = DagBag(read_dags_from_db=True)

def test_is_alive(self):
job = SchedulerJob(None, heartrate=10, state=State.RUNNING)
Expand Down
4 changes: 2 additions & 2 deletions tests/models/test_dagbag.py
Original file line number Diff line number Diff line change
Expand Up @@ -642,7 +642,7 @@ def test_deactivate_unknown_dags(self):
def test_serialized_dags_are_written_to_db_on_sync(self):
"""
Test that when dagbag.sync_to_db is called the DAGs are Serialized and written to DB
even when dagbag.store_serialized_dags is False
even when dagbag.read_dags_from_db is False
"""
with create_session() as session:
serialized_dags_count = session.query(func.count(SerializedDagModel.dag_id)).scalar()
Expand All @@ -653,7 +653,7 @@ def test_serialized_dags_are_written_to_db_on_sync(self):
include_examples=False)
dagbag.sync_to_db()

self.assertFalse(dagbag.store_serialized_dags)
self.assertFalse(dagbag.read_dags_from_db)

new_serialized_dags_count = session.query(func.count(SerializedDagModel.dag_id)).scalar()
self.assertEqual(new_serialized_dags_count, 1)

0 comments on commit c662962

Please sign in to comment.