From cf86eb5cf2d5b290d0a119b5f63eb5b77591d5e8 Mon Sep 17 00:00:00 2001
From: Bell Eapen
Date: Tue, 28 Jan 2025 02:00:53 +0000
Subject: [PATCH] style: clean up code formatting and improve consistency in string quotes

---
 src/fhiry/base_fhiry.py  |  79 +++++++++++++++++-------------
 src/fhiry/bqsearch.py    |   6 +--
 src/fhiry/fhirndjson.py  |   5 +-
 src/fhiry/fhirsearch.py  |  27 ++++++-----
 src/fhiry/fhiry.py       |  10 ++--
 src/fhiry/flattenfhir.py | 102 ++++++++++++++++++++++++++-------------
 src/fhiry/parallel.py    |   5 +-
 7 files changed, 142 insertions(+), 92 deletions(-)

diff --git a/src/fhiry/base_fhiry.py b/src/fhiry/base_fhiry.py
index 85c9228..d64ecbf 100644
--- a/src/fhiry/base_fhiry.py
+++ b/src/fhiry/base_fhiry.py
@@ -15,6 +15,7 @@ def default_output_processor(
 ) -> str:
     return output
 
+
 class BaseFhiry(object):
     def __init__(self, config_json=None):
         self._df = None
@@ -33,12 +34,14 @@ def __init__(self, config_json=None):
         self._delete_col_raw_coding = True
         if config_json is not None:
             try:
-                with open(config_json, 'r') as f: # config_json is a file path
+                with open(config_json, "r") as f:  # config_json is a file path
                     self.config = json.load(f)
             except:
-                self.config = json.loads(config_json) # config_json is a json string
+                self.config = json.loads(config_json)  # config_json is a json string
         else:
-            self.config = json.loads('{ "REMOVE": ["resource.text.div"], "RENAME": { "resource.id": "id" } }')
+            self.config = json.loads(
+                '{ "REMOVE": ["resource.text.div"], "RENAME": { "resource.id": "id" } }'
+            )
 
     @property
     def df(self):
@@ -53,15 +56,15 @@ def delete_col_raw_coding(self, delete_col_raw_coding):
         self._delete_col_raw_coding = delete_col_raw_coding
 
     def read_bundle_from_bundle_dict(self, bundle_dict):
-        return pd.json_normalize(bundle_dict['entry'])
+        return pd.json_normalize(bundle_dict["entry"])
 
     def delete_unwanted_cols(self):
-        for col in self.config['REMOVE']:
+        for col in self.config["REMOVE"]:
             if col in self._df.columns:
                 del self._df[col]
 
     def rename_cols(self):
-        self._df.rename(columns=self.config['RENAME'], inplace=True)
+        self._df.rename(columns=self.config["RENAME"], inplace=True)
 
     def process_df(self):
         self.delete_unwanted_cols()
@@ -69,7 +72,6 @@ def process_df(self):
         self.add_patient_id()
         self.rename_cols()
 
-
     def process_bundle_dict(self, bundle_dict):
         self._df = self.read_bundle_from_bundle_dict(bundle_dict)
         self.delete_unwanted_cols()
@@ -79,44 +81,54 @@ def process_bundle_dict(self, bundle_dict):
         return self._df
 
     def convert_object_to_list(self):
-        """Convert object to a list of codes
-        """
+        """Convert object to a list of codes"""
         for col in self._df.columns:
-            if 'coding' in col:
-                codes = self._df.apply(
-                    lambda x: self.process_list(x[col]), axis=1)
+            if "coding" in col:
+                codes = self._df.apply(lambda x: self.process_list(x[col]), axis=1)
                 self._df = pd.concat(
-                    [self._df, codes.to_frame(name=col+'codes')], axis=1)
+                    [self._df, codes.to_frame(name=col + "codes")], axis=1
+                )
                 if self._delete_col_raw_coding:
                     del self._df[col]
-            if 'display' in col:
-                codes = self._df.apply(
-                    lambda x: self.process_list(x[col]), axis=1)
+            if "display" in col:
+                codes = self._df.apply(lambda x: self.process_list(x[col]), axis=1)
                 self._df = pd.concat(
-                    [self._df, codes.to_frame(name=col+'display')], axis=1)
+                    [self._df, codes.to_frame(name=col + "display")], axis=1
+                )
                 del self._df[col]
 
     def add_patient_id(self):
-        """Create a patientId column with the resource.id if a Patient resource or with the resource.subject.reference if other resource type
-        """
+        """Create a patientId column with the resource.id if a Patient resource or with the resource.subject.reference if other resource type"""
         try:
             # PerformanceWarning: DataFrame is highly fragmented. This is usually the result of calling `frame.insert` many times, which has poor performance. Consider joining all columns at once using pd.concat(axis=1) instead. To get a de-fragmented frame, use `newframe = frame.copy()`
             newframe = self._df.copy()
-            newframe['patientId'] = self._df.apply(lambda x: x['resource.id'] if x['resource.resourceType']
-                                                   == 'Patient' else self.check_subject_reference(x), axis=1)
+            newframe["patientId"] = self._df.apply(
+                lambda x: (
+                    x["resource.id"]
+                    if x["resource.resourceType"] == "Patient"
+                    else self.check_subject_reference(x)
+                ),
+                axis=1,
+            )
             self._df = newframe
         except:
             try:
                 newframe = self._df.copy()
-                newframe['patientId'] = self._df.apply(lambda x: x['id'] if x['resourceType']
-                                                       == 'Patient' else self.check_subject_reference(x), axis=1)
+                newframe["patientId"] = self._df.apply(
+                    lambda x: (
+                        x["id"]
+                        if x["resourceType"] == "Patient"
+                        else self.check_subject_reference(x)
+                    ),
+                    axis=1,
+                )
                 self._df = newframe
             except:
                 pass
 
     def check_subject_reference(self, row):
         try:
-            return row['resource.subject.reference'].replace('Patient/', '')
+            return row["resource.subject.reference"].replace("Patient/", "")
         except:
             return ""
 
@@ -137,10 +149,10 @@ def process_list(self, myList):
         myCodes = []
         if isinstance(myList, list):
             for entry in myList:
-                if 'code' in entry:
-                    myCodes.append(entry['code'])
-                elif 'display' in entry:
-                    myCodes.append(entry['display'])
+                if "code" in entry:
+                    myCodes.append(entry["code"])
+                elif "display" in entry:
+                    myCodes.append(entry["display"])
         return myCodes
 
     def llm_query(self, query, llm, embed_model=None, verbose=True):
@@ -177,12 +189,13 @@ def llm_query(self, query, llm, embed_model=None, verbose=True):
         else:
             embed_model = HuggingFaceEmbeddings(model_name=embed_model)
         service_context = ServiceContext.from_defaults(
-                llm=llm,
-                embed_model=embed_model,
-            )
+            llm=llm,
+            embed_model=embed_model,
+        )
         query_engine = PandasQueryEngine(
             df=self._df,
             service_context=service_context,
             output_processor=default_output_processor,
-            verbose=verbose)
-        return query_engine.query(query)
\ No newline at end of file
+            verbose=verbose,
+        )
+        return query_engine.query(query)
diff --git a/src/fhiry/bqsearch.py b/src/fhiry/bqsearch.py
index 92d4db2..38ea4a8 100644
--- a/src/fhiry/bqsearch.py
+++ b/src/fhiry/bqsearch.py
@@ -5,7 +5,6 @@
 https://opensource.org/licenses/MIT
 """
 
-
 from google.cloud import bigquery
 from .base_fhiry import BaseFhiry
 
@@ -18,7 +17,7 @@ def __init__(self, config_json=None):
         self._client = bigquery.Client()
         super().__init__(config_json=config_json)
 
-    def search(self, query = None):
+    def search(self, query=None):
         if query is None:
             _query = """
                 SELECT *
@@ -27,7 +26,7 @@ def search(self, query = None):
             """
         else:
             try:
-                with open(query, 'r') as f:
+                with open(query, "r") as f:
                     _query = f.read()
             except:
                 _query = query
@@ -35,4 +34,3 @@ def search(self, query = None):
         self._df = self._client.query(_query).to_dataframe()
         super().process_df()
         return self._df
-
diff --git a/src/fhiry/fhirndjson.py b/src/fhiry/fhirndjson.py
index 233c04c..d823676 100644
--- a/src/fhiry/fhirndjson.py
+++ b/src/fhiry/fhirndjson.py
@@ -5,13 +5,13 @@
 https://opensource.org/licenses/MIT
 """
 
-
 import pandas as pd
 import json
 import os
 from .base_fhiry import BaseFhiry
 from tqdm import tqdm
 
+
 class Fhirndjson(BaseFhiry):
     def __init__(self, config_json=None):
         self._folder = ""
@@ -29,7 +29,6 @@ def folder(self):
     def folder(self, folder):
         self._folder = folder
 
-
     def read_resource_from_line(self, line):
         return pd.json_normalize(json.loads(line))
 
@@ -52,5 +51,3 @@ def process_file(self, file):
                     df = pd.concat([df, self._df])
         self._df = df
         return self._df
-
-
diff --git a/src/fhiry/fhirsearch.py b/src/fhiry/fhirsearch.py
index fd42b94..00688b3 100644
--- a/src/fhiry/fhirsearch.py
+++ b/src/fhiry/fhirsearch.py
@@ -2,6 +2,7 @@
 import requests
 from .base_fhiry import BaseFhiry
 
+
 class Fhirsearch(BaseFhiry):
 
     def __init__(self, fhir_base_url, config_json=None):
@@ -23,15 +24,20 @@ def search(self, resource_type="Patient", search_parameters={}):
         headers = {"Content-Type": "application/fhir+json"}
 
-        if '_count' not in search_parameters:
-            search_parameters['_count'] = self.page_size
+        if "_count" not in search_parameters:
+            search_parameters["_count"] = self.page_size
 
-        search_url = f'{self.fhir_base_url}/{resource_type}'
-        r = requests.get(search_url, params=search_parameters, headers=headers, **self.requests_kwargs)
+        search_url = f"{self.fhir_base_url}/{resource_type}"
+        r = requests.get(
+            search_url,
+            params=search_parameters,
+            headers=headers,
+            **self.requests_kwargs,
+        )
         r.raise_for_status()
 
         bundle_dict = r.json()
 
-        if 'entry' in bundle_dict:
+        if "entry" in bundle_dict:
             df = super().process_bundle_dict(bundle_dict)
 
             next_page_url = get_next_page_url(bundle_dict)
@@ -51,13 +57,12 @@ def search(self, resource_type="Patient", search_parameters={}):
 
         return self._df
 
-
 def get_next_page_url(bundle_dict):
-    links = bundle_dict.get('link')
+    links = bundle_dict.get("link")
     if links:
-     for link in links:
-        relation = link.get('relation')
-        if relation == 'next':
-            return link.get('url')
+        for link in links:
+            relation = link.get("relation")
+            if relation == "next":
+                return link.get("url")
 
     return None
diff --git a/src/fhiry/fhiry.py b/src/fhiry/fhiry.py
index 3cb0bfa..68fc97a 100644
--- a/src/fhiry/fhiry.py
+++ b/src/fhiry/fhiry.py
@@ -14,6 +14,7 @@
 logger = logging.getLogger(__name__)
 
+
 class Fhiry(BaseFhiry):
     def __init__(self, config_json=None):
         self._filename = ""
@@ -50,10 +51,10 @@ def delete_col_raw_coding(self, delete_col_raw_coding):
         self._delete_col_raw_coding = delete_col_raw_coding
 
     def read_bundle_from_file(self, filename):
-        with open(filename, encoding='utf8', mode='r') as f:
+        with open(filename, encoding="utf8", mode="r") as f:
             json_in = f.read()
             json_in = json.loads(json_in)
-            return pd.json_normalize(json_in['entry'])
+            return pd.json_normalize(json_in["entry"])
 
     def process_source(self):
         """Read a single JSON resource or a directory full of JSON resources
@@ -64,7 +65,8 @@ def process_source(self):
             for file in tqdm(os.listdir(self._folder)):
                 if file.endswith(".json"):
                     self._df = self.read_bundle_from_file(
-                        os.path.join(self._folder, file))
+                        os.path.join(self._folder, file)
+                    )
                     self.process_df()
                     if df.empty:
                         df = self._df
@@ -84,5 +86,3 @@ def process_bundle_dict(self, bundle_dict):
         self._df = self.read_bundle_from_bundle_dict(bundle_dict)
         self.process_df()
         return self._df
-
-
diff --git a/src/fhiry/flattenfhir.py b/src/fhiry/flattenfhir.py
index 8330d87..2c505b6 100644
--- a/src/fhiry/flattenfhir.py
+++ b/src/fhiry/flattenfhir.py
@@ -4,6 +4,7 @@
 This software is released under the MIT License.
 https://opensource.org/licenses/MIT
 """
+
 import datetime
 import logging
 from abc import ABC
@@ -12,6 +13,7 @@
 _logger = logging.getLogger(__name__)
 
+
 class FlattenFhir(ABC):
 
     def __init__(self, fhirobject=None, config_json=None):
@@ -46,9 +48,6 @@ def flatten(self):
         self.get_flattened_text(self._fhirobject)
         return self._flattened
 
-
-
-
     def get_flattened_text(self, entry):
         if entry.resourceType == "Patient":
             self._flattened += self.flatten_patient(entry)
@@ -91,12 +90,12 @@ def flatten_patient(self, patient) -> str:
             str: The flattened string representation of the patient object.
         """
         flat_patient = ""
-        if 'gender' in patient:
+        if "gender" in patient:
             flat_patient += f"Medical record of a {patient.gender} patient "
         else:
             _logger.info(f"Gender not found for patient {patient.id}")
             flat_patient += "Medical record of a patient "
-        if 'birthDate' in patient:
+        if "birthDate" in patient:
             flat_patient += f"born {self.get_timeago(patient.birthDate)}. "
         else:
             _logger.info(f"Birthdate not found for patient {patient.id}")
@@ -114,44 +113,63 @@ def flatten_observation(self, observation) -> str:
             str: The flattened string representation of the observation object.
         """
         flat_observation = ""
-        if 'code' in observation:
+        if "code" in observation:
             _display = observation.code.coding[0]
             flat_observation += f"{_display['display']} "
         else:
             _logger.info(f"Code not found for observation {observation.id}")
             flat_observation += "Observation "
-        if 'effectiveDateTime' in observation:
-            flat_observation += f"recorded {self.get_timeago(observation.effectiveDateTime)} was "
+        if "effectiveDateTime" in observation:
+            flat_observation += (
+                f"recorded {self.get_timeago(observation.effectiveDateTime)} was "
+            )
         else:
             _logger.info(f"Effective date not found for observation {observation.id}")
             flat_observation += "of unknown date was "
         if "valueQuantity" in observation and "value" in observation.valueQuantity:
             flat_observation += f"Value: {observation.valueQuantity.value} "
-            if 'unit' in observation.valueQuantity:
+            if "unit" in observation.valueQuantity:
                 flat_observation += f"{observation.valueQuantity.unit}. "
         elif "valueString" in observation:
             flat_observation += f"Value: {observation.valueString}. "
         elif "valueBoolean" in observation:
             flat_observation += f"Value: {observation.valueBoolean}. "
-        elif "valueRange" in observation and "low" in observation.valueRange and "high" in observation.valueRange:
+        elif (
+            "valueRange" in observation
+            and "low" in observation.valueRange
+            and "high" in observation.valueRange
+        ):
             flat_observation += f"Value: {observation.valueRange.low.value} - {observation.valueRange.high.value} {observation.valueRange.low.unit}. "
-        elif "valueRatio" in observation and "numerator" in observation.valueRatio and "denominator" in observation.valueRatio:
+        elif (
+            "valueRatio" in observation
+            and "numerator" in observation.valueRatio
+            and "denominator" in observation.valueRatio
+        ):
             flat_observation += f"Value: {observation.valueRatio.numerator.value} {observation.valueRatio.numerator.unit} / {observation.valueRatio.denominator.value} {observation.valueRatio.denominator.unit}. "
-        elif "valuePeriod" in observation and "start" in observation.valuePeriod and "end" in observation.valuePeriod:
+        elif (
+            "valuePeriod" in observation
+            and "start" in observation.valuePeriod
+            and "end" in observation.valuePeriod
+        ):
             flat_observation += f"Value: {observation.valuePeriod.start} - {observation.valuePeriod.end}. "
         elif "valueDateTime" in observation and observation.valueDateTime != "":
             flat_observation += f"Value: {observation.valueDateTime}. "
         elif "valueTime" in observation and observation.valueTime != "":
             flat_observation += f"Value: {observation.valueTime}. "
-        elif "valueSampledData" in observation and "data" in observation.valueSampledData:
+        elif (
+            "valueSampledData" in observation and "data" in observation.valueSampledData
+        ):
             flat_observation += f"Value: {observation.valueSampledData.data}. "
         else:
             _logger.info(f"Value not found for observation {observation.id}")
             flat_observation += "Value: unknown. "
         try:
-            if 'interpretation' in observation and 'coding' in observation.interpretation[0]:
-                if 'coding' in observation.interpretation[0]:
-                    _text = observation.interpretation[0]['coding'][0]
+            if (
+                "interpretation" in observation
+                and "coding" in observation.interpretation[0]
+            ):
+                if "coding" in observation.interpretation[0]:
+                    _text = observation.interpretation[0]["coding"][0]
                     flat_observation += f"Interpretation: {_text['display']}. "
         except:
             _logger.info(f"Interpretation not found for observation {observation.id}")
@@ -169,12 +187,12 @@ def flatten_medication(self, medication) -> str:
             str: The flattened string representation of the medication object.
         """
         flat_medication = ""
-        if 'code' in medication:
+        if "code" in medication:
             flat_medication += f"{medication.code.coding[0]['display']} "
         else:
             _logger.info(f"Code not found for medication {medication.id}")
             flat_medication += "Medication "
-        if 'status' in medication:
+        if "status" in medication:
             flat_medication += f"Status: {medication.status}. "
         else:
             _logger.info(f"Status not found for medication {medication.id}")
@@ -192,14 +210,20 @@ def flatten_procedure(self, procedure) -> str:
             str: The flattened string representation of the procedure object.
         """
         flat_procedure = ""
-        if 'code' in procedure and 'coding' in procedure.code and 'display' in procedure.code.coding[0]:
+        if (
+            "code" in procedure
+            and "coding" in procedure.code
+            and "display" in procedure.code.coding[0]
+        ):
             flat_procedure += f"{procedure.code.coding[0]['display']} was "
         else:
             _logger.info(f"Code not found for procedure {procedure.id}")
             flat_procedure += "Procedure was"
-        if 'occurrenceDateTime' in procedure:
-            flat_procedure += f"{procedure.status} {self.get_timeago(procedure.occurrenceDateTime)}. "
-        elif 'occurrencePeriod' in procedure:
+        if "occurrenceDateTime" in procedure:
+            flat_procedure += (
+                f"{procedure.status} {self.get_timeago(procedure.occurrenceDateTime)}. "
+            )
+        elif "occurrencePeriod" in procedure:
             flat_procedure += f"{procedure.status} {self.get_timeago(procedure.occurrencePeriod.start)}. "
         else:
             _logger.info(f"Performed date not found for procedure {procedure.id}")
@@ -217,13 +241,15 @@ def flatten_condition(self, condition) -> str:
             str: The flattened string representation of the condition object.
         """
         flat_condition = ""
-        if 'code' in condition:
+        if "code" in condition:
             flat_condition += f"{condition.code.coding[0]['display']} "
         else:
             _logger.info(f"Code not found for condition {condition.id}")
             flat_condition += "Condition "
         if condition.onsetDateTime:
-            flat_condition += f"was diagnosed {self.get_timeago(condition.onsetDateTime)}. "
+            flat_condition += (
+                f"was diagnosed {self.get_timeago(condition.onsetDateTime)}. "
+            )
         else:
             _logger.info(f"Onset date not found for condition {condition.id}")
             flat_condition += "was diagnosed. "
@@ -241,15 +267,19 @@ def flatten_allergyintolerance(self, allergyintolerance) -> str:
         """
         flat_allergyintolerance = ""
         _display = allergyintolerance.code.coding[0]
-        if 'code' in allergyintolerance and 'display' in _display:
+        if "code" in allergyintolerance and "display" in _display:
             flat_allergyintolerance += f"{_display['display']} "
         else:
-            _logger.info(f"Code not found for allergyintolerance {allergyintolerance.id}")
+            _logger.info(
+                f"Code not found for allergyintolerance {allergyintolerance.id}"
+            )
             flat_allergyintolerance += "AllergyIntolerance "
-        if 'onsetDateTime' in allergyintolerance:
+        if "onsetDateTime" in allergyintolerance:
             flat_allergyintolerance += f" allergy was reported on {self.get_timeago(allergyintolerance.onsetDateTime)}. "
         else:
-            _logger.info(f"Onset date not found for allergyintolerance {allergyintolerance.id}")
+            _logger.info(
+                f"Onset date not found for allergyintolerance {allergyintolerance.id}"
+            )
             flat_allergyintolerance += "allergy reported. "
 
         return flat_allergyintolerance
@@ -267,12 +297,18 @@ def flatten_documentreference(self, documentreference) -> str:
         for content in documentreference.content:
             content = Prodict.from_dict(content)
             if content.attachment.contentType == "text/plain":
-                flat_documentreference += f"{content.attachment.title}: {content.attachment.data}"
+                flat_documentreference += (
+                    f"{content.attachment.title}: {content.attachment.data}"
+                )
             else:
-                _logger.info(f"Attachment for documentreference {documentreference.id} is not text/plain.")
-        if 'date' in documentreference:
-            flat_documentreference += f" was created {self.get_timeago(documentreference.date)}. "
+                _logger.info(
+                    f"Attachment for documentreference {documentreference.id} is not text/plain."
+                )
+        if "date" in documentreference:
+            flat_documentreference += (
+                f" was created {self.get_timeago(documentreference.date)}. "
+            )
         else:
             _logger.info(f"Date not found for documentreference {documentreference.id}")
             flat_documentreference += " was created. "
-        return flat_documentreference
\ No newline at end of file
+        return flat_documentreference
diff --git a/src/fhiry/parallel.py b/src/fhiry/parallel.py
index 5b12bff..9f3a374 100644
--- a/src/fhiry/parallel.py
+++ b/src/fhiry/parallel.py
@@ -13,6 +13,7 @@
 logger = logging.getLogger(__name__)
 
+
 def process(folder, config_json=None):
     logger.info("CPU count: {}".format(mp.cpu_count()))
     pool = mp.Pool(mp.cpu_count())
@@ -21,7 +22,7 @@ def process(folder, config_json=None):
     if os.path.isdir(folder):
         for filename in os.listdir(folder):
             if filename.endswith(".json"):
-                filenames.append(folder + '/' + filename)
+                filenames.append(folder + "/" + filename)
     else:
         filenames.append(folder)
 
@@ -39,7 +40,7 @@ def ndjson(folder, config_json=None):
     if os.path.isdir(folder):
         for filename in os.listdir(folder):
             if filename.endswith(".ndjson"):
-                filenames.append(folder + '/' + filename)
+                filenames.append(folder + "/" + filename)
     else:
         filenames.append(folder)