Skip to content

Commit

Permalink
Not all schema collection names end with the `_set` suffix (#620)
Browse files Browse the repository at this point in the history
* fix: allow "update" of non-`id`-having document collections

Interpret such an "update" as a simple insertion. Leave a note in the code about the decision to insist on a schema-supplied uniqueness signal.

fix #611

* refactor to add test

* fix: rm abandoned candidate test

* Update nmdc_runtime/site/ops.py

Co-authored-by: eecavanna <[email protected]>

---------

Co-authored-by: eecavanna <[email protected]>
  • Loading branch information
dwinston and eecavanna committed Aug 8, 2024
1 parent 41de33f commit 80daca1
Show file tree
Hide file tree
Showing 5 changed files with 115 additions and 26 deletions.
1 change: 1 addition & 0 deletions nmdc_runtime/api/db/mongo.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ def nmdc_schema_collection_names(mdb: MongoDatabase) -> Set[str]:
return {name for name in names if mdb[name].estimated_document_count() > 0}


@lru_cache
def get_collection_names_from_schema() -> list[str]:
"""
Returns the names of the slots of the `Database` class that describe database collections.
Expand Down
41 changes: 28 additions & 13 deletions nmdc_runtime/site/ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@
RuntimeApiSiteClient,
RuntimeApiUserClient,
NeonApiClient,
MongoDB as MongoDBResource,
)
from nmdc_runtime.site.translation.gold_translator import GoldStudyTranslator
from nmdc_runtime.site.translation.neon_soil_translator import NeonSoilDataTranslator
Expand All @@ -87,7 +88,7 @@
from nmdc_runtime.site.translation.submission_portal_translator import (
SubmissionPortalTranslator,
)
from nmdc_runtime.site.util import collection_indexed_on_id, run_and_log
from nmdc_runtime.site.util import run_and_log, schema_collection_has_index_on_id
from nmdc_runtime.util import (
drs_object_in_for,
pluralize,
Expand Down Expand Up @@ -527,29 +528,43 @@ def perform_mongo_updates(context, json_in):
if rv["result"] == "errors":
raise Failure(str(rv["detail"]))

coll_has_id_index = collection_indexed_on_id(mongo.db)
if all(coll_has_id_index[coll] for coll in docs.keys()):
add_docs_result = _add_schema_docs_with_or_without_replacement()
op_patch = UpdateOperationRequest(
done=True,
result=add_docs_result,
metadata={"done_at": datetime.now(timezone.utc).isoformat(timespec="seconds")},
)
op_doc = client.update_operation(op_id, op_patch).json()
return ["/operations/" + op_doc["id"]]


def _add_schema_docs_with_or_without_replacement(
    mongo: MongoDBResource, docs: Dict[str, list]
):
    """Add `docs` to the database, replacing by `id` only when that is possible.

    The target collections are partitioned by whether they carry an index on
    `id` (as reported by `schema_collection_has_index_on_id`):

    - every target collection is `id`-indexed -> `mongo.add_docs(..., replace=True)`
    - no target collection is `id`-indexed    -> `mongo.add_docs(..., replace=False)`
    - a mixture of both                        -> `Failure` is raised

    An empty `docs` mapping takes the `replace=True` path.

    :param mongo: resource exposing `.db` and `.add_docs(docs, validate=..., replace=...)`.
    :param docs: mapping of collection name to the list of documents to add.
    :return: the result of `mongo.add_docs`, converted by `mongo_add_docs_result_as_dict`.
    :raises Failure: if `docs` mixes `id`-indexed and non-`id`-indexed collections.
    :raises KeyError: if `docs` names a collection that is absent from the index map.
    """
    coll_index_on_id_map = schema_collection_has_index_on_id(mongo.db)
    colls_id_indexed = [coll for coll in docs if coll_index_on_id_map[coll]]
    colls_not_id_indexed = [coll for coll in docs if not coll_index_on_id_map[coll]]

    if colls_id_indexed and colls_not_id_indexed:
        raise Failure(
            "Simultaneous addition of non-`id`ed collections and `id`-ed collections"
            " is not supported at this time. "
            f"{colls_not_id_indexed=} ; {colls_id_indexed=}"
        )

    # FIXME: XXX: Not replacing is a hack because e.g. <https://w3id.org/nmdc/FunctionalAnnotationAggMember>
    #   documents should be unique with compound key (metagenome_annotation_id, gene_function_id)
    #   and yet this is not explicit in the schema. One potential solution is to auto-generate an `id`
    #   as a deterministic hash of the compound key.
    #
    #   For now, decision is to potentially re-insert "duplicate" documents, i.e. to interpret
    #   lack of `id` as lack of unique document identity for de-duplication.
    #
    # With no `id` index there is nothing to upsert by, so attempting replacement
    # would only waste time; replace only when no target collection lacks the index.
    replace = not colls_not_id_indexed

    op_result = mongo.add_docs(docs, validate=False, replace=replace)
    return mongo_add_docs_result_as_dict(op_result)


@op(required_resource_keys={"mongo"})
Expand Down
12 changes: 7 additions & 5 deletions nmdc_runtime/site/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

from pymongo.database import Database as MongoDatabase

from nmdc_runtime.api.db.mongo import get_collection_names_from_schema
from nmdc_runtime.site.resources import mongo_resource

mode_test = {
Expand Down Expand Up @@ -34,12 +35,13 @@ def run_and_log(shell_cmd, context):


@lru_cache
def schema_collection_has_index_on_id(mdb: MongoDatabase) -> dict:
    """Report, for each schema-described collection, whether it is indexed on `id`.

    The keys are the names returned by `get_collection_names_from_schema()`.
    A value is `True` only when the collection currently exists in `mdb` and
    its index information contains an index named `id_1`; collections that are
    described by the schema but absent from the database map to `False`.

    NOTE: because of `lru_cache`, the mapping is computed once per distinct
    `mdb` argument and then reused, so collections or indexes created later in
    the same process are not reflected in subsequent calls.

    :param mdb: the database to inspect.
    :return: mapping of collection name to a boolean "has `id_1` index" flag.
    """
    # Fetch the existing collection names once, so membership checks are cheap
    # and index information is requested only for collections that exist.
    present_collection_names = set(mdb.list_collection_names())
    return {
        name: (
            name in present_collection_names
            and "id_1" in mdb[name].index_information()
        )
        for name in get_collection_names_from_schema()
    }


Expand Down
8 changes: 0 additions & 8 deletions tests/test_ops/test_hello.py

This file was deleted.

79 changes: 79 additions & 0 deletions tests/test_ops/test_ops.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
import json
import os

import pytest
from dagster import build_op_context

from nmdc_runtime.api.endpoints.util import persist_content_and_get_drs_object
from nmdc_runtime.site.resources import (
mongo_resource,
runtime_api_site_client_resource,
RuntimeApiSiteClient,
runtime_api_user_client_resource,
RuntimeApiUserClient,
)

from nmdc_runtime.site.ops import (
perform_mongo_updates,
_add_schema_docs_with_or_without_replacement,
)


@pytest.fixture
def op_context():
    """Provide a Dagster op context with resources configured from the environment.

    Three resources are supplied: `mongo`, `runtime_api_user_client`, and
    `runtime_api_site_client`. Every configuration value is read with
    `os.getenv`, so an unset variable yields `None` for that entry.
    """
    mongo_config = {
        "dbname": os.getenv("MONGO_DBNAME"),
        "host": os.getenv("MONGO_HOST"),
        "password": os.getenv("MONGO_PASSWORD"),
        "username": os.getenv("MONGO_USERNAME"),
    }
    user_client_config = {
        "base_url": os.getenv("API_HOST"),
        "username": os.getenv("API_ADMIN_USER"),
        "password": os.getenv("API_ADMIN_PASS"),
    }
    site_client_config = {
        "base_url": os.getenv("API_HOST"),
        "site_id": os.getenv("API_SITE_ID"),
        "client_id": os.getenv("API_SITE_CLIENT_ID"),
        "client_secret": os.getenv("API_SITE_CLIENT_SECRET"),
    }
    resources = {
        "mongo": mongo_resource.configured(mongo_config),
        "runtime_api_user_client": runtime_api_user_client_resource.configured(
            user_client_config
        ),
        "runtime_api_site_client": runtime_api_site_client_resource.configured(
            site_client_config
        ),
    }
    return build_op_context(resources=resources)


def test_perform_mongo_updates_functional_annotation_agg(op_context):
    """Documents without an `id` can be added to `functional_annotation_agg`.

    Two documents are removed from the collection if already present, added
    via `_add_schema_docs_with_or_without_replacement`, and then counted back.
    """
    mongo = op_context.resources.mongo
    agg_docs = [
        {
            "metagenome_annotation_id": "nmdc:wfmtan-13-hemh0a82.1",
            "gene_function_id": "KEGG.ORTHOLOGY:K00005",
            "count": 10,
        },
        {
            "metagenome_annotation_id": "nmdc:wfmtan-13-hemh0a82.1",
            "gene_function_id": "KEGG.ORTHOLOGY:K01426",
            "count": 5,
        },
    ]
    docs = {"functional_annotation_agg": agg_docs}
    collection = mongo.db.functional_annotation_agg

    # Start from a clean slate: drop any copies left in the test database.
    for doc_spec in agg_docs:
        collection.delete_many(doc_spec)

    _add_schema_docs_with_or_without_replacement(mongo, docs)

    n_matching = collection.count_documents({"$or": agg_docs})
    assert n_matching == 2

0 comments on commit 80daca1

Please sign in to comment.