forked from ayushsharma82/ElegantOTA
-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathplatformio_upload.py
140 lines (119 loc) · 5.12 KB
/
platformio_upload.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
# Allows PlatformIO to upload directly to ElegantOTA
#
# To use:
# - copy this script into the same folder as your platformio.ini
# - set the following for your project in platformio.ini:
#
# extra_scripts = platformio_upload.py
# upload_protocol = custom
# custom_upload_url = <your upload URL>
#
# An example of an upload URL:
# custom_upload_url = http://192.168.1.123/update
# also possible: custom_upload_url = http://domainname/update
import sys
import requests
import hashlib
from urllib.parse import urlparse
import time
from requests.auth import HTTPDigestAuth
Import("env")
# The upload helper needs requests_toolbelt (multipart streaming) and tqdm
# (progress bar). If either import fails, install both through the
# environment's Python interpreter and import them again.
try:
    from requests_toolbelt import MultipartEncoder, MultipartEncoderMonitor
    from tqdm import tqdm
except ImportError:
    for _pip_command in (
        "$PYTHONEXE -m pip install requests_toolbelt",
        "$PYTHONEXE -m pip install tqdm",
    ):
        env.Execute(_pip_command)
    from requests_toolbelt import MultipartEncoder, MultipartEncoderMonitor
    from tqdm import tqdm
def on_upload(source, target, env):
    """Upload the built firmware image to an ElegantOTA HTTP endpoint.

    The base URL is taken from the ``custom_upload_url`` project option; it
    may be given with or without the trailing ``/update`` page. The function
    probes ``<base>/update`` to find out whether the server demands
    authentication, announces the upload via ``<base>/ota/start`` (passing the
    MD5 of the firmware), and then streams the file as a multipart POST to
    ``<base>/ota/upload`` while showing a progress bar.

    Args:
        source: Sequence whose first item is the firmware file to upload.
        target: Unused; present because the build system passes it.
        env: Build environment used to read project options
            (``custom_upload_url``, and ``custom_username`` /
            ``custom_password`` when the server answers 401).

    Returns:
        None when the upload succeeded, otherwise a string describing the
        failure. NOTE(review): SCons is expected to treat a non-empty string
        returned from a Python action as a build error - confirm.
    """
    firmware_path = str(source[0])

    # Accept both "http://host" and "http://host/update": only a trailing
    # "/update" segment is stripped, so paths that merely contain that text
    # (e.g. "/updater") are left intact and no trailing slash remains.
    upload_url = env.GetProjectOption('custom_upload_url').rstrip("/")
    if upload_url.endswith("/update"):
        upload_url = upload_url[:-len("/update")]

    with open(firmware_path, 'rb') as firmware:
        md5 = hashlib.md5(firmware.read()).hexdigest()

    host_ip = urlparse(upload_url).netloc

    # Headers shared by the start request and the upload request.
    common_headers = {
        'Host': host_ip,
        'User-Agent': 'Mozilla/5.0 (X11; Ubuntu; Linux x86_64; rv:109.0) Gecko/20100101 Firefox/118.0',
        'Accept': '*/*',
        'Accept-Language': 'de,en-US;q=0.7,en;q=0.3',
        'Accept-Encoding': 'gzip, deflate',
        'Referer': f'{upload_url}/update',
        'Connection': 'keep-alive',
    }
    start_url = f"{upload_url}/ota/start?mode=fr&hash={md5}"

    # Probe the update page without credentials; a 401 means the server
    # requires authentication.
    try:
        auth_probe = requests.get(f"{upload_url}/update")
    except Exception as e:
        return 'Error checking auth: ' + repr(e)

    auth = None
    if auth_probe.status_code == 401:
        try:
            username = env.GetProjectOption('custom_username')
            password = env.GetProjectOption('custom_password')
        except Exception:
            username = None
            password = None
            print("No authentication values specified.")
            print('Please, add some Options in your .ini file like: \n\ncustom_username=username\ncustom_password=password\n')
        if username is None or password is None:
            return "Authentication required, but no credentials provided."
        print("Serverconfiguration: authentication needed.")
        auth = HTTPDigestAuth(username, password)
    else:
        print("Serverconfiguration: authentication not needed.")

    # Perform the GET request that announces the upload (with credentials
    # when the server asked for them).
    try:
        start_response = requests.get(start_url, headers=common_headers, auth=auth)
    except Exception as e:
        if auth is not None:
            return 'Error while authenticating: ' + repr(e)
        return 'Error while starting upload: ' + repr(e)
    if start_response.status_code != 200:
        if auth is not None:
            return "Authentication failed " + str(start_response.status_code)
        return "Start request failed " + str(start_response.status_code)
    if auth is not None:
        print("Authentication successful")

    with open(firmware_path, 'rb') as firmware:
        encoder = MultipartEncoder(fields={
            'MD5': md5,
            'firmware': ('firmware', firmware, 'application/octet-stream'),
        })
        bar = tqdm(
            desc='Upload Progress',
            total=encoder.len,
            dynamic_ncols=True,
            unit='B',
            unit_scale=True,
            unit_divisor=1024,
        )
        # The monitor reports the absolute number of bytes read, while tqdm
        # wants increments, hence the subtraction of the bar's position.
        monitor = MultipartEncoderMonitor(
            encoder, lambda m: bar.update(m.bytes_read - bar.n)
        )
        post_headers = dict(common_headers)
        post_headers['Content-Type'] = monitor.content_type
        post_headers['Content-Length'] = str(monitor.len)
        post_headers['Origin'] = f'{upload_url}'

        try:
            response = requests.post(
                f"{upload_url}/ota/upload",
                data=monitor,
                headers=post_headers,
                auth=auth,
            )
        except Exception as e:
            return 'Error while uploading: ' + repr(e)
        finally:
            # Close the bar on every path so a failed request does not leave
            # a dangling progress line on the terminal.
            bar.close()

    time.sleep(0.1)

    if response.status_code != 200:
        tqdm.write("\nUpload failed.\nServer response: " + response.text)
        # Report the failure to the build system instead of silently
        # finishing as if the upload had succeeded.
        return "Upload failed " + str(response.status_code)

    tqdm.write("\nUpload successful.\nServer response: " + response.text)
    return None
# Install on_upload as the environment's UPLOADCMD so that it is what runs
# for the upload step (used together with `upload_protocol = custom`).
env.Replace(UPLOADCMD=on_upload)