Skip to content

Commit

Permalink
Support a flat ndjson dir layout as well as a hierarchical one
Browse files Browse the repository at this point in the history
Previously, --load-ndjson-dir would only look for an ETL output-style
format like this:

dir/condition/*.ndjson
dir/patient/*.ndjson

But now it will also look for flat files (i.e. ETL input-style
format) like this:

dir/1.Patient.ndjson
dir/Patient.october.ndjson
dir/Patient.ndjson

This will make it nicer to use the --load-ndjson-dir flow when you are
working on ndjson files directly, without going through Cumulus ETL
first.
  • Loading branch information
mikix committed May 29, 2024
1 parent 031bbd8 commit 08cc963
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 13 deletions.
42 changes: 29 additions & 13 deletions cumulus_library/databases.py
Original file line number Diff line number Diff line change
Expand Up @@ -515,21 +515,37 @@ def parse_found_schema(self, schema: dict[str, str]) -> dict:
return parsed


def _read_rows_from_table_dir(path: str) -> list[dict]:
# Grab filenames to load (ignoring .meta files and handling missing folders)
folder = Path(path)
filenames = []
if folder.exists():
filenames = sorted(
str(x) for x in folder.iterdir() if x.name.endswith(".ndjson")
)

# Read all ndjson directly into memory
def _read_rows_from_files(filenames: list[str]) -> list[dict]:
"""Reads all provided ndjson files directly into memory"""
rows = []
for filename in filenames:
for filename in sorted(filenames):
with open(filename, encoding="utf8") as f:
for line in f:
rows.append(json.loads(line))
return rows


def _read_rows_from_table_dir(path: Path) -> list[dict]:
    """Grab ndjson files in the Cumulus ETL output format: path/tablename/*.ndjson"""
    if path.exists():
        # Only .ndjson entries count; anything else (e.g. .meta files) is ignored.
        found = [str(entry) for entry in path.iterdir() if entry.name.endswith(".ndjson")]
        return _read_rows_from_files(found)
    return []


def _read_rows_for_resource(path: Path, resource: str) -> list[dict]:
    """Read all ndjson rows for one resource, supporting both dir layouts.

    Handles the flat Cumulus ETL input format (path/*.Resource.*.ndjson)
    as well as the hierarchical ETL output format (path/resource/*.ndjson).

    :param path: the directory being loaded
    :param resource: the resource name (e.g. "Patient"), case-sensitive for
        the flat layout, lowercased for the hierarchical layout
    :returns: all rows found, as a list of dicts
    """
    rows = []

    # Grab any ndjson files in Cumulus ETL input format: path/*.Resource.*.ndjson
    if path.exists():
        # This pattern is copied from the ETL, allowing a suffix or a numbered prefix.
        # Use fullmatch, not match, so that trailing garbage is rejected
        # (match() would also accept names like "Patient.ndjson.gz").
        pattern = re.compile(rf"([0-9]+\.)?{resource}(\.[^/]+)?\.ndjson")
        filenames = [str(x) for x in path.iterdir() if pattern.fullmatch(x.name)]
        rows += _read_rows_from_files(filenames)

    # Also grab any ndjson files in Cumulus ETL output format: path/resource/*.ndjson
    rows += _read_rows_from_table_dir(path / resource.lower())

    return rows

Expand Down Expand Up @@ -562,7 +578,7 @@ def read_ndjson_dir(path: str) -> dict[str, pyarrow.Table]:
]
for resource in resources:
table_name = resource.lower()
rows = _read_rows_from_table_dir(f"{path}/{table_name}")
rows = _read_rows_for_resource(Path(path), resource)

# Make a pyarrow table with full schema from the data
schema = cumulus_fhir_support.pyarrow_schema_from_rows(resource, rows)
Expand All @@ -574,7 +590,7 @@ def read_ndjson_dir(path: str) -> dict[str, pyarrow.Table]:
"etl__completion_encounters",
]
for metadata_table in metadata_tables:
rows = _read_rows_from_table_dir(f"{path}/{metadata_table}")
rows = _read_rows_from_table_dir(Path(f"{path}/{metadata_table}"))
if rows:
# Auto-detecting the schema works for these simple tables
all_tables[metadata_table] = pyarrow.Table.from_pylist(rows)
Expand Down
35 changes: 35 additions & 0 deletions tests/test_duckdb.py
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,41 @@ def test_duckdb_from_iso8601_timestamp(timestamp, expected):
assert parsed == expected


def test_duckdb_load_ndjson_dir(tmp_path):
    """Verify that both flat and hierarchical ndjson layouts get loaded."""
    # Maps filename -> whether it should be loaded into the patient table
    filenames = {
        "A.Patient.ndjson": False,
        "1.Patient.ndjson": True,
        "Patient.ndjson": True,
        "patient.ndjson": False,
        "Patient.hello.bye.ndjson": True,
        "Patient.nope": False,
        "patient/blarg.ndjson": True,
        "patient/blarg.meta": False,
    }
    os.mkdir(f"{tmp_path}/patient")
    for index, (filename, valid) in enumerate(filenames.items()):
        # Original had a garbled path here; each entry must get its own file,
        # or every iteration would just overwrite the same one.
        with open(f"{tmp_path}/{filename}", "w", encoding="utf8") as f:
            row_id = f"Good{index}" if valid else f"Bad{index}"
            f.write(f'{{"id":"{row_id}"}}')

    db = databases.create_db_backend(
        {
            "db_type": "duckdb",
            "schema_name": ":memory:",
            "load_ndjson_dir": tmp_path,
        }
    )

    expected_good_count = sum(1 for valid in filenames.values() if valid)
    found_ids = {
        # No f-string needed for a constant query
        row[0] for row in db.cursor().execute("select id from patient").fetchall()
    }
    found_good = {row_id for row_id in found_ids if row_id.startswith("Good")}
    found_bad = found_ids - found_good
    assert len(found_good) == expected_good_count
    assert not found_bad


def test_duckdb_table_schema():
"""Verify we can detect schemas correctly, even for nested camel case fields"""
db = databases.DuckDatabaseBackend(":memory:")
Expand Down

0 comments on commit 08cc963

Please sign in to comment.