Fix bugs in HEAL ingest #13

Open · wants to merge 15 commits into main
2 changes: 2 additions & 0 deletions charts/dug-data-ingest/values/heal-ingest.yaml
@@ -5,3 +5,5 @@ dataStorage: 200Mi
jobExecutor:
script: heal/ingest.sh
lakeFSRepository: heal-mds-import
image:
tag: latest
283 changes: 192 additions & 91 deletions scripts/heal/get_heal_platform_mds_data_dicts.py
@@ -6,6 +6,7 @@
#
# If no MDS endpoint is specified, we default to the production endpoint at https://healdata.org/mds/metadata
#
import csv
import json
import os
import re
@@ -26,6 +27,38 @@
logging.basicConfig(level=logging.INFO)


def translate_data_dictionary_field(field):
"""
Translate a data dictionary field into the internal format needed by generate_dbgap_files().

:param field: A dictionary representing a single field from the Platform MDS.
:return: The same field, translated into the standard internal format.
:raise ValueError: if the field has neither a 'name' nor a 'property' key.
"""

result = {}

if 'name' in field:
result['name'] = field['name']
elif 'property' in field:
result['name'] = field['property']
else:
raise ValueError(f"Unable to translate field {field}: missing name or property")

if 'title' in field:
result['title'] = field['title']

if 'description' in field:
result['description'] = field['description']

if 'section' in field:
result['section'] = field['section']
elif 'node' in field:
result['section'] = field['node']

return result
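
For reference, a minimal sketch of how this translation behaves on the field shapes seen in the Platform MDS (the field values below are hypothetical):

    translate_data_dictionary_field({'property': 'age', 'description': 'Age in years', 'node': 'demographics'})
    # -> {'name': 'age', 'description': 'Age in years', 'section': 'demographics'}

    translate_data_dictionary_field({'name': 'sex', 'title': 'Sex at birth'})
    # -> {'name': 'sex', 'title': 'Sex at birth'}

    translate_data_dictionary_field({'title': 'Age'})
    # raises ValueError: missing name or property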


def download_from_mds(studies_dir, data_dicts_dir, studies_with_data_dicts_dir, mds_metadata_endpoint, mds_limit):
"""
Download all the studies and data dictionaries from the Platform MDS.
@@ -104,21 +137,25 @@ def download_from_mds(studies_dir, data_dicts_dir, studies_with_data_dicts_dir,
# download separately from the MDS.
data_dict_ids_within_studies = set()
for count, study_id in enumerate(studies_to_dds.keys()):
logging.debug(f"Adding data dictionaries to study {study_id} ({count + 1}/{len(studies_to_dds)})")

study_json = studies[study_id]
study_json['data_dictionaries'] = []

for dd in studies_to_dds[study_id]:
dd_id = dd['id']
dd_label = dd['label']

logging.info(f"Adding data dictionary to study {study_id} ({count + 1}/{len(studies_to_dds)}): {dd_id} ({dd_label})")

result = requests.get(mds_metadata_endpoint + '/' + dd_id)
if result.status_code == 404:
logging.warning(
f"Study {study_id} refers to data dictionary {dd_id}, but no such data dictionary was found in "
f"the MDS.")
result_json = {'error': result.json()}
result_json = {
'@id': dd_id,
'error': result.json(),
'fields': [],
}
elif not result.ok:
raise RuntimeError(f'Could not retrieve data dictionary {dd_id}: {result}')
else:
@@ -130,37 +167,43 @@ def download_from_mds(studies_dir, data_dicts_dir, studies_with_data_dicts_dir,

# Sometimes 'data_dictionary' is a list of fields, and sometimes it is a dictionary with a 'fields' field.
# We standardize so that the top-level 'fields' field is always a list of fields.
if "data_dictionary" in result_json and isinstance(result_json["data_dictionary"], list):
result_json["fields"] = result_json["data_dictionary"]
elif (
"data_dictionary" in result_json
and isinstance(result_json["data_dictionary"], dict)
and "fields" in result_json["data_dictionary"]
):
result_json["fields"] = list(
map(
lambda x: {"name": x["property"], "title": x["description"]},
result_json["data_dictionary"]["fields"],
try:
if "data_dictionary" in result_json and isinstance(result_json["data_dictionary"], list):
result_json["fields"] = result_json["data_dictionary"]
elif (
"data_dictionary" in result_json
and isinstance(result_json["data_dictionary"], dict)
and "fields" in result_json["data_dictionary"]
):
result_json["fields"] = list(
map(
translate_data_dictionary_field,
result_json["data_dictionary"]["fields"],
)
)
)
elif (
"data_dictionary" in result_json
and isinstance(result_json["data_dictionary"], dict)
and "data_dictionary" in result_json["data_dictionary"]
):
result_json["fields"] = list(
map(
lambda x: {"name": x["name"], "title": x.get("description", "NA")},
result_json["data_dictionary"]["data_dictionary"],
elif (
"data_dictionary" in result_json
and isinstance(result_json["data_dictionary"], dict)
and "data_dictionary" in result_json["data_dictionary"]
):
result_json["fields"] = list(
map(
translate_data_dictionary_field,
result_json["data_dictionary"]["data_dictionary"],
)
)
)
if (not dd_label or dd_label == "NA") and "title" in result_json[
"data_dictionary"
]:
result_json["label"] = result_json["data_dictionary"]["title"]
else:
logging.warning(
f"Could not determine fields for data dictionary {dd_id}: {result_json}"
if (not dd_label or dd_label == "NA") and "title" in result_json[
"data_dictionary"
]:
result_json["label"] = result_json["data_dictionary"]["title"]
else:
logging.error(
f"Could not determine fields for data dictionary {dd_id}, skipping: {result_json}"
)
result_json["fields"] = []
except ValueError as ve:
logging.error(
f"Could not determine fields for data dictionary {dd_id}, skipping: {ve}"
)
result_json["fields"] = []

@@ -213,6 +256,9 @@ def generate_dbgap_files(dbgap_dir, studies_with_data_dicts_dir)

dbgap_files_generated = set()

# Create a complete index of every variable we find.
all_variable_index = []

data_dict_files = os.listdir(studies_with_data_dicts_dir)
for data_dict_file in data_dict_files:
file_path = os.path.join(studies_with_data_dicts_dir, data_dict_file)
@@ -243,59 +289,83 @@ def generate_dbgap_files(dbgap_dir, studies_with_data_dicts_dir)
else:
raise RuntimeError(f"Could not read {file_path}: unknown format.")

# Begin writing a dbGaP file for each data dictionary.
# Prepare data table to write out.
data_table = ET.Element('data_table')

if 'gen3_discovery' not in study:
logging.error(f"No gen3_discovery field found in data dictionary file {file_path}, skipping.")
continue

# Every data dictionary from the HEAL Data Platform should have an ID, and the previous code should have
# stored it in the `@id` field in the data dictionary JSON file.
#
# There may also be a `label`, which is the key of the data dictionary in the study.
if '@id' in study['gen3_discovery']:
data_table.set('id', study['gen3_discovery']['@id'])
study_id = study['gen3_discovery']['@id']
else:
logging.warning(f"No identifier found in data dictionary file {file_path}")
study_name = study.get('gen3_discovery', {}).get('label') or study.get('gen3_discovery', {}).get('study_metadata',{}).get('minimal_info',{}).get('study_name')
if study_name:
data_table.set('study_name', study_name)
study_description = study.get('gen3_discovery', {}).get('study_metadata',{}).get('minimal_info',{}).get('study_description')
if study_description:
data_table.set('study_description', study_description)

# Determine the data_table study_id from the internal HEAL Data Platform (HDP) identifier.
if '_hdp_uid' in study['gen3_discovery']:
data_table.set('study_id', HDP_ID_PREFIX + study['gen3_discovery']['_hdp_uid'])
else:
logging.warning(f"No HDP ID found in data dictionary file {file_path}")

# Create a non-standard appl_id field just in case we need it later.
# This should be fine for now, but there is also a `comments` element that we can
# store information like this in if we need to.
if 'appl_id' in study['gen3_discovery']:
data_table.set('appl_id', study['gen3_discovery']['appl_id'])
else:
logging.warning(f"No APPL ID found in data dictionary file {file_path}")

# Determine the data_table date_created
if 'date_added' in study['gen3_discovery']:
data_table.set('date_created', study['gen3_discovery']['date_added'])
else:
logging.warning(f"No date_added found in data dictionary file {file_path}")

# A list of unique variable identifiers in this data dictionary file.
# If you need to make sure every variable from MDS is uniquely identified, you can move this set to the
# top-level of this file.
unique_variable_ids = set()

total_variable_count = 0
count_data_dictionaries = 0
for data_dict in data_dicts:
# A set of unique variable identifiers in this data dictionary file.
# If you need to make sure every variable from MDS is uniquely identified, you can move this set to the
# top-level of this file.
unique_variable_ids = set()

data_table = ET.Element('data_table')

if 'gen3_discovery' in study:
# Every data dictionary from the HEAL Data Platform should have an ID, and the previous code should have
# stored it in the `@id` field in the data dictionary JSON file.
#
# There may also be a `label`, which is the key of the data dictionary in the study.
if '@id' in study['gen3_discovery']:
data_table.set('id', study['gen3_discovery']['@id'])
else:
logging.warning(f"No identifier found in data dictionary file {file_path}")
study_name = study.get('gen3_discovery', {}).get('label') or study.get('gen3_discovery', {}).get('study_metadata',{}).get('minimal_info',{}).get('study_name')
if study_name:
data_table.set('study_name', study_name)
study_description = study.get('gen3_discovery', {}).get('study_metadata',{}).get('minimal_info',{}).get('study_description')
if study_description:
data_table.set('study_description', study_description)

# Determine the data_table study_id from the internal HEAL Data Platform (HDP) identifier.
if '_hdp_uid' in study['gen3_discovery']:
data_table.set('study_id', HDP_ID_PREFIX + study['gen3_discovery']['_hdp_uid'])
else:
logging.warning(f"No HDP ID found in data dictionary file {file_path}")

# Create a non-standard appl_id field just in case we need it later.
# This should be fine for now; there is also a `comments` element where we could
# store information like this if needed.
if 'appl_id' in study['gen3_discovery']:
data_table.set('appl_id', study['gen3_discovery']['appl_id'])
else:
logging.warning(f"No APPL ID found in data dictionary file {file_path}")

# Determine the data_table date_created
if 'date_added' in study['gen3_discovery']:
data_table.set('date_created', study['gen3_discovery']['date_added'])
else:
logging.warning(f"No date_added found in data dictionary file {file_path}")

for var_dict in data_dict['fields']:
count_data_dictionaries += 1
variable_count = 0

# Check for data_dict['error']
if 'error' in data_dict:
logging.warning(f"Could not retrieve data dictionary {data_dict['@id']} from MDS: {data_dict['error']['detail']}")
continue

for var_dict in data_dict.get('fields', []):
total_variable_count += 1
variable_count += 1

variable_entry = {}

logging.debug(f"Generating dbGaP for variable {var_dict} in {file_path}")

# Retrieve the variable name.
variable = ET.SubElement(data_table, 'variable')

# Let's create a dd_id field for each variable, even if nobody supports it yet.
variable.set('dd_id', data_dict['@id'])
variable_entry['study_id'] = data_table.get('study_id')
variable_entry['dd_id'] = data_dict['@id']

# Make sure the variable ID is unique (by adding `_1`, `_2`, ... to the end of it).
name_or_node = var_dict.get('name', var_dict.get('node', ''))
name_or_node = var_dict.get('name', var_dict.get('property', ''))
var_name = name_or_node
variable_index = 0
while var_name in unique_variable_ids:
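
    # (The loop body is collapsed in this diff view. Per the comment above, the
    # uniquifying idiom is presumably along these lines; a sketch, not necessarily
    # the PR's exact code:)
    #     variable_index += 1
    #     var_name = f"{name_or_node}_{variable_index}"
    # with var_name added to unique_variable_ids once it is unique.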
@@ -308,27 +378,39 @@ def generate_dbgap_files(dbgap_dir, studies_with_data_dicts_dir)

# Create a name element for the variable. We don't uniquify this field.
name = ET.SubElement(variable, 'name')
name.text = var_name
name.text = name_or_node

variable_entry['name'] = name_or_node

# Create a title element for the variable.
if 'title' in var_dict:
title = ET.SubElement(variable, 'title')
title.text = var_dict['title']
variable_entry['title'] = var_dict['title']

if 'description' in var_dict:
desc = ET.SubElement(variable, 'description')
desc.text = var_dict['description']
variable_entry['description'] = var_dict['description']

# Export the `module` field so that we can look for instruments.
# TODO: this is a custom field. Instead of this, we could export each data dictionary as a separate dbGaP
# file. Need to check to see what works better for Dug ingest.
if 'module' in var_dict:
variable.set('module', var_dict['module'])
if 'section' in var_dict:
variable.set('section', var_dict['section'])
variable_entry['section'] = var_dict['section']

# Add constraints.
if 'constraints' in var_dict:
# Check for minimum and maximum constraints.
if 'minimum' in var_dict['constraints']:
logical_min = ET.SubElement(variable, 'logical_min')
logical_min.text = str(var_dict['constraints']['minimum'])
variable_entry['logical_min'] = str(var_dict['constraints']['minimum'])
if 'maximum' in var_dict['constraints']:
logical_max = ET.SubElement(variable, 'logical_max')
logical_max.text = str(var_dict['constraints']['maximum'])
variable_entry['logical_max'] = str(var_dict['constraints']['maximum'])
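
    # e.g. (hypothetical): var_dict['constraints'] == {'minimum': 0, 'maximum': 120}
    # yields <logical_min>0</logical_min> and <logical_max>120</logical_max>.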

# Determine a type for this variable.
typ = var_dict.get('type')
@@ -337,6 +419,7 @@
if typ:
type_element = ET.SubElement(variable, 'type')
type_element.text = typ
variable_entry['type'] = typ

# If there are encodings, we need to convert them into values.
if 'encodings' in var_dict:
@@ -358,18 +441,36 @@ def generate_dbgap_files(dbgap_dir, studies_with_data_dicts_dir)
value_element.set('code', key)
value_element.text = value

# Write out XML.
xml_str = ET.tostring(data_table, encoding='unicode')
pretty_xml_str = minidom.parseString(xml_str).toprettyxml()
variable_entry['encodings'] = "||".join(map(lambda x: f"{x[0]}={x[1]}", encs.items()))
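
    # e.g. (hypothetical): encs == {'1': 'Male', '2': 'Female'} produces the elements
    # <value code="1">Male</value> and <value code="2">Female</value> above, and
    # variable_entry['encodings'] == '1=Male||2=Female' here.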

all_variable_index.append(variable_entry)

logging.info(f"Added {variable_count} variables in data dictionary {data_dict['@id']} in {file_path} for study {study_name}.")

# Write out XML.
xml_str = ET.tostring(data_table, encoding='unicode')
pretty_xml_str = minidom.parseString(xml_str).toprettyxml()

# Produce the XML file by changing the .json to .xml.
output_xml_filename = os.path.join(dbgap_dir, data_dict_file.replace('.json', '.xml'))
with open(output_xml_filename, 'w') as f:
f.write(pretty_xml_str)
logging.info(f"Wrote {data_table} (containing {total_variable_count} variables from {count_data_dictionaries} data dictionaries) to {output_xml_filename}")

# Make a list of dbGaP files to report to the main program.
dbgap_files_generated.add(output_xml_filename)

# Write a full variable index into the dbGaP output directory.
variable_index_filename = os.path.join(dbgap_dir, 'variable_index.csv')
with open(variable_index_filename, 'w') as f:
header = ['study_id', 'dd_id', 'name', 'section', 'title', 'description', 'type', 'encodings', 'logical_min', 'logical_max']

# Produce the XML file by changing the .json to .xml.
output_xml_filename = os.path.join(dbgap_dir, data_dict_file.replace('.json', '.xml'))
with open(output_xml_filename, 'w') as f:
f.write(pretty_xml_str)
logging.info(f"Writing {data_table} to {output_xml_filename}")
csv_writer = csv.DictWriter(f, fieldnames=header)
csv_writer.writeheader()
for row in all_variable_index:
csv_writer.writerow(row)

# Make a list of dbGaP files to report to the main program.
dbgap_files_generated.add(output_xml_filename)
logging.info(f"Wrote variable index of {len(all_variable_index)} variables to {variable_index_filename}.")

return dbgap_files_generated
