Skip to content

Commit

Permalink
Merge pull request #694 from microbiomedata/690B-berkeley-dagster-job-ensure_alldocs-fails-with-assertionerror-1
Browse files Browse the repository at this point in the history

`berkeley`: Determine class ancestry per-type value (within collection) instead of per-collection (when generating `alldocs`)
  • Loading branch information
eecavanna authored Sep 20, 2024
2 parents d098620 + 6bb4524 commit 4f1ee79
Showing 1 changed file with 76 additions and 28 deletions.
104 changes: 76 additions & 28 deletions nmdc_runtime/site/ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -1022,47 +1022,95 @@ def materialize_alldocs(context) -> int:
mdb = context.resources.mongo.db
collection_names = populated_schema_collection_names_with_id_field(mdb)

for name in collection_names:
assert (
len(collection_name_to_class_names[name]) == 1
), f"{name} collection has class name of {collection_name_to_class_names[name]} and len {len(collection_name_to_class_names[name])}"
# Insert a no-op as an anchor point for this comment.
#
# Note: There used to be code here that `assert`-ed that each collection could only contain documents of a single
# type. With the legacy schema, that assertion was true. With the Berkeley schema, it is false. That code was
# in place because subsequent code (further below) used a single document in a collection as the source of the
# class ancestry information of _all_ documents in that collection; an optimization that spared us from
# having to do the same for every single document in that collection. With the Berkeley schema, we have
# eliminated that optimization (since it is inadequate; it would produce some incorrect class ancestries
# for descendants of `PlannedProcess`, for example).
#
pass

context.log.info(f"{collection_names=}")

# Drop any existing `alldocs` collection (e.g. from previous use of this op).
#
# FIXME: This "nuke and pave" approach introduces a race condition.
# For example, if someone were to visit an API endpoint that uses the "alldocs" collection,
# the endpoint would fail to perform its job since the "alldocs" collection is temporarily missing.
#
mdb.alldocs.drop()

# Build alldocs
context.log.info("constructing `alldocs` collection")

for collection in collection_names:
# Calculate class_hierarchy_as_list once per collection, using the first document in list
try:
nmdcdb = NMDCDatabase(
**{collection: [dissoc(mdb[collection].find_one(), "_id")]}
)
exemplar = getattr(nmdcdb, collection)[0]
newdoc_type: list[str] = class_hierarchy_as_list(exemplar)
except ValueError as e:
context.log.info(f"Collection {collection} does not exist.")
raise e

# For each collection, group its documents by their `type` value, transform them, and load them into `alldocs`.
for collection_name in collection_names:
context.log.info(
f"Found {mdb[collection].estimated_document_count()} estimated documents for {collection=}."
)
# For each document in this collection, replace the value of the `type` field with
# a _list_ of the document's own class and ancestor classes, remove the `_id` field,
# and insert the resulting document into the `alldocs` collection.

inserted_many_result = mdb.alldocs.insert_many(
[
assoc(dissoc(doc, "type", "_id"), "type", newdoc_type)
for doc in mdb[collection].find()
]
f"Found {mdb[collection_name].estimated_document_count()} estimated documents for {collection_name=}."
)

# Process all the distinct `type` values (i.e. value in the `type` field) of the documents in this collection.
#
# References:
# - https://pymongo.readthedocs.io/en/stable/api/pymongo/collection.html#pymongo.collection.Collection.distinct
#
distinct_type_values = mdb[collection_name].distinct(key="type")
context.log.info(
f"Inserted {len(inserted_many_result.inserted_ids)} documents for {collection=}."
f"Found {len(distinct_type_values)} distinct `type` values in {collection_name=}: {distinct_type_values=}"
)
for type_value in distinct_type_values:

# Process all the documents in this collection that have this value in their `type` field.
#
# References:
# - https://pymongo.readthedocs.io/en/stable/api/pymongo/collection.html#pymongo.collection.Collection.count_documents
# - https://pymongo.readthedocs.io/en/stable/api/pymongo/collection.html#pymongo.collection.Collection.find
#
filter_ = {"type": type_value}
num_docs_having_type = mdb[collection_name].count_documents(filter=filter_)
docs_having_type = mdb[collection_name].find(filter=filter_)
context.log.info(
f"Found {num_docs_having_type} documents having {type_value=} in {collection_name=}."
)

# Get a "representative" document from the result.
#
# Note: Since all of the documents in this batch have the same class ancestry, we will save time by
# determining the class ancestry of only _one_ of them (we call this the "representative") and then
# (later) attributing that class ancestry to all of them.
#
representative_doc = next(docs_having_type)

# Instantiate the Python class represented by the "representative" document.
db_dict = {
# Shed the `_id` attribute, since the constructor doesn't allow it.
collection_name: [dissoc(representative_doc, "_id")]
}
nmdc_db = NMDCDatabase(**db_dict)
representative_instance = getattr(nmdc_db, collection_name)[0]

# Get the class ancestry of that instance, as a list of class names (including its own class name).
ancestor_class_names = class_hierarchy_as_list(representative_instance)

# Store the documents belonging to this group, in the `alldocs` collection, setting their `type` field
# to the list of class names obtained from the "representative" document above.
#
# TODO: Document why clobbering the existing contents of the `type` field is OK.
#
inserted_many_result = mdb.alldocs.insert_many(
[
assoc(dissoc(doc, "type", "_id"), "type", ancestor_class_names)
for doc in docs_having_type
]
)
context.log.info(
f"Inserted {len(inserted_many_result.inserted_ids)} documents from {collection_name=} "
f"originally having {type_value=}."
)

# Re-idx for `alldocs` collection
mdb.alldocs.create_index("id", unique=True)
Expand Down

0 comments on commit 4f1ee79

Please sign in to comment.