-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathserial_port.py
116 lines (95 loc) · 3.41 KB
/
serial_port.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
import serial, time
import serial.tools.list_ports
import pygatt
import pygatt.backends as pb
import pygatt.exceptions
import time
import tytserialport as tytserial
import datetime
import pandas
from datetime import datetime
from binascii import hexlify
from time import sleep
import os
import csv
#initialization and open the port
#possible timeout values:
# 1. None: wait forever, block call
# 2. 0: non-blocking mode, return immediately
# 3. x, x is bigger than 0, float allowed, timeout block call
ser = serial.Serial()
#ser.port = "/dev/ttyUSB0"
ser.baudrate = 115200
ser.bytesize = serial.EIGHTBITS # number of bits per bytes
ser.parity = serial.PARITY_NONE # set parity check: no parity
ser.stopbits = serial.STOPBITS_ONE # number of stop bits
#ser.timeout = None #block read
#ser.timeout = 1 #non-block read
ser.timeout = 10 #timeout block read
ser.xonxoff = False #disable software flow control
ser.rtscts = False #disable hardware (RTS/CTS) flow control
ser.dsrdtr = False #disable hardware (DSR/DTR) flow control
ser.writeTimeout = 10 #timeout for write
def findSerialPort():
global ser
comlist = serial.tools.list_ports.comports()
#connected = []
for element in comlist:
# print(str(element.description))
if element.description == "CP2102 USB to UART Bridge Controller" or "Silicon Labs CP210x USB to UART Bridge" in element.description:
# print("Connecting to COM port: " + str(element.device))
ser.port = element.device
return True
return False
def serialWrite(msg, expectedRspLen = 0):
global ser
if findSerialPort() != True:
return False, "NO_PORT"
try:
ser.open()
except:
# print("error opening serial port: " + str(e))
sleep(2)
ser.open()
if ser.isOpen():
try:
ser.flushInput() #flush input buffer, discarding all its contents
ser.flushOutput()#flush output buffer, aborting current output
#and discard all that is in buffer
#write data
ser.write(bytearray(msg, 'ascii'))
print("write data: {}".format(msg) )
time.sleep(0.2) #give the serial port some time to receive the data
totalNumOfBytesFromSerial = 0
numTrial = 0
full_response = ""
#while True:
# response = ser.readline()
# response = ser.readall()
if expectedRspLen:
response = ser.read(expectedRspLen - totalNumOfBytesFromSerial)
else:
response = ser.readall()
totalNumOfBytesFromSerial += len(response)
numTrial += 1
full_response += response.hex()
# full_response += response.decode("utf-8")
print("expLen: " + str(expectedRspLen) + " resLen: " + str(len(response)) + " resDataHex: " + response.hex())
ser.close()
if totalNumOfBytesFromSerial == expectedRspLen:
return True, full_response
elif totalNumOfBytesFromSerial == 0:
return False
else:
return False
return False
except:
sleep(2)
ser.write(bytearray(msg, 'ascii'))
ser.close()
finally:
ser.close()
else:
return False
if __name__ == '__main__':
pass