From 6e8233c346be217984c2747234f31b50d5c5d5fb Mon Sep 17 00:00:00 2001 From: Sujay Patil Date: Fri, 16 Aug 2024 12:25:19 -0700 Subject: [PATCH 1/4] add Dagster job to create rollup JSON - partially complete --- nmdc_runtime/site/graphs.py | 6 ++ nmdc_runtime/site/ops.py | 100 ++++++++++++++++++++++++++++++-- nmdc_runtime/site/repository.py | 2 + nmdc_runtime/site/util.py | 45 ++++++++++++++ 4 files changed, 147 insertions(+), 6 deletions(-) diff --git a/nmdc_runtime/site/graphs.py b/nmdc_runtime/site/graphs.py index 8d0cb9bb..3e2a63be 100644 --- a/nmdc_runtime/site/graphs.py +++ b/nmdc_runtime/site/graphs.py @@ -56,6 +56,7 @@ get_ncbi_export_pipeline_inputs, ncbi_submission_xml_from_nmdc_study, ncbi_submission_xml_asset, + materialize_rollup_collection, ) from nmdc_runtime.site.export.study_metadata import get_biosamples_by_study_id @@ -105,6 +106,11 @@ def ensure_alldocs(): materialize_alldocs() +@graph +def ensure_rollup(): + materialize_rollup_collection() + + @graph def ensure_jobs(): jobs = construct_jobs() diff --git a/nmdc_runtime/site/ops.py b/nmdc_runtime/site/ops.py index ea8b1ac2..63f209f4 100644 --- a/nmdc_runtime/site/ops.py +++ b/nmdc_runtime/site/ops.py @@ -7,7 +7,6 @@ from collections import defaultdict from datetime import datetime, timezone from io import BytesIO, StringIO -from typing import Tuple from zipfile import ZipFile import pandas as pd @@ -19,9 +18,7 @@ Any, AssetKey, AssetMaterialization, - Dict, Failure, - List, MetadataValue, OpExecutionContext, Out, @@ -33,12 +30,15 @@ Optional, Field, Permissive, + Dict, + List, + Tuple, Bool, ) from gridfs import GridFS from linkml_runtime.dumpers import json_dumper from linkml_runtime.utils.yamlutils import YAMLRoot -from nmdc_runtime.api.db.mongo import get_mongo_db +from nmdc_runtime.api.db.mongo import get_collection_names_from_schema, get_mongo_db from nmdc_runtime.api.core.idgen import generate_one_id from nmdc_runtime.api.core.metadata import ( _validate_changesheet, @@ -46,7 +46,13 @@ get_collection_for_id, map_id_to_collection, ) -from nmdc_runtime.api.core.util import dotted_path_for, hash_from_str, json_clean, now +from nmdc_runtime.api.core.util import ( + dotted_path_for, + hash_from_str, + json_clean, + now, + raise404_if_none, +) from nmdc_runtime.api.endpoints.util import persist_content_and_get_drs_object from nmdc_runtime.api.endpoints.find import find_study_by_id from nmdc_runtime.api.models.job import Job, JobOperationMetadata @@ -62,6 +68,7 @@ _add_run_complete_event, ) from nmdc_runtime.api.models.util import ResultT +from nmdc_runtime.minter.config import typecodes from nmdc_runtime.site.export.ncbi_xml import NCBISubmissionXML from nmdc_runtime.site.export.ncbi_xml_utils import ( fetch_data_objects_from_biosamples, @@ -88,7 +95,11 @@ from nmdc_runtime.site.translation.submission_portal_translator import ( SubmissionPortalTranslator, ) -from nmdc_runtime.site.util import run_and_log, schema_collection_has_index_on_id +from nmdc_runtime.site.util import ( + get_collection_from_typecode, + run_and_log, + schema_collection_has_index_on_id, +) from nmdc_runtime.util import ( drs_object_in_for, pluralize, @@ -1051,6 +1062,83 @@ def materialize_alldocs(context) -> int: return mdb.alldocs.estimated_document_count() +@op(config_schema={"nmdc_study_id": str}, required_resource_keys={"mongo"}) +def materialize_rollup_collection(context: OpExecutionContext) -> List: + mdb = context.resources.mongo.db + study_id = context.op_config["nmdc_study_id"] + + biosample_associated_objects = [] + study = raise404_if_none( + mdb.study_set.find_one({"id": study_id}, projection={"id": 1}), + detail="Study not found", + ) + if not study: + return [] + + biosamples = mdb.biosample_set.find({"part_of": study["id"]}, projection={"id": 1}) + biosample_ids = [biosample["id"] for biosample in biosamples] + if not biosample_ids: + return [] + + for biosample_id in biosample_ids: + current_ids = [biosample_id] + collected_ids_by_collection = {} + + while current_ids: + documents = list( + mdb.alldocs.find( + {"has_input": {"$in": current_ids}}, + projection={"id": 1, "has_input": 1, "has_output": 1}, + ) + ) + if not documents: + break + + new_current_ids = [] + for document in documents: + document_id = document["id"] + collection_name = get_collection_from_typecode(document_id) + if collection_name: + collected_ids_by_collection.setdefault(collection_name, []).append( + document_id + ) + + if "has_input" in document: + for inp in document["has_input"]: + inp_collection_name = get_collection_from_typecode(inp) + if inp_collection_name: + collected_ids_by_collection.setdefault( + inp_collection_name, [] + ).append(inp) + + if "has_output" in document: + for out in document["has_output"]: + out_collection_name = get_collection_from_typecode(out) + if out_collection_name: + collected_ids_by_collection.setdefault( + out_collection_name, [] + ).append(out) + new_current_ids.append(out) + + if mdb.alldocs.find_one( + {"was_generated_by": out}, projection={"id": 1} + ): + collected_ids_by_collection.setdefault( + out_collection_name, [] + ).append(out) + + current_ids = new_current_ids + + if collected_ids_by_collection: + formatted_document = { + "biosample_id": biosample_id, + "associated_ids_by_collection": collected_ids_by_collection, + } + biosample_associated_objects.append(formatted_document) + + return biosample_associated_objects + + @op(config_schema={"nmdc_study_id": str}, required_resource_keys={"mongo"}) def get_ncbi_export_pipeline_study(context: OpExecutionContext) -> Any: nmdc_study = find_study_by_id( diff --git a/nmdc_runtime/site/repository.py b/nmdc_runtime/site/repository.py index fbe5105b..9c725d5c 100644 --- a/nmdc_runtime/site/repository.py +++ b/nmdc_runtime/site/repository.py @@ -43,6 +43,7 @@ ingest_neon_benthic_metadata, ingest_neon_surface_water_metadata, ensure_alldocs, + ensure_rollup, nmdc_study_to_ncbi_submission_export, ) from nmdc_runtime.site.resources import ( @@ -452,6 +453,7 @@ def repo(): apply_metadata_in.to_job(**preset_normal), export_study_biosamples_metadata.to_job(**preset_normal), ensure_alldocs.to_job(**preset_normal), + ensure_rollup.to_job(**preset_normal), ] schedules = [housekeeping_weekly] sensors = [ diff --git a/nmdc_runtime/site/util.py b/nmdc_runtime/site/util.py index 4280fe65..5a805ae8 100644 --- a/nmdc_runtime/site/util.py +++ b/nmdc_runtime/site/util.py @@ -4,7 +4,10 @@ from pymongo.database import Database as MongoDatabase +from linkml_runtime.utils.schemaview import SchemaView +from nmdc_schema.get_nmdc_view import ViewGetter from nmdc_runtime.api.db.mongo import get_collection_names_from_schema +from nmdc_runtime.minter.config import typecodes from nmdc_runtime.site.resources import mongo_resource mode_test = { @@ -24,6 +27,8 @@ }, } +DATABASE_CLASS_NAME = "Database" + def run_and_log(shell_cmd, context): process = Popen(shell_cmd, shell=True, stdout=PIPE, stderr=STDOUT) @@ -47,3 +52,43 @@ def schema_collection_has_index_on_id(mdb: MongoDatabase) -> dict: def get_basename(filename: str) -> str: return os.path.basename(filename) + + +def get_name_of_class_objects_in_collection( + schema_view: SchemaView, collection_name: str +) -> str: + slot_definition = schema_view.induced_slot(collection_name, DATABASE_CLASS_NAME) + return slot_definition.range + + +@lru_cache +def get_class_names_to_collection_names_map(): + vg = ViewGetter() + schema_view = vg.get_view() + + collection_names = get_collection_names_from_schema() + + class_names_to_collection_names = {} + for collection_name in collection_names: + class_name = get_name_of_class_objects_in_collection( + schema_view, collection_name + ) + class_names_to_collection_names[class_name] = collection_name + + return class_names_to_collection_names + + +def get_collection_from_typecode(doc_id: str): + typecode = doc_id.split(":")[1].split("-")[0] + class_map_data = typecodes() + + class_map = { + entry["name"]: entry["schema_class"].split(":")[1] for entry in class_map_data + } + class_name = class_map.get(typecode) + if class_name: + collection_dict = get_class_names_to_collection_names_map() + collection_name = collection_dict.get(class_name) + return collection_name + + return None From 5dab30820214f236e3c77761b73f70740ae5444a Mon Sep 17 00:00:00 2001 From: Sujay Patil Date: Wed, 30 Oct 2024 09:33:13 -0700 Subject: [PATCH 2/4] Dagster job that creates biosample rollup JSON --- nmdc_runtime/site/graphs.py | 13 ++- nmdc_runtime/site/ops.py | 150 ++++++++++++++++++++------------ nmdc_runtime/site/repository.py | 29 +++++- nmdc_runtime/site/util.py | 46 ---------- 4 files changed, 129 insertions(+), 109 deletions(-) diff --git a/nmdc_runtime/site/graphs.py b/nmdc_runtime/site/graphs.py index ccf4d74c..3f06f6f4 100644 --- a/nmdc_runtime/site/graphs.py +++ b/nmdc_runtime/site/graphs.py @@ -56,7 +56,9 @@ get_ncbi_export_pipeline_inputs, ncbi_submission_xml_from_nmdc_study, ncbi_submission_xml_asset, - materialize_rollup_collection, + get_biosample_rollup_pipeline_input, + materialize_biosample_rollup, + biosample_rollup_filename, ) from nmdc_runtime.site.export.study_metadata import get_biosamples_by_study_id @@ -107,8 +109,13 @@ def ensure_alldocs(): @graph -def ensure_rollup(): - materialize_rollup_collection() +def biosample_rollup_export(): + nmdc_study_id = get_biosample_rollup_pipeline_input() + biosample_rollup = materialize_biosample_rollup(nmdc_study_id) + + filename = biosample_rollup_filename() + outputs = export_json_to_drs(biosample_rollup, filename) + add_output_run_event(outputs) @graph diff --git a/nmdc_runtime/site/ops.py b/nmdc_runtime/site/ops.py index c7e8180b..b4baf39d 100644 --- a/nmdc_runtime/site/ops.py +++ b/nmdc_runtime/site/ops.py @@ -9,6 +9,7 @@ from io import BytesIO, StringIO from zipfile import ZipFile from itertools import chain +from typing import Dict, List, Tuple import pandas as pd import requests @@ -31,10 +32,6 @@ Optional, Field, Permissive, - Dict, - List, - Tuple, - Bool, ) from gridfs import GridFS from linkml_runtime.dumpers import json_dumper @@ -55,7 +52,10 @@ raise404_if_none, ) from nmdc_runtime.api.endpoints.util import persist_content_and_get_drs_object -from nmdc_runtime.api.endpoints.find import find_study_by_id +from nmdc_runtime.api.endpoints.find import ( + find_study_by_id, + get_classname_from_typecode, +) from nmdc_runtime.api.models.job import Job, JobOperationMetadata from nmdc_runtime.api.models.metadata import ChangesheetIn from nmdc_runtime.api.models.operation import ( @@ -113,6 +113,7 @@ ) from nmdc_schema import nmdc from nmdc_schema.nmdc import Database as NMDCDatabase +from nmdc_schema.get_nmdc_view import ViewGetter from pydantic import BaseModel from pymongo.database import Database as MongoDatabase from starlette import status @@ -1152,81 +1153,114 @@ def materialize_alldocs(context) -> int: return mdb.alldocs.estimated_document_count() -@op(config_schema={"nmdc_study_id": str}, required_resource_keys={"mongo"}) -def materialize_rollup_collection(context: OpExecutionContext) -> List: +@op +def biosample_rollup_filename() -> str: + return "biosample_rollup.json" + + +@op( + config_schema={"nmdc_study_id": str}, + out={"nmdc_study_id": Out(str)}, +) +def get_biosample_rollup_pipeline_input(context: OpExecutionContext) -> str: + return context.op_config["nmdc_study_id"] + + +@op(required_resource_keys={"mongo"}) +def materialize_biosample_rollup( + context: OpExecutionContext, nmdc_study_id: str +) -> Dict: mdb = context.resources.mongo.db - study_id = context.op_config["nmdc_study_id"] - biosample_associated_objects = [] study = raise404_if_none( - mdb.study_set.find_one({"id": study_id}, projection={"id": 1}), + mdb.study_set.find_one({"id": nmdc_study_id}, projection={"id": 1}), detail="Study not found", ) if not study: return [] - biosamples = mdb.biosample_set.find({"part_of": study["id"]}, projection={"id": 1}) + # Note: With nmdc-schema v10 (legacy schema), we used the field named `part_of` here. + # With nmdc-schema v11 (Berkeley schema), we use the field named `associated_studies` here. + biosamples = mdb.biosample_set.find({"associated_studies": study["id"]}, ["id"]) biosample_ids = [biosample["id"] for biosample in biosamples] if not biosample_ids: return [] + biosample_associated_ids = [] + + # SchemaView interface to NMDC Schema + nmdc_view = ViewGetter() + nmdc_sv = nmdc_view.get_view() + dg_descendants = nmdc_sv.class_descendants("DataGeneration") + for biosample_id in biosample_ids: current_ids = [biosample_id] - collected_ids_by_collection = {} - while current_ids: - documents = list( - mdb.alldocs.find( - {"has_input": {"$in": current_ids}}, - projection={"id": 1, "has_input": 1, "has_output": 1}, - ) - ) - if not documents: - break + # List to capture all document IDs that are related to a Biosample ID + all_collected_ids = [] + while current_ids: new_current_ids = [] - for document in documents: - document_id = document["id"] - collection_name = get_collection_from_typecode(document_id) - if collection_name: - collected_ids_by_collection.setdefault(collection_name, []).append( - document_id + for current_id in current_ids: + query = {"has_input": current_id} + documents = mdb.alldocs.find(query) + + if not documents: + continue + + for document in documents: + doc_id = document.get("id") + all_collected_ids.append(doc_id) + has_output = document.get("has_output", []) + + if not has_output and any( + doc_type in dg_descendants + for doc_type in document.get("type", []) + ): + was_informed_by_query = {"was_informed_by": doc_id} + informed_by_docs = mdb.alldocs.find(was_informed_by_query) + + for informed_by_doc in informed_by_docs: + all_collected_ids.append(informed_by_doc.get("id")) + all_collected_ids.extend( + informed_by_doc.get("has_input", []) + ) + all_collected_ids.extend( + informed_by_doc.get("has_output", []) + ) + continue + + new_current_ids.extend( + op + for op in has_output + if get_classname_from_typecode(op) != "DataObject" ) - - if "has_input" in document: - for inp in document["has_input"]: - inp_collection_name = get_collection_from_typecode(inp) - if inp_collection_name: - collected_ids_by_collection.setdefault( - inp_collection_name, [] - ).append(inp) - - if "has_output" in document: - for out in document["has_output"]: - out_collection_name = get_collection_from_typecode(out) - if out_collection_name: - collected_ids_by_collection.setdefault( - out_collection_name, [] - ).append(out) - new_current_ids.append(out) - - if mdb.alldocs.find_one( - {"was_generated_by": out}, projection={"id": 1} - ): - collected_ids_by_collection.setdefault( - out_collection_name, [] - ).append(out) + all_collected_ids.extend(has_output) + + if any( + doc_type in dg_descendants + for doc_type in document.get("type", []) + ): + was_informed_by_query = {"was_informed_by": doc_id} + informed_by_docs = mdb.alldocs.find(was_informed_by_query) + for informed_by_doc in informed_by_docs: + all_collected_ids.append(informed_by_doc.get("id")) + all_collected_ids.extend( + informed_by_doc.get("has_input", []) + ) + all_collected_ids.extend( + informed_by_doc.get("has_output", []) + ) current_ids = new_current_ids - if collected_ids_by_collection: - formatted_document = { - "biosample_id": biosample_id, - "associated_ids_by_collection": collected_ids_by_collection, - } - biosample_associated_objects.append(formatted_document) + result = { + "biosample_id": biosample_id, + "associated_ids": all_collected_ids, + } + biosample_associated_ids.append(result) - return biosample_associated_objects + return {"biosample_rollup": biosample_associated_ids} @op(config_schema={"nmdc_study_id": str}, required_resource_keys={"mongo"}) diff --git a/nmdc_runtime/site/repository.py b/nmdc_runtime/site/repository.py index c7da716b..7c75f796 100644 --- a/nmdc_runtime/site/repository.py +++ b/nmdc_runtime/site/repository.py @@ -43,7 +43,7 @@ ingest_neon_benthic_metadata, ingest_neon_surface_water_metadata, ensure_alldocs, - ensure_rollup, + biosample_rollup_export, nmdc_study_to_ncbi_submission_export, ) from nmdc_runtime.site.resources import ( @@ -453,7 +453,6 @@ def repo(): apply_metadata_in.to_job(**preset_normal), export_study_biosamples_metadata.to_job(**preset_normal), ensure_alldocs.to_job(**preset_normal), - ensure_rollup.to_job(**preset_normal), ] schedules = [housekeeping_weekly] sensors = [ @@ -913,6 +912,32 @@ def biosample_export(): }, }, ), + biosample_rollup_export.to_job( + resource_defs=resource_defs, + config={ + "resources": merge( + unfreeze(normal_resources), + { + "mongo": { + "config": { + "host": {"env": "MONGO_HOST"}, + "username": {"env": "MONGO_USERNAME"}, + "password": {"env": "MONGO_PASSWORD"}, + "dbname": {"env": "MONGO_DBNAME"}, + }, + }, + }, + ), + "ops": { + "get_biosample_rollup_pipeline_input": { + "config": { + "nmdc_study_id": "", + } + }, + "export_json_to_drs": {"config": {"username": ""}}, + }, + }, + ), ] diff --git a/nmdc_runtime/site/util.py b/nmdc_runtime/site/util.py index 5a805ae8..aa00f358 100644 --- a/nmdc_runtime/site/util.py +++ b/nmdc_runtime/site/util.py @@ -4,10 +4,7 @@ from pymongo.database import Database as MongoDatabase -from linkml_runtime.utils.schemaview import SchemaView -from nmdc_schema.get_nmdc_view import ViewGetter from nmdc_runtime.api.db.mongo import get_collection_names_from_schema -from nmdc_runtime.minter.config import typecodes from nmdc_runtime.site.resources import mongo_resource mode_test = { @@ -27,9 +24,6 @@ }, } -DATABASE_CLASS_NAME = "Database" - - def run_and_log(shell_cmd, context): process = Popen(shell_cmd, shell=True, stdout=PIPE, stderr=STDOUT) for line in process.stdout: @@ -52,43 +46,3 @@ def schema_collection_has_index_on_id(mdb: MongoDatabase) -> dict: def get_basename(filename: str) -> str: return os.path.basename(filename) - - -def get_name_of_class_objects_in_collection( - schema_view: SchemaView, collection_name: str -) -> str: - slot_definition = schema_view.induced_slot(collection_name, DATABASE_CLASS_NAME) - return slot_definition.range - - -@lru_cache -def get_class_names_to_collection_names_map(): - vg = ViewGetter() - schema_view = vg.get_view() - - collection_names = get_collection_names_from_schema() - - class_names_to_collection_names = {} - for collection_name in collection_names: - class_name = get_name_of_class_objects_in_collection( - schema_view, collection_name - ) - class_names_to_collection_names[class_name] = collection_name - - return class_names_to_collection_names - - -def get_collection_from_typecode(doc_id: str): - typecode = doc_id.split(":")[1].split("-")[0] - class_map_data = typecodes() - - class_map = { - entry["name"]: entry["schema_class"].split(":")[1] for entry in class_map_data - } - class_name = class_map.get(typecode) - if class_name: - collection_dict = get_class_names_to_collection_names_map() - collection_name = collection_dict.get(class_name) - return collection_name - - return None From a06a2ddd5bb831fb802cceb334eb2fcf8c5b0447 Mon Sep 17 00:00:00 2001 From: github-actions Date: Wed, 30 Oct 2024 16:33:50 +0000 Subject: [PATCH 3/4] style: reformat --- nmdc_runtime/site/util.py | 1 + 1 file changed, 1 insertion(+) diff --git a/nmdc_runtime/site/util.py b/nmdc_runtime/site/util.py index aa00f358..4280fe65 100644 --- a/nmdc_runtime/site/util.py +++ b/nmdc_runtime/site/util.py @@ -24,6 +24,7 @@ }, } + def run_and_log(shell_cmd, context): process = Popen(shell_cmd, shell=True, stdout=PIPE, stderr=STDOUT) for line in process.stdout: From 15d445836b68d748db255b2ea7137b41e9e38f7a Mon Sep 17 00:00:00 2001 From: Sujay Patil Date: Thu, 31 Oct 2024 08:06:29 -0700 Subject: [PATCH 4/4] add back necessary rollup helper to util.py --- nmdc_runtime/site/util.py | 45 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 45 insertions(+) diff --git a/nmdc_runtime/site/util.py b/nmdc_runtime/site/util.py index 4280fe65..5a805ae8 100644 --- a/nmdc_runtime/site/util.py +++ b/nmdc_runtime/site/util.py @@ -4,7 +4,10 @@ from pymongo.database import Database as MongoDatabase +from linkml_runtime.utils.schemaview import SchemaView +from nmdc_schema.get_nmdc_view import ViewGetter from nmdc_runtime.api.db.mongo import get_collection_names_from_schema +from nmdc_runtime.minter.config import typecodes from nmdc_runtime.site.resources import mongo_resource mode_test = { @@ -24,6 +27,8 @@ }, } +DATABASE_CLASS_NAME = "Database" + def run_and_log(shell_cmd, context): process = Popen(shell_cmd, shell=True, stdout=PIPE, stderr=STDOUT) @@ -47,3 +52,43 @@ def schema_collection_has_index_on_id(mdb: MongoDatabase) -> dict: def get_basename(filename: str) -> str: return os.path.basename(filename) + + +def get_name_of_class_objects_in_collection( + schema_view: SchemaView, collection_name: str +) -> str: + slot_definition = schema_view.induced_slot(collection_name, DATABASE_CLASS_NAME) + return slot_definition.range + + +@lru_cache +def get_class_names_to_collection_names_map(): + vg = ViewGetter() + schema_view = vg.get_view() + + collection_names = get_collection_names_from_schema() + + class_names_to_collection_names = {} + for collection_name in collection_names: + class_name = get_name_of_class_objects_in_collection( + schema_view, collection_name + ) + class_names_to_collection_names[class_name] = collection_name + + return class_names_to_collection_names + + +def get_collection_from_typecode(doc_id: str): + typecode = doc_id.split(":")[1].split("-")[0] + class_map_data = typecodes() + + class_map = { + entry["name"]: entry["schema_class"].split(":")[1] for entry in class_map_data + } + class_name = class_map.get(typecode) + if class_name: + collection_dict = get_class_names_to_collection_names_map() + collection_name = collection_dict.get(class_name) + return collection_name + + return None