-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathhero.py
169 lines (129 loc) · 4.7 KB
/
hero.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
"""
HERO - Home Energy Report Online(?)
https://github.com/jerobins/hero
"""
import requests, sys, datetime, json, random, logging
class hero:
    """Client for the Duke Energy Home Energy Report (HER) dashboard API.

    Constructing an instance immediately performs end-point discovery and
    an OAuth password-grant login (see ``_getCredentials``). Any failure
    during that handshake is logged at CRITICAL level and terminates the
    process via ``sys.exit``.

    NOTE(review): none of the ``requests`` calls pass a ``timeout``, so a
    stalled server blocks the caller indefinitely.
    """

    def __init__(self, username, password):
        """Set up the end-point paths and log in.

        Params:
          username - for HER dashboard
          password - for HER dashboard
        """
        # Filled in by _getCredentials().
        self.hero_api = ""
        self.hero_api_token = ""
        self.hero_account_id = ""

        # Discovery document; _getCredentials() reads heroApi, authApi,
        # consumerKey and consumerSecret from it.
        self.her_host = "https://her.duke-energy.com/config/config.prod.json"

        # API paths, appended to hero_api.
        self.api_account = "/user/accounts"
        self.api_dailydata = "/usage/comparison/daily/one-day"
        self.api_usage = "/usage/detail"
        self.api_billing_cycle = "/usage/latest-billing-cycle"

        # to test
        self.api_energy_profile = "/home-energy-profile"  # hubAccountId
        self.api_two_year_billing_cycle = "/usage/two-years-billing-cycle"  # hubAccountId
        self.api_card = "/card"  # hubAccountId
        self.api_cohort = "/cohort"  # hubAccountId
        self.api_account_id = "/user/account-id"  # accountNumber
        self.api_monthlydata = "/usage/comparison/daily/one-month"  # hubAccountId, startDate, endDate, heroUsageCategoryEnum?
        self.api_load = "/load/disaggregation"  # hubAccountId, billCycleEndDate

        self._getCredentials(username, password)

    @staticmethod
    def _die(reason):
        """Log ``reason`` at CRITICAL level and terminate via sys.exit(reason)."""
        logging.critical(reason)
        sys.exit(reason)

    def _fetch(self, endpoint, params):
        """Authenticated GET shared by the public data methods.

        Params:
          endpoint - full URL to request
          params - query-string parameters
        Returns: the decoded JSON body, or an empty dict when the server
          answers with a status other than 200 (the response is logged at
          CRITICAL level in that case).
        A transport-level exception is logged and terminates the process.
        """
        headers = {"Authorization": "Bearer " + self.hero_api_token}
        try:
            r = requests.get(endpoint, params=params, headers=headers)
        except Exception as e:
            self._die(e)
        if r.status_code != 200:
            logging.critical(r)
            result = {}
        else:
            result = r.json()
        logging.debug(result)
        return result

    def _getCredentials(self, username, password):
        """End-point discovery and OAuth token request.

        Runs three requests in order:
          1. GET her_host to discover heroApi, authApi and the consumer
             key/secret.
          2. POST a password grant to authApi to obtain an access token.
          3. GET the account list and keep the first entry's hubAcctId.

        On success sets hero_api, hero_api_token and hero_account_id.
        A transport error, a non-200 status, an error payload or an empty
        account list is logged at CRITICAL level and terminates the
        process via sys.exit().

        Params:
          username - for HER dashboard
          password - for HER dashboard
        Returns: None
        """
        # --- 1. end-point discovery ---------------------------------------
        # i'm guessing this numeric param is just to ensure no caching of
        # the end-point info; for giggles made it a random int, probably
        # not needed at all
        params = {"x": random.randint(10000, 999999)}
        try:
            r = requests.get(self.her_host, params=params)
        except Exception as e:
            self._die(e)
        if r.status_code != 200:
            self._die(r)
        result = r.json()
        if 'fault' in result:
            self._die(result)
        logging.debug(result)
        self.hero_api = result["heroApi"]

        # --- 2. OAuth password grant --------------------------------------
        form_data = {"grant_type": "password",
                     "username": username,
                     "password": password}
        try:
            r = requests.post(result["authApi"],
                              data=form_data,
                              auth=(result["consumerKey"],
                                    result["consumerSecret"]))
        except Exception as e:
            self._die(e)
        if r.status_code != 200:
            self._die(r)
        result = r.json()
        if 'errorResponse' in result:
            self._die(result)
        logging.debug(result)
        # token lifetime is 30 mins.
        self.hero_api_token = result["access_token"]

        # --- 3. account lookup --------------------------------------------
        params = {"loginIdentity": result["email"],
                  "cdpId": result["cdp_internal_user_id"]}
        headers = {"Authorization": "Bearer " + self.hero_api_token}
        try:
            r = requests.get(self.hero_api + self.api_account,
                             params=params, headers=headers)
        except Exception as e:
            self._die(e)
        if r.status_code != 200:
            self._die(r)
        result = r.json()
        logging.debug(result)
        if 'errorResponse' in result:
            self._die(result)
        if not result:
            # An empty response would otherwise surface as a bare
            # IndexError on result[0]; fail like every other handshake error.
            self._die("no accounts returned for this login")
        self.hero_account_id = result[0]["hubAcctId"]

    def getDailyUsage(self, timestamp):
        """Fetch one day's usage data for the logged-in account.

        Params:
          timestamp - object with strftime() (date or datetime) naming the
            day to request; sent as intervalDate in YYYY-MM-DD form
        Returns: the "usagesFifteen" member of the response (presumably
          15-minute interval readings - confirm against the API), or an
          empty dict when the request returns a non-200 status or the
          member is absent.
        """
        params = {"accountId": self.hero_account_id,
                  "intervalDate": timestamp.strftime("%Y-%m-%d")}
        result = self._fetch(self.hero_api + self.api_dailydata, params)
        # .get() rather than indexing: on a failed request result is {}, and
        # indexing it would raise KeyError instead of returning "no data".
        return result.get("usagesFifteen", {})

    def getCurrentUsageDetail(self, billCycleEndDate):
        """Fetch the usage detail for a billing cycle.

        Params:
          billCycleEndDate - object with strftime() (date or datetime);
            sent as billCycleEndDate in YYYY-MM-DD form
        Returns: the decoded JSON response, or an empty dict when the
          request returns a non-200 status.
        """
        params = {"accountId": self.hero_account_id,
                  "billCycleEndDate": billCycleEndDate.strftime("%Y-%m-%d")}
        return self._fetch(self.hero_api + self.api_usage, params)