-
Notifications
You must be signed in to change notification settings - Fork 2
/
Copy pathlist_rssi.py
135 lines (122 loc) · 4.45 KB
/
list_rssi.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
#!/usr/bin/env python
# -*- encoding: utf-8 -*-
import dbus
import sys
from gi.repository import GObject as gobject
from gi.repository import GLib as glib
from dbus.mainloop.glib import DBusGMainLoop
import logging as log
import argparse
import time
try:
import matplotlib.pyplot as plt
except ImportError:
pass
def is_python_3():
return sys.version_info[0] >= 3
def is_python_2():
return not(is_python_3())
class WiFiList():
def __init__(self, watched):
self.bus = dbus.SystemBus()
self.NM = 'org.freedesktop.NetworkManager'
self.bus.add_signal_receiver(
self.handle_rssi_change, None, self.NM + '.AccessPoint', None, None)
nm = self.bus.get_object(self.NM, '/org/freedesktop/NetworkManager')
self.devlist = nm.GetDevices(dbus_interface=self.NM)
self.rssid = {}
self.data = {}
self.watched = watched
if is_python_2():
self.watched = [i.decode('utf-8') for i in watched]
self.start_time = time.time()
self.xaxis = []
def __repr__(self):
return "\n".join(["%35s: %5d" % (k, j) for k, j in self.rssid.items()])
def dbus_get_property(self, prop, member, proxy):
return proxy.Get(
self.NM + '.' + member,
prop,
dbus_interface='org.freedesktop.DBus.Properties')
def repopulate_ap_list(self):
apl = []
res = []
for i in self.devlist:
tmp = self.bus.get_object(self.NM, i)
if self.dbus_get_property('DeviceType', 'Device', tmp) == 2:
apl.append(
self.bus.get_object(self.NM, i) .GetAccessPoints(
dbus_interface=self.NM + '.Device.Wireless'))
for i in apl:
for j in i:
res.append(self.bus.get_object(self.NM, j))
return res
def get_ssid_string(self, ssid):
if is_python_3():
return b"".join([b"%s" % k.to_bytes(1, "little") for k in ssid]).decode('utf-8')
else:
return b"%s" % bytearray(ssid).decode('utf-8')
def form_rssi_dic(self):
for i in self.repopulate_ap_list():
ssid = self.dbus_get_property('Ssid', 'AccessPoint', i)
strength = self.dbus_get_property('Strength', 'AccessPoint', i)
self.rssid[self.get_ssid_string(ssid)] = int(strength)
def plotter(self):
try:
plt
except NameError:
log.error(
"was unable to use plotting library, try setting up one of: ")
log.error(
("matplotlib (pip install matplotlib)\n"
"python-tk (sudo apt-get install python-tk)\n"))
else:
graphs = []
for i in self.data:
graphs.append(plt.plot(self.xaxis, self.data[i], label=i))
plt.xlabel("Seconds")
plt.ylabel("RSSI, %")
plt.legend(handles=[g[0] for g in graphs])
plt.show()
def handle_rssi_change(self, *_):
self.form_rssi_dic()
self.xaxis.append(time.time() - self.start_time)
for i in [x for x in self.watched if x in self.rssid]:
if i in self.data.keys():
self.data[i].append(self.rssid[i])
else:
self.data[i] = []
self.data[i].append(self.rssid[i])
return True
def iowch(self, _arg, _key, loop):
cmd = sys.stdin.readline()
if cmd.startswith("stop"):
print('stopping the program')
loop.quit()
return False
elif cmd.startswith("print"):
print(self.__repr__())
elif cmd.startswith("plot"):
print('plotting your rssi data')
self.plotter()
return True
if __name__ == '__main__':
parser = argparse.ArgumentParser()
parser.add_argument("-i",
"--interactive",
help="start polling RSSI from the networks",
action="store_true")
parser.add_argument("-n",
"--networks",
nargs="*",
help="networks to collect data from",
default=[])
args = parser.parse_args()
loop = gobject.MainLoop()
DBusGMainLoop(set_as_default=True)
wfl = WiFiList(args.networks)
wfl.form_rssi_dic()
gobject.io_add_watch(sys.stdin, glib.IO_IN, wfl.iowch, loop)
print(wfl.__repr__())
if args.interactive:
loop.run()