diff --git a/CHANGELOG.md b/CHANGELOG.md index 53db54cc8a..0c44e4b8fc 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,8 +12,15 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/). ### Fixed +- The FlowETL config file is now always validated, avoiding runtime errors if a config setting is wrong or missing. [#1375](https://github.com/Flowminder/FlowKit/issues/1375) +- FlowETL now only creates DAGs for CDR types which are present in the config, leading to a better user experience in the Airflow UI. [#1376](https://github.com/Flowminder/FlowKit/issues/1376) +- The `concurrency` settings in the FlowETL config are no longer ignored. [#1378](https://github.com/Flowminder/FlowKit/issues/1378) +- The FlowETL deployment example has been updated so that it no longer fails due to a missing foreign data wrapper for the available CDR dates. [#1379](https://github.com/Flowminder/FlowKit/issues/1379) + ### Removed +- The `default_args` section in the FlowETL config file has been removed. [#1377](https://github.com/Flowminder/FlowKit/issues/1377) + ## [0.9.0] @@ -45,11 +52,11 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/). ### Fixed - Quickstart will no longer fail if it has been run previously with a different FlowDB data size and not explicitly shut down. [#900](https://github.com/Flowminder/FlowKit/issues/900) - ### Removed - Flowmachine's `subscriber_locations_cluster` function has been removed - use `HartiganCluster` or `MeaningfulLocations` directly. - FlowAPI no longer supports the non-reproducible random sampling method `system_rows`. [#1263](https://github.com/Flowminder/FlowKit/issues/1263) + ## [0.8.0] ### Added diff --git a/flowetl/dags/etl.py b/flowetl/dags/etl.py index df0c2d975b..72b53e1400 100644 --- a/flowetl/dags/etl.py +++ b/flowetl/dags/etl.py @@ -9,25 +9,18 @@ import os import structlog -from pathlib import Path - -# need to import and not use so that airflow looks here for a DAG +# Need to import the DAG class (even if it is not directly +# used in this file) so that Airflow looks here for a DAG. 
from airflow import DAG # pylint: disable=unused-import -from pendulum import parse from etl.dag_task_callable_mappings import ( TEST_ETL_TASK_CALLABLES, PRODUCTION_ETL_TASK_CALLABLES, ) from etl.etl_utils import construct_etl_dag, CDRType -from etl.config_parser import ( - get_config_from_file, - validate_config, - fill_config_default_values, -) +from etl.config_parser import get_config_from_file logger = structlog.get_logger("flowetl") -default_args = {"owner": "flowminder", "start_date": parse("1900-01-01")} ETL_TASK_CALLABLES = { "testing": TEST_ETL_TASK_CALLABLES, @@ -39,33 +32,32 @@ # Determine if we are in a testing environment - use dummy callables if so if flowetl_runtime_config == "testing": task_callable_mapping = TEST_ETL_TASK_CALLABLES - logger.info("running in testing environment") + logger.info("Running in testing environment") dag = construct_etl_dag( - **task_callable_mapping, default_args=default_args, cdr_type="testing" + **task_callable_mapping, cdr_type="testing", max_active_runs_per_dag=1 ) elif flowetl_runtime_config == "production": task_callable_mapping = PRODUCTION_ETL_TASK_CALLABLES - logger.info("running in production environment") + logger.info("Running in production environment") # read and validate the config file before creating the DAGs - global_config_dict = get_config_from_file( - config_filepath=Path("/mounts/config/config.yml") - ) - validate_config(global_config_dict) - global_config_dict = fill_config_default_values(global_config_dict) - - default_args = global_config_dict["default_args"] + global_config_dict = get_config_from_file("/mounts/config/config.yml") - # create DAG for each cdr_type + # Create DAG for each cdr_type occurring in the config for cdr_type in CDRType: # Ensure `cdr_type` is a string (e.g. "sms", instead of the raw value `CDRType.SMS`) # so that interpolation in SQL templates works as expected. - cdr_type = cdr_type.value + cdr_type = CDRType(cdr_type).value - globals()[f"etl_{cdr_type}"] = construct_etl_dag( - **task_callable_mapping, default_args=default_args, cdr_type=cdr_type - ) + # Only process CDR types that are actually specified in the config + if cdr_type in global_config_dict["etl"]: + max_active_runs_per_dag = global_config_dict["etl"][cdr_type]["concurrency"] + globals()[f"etl_{cdr_type}"] = construct_etl_dag( + **task_callable_mapping, + cdr_type=cdr_type, + max_active_runs_per_dag=max_active_runs_per_dag, + ) else: raise ValueError( f"Invalid config name: '{flowetl_runtime_config}'. " diff --git a/flowetl/dags/etl_sensor.py b/flowetl/dags/etl_sensor.py index ed995a65cc..1ae8c4b22b 100644 --- a/flowetl/dags/etl_sensor.py +++ b/flowetl/dags/etl_sensor.py @@ -6,21 +6,16 @@ import os import structlog -# need to import and not use so that airflow looks here for a DAG +# Need to import the DAG class (even if it is not directly +# used in this file) so that Airflow looks here for a DAG. 
from airflow import DAG  # pylint: disable=unused-import
-from pendulum import parse
-
 from etl.etl_utils import construct_etl_sensor_dag
 from etl.dag_task_callable_mappings import (
     TEST_ETL_SENSOR_TASK_CALLABLE,
     PRODUCTION_ETL_SENSOR_TASK_CALLABLE,
 )
-logger = structlog.get_logger("flowetl")
-
-default_args = {"owner": "flowminder", "start_date": parse("1900-01-01")}
-
 ETL_SENSOR_TASK_CALLABLES = {
     "testing": TEST_ETL_SENSOR_TASK_CALLABLE,
     "production": PRODUCTION_ETL_SENSOR_TASK_CALLABLE,
@@ -36,7 +31,6 @@
         f"Valid config names are: {list(ETL_SENSOR_TASK_CALLABLES.keys())}"
     )
+logger = structlog.get_logger("flowetl")
 logger.info(f"Running in {flowetl_runtime_config} environment")
-dag = construct_etl_sensor_dag(
-    callable=etl_sensor_task_callable, default_args=default_args
-)
+dag = construct_etl_sensor_dag(callable=etl_sensor_task_callable)
diff --git a/flowetl/deployment_example/README.md b/flowetl/deployment_example/README.md
index d14becf06d..5b30ca947d 100644
--- a/flowetl/deployment_example/README.md
+++ b/flowetl/deployment_example/README.md
@@ -95,6 +95,50 @@ ingestion_db=# SELECT * FROM events.cdr LIMIT 3;
 (3 rows)
 ```
 
+### Create a view to determine available dates in IngestionDB
+
+FlowETL needs to be able to determine which dates are available for ingestion.
+A convenient way to do this is to create a view in the ingestion database which lists all available dates.
+Since our ingestion database is based on [TimescaleDB](https://docs.timescale.com/latest/main) and the table `events.cdr`
+is a [hypertable](https://docs.timescale.com/latest/using-timescaledb/hypertables) with a chunk size of 1 day, we can
+extract this information from the chunk ranges. Further down we will create a foreign data wrapper for this view, so that the available dates can also be read from within FlowDB.
+
+Connect to the ingestion database (`make connect-ingestion_db`) and then run the following SQL snippet
+to create the `available_dates` view.
+```
+CREATE VIEW available_dates AS (
+    WITH chunk_ranges_as_strings AS (
+        SELECT unnest(ranges) as ranges
+        FROM chunk_relation_size_pretty('events.cdr')
+    ),
+    chunk_ranges AS (
+        SELECT
+            substring(ranges, 2, 24)::TIMESTAMPTZ as chunk_start,
+            substring(ranges, 25, 24)::TIMESTAMPTZ as chunk_end
+        FROM chunk_ranges_as_strings
+    )
+    SELECT
+        chunk_start::DATE as cdr_date
+    FROM chunk_ranges
+    ORDER BY cdr_date
+);
+```
+
+Let's confirm that it works as expected.
+```
+ingestion_db=# SELECT * FROM available_dates;
+  cdr_date
+------------
+ 2019-09-02
+ 2019-09-03
+ 2019-09-04
+ 2019-09-05
+ 2019-09-06
+ 2019-09-07
+ 2019-09-08
+(7 rows)
+```
+
 ### Set up a docker stack with FlowDB and FlowETL
 
 Set relevant environment variables for FlowDB and FlowETL.
@@ -166,7 +210,7 @@
 This is useful if you make changes to the source code, in which case you need to
 for these changes to be picked up.
 
-### Create a foreign data wrapper to connect FlowDB to IngestionDB
+### Create foreign data wrappers to connect FlowDB to IngestionDB
 
 Run the following from within `flowdb` (you can connect to `flowdb` by running `make connect-flowdb`).
```
@@ -187,6 +231,12 @@
 CREATE USER MAPPING IF NOT EXISTS FOR flowdb
         password 'etletl'
     );
+CREATE FOREIGN TABLE sample_data_available_dates_fdw (
+    cdr_date DATE
+    )
+    SERVER ingestion_db_server
+    OPTIONS (table_name 'available_dates');
+
 CREATE FOREIGN TABLE sample_data_fdw (
     event_time TIMESTAMPTZ,
     msisdn TEXT,
@@ -196,8 +246,24 @@ CREATE FOREIGN TABLE sample_data_fdw (
     OPTIONS (schema_name 'events', table_name 'cdr');
 ```
 
-Let's verify that the foreign data wrapper was set up correctly, so that `flowdb` can now read data remotely from `ingestion_db`:
+This creates two foreign data wrappers within `flowdb`. The first one (`sample_data_available_dates_fdw`) allows
+FlowETL to determine the dates for which CDR data is available, so that it can schedule ingestion for any unprocessed dates.
+The second one (`sample_data_fdw`) wraps the actual data itself and acts as the "source" in the ETL pipeline.
+
+Let's verify that these were set up correctly, so that `flowdb` can now read data remotely from `ingestion_db`:
 ```
+flowdb=# SELECT * FROM sample_data_available_dates_fdw;
+  cdr_date
+------------
+ 2019-09-02
+ 2019-09-03
+ 2019-09-04
+ 2019-09-05
+ 2019-09-06
+ 2019-09-07
+ 2019-09-08
+(7 rows)
+
 flowdb=# SELECT * FROM sample_data_fdw LIMIT 3;
           event_time           |                              msisdn                              | cell_id
 -------------------------------+------------------------------------------------------------------+---------
diff --git a/flowetl/deployment_example/mounts/config/config.yml b/flowetl/deployment_example/mounts/config/config.yml
index 945c3fb506..23d4499e73 100644
--- a/flowetl/deployment_example/mounts/config/config.yml
+++ b/flowetl/deployment_example/mounts/config/config.yml
@@ -1,6 +1,3 @@
-default_args:
-  owner: flowminder
-  start_date: '1900-01-01'
 etl:
   calls:
     concurrency: 4
diff --git a/flowetl/etl/etl/config_parser.py b/flowetl/etl/etl/config_parser.py
index 26dd185337..4cd97be57c 100644
--- a/flowetl/etl/etl/config_parser.py
+++ b/flowetl/etl/etl/config_parser.py
@@ -10,6 +10,7 @@
 from copy import deepcopy
 from pathlib import Path
+from typing import Union
 
 from etl.etl_utils import CDRType
 
@@ -32,11 +33,6 @@ def validate_config(global_config_dict: dict) -> None:
     if "etl" not in keys:
         exceptions.append(ValueError("etl must be a toplevel key in the config file"))
 
-    if "default_args" not in keys:
-        exceptions.append(
-            ValueError("default_args must be a toplevel key in the config file")
-        )
-
     etl_keys = global_config_dict.get("etl", {}).keys()
     if not set(etl_keys).issubset(CDRType):
         unexpected_keys = list(set(etl_keys).difference(CDRType))
@@ -112,13 +108,15 @@ def fill_config_default_values(global_config_dict: dict) -> dict:
     return global_config_dict
 
 
-def get_config_from_file(*, config_filepath: Path) -> dict:
+def get_config_from_file(config_filepath: Union[Path, str]) -> dict:
     """
     Function used to load configuration from YAML file.
+    This also validates the structure of the config and
+    fills any optional settings with default values.
 
     Parameters
     ----------
-    config_filepath : Path
+    config_filepath : Path or str
         Location of the file config.yml
 
     Returns
@@ -126,5 +124,10 @@ def get_config_from_file(*, config_filepath: Path) -> dict:
     dict
         Yaml config loaded into a python dict
     """
+    # Ensure config_filepath is actually a Path object (e.g.
in case a string is passed) + config_filepath = Path(config_filepath) + content = config_filepath.open("r").read() - return yaml.load(content, Loader=yaml.SafeLoader) + config_dict = yaml.load(content, Loader=yaml.SafeLoader) + validate_config(config_dict) + return fill_config_default_values(config_dict) diff --git a/flowetl/etl/etl/etl_utils.py b/flowetl/etl/etl/etl_utils.py index a972f093bb..16faa1d164 100644 --- a/flowetl/etl/etl/etl_utils.py +++ b/flowetl/etl/etl/etl_utils.py @@ -9,7 +9,6 @@ import os import pendulum import re -import sqlalchemy from typing import List, Callable from enum import Enum @@ -23,7 +22,7 @@ from airflow.operators.python_operator import PythonOperator -def construct_etl_sensor_dag(*, callable: Callable, default_args: dict) -> DAG: +def construct_etl_sensor_dag(*, callable: Callable) -> DAG: """ This function constructs the sensor single task DAG that triggers ETL DAGS with correct config based on filename. @@ -33,16 +32,21 @@ def construct_etl_sensor_dag(*, callable: Callable, default_args: dict) -> DAG: callable : Callable The sense callable that deals with finding files and triggering ETL DAGs - default_args : dict - Default arguments for DAG Returns ------- DAG Airflow DAG """ + default_args = {"owner": "flowminder"} + + # Note: the `start_date` parameter needs to be set to a date in the past, + # otherwise Airflow won't run the DAG when it is triggered. with DAG( - dag_id=f"etl_sensor", schedule_interval=None, default_args=default_args + dag_id=f"etl_sensor", + start_date=pendulum.parse("1900-01-01"), + schedule_interval=None, + default_args=default_args, ) as dag: sense = PythonOperator( task_id="sense", python_callable=callable, provide_context=True @@ -63,8 +67,8 @@ def construct_etl_dag( quarantine: Callable, clean: Callable, fail: Callable, - default_args: dict, cdr_type: str, + max_active_runs_per_dag: int, config_path: str = "/mounts/config", ) -> DAG: """ @@ -94,12 +98,10 @@ def construct_etl_dag( The clean task callable. fail : Callable The fail task callable. - default_args : dict - A set of default args to pass to all callables. - Must containt at least "owner" key and "start" key (which must be a - pendulum date object) cdr_type : str The type of CDR that this ETL DAG will process. + max_active_runs_per_dag : int + The maximum number of active DAG runs per DAG. config_path : str The config path used to look for the sql templates. @@ -109,9 +111,15 @@ def construct_etl_dag( Specification of Airflow DAG for ETL """ + default_args = {"owner": "flowminder"} + + # Note: the `start_date` parameter needs to be set to a date in the past, + # otherwise Airflow won't run the DAG when it is triggered. with DAG( dag_id=f"etl_{cdr_type}", + start_date=pendulum.parse("1900-01-01"), schedule_interval=None, + max_active_runs=max_active_runs_per_dag, default_args=default_args, template_searchpath=config_path, # template paths will be relative to this user_defined_macros={ diff --git a/flowetl/etl/tests/test_config.py b/flowetl/etl/tests/test_config.py index 74573093ce..b33a66988e 100644 --- a/flowetl/etl/tests/test_config.py +++ b/flowetl/etl/tests/test_config.py @@ -60,20 +60,6 @@ def test_config_validation_fails_for_invalid_etl_section(sample_config_dict): validate_config(bad_config) -def test_config_validation_fails_no_default_args_section(sample_config_dict): - """ - Check that we get an exception raised if default args - subsection missing. 
- """ - bad_config = deepcopy(sample_config_dict) - bad_config.pop("default_args") - - with pytest.raises(ValueError) as raised_exception: - validate_config(bad_config) - - assert len(raised_exception.value.args[0]) == 1 - - def test_config_validation_fails_bad_etl_subsection(sample_config_dict): """ Check that we get an exception raised if an etl subsection @@ -200,15 +186,39 @@ def test_get_config_from_file(tmpdir): """ Test that we can load yaml to dict from file """ - sample_dict = {"A": 23, "B": [1, 2, 34], "C": {"A": "bob"}} + sample_dict = { + "etl": {"calls": {"concurrency": 3, "source": {"source_type": "csv"}}} + } config_dir = tmpdir.mkdir("config") config_file = config_dir.join("config.yml") config_file.write(yaml.dump(sample_dict)) - config = get_config_from_file(config_filepath=Path(config_file)) + config = get_config_from_file(config_filepath=config_file) assert config == sample_dict +def test_loading_invalid_config_file_raises_error(tmpdir): + """ + Test that the config is validated when loading from a file and errors are raised. + """ + invalid_config = textwrap.dedent( + """ + etl: + calls: + source: + source_type: sql + table_name: "sample_data_fdw" + """ + ) + + config_file = tmpdir.join("config.yml") + config_file.write(invalid_config) + + error_msg = "Each etl subsection must contain a 'source' and 'concurrency' subsection - not present for 'calls'." + with pytest.raises(ValueError, match=error_msg): + get_config_from_file(config_filepath=config_file) + + def test_extract_date_from_filename(): filename = "CALLS_20160101.csv.gz" filename_pattern = r"CALLS_(\d{8}).csv.gz" @@ -238,9 +248,6 @@ def test_sql_find_available_dates(sample_config_dict): config_without_explicit_sql = textwrap.dedent( """ - default_args: - owner: flowminder - start_date: '1900-01-01' etl: calls: concurrency: 4 diff --git a/flowetl/etl/tests/test_construct_etl_dag.py b/flowetl/etl/tests/test_construct_etl_dag.py index 42ecbc1f71..e94e309115 100644 --- a/flowetl/etl/tests/test_construct_etl_dag.py +++ b/flowetl/etl/tests/test_construct_etl_dag.py @@ -8,10 +8,8 @@ """ import pytest from pendulum import parse -from pendulum.parsing.exceptions import ParserError from unittest.mock import Mock -from airflow.exceptions import AirflowException from airflow.operators.python_operator import PythonOperator from etl.etl_utils import construct_etl_dag @@ -27,14 +25,13 @@ def test_construct_etl_dag_with_test_callables(): specified in the task_callable_mapping argument. Use TEST_ETL_TASK_CALLABLES mapping. """ - default_args = {"owner": "bob", "start_date": parse("1900-01-01")} task_callable_mapping = { t: Mock(wraps=v) for (t, v) in TEST_ETL_TASK_CALLABLES.items() } cdr_type = "spaghetti" dag = construct_etl_dag( - **task_callable_mapping, default_args=default_args, cdr_type=cdr_type + **task_callable_mapping, cdr_type=cdr_type, max_active_runs_per_dag=1 ) assert dag.dag_id == f"etl_{cdr_type}" @@ -57,14 +54,13 @@ def test_construct_etl_dag_with_production_callables(): specified in the task_callable_mapping argument. Use PRODUCTION_ETL_TASK_CALLABLES mapping. 
""" - default_args = {"owner": "bob", "start_date": parse("1900-01-01")} task_callable_mapping = { t: Mock(wraps=v) for (t, v) in PRODUCTION_ETL_TASK_CALLABLES.items() } cdr_type = "spaghetti" dag = construct_etl_dag( - **task_callable_mapping, default_args=default_args, cdr_type=cdr_type + **task_callable_mapping, cdr_type=cdr_type, max_active_runs_per_dag=1 ) assert dag.dag_id == f"etl_{cdr_type}" @@ -83,50 +79,46 @@ def test_construct_etl_dag_with_production_callables(): [t.assert_called_once() for _, t in task_callable_mapping.items()] -def test_construct_etl_dag_fails_with_no_start_date(): +def test_construct_etl_dag_sets_owner_to_airflow(): """ - Make sure we get an exception if default_args does not contain a start_date + Make sure that the DAG owner of the constructed DAG is Flowminder. """ - default_args = {"owner": "bob"} task_callable_mapping = TEST_ETL_TASK_CALLABLES cdr_type = "spaghetti" - # pylint: disable=unused-variable - with pytest.raises(AirflowException): - dag = construct_etl_dag( - **task_callable_mapping, default_args=default_args, cdr_type=cdr_type - ) + dag = construct_etl_dag( + **task_callable_mapping, cdr_type=cdr_type, max_active_runs_per_dag=1 + ) + + assert dag.owner == "flowminder" -def test_construct_etl_dag_with_no_owner_defaults_to_airflow(): +def test_construct_etl_dag_sets_start_date_correctly(): """ - Make sure that if we pass no owner in default_args the owner is - Airflow. + Make sure that the start_date of the DAG is the expected date in the past. """ - default_args = {"start_date": parse("1900-01-01")} task_callable_mapping = TEST_ETL_TASK_CALLABLES cdr_type = "spaghetti" dag = construct_etl_dag( - **task_callable_mapping, default_args=default_args, cdr_type=cdr_type + **task_callable_mapping, cdr_type=cdr_type, max_active_runs_per_dag=1 ) - assert dag.owner == "airflow" + assert dag.start_date == parse("1900-01-01") -def test_construct_etl_dag_fails_with_bad_start_date(): +def test_construct_etl_dag_concurrency_setting(): """ - If the start_date is not a valid date we get an error + Make sure that the DAG's `max_active_runs` is set correctly. """ - default_args = {"owner": "bob", "start_date": "bob_time"} task_callable_mapping = TEST_ETL_TASK_CALLABLES cdr_type = "spaghetti" - # pylint: disable=unused-variable - with pytest.raises(ParserError): - dag = construct_etl_dag( - **task_callable_mapping, default_args=default_args, cdr_type=cdr_type - ) + dag = construct_etl_dag( + **task_callable_mapping, cdr_type=cdr_type, max_active_runs_per_dag=42 + ) + + assert dag.max_active_runs == 42 def test_construct_etl_dag_fails_with_incorrect_mapping_keys(): @@ -134,12 +126,11 @@ def test_construct_etl_dag_fails_with_incorrect_mapping_keys(): If the dictionary we pass to task_callable_mapping does not have correct keys we get a TypeError. """ - default_args = {"owner": "bob", "start_date": "bob_time"} task_callable_mapping = {} cdr_type = "spaghetti" # pylint: disable=unused-variable with pytest.raises(TypeError): dag = construct_etl_dag( - **task_callable_mapping, default_args=default_args, cdr_type=cdr_type + **task_callable_mapping, cdr_type=cdr_type, max_active_runs_per_dag=1 ) diff --git a/flowetl/etl/tests/test_construct_etl_sensor_dag.py b/flowetl/etl/tests/test_construct_etl_sensor_dag.py index 660b9263a2..cfa42b310f 100644 --- a/flowetl/etl/tests/test_construct_etl_sensor_dag.py +++ b/flowetl/etl/tests/test_construct_etl_sensor_dag.py @@ -3,7 +3,6 @@ # file, You can obtain one at http://mozilla.org/MPL/2.0/. 
# -*- coding: utf-8 -*- -from pendulum import parse from etl.etl_utils import construct_etl_sensor_dag from etl.dag_task_callable_mappings import ( TEST_ETL_SENSOR_TASK_CALLABLE, @@ -16,15 +15,11 @@ def test_construct_etl_sensor_dag_with_test_callable(): Make sure we get the python callables we expect when using construct_etl_sensor_dag with testing callables. """ - default_args = {"owner": "bob", "start_date": parse("1900-01-01")} task_callable = TEST_ETL_SENSOR_TASK_CALLABLE - - dag = construct_etl_sensor_dag(callable=task_callable, default_args=default_args) + dag = construct_etl_sensor_dag(callable=task_callable) assert dag.dag_id == f"etl_sensor" - assert len(dag.tasks) == 1 - assert dag.tasks[0].python_callable is task_callable @@ -33,13 +28,9 @@ def test_construct_etl_sensor_dag_with_production_callable(): Make sure we get the python callables we expect when using construct_etl_sensor_dag with production callables. """ - default_args = {"owner": "bob", "start_date": parse("1900-01-01")} task_callable = PRODUCTION_ETL_SENSOR_TASK_CALLABLE - - dag = construct_etl_sensor_dag(callable=task_callable, default_args=default_args) + dag = construct_etl_sensor_dag(callable=task_callable) assert dag.dag_id == f"etl_sensor" - assert len(dag.tasks) == 1 - assert dag.tasks[0].python_callable is task_callable diff --git a/flowetl/mounts/config/config.yml b/flowetl/mounts/config/config.yml index 39e8070382..407bb842ae 100644 --- a/flowetl/mounts/config/config.yml +++ b/flowetl/mounts/config/config.yml @@ -1,8 +1,5 @@ # This is an example flowetl config file which # is also used in the flowetl unit tests. -default_args: - owner: flowminder - start_date: '1900-01-01' etl: calls: concurrency: 4
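
As a usage illustration (not part of the diff itself): a minimal sketch of the config-loading behaviour this change introduces, assuming the `etl` package from `flowetl/etl` is installed; the `/tmp` file paths are purely illustrative. `get_config_from_file` now validates the config structure and fills optional settings with default values in a single call, so a broken config raises a `ValueError` when the DAG files are loaded, rather than causing errors at DAG runtime.

```
# Minimal sketch (assumes the `etl` package from flowetl/etl is installed;
# the /tmp paths are purely illustrative).
from pathlib import Path

from etl.config_parser import get_config_from_file

# A minimal valid config after this change: no `default_args` section any
# more, and a `concurrency` setting for each CDR type that is present.
Path("/tmp/config.yml").write_text(
    """
etl:
  calls:
    concurrency: 4
    source:
      source_type: csv
"""
)

# Loading now validates the structure and fills optional settings with
# default values in one call; a plain string path is accepted as well
# as a pathlib.Path.
config = get_config_from_file("/tmp/config.yml")
assert config["etl"]["calls"]["concurrency"] == 4

# A config with a missing `concurrency` subsection is rejected at load
# time, instead of surfacing as an obscure failure when a DAG runs.
Path("/tmp/bad_config.yml").write_text(
    "etl:\n  calls:\n    source:\n      source_type: csv\n"
)
try:
    get_config_from_file("/tmp/bad_config.yml")
except ValueError as exc:
    print(exc)  # names the offending CDR type, here 'calls'
```

Because the DAG definition files call `get_config_from_file` at import time, Airflow surfaces config mistakes immediately when it scans the DAG folder, which is the behaviour described in [#1375](https://github.com/Flowminder/FlowKit/issues/1375).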