-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathtest_mp_esp32.py
156 lines (122 loc) · 4.91 KB
/
test_mp_esp32.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
import os
from inspect import getsource
from textwrap import dedent
import socket
import sys
from snapmount import mounted
import pytest
# Apply the custom "metal" marker to every test collected from this module
# (presumably so hardware-backed tests can be selected or skipped with
# `-m metal` -- confirm against the project's pytest configuration).
pytestmark = pytest.mark.metal
# Module-level placeholder. Nothing in the visible code reads or assigns it;
# the `board` bound inside runs_on_metal is a separate local name.
board = None
# Address that the local hostname resolves to, looked up once at import time.
# NOTE(review): not referenced by any visible code, and the lookup raises at
# import if the hostname cannot be resolved -- confirm it is still needed.
test_host = socket.gethostbyname(socket.gethostname())
def pipe(out, err, err_msg):
    """Echo output received from the board and fail if it reported an error.

    Called with the two values returned by ``board.exec_raw(...)`` followed
    by a description of the step that was executed.

    Args:
        out: Bytes to echo to the host's standard output.
        err: Bytes to echo to the host's standard error; any content here is
            treated as a failure.
        err_msg: Short description of the failed step, embedded in the
            raised exception.

    Raises:
        RuntimeError: If ``err`` is non-empty.
    """
    # Output coming from a microcontroller is not guaranteed to be valid
    # UTF-8 (e.g. a truncated traceback or binary noise). Decode leniently so
    # a stray byte cannot raise UnicodeDecodeError and hide the real failure.
    sys.stdout.write(out.decode(errors="replace"))
    sys.stderr.write(err.decode(errors="replace"))
    if err:
        raise RuntimeError(f"MCU: {err_msg}")
def _strip_decorators(source_code, until):
source_code = source_code.split("\n")
for i, line in enumerate(source_code):
if line.startswith(until):
break
else:
raise ValueError(f"decorator {until} not found")
return "\n".join(source_code[i + 1:])
def runs_on_metal(mount, **kwargs):
    """Build a decorator that executes the decorated function on the board.

    The decorated function is never called on the host. Its source text is
    captured at decoration time and, when the returned callable runs, is sent
    to the board with ``exec_raw`` and then invoked there by name.

    Args:
        mount: Passed as the first argument to ``mounted``.
        **kwargs: Forwarded unchanged to ``mounted``.

    Returns:
        A decorator that replaces the function with a zero-argument callable.
    """
    def _wrapper(func):
        # Everything up to and including the first line that starts with
        # "@runs_on_metal" is removed, so that decorator call must fit on a
        # single physical line in the test module.
        source_code = _strip_decorators(dedent(getsource(func)), "@runs_on_metal")
        f_name = func.__name__

        def _result():
            with mounted(mount, **kwargs) as board:
                # Step 1: define the function on the board.
                definition_output = board.exec_raw(source_code)
                pipe(*definition_output, f"error while compiling '{f_name}'")
                # Step 2: call it; any error output fails the host-side test.
                call_as = f"{f_name}()"
                invocation_output = board.exec_raw(call_as, timeout=60)
                pipe(*invocation_output, f"error while invoking {f_name}")

        return _result

    return _wrapper
# The body below is executed on the board, not on the host; the decorator
# call must stay on one line because its line is stripped before shipping.
@runs_on_metal({"/hello.txt": "Hello world"}, fs="lfs")
def test_mount_littlefs():
    """Read a file supplied through the mount spec (fs="lfs")."""
    with open("/mount/hello.txt", "r") as mounted_file:
        content = mounted_file.read()
    assert content == "Hello world"
# The body below is executed on the board, not on the host; the decorator
# call must stay on one line because its line is stripped before shipping.
@runs_on_metal({"/hello.txt": "Hello world"}, fs="fat")
def test_mount_fat():
    """Read a file supplied through the mount spec (fs="fat")."""
    with open("/mount/hello.txt", "r") as mounted_file:
        content = mounted_file.read()
    assert content == "Hello world"
# The body below is executed on the board, not on the host; the decorator
# call must stay on one line because its line is stripped before shipping.
@runs_on_metal({}, fs="lfs")
def test_mount_littlefs_rw():
    """Write a file under /mount and read it back (empty spec, fs="lfs")."""
    path = "/mount/hello.txt"
    with open(path, "w") as sink:
        sink.write("Hello world")
    with open(path, "r") as source:
        round_trip = source.read()
    assert round_trip == "Hello world"
# The body below is executed on the board, not on the host; the decorator
# call must stay on one line because its line is stripped before shipping.
@runs_on_metal({}, fs="fat")
def test_mount_fat_rw():
    """Write a file under /mount and read it back (empty spec, fs="fat")."""
    path = "/mount/hello.txt"
    with open(path, "w") as sink:
        sink.write("Hello world")
    with open(path, "r") as source:
        round_trip = source.read()
    assert round_trip == "Hello world"
# The body below is executed on the board, not on the host; the decorator
# call must stay on one line because its line is stripped before shipping.
@runs_on_metal({"/some/folder/hello.txt": "Hello world", "/another/folder": None}, fs="lfs")
def test_mount_littlefs_subfolders():
    """Check nested paths from the mount spec are visible (fs="lfs").

    The spec maps one nested file path to its content and one path to
    ``None``; the assertions expect the latter to show up as a directory
    entry named "folder" under /mount/another.
    """
    # Only this function's source is sent to the board, so the host module's
    # `import os` does not apply there. Import locally instead of relying on
    # `os` already being defined in the board's global namespace.
    import os
    with open("/mount/some/folder/hello.txt", "r") as f:
        assert f.read() == "Hello world"
    # Sorted because directory listing order is not asserted here.
    assert sorted(os.listdir("/mount")) == ["another", "some"]
    assert os.listdir("/mount/another") == ["folder"]
# The body below is executed on the board, not on the host; the decorator
# call must stay on one line because its line is stripped before shipping.
@runs_on_metal({"/some/folder/hello.txt": "Hello world", "/another/folder": None}, fs="fat")
def test_mount_fat_subfolders():
    """Check nested paths from the mount spec are visible (fs="fat").

    The spec maps one nested file path to its content and one path to
    ``None``; the assertions expect the latter to show up as a directory
    entry named "folder" under /mount/another.
    """
    # Only this function's source is sent to the board, so the host module's
    # `import os` does not apply there. Import locally instead of relying on
    # `os` already being defined in the board's global namespace.
    import os
    with open("/mount/some/folder/hello.txt", "r") as f:
        assert f.read() == "Hello world"
    # Sorted because directory listing order is not asserted here.
    assert sorted(os.listdir("/mount")) == ["another", "some"]
    assert os.listdir("/mount/another") == ["folder"]
# The body below is executed on the board, not on the host, so it cannot
# share a helper with the other perf tests: only this function's own source
# is shipped. The decorator call must stay on one physical line.
@runs_on_metal({"test.txt": b"abcdefgh" * 12800}, block_size=512, fs="fat")
def test_perf_fat_512():
    """Print read and write throughput for /mount/test.txt (block_size=512)."""
    from time import ticks_diff, ticks_ms
    # Read phase: the timed span covers open, full read and close.
    started = ticks_ms()
    with open("/mount/test.txt", "rb") as src:
        size = len(src.read())
    dt = ticks_diff(ticks_ms(), started)
    # size / dt is bytes per millisecond, printed with a "k/s" suffix.
    print(f"fat 512 read {size * 0.001}k in {dt * 0.001}s at {size / dt:.1f}k/s")
    # Write phase: overwrite the file with a zero-filled buffer of equal size.
    payload = bytearray(size)
    started = ticks_ms()
    with open("/mount/test.txt", "wb") as dst:
        dst.write(payload)
    dt = ticks_diff(ticks_ms(), started)
    print(f"fat 512 write {size * 0.001}k in {dt * 0.001}s at {size / dt:.1f}k/s")
# The body below is executed on the board, not on the host, so it cannot
# share a helper with the other perf tests: only this function's own source
# is shipped. The decorator call must stay on one physical line.
@runs_on_metal({"test.txt": b"abcdefgh" * 12800}, block_size=1024, fs="fat")
def test_perf_fat_1k():
    """Print read and write throughput for /mount/test.txt (block_size=1024)."""
    from time import ticks_diff, ticks_ms
    # Read phase: the timed span covers open, full read and close.
    started = ticks_ms()
    with open("/mount/test.txt", "rb") as src:
        size = len(src.read())
    dt = ticks_diff(ticks_ms(), started)
    # size / dt is bytes per millisecond, printed with a "k/s" suffix.
    print(f"fat 1k read {size * 0.001}k in {dt * 0.001}s at {size / dt:.1f}k/s")
    # Write phase: overwrite the file with a zero-filled buffer of equal size.
    payload = bytearray(size)
    started = ticks_ms()
    with open("/mount/test.txt", "wb") as dst:
        dst.write(payload)
    dt = ticks_diff(ticks_ms(), started)
    print(f"fat 1k write {size * 0.001}k in {dt * 0.001}s at {size / dt:.1f}k/s")
# The body below is executed on the board, not on the host, so it cannot
# share a helper with the other perf tests: only this function's own source
# is shipped. The decorator call must stay on one physical line.
@runs_on_metal({"test.txt": b"abcdefgh" * 12800}, block_size=2048, fs="fat")
def test_perf_fat_2k():
    """Print read and write throughput for /mount/test.txt (block_size=2048)."""
    from time import ticks_diff, ticks_ms
    # Read phase: the timed span covers open, full read and close.
    started = ticks_ms()
    with open("/mount/test.txt", "rb") as src:
        size = len(src.read())
    dt = ticks_diff(ticks_ms(), started)
    # size / dt is bytes per millisecond, printed with a "k/s" suffix.
    print(f"fat 2k read {size * 0.001}k in {dt * 0.001}s at {size / dt:.1f}k/s")
    # Write phase: overwrite the file with a zero-filled buffer of equal size.
    payload = bytearray(size)
    started = ticks_ms()
    with open("/mount/test.txt", "wb") as dst:
        dst.write(payload)
    dt = ticks_diff(ticks_ms(), started)
    print(f"fat 2k write {size * 0.001}k in {dt * 0.001}s at {size / dt:.1f}k/s")
# The body below is executed on the board, not on the host, so it cannot
# share a helper with the other perf tests: only this function's own source
# is shipped. The decorator call must stay on one physical line.
@runs_on_metal({"test.txt": b"abcdefgh" * 12800}, block_size=4096, fs="fat")
def test_perf_fat_4k():
    """Print read and write throughput for /mount/test.txt (block_size=4096)."""
    from time import ticks_diff, ticks_ms
    # Read phase: the timed span covers open, full read and close.
    started = ticks_ms()
    with open("/mount/test.txt", "rb") as src:
        size = len(src.read())
    dt = ticks_diff(ticks_ms(), started)
    # size / dt is bytes per millisecond, printed with a "k/s" suffix.
    print(f"fat 4k read {size * 0.001}k in {dt * 0.001}s at {size / dt:.1f}k/s")
    # Write phase: overwrite the file with a zero-filled buffer of equal size.
    payload = bytearray(size)
    started = ticks_ms()
    with open("/mount/test.txt", "wb") as dst:
        dst.write(payload)
    dt = ticks_diff(ticks_ms(), started)
    print(f"fat 4k write {size * 0.001}k in {dt * 0.001}s at {size / dt:.1f}k/s")