From 808349441bfeb71ccb66bc4be9baa19ca6db027e Mon Sep 17 00:00:00 2001 From: Dan Williams Date: Fri, 21 Jun 2019 10:37:31 +0100 Subject: [PATCH] Updated integration tests for the file moving logic (#946) --- flowetl/tests/integration/conftest.py | 46 ++++++------------- .../tests/integration/test_full_pipeline.py | 27 ++++------- 2 files changed, 22 insertions(+), 51 deletions(-) diff --git a/flowetl/tests/integration/conftest.py b/flowetl/tests/integration/conftest.py index 5597d5b6b8..693e5420db 100644 --- a/flowetl/tests/integration/conftest.py +++ b/flowetl/tests/integration/conftest.py @@ -12,7 +12,6 @@ import structlog import pytest -from itertools import chain from pathlib import Path from time import sleep from subprocess import DEVNULL, Popen @@ -157,27 +156,14 @@ def mounts(postgres_data_dir_for_tests, flowetl_mounts_dir): Various mount objects needed by containers """ config_mount = Mount("/mounts/config", f"{flowetl_mounts_dir}/config", type="bind") - archive_mount = Mount( - "/mounts/archive", f"{flowetl_mounts_dir}/archive", type="bind" - ) - dump_mount = Mount("/mounts/dump", f"{flowetl_mounts_dir}/dump", type="bind") - ingest_mount = Mount("/mounts/ingest", f"{flowetl_mounts_dir}/ingest", type="bind") - quarantine_mount = Mount( - "/mounts/quarantine", f"{flowetl_mounts_dir}/quarantine", type="bind" - ) - flowetl_mounts = [ - config_mount, - archive_mount, - dump_mount, - ingest_mount, - quarantine_mount, - ] + files_mount = Mount("/mounts/files", f"{flowetl_mounts_dir}/files", type="bind") + flowetl_mounts = [config_mount, files_mount] data_mount = Mount( "/var/lib/postgresql/data", postgres_data_dir_for_tests, type="bind" ) - ingest_mount = Mount("/ingest", f"{flowetl_mounts_dir}/ingest", type="bind") - flowdb_mounts = [data_mount, ingest_mount] + files_mount = Mount("/files", f"{flowetl_mounts_dir}/files", type="bind") + flowdb_mounts = [data_mount, files_mount] return {"flowetl": flowetl_mounts, "flowdb": flowdb_mounts} @@ -268,7 +254,7 @@ def flowetl_container( """ user = f"{os.getuid()}:{os.getgid()}" container = docker_client.containers.run( - f"flowminder/flowetl:{container_tag}", + f"flowminder/flowetl:testing12344321", environment=container_env["flowetl"], name="flowetl", network="testing", @@ -304,27 +290,21 @@ def trigger_dags_function(*, flowetl_container): @pytest.fixture(scope="function") -def write_files_to_dump(flowetl_mounts_dir): +def write_files_to_files(flowetl_mounts_dir): """ Returns a function that allows for writing a list - of empty files to the dump location. Also cleans - up dump, archive and quarantine. + of empty files to the files location. Also cleans + up the files location. """ - dump_dir = f"{flowetl_mounts_dir}/dump" - archive_dir = f"{flowetl_mounts_dir}/archive" - quarantine_dir = f"{flowetl_mounts_dir}/quarantine" + files_dir = f"{flowetl_mounts_dir}/files" - def write_files_to_dump_function(*, file_names): + def write_files_to_files_function(*, file_names): for file_name in file_names: - Path(f"{dump_dir}/{file_name}").touch() + Path(f"{files_dir}/{file_name}").touch() - yield write_files_to_dump_function + yield write_files_to_files_function - files_to_remove = chain( - Path(dump_dir).glob("*"), - Path(archive_dir).glob("*"), - Path(quarantine_dir).glob("*"), - ) + files_to_remove = Path(files_dir).glob("*") files_to_remove = filter(lambda file: file.name != "README.md", files_to_remove) [file.unlink() for file in files_to_remove] diff --git a/flowetl/tests/integration/test_full_pipeline.py b/flowetl/tests/integration/test_full_pipeline.py index a276c576f7..08474a9cd5 100644 --- a/flowetl/tests/integration/test_full_pipeline.py +++ b/flowetl/tests/integration/test_full_pipeline.py @@ -8,7 +8,7 @@ def test_single_file_previously_quarantined( flowetl_container, - write_files_to_dump, + write_files_to_files, trigger_dags, wait_for_completion, flowetl_db_session, @@ -18,7 +18,7 @@ def test_single_file_previously_quarantined( """ Test for full pipeline. We want to test the following things; - 1. Do files in the dump location get picked up? + 1. Do files in the files location get picked up? 2. Do files that do not match a configuration pattern get ignored? 3. Do files (cdr_type, cdr_date pairs) that have a state of archive in etl.etl_records get ignored? @@ -29,7 +29,7 @@ def test_single_file_previously_quarantined( 6. Do child tables get created under the associated parent table in the events schema? """ - write_files_to_dump( + write_files_to_files( file_names=[ "CALLS_20160101.csv.gz", "CALLS_20160102.csv.gz", @@ -76,27 +76,18 @@ def test_single_file_previously_quarantined( # make sure files are where they should be - dump_files = ["CALLS_20160101.csv.gz", "bad_file.bad"] # should have been ignored - archive_files = [ + all_files = [ + "CALLS_20160101.csv.gz", + "bad_file.bad", "CALLS_20160102.csv.gz", "SMS_20160101.csv.gz", "MDS_20160101.csv.gz", "TOPUPS_20160101.csv.gz", - ] # ingested so now in archive + ] # all files - dump = [file.name for file in Path(f"{os.getcwd()}/mounts/dump").glob("*")] - archive = [file.name for file in Path(f"{os.getcwd()}/mounts/archive").glob("*")] - quarantine = [ - file.name for file in Path(f"{os.getcwd()}/mounts/quarantine").glob("*") - ] - ingest = [file.name for file in Path(f"{os.getcwd()}/mounts/ingest").glob("*")] + files = [file.name for file in Path(f"{os.getcwd()}/mounts/files").glob("*")] - assert set(dump_files) == (set(dump) - set(["README.md"])) - assert set(archive_files) == (set(archive) - set(["README.md"])) - - # quarantine and ingest should be empty - assert set() == (set(quarantine) - set(["README.md"])) - assert set() == (set(ingest) - set(["README.md"])) + assert set(all_files) == (set(files) - set(["README.md"])) # make sure tables expected exist in flowdb connection, _ = flowdb_connection