-
Notifications
You must be signed in to change notification settings - Fork 34
/
Copy pathtimed-ble-beacon
executable file
·240 lines (215 loc) · 10.4 KB
/
timed-ble-beacon
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
#!/usr/bin/env python
import os, sys, re, signal, hmac, hashlib as hl, traceback as tb
import struct, base64, math, time, datetime as dt
import dbus, dbus.service, dbus.exceptions, dbus.mainloop.glib # python-dbus
from gi.repository import GLib # python-gobject
# Unit tables for short offsets like "3d 5h": calendar units are valued in days,
#  clock units in seconds - same as first two positional args of dt.timedelta.
_td_days = dict(
    y=365.2422, yr=365.2422, year=365.2422,
    mo=30.5, month=30.5, w=7, week=7, d=1, day=1 )
_td_s = dict( h=3600, hr=3600, hour=3600,
    m=60, min=60, minute=60, s=1, sec=1, second=1 )
# Orders units from largest value to smallest, longer names first for same value
_td_usort = lambda d: sorted(
    d.items(), key=lambda kv: (kv[1], len(kv[0])), reverse=True )
# One optional named group per unit, e.g. "1h 20m" -> groups h="1h ", m="20m"
_td_re = re.compile('(?i)^[-+]?' + ''.join( fr'(?P<{k}>\d+{k}\s*)?'
    for k, v in [*_td_usort(_td_days), *_td_usort(_td_s)] ) + '$')


def ts_parse(ts_str, ts_now=False, relative_only=False):
    '''Parse a time offset or date/time string.

    Formats are tried in this order:
     - plain number, used as an offset in seconds (can be negative or fractional)
     - short offset with units like "3d 5h" or "1h20m", optionally signed
     - [YY]YY-MM-DD date with an optional " HH[:MM[:SS]]" or "THH..." time part
     - HH:MM[:SS[.frac]] time, resolved to next such time at or after ts_now
     - anything else is passed to "date -d" binary

    ts_now is a datetime to use as a reference point instead of current time.
    When it is left at default False, result is a timedelta
     relative to current time, otherwise result is a datetime.
    relative_only disables date format and "date" binary fallback.
    Raises ValueError if string cannot be parsed by any of the above.'''
    relative = ts_now is False
    if not ts_now: ts_now = dt.datetime.now()
    ts = None

    # Only conversion errors are expected here, so no catch-all except
    try: ts = ts_now + dt.timedelta(seconds=float(ts_str))
    except (TypeError, ValueError, OverflowError): pass

    if not ts and ( # short time offset like "3d 5h"
            (m := _td_re.search(ts_str)) and any(m.groups()) ):
        delta = list()
        for units in _td_days, _td_s:
            val = 0
            for k, v in units.items():
                if not m.group(k): continue
                val += v * int(''.join(filter(str.isdigit, m.group(k))) or 1)
            delta.append(val)
        delta = dt.timedelta(*delta)
        # Regexp allows a leading sign, so it has to be applied to the offset,
        #  same as it is for plain numbers of seconds above
        if ts_str.startswith('-'): delta = -delta
        ts = ts_now + delta

    if not ts and not relative_only and (m := re.search( # common BE format
            r'^(?P<date>(?:\d{2}|(?P<Y>\d{4}))-\d{2}-\d{2})'
            r'(?:[ T](?P<time>\d{2}(?::\d{2}(?::\d{2})?)?)?)?$', ts_str )):
        tpl = 'y' if not m.group('Y') else 'Y'
        tpl, tss = f'%{tpl}-%m-%d', m.group('date')
        if m.group('time'):
            # Use only as many time fields in the template as were specified
            tpl_time = ['%H', '%M', '%S']
            tss += ' ' + ':'.join(tss_time := m.group('time').split(':'))
            tpl += ' ' + ':'.join(tpl_time[:len(tss_time)])
        try: ts = dt.datetime.strptime(tss, tpl)
        except ValueError: pass

    if not ts and (m := re.search( # just time without AM/PM - treat as 24h format
            r'^\d{1,2}:\d{2}(?::\d{2}(?P<us>\.\d+)?)?$', ts_str )):
        us, tpl = 0, ':'.join(['%H', '%M', '%S'][:len(ts_str.split(':'))])
        if m.group('us'):
            # Fraction is cut or zero-padded to 6 digits to get microseconds
            ts_str, us = ts_str.rsplit('.', 1)
            us = us[:6] + '0'*max(0, 6 - len(us))
        try: ts = dt.datetime.strptime(ts_str, tpl)
        except ValueError: pass
        else:
            ts = ts_now.replace( hour=ts.hour,
                minute=ts.minute, second=ts.second, microsecond=int(us) )
            if ts < ts_now: ts += dt.timedelta(1) # already passed today - use tomorrow

    if not ts and not relative_only: # coreutils' "date" parses everything
        import subprocess as sp
        while True:
            try:
                res = sp.run( ['date', '+%s', '-d', ts_str],
                    stdout=sp.PIPE, stderr=sp.DEVNULL )
            except OSError: break # no usable "date" binary - same as a parsing failure
            if res.returncode:
                # Retry once without commas before giving up
                if ',' in ts_str: ts_str = ts_str.replace(',', ' '); continue
            else:
                ts = dt.datetime.fromtimestamp(int(res.stdout.strip()))
                # Time-only string that got resolved to a time
                #  within last 24h is moved a day forward
                if 0 < (ts_now - ts).total_seconds() <= 24*3600 and re.search(
                        r'(?i)^[\d:]+\s*(am|pm)?\s*([-+][\d:]+|\w+|\w+[-+][\d:]+)?$', ts_str.strip() ):
                    ts += dt.timedelta(1)
            break

    if ts: return (ts - ts_now) if relative else ts
    raise ValueError(f'Failed to parse date/time string: {ts_str}')
def ts_repr( ts, ts0=None, units_max=2, units_res=None, printf=None,
        _units=dict( h=3600, m=60, s=1,
            y=365.2422*86400, mo=30.5*86400, w=7*86400, d=1*86400 ) ):
    '''Format a time span as a short string like "1h 30m", or "now" if it is too small.

    ts can be a number of seconds, a timedelta, or a datetime.
    ts0 is an optional point to measure ts from,
     defaulting to current time if ts is a datetime.
    Sign of the span is discarded, only its magnitude is formatted.
    units_max limits the number of unit components in the output.
    units_res is a name of the smallest unit to output, e.g. "m".
    printf is an optional %-template to put a non-"now" result into.'''
    if ts0 is None and isinstance(ts, dt.datetime):
        ts0 = dt.datetime.now()
    span = ts - ts0 if ts0 is not None else ts
    if isinstance(span, dt.timedelta):
        span = span.total_seconds()

    left, parts = abs(span), []
    by_size = sorted(_units.items(), key=lambda kv: kv[1], reverse=True)
    for unit, unit_s in by_size:
        ratio = left / unit_s
        count = math.floor(ratio)
        if not count:
            # Unit is too large for what is left, stop if it is the resolution limit
            if unit == units_res: break
            continue
        if ratio - count > 0.98: count += 1 # close enough to the next whole unit
        # Last component to be printed gets rounded instead of truncated
        final = len(parts) == units_max - 1 or unit == units_res
        if final: count = round(left / unit_s)
        parts.append(f'{count:.0f}{unit}')
        if final: break
        left -= count * unit_s
        if left < 1: break

    if not parts: return 'now'
    text = ' '.join(parts)
    return printf % text if printf else text
class BLErrInvalidArgs(dbus.exceptions.DBusException):
    '''DBusException subclass that carries the standard D-Bus InvalidArgs error name.'''

    _dbus_error_name = 'org.freedesktop.DBus.Error.InvalidArgs'
class BLEAdMsg(dbus.service.Object): # see man org.bluez.LEAdvertisement
    '''D-Bus object describing one BLE broadcast advertisement with a timed payload.

    Payload is put into ManufacturerData under "mid" key and consists of
     2B uint16-le time-delta || 4B uint32-le counter || 8B truncated HMAC-SHA256,
     where HMAC is calculated over first two fields using "secret" (bytes) as a key.
    Object is exported on the bus under a path derived from that HMAC value.'''

    iface = 'org.bluez.LEAdvertisement1'

    # Shorthand for dbus.service.method decorator, Properties interface by default
    m = ( lambda si='',so='',iface='org.freedesktop.DBus.Properties':
        dbus.service.method(iface, in_signature=si, out_signature=so) )

    def __init__(self, bus, mid, secret, td, bms_a, bms_b):
        '''Build advertisement data and export the object on the bus.

        mid is a uint16 manufacturer id, td is remaining time in seconds,
         bms_a/bms_b are min/max intervals between beacons in milliseconds.'''
        # Counter is seconds since 1_735_689_600 unix time, td is sent in 100s units.
        # struct.pack raises struct.error if either value does not fit its field.
        n, td = int(time.time() - 1_735_689_600), round(td / 100)
        msg = struct.pack('<HL', td, n)
        mac = hmac.HMAC(secret, msg, digestmod=hl.sha256).digest()[:8]
        ad_name = base64.urlsafe_b64encode(mac).decode()
        ad_name = ad_name.translate(dict(zip(b'_-=', 'qQX'))) # dbus name limits
        self.bus, self.path = bus, f'/org/bluez/example/advertisement/{ad_name}'
        # NOTE(review): this debug line prints the secret as-is - confirm that is intended
        p(f'-- new BLE ad [ {self.path} ]: ( {mid:,d}, {td:,d} {n:,d} {secret} )')
        self.broadcast = dict(
            # Upper bound must come from bms_b, otherwise specified range is ignored
            MinInterval=dbus.UInt32(bms_a), MaxInterval=dbus.UInt32(bms_b),
            ManufacturerData=dbus.Dictionary( # uint16 key, <=27B value
                {mid: dbus.Array(msg + mac, signature='y')}, signature='qv' ) )
        super().__init__(self.bus, self.path)

    @m('s', 'a{sv}')
    def GetAll(self, iface):
        '''Return all advertisement properties, for LEAdvertisement1 interface only.'''
        try:
            if iface != self.iface: raise BLErrInvalidArgs
            return dict(Type='broadcast', **self.broadcast)
        except Exception:
            # Error is printed here for visibility, and then passed on,
            #  so that D-Bus caller gets it instead of an invalid empty reply
            tb.print_exc(file=sys.stderr); sys.stderr.flush()
            raise

    # Single-property lookup on top of GetAll, and a no-op Release callback
    Get = m('ss', 'v')(lambda s, iface, k: s.GetAll(iface)[k])
    Release = m(iface=iface)(lambda s: None)
def run_ad(mid, secret, td_updates, td_total, td_beacon):
    '''Run BLE advertisement loop until td_total seconds pass or SIGTERM/SIGINT is received.

    Advertisement is replaced with a new BLEAdMsg every td_updates seconds,
     with remaining time in it calculated from a monotonic clock.
    td_beacon is a (min, max) tuple of beacon intervals, passed to BLEAdMsg as-is.
    Raises RuntimeError if bluez does not list any object with LEAdvertisingManager1.'''
    deadline = time.monotonic() + td_total
    dbus.mainloop.glib.DBusGMainLoop(set_as_default=True)
    bus = dbus.SystemBus()

    # Use first bluez object that has advertising manager interface
    iface = 'org.bluez.LEAdvertisingManager1'
    obj_mgr = dbus.Interface(
        bus.get_object('org.bluez', '/'), 'org.freedesktop.DBus.ObjectManager' )
    adapters = [ path for path, props
        in obj_mgr.GetManagedObjects().items() if iface in props ]
    if not adapters: raise RuntimeError('Failed to find any suitable BT device')
    adapter = adapters[0]
    ad_mgr = dbus.Interface(bus.get_object('org.bluez', adapter), iface)
    ad = None

    def _ad_drop():
        # Best-effort removal of current advertisement, failures are ignored
        try:
            ad_mgr.UnregisterAdvertisement(ad)
            dbus.service.Object.remove_from_connection(ad)
        except: pass

    def _ad_error(err):
        p_err(f'BLE ad setup/update failed - {err}' )
        loop.quit()

    def _ad_update(close=False):
        nonlocal ad
        try:
            if ad: _ad_drop()
            if close: return
            td = max(0, deadline - time.monotonic())
            if not td: return loop.quit()
            ad = BLEAdMsg(bus, mid, secret, td, *td_beacon)
            ad_mgr.RegisterAdvertisement( ad.path, {},
                reply_handler=lambda: None, error_handler=_ad_error )
            return True # for glib to run this again after interval
        except:
            tb.print_exc(file=sys.stderr)
            sys.stderr.flush()
            loop.quit()

    ts_start = dt.datetime.now().isoformat(sep=' ', timespec='seconds')
    p( f'loop: Setup start=[ {ts_start} ] duration=[ {ts_repr(td_total)} ]'
        f' update-interval=[ {ts_repr(td_updates)} ] beacon_ms={td_beacon}' )

    loop = GLib.MainLoop()
    for sig in signal.SIGTERM, signal.SIGINT:
        GLib.unix_signal_add(GLib.PRIORITY_DEFAULT, sig, loop.quit)
    _ad_update()
    GLib.timeout_add(round(td_updates * 1000), _ad_update)

    p('loop: Starting BLE advertisement loop...')
    loop.run()
    _ad_update(close=True)
    p('loop: Finished')
def p(*a, **kw):
    'Print a message to stdout, flushing it right away.'
    print(*a, **kw, flush=True)


def p_err(*a, **kw):
    'Print an "ERROR:"-prefixed message to stderr and return 1.'
    print('ERROR:', *a, **kw, file=sys.stderr, flush=True)
    return 1
def main(argv=None):
    '''Parse command-line options and run the BLE advertisement loop.

    argv is a list of arguments to use instead of sys.argv[1:].
    Exits via argparse on invalid or missing options.'''
    global p
    import argparse, textwrap
    # Strips common indentation from help text, converts remaining tabs to spaces
    dd = lambda text: re.sub( r' \t+', ' ',
        textwrap.dedent(text).strip('\n') + '\n' ).replace('\t', ' ')
    parser = argparse.ArgumentParser(
        formatter_class=argparse.RawTextHelpFormatter, description=dd('''
            Run BLE broadcasts with timing information in them.
            Broadcasts should be passively detectable and not "discoverable"
            (means that they just don't announce device name in them, I think).
            BLE "Manufacturer Specific Data" broadcast field is used to carry
            all information, being a tuple with --mid value + 14B of following data:
            field = 2B uint16-le time-delta || 4B uint32-le replay-counter || 8B HMAC
            HMAC = HMAC-SHA256(time-delta || replay-counter, key=secret)[:8]
            Where time-delta is (seconds/100),
            replay-counter is a simple time-based monotonic integer,
            and -s/--secret set via script option must be same on sender/receiver.'''))
    parser.add_argument('-t', '--time-span', metavar='time', help=dd('''
        Absolute or relative time interval to send broadcasts for.
        Script updates remaining time in broadcasts, and exits when time is up.
        Parses timestamps as relative short times (e.g. "30s", "10min", "1h 20m", etc),
        iso8601-ish times/dates or falls back to just using "date" binary,
        which parses a lot of different other absolute/relative formats.'''))
    parser.add_argument('-i', '--update-interval',
        metavar='delta', default='2m20s', help=dd('''
            Interval between updating BLE broadcast data. Default: %(default)s
            Same exact broadcast is repeated within these intervals.
            Should be specified as a relative short time (e.g. 1m45s) or [[HH:]MM:]SS.'''))
    parser.add_argument('-b', '--beacon-interval',
        metavar='ms[-ms]', default='60-120', help=dd('''
            Range for an interval between sending beacons
            when those are active, in milliseconds. Default: %(default)s'''))
    parser.add_argument('-m', '--mid', metavar='uint16', type=int, default=61_634, help=dd('''
        Manufacturer "Company Identifier" field in 0-65535 range. Default: %(default)s
        It's an arbitrary value that must match on both broadcaster/observer sides.
        Ideally shouldn't match commonly-seen IDs, to avoid receiver
        needlessly checking broadcasts with same value from other devices.'''))
    parser.add_argument('-s', '--secret',
        metavar='string', default='timed-beacon-test', help=dd('''
            Matching secret string for both sender and receiver. Default: %(default)s
            Can be a filename prefixed by @ character (e.g. @myfile.txt), "-" for stdin,
            or a file descriptor number with optional %% prefix (e.g. 3 or %%3).'''))
    parser.add_argument('-d', '--debug', action='store_true', help='Verbose operation mode')
    opts = parser.parse_args(argv)

    def _read_file(path):
        # Resolves -s/--secret value: stdin, @file, fd number, or the string itself
        if not path or path == '-': return sys.stdin.read()
        if path[0] == '@': path = path[1:] # filename, as described in option help
        elif path.isdigit(): path = int(path)
        elif path[0] == '%': path = int(path[1:])
        else: return path
        with open(path) as src: return src.read()

    if not opts.debug: p = lambda *a,**kw: None # silences debug output
    if not opts.time_span: parser.error('-t/--time-span option must be specified')
    # Value is used as an uint16 key in broadcast data, so check it early
    if not 0 <= opts.mid <= 0xffff: parser.error('-m/--mid value must be in 0-65535 range')

    # Single number is used for both ends of the range
    bms_a, s, bms_b = map(str.strip, opts.beacon_interval.partition('-'))
    bms_a, bms_b = int(bms_a or bms_b), int(bms_b or bms_a)

    run_ad( opts.mid, _read_file(opts.secret).encode(),
        td_beacon=(bms_a, bms_b),
        td_total=ts_parse(opts.time_span).total_seconds(),
        td_updates=ts_parse(opts.update_interval, relative_only=True).total_seconds() )


if __name__ == '__main__': sys.exit(main())