-
Notifications
You must be signed in to change notification settings - Fork 90
/
Copy pathdebiangraph.py
164 lines (137 loc) · 5.33 KB
/
debiangraph.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
#!/usr/bin/env python3
"""
Example: build a file with all packages known to your system on Debian jessie:
for i in $(ls /var/lib/apt/lists/*debian_dists_jessie_* |\
grep -v i386 |grep -v Release); do
cat $i >> /tmp/allpackages;
echo >> /tmp/allpackages;
done
All Debian-based distros keep a set of package list files in /var/lib/apt/lists.
If in doubt, create a filter matching the file names used by your distro.
Install needed Python modules:
pip3 install pyArango
pip3 install deb_pkg_tools
"""
import deb_pkg_tools
from deb_pkg_tools.control import deb822_from_string
from deb_pkg_tools.control import parse_control_fields
from pyArango.connection import *
from pyArango.database import *
from pyArango.collection import *
from pyArango.document import *
from pyArango.query import *
from pyArango.graph import *
from pyArango.theExceptions import *
# ArangoDB server connection settings -- adjust these to your installation
conn = Connection(arangoURL="http://localhost:8529", username="root", password="")

# edge collections that have already been looked up or created, keyed by name
edgeCols = {}

# work inside a dedicated database so we don't interfere with user data:
db = conn["testdb"] if conn.hasDatabase("testdb") else conn.createDatabase("testdb")

# document collection holding one document per package
if db.hasCollection('packages'):
    packagesCol = db.collections['packages']
else:
    packagesCol = db.createCollection('Collection', name='packages')
def getEdgeCol(name):
    """Return the edge collection called *name*, creating it if necessary.

    Collections are remembered in the module-level ``edgeCols`` dict, so the
    database is only asked about each name once.
    """
    if name in edgeCols:
        return edgeCols[name]

    if db.hasCollection(name):
        collection = db.collections[name]
    else:
        collection = db.createCollection(name=name, className='Edges')
    edgeCols[name] = collection
    return collection
def saveGraphDefinition():
    """Store a graph document named ``debian_dependency_graph`` in ``_graphs``.

    Every edge collection currently present in ``edgeCols`` becomes one edge
    definition leading from the ``packages`` collection back to ``packages``.
    """
    graphsCol = db.collections["_graphs"]
    edgeDefinitions = [
        {"collection": colName, "from": ["packages"], "to": ["packages"]}
        for colName in edgeCols
    ]
    graphsCol.createDocument({
        "_key": "debian_dependency_graph",
        "edgeDefinitions": edgeDefinitions,
        "orphanCollections": [],
    }).save()
def VersionedDependencyToDict(oneDep, hasAlternatives):
    """Describe a versioned relationship as a plain dict.

    Copies the ``name``, ``version`` and ``operator`` attributes of *oneDep*
    and stores the caller-supplied *hasAlternatives* flag next to them.
    """
    described = dict(
        name=oneDep.name,
        version=oneDep.version,
        operator=oneDep.operator,
    )
    described['hasAlternatives'] = hasAlternatives
    return described
def DependencyToDict(oneDep, hasAlternatives):
    """Describe an unversioned relationship as a plain dict.

    Only the ``name`` attribute of *oneDep* is copied; *hasAlternatives* is
    stored unchanged.
    """
    described = {}
    described['name'] = oneDep.name
    described['hasAlternatives'] = hasAlternatives
    return described
def DependencySetToDict(dep, hasAlternatives):
    """Convert the relationships held by *dep* into a list of plain values.

    Versioned and simple relationships become dicts; a group of alternatives
    becomes a nested list whose entries carry ``hasAlternatives=True``.
    Relationship objects of any other type are reported on stdout and left
    out of the result.
    """
    converted = []
    for relation in dep.relationships:
        # VersionedRelationship is tested before Relationship, as in the
        # order the checks have always been made.
        if isinstance(relation, deb_pkg_tools.deps.VersionedRelationship):
            entry = VersionedDependencyToDict(relation, hasAlternatives)
        elif isinstance(relation, deb_pkg_tools.deps.AlternativeRelationship):
            entry = DependencySetToDict(relation, True)
        elif isinstance(relation, deb_pkg_tools.deps.Relationship):
            entry = DependencyToDict(relation, hasAlternatives)
        else:
            print("Unknown relationshitp: " + repr(relation))
            continue
        converted.append(entry)
    return converted
def PackageToDict(pkg):
    """Build a plain dict from the parsed control fields in *pkg*.

    Package objects aren't serializable as they are, so every field holding a
    ``RelationshipSet`` is turned into a list of relations and all other
    fields are copied unchanged. The value of the ``Package`` field is
    duplicated under ``_key``; a missing ``Package`` field raises ``KeyError``.
    """
    document = {}
    for field in pkg.keys():
        value = pkg[field]
        isRelationSet = isinstance(value, deb_pkg_tools.deps.RelationshipSet)
        # relationship fields become arrays, everything else stays as it is
        document[field] = DependencySetToDict(value, False) if isRelationSet else value
    document["_key"] = document["Package"]
    return document
def saveDependencyToEdgeCol(edgeCol, dep, pname, hasAlternatives):
    """Save one edge in *edgeCol* for every relationship contained in *dep*.

    Each edge leads from ``packages/<pname>`` to ``packages/<relationship
    name>``. Groups of alternatives are walked recursively with
    ``hasAlternatives=True``. Relationship objects of any other type are
    reported on stdout and skipped.
    """
    for target in dep.relationships:
        if isinstance(target, deb_pkg_tools.deps.VersionedRelationship):
            # relation that depends on a version:
            edge = VersionedDependencyToDict(target, hasAlternatives)
        elif isinstance(target, deb_pkg_tools.deps.AlternativeRelationship):
            # a set of alternative relations; recurse:
            saveDependencyToEdgeCol(edgeCol, target, pname, True)
            continue
        elif isinstance(target, deb_pkg_tools.deps.Relationship):
            # simple relation to a package name without a version:
            edge = DependencyToDict(target, hasAlternatives)
        else:
            print("Unknown relationshitp: " + repr(target))
            continue
        edge.update({
            '_from': 'packages/' + pname,
            '_to': 'packages/' + target.name,
        })
        edgeCol.createDocument(edge).save()
#
# Main import routine
#
# Reads /tmp/allpackages stanza by stanza, stores every package as a document
# in the packages collection and every relationship field as edges in an edge
# collection named after that field, then saves the graph definition.
#
def _importPackageBlock(block):
    """Store one control-file stanza as a package document plus its edges."""
    pkg = deb822_from_string(block)
    pname = pkg['Package']
    pkg1 = parse_control_fields(pkg)
    p = PackageToDict(pkg1)
    try:
        packagesCol.createDocument(p).save()
        for key in pkg1.keys():
            # filter for fields with relations:
            if isinstance(pkg1[key], deb_pkg_tools.deps.RelationshipSet):
                # save one relation set to the edge collection of that field:
                saveDependencyToEdgeCol(getEdgeCol(key), pkg1[key], pname, False)
    except CreationError:
        # Best effort: anything ArangoDB refuses to create is skipped and
        # the import carries on with the next package.
        pass


onePackage = ''
# the context manager closes the input file even if the import aborts
with open('/tmp/allpackages', encoding='utf-8') as packageFile:
    for line in packageFile:
        # Package blocks are separated by new lines.
        if len(line) == 1 and len(onePackage) > 4:
            # Empty the buffer before importing: if the reset only happened
            # after a successful save, one CreationError would leave the old
            # text in place and merge it into every following package.
            block, onePackage = onePackage, ''
            _importPackageBlock(block)
        else:
            onePackage += line

# A last stanza that is not followed by a separator line would otherwise be
# dropped without notice.
if len(onePackage) > 4 and onePackage.strip():
    _importPackageBlock(onePackage)

saveGraphDefinition()