Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

not all schema collections suffix with _sets #620

Merged
merged 4 commits into from
Aug 8, 2024
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions nmdc_runtime/api/db/mongo.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ def nmdc_schema_collection_names(mdb: MongoDatabase) -> Set[str]:
return {name for name in names if mdb[name].estimated_document_count() > 0}


@lru_cache
def get_collection_names_from_schema() -> list[str]:
"""
Returns the names of the slots of the `Database` class that describe database collections.
Expand Down
41 changes: 28 additions & 13 deletions nmdc_runtime/site/ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
RuntimeApiSiteClient,
RuntimeApiUserClient,
NeonApiClient,
MongoDB as MongoDBResource,
)
from nmdc_runtime.site.translation.gold_translator import GoldStudyTranslator
from nmdc_runtime.site.translation.neon_soil_translator import NeonSoilDataTranslator
Expand All @@ -87,7 +88,7 @@
from nmdc_runtime.site.translation.submission_portal_translator import (
SubmissionPortalTranslator,
)
from nmdc_runtime.site.util import collection_indexed_on_id, run_and_log
from nmdc_runtime.site.util import run_and_log, schema_collection_has_index_on_id
from nmdc_runtime.util import (
drs_object_in_for,
pluralize,
Expand Down Expand Up @@ -527,29 +528,43 @@ def perform_mongo_updates(context, json_in):
if rv["result"] == "errors":
raise Failure(str(rv["detail"]))

coll_has_id_index = collection_indexed_on_id(mongo.db)
if all(coll_has_id_index[coll] for coll in docs.keys()):
add_docs_result = _add_schema_docs_with_or_without_replacement()
op_patch = UpdateOperationRequest(
done=True,
result=add_docs_result,
metadata={"done_at": datetime.now(timezone.utc).isoformat(timespec="seconds")},
)
op_doc = client.update_operation(op_id, op_patch).json()
return ["/operations/" + op_doc["id"]]


def _add_schema_docs_with_or_without_replacement(
mongo: MongoDBResource, docs: Dict[str, list]
):
coll_index_on_id_map = schema_collection_has_index_on_id(mongo.db)
if all(coll_index_on_id_map[coll] for coll in docs.keys()):
replace = True
elif all(not coll_has_id_index[coll] for coll in docs.keys()):
elif all(not coll_index_on_id_map[coll] for coll in docs.keys()):
# XXX: This is a hack because e.g. <https://w3id.org/nmdc/FunctionalAnnotationAggMember>
# documents should be unique with compound key (metagenome_annotation_id, gene_function_id)
# and yet this is not explicit in the schema. One potential solution is to auto-generate an `id`
# as a deterministic hash of the compound key.
#
# For now, decision is to potentially re-insert "duplicate" documents, i.e. to interpret
# lack of `id` as lack of unique document identity for de-duplication.
replace = False # wasting time trying to upsert by `id`.
else:
colls_not_id_indexed = [
coll for coll in docs.keys() if not coll_has_id_index[coll]
coll for coll in docs.keys() if not coll_index_on_id_map[coll]
]
colls_id_indexed = [coll for coll in docs.keys() if coll_has_id_index[coll]]
colls_id_indexed = [coll for coll in docs.keys() if coll_index_on_id_map[coll]]
raise Failure(
"Simultaneous addition of non-`id`ed collections and `id`-ed collections"
" is not supported at this time."
f"{colls_not_id_indexed=} ; {colls_id_indexed=}"
)
op_result = mongo.add_docs(docs, validate=False, replace=replace)
op_patch = UpdateOperationRequest(
done=True,
result=mongo_add_docs_result_as_dict(op_result),
metadata={"done_at": datetime.now(timezone.utc).isoformat(timespec="seconds")},
)
op_doc = client.update_operation(op_id, op_patch).json()
return ["/operations/" + op_doc["id"]]
return mongo_add_docs_result_as_dict(op_result)


@op(required_resource_keys={"mongo"})
Expand Down
12 changes: 7 additions & 5 deletions nmdc_runtime/site/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from pymongo.database import Database as MongoDatabase

from nmdc_runtime.api.db.mongo import get_collection_names_from_schema
from nmdc_runtime.site.resources import mongo_resource

mode_test = {
Expand Down Expand Up @@ -34,12 +35,13 @@ def run_and_log(shell_cmd, context):


@lru_cache
def collection_indexed_on_id(mdb: MongoDatabase) -> dict:
set_collection_names = [
name for name in mdb.list_collection_names() if name.endswith("_set")
]
def schema_collection_has_index_on_id(mdb: MongoDatabase) -> dict:
present_collection_names = set(mdb.list_collection_names())
return {
name: ("id_1" in mdb[name].index_information()) for name in set_collection_names
name: (
name in present_collection_names and "id_1" in mdb[name].index_information()
)
for name in get_collection_names_from_schema()
}


Expand Down
8 changes: 0 additions & 8 deletions tests/test_ops/test_hello.py

This file was deleted.

79 changes: 79 additions & 0 deletions tests/test_ops/test_ops.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
import json
import os

import pytest
from dagster import build_op_context

from nmdc_runtime.api.endpoints.util import persist_content_and_get_drs_object
from nmdc_runtime.site.resources import (
mongo_resource,
runtime_api_site_client_resource,
RuntimeApiSiteClient,
runtime_api_user_client_resource,
RuntimeApiUserClient,
)

from nmdc_runtime.site.ops import (
perform_mongo_updates,
_add_schema_docs_with_or_without_replacement,
)


@pytest.fixture
def op_context():
return build_op_context(
resources={
"mongo": mongo_resource.configured(
{
"dbname": os.getenv("MONGO_DBNAME"),
"host": os.getenv("MONGO_HOST"),
"password": os.getenv("MONGO_PASSWORD"),
"username": os.getenv("MONGO_USERNAME"),
}
),
"runtime_api_user_client": runtime_api_user_client_resource.configured(
{
"base_url": os.getenv("API_HOST"),
"username": os.getenv("API_ADMIN_USER"),
"password": os.getenv("API_ADMIN_PASS"),
},
),
"runtime_api_site_client": runtime_api_site_client_resource.configured(
{
"base_url": os.getenv("API_HOST"),
"site_id": os.getenv("API_SITE_ID"),
"client_id": os.getenv("API_SITE_CLIENT_ID"),
"client_secret": os.getenv("API_SITE_CLIENT_SECRET"),
}
),
}
)


def test_perform_mongo_updates_functional_annotation_agg(op_context):
mongo = op_context.resources.mongo
docs = {
"functional_annotation_agg": [
{
"metagenome_annotation_id": "nmdc:wfmtan-13-hemh0a82.1",
"gene_function_id": "KEGG.ORTHOLOGY:K00005",
"count": 10,
},
{
"metagenome_annotation_id": "nmdc:wfmtan-13-hemh0a82.1",
"gene_function_id": "KEGG.ORTHOLOGY:K01426",
"count": 5,
},
]
}
# Ensure the docs are not already in the test database.
for doc_spec in docs["functional_annotation_agg"]:
mongo.db.functional_annotation_agg.delete_many(doc_spec)

_add_schema_docs_with_or_without_replacement(mongo, docs)
assert (
mongo.db.functional_annotation_agg.count_documents(
{"$or": docs["functional_annotation_agg"]}
)
== 2
)
Loading