-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathble_client.py
executable file
·109 lines (86 loc) · 3.85 KB
/
ble_client.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
# Example of interaction with a BLE UART device using a UART service
# implementation.
import Adafruit_BluefruitLE
from Adafruit_BluefruitLE.services import UART
import datetime
import time, sys, signal
# Get the BLE provider for the current platform.
ble = Adafruit_BluefruitLE.get_provider()
myfile = open("out.csv", "a")
# Main function implements the program logic so it can run in a background
# thread. Most platforms require the main thread to handle GUI events and other
# asyncronous events like BLE actions. All of the threading logic is taken care
# of automatically though and you just need to provide a main function that uses
# the BLE provider.
def main():
# Clear any cached data because both bluez and CoreBluetooth have issues with
# caching data and it going stale.
ble.clear_cached_data()
# Get the first available BLE network adapter and make sure it's powered on.
adapter = ble.get_default_adapter()
adapter.power_on()
print('Using adapter: {0}'.format(adapter.name))
# Disconnect any currently connected UART devices. Good for cleaning up and
# starting from a fresh state.
print('Disconnecting any connected UART devices...')
UART.disconnect_devices()
# Scan for UART devices.
print('Searching for UART device...')
try:
adapter.start_scan()
# Search for the first UART device found (will time out after 60 seconds
# but you can specify an optional timeout_sec parameter to change it).
device = UART.find_device()
if device is None:
raise RuntimeError('Failed to find UART device!')
finally:
# Make sure scanning is stopped before exiting.
adapter.stop_scan()
print('Connecting to device...')
device.connect() # Will time out after 60 seconds, specify timeout_sec parameter
# to change the timeout.
#signal.pause()
# Once connected do everything else in a try/finally to make sure the device
# is disconnected when done.
try:
# Wait for service discovery to complete for the UART service. Will
# time out after 60 seconds (specify timeout_sec parameter to override).
print('Discovering services...')
UART.discover(device)
# Once service discovery is complete create an instance of the service
# and start interacting with it.
uart = UART(device)
# Write a string to the TX characteristic.
uart.write(b'Hello world!\r\n')
print("Sent 'Hello world!' to the device.")
# Now wait up to one minute to receive data from the device.
print('Waiting up to 60 seconds to receive data from the device...')
print(datetime.datetime.now())
start_ms = int(round(time.time() * 1000))
buffer = ''
suffix = '\r\n'
finished = False
while finished != True:
# read data from BLE device
received = uart.read(timeout_sec=30)
# if nothing has been received after timeout then close the session
if(received == None):
finished = True
else:
buffer += received.decode('UTF-8')
if(buffer.endswith(suffix)):
now_ms = str(int(round(time.time() * 1000)))
line = now_ms + ',' + buffer
myfile.write(line)
myfile.flush()
buffer = ''
finally:
# Make sure device is disconnected on exit.
device.disconnect()
myfile.close()
# Initialize the BLE system. MUST be called before other BLE calls!
ble.initialize()
# Start the mainloop to process BLE events, and run the provided function in
# a background thread. When the provided main function stops running, returns
# an integer status code, or throws an error the program will exit.
ble.run_mainloop_with(main)