forked from jkoelker/python-nest
-
Notifications
You must be signed in to change notification settings - Fork 4
/
Copy pathnest.py
375 lines (319 loc) · 13.7 KB
/
nest.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
# -*- coding:utf-8 -*-
import logging
import time
import os
import threading
from typing import Dict, Any, List, Callable, Optional
import requests
from requests.compat import json
from requests_oauthlib import OAuth2Session
from oauthlib.oauth2 import TokenExpiredError
# Interface URLs
ACCESS_TOKEN_URL = 'https://www.googleapis.com/oauth2/v4/token'
AUTHORIZE_URL = 'https://nestservices.google.com/partnerconnections/{project_id}/auth'
API_URL = 'https://smartdevicemanagement.googleapis.com/v1/enterprises/{project_id}/devices'
REDIRECT_URI = 'https://www.google.com'
SCOPE = ['https://www.googleapis.com/auth/sdm.service']
_LOGGER = logging.getLogger(__name__)
class Device():
"""This is the class used to access the traits of a Nest device
You can access a list of traits and send commands.
The class is linked back to a Nest instance which will keep it updated
based on the Nest objects cache_period.
Since any access can trigger a network request, they can result in exceptions
"""
def __init__(self, nest_api: Optional['Nest'] = None,
name: Optional[str] = None,
device_data: Optional[Dict[str, Any]] = None):
"""Meant for internal use, get instances of Device from the Nest api
Devices returned have the nest_api and name set.
Parameters
----------
nest_api : Nest
The Nest instance providing updates
name : str
The unique name of this device
device_data : Dict
Instead of specifying the previous two, intialize directly from the
dict for the device returned from the API call. Used internally.
"""
self._name = name
self._nest_api = nest_api
self._device_data = device_data
def __str__(self):
trait_str = ','.join([f'<{k}: {v}>' for k, v in self.traits.items()])
return f'name: {self.name} where:{self.where} - {self.type}({trait_str})'
@property
def name(self) -> str:
"""str representing the unique name of the device"""
if self._device_data is not None:
full_name = self._device_data['name']
else:
full_name = self._name
return full_name.split('/')[-1]
@property
def _device(self):
if self._device_data is not None:
return self._device_data
else:
return next(device for device in self._devices if self.name in device['name'])
@property
def _devices(self):
if self._device_data is not None:
raise RuntimeError("Invalid use of singular device")
return self._nest_api._devices
@property
def where(self) -> str:
"""str representing the parent structure of the device"""
return self._device['parentRelations'][0]['displayName']
@property
def type(self) -> str:
"""str representing the type of device"""
return self._device['type'].split('.')[-1]
@property
def traits(self) -> Dict[str, Any]:
"""list of traits see https://developers.google.com/nest/device-access/traits"""
return {k.split('.')[-1]: v for k, v in self._device['traits'].items()}
def send_cmd(self, cmd: str, params: Dict[str, Any]) -> Dict[str, Any]:
"""Send a command to this device
commands are listed in https://developers.google.com/nest/device-access/traits
Parameters
----------
cmd : str
The string for the command can include the full command ie:
"sdm.devices.commands.ThermostatTemperatureSetpoint.SetCool"
or just the last two parts ie:
"ThermostatTemperatureSetpoint.SetCool"
params : dict
The content of the params to send with the command
Exceptions
----------
Will return APIError if the command or params is invalid
Returns
-------
Dict
The body of the response
"""
cmd = '.'.join(cmd.split('.')[-2:])
path = f'/{self.name}:executeCommand'
data = {
"command": "sdm.devices.commands." + cmd,
'params': params
}
response = self._nest_api._put(path=path, data=data)
return response
@staticmethod
def filter_for_trait(devices: List['Device'], trait: str) -> List['Device']:
"""Filter a list of Devices for ones with a the specified trait"""
trait = trait.split('.')[-1]
return [device for device in devices if trait in device.traits]
@staticmethod
def filter_for_cmd(devices: List['Device'], cmd: str) -> List['Device']:
"""Filter a list of Devices for ones with a trait associated with a cmd
ie. "ThermostatTemperatureSetpoint.SetCool" will filter for devices
with the "ThermostatTemperatureSetpoint" trait
"""
trait = cmd.split('.')[-2]
return Device.filter_for_trait(devices, trait)
class Nest(object):
"""This is the class used to manage the connection to Google Smart Devices
It handles the authentication flow and returns a list of the devices
associated with the account. These devices will call back to this class
to keep their values up to date.
"""
def __init__(self,
client_id: str, client_secret: str,
project_id: str,
access_token: Optional[Dict[str, Any]] = None,
access_token_cache_file: Optional[str] = None,
reautherize_callback: Optional[Callable[[str], str]] = None,
cache_period: float = 10):
"""
Parameters
----------
client_id : str
OAuth client_id
client_secret : str
OAuth secret
project_id : str
The project_id from https://console.nest.google.com/device-access/project-list
access_token : Optional[Dict[str, Any]]
Directly specify the OAuth access token ie.:
{"access_token": "", "expires_in": 3599,
"scope": ["https://www.googleapis.com/auth/sdm.service"],
"token_type": "Bearer", "expires_at": 1617334543.9341743,
"refresh_token": ""}
access_token_cache_file : Optional[str]
A path to store and load tokens to avoid needing to reauthentic
every time.
reautherize_callback : Optional[Callable[[str], str]]
If the token is expired or invalid, this callback will be called
with the URL the user needs to go to to revalidate. If not set
an AuthorizationError exception will trigger.
cache_period : float
When requesting the device set, how long should the previous
results be reused before making a new request.
"""
self._client_id = client_id
self._client_secret = client_secret
self._project_id = project_id
self._cache_period = cache_period
self._access_token_cache_file = access_token_cache_file
self._reautherize_callback = reautherize_callback
self._lock = threading.Lock()
self._last_update = 0
self._client = None
self._devices_value = {}
if not access_token and self._access_token_cache_file:
try:
with open(self._access_token_cache_file, 'r') as fd:
access_token = json.load(fd)
_LOGGER.debug("Loaded access token from %s",
self._access_token_cache_file)
except:
_LOGGER.warn("Token load failed from %s",
self._access_token_cache_file)
if access_token:
self._client = OAuth2Session(self._client_id, token=access_token)
def __save_token(self, token):
if self._access_token_cache_file:
with open(self._access_token_cache_file, 'w') as fd:
json.dump(token, fd)
_LOGGER.debug("Save access token to %s",
self._access_token_cache_file)
def __reauthorize(self):
if self._reautherize_callback is None:
raise AuthorizationError(None, 'No callback to handle OAuth URL')
self._client = OAuth2Session(
self._client_id, redirect_uri=REDIRECT_URI, scope=SCOPE)
authorization_url, state = self._client.authorization_url(
AUTHORIZE_URL.format(project_id=self._project_id),
# access_type and prompt are Google specific extra
# parameters.
access_type="offline", prompt="consent")
authorization_response = self._reautherize_callback(authorization_url)
_LOGGER.debug(">> fetch_token")
token = self._client.fetch_token(
ACCESS_TOKEN_URL,
authorization_response=authorization_response,
# Google specific extra parameter used for client
# authentication
client_secret=self._client_secret)
self.__save_token(token)
def _request(self, verb, path, data=None):
url = self._api_url + path
if data is not None:
data = json.dumps(data)
attempt = 0
while True:
attempt += 1
if self._client:
try:
_LOGGER.debug(">> %s %s", verb, url)
r = self._client.request(verb, url,
allow_redirects=False,
data=data)
_LOGGER.debug(f"<< {r.status_code}")
if r.status_code == 200:
return r.json()
if r.status_code != 401:
raise APIError(r)
except TokenExpiredError as e:
# most providers will ask you for extra credentials to be passed along
# when refreshing tokens, usually for authentication purposes.
extra = {
'client_id': self._client_id,
'client_secret': self._client_secret,
}
_LOGGER.debug(">> refreshing token")
token = self._client.refresh_token(
ACCESS_TOKEN_URL, **extra)
self.__save_token(token)
if attempt > 1:
raise AuthorizationError(
None, 'Repeated TokenExpiredError')
continue
self.__reauthorize()
def _put(self, path, data=None):
pieces = path.split('/')
path = '/' + pieces[-1]
return self._request('POST', path, data=data)
@property
def _api_url(self):
return API_URL.format(project_id=self._project_id)
@property
def _devices(self):
if time.time() > self._last_update + self._cache_period:
with self._lock:
self._devices_value = self._request('GET', '')['devices']
self._last_update = time.time()
return self._devices_value
def get_devices(self, names: Optional[List[str]] = None,
wheres: Optional[List[str]] = None,
types: Optional[List[str]] = None) -> List[Device]:
"""Return the list of devices on this account that match the specified criteria
Parameters
----------
names : Optional[List[str]]
return devices that have names that appear in this list if not None
wheres : Optional[List[str]]
return devices that have where values that appear in this list if not None
types : Optional[List[str]]
return devices that have types that appear in this list if not None
"""
ret = []
for device in self._devices:
obj = Device(device_data=device)
name_match = (names is None or obj.name in names)
where_match = (wheres is None or obj.where in wheres)
type_match = (types is None or obj.type in types)
if name_match and where_match and type_match:
ret.append(Device(nest_api=self, name=obj.name))
return ret
def __enter__(self):
return self
def __exit__(self, exc_type, exc_value, traceback):
return False
class APIError(Exception):
def __init__(self, response, msg=None):
if response is None:
response_content = b''
else:
try:
response_content = response.content
except AttributeError:
response_content = response.data
if response_content != b'':
if isinstance(response, requests.Response):
try:
message = response.json()['error']
except:
message = response_content
else:
message = "API Error Occured"
if msg is not None:
message = "API Error Occured: " + msg
# Call the base class constructor with the parameters it needs
super(APIError, self).__init__(message)
self.response = response
class AuthorizationError(Exception):
def __init__(self, response, msg=None):
if response is None:
response_content = b''
else:
try:
response_content = response.content
except AttributeError:
response_content = response.data
if response_content != b'':
if isinstance(response, requests.Response):
message = response.json().get(
'error_description',
"Authorization Failed")
else:
message = "Authorization failed"
if msg is not None:
message = "Authorization Failed: " + msg
# Call the base class constructor with the parameters it needs
super(AuthorizationError, self).__init__(message)
self.response = response