Skip to content


improve logging
Browse files Browse the repository at this point in the history
  • Loading branch information
nkraetzschmar committed Feb 6, 2024
1 parent 7f3d043 commit 4aed562
Showing 1 changed file with 31 additions and 18 deletions.
49 changes: 31 additions & 18 deletions
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import asyncio
import sys
import time
from bleak import BleakScanner, BleakClient, BleakError
from base64 import b64encode, b64decode
Expand All @@ -15,13 +16,25 @@ class Ikawa:
ESCAPE_MAPPING = {0x7D: 0x5D, 0x7E: 0x5E}
UNESCAPE_MAPPING = {0x5D: 0x7D, 0x5E: 0x7E}

def __init__(self, reconnect=True, reconnect_timeout=60, retry_timeout=10):
def __init__(self, reconnect=True, scan_timeout=10, connect_timeout=60, retry_timeout=10, log_level=1, log_target=sys.stdout):
self.seq = 1 # start with 1 because seq=0 makes Cmd(cmd_type=BOOTLOADER_GET_VERSION) an empty message which the firmware does not seem to handle
self.resp_queue = asyncio.Queue()
self.recv_buf = bytearray()
self.reconnect = True
self.reconnect_timeout = reconnect_timeout
self.scan_timeout = scan_timeout
self.connect_timeout = connect_timeout
self.retry_timeout = retry_timeout
self.log_level = log_level
self.log_target = log_target
self.log_debug(f"log_level={log_level} log_target={log_target}")

def log(self, msg):
if self.log_level >= 1:
print(f"INFO {msg}", file=self.log_target)

def log_debug(self, msg):
if self.log_level >= 2:
print(f"DEBUG {msg}", file=self.log_target)

async def __aenter__(self):
await self.scan()
Expand All @@ -32,22 +45,22 @@ async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.disconnect()

async def scan(self):
print(f"Scanning for devices with service UUID: {self.SERVICE_UUID}...")
self.log(f"Scanning for devices with service UUID: {self.SERVICE_UUID}...")

def filter_device(device, advertisement_data):
return self.SERVICE_UUID.lower() in [uuid.lower() for uuid in advertisement_data.service_uuids]

self.target_device = await BleakScanner.find_device_by_filter(filter_device, timeout=5)
self.target_device = await BleakScanner.find_device_by_filter(filter_device, timeout=self.scan_timeout)
if self.target_device:
print(f"Found device: {} [{self.target_device.address}]")
self.log(f"Found device: {} [{self.target_device.address}]")
raise RuntimeError("No device found advertising the specified service UUID")

async def connect(self):
self.client = BleakClient(self.target_device, disconnected_callback=self.on_disconnect)
t = time.time()
while time.time() - t < self.reconnect_timeout:
print("Trying to connect")
while time.time() - t < self.connect_timeout:
self.log("Trying to connect")
await self.client.connect()
Expand All @@ -56,7 +69,7 @@ async def connect(self):
await asyncio.sleep(0.1)
if not self.client.is_connected:
raise RuntimeError("Failed to connect to the device, maximum retries exceeded")
print(f"Connected to {}")
self.log(f"Connected to {}")
await self.client.start_notify(self.NOTIFY_CHARACTERISTIC_UUID, self.on_notify)

async def disconnect(self):
Expand All @@ -69,7 +82,7 @@ async def send_frame(self, frame):
t = time.time()
while time.time() - t < self.retry_timeout:
# print("trying to send frame")
self.log_debug("trying to send frame")
await self.client.write_gatt_char(self.WRITE_CHARACTERISTIC_UUID, frame, response=True)
Expand All @@ -79,15 +92,15 @@ async def send_frame(self, frame):
await asyncio.sleep(0.1)
if not success:
raise RuntimeError("Device is disconnected, maximum retries exceeded")
# print(f"send data frame {frame}")
self.log_debug(f"send data frame {frame}")

def on_disconnect(self, client):
print(f"Disconnected from {client.address}")
self.log(f"Disconnected from {client.address}")
if self.reconnect:

async def on_notify(self, sender, data):
# print(f"notify recieved: (len={len(data)}) {data}")
self.log_debug(f"notify recieved: (len={len(data)}) {data}")
self.recv_buf += data
while True:
Expand All @@ -97,20 +110,20 @@ async def on_notify(self, sender, data):
# no more complete frames, wait for more data
frame = self.recv_buf[frame_start:frame_end+1]
# print(f"detected frame {frame}")
self.log_debug(f"detected frame {frame}")
del self.recv_buf[:frame_end]
if len(frame) < 3:
# print("accidentally considered end of previous frame as start of next, discarding")
self.log_debug("accidentally considered end of previous frame as start of next, discarding")
del self.recv_buf[0]
data_decoded = self.decode_frame(frame)
response = Response.FromString(data_decoded)
if response.seq == self.seq:
self.seq += 1
# print(f"response recieved: {response}")
self.log_debug(f"response recieved: {response}")
await self.resp_queue.put(response)
print(f"Invalid seq number {response.seq} != {self.seq}, discarding message")
self.log(f"Invalid seq number {response.seq} != {self.seq}, discarding message")

async def send_cmd(self, cmd):
assert isinstance(cmd, Cmd)
Expand All @@ -122,9 +135,9 @@ async def send_cmd(self, cmd):
frame = self.encode_frame(data)
for i in range(0, len(frame), 20):
frame_part = frame[i:i+20]
# print(f"sending (len={len(frame_part)}) {frame_part}")
self.log_debug(f"sending (len={len(frame_part)}) {frame_part}")
await self.send_frame(frame_part)
# print("waiting for response")
self.log_debug("waiting for response")
resp = await asyncio.wait_for(self.resp_queue.get(), self.retry_timeout)
return resp

Expand Down

0 comments on commit 4aed562

Please sign in to comment.