-
Notifications
You must be signed in to change notification settings - Fork 193
/
Copy pathtest-e2e-latency.py
executable file
·180 lines (142 loc) · 5.37 KB
/
test-e2e-latency.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
#!/usr/bin/env python3
#
# Copyright (c) 2016-2022 Arkadiusz Bokowy
#
# This file is a part of bluez-alsa.
#
# This project is licensed under the terms of the MIT license.
import argparse
import signal
import subprocess
import sys
import time
from csv import DictWriter
from math import ceil, floor
from random import randint
from struct import Struct
# Format mapping between BlueALSA
# and Python struct module
FORMATS = {
'U8': ('<', 'B'),
'S16_LE': ('<', 'h'),
'S24_3LE': ('<', None), # not supported
'S24_LE': ('<', 'i'),
'S32_LE': ('<', 'i'),
}
# Value ranges for different formats
LIMITS = {
'U8': (0, 255),
'S16_LE': (-32768, 32767),
'S24_LE': (-8388608, 8388607),
'S32_LE': (-2147483648, 2147483647),
}
def rate_sync(t0: float, frames: int, rate: int):
delta = frames / rate - time.monotonic() + t0
if (delta > 0):
print(f"Rate sync: {delta:.6f}")
time.sleep(delta)
else:
print(f"Rate sync overdue: {-delta:.6f}")
def test_pcm_write(pcm, pcm_format, pcm_channels, pcm_rate, interval):
"""Write PCM test signal.
This function generates test signal when real time modulo interval is zero
(within sample rate resolution capabilities). Providing that both devices
are in sync with NTP, this should be a reliable way to detect end-to-end
latency.
"""
fmt = FORMATS[pcm_format]
# Structure for single PCM frame
struct = Struct(fmt[0] + fmt[1] * pcm_channels)
# Time quantum in seconds
t_quantum = 1.0 / pcm_rate
# Noise PCM value range
v_noise_min = int(LIMITS[pcm_format][0] * 0.05)
v_noise_max = int(LIMITS[pcm_format][1] * 0.05)
# Signal PCM value range
v_signal_min = int(LIMITS[pcm_format][1] * 0.8)
v_signal_max = int(LIMITS[pcm_format][1] * 1.0)
signal_frames = int(0.1 * pcm_rate)
print(f"Signal frames: {signal_frames}")
frames = 0
t0 = time.monotonic()
while True:
# Time until next signal
t = time.time()
t_delta = ceil(t / interval) * interval - t - t_quantum
print(f"Next signal at: {t:.6f} + {t_delta:.6f} -> {t + t_delta:.6f}")
# Write random data to keep encoder busy
noise_frames = int(t_delta * pcm_rate)
print(f"Noise frames: {noise_frames}")
pcm.writelines(
struct.pack(*[randint(v_noise_min, v_noise_max)] * pcm_channels)
for _ in range(noise_frames))
pcm.flush()
frames += noise_frames
rate_sync(t0, frames, pcm_rate)
# Write signal data
pcm.writelines(
struct.pack(*[randint(v_signal_min, v_signal_max)] * pcm_channels)
for _ in range(signal_frames))
pcm.flush()
frames += signal_frames
rate_sync(t0, frames, pcm_rate)
def test_pcm_read(pcm, pcm_format, pcm_channels, pcm_rate, interval):
"""Read PCM test signal."""
fmt = FORMATS[pcm_format]
# Structure for single PCM frame
struct = Struct(fmt[0] + fmt[1] * pcm_channels)
# Minimal value for received PCM signal
v_signal_min = int(LIMITS[pcm_format][1] * 0.51)
csv = DictWriter(sys.stdout, fieldnames=[
'time', 'expected', 'latency', 'duration'])
csv.writeheader()
t_signal = 0
while True:
pcm_frame = struct.unpack(pcm.read(struct.size))
if pcm_frame[0] < v_signal_min:
if t_signal > 0:
t = time.time()
t_expected = floor(t / interval) * interval
csv.writerow({'time': t,
'expected': float(t_expected),
'latency': t - t_expected,
'duration': t - t_signal})
t_signal = 0
continue
if t_signal == 0:
t_signal = time.time()
parser = argparse.ArgumentParser()
parser.add_argument('-B', '--dbus', type=str, metavar='NAME',
help='BlueALSA service name suffix')
parser.add_argument('-i', '--interval', type=int, metavar='SEC', default=2,
help='signal interval in seconds; default: 2')
parser.add_argument('-t', '--timeout', type=int, metavar='SEC', default=60,
help='test timeout in seconds; default: 60')
parser.add_argument('PCM_PATH', type=str,
help='D-Bus path of the BlueALSA PCM device')
args = parser.parse_args()
signal.alarm(args.timeout)
options = ["--verbose"]
if args.dbus:
options.append(f'--dbus={args.dbus}')
try: # Get info for given BlueALSA PCM device
cmd = ['bluealsactl', *options, 'info', args.PCM_PATH]
output = subprocess.check_output(cmd, text=True)
except subprocess.CalledProcessError:
sys.exit(1)
info = {key.lower(): value.strip()
for key, value in (line.split(':', 1)
for line in output.splitlines())}
channels = int(info['channels'])
rate = int(info['rate'].split()[0])
print(f"Bluetooth: {info['transport']} {info['selected codec']}")
print(f"PCM: {info['format']} {channels} channels {rate} Hz")
print("==========")
cmd = ['bluealsactl', *options, 'open', args.PCM_PATH]
client = subprocess.Popen(cmd, stdin=subprocess.PIPE, stdout=subprocess.PIPE)
# Wait for BlueALSA to open the PCM device
time.sleep(1)
if info['mode'] == 'sink':
test_pcm_write(client.stdin, info['format'], channels, rate, args.interval)
if info['mode'] == 'source':
test_pcm_read(client.stdout, info['format'], channels, rate, args.interval)