Skip to content

Commit

Permalink
ur:crypto-psbt support (#208)
Browse files Browse the repository at this point in the history
experimental support. sometimes fails for large txs or fast QR repetition rate
  • Loading branch information
stepansnigirev authored Jul 10, 2022
1 parent 133f54e commit f442942
Show file tree
Hide file tree
Showing 4 changed files with 86 additions and 27 deletions.
47 changes: 34 additions & 13 deletions src/gui/components/qrcode.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@
import asyncio
import platform

from microur.encoder import UREncoder
from io import BytesIO

qr_style = lv.style_t()
qr_style.body.main_color = lv.color_hex(0xFFFFFF)
qr_style.body.grad_color = lv.color_hex(0xFFFFFF)
Expand Down Expand Up @@ -33,6 +36,7 @@ def __init__(self, *args, **kwargs):
style.text.font = lv.font_roboto_16
style.text.color = lv.color_hex(0x192432)

self.encoder = None
self._autoplay = True

self.qr = lvqr.QRCode(self)
Expand Down Expand Up @@ -216,6 +220,13 @@ def update_note(self):
self.check_controls()

def set_text(self, text="Text", set_first_frame=False):
if text[:5] == b"psbt\xff":
self.encoder = UREncoder(UREncoder.CRYPTO_PSBT, BytesIO(text), 100)
self._text = text
self.idx = 0
self.set_frame()
return
self.encoder = None
if platform.simulator and self._text != text:
print("QR on screen:", text)
self._text = text
Expand All @@ -236,17 +247,22 @@ def set_text(self, text="Text", set_first_frame=False):
self.update_note()

def set_frame(self):
if self._text.startswith("UR:BYTES/"):
arr = self._text.split("/")
payload = arr[-1]
prefix = arr[0] + "/%dOF%d/" % (self.idx + 1, self.frame_num)
prefix += arr[1] + "/"
if self.encoder:
payload = self.encoder.next_part()
self._set_text(payload)
note = ""
else:
payload = self._text
prefix = "p%dof%d " % (self.idx + 1, self.frame_num)
offset = self.frame_size * self.idx
self._set_text(prefix + payload[offset : offset + self.frame_size])
note = "Part %d of %d." % (self.idx + 1, self.frame_num)
if self._text.startswith("UR:BYTES/"):
arr = self._text.split("/")
payload = arr[-1]
prefix = arr[0] + "/%dOF%d/" % (self.idx + 1, self.frame_num)
prefix += arr[1] + "/"
else:
payload = self._text
prefix = "p%dof%d " % (self.idx + 1, self.frame_num)
offset = self.frame_size * self.idx
self._set_text(prefix + payload[offset : offset + self.frame_size])
note = "Part %d of %d." % (self.idx + 1, self.frame_num)
if self.is_fullscreen:
note += " Click to shrink."
else:
Expand All @@ -256,9 +272,14 @@ def set_frame(self):
self.check_controls()

def check_controls(self):
self.controls.set_hidden((not self.is_fullscreen) or (self.idx is None))
self.playback.set_hidden((not self.is_fullscreen) or (self.idx is None))
self.play.set_hidden((not self.is_fullscreen) or (self.idx is not None) or (len(self._text) <= self.MIN_SIZE))
if self.encoder:
self.controls.set_hidden(True)
self.playback.set_hidden(True)
self.play.set_hidden(True)
else:
self.controls.set_hidden((not self.is_fullscreen) or (self.idx is None))
self.playback.set_hidden((not self.is_fullscreen) or (self.idx is None))
self.play.set_hidden((not self.is_fullscreen) or (self.idx is not None) or (len(self._text) <= self.MIN_SIZE))

def _set_text(self, text):
# one bcur frame doesn't require checksum
Expand Down
62 changes: 50 additions & 12 deletions src/hosts/qr.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@
from gui.screens.settings import HostSettings
from gui.screens import Alert
from helpers import read_until, read_write
from microur.decoder import FileURDecoder
from microur.util import cbor

QRSCANNER_TRIGGER = config.QRSCANNER_TRIGGER
# OK response from scanner
Expand Down Expand Up @@ -249,13 +251,16 @@ async def settings_menu(self, show_screen, keystore):
def clean_uart(self):
self.uart.read()

def stop_scanning(self):
self.scanning = False
def _stop_scanner(self):
if self.trigger is not None:
self.trigger.on()
else:
self.set_setting(SETTINGS_ADDR, self.CMD_MODE)

def stop_scanning(self):
self.scanning = False
self._stop_scanner()

def abort(self):
with open(self.tmpfile,"wb"):
pass
Expand Down Expand Up @@ -283,6 +288,8 @@ async def scan(self):
self.animated = False
self.parts = None
self.bcur = False
self.bcur2 = False
self.decoder = FileURDecoder(self.path)
self.bcur_hash = b""
gc.collect()
while self.scanning:
Expand All @@ -293,6 +300,8 @@ async def scan(self):
if self.parts is not None:
del self.parts
self.parts = None
del self.decoder
self.decoder = None
gc.collect()
if self.cancelled:
return None
Expand Down Expand Up @@ -343,17 +352,35 @@ def process_chunk(self):
# should not be there if trigger mode or simulator
with open(self.tmpfile, "rb") as f:
c = f.read(len(SUCCESS))
if c!=SUCCESS:
f.seek(-len(c), 1)
while c == SUCCESS:
c = f.read(len(SUCCESS))
f.seek(-len(c), 1)
# check if it's bcur encoding
start = f.read(9)
start = f.read(9).upper()
f.seek(-len(start), 1)
if start.upper() == b"UR:BYTES/":
if start == b"UR:BYTES/":
self.bcur = True
return self.process_bcur(f)
# bcur2 encoding
elif start == b"UR:CRYPTO":
self.bcur2 = True
return self.process_bcur2(f)
else:
return self.process_normal(f)

def process_bcur2(self, f):
gc.collect()
if self.decoder.read_part(f):
self._stop_scanner()
fname = self.path + "/data.txt"
with self.decoder.result() as b:
msglen = cbor.read_bytes_len(b)
with open(fname, "wb") as fout:
read_write(b, fout)
gc.collect()
return True
return False

def process_bcur(self, f):
# check if starts with UR:BYTES/
chunk, char = read_until(f, b"/", return_on_max_len=True)
Expand Down Expand Up @@ -413,6 +440,7 @@ def process_bcur(self, f):
self.parts[m - 1] = fname
# all have non-zero len
if None not in self.parts:
self._stop_scanner()
fname = self.path + "/data.txt"
with open(fname, "wb") as fout:
fout.write(b"UR:BYTES/")
Expand Down Expand Up @@ -478,6 +506,7 @@ def process_normal(self, f):
self.parts[m - 1] = fname
# all have non-zero len
if None not in self.parts:
self._stop_scanner()
fname = self.path + "/data.txt"
with open(fname, "wb") as fout:
for part in self.parts:
Expand Down Expand Up @@ -510,18 +539,25 @@ async def get_data(self):

async def send_data(self, stream, meta):
# if it's str - it's a file
if isinstance(stream, str):
with open(stream, "r") as f:
response = f.read()
else:
response = stream.read().decode()
title = "Your data:"
note = None
if "title" in meta:
title = meta["title"]
if "note" in meta:
note = meta["note"]
msg = response
if self.bcur2:
# binary data here
if isinstance(stream, str):
with open(stream, "rb") as f:
response = f.read()
else:
response = stream.read()
elif isinstance(stream, str):
with open(stream, "r") as f:
response = f.read()
else:
response = stream.read().decode()
msg = response if not self.bcur2 else ""
if "message" in meta:
msg = meta["message"]
await self.manager.gui.qr_alert(title, msg, response, note=note, qr_width=480)
Expand All @@ -537,6 +573,8 @@ def progress(self):
- either as a number between 0 and 1
- or a list of True False for checkboxes
"""
if self.bcur2 and self.decoder:
return self.decoder.progress
if not self.in_progress:
return 1
if not self.animated:
Expand Down
2 changes: 1 addition & 1 deletion src/specter.py
Original file line number Diff line number Diff line change
Expand Up @@ -534,7 +534,7 @@ async def process_host_request(self, stream, popup=True, appname=None, show_fn=N
if app.can_process(stream):
matching_apps.append(app)
if len(matching_apps) == 0:
raise HostError("Can't find matching app for this request")
raise HostError("Can't find matching app for this request:\n\n %r" % stream.read(100))
# TODO: if more than one - ask which one to use
if len(matching_apps) > 1:
raise HostError(
Expand Down

0 comments on commit f442942

Please sign in to comment.