Add Dagster job to create rollup JSON/collection #643

Open · wants to merge 5 commits into base: main
13 changes: 13 additions & 0 deletions nmdc_runtime/site/graphs.py
@@ -56,6 +56,9 @@
get_ncbi_export_pipeline_inputs,
ncbi_submission_xml_from_nmdc_study,
ncbi_submission_xml_asset,
get_biosample_rollup_pipeline_input,
materialize_biosample_rollup,
biosample_rollup_filename,
)
from nmdc_runtime.site.export.study_metadata import get_biosamples_by_study_id

@@ -105,6 +108,16 @@ def ensure_alldocs():
materialize_alldocs()


@graph
def biosample_rollup_export():
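# Wire up the ops: read the study ID from run config, materialize the
# rollup document, persist it as a JSON DRS object, and record a run
# event for the output.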
nmdc_study_id = get_biosample_rollup_pipeline_input()
biosample_rollup = materialize_biosample_rollup(nmdc_study_id)

filename = biosample_rollup_filename()
outputs = export_json_to_drs(biosample_rollup, filename)
add_output_run_event(outputs)


@graph
def ensure_jobs():
jobs = construct_jobs()
138 changes: 130 additions & 8 deletions nmdc_runtime/site/ops.py
@@ -7,9 +7,9 @@
from collections import defaultdict
from datetime import datetime, timezone
from io import BytesIO, StringIO
from typing import Tuple
from zipfile import ZipFile
from itertools import chain
from typing import Dict, List, Tuple

import pandas as pd
import requests
@@ -20,9 +20,7 @@
Any,
AssetKey,
AssetMaterialization,
Dict,
Failure,
List,
MetadataValue,
OpExecutionContext,
Out,
@@ -34,22 +32,30 @@
Optional,
Field,
Permissive,
Bool,
)
from gridfs import GridFS
from linkml_runtime.dumpers import json_dumper
from linkml_runtime.utils.yamlutils import YAMLRoot
from nmdc_runtime.api.db.mongo import get_mongo_db
from nmdc_runtime.api.db.mongo import get_collection_names_from_schema, get_mongo_db
from nmdc_runtime.api.core.idgen import generate_one_id
from nmdc_runtime.api.core.metadata import (
_validate_changesheet,
df_from_sheet_in,
get_collection_for_id,
map_id_to_collection,
)
from nmdc_runtime.api.core.util import dotted_path_for, hash_from_str, json_clean, now
from nmdc_runtime.api.core.util import (
dotted_path_for,
hash_from_str,
json_clean,
now,
raise404_if_none,
)
from nmdc_runtime.api.endpoints.util import persist_content_and_get_drs_object
from nmdc_runtime.api.endpoints.find import find_study_by_id
from nmdc_runtime.api.endpoints.find import (
find_study_by_id,
get_classname_from_typecode,
)
from nmdc_runtime.api.models.job import Job, JobOperationMetadata
from nmdc_runtime.api.models.metadata import ChangesheetIn
from nmdc_runtime.api.models.operation import (
@@ -63,6 +69,7 @@
_add_run_complete_event,
)
from nmdc_runtime.api.models.util import ResultT
from nmdc_runtime.minter.config import typecodes
from nmdc_runtime.site.export.ncbi_xml import NCBISubmissionXML
from nmdc_runtime.site.export.ncbi_xml_utils import (
fetch_data_objects_from_biosamples,
@@ -89,7 +96,11 @@
from nmdc_runtime.site.translation.submission_portal_translator import (
SubmissionPortalTranslator,
)
from nmdc_runtime.site.util import run_and_log, schema_collection_has_index_on_id
from nmdc_runtime.site.util import (
get_collection_from_typecode,
run_and_log,
schema_collection_has_index_on_id,
)
from nmdc_runtime.util import (
drs_object_in_for,
pluralize,
@@ -102,6 +113,7 @@
)
from nmdc_schema import nmdc
from nmdc_schema.nmdc import Database as NMDCDatabase
from nmdc_schema.get_nmdc_view import ViewGetter
from pydantic import BaseModel
from pymongo.database import Database as MongoDatabase
from starlette import status
@@ -1141,6 +1153,116 @@ def materialize_alldocs(context) -> int:
return mdb.alldocs.estimated_document_count()


@op
def biosample_rollup_filename() -> str:
return "biosample_rollup.json"


@op(
config_schema={"nmdc_study_id": str},
out={"nmdc_study_id": Out(str)},
)
def get_biosample_rollup_pipeline_input(context: OpExecutionContext) -> str:
return context.op_config["nmdc_study_id"]


@op(required_resource_keys={"mongo"})
def materialize_biosample_rollup(
context: OpExecutionContext, nmdc_study_id: str
) -> Dict:
mdb = context.resources.mongo.db

study = raise404_if_none(
mdb.study_set.find_one({"id": nmdc_study_id}, projection={"id": 1}),
detail="Study not found",
)

# Note: With nmdc-schema v10 (legacy schema), we used the field named `part_of` here.
# With nmdc-schema v11 (Berkeley schema), we use the field named `associated_studies` here.
biosamples = mdb.biosample_set.find({"associated_studies": study["id"]}, ["id"])
biosample_ids = [biosample["id"] for biosample in biosamples]
if not biosample_ids:
return {"biosample_rollup": []}

biosample_associated_ids = []

# SchemaView interface to NMDC Schema
nmdc_view = ViewGetter()
nmdc_sv = nmdc_view.get_view()
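# DataGeneration and all of its schema descendants (e.g. subclasses such
# as NucleotideSequencing), used below to recognize data-generation
# records by their `type` field.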
dg_descendants = nmdc_sv.class_descendants("DataGeneration")

for biosample_id in biosample_ids:
current_ids = [biosample_id]

# List to capture all document IDs that are related to a Biosample ID
all_collected_ids = []

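# Breadth-first walk over `has_input` edges: each pass replaces the
# frontier (`current_ids`) with the outputs of documents that consumed
# the current IDs.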
while current_ids:
new_current_ids = []
for current_id in current_ids:
query = {"has_input": current_id}
# Materialize the cursor: a pymongo cursor is always truthy, so the
# emptiness check below needs a concrete list.
documents = list(mdb.alldocs.find(query))

if not documents:
continue

for document in documents:
doc_id = document.get("id")
all_collected_ids.append(doc_id)
has_output = document.get("has_output", [])

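# Dead end: a data-generation record with no recorded outputs. Fall back
# to workflow executions that were informed by it (`was_informed_by`) and
# collect their IDs, inputs, and outputs instead.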
if not has_output and any(
doc_type in dg_descendants
for doc_type in document.get("type", [])
):
was_informed_by_query = {"was_informed_by": doc_id}
informed_by_docs = mdb.alldocs.find(was_informed_by_query)

for informed_by_doc in informed_by_docs:
all_collected_ids.append(informed_by_doc.get("id"))
all_collected_ids.extend(
informed_by_doc.get("has_input", [])
)
all_collected_ids.extend(
informed_by_doc.get("has_output", [])
)
continue

new_current_ids.extend(
output_id
for output_id in has_output
if get_classname_from_typecode(output_id) != "DataObject"
)
all_collected_ids.extend(has_output)

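# Data-generation records that do have outputs can still have workflow
# executions linked to them only via `was_informed_by`, so sweep those
# in as well.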
if any(
doc_type in dg_descendants
for doc_type in document.get("type", [])
):
was_informed_by_query = {"was_informed_by": doc_id}
informed_by_docs = mdb.alldocs.find(was_informed_by_query)
for informed_by_doc in informed_by_docs:
all_collected_ids.append(informed_by_doc.get("id"))
all_collected_ids.extend(
informed_by_doc.get("has_input", [])
)
all_collected_ids.extend(
informed_by_doc.get("has_output", [])
)

current_ids = new_current_ids

result = {
"biosample_id": biosample_id,
"associated_ids": all_collected_ids,
}
biosample_associated_ids.append(result)

return {"biosample_rollup": biosample_associated_ids}


@op(config_schema={"nmdc_study_id": str}, required_resource_keys={"mongo"})
def get_ncbi_export_pipeline_study(context: OpExecutionContext) -> Any:
nmdc_study = find_study_by_id(
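For reference, a rollup document returned by materialize_biosample_rollup above has the following shape. This is a sketch with made-up IDs; real entries are NMDC identifiers gathered while walking has_input/has_output/was_informed_by edges from each biosample:

    {
        "biosample_rollup": [
            {
                "biosample_id": "nmdc:bsm-11-34xj1150",
                "associated_ids": [
                    "nmdc:omprc-11-9mvz7z22",
                    "nmdc:wfmgan-11-fqq66x60.1",
                    "nmdc:dobj-11-1x2y3z45",
                ],
            },
        ]
    }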
27 changes: 27 additions & 0 deletions nmdc_runtime/site/repository.py
@@ -43,6 +43,7 @@
ingest_neon_benthic_metadata,
ingest_neon_surface_water_metadata,
ensure_alldocs,
biosample_rollup_export,
nmdc_study_to_ncbi_submission_export,
)
from nmdc_runtime.site.resources import (
@@ -911,6 +912,32 @@ def biosample_export():
},
},
),
biosample_rollup_export.to_job(
resource_defs=resource_defs,
config={
"resources": merge(
unfreeze(normal_resources),
{
"mongo": {
"config": {
"host": {"env": "MONGO_HOST"},
"username": {"env": "MONGO_USERNAME"},
"password": {"env": "MONGO_PASSWORD"},
"dbname": {"env": "MONGO_DBNAME"},
},
},
},
),
"ops": {
"get_biosample_rollup_pipeline_input": {
"config": {
"nmdc_study_id": "",
}
},
"export_json_to_drs": {"config": {"username": ""}},
},
},
),
]


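A minimal sketch of the run config supplied when launching the biosample_rollup_export job defined above; the study ID and username are illustrative, and the mongo resource is filled in from the MONGO_* environment variables per the job definition:

    run_config = {
        "ops": {
            "get_biosample_rollup_pipeline_input": {
                "config": {"nmdc_study_id": "nmdc:sty-11-34xj1150"}
            },
            "export_json_to_drs": {"config": {"username": "someuser"}},
        }
    }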
45 changes: 45 additions & 0 deletions nmdc_runtime/site/util.py
@@ -4,7 +4,10 @@

from pymongo.database import Database as MongoDatabase

from linkml_runtime.utils.schemaview import SchemaView
from nmdc_schema.get_nmdc_view import ViewGetter
from nmdc_runtime.api.db.mongo import get_collection_names_from_schema
from nmdc_runtime.minter.config import typecodes
from nmdc_runtime.site.resources import mongo_resource

mode_test = {
@@ -24,6 +27,8 @@
},
}

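# The NMDC schema class whose slots correspond to the Mongo collection names.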
DATABASE_CLASS_NAME = "Database"


def run_and_log(shell_cmd, context):
process = Popen(shell_cmd, shell=True, stdout=PIPE, stderr=STDOUT)
@@ -47,3 +52,43 @@ def schema_collection_has_index_on_id(mdb: MongoDatabase) -> dict:

def get_basename(filename: str) -> str:
return os.path.basename(filename)


def get_name_of_class_objects_in_collection(
schema_view: SchemaView, collection_name: str
) -> str:
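# The range of the Database slot named after the collection is the class
# of object the collection holds (e.g. "biosample_set" -> "Biosample").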
slot_definition = schema_view.induced_slot(collection_name, DATABASE_CLASS_NAME)
return slot_definition.range


@lru_cache
def get_class_names_to_collection_names_map():
vg = ViewGetter()
schema_view = vg.get_view()

collection_names = get_collection_names_from_schema()

class_names_to_collection_names = {}
for collection_name in collection_names:
class_name = get_name_of_class_objects_in_collection(
schema_view, collection_name
)
class_names_to_collection_names[class_name] = collection_name

return class_names_to_collection_names


def get_collection_from_typecode(doc_id: str):
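# NMDC IDs have the form "nmdc:<typecode>-<shoulder>-<suffix>",
# e.g. "nmdc:bsm-11-34xj1150" -> typecode "bsm".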
typecode = doc_id.split(":")[1].split("-")[0]
class_map_data = typecodes()

class_map = {
entry["name"]: entry["schema_class"].split(":")[1] for entry in class_map_data
}
class_name = class_map.get(typecode)
if class_name:
collection_dict = get_class_names_to_collection_names_map()
collection_name = collection_dict.get(class_name)
return collection_name

return None
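
Illustrative usage, assuming "bsm" is the typecode minted for Biosample:

    get_collection_from_typecode("nmdc:bsm-11-34xj1150")  # -> "biosample_set"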