Skip to content

Commit

Permalink
fix: don't deserialize and reserialize json for no reason
Browse files Browse the repository at this point in the history
This makes the core ETL loop much faster for basic FHIR tables.

We previously were reading the de-identified input ndjson,
turning it into fhirclient resource objects, scrubbing it, then
writing it back as json.

The value of the fhirclient step was validation and ease of use.
But the MS deid tool validates for us. And we can live with loss
of nice typed objects if it means huge speed wins.

And they do seem to be huge. Quick napkin math looks like we use 10%
of the CPU and 25% of the wall clock time as before. (Of the core
loop -- not counting the MS deid time.)

Which might take us from a CPU-bound profile to an IO-bound profile.

This removes all uses of fhirclient except for the i2b2 code (which
would benefit from a similar change, if we cared about speeding that
up) and the bulk exporter (for some oauth code).
  • Loading branch information
mikix committed Jan 25, 2023
1 parent dbe17e6 commit 19387b5
Show file tree
Hide file tree
Showing 15 changed files with 196 additions and 414 deletions.
18 changes: 0 additions & 18 deletions cumulus/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,6 @@
from urllib.parse import urlparse

import fsspec
from fhirclient.models.resource import Resource
from fhirclient.models.fhirabstractbase import FHIRAbstractBase


###############################################################################
Expand Down Expand Up @@ -177,22 +175,6 @@ def warn_mode():
logging.getLogger().setLevel(logging.WARN)


def error_fhir(fhir_resource):
if isinstance(fhir_resource, Resource):
logging.error(json.dumps(fhir_resource.as_json(), indent=4))
else:
logging.error("expected FHIR Resource got %s", type(fhir_resource))


def print_json(jsonable):
if isinstance(jsonable, dict):
print(json.dumps(jsonable, indent=4))
if isinstance(jsonable, list):
print(json.dumps(jsonable, indent=4))
if isinstance(jsonable, FHIRAbstractBase):
print(json.dumps(jsonable.as_json(), indent=4))


_first_header = True


Expand Down
22 changes: 11 additions & 11 deletions cumulus/ctakes.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,34 +8,34 @@
from typing import List

import ctakesclient
from fhirclient.models.documentreference import DocumentReference

from cumulus import common, fhir_common, store


def covid_symptoms_extract(cache: store.Root, docref: DocumentReference) -> List[dict]:
def covid_symptoms_extract(cache: store.Root, docref: dict) -> List[dict]:
"""
Extract a list of Observations from NLP-detected symptoms in physician notes
:param cache: Where to cache NLP results
:param docref: Physician Note
:return: list of NLP results encoded as FHIR observations
"""
docref_id = docref.id
_, subject_id = fhir_common.unref_resource(docref.subject)
docref_id = docref["id"]
_, subject_id = fhir_common.unref_resource(docref["subject"])

if not docref.context or not docref.context.encounter:
encounters = docref.get("context", {}).get("encounter", [])
if not encounters:
logging.warning("No valid encounters for symptoms") # ideally would print identifier, but it's PHI...
return []
_, encounter_id = fhir_common.unref_resource(docref.context.encounter[0])
_, encounter_id = fhir_common.unref_resource(encounters[0])

# Find the physician note among the attachments
for content in docref.content:
if content.attachment.contentType and content.attachment.data:
mimetype, params = cgi.parse_header(content.attachment.contentType)
for content in docref["content"]:
if "contentType" in content["attachment"] and "data" in content["attachment"]:
mimetype, params = cgi.parse_header(content["attachment"]["contentType"])
if mimetype == "text/plain": # just grab first text we find
charset = params.get("charset", "utf8")
physician_note = base64.standard_b64decode(content.attachment.data).decode(charset)
physician_note = base64.standard_b64decode(content["attachment"]["data"]).decode(charset)
break
else:
logging.warning("No text/plain content for symptoms") # ideally would print identifier, but it's PHI...
Expand Down Expand Up @@ -87,7 +87,7 @@ def is_covid_match(m: ctakesclient.typesystem.MatchText):
positive_matches.append(
{
"id": f"{docref_id}.{i}",
"docref_id": docref.id,
"docref_id": docref_id,
"encounter_id": encounter_id,
"subject_id": subject_id,
"match": match.as_json(),
Expand Down
81 changes: 33 additions & 48 deletions cumulus/deid/scrubber.py
Original file line number Diff line number Diff line change
@@ -1,26 +1,13 @@
"""Cleans (de-identifies) a single resource node recursively"""

import collections
import logging
import tempfile
from typing import Any

from fhirclient.models.attachment import Attachment
from fhirclient.models.fhirabstractbase import FHIRAbstractBase
from fhirclient.models.fhirabstractresource import FHIRAbstractResource
from fhirclient.models.fhirreference import FHIRReference
from fhirclient.models.meta import Meta

from cumulus import fhir_common
from cumulus.deid import codebook, mstool


FHIRProperty = collections.namedtuple(
"FHIRProperty",
["name", "json_name", "type", "is_list", "of_many", "not_optional"],
)


class SkipResource(Exception):
pass

Expand Down Expand Up @@ -60,7 +47,7 @@ def scrub_bulk_data(input_dir: str) -> tempfile.TemporaryDirectory:
mstool.run_mstool(input_dir, tmpdir.name)
return tmpdir

def scrub_resource(self, node: FHIRAbstractBase, scrub_attachments: bool = True) -> bool:
def scrub_resource(self, node: dict, scrub_attachments: bool = True) -> bool:
"""
Cleans/de-identifies resource (in-place) and returns False if it should be rejected
Expand All @@ -72,7 +59,7 @@ def scrub_resource(self, node: FHIRAbstractBase, scrub_attachments: bool = True)
:returns: whether this resource is allowed to be used
"""
try:
self._scrub_node(node, scrub_attachments=scrub_attachments)
self._scrub_node("root", node, scrub_attachments=scrub_attachments)
except SkipResource as exc:
logging.warning("Ignoring resource of type %s: %s", node.__class__.__name__, exc)
return False
Expand All @@ -93,35 +80,31 @@ def save(self) -> None:
#
###############################################################################

def _scrub_node(self, node: FHIRAbstractBase, scrub_attachments: bool) -> None:
def _scrub_node(self, node_path: str, node: dict, scrub_attachments: bool) -> None:
"""Examines all properties of a node"""
fhir_properties = node.elementProperties()
for fhir_property in map(FHIRProperty._make, fhir_properties):
values = getattr(node, fhir_property.name)
for key, values in list(node.items()):
if values is None:
continue

if not fhir_property.is_list:
if not isinstance(values, list):
# Make everything a list for ease of the next bit where we iterate a list
values = [values]

for value in values:
self._scrub_single_value(node, fhir_property, value, scrub_attachments=scrub_attachments)
self._scrub_single_value(node_path, node, key, value, scrub_attachments=scrub_attachments)

def _scrub_single_value(
self, node: FHIRAbstractBase, fhir_property: FHIRProperty, value: Any, scrub_attachments: bool
) -> None:
def _scrub_single_value(self, node_path: str, node: dict, key: str, value: Any, scrub_attachments: bool) -> None:
"""Examines one single property of a node"""
# For now, just manually run each operation. If this grows further, we can abstract it more.
self._check_ids(node, fhir_property, value)
self._check_modifier_extensions(fhir_property, value)
self._check_security(node, fhir_property, value)
self._check_ids(node_path, node, key, value)
self._check_modifier_extensions(key, value)
self._check_security(node_path, node, key, value)
if scrub_attachments:
self._check_attachments(node, fhir_property)
self._check_attachments(node_path, node, key)

# Recurse if we are holding a real FHIR object (instead of, say, a string)
if isinstance(value, FHIRAbstractBase):
self._scrub_node(value, scrub_attachments=scrub_attachments)
# Recurse if we are holding another FHIR object (i.e. a dict instead of a string)
if isinstance(value, dict):
self._scrub_node(f"{node_path}.{key}", value, scrub_attachments=scrub_attachments)

###############################################################################
#
Expand All @@ -130,46 +113,48 @@ def _scrub_single_value(
###############################################################################

@staticmethod
def _check_modifier_extensions(fhir_property: FHIRProperty, value: Any) -> None:
def _check_modifier_extensions(key: str, value: Any) -> None:
"""If there's any unrecognized modifierExtensions, raise a SkipResource exception"""
if fhir_property.name == "modifierExtension":
if key == "modifierExtension" and isinstance(value, dict):
known_extensions = [
"http://fhir-registry.smarthealthit.org/StructureDefinition/nlp-polarity",
"http://fhir-registry.smarthealthit.org/StructureDefinition/nlp-source",
]
url = getattr(value, "url", None)
url = value.get("url")
if url not in known_extensions:
raise SkipResource(f'Unrecognized modifierExtension with URL "{url}"')

def _check_ids(self, node: FHIRAbstractBase, fhir_property: FHIRProperty, value: Any) -> None:
def _check_ids(self, node_path: str, node: dict, key: str, value: Any) -> None:
"""Converts any IDs and references to a de-identified version"""
# ID values
if isinstance(node, FHIRAbstractResource) and fhir_property.name == "id":
node.id = self.codebook.fake_id(node.resource_type, value)
# ID values ("id" is only ever used as a resource ID)
if node_path == "root" and key == "id":
node["id"] = self.codebook.fake_id(node["resourceType"], value)

# References
elif isinstance(node, FHIRReference) and fhir_property.name == "reference":
# "reference" can sometimes be a URL or non-FHIRReference element -- at some point we'll need to be smarter.
elif key == "reference":
resource_type, real_id = fhir_common.unref_resource(node)
fake_id = self.codebook.fake_id(resource_type, real_id)
node.reference = fhir_common.ref_resource(resource_type, fake_id).reference
node["reference"] = fhir_common.ref_resource(resource_type, fake_id)["reference"]

@staticmethod
def _check_attachments(node: FHIRAbstractBase, fhir_property: FHIRProperty) -> None:
def _check_attachments(node_path: str, node: dict, key: str) -> None:
"""Strip any attachment data"""
if isinstance(node, Attachment) and fhir_property.name == "data":
node.data = None
if node_path == "root.content.attachment" and key == "data":
del node["data"]

@staticmethod
def _check_security(node: FHIRAbstractBase, fhir_property: FHIRProperty, value: Any) -> None:
def _check_security(node_path: str, node: dict, key: str, value: Any) -> None:
"""
Strip any security data that the MS tool injects
It takes up space in the result and anyone using Cumulus ETL understands that there was ETL applied.
"""
if fhir_property.name == "meta" and isinstance(value, Meta):
value.security = None # maybe too aggressive -- is there data we care about in meta.security?
if node_path == "root" and key == "meta":
if "security" in value:
del value["security"] # maybe too aggressive -- is there data we care about in meta.security?

# If we wiped out the only content in Meta, remove it so as not to confuse downstream bits like parquet
# writers which try to infer values from an empty struct and fail.
if value.as_json() == {}:
node.meta = None
if not value:
del node["meta"]
2 changes: 1 addition & 1 deletion cumulus/etl.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ def load_and_deidentify(
:returns: a temporary directory holding the de-identified files in FHIR ndjson format
"""
# Grab a list of all required resource types for the tasks we are running
required_resources = set(t.resource.__name__ for t in selected_tasks)
required_resources = set(t.resource for t in selected_tasks)

# First step is loading all the data into a local ndjson format
loaded_dir = loader.load_all(list(required_resources))
Expand Down
Loading

0 comments on commit 19387b5

Please sign in to comment.