diff --git a/api/backend/app/services/agent_service.py b/api/backend/app/services/agent_service.py index 1e326a56..8ad75e1d 100644 --- a/api/backend/app/services/agent_service.py +++ b/api/backend/app/services/agent_service.py @@ -27,6 +27,7 @@ from backend.app.services.template_trigger_service import TemplateTriggerService from backend.app.services.trigger_service import TriggerService from backend.config.api_settings import api_settings +from backend.config.logger import logger def check_if_agent_is_online(last_active: datetime | None) -> bool: @@ -200,6 +201,7 @@ async def fetch_data(self, url, session: ClientSession): try: return await response.json() except: + logger.error("Error fetching agent Drep details , DB Sync upstream service error") raise HTTPException(status_code=400, content="Error fetching agent Drep details") async def fetch_balance(self, stake_address: str, session: ClientSession): @@ -207,7 +209,10 @@ async def fetch_balance(self, stake_address: str, session: ClientSession): try: return await response.json() except: - raise HTTPException(status_code=400, content="Error fetching agent wallet balance") + logger.error("Error fetching agent wallet balance. DB Sync upstream service error") + raise HTTPException( + status_code=500, content="Error fetching agent wallet balance. DB Sync upstream service error" + ) async def fetch_drep_details(self, drep_id: str, session: ClientSession) -> Dict[str, float | bool]: async with session.get(f"{api_settings.DB_SYNC_API}/drep/{drep_id}") as response: @@ -217,7 +222,10 @@ async def fetch_drep_details(self, drep_id: str, session: ClientSession) -> Dict is_drep_registered = res.get("isRegisteredAsDRep", False) return {"voting_power": voting_power, "is_drep_registered": is_drep_registered} except: - raise HTTPException(status_code=400, content="Error fetching agent Drep details") + logger.error("Error fetching agent Drep details , DB Sync upstream service error") + raise HTTPException( + status_code=500, content="Error fetching agent Drep details , DB Sync upstream service error" + ) async def fetch_stake_address_details(self, stake_address: str, session: ClientSession): async with session.get(f"{api_settings.DB_SYNC_API}/stake-address?address={stake_address}") as response: @@ -235,7 +243,11 @@ async def fetch_stake_address_details(self, stake_address: str, session: ClientS last_registered = res.get("registration", {}).get("time", None) if res.get("registration", {}) else None return {"last_registered": last_registered, "is_stake_registered": is_stake_registered} except: - raise HTTPException(status_code=400, content="Error fetching agent Drep details") + logger.error("Error fetching agent stake address details , DB Sync upstream service error") + raise HTTPException( + status_code=500, + content="Error fetching agent stake address details , DB Sync upstream service error", + ) async def fetch_delegation_details(self, stake_address: str, session: ClientSession): async with session.get(f"{api_settings.DB_SYNC_API}/delegation?address={stake_address}") as response: @@ -245,7 +257,10 @@ async def fetch_delegation_details(self, stake_address: str, session: ClientSess pool_id = res.get("pool", {}).get("pool_id") if res.get("pool") else None return Delegation(pool_id=pool_id, drep_id=drep_id) except: - raise HTTPException(status_code=400, content="Error fetching agent Drep details") + logger.error("Error fetching agent Delegation details , DB Sync upstream service error") + raise HTTPException( + status_code=500, content="Error fetching agent Delegation details , DB Sync upstream service error" + ) async def return_agent_with_wallet_details(self, agent: AgentResponse): wallets = await self.agent_instance_wallet_service.get_wallets(agent_id=agent.id) @@ -260,15 +275,22 @@ async def return_agent_with_wallet_details(self, agent: AgentResponse): stake_key_hash=agent_keys.stake_verification_key_hash, instance_index=0, ) - async with aiohttp.ClientSession() as session: - async with asyncio.TaskGroup() as group: - agent_configurations = group.create_task(self.trigger_service.list_triggers_by_agent_id(agent.id)) - wallet_balance = group.create_task(self.fetch_balance(wallet.stake_key_hash, session)) - drep_details = group.create_task(self.fetch_drep_details(wallet.stake_key_hash, session)) - delegation_details = group.create_task(self.fetch_delegation_details(wallet.stake_key_hash, session)) - stake_address_details = group.create_task( - self.fetch_stake_address_details(wallet.stake_key_hash, session) - ) + + try: + async with aiohttp.ClientSession() as session: + async with asyncio.TaskGroup() as group: + agent_configurations = group.create_task(self.trigger_service.list_triggers_by_agent_id(agent.id)) + wallet_balance = group.create_task(self.fetch_balance(wallet.stake_key_hash, session)) + drep_details = group.create_task(self.fetch_drep_details(wallet.stake_key_hash, session)) + delegation_details = group.create_task( + self.fetch_delegation_details(wallet.stake_key_hash, session) + ) + stake_address_details = group.create_task( + self.fetch_stake_address_details(wallet.stake_key_hash, session) + ) + except* Exception as exception: + logger.error("Error fetching agent wallet details , DB Sync upstream service error") + exception # TaskGroup returns the HTTP Exception itself. https://docs.python.org/3/library/asyncio-task.html#asyncio.Task return AgentResponseWithWalletDetails( **agent.dict(), diff --git a/api/backend/app/services/drep_service.py b/api/backend/app/services/drep_service.py index 95b1b538..8f9e0e97 100644 --- a/api/backend/app/services/drep_service.py +++ b/api/backend/app/services/drep_service.py @@ -10,6 +10,8 @@ from backend.app.repositories.agent_repository import AgentRepository from backend.config.api_settings import api_settings from backend.config.database import prisma_connection +from backend.app.exceptions import HTTPException +from backend.config.logger import logger class DrepService: @@ -25,7 +27,8 @@ async def fetch_internal_dreps(self, page: int, page_size: int, search: str | No ) agents = [ await self.db.prisma.agent.find_first( - include={"wallet_details": True}, where={"id": internalDrep.agent_id, "deleted_at": None} + include={"wallet_details": True}, + where={"id": internalDrep.agent_id, "deleted_at": None}, ) ] except: @@ -40,14 +43,19 @@ async def fetch_internal_dreps(self, page: int, page_size: int, search: str | No order={"last_active": "desc"}, ), ) - async with aiohttp.ClientSession() as session: - async with asyncio.TaskGroup() as tg: - for index, agent in enumerate(agents): - tg.create_task(self.fetch_metadata(agent, index, agents, session)) + + try: + async with aiohttp.ClientSession() as session: + async with asyncio.TaskGroup() as tg: + for index, agent in enumerate(agents): + tg.create_task(self.fetch_metadata(agent, index, agents, session)) + except* Exception as exception: + logger.error(exception) + exception # task group returns exception object itselgf return [agent for agent in agents if agent] - async def fetch_metadata(self, agent: Agent, index: int, agents: [Any], session: ClientSession): + async def fetch_metadata(self, agent: Agent, index: int, agents: list[Any], session: ClientSession): drep_dict = {} drep_id = convert_base64_to_hex(agent.wallet_details[0].stake_key_hash) async with session.get(f"{api_settings.DB_SYNC_API}/drep?dRepId={drep_id}") as response: @@ -55,18 +63,18 @@ async def fetch_metadata(self, agent: Agent, index: int, agents: [Any], session: if response_json["items"]: if drep_id == response_json["items"][0]["drepId"]: drep_dict = response_json["items"][0] - if drep_dict.get("metadataHash") and drep_dict.get("url"): - url = drep_dict.get("url") - metadata_hash = drep_dict.get("metadataHash") - try: - async with session.get( - f"{api_settings.METADATA_API}/metadata?url={url}&hash={metadata_hash}" - ) as metadata_resp: - metadata_resp_json = await metadata_resp.json() - if "hash" in metadata_resp_json: - drep_dict["givenName"] = metadata_resp_json["metadata"]["body"]["givenName"] - except: - pass + if drep_dict.get("metadataHash") and drep_dict.get("url"): + url = drep_dict.get("url") + metadata_hash = drep_dict.get("metadataHash") + try: + async with session.get( + f"{api_settings.METADATA_API}/metadata?url={url}&hash={metadata_hash}" + ) as metadata_resp: + metadata_resp_json = await metadata_resp.json() + if "hash" in metadata_resp_json: + drep_dict["givenName"] = metadata_resp_json["metadata"]["body"]["givenName"] + except: + pass # raise HTTPException(status_code = 500,content='MetaData Service Error') if drep_dict: drep_dict = drep_dict | {"agentId": agent.id, "agentName": agent.name} @@ -75,43 +83,47 @@ async def fetch_metadata(self, agent: Agent, index: int, agents: [Any], session: agents[index] = "" async def fetch_external_dreps(self, page: int, page_size: int, search: str | None): - if search: fetchUrl = f"{api_settings.DB_SYNC_API}/drep?dRepId={search}" else: fetchUrl = f"{api_settings.DB_SYNC_API}/drep?page={page}&size={page_size}" async with aiohttp.ClientSession() as session: + async with session.get(fetchUrl) as response: + if response.status != 200 or response is None: + logger.error("Error fetching external Dreps , DB Sync upstream service error") + raise HTTPException( + status_code=500, content="Error fetching external Dreps , DB Sync upstream service error" + ) + response_json = await response.json() + async with asyncio.TaskGroup() as tg: - async with session.get(fetchUrl) as response: - response_json = await response.json() - for drep in response_json["items"]: - if drep["metadataHash"] and drep["url"]: - url = drep["url"] - metadata_hash = drep["metadataHash"] - tg.create_task(self.fetch_metadata_for_drep(metadata_hash, url, drep)) - - try: - internalDrep = await self.db.prisma.agentwalletdetails.find_first( - where={"stake_key_hash": convert_string_to_base64(drep["drepId"])} - ) - except Exception as e: - internalDrep = False - - if internalDrep: - agent = await self.db.prisma.agent.find_first( - where={"id": internalDrep.agent_id, "deleted_at": None} - ) - if agent: - drep["agentId"] = agent.id - drep["agentName"] = agent.name - return { - "items": response_json["items"], - "total": response_json["total"], - "page": max(response_json["page"], 1), - "size": response_json["size"], - "pages": max(int(response_json["total"]) // int(response_json["size"]), 1), - } + for drep in response_json["items"]: + if drep["metadataHash"] and drep["url"]: + url = drep["url"] + metadata_hash = drep["metadataHash"] + tg.create_task(self.fetch_metadata_for_drep(metadata_hash, url, drep)) + + try: + internalDrep = await self.db.prisma.agentwalletdetails.find_first( + where={"stake_key_hash": convert_string_to_base64(drep["drepId"])} + ) + except Exception as e: + internalDrep = False + + if internalDrep: + agent = await self.db.prisma.agent.find_first(where={"id": internalDrep.agent_id, "deleted_at": None}) + if agent: + drep["agentId"] = agent.id + drep["agentName"] = agent.name + + return { + "items": response_json["items"], + "total": response_json["total"], + "page": max(response_json["page"], 1), + "size": response_json["size"], + "pages": max(int(response_json["total"]) // int(response_json["size"]), 1), + } async def fetch_metadata_for_drep(self, metadata_hash: str, url: str, drep: Any): try: @@ -126,7 +138,7 @@ async def fetch_metadata_for_drep(self, metadata_hash: str, url: str, drep: Any) ) except: pass - # raise HTTPException(status_code = 500, content='MetaData Service Error') + # raise HTTPException(status_code = 500, content='MetaData Service Error') def convert_string_to_base64(string): diff --git a/api/backend/app/services/proposal_service.py b/api/backend/app/services/proposal_service.py index bb2d5987..0a523f91 100644 --- a/api/backend/app/services/proposal_service.py +++ b/api/backend/app/services/proposal_service.py @@ -8,6 +8,7 @@ from backend.app.models.trigger_history.trigger_history_dto import TriggerHistoryDto from backend.config.api_settings import api_settings from backend.config.database import prisma_connection +from backend.config.logger import logger class ProposalService: @@ -63,7 +64,7 @@ async def get_internal_proposals(self, page: int = 1, pageSize: int = 10, search ) async def add_metadata_and_agent_detail_in_internal_proposal( - self, proposal: TriggerHistoryDto, index: int, results: [Any] + self, proposal: TriggerHistoryDto, index: int, results: list[Any] ): url = f"{api_settings.DB_SYNC_API}/proposal?proposal={proposal.txHash}" proposal_data = await self._fetch_proposal_data(url) @@ -85,23 +86,28 @@ async def get_external_proposals(self, page: int, pageSize: int, sort: str, sear search_url = f"{api_settings.DB_SYNC_API}/proposal?page={page}&size={pageSize}&sort={sort}" if search: search_url = f"{api_settings.DB_SYNC_API}/proposal?proposal={search}" + async with aiohttp.ClientSession() as session: async with session.get(search_url) as response: - async with asyncio.TaskGroup() as tg: - response_json = await response.json() - for index, proposal in enumerate(response_json["items"]): - tg.create_task(self.add_agent_in_external_proposals(index, proposal, response_json["items"])) - if proposal.get("metadataHash") and proposal.get("url"): - tg.create_task( - self._fetch_metadata(proposal.get("metadataHash"), proposal.get("url"), proposal) - ) - return Page( - items=response_json["items"], - total=response_json["totalCount"], - page=max(response_json["page"] | 0, 1), - size=response_json["size"], - pages=max(int(response_json["totalCount"]) // int(response_json["size"]) | 0, 1), - ) + if response.status != 200: + logger.error("Error fetching External Proposals , DB Sync upstream service error") + raise HTTPException( + status_code=500, content="Error fetching External Proposals , DB Sync upstream service error" + ) + response_json = await response.json() + + async with asyncio.TaskGroup() as tg: + for index, proposal in enumerate(response_json["items"]): + tg.create_task(self.add_agent_in_external_proposals(index, proposal, response_json["items"])) + if proposal.get("metadataHash") and proposal.get("url"): + tg.create_task(self._fetch_metadata(proposal.get("metadataHash"), proposal.get("url"), proposal)) + return Page( + items=response_json["items"], + total=response_json["totalCount"], + page=max(response_json["page"] | 0, 1), + size=response_json["size"], + pages=max(int(response_json["totalCount"]) // int(response_json["size"]) | 0, 1), + ) async def add_agent_in_external_proposals(self, index: int, proposal: Any, proposals: List[Any]): if proposal["txHash"]: @@ -112,6 +118,7 @@ async def add_agent_in_external_proposals(self, index: int, proposal: Any, propo proposal["agentId"] = agent.id proposal["agentName"] = agent.name except: + logger.error("Proposal with given id not found") raise HTTPException(status_code=500, content="Proposal with given id not found") proposals[index] = proposal @@ -133,6 +140,11 @@ async def _fetch_metadata(self, metadata_hash: str, url: str, proposal_dict: Any async def _fetch_proposal_data(self, url: str): async with aiohttp.ClientSession() as session: async with session.get(url) as response: + if response.status == 500: + logger.error("Error fetching proposal MetaData , DB Sync Upstream service error") + raise HTTPException( + status_code=500, content="Error fetching proposal MetaData , DB Sync Upstream service error" + ) if response.status == 200: data = await response.json() if "items" in data and data["items"]: