-
Notifications
You must be signed in to change notification settings - Fork 21
/
Copy pathARAX_expander.py
1555 lines (1419 loc) · 96.1 KB
/
ARAX_expander.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
939
940
941
942
943
944
945
946
947
948
949
950
951
952
953
954
955
956
957
958
959
960
961
962
963
964
965
966
967
968
969
970
971
972
973
974
975
976
977
978
979
980
981
982
983
984
985
986
987
988
989
990
991
992
993
994
995
996
997
998
999
1000
#!/bin/env python3
import asyncio
import copy
import logging
import multiprocessing
import pickle
import sys
import os
import time
import traceback
from collections import defaultdict
from typing import List, Dict, Tuple, Union, Set, Optional
sys.path.append(os.path.dirname(os.path.abspath(__file__))) # ARAXQuery directory
from ARAX_response import ARAXResponse
from ARAX_decorator import ARAXDecorator
sys.path.append(os.path.dirname(os.path.abspath(__file__))+"/../../") # code directory
from RTXConfiguration import RTXConfiguration
sys.path.append(os.path.dirname(os.path.abspath(__file__))+"/../BiolinkHelper/")
from biolink_helper import BiolinkHelper
sys.path.append(os.path.dirname(os.path.abspath(__file__))+"/Expand/")
import expand_utilities as eu
from expand_utilities import QGOrganizedKnowledgeGraph
from kp_selector import KPSelector
sys.path.append(os.path.dirname(os.path.abspath(__file__))+"/../../UI/OpenAPI/python-flask-server/")
from openapi_server.models.knowledge_graph import KnowledgeGraph
from openapi_server.models.query_graph import QueryGraph
from openapi_server.models.q_edge import QEdge
from openapi_server.models.q_node import QNode
from openapi_server.models.edge import Edge
from openapi_server.models.attribute_constraint import AttributeConstraint
def eprint(*args, **kwargs):
    """Print to stderr instead of stdout; all other arguments are forwarded to print()."""
    print(*args, file=sys.stderr, **kwargs)
def trim_to_size(input_list, length):
    """Return input_list shortened to `length` items plus a trailing "+N" overflow marker.

    None passes through unchanged. Lists no more than one item over `length` are
    returned as-is, since trimming a (length+1)-item list would not make it any
    shorter once the "+1" marker is appended. The input list is never mutated;
    a new list is returned when trimming occurs.
    """
    if input_list is None:
        return None
    overflow = len(input_list) - length
    if overflow <= 1:
        return input_list
    trimmed = input_list[:length]
    trimmed.append(f"+{overflow}")
    return trimmed
class ARAXExpander:
def __init__(self):
    """Set up Expand's file-based logger, the Biolink helper, and the supported-constraint registry."""
    log_file_path = os.path.dirname(os.path.abspath(__file__)) + "/Expand/expand.log"
    file_handler = logging.FileHandler(log_file_path)
    file_handler.setFormatter(logging.Formatter('%(asctime)s %(levelname)s: %(message)s'))
    self.logger = logging.getLogger('log')
    self.logger.setLevel(logging.INFO)
    # Drop any handlers left over from earlier instantiations so log lines aren't duplicated
    if self.logger.hasHandlers():
        self.logger.handlers.clear()
    self.logger.addHandler(file_handler)
    self.bh = BiolinkHelper()
    # Keep record of which constraints we support (format is: {constraint_id: {operator: {values}}})
    self.supported_qnode_constraints = {"biolink:highest_FDA_approval_status": {"==": {"regular approval"}}}
    self.supported_qedge_constraints = {"biolink:knowledge_source": {"==": "*"}}
def describe_me(self):
    """
    Little helper function for internal use that describes the actions and what they can do. (Also used for
    auto-documentation.)
    :return: A list containing one dict that describes the expand() DSL command and its parameters.
    """
    kp_selector = KPSelector()
    all_kps = sorted(list(kp_selector.valid_kps))  # TODO: Should we include with any maturity here? any TRAPI version?
    rtxc = RTXConfiguration()
    command_definition = {
        "dsl_command": "expand()",
        "description": f"This command will expand (aka, answer/fill) your query graph in an edge-by-edge "
                       f"fashion, intelligently selecting which KPs to use for each edge. It selects KPs from the SmartAPI Registry "
                       f"based on the meta information provided by their TRAPI APIs (when available), whether they "
                       f"have an endpoint running a matching TRAPI version, and whether they have an endpoint with matching "
                       f"maturity. For each QEdge, it queries the selected KPs in parallel; it will "
                       f"timeout for a particular KP if it decides it's taking too long to respond. You may also "
                       f"optionally specify a particular KP to use via the 'kp' parameter (described below).\n\n"
                       f"Current candidate KPs include (for TRAPI {rtxc.trapi_major_version}, maturity '{rtxc.maturity}'): \n"
                       f"{', '.join(all_kps)}. \n"
                       f"\n(Note that this list of KPs may change unexpectedly based on the SmartAPI registry.)"
                       f"\n\n**Notes specific to usage of ARAX's internal KPs:**\n "
                       f"1. NGD: The 'infores:arax-normalized-google-distance' KP uses ARAX's in-house normalized "
                       f"google distance (NGD) database to expand "
                       "a query graph; it returns edges between nodes with an NGD value below a certain "
                       "threshold. This threshold is currently hardcoded as 0.5, though this will be made "
                       "configurable/smarter in the future.\n"
                       "2. DTD: The 'infores:arax-drug-treats-disease' KP uses ARAX's in-house drug-treats-disease (DTD) database (built from GraphSage model) to expand "
                       "a query graph; it returns edges between nodes with a DTD probability above a certain "
                       "threshold. The default threshold is currently set to 0.8. If you set this threshold below 0.8, you should also "
                       # Typo/grammar fixes in the user-facing text below: 'warninig' -> 'warning',
                       # 'pre-calcualted' -> 'pre-calculated', 'an user set' -> 'a user sets',
                       # 'sysnonym' -> 'synonym'
                       "set DTD_slow_mode=True otherwise a warning will occur. This is because the current DTD database only stores the pre-calculated "
                       "DTD probability above or equal to 0.8. Therefore, if a user sets a threshold below 0.8, it will automatically switch to call the DTD model "
                       "to do a real-time calculation and this will be quite time-consuming. In addition, if you call the DTD database, your query node type will be checked. "
                       "In other words, the query node has to have a synonym which is a drug or disease. If you don't want to check node type, set DTD_slow_mode=true "
                       "to call the DTD model to do a real-time calculation.",
        "parameters": self.get_parameter_info_dict()
    }
    return [command_definition]
@staticmethod
def get_parameter_info_dict():
parameter_info_dict = {
"kp": {
"is_required": False,
"examples": ["infores:rtx-kg2, infores:spoke, infores:genetics-data-provider, infores:molepro"],
"type": "string",
"description": "The KP to ask for answers to the given query. KPs must be referred to by their"
" 'infores' curies."
},
"edge_key": {
"is_required": False,
"examples": ["e00", "[e00, e01]"],
"type": "string",
"description": "A query graph edge ID or list of such IDs to expand (default is to expand "
"entire query graph)."
},
"node_key": {
"is_required": False,
"examples": ["n00", "[n00, n01]"],
"type": "string",
"description": "A query graph node ID or list of such IDs to expand (default is to expand "
"entire query graph)."
},
"prune_threshold": {
"is_required": False,
"type": "integer",
"default": None,
"examples": [500, 2000],
"description": "The max number of nodes allowed to fulfill any intermediate QNode. Nodes in "
"excess of this threshold will be pruned, using Fisher Exact Test to rank answers."
},
"kp_timeout": {
"is_required": False,
"type": "integer",
"default": None,
"examples": [30, 120],
"description": "The number of seconds Expand will wait for a response from a KP before "
"cutting the query off and proceeding without results from that KP."
},
"return_minimal_metadata": {
"is_required": False,
"examples": ["true", "false"],
"type": "boolean",
"description": "Whether to omit supporting data on nodes/edges in the results (e.g., publications, "
"description, etc.)."
},
"DTD_threshold": {
"is_required": False,
"examples": [0.8, 0.5],
"min": 0,
"max": 1,
"default": 0.8,
"type": "float",
"description": "Applicable only when the 'infores:arax-drug-treats-disease' KP is used. "
"Defines what cut-off/threshold to use for expanding the DTD virtual edges."
},
"DTD_slow_mode": {
"is_required": False,
"examples": ["true", "false"],
"enum": ["true", "false", "True", "False", "t", "f", "T", "F"],
"default": "false",
"type": "boolean",
"description": "Applicable only when the 'infores:arax-drug-treats-disease' KP is used. "
"Specifies whether to call DTD model rather than DTD database to do a real-time "
"calculation for DTD probability."
}
}
return parameter_info_dict
def apply(self, response, input_parameters, mode: str = "ARAX"):
    """Expand (answer/fill) the query graph edge-by-edge using selected KPs.

    Modifies response.envelope.message in place: populates message.knowledge_graph,
    marks expanded qedges as 'filled', and may delete expand-created subclass qedges.

    :param response: The overarching ARAXResponse object (also used as the log).
    :param input_parameters: Dict of expand() parameters (kp, edge_key, node_key,
        prune_threshold, kp_timeout, etc.); validated below.
    :param mode: "ARAX" for normal operation, or "RTXKG2" when this code is serving
        as the KG2 API (skips option groups/excludes, canonicalization, and pruning).
    :return: The same ARAXResponse, with status set appropriately.
    """
    force_local = False  # Flip this to make your machine act as the KG2 'API' (do not commit! for local use only)
    message = response.envelope.message
    # Initiate an empty knowledge graph if one doesn't already exist
    if message.knowledge_graph is None:
        message.knowledge_graph = KnowledgeGraph(nodes=dict(), edges=dict())
    log = response
    # this fetches the list of all registered kps with compatible versions
    # TODO: really we don't want to fetch meta maps and stuff if we're in RTXKG2 mode... add way to turn that off?
    kp_selector = KPSelector(log)
    # If this is a query for the KG2 API, ignore all option_group_id and exclude properties (only does one-hop)
    if mode == "RTXKG2":
        log.debug(f"Ignoring all 'option_group_id' and 'exclude' properties on qnodes/qedges since we're in RTXKG2 mode")
        for qnode in message.query_graph.nodes.values():
            qnode.option_group_id = None
        for qedge in message.query_graph.edges.values():
            qedge.option_group_id = None
            qedge.exclude = None
    # We'll use a copy of the QG because we modify it for internal use within Expand
    query_graph = copy.deepcopy(message.query_graph)
    # Check for any self-qedges; ignore those that are 'subclass_of'
    for qedge_key in set(query_graph.edges):  # set() copy: we delete entries during iteration
        qedge = query_graph.edges[qedge_key]
        if qedge.subject == qedge.object:
            if qedge.predicates == ["biolink:subclass_of"]:
                log.warning(f"Expand will ignore the 'subclass_of' self-qedge in your QG because KPs "
                            f"automatically take care of subclass reasoning")
                del query_graph.edges[qedge_key]
            else:
                log.error(f"ARAX does not support queries with self-qedges (qedges whose subject == object)",
                          error_code="InvalidQG")
    # Make sure the QG structure appears to be valid (cannot be disjoint, unless it consists only of qnodes)
    required_portion_of_qg = eu.get_required_portion_of_qg(message.query_graph)
    if required_portion_of_qg.edges and eu.qg_is_disconnected(required_portion_of_qg):
        log.error(f"Required portion of QG is disconnected. This is not allowed.", error_code="InvalidQG")
        return response
    # Create global slots to store some info that needs to persist between expand() calls
    if not hasattr(message, "encountered_kryptonite_edges_info"):
        message.encountered_kryptonite_edges_info = dict()
    # Basic checks on arguments
    if not isinstance(input_parameters, dict):
        log.error("Provided parameters is not a dict", error_code="ParametersNotDict")
        return response
    # Define a complete set of allowed parameters and their defaults (if the user specified a particular KP to use)
    kp = input_parameters.get("kp")
    if kp and kp not in kp_selector.valid_kps:
        log.error(f"Invalid KP. Options are: {kp_selector.valid_kps}", error_code="InvalidKP")
        return response
    parameters = self._set_and_validate_parameters(kp, input_parameters, log)
    # Check if at least one query node has a non-empty ids property
    all_ids = [node.ids for node in message.query_graph.nodes.values()]
    if not any(all_ids):
        # No early return here; the 'response.status != OK' check further below handles bail-out
        log.error("QueryGraph has no nodes with ids. At least one node must have a specified 'ids'", error_code="QueryGraphNoIds")
    # Default to expanding the entire query graph if the user didn't specify what to expand
    if not parameters['edge_key'] and not parameters['node_key']:
        parameters['edge_key'] = list(message.query_graph.edges)
        parameters['node_key'] = self._get_orphan_qnode_keys(message.query_graph)
    # set timeout based on input parameters, it'll be used later
    if parameters.get("kp_timeout"):
        kp_timeout = parameters["kp_timeout"]
    else:
        kp_timeout = None
    # Verify we understand all constraints
    for qnode_key, qnode in query_graph.nodes.items():
        if qnode.constraints:
            for constraint in qnode.constraints:
                if not self.is_supported_constraint(constraint, self.supported_qnode_constraints):
                    log.error(f"Unsupported constraint(s) detected on qnode {qnode_key}: \n{constraint}\n"
                              f"Don't know how to handle! Supported qnode constraints are: "
                              f"{self.supported_qnode_constraints}", error_code="UnsupportedConstraint")
    for qedge_key, qedge in query_graph.edges.items():
        if qedge.attribute_constraints:
            for constraint in qedge.attribute_constraints:
                if not self.is_supported_constraint(constraint, self.supported_qedge_constraints):
                    log.error(f"Unsupported constraint(s) detected on qedge {qedge_key}: \n{constraint}\n"
                              f"Don't know how to handle! Supported qedge constraints are: "
                              f"{self.supported_qedge_constraints}", error_code="UnsupportedConstraint")
        if qedge.qualifier_constraints:
            log.warning(f"Qualifier constraints are not yet supported! Will answer the query anyway, but "
                        f"qualifier constraints will not be respected.")
    if response.status != 'OK':
        return response
    response.data['parameters'] = parameters
    # Do the actual expansion
    log.debug(f"Applying Expand to Message with parameters {parameters}")
    input_qedge_keys = eu.convert_to_list(parameters['edge_key'])
    input_qnode_keys = eu.convert_to_list(parameters['node_key'])
    user_specified_kp = True if parameters['kp'] else False
    # Convert message knowledge graph to format organized by QG keys, for faster processing
    overarching_kg = eu.convert_standard_kg_to_qg_organized_kg(message.knowledge_graph)
    # Add in any category equivalencies to the QG (e.g., protein == gene, since KPs handle these differently)
    for qnode_key, qnode in query_graph.nodes.items():
        if qnode.ids and not qnode.categories:
            # Infer categories for expand's internal use (in KP selection and etc.)
            qnode.categories = eu.get_preferred_categories(qnode.ids, log)
            # remove all descendent categories of "biolink:ChemicalEntity" and replace them with "biolink:ChemicalEntity"
            # This is so SPOKE will be correctly chosen as a KP for queries where a pinned qnode has a category which descends from ChemicalEntity. More info on Github issue1773
            categories_set = set(qnode.categories)
            chem_entity_descendents = set(self.bh.get_descendants("biolink:ChemicalEntity"))
            filtered_categories = categories_set - chem_entity_descendents
            if categories_set != filtered_categories:
                filtered_categories.add("biolink:ChemicalEntity")
                qnode.categories = list(filtered_categories)
            log.debug(f"Inferred category for qnode {qnode_key} is {qnode.categories}")
        elif not qnode.categories:
            # Default to NamedThing if no category was specified
            qnode.categories = [self.bh.get_root_category()]
        # Conflations (e.g., protein/gene) are added for every qnode, inferred or not
        qnode.categories = self.bh.add_conflations(qnode.categories)
    # Make sure QG only uses canonical predicates
    if mode != "RTXKG2":
        log.debug(f"Making sure QG only uses canonical predicates")
        qedge_keys = set(query_graph.edges)
        for qedge_key in qedge_keys:
            qedge = query_graph.edges[qedge_key]
            if qedge.predicates:
                # Convert predicates to their canonical form as needed/possible
                qedge_predicates = set(qedge.predicates)
                symmetric_predicates = {predicate for predicate in qedge_predicates
                                        if self.bh.is_symmetric(predicate)}
                asymmetric_predicates = qedge_predicates.difference(symmetric_predicates)
                canonical_predicates = set(self.bh.get_canonical_predicates(qedge.predicates))
                if canonical_predicates != qedge_predicates:
                    asymmetric_non_canonical = asymmetric_predicates.difference(canonical_predicates)
                    asymmetric_canonical = asymmetric_predicates.intersection(canonical_predicates)
                    symmetric_non_canonical = symmetric_predicates.difference(canonical_predicates)
                    if symmetric_non_canonical:
                        # Switch to canonical predicates, but no need to flip the qedge since they're symmetric
                        log.debug(f"Converting symmetric predicates {symmetric_non_canonical} on {qedge_key} to "
                                  f"their canonical forms.")
                        converted_symmetric = self.bh.get_canonical_predicates(symmetric_non_canonical)
                        qedge.predicates = list(qedge_predicates.difference(symmetric_non_canonical).union(converted_symmetric))
                    if asymmetric_non_canonical and asymmetric_canonical:
                        # Mixed canonical/non-canonical asymmetric predicates can't be fixed by one flip
                        log.error(f"Qedge {qedge_key} has asymmetric predicates in both canonical and non-canonical"
                                  f" forms, which isn't allowed. Non-canonical asymmetric predicates are: "
                                  f"{asymmetric_non_canonical}", error_code="InvalidPredicates")
                    elif asymmetric_non_canonical:
                        # Flip the qedge in this case (OK to do since only other predicates are symmetric)
                        log.debug(f"Converting {qedge_key}'s asymmetric non-canonical predicates to canonical "
                                  f"form; requires flipping the qedge, but this is OK since there are no "
                                  f"asymmetric canonical predicates on this qedge.")
                        converted_asymmetric = self.bh.get_canonical_predicates(asymmetric_non_canonical)
                        final_predicates = set(qedge.predicates).difference(asymmetric_non_canonical).union(converted_asymmetric)
                        eu.flip_qedge(qedge, list(final_predicates))
                # Handle special situation where user entered treats edge in wrong direction
                if qedge.predicates == ["biolink:treats"]:
                    subject_qnode = query_graph.nodes[qedge.subject]
                    if "biolink:Disease" in self.bh.get_descendants(subject_qnode.categories):
                        log.warning(f"{qedge_key} seems to be pointing in the wrong direction (you have "
                                    f"(disease-like node)-[treats]->(something)). Will flip this qedge.")
                        eu.flip_qedge(qedge, qedge.predicates)
            else:
                # Default to related_to if no predicate was specified
                qedge.predicates = [self.bh.get_root_predicate()]
    # Send ARAXInfer any one-hop, "inferred", "treats" queries (temporary way of making creative mode work)
    inferred_qedge_keys = [qedge_key for qedge_key, qedge in query_graph.edges.items() if qedge.knowledge_type == "inferred"]
    if inferred_qedge_keys:
        if len(query_graph.edges) == 1:
            inferred_qedge_key = inferred_qedge_keys[0]
            qedge = query_graph.edges[inferred_qedge_key]
            # Figure out if this is a "treats" query
            treats_ancestors = self.bh.get_ancestors("biolink:treats")
            if set(treats_ancestors).intersection(set(qedge.predicates)):
                # Call XDTD and simply return whatever it returns
                # Get the subject of this edge
                subject_qnode = query_graph.nodes[qedge.subject]  # drug
                object_qnode = query_graph.nodes[qedge.object]  # disease
                if object_qnode.ids and len(object_qnode.ids) >= 1:
                    object_curie = object_qnode.ids[0]  # FIXME: will need a way to handle multiple IDs
                else:
                    log.error(f"No CURIEs found for {object_qnode.name}", error_code="NoCURIEs")
                    # raise Exception(f"No CURIEs found for {object_qnode.name}")
                    # NOTE(review): bare `return` yields None rather than the response object — confirm callers tolerate this
                    return
                # NOTE(review): message text says "subject" but logs the object curie — likely a wording slip
                log.info(f"Calling XDTD from Expand for qedge {inferred_qedge_key} (has knowledge_type == inferred) and the subject is {object_curie}")
                from ARAX_infer import ARAXInfer
                infer_input_parameters = {"action": "drug_treatment_graph_expansion", 'node_curie': object_curie, 'qedge_id': inferred_qedge_key}
                inferer = ARAXInfer()
                infer_response = inferer.apply(response, infer_input_parameters)
                return infer_response
            else:
                log.info(f"Qedge {inferred_qedge_key} has knowledge_type == inferred, but the query is not "
                         f"DTD-related. Will answer using the normal 'fill' strategy (not creative mode).")
        else:
            log.warning(f"Expand does not yet know how to answer multi-qedge query graphs when one or more of "
                        f"the qedges has knowledge_type == inferred. Will answer using the normal 'fill' strategy "
                        f"(not creative mode).")
    # Expand any specified edges
    if input_qedge_keys:
        query_sub_graph = self._extract_query_subgraph(input_qedge_keys, query_graph, log)
        if log.status != 'OK':
            return response
        log.debug(f"Query graph for this Expand() call is: {query_sub_graph.to_dict()}")
        ordered_qedge_keys_to_expand = self._get_order_to_expand_qedges_in(query_sub_graph, log)
        # Pre-populate the query plan with an entry for each qedge that will be expanded in this Expand() call
        for qedge_key in ordered_qedge_keys_to_expand:
            qedge = query_sub_graph.edges[qedge_key]
            response.update_query_plan(qedge_key, 'edge_properties', 'status', 'Waiting')
            subject_qnode = query_sub_graph.nodes[qedge.subject]
            object_qnode = query_sub_graph.nodes[qedge.object]
            subject_details = subject_qnode.ids if subject_qnode.ids else subject_qnode.categories
            object_details = object_qnode.ids if object_qnode.ids else object_qnode.categories
            # Trim long id/category lists for display purposes in the query plan
            subject_details = trim_to_size(subject_details, 5)
            object_details = trim_to_size(object_details, 5)
            predicate_details = trim_to_size(qedge.predicates, 5)
            response.update_query_plan(qedge_key, 'edge_properties', 'subject', subject_details)
            response.update_query_plan(qedge_key, 'edge_properties', 'object', object_details)
            response.update_query_plan(qedge_key, 'edge_properties', 'predicate', predicate_details)
            for kp in kp_selector.valid_kps:
                response.update_query_plan(qedge_key, kp, 'Waiting', f'Waiting for processing of {qedge_key} to begin')
        # Expand the query graph edge-by-edge
        for qedge_key in ordered_qedge_keys_to_expand:
            log.debug(f"Expanding qedge {qedge_key}")
            response.update_query_plan(qedge_key, 'edge_properties', 'status', 'Expanding')
            for kp in kp_selector.valid_kps:
                response.update_query_plan(qedge_key, kp, 'Waiting', 'Prepping query to send to KP')
            message.query_graph.edges[qedge_key].filled = True  # Mark as expanded in overarching QG #1848
            qedge = query_graph.edges[qedge_key]
            qedge.filled = True  # Also mark as expanded in local QG #1848
            # Create a query graph for this edge (that uses curies found in prior steps)
            one_hop_qg = self._get_query_graph_for_edge(qedge_key, query_graph, overarching_kg, log)
            # Figure out the prune threshold (use what user provided or otherwise do something intelligent)
            if parameters.get("prune_threshold"):
                pre_prune_threshold = parameters["prune_threshold"]
            else:
                pre_prune_threshold = self._get_prune_threshold(one_hop_qg)
            # Prune back any nodes with more than the specified max of answers
            if mode != "RTXKG2":
                log.debug(f"For {qedge_key}, pre-prune threshold is {pre_prune_threshold}")
                fulfilled_qnode_keys = set(one_hop_qg.nodes).intersection(set(overarching_kg.nodes_by_qg_id))
                for qnode_key in fulfilled_qnode_keys:
                    num_kg_nodes = len(overarching_kg.nodes_by_qg_id[qnode_key])
                    if num_kg_nodes > pre_prune_threshold:
                        overarching_kg = self._prune_kg(qnode_key, pre_prune_threshold, overarching_kg, query_graph, log)
                        # Re-formulate the QG for this edge now that the KG has been slimmed down
                        one_hop_qg = self._get_query_graph_for_edge(qedge_key, query_graph, overarching_kg, log)
            if log.status != 'OK':
                return response
            # Figure out which KPs would be best to expand this edge with (if no KP was specified)
            if not user_specified_kp:
                queriable_kps = set(kp_selector.get_kps_for_single_hop_qg(one_hop_qg))
                # remove kps if this edge has kp constraints
                allowlist, denylist = eu.get_knowledge_source_constraints(qedge)
                kps_to_query = queriable_kps - denylist
                if allowlist:
                    kps_to_query = {kp for kp in kps_to_query if kp in allowlist}
                for skipped_kp in queriable_kps.difference(kps_to_query):
                    skipped_message = "This KP was constrained by this edge"
                    response.update_query_plan(qedge_key, skipped_kp, "Skipped", skipped_message)
                log.info(f"The KPs Expand decided to answer {qedge_key} with are: {kps_to_query}")
            else:
                kps_to_query = {parameters["kp"]}
                for kp in kp_selector.valid_kps.difference(kps_to_query):
                    skipped_message = f"Expand was told to use {', '.join(kps_to_query)}"
                    response.update_query_plan(qedge_key, kp, "Skipped", skipped_message)
            kps_to_query = list(kps_to_query)
            use_asyncio = True  # Flip this to False if you want to use multiprocessing instead
            # Use a non-concurrent method to expand with KG2 when bypassing the KG2 API
            if kps_to_query == ["infores:rtx-kg2"] and mode == "RTXKG2":
                kp_answers = [self._expand_edge_kg2_local(one_hop_qg, log)]
            # Otherwise concurrently send this query to each KP selected to answer it
            elif kps_to_query:
                if use_asyncio:
                    kps_to_query = eu.sort_kps_for_asyncio(kps_to_query, log)
                    log.debug(f"Will use asyncio to run KP queries concurrently")
                    loop = asyncio.new_event_loop()  # Need to create NEW event loop for threaded environments
                    asyncio.set_event_loop(loop)
                    tasks = [self._expand_edge_async(one_hop_qg, kp_to_use, input_parameters, user_specified_kp,
                                                     kp_timeout, force_local,
                                                     kp_selector, log, multiple_kps=True)
                             for kp_to_use in kps_to_query]
                    task_group = asyncio.gather(*tasks)
                    kp_answers = loop.run_until_complete(task_group)
                    loop.close()
                else:
                    # Use multiprocessing (which forks behind the scenes) TODO: Delete once fully commit to asyncio
                    log.debug(f"Will use multiprocessing to run KP queries in parallel")
                    for kp in kps_to_query:
                        num_input_curies = max([len(eu.convert_to_list(qnode.ids)) for qnode in one_hop_qg.nodes.values()])
                        waiting_message = f"Query with {num_input_curies} curies sent: waiting for response"
                        response.update_query_plan(qedge_key, kp, "Waiting", waiting_message)
                    log.debug(f"Waiting for all KP processes to finish..")
                    empty_log = ARAXResponse()  # We'll have to merge processes' logs together afterwards
                    self.logger.info(f"PID {os.getpid()}: BEFORE pool: About to create {len(kps_to_query)} child processes from {multiprocessing.current_process()}")
                    with multiprocessing.Pool(len(kps_to_query)) as pool:
                        kp_answers = pool.starmap(self._expand_edge, [[one_hop_qg, kp_to_use, input_parameters,
                                                                       user_specified_kp, kp_timeout, force_local,
                                                                       kp_selector, empty_log, True]
                                                                      for kp_to_use in kps_to_query])
                    self.logger.info(f"PID {os.getpid()}: AFTER pool: Pool of {len(kps_to_query)} processes is done, back in {multiprocessing.current_process()}")
                # Merge the processes' individual logs and update the query plan
                for index, response_tuple in enumerate(kp_answers):
                    kp = kps_to_query[index]
                    answer_kg = response_tuple[0]
                    kp_log = response_tuple[1]
                    wait_time = kp_log.wait_time if hasattr(kp_log, "wait_time") else "unknown"
                    # Update the query plan with KPs' results
                    if kp_log.status == 'OK':
                        if hasattr(kp_log, "timed_out"):
                            timeout_message = f"Query timed out after {kp_log.timed_out} seconds"
                            response.update_query_plan(qedge_key, kp, "Timed out", timeout_message)
                        elif hasattr(kp_log, "http_error"):
                            error_message = f"Returned error {kp_log.http_error} after {wait_time} seconds"
                            response.update_query_plan(qedge_key, kp, "Error", error_message)
                        else:
                            done_message = f"Query returned {len(answer_kg.edges_by_qg_id.get(qedge_key, dict()))} " \
                                           f"edges in {wait_time} seconds"
                            response.update_query_plan(qedge_key, kp, "Done", done_message)
                    else:
                        response.update_query_plan(qedge_key, kp, "Error",
                                                   f"Process returned error {kp_log.status}")
                    # Merge KP logs as needed, since processes can't share the main log
                    if len(kps_to_query) > 1 and kp_log.status != 'OK':
                        kp_log.status = 'OK'  # We don't want to halt just because one KP reported an error #1500
                    log.merge(kp_log)
                if response.status != 'OK':
                    return response
            else:
                log.error(f"Expand could not find any KPs to answer {qedge_key} with.", error_code="NoResults")
                return response
            # Merge KPs' answers into our overarching KG
            log.debug(f"Got answers from all KPs; merging them into one KG")
            for index, response_tuple in enumerate(kp_answers):
                answer_kg = response_tuple[0]
                # Store any kryptonite edge answers as needed
                if mode != "RTXKG2" and qedge.exclude and not answer_kg.is_empty():
                    self._store_kryptonite_edge_info(answer_kg, qedge_key, message.query_graph,
                                                     message.encountered_kryptonite_edges_info, response)
                # Otherwise just merge the answer into the overarching KG
                else:
                    self._merge_answer_into_message_kg(answer_kg, overarching_kg, message.query_graph, query_graph, mode, response)
                if response.status != 'OK':
                    return response
            log.debug(f"After merging KPs' answers, total KG counts are: {eu.get_printable_counts_by_qg_id(overarching_kg)}")
            # Handle any constraints for this qedge and/or its qnodes (that require post-filtering)
            qnode_keys = {qedge.subject, qedge.object}
            qnode_keys_with_answers = qnode_keys.intersection(set(overarching_kg.nodes_by_qg_id))
            for qnode_key in qnode_keys_with_answers:
                qnode = query_graph.nodes[qnode_key]
                if qnode.constraints:
                    for constraint in qnode.constraints:
                        if constraint.id == "biolink:highest_FDA_approval_status" and constraint.operator == "==" and constraint.value == "regular approval":
                            log.info(f"Applying qnode {qnode_key} constraint: {'NOT ' if constraint._not else ''}"
                                     f"biolink:highest_FDA_approval_status == regular approval")
                            fda_approved_drug_ids = self._load_fda_approved_drug_ids()
                            answer_node_ids = set(overarching_kg.nodes_by_qg_id[qnode_key])
                            if constraint._not:
                                # NOT approved: drop the approved ones
                                nodes_to_remove = answer_node_ids.intersection(fda_approved_drug_ids)
                            else:
                                # Approved only: drop everything not in the approved set
                                nodes_to_remove = answer_node_ids.difference(fda_approved_drug_ids)
                            log.debug(f"Removing {len(nodes_to_remove)} nodes fulfilling {qnode_key} for FDA "
                                      f"approval constraint ({round((len(nodes_to_remove) / len(answer_node_ids)) * 100)}%)")
                            overarching_kg.remove_nodes(nodes_to_remove, qnode_key, query_graph)
            # Handle knowledge source constraints for this qedge
            # Removing kedges that have any sources that are constrained
            allowlist, denylist = eu.get_knowledge_source_constraints(qedge)
            if qedge_key in overarching_kg.edges_by_qg_id:
                kedges_to_remove = []
                for kedge_key, kedge in overarching_kg.edges_by_qg_id[qedge_key].items():
                    for attribute in kedge.attributes:
                        # check if source(s) of knowledge_source attribute are constrained
                        if attribute.attribute_type_id in ["biolink:aggregator_knowledge_source", "biolink:knowledge_source"]:
                            sources = attribute.value
                            # always accept arax as a source
                            if sources == "infores:arax" or sources == ["infores:arax"]:
                                continue
                            # handle cases where source is string or list
                            if type(sources) == str:
                                sources = [sources]
                            if any(source in denylist for source in sources):
                                kedges_to_remove.append(kedge_key)
                                break
                            elif allowlist and any(source not in allowlist for source in sources):
                                kedges_to_remove.append(kedge_key)
                                break
                # remove kedges which have been determined to be constrained
                for kedge_key in kedges_to_remove:
                    if kedge_key in overarching_kg.edges_by_qg_id[qedge_key]:
                        del overarching_kg.edges_by_qg_id[qedge_key][kedge_key]
            if mode != "RTXKG2":
                # Apply any kryptonite ("not") qedges
                self._apply_any_kryptonite_edges(overarching_kg, message.query_graph,
                                                 message.encountered_kryptonite_edges_info, response)
                # Remove any paths that are now dead-ends
                overarching_kg = self._remove_dead_end_paths(query_graph, overarching_kg, response)
                if response.status != 'OK':
                    return response
            # Declare that we are done expanding this qedge
            response.update_query_plan(qedge_key, 'edge_properties', 'status', 'Done')
            # Make sure we found at least SOME answers for this edge
            if not eu.qg_is_fulfilled(one_hop_qg, overarching_kg) and not qedge.exclude and not qedge.option_group_id:
                log.warning(f"No paths were found in {kps_to_query} satisfying qedge {qedge_key}")
                return response
    # Expand any specified nodes
    if input_qnode_keys:
        kp_to_use = parameters["kp"] if user_specified_kp else "infores:rtx-kg2"  # Only KG2 does single-node queries
        for qnode_key in input_qnode_keys:
            answer_kg = self._expand_node(qnode_key, kp_to_use, query_graph, mode, user_specified_kp, kp_timeout,
                                          force_local, log)
            if log.status != 'OK':
                return response
            self._merge_answer_into_message_kg(answer_kg, overarching_kg, message.query_graph, query_graph, mode, log)
            if log.status != 'OK':
                return response
    # Get rid of any lingering expand-added subclass self-qedges that are no longer relevant (edges pruned)
    all_qedge_keys = set(message.query_graph.edges)
    for qedge_key in all_qedge_keys:
        if not overarching_kg.edges_by_qg_id.get(qedge_key) and eu.is_expand_created_subclass_qedge_key(qedge_key, message.query_graph):
            log.debug(f"Deleting {qedge_key} from the QG because no edges fulfill it anymore")
            del message.query_graph.edges[qedge_key]
    # Convert message knowledge graph back to standard TRAPI
    message.knowledge_graph = eu.convert_qg_organized_kg_to_standard_kg(overarching_kg)
    # Override node types so that they match what was asked for in the query graph (where applicable) #987
    self._override_node_categories(message.knowledge_graph, message.query_graph)
    # Decorate all nodes with additional attributes info from KG2c if requested (iri, description, etc.)
    if mode != "RTXKG2" or not parameters.get("return_minimal_metadata"):
        decorator = ARAXDecorator()
        decorator.decorate_nodes(response)
        decorator.decorate_edges(response, kind="RTX-KG2")
    # Map canonical curies back to the input curies in the QG (where applicable) #1622
    self._map_back_to_input_curies(message.knowledge_graph, query_graph, log)
    # Return the response and done
    kg = message.knowledge_graph
    log.info(f"After Expand, the KG has {len(kg.nodes)} nodes and {len(kg.edges)} edges "
             f"({eu.get_printable_counts_by_qg_id(overarching_kg)})")
    return response
async def _expand_edge_async(self, edge_qg: QueryGraph, kp_to_use: str, input_parameters: Dict[str, any],
user_specified_kp: bool, kp_timeout: Optional[int], force_local: bool,
kp_selector: KPSelector, log: ARAXResponse, multiple_kps: bool = False) -> Tuple[QGOrganizedKnowledgeGraph, ARAXResponse]:
# This function answers a single-edge (one-hop) query using the specified knowledge provider
qedge_key = next(qedge_key for qedge_key in edge_qg.edges)
log.info(f"Expanding qedge {qedge_key} using {kp_to_use}")
answer_kg = QGOrganizedKnowledgeGraph()
# Make sure we have all default parameters set specific to the KP we'll be using
log.data["parameters"] = self._set_and_validate_parameters(kp_to_use, input_parameters, log)
# Make sure at least one of the qnodes has a curie specified
if not any(qnode for qnode in edge_qg.nodes.values() if qnode.ids):
log.error(f"Cannot expand an edge for which neither end has any curies. (Could not find curies to use from "
f"a prior expand step, and neither qnode has a curie specified.)", error_code="InvalidQuery")
return answer_kg, log
# Make sure the specified KP is a valid option
allowable_kps = kp_selector.valid_kps
if kp_to_use not in allowable_kps:
log.error(f"Invalid knowledge provider: {kp_to_use}. Valid options are {', '.join(allowable_kps)}",
error_code="InvalidKP")
return answer_kg, log
# Route this query to the proper place depending on the KP
try:
use_custom_querier = kp_to_use in {'infores:arax-drug-treats-disease', 'infores:arax-normalized-google-distance'}
if use_custom_querier:
num_input_curies = max([len(eu.convert_to_list(qnode.ids)) for qnode in edge_qg.nodes.values()])
waiting_message = f"Query with {num_input_curies} curies sent: waiting for response"
log.update_query_plan(qedge_key, kp_to_use, "Waiting", waiting_message)
start = time.time()
if kp_to_use == 'infores:arax-drug-treats-disease':
from Expand.DTD_querier import DTDQuerier
kp_querier = DTDQuerier(log)
else:
from Expand.ngd_querier import NGDQuerier
kp_querier = NGDQuerier(log)
answer_kg = kp_querier.answer_one_hop_query(edge_qg)
wait_time = round(time.time() - start)
if log.status == 'OK':
done_message = f"Returned {len(answer_kg.edges_by_qg_id.get(qedge_key, dict()))} edges in {wait_time} seconds"
log.update_query_plan(qedge_key, kp_to_use, "Done", done_message)
else:
log.update_query_plan(qedge_key, kp_to_use, "Error", f"Process error-ed out with {log.status} after {wait_time} seconds")
return answer_kg, log
else:
# This is a general purpose querier for use with any KPs that we query via their TRAPI API
from Expand.trapi_querier import TRAPIQuerier
kp_querier = TRAPIQuerier(response_object=log,
kp_name=kp_to_use,
user_specified_kp=user_specified_kp,
kp_timeout=kp_timeout,
kp_selector=kp_selector,
force_local=force_local)
answer_kg = await kp_querier.answer_one_hop_query_async(edge_qg)
except Exception:
tb = traceback.format_exc()
error_type, error, _ = sys.exc_info()
if user_specified_kp:
log.error(f"An uncaught error was thrown while trying to Expand using {kp_to_use}. Error was: {tb}",
error_code=f"UncaughtError")
else:
log.warning(f"An uncaught error was thrown while trying to Expand using {kp_to_use}, so I couldn't "
f"get answers from that KP. Error was: {tb}")
log.update_query_plan(qedge_key, kp_to_use, "Error", f"Process error-ed out with {log.status}")
return QGOrganizedKnowledgeGraph(), log
if log.status != 'OK':
if multiple_kps:
log.status = 'OK' # We don't want to halt just because one KP reported an error #1500
return answer_kg, log
log.info(f"{kp_to_use}: Query for edge {qedge_key} completed ({eu.get_printable_counts_by_qg_id(answer_kg)})")
# Do some post-processing (deduplicate nodes, remove self-edges..)
if kp_to_use != 'infores:rtx-kg2': # KG2c is already deduplicated and uses canonical predicates
answer_kg = eu.check_for_canonical_predicates(answer_kg, kp_to_use, log)
answer_kg = self._deduplicate_nodes(answer_kg, kp_to_use, log)
if any(edges for edges in answer_kg.edges_by_qg_id.values()): # Make sure the KP actually returned something
answer_kg = self._remove_self_edges(answer_kg, kp_to_use, log)
return answer_kg, log
def _expand_edge_kg2_local(self, one_hop_qg: QueryGraph, log: ARAXResponse) -> Tuple[QGOrganizedKnowledgeGraph, ARAXResponse]:
qedge_key = next(qedge_key for qedge_key in one_hop_qg.edges)
log.debug(f"Expanding {qedge_key} by querying Plover directly")
answer_kg = QGOrganizedKnowledgeGraph()
from Expand.kg2_querier import KG2Querier
kg2_querier = KG2Querier(log)
try:
answer_kg = kg2_querier.answer_one_hop_query(one_hop_qg)
except Exception:
tb = traceback.format_exc()
error_type, error, _ = sys.exc_info()
log.error(f"An uncaught error was thrown while trying to Expand using infores:rtx-kg2 (local). Error was: {tb}",
error_code=f"UncaughtError")
if log.status != 'OK':
return answer_kg, log
if any(edges for edges in answer_kg.edges_by_qg_id.values()): # Make sure the KP actually returned something
answer_kg = self._remove_self_edges(answer_kg, "infores:rtx-kg2", log)
return answer_kg, log
def _expand_edge(self, edge_qg: QueryGraph, kp_to_use: str, input_parameters: Dict[str, any],
user_specified_kp: bool, kp_timeout: Optional[int], force_local: bool, kp_selector: KPSelector,
log: ARAXResponse, multiprocessed: bool = False) -> Tuple[QGOrganizedKnowledgeGraph, ARAXResponse]:
# TODO: Delete this method once we're ready to let go of the multiprocessing (vs. asyncio) option
if multiprocessed:
self.logger.info(f"PID {os.getpid()}: {kp_to_use}: Entered child process {multiprocessing.current_process()}")
# This function answers a single-edge (one-hop) query using the specified knowledge provider
qedge_key = next(qedge_key for qedge_key in edge_qg.edges)
log.info(f"Expanding qedge {qedge_key} using {kp_to_use}")
answer_kg = QGOrganizedKnowledgeGraph()
# Make sure we have all default parameters set specific to the KP we'll be using
log.data["parameters"] = self._set_and_validate_parameters(kp_to_use, input_parameters, log)
# Make sure at least one of the qnodes has a curie specified
if not any(qnode for qnode in edge_qg.nodes.values() if qnode.ids):
log.error(f"Cannot expand an edge for which neither end has any curies. (Could not find curies to use from "
f"a prior expand step, and neither qnode has a curie specified.)", error_code="InvalidQuery")
return answer_kg, log
# Make sure the specified KP is a valid option
allowable_kps = kp_selector.valid_kps
if kp_to_use not in allowable_kps:
log.error(f"Invalid knowledge provider: {kp_to_use}. Valid options are {', '.join(allowable_kps)}",
error_code="InvalidKP")
return answer_kg, log
# Route this query to the proper place depending on the KP
try:
if kp_to_use == 'infores:arax-drug-treats-disease':
from Expand.DTD_querier import DTDQuerier
kp_querier = DTDQuerier(log)
elif kp_to_use == 'infores:arax-normalized-google-distance':
from Expand.ngd_querier import NGDQuerier
kp_querier = NGDQuerier(log)
else:
# This is a general purpose querier for use with any KPs that we query via their TRAPI 1.0+ API
from Expand.trapi_querier import TRAPIQuerier
kp_querier = TRAPIQuerier(response_object=log,
kp_name=kp_to_use,
user_specified_kp=user_specified_kp,
kp_timeout=kp_timeout,
kp_selector=kp_selector,
force_local=force_local)
# Actually answer the query using the Querier we identified above
answer_kg = kp_querier.answer_one_hop_query(edge_qg)
except Exception:
tb = traceback.format_exc()
error_type, error, _ = sys.exc_info()
if user_specified_kp:
log.error(f"An uncaught error was thrown while trying to Expand using {kp_to_use}. Error was: {tb}",
error_code=f"UncaughtError")
else:
log.warning(f"An uncaught error was thrown while trying to Expand using {kp_to_use}, so I couldn't "
f"get answers from that KP. Error was: {tb}")
if multiprocessed:
self.logger.info(f"PID {os.getpid()}: {kp_to_use}: Exiting child process {multiprocessing.current_process()} (it errored out)")
return QGOrganizedKnowledgeGraph(), log
if log.status != 'OK':
if multiprocessed:
self.logger.info(f"PID {os.getpid()}: {kp_to_use}: Exiting child process {multiprocessing.current_process()} (it errored out)")
return answer_kg, log
log.info(f"{kp_to_use}: Query for edge {qedge_key} completed ({eu.get_printable_counts_by_qg_id(answer_kg)})")
# Do some post-processing (deduplicate nodes, remove self-edges..)
if kp_to_use != 'infores:rtx-kg2': # KG2c is already deduplicated and uses canonical predicates
answer_kg = eu.check_for_canonical_predicates(answer_kg, kp_to_use, log)
answer_kg = self._deduplicate_nodes(answer_kg, kp_to_use, log)
if any(edges for edges in answer_kg.edges_by_qg_id.values()): # Make sure the KP actually returned something
answer_kg = self._remove_self_edges(answer_kg, kp_to_use, log)
if multiprocessed:
self.logger.info(f"PID {os.getpid()}: {kp_to_use}: Exiting child process {multiprocessing.current_process()}")
return answer_kg, log
def _expand_node(self, qnode_key: str, kp_to_use: str, query_graph: QueryGraph, mode: str,
user_specified_kp: bool, kp_timeout: Optional[int], force_local: bool, log: ARAXResponse) -> QGOrganizedKnowledgeGraph:
# This function expands a single node using the specified knowledge provider
log.debug(f"Expanding node {qnode_key} using {kp_to_use}")
qnode = query_graph.nodes[qnode_key]
single_node_qg = QueryGraph(nodes={qnode_key: qnode}, edges=dict())
answer_kg = QGOrganizedKnowledgeGraph()
if log.status != 'OK':
return answer_kg
if not qnode.ids:
log.error(f"Cannot expand a single query node if it doesn't have a curie", error_code="InvalidQuery")
return answer_kg
# Answer the query using the proper KP (only our own KP answers single-node queries)
valid_kps_for_single_node_queries = ["infores:rtx-kg2"]
if kp_to_use in valid_kps_for_single_node_queries:
if kp_to_use == 'infores:rtx-kg2' and mode == 'RTXKG2':
from Expand.kg2_querier import KG2Querier
kp_querier = KG2Querier(log)
else:
from Expand.trapi_querier import TRAPIQuerier
kp_querier = TRAPIQuerier(response_object=log,
kp_name=kp_to_use,
user_specified_kp=user_specified_kp,
kp_timeout=kp_timeout,
force_local=force_local)
answer_kg = kp_querier.answer_single_node_query(single_node_qg)
log.info(f"Query for node {qnode_key} returned results ({eu.get_printable_counts_by_qg_id(answer_kg)})")
if kp_to_use != 'infores:rtx-kg2': # KG2c is already deduplicated
answer_kg = self._deduplicate_nodes(answer_kg, kp_to_use, log)
return answer_kg
else:
log.error(f"Invalid knowledge provider: {kp_to_use}. Valid options for single-node queries are "
f"{', '.join(valid_kps_for_single_node_queries)}", error_code="InvalidKP")
return answer_kg
    def _get_query_graph_for_edge(self, qedge_key: str, full_qg: QueryGraph, overarching_kg: QGOrganizedKnowledgeGraph, log: ARAXResponse) -> QueryGraph:
        """
        Build a one-hop query graph for the given qedge, copying the qedge and its two qnodes out of
        the full QG and feeding curies found by prior expansion steps into qnodes that lack them.

        Curie-feeding rules (subtle — order matters):
          * If this qedge has already been expanded (e.g. by another KP), only the 'input' qnode
            (per self._is_input_qnode) gets fed existing curies.
          * Else if this qedge is required (not in an option group), a qnode only inherits curies
            when some REQUIRED qedge touching it has already been expanded.
          * Otherwise (an option-group qedge), any previously-found curies are fed in.
        """
        # This function creates a query graph for the specified qedge, updating its qnodes' curies as needed
        edge_qg = QueryGraph(nodes=dict(), edges=dict())
        qedge = full_qg.edges[qedge_key]
        qnode_keys = [qedge.subject, qedge.object]
        # Add (a copy of) this qedge to our edge query graph
        edge_qg.edges[qedge_key] = copy.deepcopy(qedge)
        # Update this qedge's qnodes as appropriate and add (copies of) them to the edge query graph
        required_qedge_keys = {qe_key for qe_key, qe in full_qg.edges.items() if not qe.option_group_id}
        expanded_qedge_keys = set(overarching_kg.edges_by_qg_id)
        qedge_has_already_been_expanded = qedge_key in expanded_qedge_keys
        qedge_is_required = qedge_key in required_qedge_keys
        for qnode_key in qnode_keys:
            qnode = full_qg.nodes[qnode_key]
            qnode_copy = copy.deepcopy(qnode)
            # Feed in curies from a prior Expand() step as the curie for this qnode as necessary
            qnode_already_fulfilled = qnode_key in overarching_kg.nodes_by_qg_id
            if qnode_already_fulfilled and not qnode_copy.ids:
                existing_curies_for_this_qnode_key = list(overarching_kg.nodes_by_qg_id[qnode_key])
                if qedge_has_already_been_expanded:
                    # Feed in curies only for 'input' qnodes if we're re-expanding this edge (i.e., with another KP)
                    if self._is_input_qnode(qnode_key, qedge_key, full_qg, log):
                        qnode_copy.ids = existing_curies_for_this_qnode_key
                elif qedge_is_required:
                    # Only feed in curies to required qnodes if it was expansion of a REQUIRED qedge that grabbed them
                    # Note: intersection with two args means the connected qedge must be in BOTH sets
                    qedge_keys_connected_to_qnode = eu.get_connected_qedge_keys(qnode_key, full_qg)
                    was_populated_by_required_edge = qedge_keys_connected_to_qnode.intersection(required_qedge_keys, expanded_qedge_keys)
                    if was_populated_by_required_edge:
                        qnode_copy.ids = existing_curies_for_this_qnode_key
                else:
                    # Option-group qedge: feed in whatever curies prior expansion found for this qnode
                    qnode_copy.ids = existing_curies_for_this_qnode_key
            edge_qg.nodes[qnode_key] = qnode_copy
        # Display a summary of what the modified query graph for this edge looks like
        qnodes_with_curies = [qnode_key for qnode_key, qnode in edge_qg.nodes.items() if qnode.ids]
        qnodes_without_curies = [qnode_key for qnode_key in edge_qg.nodes if qnode_key not in qnodes_with_curies]
        input_qnode_key = qnodes_with_curies[0] if qnodes_with_curies else qnodes_without_curies[0]
        # NOTE(review): assumes the qedge's subject and object are distinct qnodes — a self-loop qedge
        # would leave this set difference empty and raise IndexError; confirm upstream guarantees this.
        output_qnode_key = list(set(edge_qg.nodes).difference({input_qnode_key}))[0]
        input_qnode = edge_qg.nodes[input_qnode_key]
        output_qnode = edge_qg.nodes[output_qnode_key]
        input_curie_summary = self._get_qnode_curie_summary(input_qnode)
        output_curie_summary = self._get_qnode_curie_summary(output_qnode)
        log.debug(f"Modified QG for this qedge is ({input_qnode_key}:{input_qnode.categories}{input_curie_summary})-"
                  f"{qedge.predicates if qedge.predicates else ''}-({output_qnode_key}:{output_qnode.categories}{output_curie_summary})")
        return edge_qg
@staticmethod
def _deduplicate_nodes(answer_kg: QGOrganizedKnowledgeGraph, kp_name: str, log: ARAXResponse) -> QGOrganizedKnowledgeGraph:
log.debug(f"{kp_name}: Deduplicating nodes")
deduplicated_kg = QGOrganizedKnowledgeGraph(nodes={qnode_key: dict() for qnode_key in answer_kg.nodes_by_qg_id},
edges={qedge_key: dict() for qedge_key in answer_kg.edges_by_qg_id})
curie_mappings = dict()
# First deduplicate the nodes
for qnode_key, nodes in answer_kg.nodes_by_qg_id.items():
# Load preferred curie info from NodeSynonymizer
log.debug(f"{kp_name}: Getting preferred curies for {qnode_key} nodes returned in this step")
canonicalized_nodes = eu.get_canonical_curies_dict(list(nodes), log) if nodes else dict()
if log.status != 'OK':
return deduplicated_kg
for node_key in nodes:
# Figure out the preferred curie/name for this node
node = nodes.get(node_key)
canonicalized_node = canonicalized_nodes.get(node_key)
if canonicalized_node:
preferred_curie = canonicalized_node.get('preferred_curie', node_key)
preferred_name = canonicalized_node.get('preferred_name', node.name)
preferred_type = canonicalized_node.get('preferred_type')
preferred_categories = eu.convert_to_list(preferred_type) if preferred_type else node.categories
curie_mappings[node_key] = preferred_curie
else:
# Means the NodeSynonymizer didn't recognize this curie
preferred_curie = node_key
preferred_name = node.name
preferred_categories = node.categories
curie_mappings[node_key] = preferred_curie
# Add this node into our deduplicated KG as necessary
if preferred_curie not in deduplicated_kg.nodes_by_qg_id[qnode_key]:
node_key = preferred_curie
node.name = preferred_name
node.categories = preferred_categories
deduplicated_kg.add_node(node_key, node, qnode_key)
# Then update the edges to reflect changes made to the nodes
for qedge_key, edges in answer_kg.edges_by_qg_id.items():
for edge_key, edge in edges.items():
edge.subject = curie_mappings.get(edge.subject)
edge.object = curie_mappings.get(edge.object)
if not edge.subject or not edge.object:
log.error(f"{kp_name}: Could not find preferred curie mappings for edge {edge_key}'s node(s)")
return deduplicated_kg
deduplicated_kg.add_edge(edge_key, edge, qedge_key)
log.debug(f"{kp_name}: After deduplication, answer KG counts are: {eu.get_printable_counts_by_qg_id(deduplicated_kg)}")
return deduplicated_kg
@staticmethod
def _extract_query_subgraph(qedge_keys_to_expand: List[str], query_graph: QueryGraph, log: ARAXResponse) -> QueryGraph:
# This function extracts a sub-query graph containing the provided qedge IDs from a larger query graph
sub_query_graph = QueryGraph(nodes=dict(), edges=dict())
for qedge_key in qedge_keys_to_expand:
# Make sure this query edge actually exists in the query graph
if qedge_key not in query_graph.edges:
log.error(f"An edge with ID '{qedge_key}' does not exist in Message.QueryGraph",
error_code="UnknownValue")
return None
qedge = query_graph.edges[qedge_key]
# Make sure this qedge's qnodes actually exist in the query graph
if not query_graph.nodes.get(qedge.subject):
log.error(f"Qedge {qedge_key}'s subject refers to a qnode that does not exist in the query graph: "
f"{qedge.subject}", error_code="InvalidQEdge")
return None
if not query_graph.nodes.get(qedge.object):
log.error(f"Qedge {qedge_key}'s object refers to a qnode that does not exist in the query graph: "
f"{qedge.object}", error_code="InvalidQEdge")
return None
# Add (copies of) this qedge and its two qnodes to our new query sub graph
qedge_copy = copy.deepcopy(qedge)
if qedge_key not in sub_query_graph.edges:
sub_query_graph.edges[qedge_key] = qedge_copy
for qnode_key in [qedge_copy.subject, qedge_copy.object]:
qnode_copy = copy.deepcopy(query_graph.nodes[qnode_key])
if qnode_key not in sub_query_graph.nodes:
sub_query_graph.nodes[qnode_key] = qnode_copy
return sub_query_graph
@staticmethod
def _merge_answer_into_message_kg(answer_kg: QGOrganizedKnowledgeGraph, overarching_kg: QGOrganizedKnowledgeGraph,
overarching_qg: QueryGraph, expands_qg: QueryGraph, mode: str, log: ARAXResponse):
# This function merges an answer KG (from the current edge/node expansion) into the overarching KG
log.debug("Merging answer into Message.KnowledgeGraph")
pinned_curies_map = defaultdict(set)