Merge pull request #1380 from Flowminder/flowetl-refactoring
Flowetl refactoring & bug fixes
mergify[bot] authored Oct 9, 2019
2 parents faada91 + 81106f3 commit 587a79f
Showing 11 changed files with 175 additions and 122 deletions.
9 changes: 8 additions & 1 deletion CHANGELOG.md
@@ -12,8 +12,15 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).

### Fixed

- The FlowETL config file is now always validated, avoiding runtime errors if a config setting is wrong or missing. [#1375](https://github.com/Flowminder/FlowKit/issues/1375)
- FlowETL now only creates DAGs for CDR types which are present in the config, leading to a better user experience in the Airflow UI. [#1376](https://github.com/Flowminder/FlowKit/issues/1376)
- The `concurrency` settings in the FlowETL config are no longer ignored. [#1378](https://github.com/Flowminder/FlowKit/issues/1378)
- The FlowETL deployment example has been updated so that it no longer fails due to a missing foreign data wrapper for the available CDR dates. [#1379](https://github.com/Flowminder/FlowKit/issues/1379)

### Removed

- The `default_args` section in the FlowETL config file has been removed. [#1377](https://github.com/Flowminder/FlowKit/issues/1377)


## [0.9.0]

@@ -45,11 +52,11 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/).
### Fixed
- Quickstart will no longer fail if it has been run previously with a different FlowDB data size and not explicitly shut down. [#900](https://github.com/Flowminder/FlowKit/issues/900)


### Removed
- Flowmachine's `subscriber_locations_cluster` function has been removed - use `HartiganCluster` or `MeaningfulLocations` directly.
- FlowAPI no longer supports the non-reproducible random sampling method `system_rows`. [#1263](https://github.com/Flowminder/FlowKit/issues/1263)


## [0.8.0]

### Added
42 changes: 17 additions & 25 deletions flowetl/dags/etl.py
@@ -9,25 +9,18 @@
import os
import structlog

from pathlib import Path

# need to import and not use so that airflow looks here for a DAG
# Need to import the DAG class (even if it is not directly
# used in this file) so that Airflow looks here for a DAG.
from airflow import DAG # pylint: disable=unused-import

from pendulum import parse
from etl.dag_task_callable_mappings import (
TEST_ETL_TASK_CALLABLES,
PRODUCTION_ETL_TASK_CALLABLES,
)
from etl.etl_utils import construct_etl_dag, CDRType
from etl.config_parser import (
get_config_from_file,
validate_config,
fill_config_default_values,
)
from etl.config_parser import get_config_from_file

logger = structlog.get_logger("flowetl")
default_args = {"owner": "flowminder", "start_date": parse("1900-01-01")}

ETL_TASK_CALLABLES = {
"testing": TEST_ETL_TASK_CALLABLES,
@@ -39,33 +32,32 @@
# Determine if we are in a testing environment - use dummy callables if so
if flowetl_runtime_config == "testing":
task_callable_mapping = TEST_ETL_TASK_CALLABLES
logger.info("running in testing environment")
logger.info("Running in testing environment")

dag = construct_etl_dag(
**task_callable_mapping, default_args=default_args, cdr_type="testing"
**task_callable_mapping, cdr_type="testing", max_active_runs_per_dag=1
)
elif flowetl_runtime_config == "production":
task_callable_mapping = PRODUCTION_ETL_TASK_CALLABLES
logger.info("running in production environment")
logger.info("Running in production environment")

# read and validate the config file before creating the DAGs
global_config_dict = get_config_from_file(
config_filepath=Path("/mounts/config/config.yml")
)
validate_config(global_config_dict)
global_config_dict = fill_config_default_values(global_config_dict)

default_args = global_config_dict["default_args"]
global_config_dict = get_config_from_file("/mounts/config/config.yml")

# create DAG for each cdr_type
# Create DAG for each cdr_type occurring in the config
for cdr_type in CDRType:
# Ensure `cdr_type` is a string (e.g. "sms", instead of the raw value `CDRType.SMS`)
# so that interpolation in SQL templates works as expected.
cdr_type = cdr_type.value
cdr_type = CDRType(cdr_type).value

globals()[f"etl_{cdr_type}"] = construct_etl_dag(
**task_callable_mapping, default_args=default_args, cdr_type=cdr_type
)
# Only process CDR types that are actually specified in the config
if cdr_type in global_config_dict["etl"]:
max_active_runs_per_dag = global_config_dict["etl"][cdr_type]["concurrency"]
globals()[f"etl_{cdr_type}"] = construct_etl_dag(
**task_callable_mapping,
cdr_type=cdr_type,
max_active_runs_per_dag=max_active_runs_per_dag,
)
else:
raise ValueError(
f"Invalid config name: '{flowetl_runtime_config}'. "
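The loop above registers one module-level DAG object per CDR type present in the config, which is what makes Airflow discover the generated DAGs. A condensed, runnable sketch of the same pattern; `make_dag` is a hypothetical stand-in for `construct_etl_dag`, and an inline dict replaces the YAML config:
```
import pendulum
from airflow import DAG

# Inline stand-in for the parsed config.yml; keys mirror the "etl" section.
config = {"etl": {"calls": {"concurrency": 4}, "sms": {"concurrency": 2}}}

def make_dag(*, cdr_type: str, max_active_runs: int) -> DAG:
    # Hypothetical simplified factory standing in for construct_etl_dag.
    return DAG(
        dag_id=f"etl_{cdr_type}",
        start_date=pendulum.parse("1900-01-01"),  # must lie in the past
        schedule_interval=None,  # runs are triggered externally by the sensor DAG
        max_active_runs=max_active_runs,
    )

# Airflow only picks up DAGs bound to module-level names,
# hence the assignment into globals().
for cdr_type, settings in config["etl"].items():
    globals()[f"etl_{cdr_type}"] = make_dag(
        cdr_type=cdr_type, max_active_runs=settings["concurrency"]
    )
```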
14 changes: 4 additions & 10 deletions flowetl/dags/etl_sensor.py
@@ -6,21 +6,16 @@
import os
import structlog

# need to import and not use so that airflow looks here for a DAG
# Need to import the DAG class (even if it is not directly
# used in this file) so that Airflow looks here for a DAG.
from airflow import DAG # pylint: disable=unused-import

from pendulum import parse

from etl.etl_utils import construct_etl_sensor_dag
from etl.dag_task_callable_mappings import (
TEST_ETL_SENSOR_TASK_CALLABLE,
PRODUCTION_ETL_SENSOR_TASK_CALLABLE,
)

logger = structlog.get_logger("flowetl")

default_args = {"owner": "flowminder", "start_date": parse("1900-01-01")}

ETL_SENSOR_TASK_CALLABLES = {
"testing": TEST_ETL_SENSOR_TASK_CALLABLE,
"production": PRODUCTION_ETL_SENSOR_TASK_CALLABLE,
@@ -36,7 +31,6 @@
f"Valid config names are: {list(ETL_SENSOR_TASK_CALLABLES.keys())}"
)

logger = structlog.get_logger("flowetl")
logger.info(f"Running in {flowetl_runtime_config} environment")
dag = construct_etl_sensor_dag(
callable=etl_sensor_task_callable, default_args=default_args
)
dag = construct_etl_sensor_dag(callable=etl_sensor_task_callable)
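Both DAG modules select their task callables with the same runtime-environment switch. A condensed sketch of that pattern; the `FLOWETL_RUNTIME_CONFIG` environment variable name is an assumption, since the diff only shows the resulting `flowetl_runtime_config` variable:
```
import os

# Dummy callables stand in for the real test/production task callables.
ETL_SENSOR_TASK_CALLABLES = {
    "testing": lambda **kwargs: print("dummy sense task"),
    "production": lambda **kwargs: print("real sense task"),
}

# Assumed env var name; adjust to match the actual deployment.
flowetl_runtime_config = os.environ.get("FLOWETL_RUNTIME_CONFIG", "production")

try:
    etl_sensor_task_callable = ETL_SENSOR_TASK_CALLABLES[flowetl_runtime_config]
except KeyError:
    raise ValueError(
        f"Invalid config name: '{flowetl_runtime_config}'. "
        f"Valid config names are: {list(ETL_SENSOR_TASK_CALLABLES.keys())}"
    )
```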
70 changes: 68 additions & 2 deletions flowetl/deployment_example/README.md
@@ -95,6 +95,50 @@ ingestion_db=# SELECT * FROM events.cdr LIMIT 3;
(3 rows)
```

### Create a view to determine available dates in IngestionDB

FlowETL needs to be able to determine which dates are available for ingestion.
A convenient way to do this is to create a view in the ingestion database which lists all available dates.
Since our ingestion database is based on [TimescaleDB](https://docs.timescale.com/latest/main) and the table `events.cdr`
is a [hypertable](https://docs.timescale.com/latest/using-timescaledb/hypertables) with chunk size of 1 day, we can
extract this information from the chunk ranges. Further down we will expose this view to FlowDB via a foreign data wrapper, so that FlowETL can determine which dates are ready for ingestion.

Connect to the ingestion database (`make connect-ingestion_db`) and then run the following SQL snippet
to create the `available_dates` view.
```
CREATE VIEW available_dates AS (
WITH chunk_ranges_as_strings AS (
SELECT unnest(ranges) as ranges
FROM chunk_relation_size_pretty('events.cdr')
),
chunk_ranges AS (
SELECT
substring(ranges, 2, 24)::TIMESTAMPTZ as chunk_start,
substring(ranges, 25, 24)::TIMESTAMPTZ as chunk_end
FROM chunk_ranges_as_strings
)
SELECT
chunk_start::DATE as cdr_date
FROM chunk_ranges
ORDER BY cdr_date
);
```

Let's confirm that it works as expected.
```
ingestion_db=# SELECT * FROM available_dates;
cdr_date
------------
2019-09-02
2019-09-03
2019-09-04
2019-09-05
2019-09-06
2019-09-07
2019-09-08
(7 rows)
```
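The same check can also be scripted. Here is a minimal sketch using psycopg2, in which host, port, database name and credentials are assumptions to be adjusted to your ingestion database settings:
```
import psycopg2

# Connection details are assumptions for this deployment example;
# adjust them to match your ingestion_db configuration.
conn = psycopg2.connect(
    host="localhost",
    port=5432,
    dbname="ingestion_db",
    user="ingestion_db",
    password="etletl",
)
with conn, conn.cursor() as cur:
    cur.execute("SELECT cdr_date FROM available_dates ORDER BY cdr_date;")
    for (cdr_date,) in cur.fetchall():
        print(cdr_date.isoformat())
```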

### Set up a docker stack with FlowDB and FlowETL

Set relevant environment variables for FlowDB and FlowETL.
@@ -166,7 +210,7 @@ This is useful if you make changes to the source code, in which case you need to
for these changes to be picked up.


### Create a foreign data wrapper to connect FlowDB to IngestionDB
### Create foreign data wrappers to connect FlowDB to IngestionDB

Run the following from within `flowdb` (you can connect to flowdb by running `make connect-flowdb`).
```
@@ -187,6 +231,12 @@ CREATE USER MAPPING IF NOT EXISTS FOR flowdb
password 'etletl'
);
CREATE FOREIGN TABLE sample_data_available_dates_fdw (
cdr_date DATE
)
SERVER ingestion_db_server
OPTIONS (table_name 'available_dates');
CREATE FOREIGN TABLE sample_data_fdw (
event_time TIMESTAMPTZ,
msisdn TEXT,
@@ -196,8 +246,24 @@ CREATE FOREIGN TABLE sample_data_fdw (
OPTIONS (schema_name 'events', table_name 'cdr');
```

Let's verify that the foreign data wrapper was set up correctly, so that `flowdb` can now read data remotely from `ingestion_db`:
This creates two foreign data wrappers within `flowdb`. The first one (`sample_data_available_dates_fdw`) allows FlowETL
to determine the dates for which CDR data is available, so that it can schedule ingestion for any unprocessed dates.
The second one (`sample_data_fdw`) wraps the actual data itself and acts as the "source" in the ETL pipeline.

Let's verify that these were set up correctly, so that `flowdb` can now read data remotely from `ingestion_db`:
```
flowdb=# SELECT * FROM sample_data_available_dates_fdw;
cdr_date
------------
2019-09-02
2019-09-03
2019-09-04
2019-09-05
2019-09-06
2019-09-07
2019-09-08
(7 rows)
flowdb=# SELECT * FROM sample_data_fdw LIMIT 3;
event_time | msisdn | cell_id
-------------------------------+------------------------------------------------------------------+---------
```
3 changes: 0 additions & 3 deletions flowetl/deployment_example/mounts/config/config.yml
@@ -1,6 +1,3 @@
default_args:
owner: flowminder
start_date: '1900-01-01'
etl:
calls:
concurrency: 4
19 changes: 11 additions & 8 deletions flowetl/etl/etl/config_parser.py
@@ -10,6 +10,7 @@

from copy import deepcopy
from pathlib import Path
from typing import Union

from etl.etl_utils import CDRType

@@ -32,11 +33,6 @@ def validate_config(global_config_dict: dict) -> None:
if "etl" not in keys:
exceptions.append(ValueError("etl must be a toplevel key in the config file"))

if "default_args" not in keys:
exceptions.append(
ValueError("default_args must be a toplevel key in the config file")
)

etl_keys = global_config_dict.get("etl", {}).keys()
if not set(etl_keys).issubset(CDRType):
unexpected_keys = list(set(etl_keys).difference(CDRType))
@@ -112,19 +108,26 @@ def fill_config_default_values(global_config_dict: dict) -> dict:
return global_config_dict


def get_config_from_file(*, config_filepath: Path) -> dict:
def get_config_from_file(config_filepath: Union[Path, str]) -> dict:
"""
Function used to load configuration from YAML file.
This also validates the structure of the config and
fills any optional settings with default values.
Parameters
----------
config_filepath : Path
config_filepath : Path or str
Location of the file config.yml
Returns
-------
dict
Yaml config loaded into a python dict
"""
# Ensure config_filepath is actually a Path object (e.g. in case a string is passed)
config_filepath = Path(config_filepath)

content = config_filepath.open("r").read()
return yaml.load(content, Loader=yaml.SafeLoader)
config_dict = yaml.load(content, Loader=yaml.SafeLoader)
validate_config(config_dict)
return fill_config_default_values(config_dict)
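With validation and default-filling folded into `get_config_from_file`, callers now get a normalised config dict in a single step. A sketch of the intended usage; the minimal config below is illustrative and assumed to satisfy `validate_config`, since the full set of required per-CDR-type keys is not visible in this diff:
```
from pathlib import Path
from tempfile import TemporaryDirectory

from etl.config_parser import get_config_from_file

with TemporaryDirectory() as tmp:
    config_file = Path(tmp) / "config.yml"
    # Illustrative minimal config; assumed to pass validation.
    config_file.write_text("etl:\n  calls:\n    concurrency: 4\n")
    config = get_config_from_file(config_file)       # a Path works ...
    config = get_config_from_file(str(config_file))  # ... and so does a str
    print(config["etl"]["calls"]["concurrency"])
```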
28 changes: 18 additions & 10 deletions flowetl/etl/etl/etl_utils.py
@@ -9,7 +9,6 @@
import os
import pendulum
import re
import sqlalchemy

from typing import List, Callable
from enum import Enum
@@ -23,7 +22,7 @@
from airflow.operators.python_operator import PythonOperator


def construct_etl_sensor_dag(*, callable: Callable, default_args: dict) -> DAG:
def construct_etl_sensor_dag(*, callable: Callable) -> DAG:
"""
This function constructs the sensor single task DAG that triggers ETL
DAGS with correct config based on filename.
@@ -33,16 +32,21 @@ def construct_etl_sensor_dag(*, callable: Callable, default_args: dict) -> DAG:
callable : Callable
The sense callable that deals with finding files and triggering
ETL DAGs
default_args : dict
Default arguments for DAG
Returns
-------
DAG
Airflow DAG
"""
default_args = {"owner": "flowminder"}

# Note: the `start_date` parameter needs to be set to a date in the past,
# otherwise Airflow won't run the DAG when it is triggered.
with DAG(
dag_id=f"etl_sensor", schedule_interval=None, default_args=default_args
dag_id=f"etl_sensor",
start_date=pendulum.parse("1900-01-01"),
schedule_interval=None,
default_args=default_args,
) as dag:
sense = PythonOperator(
task_id="sense", python_callable=callable, provide_context=True
Expand All @@ -63,8 +67,8 @@ def construct_etl_dag(
quarantine: Callable,
clean: Callable,
fail: Callable,
default_args: dict,
cdr_type: str,
max_active_runs_per_dag: int,
config_path: str = "/mounts/config",
) -> DAG:
"""
@@ -94,12 +98,10 @@
The clean task callable.
fail : Callable
The fail task callable.
default_args : dict
A set of default args to pass to all callables.
Must contain at least "owner" key and "start" key (which must be a
pendulum date object)
cdr_type : str
The type of CDR that this ETL DAG will process.
max_active_runs_per_dag : int
The maximum number of active DAG runs per DAG.
config_path : str
The config path used to look for the sql templates.
Expand All @@ -109,9 +111,15 @@ def construct_etl_dag(
Specification of Airflow DAG for ETL
"""

default_args = {"owner": "flowminder"}

# Note: the `start_date` parameter needs to be set to a date in the past,
# otherwise Airflow won't run the DAG when it is triggered.
with DAG(
dag_id=f"etl_{cdr_type}",
start_date=pendulum.parse("1900-01-01"),
schedule_interval=None,
max_active_runs=max_active_runs_per_dag,
default_args=default_args,
template_searchpath=config_path, # template paths will be relative to this
user_defined_macros={
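The DAG-level settings introduced here are easiest to see in isolation. A minimal sketch, assuming Airflow 1.10-style imports, of a DAG that (like the ETL DAGs above) is only ever triggered externally and caps concurrent runs via `max_active_runs`:
```
import pendulum
from airflow import DAG
from airflow.operators.dummy_operator import DummyOperator

with DAG(
    dag_id="etl_example",
    # start_date must lie in the past, or Airflow will not run the DAG
    # when it is triggered.
    start_date=pendulum.parse("1900-01-01"),
    schedule_interval=None,  # runs are triggered by the sensor DAG
    max_active_runs=2,       # from the per-CDR-type concurrency setting
    default_args={"owner": "flowminder"},
) as dag:
    noop = DummyOperator(task_id="noop")
```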
