Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add async update #17

Open
wants to merge 14 commits into
base: main
Choose a base branch
from
117 changes: 109 additions & 8 deletions electrasmart/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,8 @@

import requests
import logging
import aiohttp
import asyncio

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -52,6 +54,43 @@ def post(cls, cmd, data, sid=None, os_details=False, is_second_try=False):
raise cls.RenewSidAndRetryException(j)
return j["data"]

@classmethod
async def async_post(
cls, cmd, data, sid=None, os_details=False, is_second_try=False
):
if os_details:
data = data.copy()
data.update(cls.MOCK_OS_DATA)
random_id = random.randint(1000, 1999)
post_data = dict(pvdid=1, id=random_id, sid=sid, cmd=cmd, data=data)
logger.debug(
f"[ASYNC] Posting request\nid: {random_id}\nurl: {cls.URL}\nheaders: {cls.HEADERS}\n"
f"[ASYNC] post json data:\n{pformat(post_data)}"
)
try:
async with aiohttp.ClientSession() as session:
async with session.post(
cls.URL, headers=cls.HEADERS, json=post_data
) as response:
j = await response.json(content_type=None)
except:
logger.exception(
"[ASYNC] ElectraAPI: Exception caught when posting to cloud service"
)
raise
logger.debug(f"[ASYNC] Response received (id={random_id}):\n{pformat(j)}")
if is_second_try:
try:
assert j["status"] == 0, "invalid status returned from command"
assert j["data"]["res"] == 0, "invalid res returned from command"
except:
logger.exception(f"Error status when posting command")
raise
else:
if j["status"] != 0 or j["data"] is None or j["data"]["res"] != 0:
raise cls.RenewSidAndRetryException(j)
return j["data"]

# raised upon failure in the first try of a post
class RenewSidAndRetryException(Exception):
def __init__(self, post_response):
Expand All @@ -75,7 +114,7 @@ def send_otp_request(phone):
:return: imei
"""
# generate a random imei with a valid prefix (note: this might not be checked today, but just in case)
imei = f"2b950000{random.randint(10**7, 10**8-1)}"
imei = f"2b950000{random.randint(10 ** 7, 10 ** 8 - 1)}"
ElectraAPI.post("SEND_OTP", dict(imei=imei, phone=phone))
return imei

Expand Down Expand Up @@ -104,6 +143,13 @@ def generate_sid(imei, token):
return result["sid"]


async def async_generate_sid(imei, token):
result = await ElectraAPI.async_post(
"VALIDATE_TOKEN", dict(imei=imei, token=token), os_details=True
)
return result["sid"]


def get_shared_sid(imei, token):
date_now = datetime.now()
if (
Expand All @@ -118,6 +164,20 @@ def get_shared_sid(imei, token):
return ElectraAPI.SID


async def async_get_shared_sid(imei, token):
date_now = datetime.now()
if (
ElectraAPI.SID is None
or ElectraAPI.LAST_SID_UPDATE_DATETIME is None
or date_diff_in_seconds(date_now, ElectraAPI.LAST_SID_UPDATE_DATETIME)
> ElectraAPI.MIN_TIME_BETWEEN_SID_UPDATES
):
ElectraAPI.SID = await async_generate_sid(imei, token)
ElectraAPI.LAST_SID_UPDATE_DATETIME = date_now
logger.info(f"renewed shared sid: {ElectraAPI.SID}")
return ElectraAPI.SID


def date_diff_in_seconds(dt2, dt1):
timedelta = dt2 - dt1
return timedelta.total_seconds()
Expand All @@ -142,6 +202,7 @@ def __init__(self, imei, token, ac_id, sid=None, use_single_sid=False):
self.sid = sid
self._status = None
self._model = None
self.last_update_status = None

def renew_sid(self):
try:
Expand All @@ -153,8 +214,25 @@ def renew_sid(self):
except ElectraAPI.RenewSidAndRetryException as exc:
raise Exception(f"Failed to renew sid: {exc.res_desc}")

def update_status(self):
self._status = self._fetch_status()
async def async_renew_sid(self):
try:
if self.use_singe_sid:
self.sid = await async_get_shared_sid(self.imei, self.token)
else:
self.sid = await async_generate_sid(self.imei, self.token)
logger.debug(f"renewed sid: {self.sid}")
except ElectraAPI.RenewSidAndRetryException as exc:
raise Exception(f"Failed to renew sid: {exc.res_desc}")

def update_status(self, operoper_dict=None):
if operoper_dict is None:
self._status = self._fetch_status()
else:
self._status["OPER"]["OPER"] = operoper_dict.copy()
self.last_update_status = datetime.now()

async def async_update_status(self):
self._status = await self._async_fetch_status()

@property
def status(self):
Expand All @@ -170,16 +248,36 @@ def _fetch_status(self):
status = {k: self._parse_status_group(v) for k, v in cj.items()}
return status

async def _async_fetch_status(self):
r = await self._async_post_with_sid_check(
"GET_LAST_TELEMETRY", dict(id=self.ac_id, commandName="OPER,DIAG_L2,HB")
)
cj = r["commandJson"]
status = {k: self._parse_status_group(v) for k, v in cj.items()}
return status

def _post_with_sid_check(self, cmd, data, os_details=False):
try:
return self._post(cmd, data, os_details, False)
except ElectraAPI.RenewSidAndRetryException:
self.renew_sid()
return self._post(cmd, data, os_details, True)

async def _async_post_with_sid_check(self, cmd, data, os_details=False):
try:
return await self._async_post(cmd, data, os_details, False)
except ElectraAPI.RenewSidAndRetryException:
await self.async_renew_sid()
return await self._async_post(cmd, data, os_details, True)

def _post(self, cmd, data, os_details=False, is_second_try=False):
return ElectraAPI.post(cmd, data, self._get_sid(), os_details, is_second_try)

async def _async_post(self, cmd, data, os_details=False, is_second_try=False):
return await ElectraAPI.async_post(
cmd, data, self._get_sid(), os_details, is_second_try
)

def _get_sid(self):
if self.use_singe_sid:
return ElectraAPI.SID
Expand All @@ -193,15 +291,17 @@ def _parse_status_group(cls, v):
return json.loads(v)

@contextmanager
def _modify_oper_and_send_command(self):
self.update_status()
def _modify_oper_and_send_command(self, update_status=True):
if self._status is None or update_status:
self.update_status()
new_oper = self.status.raw["OPER"]["OPER"].copy()
# make any needed modifications inplace within the context
yield new_oper
self._post_with_sid_check(
"SEND_COMMAND",
dict(id=self.ac_id, commandJson=json.dumps({"OPER": new_oper})),
)
self.update_status(operoper_dict=new_oper)

def modify_oper(
self,
Expand All @@ -213,8 +313,9 @@ def modify_oper(
shabat=None,
ac_sleep=None,
ifeel=None,
update_status=True,
):
with self._modify_oper_and_send_command() as oper:
with self._modify_oper_and_send_command(update_status=update_status) as oper:
if ac_mode is not None:
if self.model.on_off_flag:
if ac_mode == "STBY":
Expand Down Expand Up @@ -245,8 +346,8 @@ def modify_oper(
if ifeel is not None and "IFEEL" in oper:
oper["IFEEL"] = ifeel

def turn_off(self):
with self._modify_oper_and_send_command() as oper:
def turn_off(self, update_status=True):
with self._modify_oper_and_send_command(update_status=update_status) as oper:
if self.model.on_off_flag:
oper["TURN_ON_OFF"] = "OFF"
else:
Expand Down