generated from Knowledge-Graph-Hub/kg-example
-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathphenio_transform.py
145 lines (126 loc) · 5.25 KB
/
phenio_transform.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
"""Transform for PHENIO."""
import os
import sys
import tarfile
from typing import Optional
from kgx.cli.cli_utils import transform # type: ignore
from koza.cli_runner import transform_source
from kg_phenio.transform_utils.transform import Transform
from kg_phenio.utils.robot_utils import initialize_robot, robot_convert
# Ontology files handled by this transform, keyed by the name that is
# passed to PhenioTransform.parse() when run() is called without a
# specific data file.
ONTO_FILES = {
    "PhenioTransform": "phenio.owl",
}

# Koza source configuration files, one per record type. Each is handed
# to koza's transform_source() at the end of PhenioTransform.parse().
KOZA_CONFIGS = {
    "edge": "kg_phenio/transform_utils/phenio/phenio_edge_sources.yaml",
    "node": "kg_phenio/transform_utils/phenio/phenio_node_sources.yaml",
}

# Passed to koza's transform_source() as its global translation table.
TRANSLATION_TABLE = "./kg_phenio/transform_utils/translation_table.yaml"
class PhenioTransform(Transform):
    """Parse the PHENIO OWL into nodes and edges.

    The work is done by :meth:`parse`, which runs these steps in order:

    1. decompress ``<data_file>.tar.gz`` if the OWL file is not present,
    2. drop lines that are empty synonym / xref / comment elements,
    3. convert the OWL to OBO JSON with ROBOT (skipped if already present),
    4. transform the OBO JSON to KGX TSV (skipped if already present),
    5. run Koza once per entry in ``KOZA_CONFIGS``.
    """

    # Lines that, once stripped of surrounding whitespace, are empty
    # annotation elements and are removed from the OWL before conversion.
    _EMPTY_ANNOTATIONS = frozenset(
        {
            "<oboInOwl:hasNarrowSynonym></oboInOwl:hasNarrowSynonym>",
            "<oboInOwl:hasBroadSynonym></oboInOwl:hasBroadSynonym>",
            "<oboInOwl:hasExactSynonym></oboInOwl:hasExactSynonym>",
            "<oboInOwl:hasRelatedSynonym></oboInOwl:hasRelatedSynonym>",
            "<oboInOwl:hasDbXref></oboInOwl:hasDbXref>",
            "<rdfs:comment></rdfs:comment>",
        }
    )

    def __init__(self, input_dir: str = "", output_dir: str = ""):
        """Set defaults for PHENIO and set up ROBOT.

        Args:
            input_dir: forwarded to the ``Transform`` base class
            output_dir: forwarded to the ``Transform`` base class
        """
        source_name = "phenio"
        super().__init__(source_name, input_dir, output_dir)

        print("Setting up ROBOT...")
        # ROBOT is looked up in the current working directory.
        self.robot_path = os.path.join(os.getcwd(), "robot")
        self.robot_params = initialize_robot(self.robot_path)
        print(f"ROBOT path: {self.robot_path}")
        # The second item returned by initialize_robot is used as a mapping
        # that contains at least the 'ROBOT_JAVA_ARGS' key.
        self.robot_env = self.robot_params[1]
        print(f"ROBOT evironment variables: {self.robot_env['ROBOT_JAVA_ARGS']}")

    def run(self, data_file: Optional[str] = None) -> None:
        """Call transform and perform it.

        Args:
            data_file: data file to parse, relative to ``self.input_base_dir``.
                When omitted (or empty), every file in ``ONTO_FILES`` is
                parsed instead.

        Returns:
            None.
        """
        if data_file:
            # Everything before the first "." is used as both the ontology
            # name and the source name.
            k = data_file.split(".")[0]
            data_file = os.path.join(self.input_base_dir, data_file)
            self.parse(k, data_file, k)
        else:
            # load all ontologies
            for k, filename in ONTO_FILES.items():
                self.parse(k, os.path.join(self.input_base_dir, filename), k)

    def parse(self, name: str, data_file: str, source: str) -> None:
        """Process the data_file.

        Args:
            name: Name of the ontology; used as the prefix of the KGX
                output files written to ``self.output_dir``
            data_file: data file to parse
            source: Source name (not used by this implementation)

        Returns:
            None.

        Raises:
            SystemExit: if the ROBOT conversion to OBO JSON reports failure.
        """
        self._ensure_decompressed(data_file)
        self._remove_empty_annotations(data_file)
        data_file_json = self._convert_to_obojson(data_file)
        self._transform_to_kgx(name, data_file_json)
        self._apply_koza_sources()

    def _ensure_decompressed(self, data_file: str) -> None:
        """Extract ``<data_file>.tar.gz`` when ``data_file`` itself is absent."""
        if os.path.exists(data_file):
            print(f"Found ontology at {data_file}")
            return

        archive = data_file + ".tar.gz"
        if os.path.exists(archive):
            print(f"Decompressing {data_file}")
            # Open the archive that was found, not data_file (which is
            # known not to exist at this point).
            with tarfile.open(archive) as compfile:
                if hasattr(tarfile, "data_filter"):
                    # Refuse members that would escape the target
                    # directory, on Python versions that support filters.
                    compfile.extractall(self.input_base_dir, filter="data")
                else:
                    compfile.extractall(self.input_base_dir)

    def _remove_empty_annotations(self, data_file: str) -> None:
        """Rewrite ``data_file`` without empty synonym/xref/comment lines.

        Check validity of owl before transforming. Repair errors if the
        repair doesn't remove information (i.e., no node or edge loss).
        This is necessary for PHENIO because it's large and may contain
        errors impacting transform to nodes/edges. For now, this means
        removing empty synonyms, xrefs, and comments.
        """
        print("Checking for errors...")
        data_file_tmp = data_file + ".tmp"
        # NOTE(review): the OWL is assumed to be UTF-8 encoded; an explicit
        # encoding keeps the rewrite independent of the platform locale.
        with open(data_file, "r", encoding="utf-8") as infile, open(
            data_file_tmp, "w", encoding="utf-8"
        ) as outfile:
            for linenum, line in enumerate(infile, start=1):
                stripped = line.strip()
                if stripped in self._EMPTY_ANNOTATIONS:
                    print(f"Found error at line {linenum}: {stripped}.")
                else:
                    outfile.write(line)
        # Swap the cleaned copy into place only after both files are closed.
        os.replace(data_file_tmp, data_file)

    def _convert_to_obojson(self, data_file: str) -> str:
        """Convert the OWL to OBO JSON with ROBOT, if necessary.

        Returns the path of the JSON file. Exits the process if
        ``robot_convert`` returns a falsy value.
        """
        data_file_json = os.path.splitext(data_file)[0] + ".json"
        if os.path.exists(data_file_json):
            print(f"Found JSON ontology at {data_file_json}.")
            return data_file_json

        converted = robot_convert(
            robot_path=self.robot_path,
            input_path=data_file,
            output_path=data_file_json,
            robot_env=self.robot_env,
        )
        if not converted:
            sys.exit(f"Failed to convert {data_file}!")
        return data_file_json

    def _transform_to_kgx(self, name: str, data_file_json: str) -> None:
        """Transform the OBO JSON to KGX TSV, if necessary.

        The presence of ``<name>_edges.tsv`` in ``self.output_dir`` is used
        as the marker that this step has already been done.
        """
        data_file_tsv = os.path.join(self.output_dir, name + "_edges.tsv")
        if os.path.exists(data_file_tsv):
            print(f"Found KGX TSV edges at {data_file_tsv}.")
            return

        print("Transforming to KGX TSV...")
        transform(
            inputs=[data_file_json],
            input_format="obojson",
            output=os.path.join(self.output_dir, name),
            output_format="tsv",
            stream=False,
        )

    def _apply_koza_sources(self) -> None:
        """Use Koza to apply additional properties, based on each source.

        This is the final step in translation; nodes are processed before
        edges.
        """
        for config_type in ["node", "edge"]:
            config = KOZA_CONFIGS[config_type]
            print(f"Adding {config_type} sources using {config}")
            transform_source(
                source=config,
                output_dir=self.output_dir,
                output_format="tsv",
                global_table=TRANSLATION_TABLE,
                local_table=None,
            )