-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathPatientStructure.py
338 lines (294 loc) · 17.5 KB
/
PatientStructure.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
import os
import numpy as np
import re
import pandas as pd
import logging
import nibabel as nib
from typing import List
from ..configuration_parser import ResourcesConfiguration
from ..utilities import input_file_category_disambiguation, get_type_from_enum_name
from .RadiologicalVolumeStructure import RadiologicalVolume
from .AnnotationStructure import Annotation, AnnotationClassType
from .RegistrationStructure import Registration
class PatientParameters:
    """
    Container for everything loaded for one patient: timestamps, radiological volumes, annotations,
    atlases, registrations and reports, each stored in a dictionary keyed by a unique identifier.
    """
    _unique_id = None  # Internal unique identifier for the patient
    _input_filepath = None  # Folder path containing all the data for the current patient.
    _timestamps = {}  # All timestamps for the current patient.
    _radiological_volumes = {}  # All radiological volume instances loaded for the current patient.
    _annotation_volumes = {}  # All Annotation instances loaded for the current patient.
    _atlas_volumes = {}  # All Atlas instances loaded for the current patient.
    _registrations = {}  # All registration transforms.
    _reportings = {}  # All clinical reports (if applicable).

    def __init__(self, id: str, patient_filepath: str):
        """
        Store the identifiers and, when the folder exists on disk, populate the patient from its content.

        :param id: unique identifier to store for the patient.
        :param patient_filepath: folder containing the patient data. When empty or not existing on disk,
            the instance is left without any content and no exception is raised.
        :raises ValueError: if parsing the folder content fails.
        """
        self.__reset()
        self._unique_id = id
        self._input_filepath = patient_filepath
        if not patient_filepath or not os.path.exists(patient_filepath):
            # Error case: nothing can be parsed, the containers stay empty.
            return
        self.__init_from_scratch()

    def __reset(self):
        """
        Rebind every attribute on the instance. The class-level defaults are mutable dictionaries, which would
        otherwise be shared by all the patients; after this call each instance owns its own containers.
        """
        self._unique_id = None
        self._input_filepath = None
        self._timestamps = {}
        self._radiological_volumes = {}
        self._annotation_volumes = {}
        self._atlas_volumes = {}
        self._registrations = {}
        self._reportings = {}

    @staticmethod
    def _generate_unused_uid(prefix: str, existing, suffix: str = "") -> str:
        """
        Draw random identifiers of the form <prefix><integer in [0, 10000)><suffix> until one is not in existing.

        :param prefix: leading part of the identifier (e.g., 'V' or 'A').
        :param existing: container supporting the 'in' operator, holding the identifiers already taken.
        :param suffix: trailing part of the identifier, empty by default.
        :return: an identifier which is not part of existing.
        """
        while True:
            candidate = prefix + str(np.random.randint(0, 10000)) + suffix
            if candidate not in existing:
                return candidate

    def __init_from_scratch(self):
        """
        Iterating through the patient folder to identify the radiological volumes for each timestamp.
        In case of stripped inputs (i.e., skull-stripped or lung-stripped), the corresponding mask should be created
        for each input.

        :raises ValueError: wrapping any exception raised while parsing (the original one is chained as cause).
        """
        try:
            is_raidionics = ResourcesConfiguration.getInstance().caller == 'raidionics'

            # Only the first level of the patient folder is inspected for timestamp folders.
            timestamp_folders = next(os.walk(self._input_filepath), (None, [], None))[1]

            # Skipping folders without an integer inside; two folders holding the same integer overwrite each other.
            folders_by_index = {}
            for folder in timestamp_folders:
                match = re.search(r'\d+', folder)
                if match:
                    folders_by_index[int(match.group())] = folder

            # Timestamps are renumbered from 0 onwards, following the increasing order of the folder integers.
            for position, ts in enumerate(sorted(folders_by_index)):
                ts_folder = os.path.join(self._input_filepath, folders_by_index[ts])
                if is_raidionics:  # Specifics to cater to Raidionics
                    ts_folder = os.path.join(ts_folder, 'raw')
                timestamp_uid = "T" + str(position)
                self._timestamps[timestamp_uid] = TimestampParameters(id=timestamp_uid,
                                                                      timestamp_filepath=ts_folder)
                self._load_timestamp_content(timestamp_uid, ts_folder, is_raidionics)

            self._assign_sequences_from_csv()

            # Setting up masks (i.e., brain or lungs) if stripped inputs are used.
            if ResourcesConfiguration.getInstance().predictions_use_stripped_data:
                self._generate_stripping_masks()
        except Exception as e:
            raise ValueError("Patient structure setup from disk folder failed with: {}".format(e)) from e

    def _load_timestamp_content(self, timestamp_uid: str, ts_folder: str, is_raidionics: bool) -> None:
        """
        Register the radiological volumes, then the annotations, found directly inside a timestamp folder.

        :param timestamp_uid: identifier of the timestamp the files belong to.
        :param ts_folder: folder whose files (first level only) are inspected.
        :param is_raidionics: True when annotation files are tagged with 'annotation' rather than 'label'.
        """
        accepted_formats = ResourcesConfiguration.getInstance().get_accepted_image_formats()
        folder_files = next(os.walk(ts_folder), (None, None, []))[2]
        # Everything after the first dot of the filename is compared against the accepted formats.
        patient_files = [f for f in folder_files if '.'.join(f.split('.')[1:]) in accepted_formats]

        annotation_files = []
        for f in patient_files:
            file_content_type = input_file_category_disambiguation(os.path.join(ts_folder, f))
            if file_content_type == "Volume":
                # Generating a unique id for the radiological volume
                base_data_uid = os.path.basename(f).strip().split('.')[0]
                data_uid = self._generate_unused_uid('V', self._radiological_volumes, '_' + base_data_uid)
                self._radiological_volumes[data_uid] = RadiologicalVolume(uid=data_uid,
                                                                          input_filename=os.path.join(ts_folder, f),
                                                                          timestamp_uid=timestamp_uid)
            elif file_content_type == "Annotation":
                annotation_files.append(f)

        # Iterating over the annotation files in a second time, when all the parent objects have been created
        tag = 'annotation' if is_raidionics else 'label'
        for f in annotation_files:
            # Collecting the base name of the radiological volume, located before the tag; the character
            # right before the tag (presumably a separator) is dropped.
            name_parts = os.path.basename(f).strip().split('.')[0].split(tag)
            base_name = name_parts[0][:-1]
            # The parent is the first registered volume whose uid contains the base name.
            parent_uid = next((uid for uid in self._radiological_volumes if base_name in uid), None)
            if parent_uid is None:
                # Case where the annotation does not match any radiological volume, has to be left aside
                continue
            data_uid = self._generate_unused_uid('A', self._annotation_volumes, '_' + base_name)
            # The class name follows the tag, after skipping one character.
            class_name = name_parts[1][1:]
            self._annotation_volumes[data_uid] = Annotation(uid=data_uid,
                                                            input_filename=os.path.join(ts_folder, f),
                                                            output_folder=self._radiological_volumes[parent_uid].get_output_folder(),
                                                            radiological_volume_uid=parent_uid,
                                                            annotation_class=class_name)

    def _assign_sequences_from_csv(self) -> None:
        """
        If a 'mri_sequences.csv' file sits in the patient folder, set the sequence type of each listed volume
        from its 'File' and 'MRI sequence' columns. Filenames not matching any volume only trigger a warning.
        """
        sequences_filename = os.path.join(self._input_filepath, 'mri_sequences.csv')
        if not os.path.exists(sequences_filename):
            return
        df = pd.read_csv(sequences_filename)
        for vn in list(df['File'].values):
            volume_object = self.get_radiological_volume_by_base_filename(vn)
            if volume_object:
                volume_object.set_sequence_type(df.loc[df['File'] == vn]['MRI sequence'].values[0])
            else:
                logging.warning("[PatientStructure] Filename {} not matching any radiological volume volume.".format(vn))

    def _generate_stripping_masks(self) -> None:
        """
        For every radiological volume, save on disk a binary mask of its non-zero voxels and register it as
        a Brain annotation (when the diagnosis task is 'neuro_diagnosis') or a Lungs annotation (otherwise).
        """
        if ResourcesConfiguration.getInstance().diagnosis_task == 'neuro_diagnosis':
            target_type = AnnotationClassType.Brain
        else:
            target_type = AnnotationClassType.Lungs

        for uid in self.get_all_radiological_volume_uids():
            volume = self.get_radiological_volume(uid)
            volume_nib = nib.load(volume.get_raw_input_filepath())
            img_data = volume_nib.get_fdata()[:]
            mask = np.zeros(img_data.shape)
            mask[img_data != 0] = 1
            mask_fn = os.path.join(volume.get_output_folder(),
                                   os.path.basename(volume.get_raw_input_filepath()).split('.')[0] + '_label_' + str(target_type) + '.nii.gz')
            nib.save(nib.Nifti1Image(mask, affine=volume_nib.affine), mask_fn)
            anno_uid = self._generate_unused_uid('A', self._annotation_volumes)
            # The annotation carries the same uid as the key it is stored under.
            self._annotation_volumes[anno_uid] = Annotation(uid=anno_uid, input_filename=mask_fn,
                                                            output_folder=volume.get_output_folder(),
                                                            radiological_volume_uid=uid,
                                                            annotation_class=target_type)

    def include_annotation(self, anno_uid, annotation):
        """
        Store (or overwrite) the annotation under the given unique identifier.
        """
        self._annotation_volumes[anno_uid] = annotation

    def include_registration(self, reg_uid, registration):
        """
        Store (or overwrite) the registration under the given unique identifier.
        """
        self._registrations[reg_uid] = registration

    def include_reporting(self, report_uid, report):
        """
        Store (or overwrite) the report under the given unique identifier.
        """
        self._reportings[report_uid] = report

    def get_input_from_json(self, input_json: dict):
        """
        Automatically identifies and returns the proper structure instance based on the content of the input
        json dict.

        :param input_json: dictionary with the keys "timestamp", "sequence", "labels", and "space" (itself a
            dictionary with the keys "timestamp" and "sequence").
        :return: when the space matches the input's own timestamp and sequence, the Annotation (if "labels" is
            set) or the RadiologicalVolume. Otherwise the registered volume info (an object indexable with
            "filepath") of the annotation or volume in the reference space, or the RadiologicalVolume itself
            when the configuration states that the inputs are already co-registered.
        :raises ValueError: when no matching annotation, volume, or file on disk can be found.
        :raises KeyError: when a raw volume is requested in its own space and no volume matches.
        """
        in_original_space = input_json["space"]["timestamp"] == input_json["timestamp"] and \
            input_json["space"]["sequence"] == input_json["sequence"]
        volume_uid = self.get_radiological_volume_uid(timestamp=input_json["timestamp"],
                                                      sequence=input_json["sequence"])

        # Use-case where the input is in its original reference space
        if in_original_space:
            if not input_json["labels"]:
                return self.get_radiological_volume(volume_uid)
            # Use-case where the input is actually an annotation and not a raw radiological volume
            annotation_type = get_type_from_enum_name(AnnotationClassType, input_json["labels"])
            anno_uids = self.get_all_annotations_uids_class_radiological_volume(volume_uid=volume_uid,
                                                                                annotation_class=annotation_type)
            if len(anno_uids) != 0:
                return self.get_annotation(anno_uids[0])
            raise ValueError("No annotation file existing for the specified json input with:\n {}.\n".format(input_json))

        # The input is in a registered space
        if volume_uid == "-1":
            raise ValueError("No radiological volume for {}.".format(input_json))
        ref_space_uid = self.get_radiological_volume_uid(timestamp=input_json["space"]["timestamp"],
                                                         sequence=input_json["space"]["sequence"])
        if ref_space_uid == "-1" and input_json["space"]["timestamp"] != "-1":
            raise ValueError("No radiological volume for {}.".format(input_json))
        # @TODO. Otherwise the reference space may be an atlas, have to make an extra-pass for this.

        if input_json["labels"]:
            # Use-case where the input is actually an annotation and not a raw radiological volume
            annotation_type = get_type_from_enum_name(AnnotationClassType, input_json["labels"])
            if annotation_type == -1:
                raise ValueError("No radiological volume for {}.".format(input_json))
            anno_uids = self.get_all_annotations_uids_class_radiological_volume(volume_uid=volume_uid,
                                                                                annotation_class=annotation_type)
            if len(anno_uids) == 0:
                raise ValueError("No radiological volume for {}.".format(input_json))
            volume = self.get_annotation(annotation_uid=anno_uids[0]).get_registered_volume_info(ref_space_uid)
            input_fp = volume["filepath"]
        elif ResourcesConfiguration.getInstance().predictions_use_registered_data:
            # Use-case where the provided inputs are already co-registered
            volume = self.get_radiological_volume(volume_uid=volume_uid)
            input_fp = volume.get_usable_input_filepath()
        else:
            volume = self.get_radiological_volume(volume_uid=volume_uid).get_registered_volume_info(ref_space_uid)
            input_fp = volume["filepath"]

        if not os.path.exists(input_fp):
            raise ValueError("No radiological volume for {}.".format(input_json))
        return volume

    def get_all_radiological_volume_uids(self) -> List[str]:
        """
        :return: a new list with the unique identifiers of all the radiological volumes.
        """
        return list(self._radiological_volumes.keys())

    def get_radiological_volume_uid(self, timestamp: int, sequence: str) -> str:
        """
        :param timestamp: timestamp index, matched against the volume timestamp id as "T" + str(timestamp).
        :param sequence: sequence name, matched against the string version of the volume sequence type.
        :return: unique identifier of the first matching radiological volume, or "-1" if there is none.
        """
        timestamp_id = "T" + str(timestamp)
        for uid, volume in self._radiological_volumes.items():
            if volume._timestamp_id == timestamp_id and str(volume._sequence_type) == sequence:
                return uid
        return "-1"

    def get_radiological_volume(self, volume_uid: str) -> "RadiologicalVolume":
        """
        :return: the radiological volume stored under volume_uid.
        :raises KeyError: if no volume is stored under volume_uid.
        """
        return self._radiological_volumes[volume_uid]

    def get_radiological_volume_by_base_filename(self, base_fn: str):
        """
        :param base_fn: filename (without folder) to look for.
        :return: the first radiological volume whose usable input filepath has base_fn as basename,
            or None if there is none.
        """
        for volume in self._radiological_volumes.values():
            if os.path.basename(volume.get_usable_input_filepath()) == base_fn:
                return volume
        return None

    def get_all_annotations_uids(self) -> List[str]:
        """
        :return: a new list with the unique identifiers of all the annotations.
        """
        return list(self._annotation_volumes.keys())

    def get_annotation(self, annotation_uid: str) -> "Annotation":
        """
        :return: the annotation stored under annotation_uid.
        :raises KeyError: if no annotation is stored under annotation_uid.
        """
        return self._annotation_volumes[annotation_uid]

    def get_all_annotations_uids_radiological_volume(self, volume_uid: str) -> List[str]:
        """
        :return: unique identifiers of all the annotations linked to the radiological volume volume_uid.
        """
        return [uid for uid, annotation in self._annotation_volumes.items()
                if annotation._radiological_volume_uid == volume_uid]

    def get_all_annotations_uids_class_radiological_volume(self, volume_uid: str,
                                                           annotation_class: "AnnotationClassType") -> List[str]:
        """
        :return: unique identifiers of all the annotations linked to the radiological volume volume_uid and
            whose annotation type equals annotation_class.
        """
        return [uid for uid, annotation in self._annotation_volumes.items()
                if annotation._radiological_volume_uid == volume_uid
                and annotation._annotation_type == annotation_class]

    def get_registration_by_uids(self, fixed_uid: str, moving_uid: str) -> "Registration":
        """
        :return: the first registration whose fixed and moving uids match the given ones, or None if there
            is none.
        """
        for registration in self._registrations.values():
            if registration._fixed_uid == fixed_uid and registration._moving_uid == moving_uid:
                return registration
        return None

    def get_registration_by_json(self, fixed: dict, moving: dict) -> "Registration":
        """
        Resolve the fixed and moving descriptions into unique identifiers and return the matching registration.

        :param fixed: dictionary with the keys "timestamp" and "sequence" describing the fixed volume.
        :param moving: dictionary with the keys "timestamp" and "sequence" describing the moving volume.
        :return: the matching registration, or None if there is none.
        """
        # A timestamp of -1 (integer) designates the 'MNI' space rather than a patient volume.
        # NOTE(review): get_input_from_json compares the space timestamp against the string "-1";
        # confirm which type the json inputs actually carry.
        if fixed["timestamp"] == -1:
            fixed_uid = 'MNI'
        else:
            fixed_uid = self.get_radiological_volume_uid(fixed["timestamp"], fixed["sequence"])
        if moving["timestamp"] == -1:
            moving_uid = 'MNI'
        else:
            moving_uid = self.get_radiological_volume_uid(moving["timestamp"], moving["sequence"])
        return self.get_registration_by_uids(fixed_uid=fixed_uid, moving_uid=moving_uid)

    def get_all_reportings_uids(self) -> List[str]:
        """
        :return: a new list with the unique identifiers of all the reports.
        """
        return list(self._reportings.keys())
class TimestampParameters:
    """
    Record describing one timestamp of a patient: its unique identifier and the folder holding its data.
    """
    _unique_id = None  # Internal unique identifier for the timestamp
    _input_filepath = None  # Folder path given for the current timestamp.

    def __init__(self, id: str, timestamp_filepath: str):
        """
        :param id: unique identifier to store for the timestamp.
        :param timestamp_filepath: folder path to store for the timestamp; it is not checked against the disk.
        """
        self.__reset()
        self._unique_id = id
        self._input_filepath = timestamp_filepath
        self.__init_from_scratch()

    def __reset(self):
        """
        Rebind every attribute on the instance, so that it no longer relies on the class-level defaults.
        """
        self._unique_id = None
        self._input_filepath = None

    def __init_from_scratch(self):
        """
        Placeholder: nothing is loaded from the timestamp folder at the moment.
        """
        pass