Updated integration tests for the file moving logic (#946)
danwilliams committed Jun 21, 2019
1 parent 3894733 commit 8083494
Showing 2 changed files with 22 additions and 51 deletions.
46 changes: 13 additions & 33 deletions flowetl/tests/integration/conftest.py
@@ -12,7 +12,6 @@
 import structlog
 import pytest
 
-from itertools import chain
 from pathlib import Path
 from time import sleep
 from subprocess import DEVNULL, Popen
@@ -157,27 +156,14 @@ def mounts(postgres_data_dir_for_tests, flowetl_mounts_dir):
     Various mount objects needed by containers
     """
     config_mount = Mount("/mounts/config", f"{flowetl_mounts_dir}/config", type="bind")
-    archive_mount = Mount(
-        "/mounts/archive", f"{flowetl_mounts_dir}/archive", type="bind"
-    )
-    dump_mount = Mount("/mounts/dump", f"{flowetl_mounts_dir}/dump", type="bind")
-    ingest_mount = Mount("/mounts/ingest", f"{flowetl_mounts_dir}/ingest", type="bind")
-    quarantine_mount = Mount(
-        "/mounts/quarantine", f"{flowetl_mounts_dir}/quarantine", type="bind"
-    )
-    flowetl_mounts = [
-        config_mount,
-        archive_mount,
-        dump_mount,
-        ingest_mount,
-        quarantine_mount,
-    ]
+    files_mount = Mount("/mounts/files", f"{flowetl_mounts_dir}/files", type="bind")
+    flowetl_mounts = [config_mount, files_mount]
 
     data_mount = Mount(
         "/var/lib/postgresql/data", postgres_data_dir_for_tests, type="bind"
     )
-    ingest_mount = Mount("/ingest", f"{flowetl_mounts_dir}/ingest", type="bind")
-    flowdb_mounts = [data_mount, ingest_mount]
+    files_mount = Mount("/files", f"{flowetl_mounts_dir}/files", type="bind")
+    flowdb_mounts = [data_mount, files_mount]
 
     return {"flowetl": flowetl_mounts, "flowdb": flowdb_mounts}
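For context, the mounts built by this fixture are docker Python SDK Mount objects, which the container fixtures pass to docker_client.containers.run. A minimal sketch of how the consolidated files bind mount could be wired up with docker-py; the host path and image tag below are illustrative, not values taken from this repo:

import docker
from docker.types import Mount

client = docker.from_env()

# Illustrative host directory standing in for the flowetl_mounts_dir fixture value.
flowetl_mounts_dir = "/tmp/flowetl_mounts"
files_mount = Mount("/mounts/files", f"{flowetl_mounts_dir}/files", type="bind")

# containers.run accepts a list of Mount objects via the mounts keyword.
container = client.containers.run(
    "flowminder/flowetl:latest",  # illustrative tag; the tests pin their own
    mounts=[files_mount],
    detach=True,
)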

@@ -268,7 +254,7 @@ def flowetl_container(
     """
     user = f"{os.getuid()}:{os.getgid()}"
     container = docker_client.containers.run(
-        f"flowminder/flowetl:{container_tag}",
+        f"flowminder/flowetl:testing12344321",
         environment=container_env["flowetl"],
         name="flowetl",
         network="testing",
@@ -304,27 +290,21 @@ def trigger_dags_function(*, flowetl_container):
 
 
 @pytest.fixture(scope="function")
-def write_files_to_dump(flowetl_mounts_dir):
+def write_files_to_files(flowetl_mounts_dir):
     """
     Returns a function that allows for writing a list
-    of empty files to the dump location. Also cleans
-    up dump, archive and quarantine.
+    of empty files to the files location. Also cleans
+    up the files location.
     """
-    dump_dir = f"{flowetl_mounts_dir}/dump"
-    archive_dir = f"{flowetl_mounts_dir}/archive"
-    quarantine_dir = f"{flowetl_mounts_dir}/quarantine"
+    files_dir = f"{flowetl_mounts_dir}/files"
 
-    def write_files_to_dump_function(*, file_names):
+    def write_files_to_files_function(*, file_names):
         for file_name in file_names:
-            Path(f"{dump_dir}/{file_name}").touch()
+            Path(f"{files_dir}/{file_name}").touch()
 
-    yield write_files_to_dump_function
+    yield write_files_to_files_function
 
-    files_to_remove = chain(
-        Path(dump_dir).glob("*"),
-        Path(archive_dir).glob("*"),
-        Path(quarantine_dir).glob("*"),
-    )
+    files_to_remove = Path(files_dir).glob("*")
     files_to_remove = filter(lambda file: file.name != "README.md", files_to_remove)
 
     [file.unlink() for file in files_to_remove]
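This is a yield fixture: the test receives the inner writer function, and the cleanup after the yield runs once the test has finished, deleting everything in the files location except README.md. A hedged sketch of how a test might consume it (the test name and file name here are made up for illustration):

def test_files_are_picked_up(write_files_to_files):
    # Touch an empty file in the files location; the fixture's post-yield
    # cleanup removes it after the test, leaving README.md untouched.
    write_files_to_files(file_names=["CALLS_20160103.csv.gz"])
    # ... trigger the DAGs and assert on the outcome here ...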
27 changes: 9 additions & 18 deletions flowetl/tests/integration/test_full_pipeline.py
@@ -8,7 +8,7 @@
 
 def test_single_file_previously_quarantined(
     flowetl_container,
-    write_files_to_dump,
+    write_files_to_files,
     trigger_dags,
     wait_for_completion,
     flowetl_db_session,
@@ -18,7 +18,7 @@ def test_single_file_previously_quarantined(
     """
     Test for full pipeline. We want to test the following things;
-    1. Do files in the dump location get picked up?
+    1. Do files in the files location get picked up?
     2. Do files that do not match a configuration pattern get ignored?
     3. Do files (cdr_type, cdr_date pairs) that have a state of archive
        in etl.etl_records get ignored?
@@ -29,7 +29,7 @@ def test_single_file_previously_quarantined(
     6. Do child tables get created under the associated parent table in
        the events schema?
     """
-    write_files_to_dump(
+    write_files_to_files(
         file_names=[
             "CALLS_20160101.csv.gz",
             "CALLS_20160102.csv.gz",
@@ -76,27 +76,18 @@ def test_single_file_previously_quarantined(
 
     # make sure files are where they should be
 
-    dump_files = ["CALLS_20160101.csv.gz", "bad_file.bad"]  # should have been ignored
-    archive_files = [
+    all_files = [
         "CALLS_20160101.csv.gz",
+        "bad_file.bad",
         "CALLS_20160102.csv.gz",
         "SMS_20160101.csv.gz",
         "MDS_20160101.csv.gz",
         "TOPUPS_20160101.csv.gz",
-    ]  # ingested so now in archive
+    ]  # all files
 
-    dump = [file.name for file in Path(f"{os.getcwd()}/mounts/dump").glob("*")]
-    archive = [file.name for file in Path(f"{os.getcwd()}/mounts/archive").glob("*")]
-    quarantine = [
-        file.name for file in Path(f"{os.getcwd()}/mounts/quarantine").glob("*")
-    ]
-    ingest = [file.name for file in Path(f"{os.getcwd()}/mounts/ingest").glob("*")]
+    files = [file.name for file in Path(f"{os.getcwd()}/mounts/files").glob("*")]
 
-    assert set(dump_files) == (set(dump) - set(["README.md"]))
-    assert set(archive_files) == (set(archive) - set(["README.md"]))
-
-    # quarantine and ingest should be empty
-    assert set() == (set(quarantine) - set(["README.md"]))
-    assert set() == (set(ingest) - set(["README.md"]))
+    assert set(all_files) == (set(files) - set(["README.md"]))
 
     # make sure tables expected exist in flowdb
     connection, _ = flowdb_connection
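The truncated tail of the test goes on to check flowdb for the expected tables. One way such a check could look, assuming a psycopg2-style connection and illustrative child-table names under the events schema (neither is taken from the diff):

# Hedged sketch: verify that ingested child tables exist under the events schema.
expected_tables = {"calls_20160101", "calls_20160102", "sms_20160101"}

with connection.cursor() as cur:
    cur.execute(
        """
        SELECT table_name
        FROM information_schema.tables
        WHERE table_schema = 'events'
        """
    )
    existing = {row[0] for row in cur.fetchall()}

assert expected_tables <= existing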
