Replace the use of alldocs with respective collections for record retrieval in NCBI exporter #907

25 changes: 23 additions & 2 deletions nmdc_runtime/site/export/ncbi_xml.py
@@ -4,7 +4,7 @@
import xml.etree.ElementTree as ET
import xml.dom.minidom

from typing import Any
from typing import Any, List, Union
from urllib.parse import urlparse
from nmdc_runtime.site.export.ncbi_xml_utils import (
get_instruments,
@@ -366,7 +366,14 @@ def set_fastq(
)
# Currently, we are making the assumption that only one instrument
# is used to sequence a Biosample
instrument_id = ntseq.get("instrument_used", "")[0]
instrument_used: Union[str, List[str]] = ntseq.get(
"instrument_used", []
)
if not instrument_used:
instrument_id = None
else:
instrument_id = instrument_used[0]

instrument = all_instruments.get(instrument_id, {})
instrument_vendor = instrument.get("vendor", "")
instrument_model = instrument.get("model", "")
@@ -448,6 +455,20 @@ def set_fastq(
"Attribute", "NextSeq 550", {"name": "instrument_model"}
)
)
elif instrument_model == "novaseq_6000":
sra_attributes.append(
self.set_element(
"Attribute",
"NovaSeq 6000",
{"name": "instrument_model"},
)
)
elif instrument_model == "hiseq":
sra_attributes.append(
self.set_element(
"Attribute", "HiSeq", {"name": "instrument_model"}
)
)

if analyte_category == "metagenome":
sra_attributes.append(
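
For context, the guarded `instrument_used` lookup and the instrument-model branches above can be summarized in a single helper. The sketch below is illustrative only (the `SRA_MODEL_NAMES` table and `resolve_instrument_model` helper are not part of this PR); it assumes `instrument_used` is a list of instrument IDs and that `all_instruments` maps those IDs to documents with a `model` field, as in the exporter.

```python
from typing import Any, Dict, List, Union

# Hypothetical lookup table mirroring the elif branches above ("nextseq_550",
# "novaseq_6000", "hiseq" map to their SRA-facing display names).
SRA_MODEL_NAMES: Dict[str, str] = {
    "nextseq_550": "NextSeq 550",
    "novaseq_6000": "NovaSeq 6000",
    "hiseq": "HiSeq",
}


def resolve_instrument_model(
    ntseq: Dict[str, Any], all_instruments: Dict[str, Dict[str, str]]
) -> str:
    """Return the SRA display name for the first instrument used, or '' if unknown."""
    instrument_used: Union[str, List[str]] = ntseq.get("instrument_used", [])
    instrument_id = instrument_used[0] if instrument_used else None
    instrument = all_instruments.get(instrument_id, {})
    return SRA_MODEL_NAMES.get(instrument.get("model", ""), "")
```

A table-driven mapping like this would make adding further models a one-line change; the diff instead keeps the explicit elif chain, which matches the surrounding code style.
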
111 changes: 81 additions & 30 deletions nmdc_runtime/site/export/ncbi_xml_utils.py
@@ -1,6 +1,10 @@
from io import BytesIO, StringIO
from typing import Any, Dict, List, Union

from nmdc_runtime.api.endpoints.util import strip_oid
from nmdc_runtime.minter.config import typecodes
from lxml import etree
from pymongo.collection import Collection

import csv
import requests
@@ -45,35 +49,53 @@ def get_instruments(instrument_set_collection):
raise RuntimeError(f"An error occurred while fetching instrument data: {e}")


def fetch_data_objects_from_biosamples(all_docs_collection, biosamples_list):
def fetch_data_objects_from_biosamples(
all_docs_collection: Collection,
data_object_set: Collection,
biosamples_list: List[Dict[str, Any]],
) -> List[Dict[str, Dict[str, Any]]]:
"""This method fetches the data objects that are "associated" (derived from/products of)
with their respective biosamples by iterating over the alldocs collection recursively.
The methods returns a dictionary with biosample ids as keys and the associated list of
data objects as values.

:param all_docs_collection: reference to the alldocs collection
:param data_object_set: reference to the data_object_set collection
:param biosamples_list: list of biosamples as JSON documents
:return: list of dictionaries with biosample ids as keys and associated data objects as values
"""
biosample_data_objects = []

def collect_data_objects(doc_ids, collected_objects, unique_ids):
for doc_id in doc_ids:
if (
get_classname_from_typecode(doc_id) == "DataObject"
and doc_id not in unique_ids
):
data_obj = data_object_set.find_one({"id": doc_id})
if data_obj:
collected_objects.append(strip_oid(data_obj))
unique_ids.add(doc_id)

biosample_data_objects = []

for biosample in biosamples_list:
current_ids = [biosample["id"]]
collected_data_objects = []
unique_ids = set()

while current_ids:
new_current_ids = []
for current_id in current_ids:
query = {"has_input": current_id}
document = all_docs_collection.find_one(query)
for doc in all_docs_collection.find({"has_input": current_id}):
has_output = doc.get("has_output", [])

if not document:
continue

has_output = document.get("has_output")
if not has_output:
continue

for output_id in has_output:
if get_classname_from_typecode(output_id) == "DataObject":
data_object_doc = all_docs_collection.find_one(
{"id": output_id}
)
if data_object_doc:
collected_data_objects.append(data_object_doc)
else:
new_current_ids.append(output_id)
collect_data_objects(has_output, collected_data_objects, unique_ids)
new_current_ids.extend(
op
for op in has_output
if get_classname_from_typecode(op) != "DataObject"
)

current_ids = new_current_ids

@@ -83,12 +105,25 @@ def fetch_data_objects_from_biosamples(all_docs_collection, biosamples_list):
return biosample_data_objects
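
The rewritten traversal walks outward from each biosample via `has_input`/`has_output` edges in `alldocs` and now pulls the terminal `DataObject` records from `data_object_set` (stripping Mongo's internal `_id` via `strip_oid`). A hypothetical call and result shape, with made-up IDs and a locally assumed `MongoClient` connection:

```python
from pymongo import MongoClient

# Assumed local connection; the runtime normally supplies these collections
# through its Dagster mongo resource (see ops.py below).
mdb = MongoClient("mongodb://localhost:27017")["nmdc"]

biosamples = [{"id": "nmdc:bsm-11-abc123"}]  # made-up biosample ID
results = fetch_data_objects_from_biosamples(
    mdb["alldocs"], mdb["data_object_set"], biosamples
)
# Expected shape (roughly): one entry per biosample that has associated data objects,
# e.g. [{"nmdc:bsm-11-abc123": [{"id": "nmdc:dobj-11-xyz789", ...}, ...]}]
```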


def fetch_nucleotide_sequencing_from_biosamples(all_docs_collection, biosamples_list):
biosample_data_objects = []
def fetch_nucleotide_sequencing_from_biosamples(
all_docs_collection: Collection,
data_generation_set: Collection,
biosamples_list: List[Dict[str, Any]],
) -> List[Dict[str, Dict[str, Any]]]:
"""This method fetches the nucleotide sequencing process records that create data objects
for biosamples by iterating over the alldocs collection recursively.

:param all_docs_collection: reference to the alldocs collection
:param data_generation_set: reference to the data_generation_set collection
:param biosamples_list: list of biosamples as JSON documents
:return: list of dictionaries with biosample ids as keys and associated nucleotide sequencing
process objects as values
"""
biosample_ntseq_objects = []

for biosample in biosamples_list:
current_ids = [biosample["id"]]
collected_data_objects = []
collected_ntseq_objects = []

while current_ids:
new_current_ids = []
@@ -105,23 +140,39 @@ def fetch_nucleotide_sequencing_from_biosamples(all_docs_collection, biosamples_

for output_id in has_output:
if get_classname_from_typecode(output_id) == "DataObject":
nucleotide_sequencing_doc = all_docs_collection.find_one(
nucleotide_sequencing_doc = data_generation_set.find_one(
{"id": document["id"]}
)
if nucleotide_sequencing_doc:
collected_data_objects.append(nucleotide_sequencing_doc)
collected_ntseq_objects.append(
strip_oid(nucleotide_sequencing_doc)
)
else:
new_current_ids.append(output_id)

current_ids = new_current_ids

if collected_data_objects:
biosample_data_objects.append({biosample["id"]: collected_data_objects})
if collected_ntseq_objects:
biosample_ntseq_objects.append({biosample["id"]: collected_ntseq_objects})

return biosample_ntseq_objects

return biosample_data_objects

def fetch_library_preparation_from_biosamples(
all_docs_collection: Collection,
material_processing_set: Collection,
biosamples_list: List[Dict[str, Any]],
) -> List[Dict[str, Dict[str, Any]]]:
"""This method fetches the library preparation process records that create processed samples,
which are further fed/inputted into (by `has_input` slot) a nucleotide sequencing process
for biosamples by iterating over the alldocs collection recursively.

def fetch_library_preparation_from_biosamples(all_docs_collection, biosamples_list):
:param all_docs_collection: reference to the alldocs collection
:param material_processing_set: reference to the material_processing_set collection
:param biosamples_list: list of biosamples as JSON documents
:return: list of dictionaries with biosample ids as keys and associated library preparation process
objects as values
"""
biosample_lib_prep = []

for biosample in biosamples_list:
@@ -144,10 +195,10 @@ def fetch_library_preparation_from_biosamples(all_docs_collection, biosamples_li
"has_input": output_id,
"type": {"$in": ["LibraryPreparation"]},
}
lib_prep_doc = all_docs_collection.find_one(lib_prep_query)
lib_prep_doc = material_processing_set.find_one(lib_prep_query)

if lib_prep_doc:
biosample_lib_prep.append({biosample_id: lib_prep_doc})
biosample_lib_prep.append({biosample_id: strip_oid(lib_prep_doc)})
break # Stop at the first document that meets the criteria

return biosample_lib_prep
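
The library-preparation lookup now issues its `has_input` + `type` query against `material_processing_set` rather than `alldocs`. A minimal sketch of that query shape, with hypothetical IDs (the `processed_sample_id` would be one of the `has_output` values reached while walking from the biosample):

```python
# Hypothetical processed-sample ID reached by following has_output edges from the biosample.
processed_sample_id = "nmdc:procsm-11-def456"

lib_prep_query = {
    "has_input": processed_sample_id,
    "type": {"$in": ["LibraryPreparation"]},
}
lib_prep_doc = material_processing_set.find_one(lib_prep_query)
if lib_prep_doc:
    # strip_oid drops Mongo's internal _id before the record is collected.
    biosample_lib_prep.append({"nmdc:bsm-11-abc123": strip_oid(lib_prep_doc)})
```
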
9 changes: 6 additions & 3 deletions nmdc_runtime/site/ops.py
@@ -1188,8 +1188,9 @@ def get_ncbi_export_pipeline_inputs(context: OpExecutionContext) -> str:
def get_data_objects_from_biosamples(context: OpExecutionContext, biosamples: list):
mdb = context.resources.mongo.db
alldocs_collection = mdb["alldocs"]
data_object_set = mdb["data_object_set"]
biosample_data_objects = fetch_data_objects_from_biosamples(
alldocs_collection, biosamples
alldocs_collection, data_object_set, biosamples
)
return biosample_data_objects

@@ -1200,8 +1201,9 @@ def get_nucleotide_sequencing_from_biosamples(
):
mdb = context.resources.mongo.db
alldocs_collection = mdb["alldocs"]
data_generation_set = mdb["data_generation_set"]
biosample_omics_processing = fetch_nucleotide_sequencing_from_biosamples(
alldocs_collection, biosamples
alldocs_collection, data_generation_set, biosamples
)
return biosample_omics_processing

@@ -1212,8 +1214,9 @@ def get_library_preparation_from_biosamples(
):
mdb = context.resources.mongo.db
alldocs_collection = mdb["alldocs"]
material_processing_set = mdb["material_processing_set"]
biosample_lib_prep = fetch_library_preparation_from_biosamples(
alldocs_collection, biosamples
alldocs_collection, material_processing_set, biosamples
)
return biosample_lib_prep

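Taken together, each op now passes the traversal helpers both `alldocs` (for walking `has_input`/`has_output` links) and the typed collection that actually stores the records being returned. A rough, self-contained way to exercise the new `fetch_data_objects_from_biosamples` signature outside of Dagster, assuming `mongomock` as a stand-in Mongo (not a stated dependency of this repo) and that the minter typecode configuration resolves the `dobj` prefix to `DataObject`:

```python
import mongomock

from nmdc_runtime.site.export.ncbi_xml_utils import fetch_data_objects_from_biosamples

mdb = mongomock.MongoClient()["nmdc"]

# Minimal made-up fixture: one sequencing process that takes the biosample as
# input and produces a single data object.
mdb["alldocs"].insert_one(
    {
        "id": "nmdc:dgns-11-seq001",
        "has_input": ["nmdc:bsm-11-abc123"],
        "has_output": ["nmdc:dobj-11-xyz789"],
    }
)
mdb["data_object_set"].insert_one(
    {"id": "nmdc:dobj-11-xyz789", "name": "reads.fastq.gz"}
)

print(
    fetch_data_objects_from_biosamples(
        mdb["alldocs"], mdb["data_object_set"], [{"id": "nmdc:bsm-11-abc123"}]
    )
)
# Expected (roughly):
# [{"nmdc:bsm-11-abc123": [{"id": "nmdc:dobj-11-xyz789", "name": "reads.fastq.gz"}]}]
```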