# shacl2pgdl.py
# Convert a SHACL shapes graph (Turtle) into a YAML description.
import rdflib
from rdflib.namespace import RDF, Namespace
import yaml
import argparse
# Define namespaces
# Each Namespace builds full term IRIs by indexing or attribute access
# (e.g. DCTERMS['title'], SH.NodeShape); parse_shacl uses them as triple patterns.
SH = Namespace("http://www.w3.org/ns/shacl#")  # SHACL vocabulary (shapes, paths, datatypes)
SHPG = Namespace("http://ii.uwb.edu.pl/shpg#")  # supplies the `relation` and `key` terms read for edges
XSD = Namespace("http://www.w3.org/2001/XMLSchema#")  # XML Schema namespace; not referenced in the visible code
DCTERMS = Namespace("http://purl.org/dc/terms/")  # Dublin Core terms, scanned for metadata
RDFS = Namespace("http://www.w3.org/2000/01/rdf-schema#")  # RDF Schema terms, scanned for metadata
OWL = Namespace("http://www.w3.org/2002/07/owl#")  # OWL versioning terms, scanned for metadata
SKOS = Namespace("http://www.w3.org/2004/02/skos/core#")  # SKOS labels and notes, scanned for metadata
def get_last_segment(uri):
    """
    Extracts the last segment from a URI.

    The segment is the text following the last '/', then the last '#',
    then the last ':' of the given string, so both full IRIs
    ("http://example.org/ns#Person") and prefixed names ("xsd:string")
    reduce to their local name.

    Trailing separators are ignored, so "http://example.org/Person/"
    yields "Person" rather than an empty string. A string without any
    separator is returned unchanged; an empty string yields "".
    """
    # Without this trim, a URI ending in a separator splits into a final
    # empty piece and the function would return "".
    trimmed = uri.rstrip('/#:')
    return trimmed.split('/')[-1].split('#')[-1].split(':')[-1]
def _local_name(term):
    """Return the last URI segment of *term*, or None when *term* is None."""
    if term is None:
        # str(None) would otherwise leak the literal text 'None' into the output.
        return None
    return get_last_segment(str(term))


def _is_document_subject(subject):
    """Tell whether *subject* is a blank node or an IRI ending in '/' or '#'."""
    if isinstance(subject, rdflib.term.BNode):
        return True
    return (
        isinstance(subject, rdflib.term.URIRef)
        and str(subject).endswith(('/', '#'))
    )


def _collect_metadata(graph):
    """Gather annotation values attached to blank-node or '/'- or '#'-terminated subjects."""
    # Scan order determines the key order of the resulting dict.
    vocabularies = (
        (DCTERMS, (
            'title', 'creator', 'subject', 'description', 'publisher',
            'contributor', 'date', 'type', 'format', 'identifier', 'source',
            'language', 'relation', 'coverage', 'rights',
            'created', 'issued', 'modified',
        )),
        (RDFS, ('label', 'comment', 'seeAlso', 'isDefinedBy')),
        (OWL, (
            'versionInfo', 'priorVersion', 'backwardCompatibleWith',
            'incompatibleWith',
        )),
        (SKOS, (
            'altLabel', 'changeNote', 'definition', 'editorialNote',
            'example', 'hiddenLabel', 'historyNote', 'note', 'prefLabel',
            'scopeNote',
        )),
    )
    # Objects equal to one of these SHACL classes are never recorded.
    shape_types = (SH.NodeShape, SH.PropertyGroup, SH.PropertyShape)

    collected = {}
    for namespace, terms in vocabularies:
        for term in terms:
            for subject, _, obj in graph.triples((None, namespace[term], None)):
                if _is_document_subject(subject) and obj not in shape_types:
                    collected.setdefault(term, []).append(str(obj))

    # A term with exactly one value is stored as a scalar instead of a list.
    return {
        term: values[0] if len(values) == 1 else values
        for term, values in collected.items()
    }


def _parse_node_shape(graph, shape):
    """Build the dictionary describing one sh:NodeShape and its property shapes."""
    shape_dict = {'targetClass': _local_name(graph.value(shape, SH.targetClass))}
    properties = []
    edges = []

    for prop in graph.objects(shape, SH.property):
        path = graph.value(prop, SH.path)

        # NOTE(review): the original indentation was lost; this assumes a
        # property is recorded only when sh:datatype is present -- confirm.
        datatype = graph.value(prop, SH.datatype)
        if datatype:
            prop_dict = {}
            if path:
                prop_dict['name'] = get_last_segment(str(path))
            prop_dict['datatype'] = get_last_segment(str(datatype))
            properties.append(prop_dict)

        # A property shape carrying shpg:relation is reported as an edge.
        relation = graph.value(prop, SHPG.relation)
        if relation:
            edge_dict = {
                'name': _local_name(path),
                'node': _local_name(graph.value(prop, SH.node)),
            }
            key = graph.value(relation, SHPG.key)
            if key:
                edge_dict['relations'] = [{
                    'name': get_last_segment(str(key)),
                    'datatype': _local_name(graph.value(relation, SH.datatype)),
                }]
            edges.append(edge_dict)

    # Empty lists are left out of the shape entirely.
    if properties:
        shape_dict['properties'] = properties
    if edges:
        shape_dict['edges'] = edges
    return shape_dict


def parse_shacl(graph):
    """
    Extracts metadata and node shapes from a SHACL graph.

    Parameters:
        graph: an rdflib graph holding the SHACL shapes.

    Returns:
        A dict with two keys:
        'metadata' -- maps Dublin Core, RDFS, OWL and SKOS term names to the
            string value(s) found on blank-node subjects or on IRI subjects
            ending in '/' or '#'; a single value is stored as a scalar,
            several values as a list.
        'shapes' -- one dict per sh:NodeShape with 'targetClass' and, when
            non-empty, 'properties' (name/datatype pairs) and 'edges'
            (name, target node and optional relation key).

    Values absent from the graph (sh:targetClass, sh:node, sh:path on an
    edge, sh:datatype on a relation) are reported as None instead of the
    string 'None'.
    """
    shapes = [
        _parse_node_shape(graph, shape)
        for shape in graph.subjects(RDF.type, SH.NodeShape)
    ]
    return {'metadata': _collect_metadata(graph), 'shapes': shapes}
def shacl_to_yaml(input_shacl_file):
    """
    Reads a Turtle SHACL file and returns its YAML representation.

    Parameters:
        input_shacl_file: path of the Turtle file to read.

    Returns:
        The YAML text produced from parse_shacl's result, with keys kept in
        insertion order and non-ASCII characters written unescaped.

    Raises:
        OSError: if the file cannot be opened.
    """
    g = rdflib.Graph()
    # Turtle documents are UTF-8; relying on the platform default encoding
    # would mis-decode non-ASCII literals on systems with another locale.
    with open(input_shacl_file, 'r', encoding='utf-8') as shacl_file:
        g.parse(shacl_file, format="turtle")
    parsed_data = parse_shacl(g)
    return yaml.dump(parsed_data, sort_keys=False, allow_unicode=True)
def main():
    """Command-line entry point: convert the given SHACL file and print the YAML."""
    cli = argparse.ArgumentParser(description="Convert SHACL to YAML.")
    cli.add_argument(
        'input_shacl_file',
        type=str,
        help="Path to the input SHACL file.",
    )
    options = cli.parse_args()
    print(shacl_to_yaml(options.input_shacl_file))


# Run the converter only when executed as a script, not when imported.
if __name__ == "__main__":
    main()