Skip to content

Commit

Permalink
feat: test dependent on alldocs materialization
Browse files Browse the repository at this point in the history
towards #608
  • Loading branch information
dwinston committed Aug 8, 2024
1 parent 80daca1 commit 14f217f
Show file tree
Hide file tree
Showing 3 changed files with 126 additions and 3 deletions.
28 changes: 28 additions & 0 deletions nmdc_runtime/api/db/mongo.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
import gzip
import json
import os
from contextlib import AbstractContextManager
from functools import lru_cache
from typing import Set, Dict, Any, Iterable
from uuid import uuid4

import bson
from linkml_runtime import SchemaView
from nmdc_schema.get_nmdc_view import ViewGetter
from nmdc_schema.nmdc_data import get_nmdc_schema_definition
Expand Down Expand Up @@ -107,3 +110,28 @@ def mongodump_excluded_collections():
)
)
return excluded_collections


def mongorestore_collection(mdb, collection_name, bson_file_path):
    """Restore one collection into *mdb* from a gzipped BSON dump file.

    Any existing collection of the same name is dropped first, so the restored
    collection exactly mirrors the dump. An empty dump is a no-op: nothing is
    dropped or inserted.

    :param mdb: pymongo database handle to restore into.
    :param collection_name: name of the target collection.
    :param bson_file_path: path to a ``.bson.gz`` mongodump file.
    """
    with gzip.open(bson_file_path, "rb") as dump_file:
        documents = bson.decode_all(dump_file.read())
    if not documents:
        return
    mdb.drop_collection(collection_name)
    mdb[collection_name].insert_many(documents)
    print(
        f"mongorestore_collection: {len(documents)} documents into {collection_name} after drop"
    )


def mongorestore_from_dir(mdb, dump_directory, skip_collections=None):
    """Restore all gzipped BSON dumps found under *dump_directory* into *mdb*.

    Walks the directory tree; each ``<name>.bson.gz`` file is restored as
    collection ``<name>``. Collections listed in *skip_collections* are left
    untouched.

    :param mdb: pymongo database handle to restore into.
    :param dump_directory: root directory of an extracted mongodump archive.
    :param skip_collections: optional iterable of collection names to skip.
    """
    skip = set(skip_collections or [])
    suffix = ".bson.gz"
    for root, _dirs, files in os.walk(dump_directory):
        for filename in files:
            if not filename.endswith(suffix):
                continue
            # Strip only the trailing suffix. The previous
            # `filename.replace(suffix, "")` would also remove the pattern
            # from the *middle* of a filename, mangling the collection name.
            collection_name = filename[: -len(suffix)]
            if collection_name in skip:
                continue
            mongorestore_collection(
                mdb, collection_name, os.path.join(root, filename)
            )

    print("mongorestore_from_dir completed successfully.")
74 changes: 71 additions & 3 deletions tests/test_api/test_endpoints.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,83 @@
import json
import os
import re
import subprocess
import sys

import bson
import pytest
import requests
from dagster import build_op_context
from starlette import status
from tenacity import wait_random_exponential, retry
from toolz import get_in

from nmdc_runtime.api.core.auth import get_password_hash
from nmdc_runtime.api.core.metadata import df_from_sheet_in, _validate_changesheet
from nmdc_runtime.api.core.util import generate_secret, dotted_path_for
from nmdc_runtime.api.db.mongo import get_mongo_db
from nmdc_runtime.api.db.mongo import get_mongo_db, mongorestore_from_dir
from nmdc_runtime.api.endpoints.util import persist_content_and_get_drs_object
from nmdc_runtime.api.models.job import Job, JobOperationMetadata
from nmdc_runtime.api.models.metadata import ChangesheetIn
from nmdc_runtime.api.models.site import SiteInDB, SiteClientInDB
from nmdc_runtime.api.models.user import UserInDB, UserIn, User
from nmdc_runtime.site.ops import materialize_alldocs
from nmdc_runtime.site.repository import run_config_frozen__normal_env
from nmdc_runtime.site.resources import get_mongo, RuntimeApiSiteClient
from nmdc_runtime.util import REPO_ROOT_DIR
from nmdc_runtime.site.resources import get_mongo, RuntimeApiSiteClient, mongo_resource
from nmdc_runtime.util import REPO_ROOT_DIR, ensure_unique_id_indexes
from tests.test_util import download_and_extract_tar
from tests.test_ops.test_ops import op_context as test_op_context

TEST_MONGODUMPS_DIR = REPO_ROOT_DIR.joinpath("tests", "nmdcdb")
SCHEMA_COLLECTIONS_MONGODUMP_ARCHIVE_BASENAME = (
"nmdc-prod-schema-collections__2024-07-29_20-12-07"
)
SCHEMA_COLLECTIONS_MONGODUMP_ARCHIVE_URL = (
"https://portal.nersc.gov/cfs/m3408/meta/mongodumps/"
f"{SCHEMA_COLLECTIONS_MONGODUMP_ARCHIVE_BASENAME}.tar"
) # 84MB. Should be < 100MB.


def ensure_local_mongodump():
    """Ensure the schema-collections mongodump archive is extracted locally.

    Downloads and extracts the archive into ``TEST_MONGODUMPS_DIR`` unless the
    extracted dump directory is already present.

    :return: path (``Path``) of the extracted dump directory.
    """
    dump_dir = TEST_MONGODUMPS_DIR.joinpath(
        SCHEMA_COLLECTIONS_MONGODUMP_ARCHIVE_BASENAME
    )
    # Use pathlib directly, and report the directory actually checked
    # (previously the message printed the parent TEST_MONGODUMPS_DIR, which
    # could mislead when the parent exists but the dump itself does not).
    if dump_dir.exists():
        print(f"local mongodump already exists at {dump_dir}")
    else:
        download_and_extract_tar(
            url=SCHEMA_COLLECTIONS_MONGODUMP_ARCHIVE_URL, extract_to=TEST_MONGODUMPS_DIR
        )
    return dump_dir


def ensure_schema_collections_and_alldocs():
    """Ensure schema collections are restored and `alldocs` is materialized.

    Idempotent guard for tests: if `alldocs` already has documents, this is a
    no-op. Otherwise it restores the schema-collection mongodump into the test
    database, ensures unique-id indexes, and materializes `alldocs` via the
    dagster op, using mongo connection settings from the environment.
    """
    # Return if `alldocs` collection has already been materialized.
    mdb = get_mongo_db()
    if mdb.alldocs.estimated_document_count() > 0:
        print(
            "ensure_schema_collections_and_alldocs: `alldocs` collection already materialized"
        )
        return

    dump_dir = ensure_local_mongodump()
    # `functional_annotation_agg` is skipped — not needed for these tests and
    # excluded from the (size-limited) dump archive.
    mongorestore_from_dir(mdb, dump_dir, skip_collections=["functional_annotation_agg"])
    ensure_unique_id_indexes(mdb)
    print("materializing alldocs...")
    # Run the `materialize_alldocs` dagster op with a mongo resource built
    # from the same env vars the test environment uses for its database.
    materialize_alldocs(
        build_op_context(
            resources={
                "mongo": mongo_resource.configured(
                    {
                        "dbname": os.getenv("MONGO_DBNAME"),
                        "host": os.getenv("MONGO_HOST"),
                        "password": os.getenv("MONGO_PASSWORD"),
                        "username": os.getenv("MONGO_USERNAME"),
                    }
                )
            }
        )
    )


def ensure_test_resources(mdb):
Expand Down Expand Up @@ -60,6 +118,7 @@ def ensure_test_resources(mdb):
{"id": job_id}, job.model_dump(exclude_unset=True), upsert=True
)
mdb["minter.requesters"].replace_one({"id": site_id}, {"id": site_id}, upsert=True)
ensure_schema_collections_and_alldocs()
return {
"site_client": {
"site_id": site_id,
Expand Down Expand Up @@ -313,3 +372,12 @@ def test_get_class_name_and_collection_names_by_doc_id():
"GET", f"{base_url}/nmdcschema/ids/{id_}/collection-name"
)
assert response.status_code == 404


def test_find_data_objects_for_study(api_site_client):
    """A known production study should have at least 60 associated data objects."""
    ensure_schema_collections_and_alldocs()
    response = api_site_client.request(
        "GET", "/data_objects/study/nmdc:sty-11-hdd4bf83"
    )
    data_objects = response.json()
    assert len(data_objects) >= 60
27 changes: 27 additions & 0 deletions tests/test_util.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import os
import tarfile
from copy import deepcopy

import json
Expand All @@ -13,6 +15,7 @@
from nmdc_runtime.util import get_nmdc_jsonschema_dict
from pymongo.database import Database as MongoDatabase
from pymongo.write_concern import WriteConcern
import requests

from nmdc_runtime.site.repository import run_config_frozen__normal_env
from nmdc_runtime.site.resources import get_mongo
Expand Down Expand Up @@ -133,6 +136,30 @@ def test_multiple_errors():
print(validation_errors)


def download_and_extract_tar(url, extract_to="."):
    """Download a tar archive from *url* and extract it into *extract_to*.

    The archive is streamed to a temporary file inside *extract_to*, extracted
    there, and the temporary file is removed afterwards (even if extraction
    fails).

    :param url: URL of the tar archive.
    :param extract_to: directory to extract into; created if missing.
    :raises requests.HTTPError: if the download fails. (Previously a failed
        download only printed a message, so callers proceeded as if the
        archive had been extracted.)
    """
    response = requests.get(url, stream=True)
    # Fail loudly: callers assume the extracted content exists on return.
    response.raise_for_status()

    os.makedirs(extract_to, exist_ok=True)  # target dir may not exist yet
    tar_path = os.path.join(extract_to, "downloaded_file.tar")
    chunk_size = 8192
    print(f"Downloading tar file using stream {chunk_size=}")
    with open(tar_path, "wb") as file:
        for chunk in response.iter_content(chunk_size=chunk_size):
            file.write(chunk)
    print(f"Downloaded tar file to {tar_path}")

    try:
        # Extract the tar file
        with tarfile.open(tar_path, "r") as tar:
            tar.extractall(path=extract_to)
        print(f"Extracted tar file to {extract_to}")
    finally:
        # Remove the downloaded archive even when extraction raises.
        os.remove(tar_path)
        print(f"Removed tar file {tar_path}")


if __name__ == "__main__":
if len(sys.argv) > 1:
eval(f"{sys.argv[1]}()")
Expand Down

0 comments on commit 14f217f

Please sign in to comment.