Skip to content

Commit

Permalink
add auto reconnect
Browse files Browse the repository at this point in the history
  • Loading branch information
nkraetzschmar committed Feb 6, 2024
1 parent 1d20a0d commit 97b8c04
Show file tree
Hide file tree
Showing 2 changed files with 92 additions and 42 deletions.
10 changes: 7 additions & 3 deletions example.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#!/usr/bin/env python3

import asyncio
import time
from libikawa import *

async def main():
Expand Down Expand Up @@ -33,9 +34,12 @@ async def main():

while True:
cmd = Cmd(cmd_type=MACH_STATUS_GET_ALL)
resp = await ikawa.send_cmd(cmd)
status = resp.resp_mach_status_get_all
print(f"{status.time}, {MachState.Name(status.state)}, {status.temp_above*0.1:.1f}, {status.temp_below*0.1:.1f}, {status.setpoint*0.1:.1f}, {status.heater}, {status.fan/255.0:.2f}, {(status.fan_measured/12.0)*60:.0f}")
try:
resp = await ikawa.send_cmd(cmd)
status = resp.resp_mach_status_get_all
print(f"{int(time.time())}, {status.time}, {MachState.Name(status.state)}, {status.temp_above*0.1:.1f}, {status.temp_below*0.1:.1f}, {status.setpoint*0.1:.1f}, {status.heater}, {status.fan/255.0:.2f}, {(status.fan_measured/12.0)*60:.0f}")
except TimeoutError:
pass
await asyncio.sleep(0.1)

try:
Expand Down
124 changes: 85 additions & 39 deletions libikawa.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import asyncio
from bleak import BleakScanner, BleakClient
import time
from bleak import BleakScanner, BleakClient, BleakError
from base64 import b64encode, b64decode
from urllib.parse import urlparse
from ikawa_pb2 import *
Expand All @@ -14,69 +15,114 @@ class Ikawa:
ESCAPE_MAPPING = {0x7D: 0x5D, 0x7E: 0x5E}
UNESCAPE_MAPPING = {0x5D: 0x7D, 0x5E: 0x7E}

def __init__(self):
def __init__(self, reconnect=True, retry_timeout=10):
self.seq = 1 # start with 1 because seq=0 makes Cmd(cmd_type=BOOTLOADER_GET_VERSION) an empty message which the firmware does not seem to handle
self.resp_queue = asyncio.Queue()
self.recv_buf = bytearray()
self.reconnect = True
self.retry_timeout = retry_timeout

async def __aenter__(self):
await self.scan_and_connect()
await self.client.start_notify(self.NOTIFY_CHARACTERISTIC_UUID, self.on_notify)
await self.scan()
await self.connect()
return self

async def __aexit__(self, exc_type, exc_val, exc_tb):
await self.client.disconnect()
await self.disconnect()

async def scan_and_connect(self):
async def scan(self):
print(f"Scanning for devices with service UUID: {self.SERVICE_UUID}...")

def filter_device(device, advertisement_data):
return self.SERVICE_UUID.lower() in [uuid.lower() for uuid in advertisement_data.service_uuids]

target_device = await BleakScanner.find_device_by_filter(filter_device, timeout=5)

if target_device:
print(f"Found device: {target_device.name} [{target_device.address}] with service UUID: {self.SERVICE_UUID}")
self.client = BleakClient(target_device, disconnected_callback=self.on_disconnect)
await self.client.connect()
if self.client.is_connected:
print(f"Successfully connected to {target_device.name}")
else:
raise RuntimeError("Failed to connect to the device")
self.target_device = await BleakScanner.find_device_by_filter(filter_device, timeout=5)
if self.target_device:
print(f"Found device: {self.target_device.name} [{self.target_device.address}]")
else:
raise RuntimeError("No device found advertising the specified service UUID")

async def send_cmd(self, cmd):
assert isinstance(cmd, Cmd)
if cmd.seq != 0:
raise ValueError("Cmd.seq should be left uninitialized and will be auto assigned")
cmd.seq = self.seq
data = cmd.SerializeToString()
frame = self.encode_frame(data)
await self.client.write_gatt_char(self.WRITE_CHARACTERISTIC_UUID, frame, response=True)
resp = await self.resp_queue.get()
return resp
async def connect(self):
self.client = BleakClient(self.target_device, disconnected_callback=self.on_disconnect)
t = time.time()
while time.time() - t < self.retry_timeout:
print("Trying to connect")
try:
await self.client.connect()
break
except (BleakError, TimeoutError):
pass
await asyncio.sleep(0.1)
if not self.client.is_connected:
raise RuntimeError("Failed to connect to the device, maximum retries exceeded")
print(f"Connected to {self.target_device.name}")
await self.client.start_notify(self.NOTIFY_CHARACTERISTIC_UUID, self.on_notify)

async def disconnect(self):
reconnect = self.reconnect
self.reconnect = False
await self.client.disconnect()
self.reconnect = reconnect

async def send_frame(self, frame):
success=False
t = time.time()
while time.time() - t < self.retry_timeout:
# print("trying to send frame")
try:
await self.client.write_gatt_char(self.WRITE_CHARACTERISTIC_UUID, frame, response=True)
success=True
break
except BleakError as e:
pass
await asyncio.sleep(0.1)
if not success:
raise RuntimeError("Device is disconnected, maximum retries exceeded")
# print(f"send data frame {frame}")

def on_disconnect(self, client):
print(f"Disconnected from {client.address}")
if self.reconnect:
asyncio.get_running_loop().create_task(self.connect())

async def on_notify(self, sender, data):
# print(f"notify recieved: {data}")
self.recv_buf += data
while len(self.recv_buf) and self.recv_buf[0] != self.FRAME_BYTE:
del self.recv_buf[0]
if len(self.recv_buf) > 3 and self.recv_buf[-1] == self.FRAME_BYTE:
while True:
try:
data_decoded = self.decode_frame(self.recv_buf)
response = Response.FromString(data_decoded)
if response.seq == self.seq:
self.seq += 1
# print(f"response recieved: {response}")
await self.resp_queue.put(response)
else:
print(f"Invalid seq number {response.seq} != {self.seq}, discarding message")
finally:
self.recv_buf = bytearray()
frame_start = self.recv_buf.index(self.FRAME_BYTE)
frame_end = frame_start + self.recv_buf[frame_start+1:].index(self.FRAME_BYTE) + 1
except ValueError:
# no more complete frames, wait for more data
break
frame = self.recv_buf[frame_start:frame_end+1]
# print(f"detected frame {frame}")
del self.recv_buf[:frame_end]
if len(frame) < 3:
# print("accidentally considered end of previous frame as start of next, discarding")
continue
del self.recv_buf[0]
data_decoded = self.decode_frame(frame)
response = Response.FromString(data_decoded)
if response.seq == self.seq:
self.seq += 1
# print(f"response recieved: {response}")
await self.resp_queue.put(response)
else:
print(f"Invalid seq number {response.seq} != {self.seq}, discarding message")

async def send_cmd(self, cmd):
assert isinstance(cmd, Cmd)
if cmd.seq != 0:
raise ValueError("cmd.seq should be left uninitialized and will be auto assigned")
cmd.seq = self.seq
data = cmd.SerializeToString()
cmd.seq = 0
frame = self.encode_frame(data)
await self.send_frame(frame)
# print("waiting for response")
resp = await asyncio.wait_for(self.resp_queue.get(), self.retry_timeout)
return resp

@classmethod
def encode_frame(cls, data):
Expand Down

0 comments on commit 97b8c04

Please sign in to comment.