
Merge pull request #254 from cardanoapi/fix/db-sync-exception
Fix/db sync exception
mesudip authored Nov 17, 2024
2 parents d76f713 + 4d299bb commit 62a7f01
Showing 3 changed files with 124 additions and 78 deletions.
48 changes: 35 additions & 13 deletions api/backend/app/services/agent_service.py
@@ -27,6 +27,7 @@
from backend.app.services.template_trigger_service import TemplateTriggerService
from backend.app.services.trigger_service import TriggerService
from backend.config.api_settings import api_settings
from backend.config.logger import logger


def check_if_agent_is_online(last_active: datetime | None) -> bool:
@@ -200,14 +201,18 @@ async def fetch_data(self, url, session: ClientSession):
try:
return await response.json()
except:
logger.error("Error fetching agent Drep details , DB Sync upstream service error")
raise HTTPException(status_code=400, content="Error fetching agent Drep details")

async def fetch_balance(self, stake_address: str, session: ClientSession):
async with session.get(f"{api_settings.DB_SYNC_API}/address/balance?address={stake_address}") as response:
try:
return await response.json()
except:
raise HTTPException(status_code=400, content="Error fetching agent wallet balance")
logger.error("Error fetching agent wallet balance. DB Sync upstream service error")
raise HTTPException(
status_code=500, content="Error fetching agent wallet balance. DB Sync upstream service error"
)

async def fetch_drep_details(self, drep_id: str, session: ClientSession) -> Dict[str, float | bool]:
async with session.get(f"{api_settings.DB_SYNC_API}/drep/{drep_id}") as response:
@@ -217,7 +222,10 @@ async def fetch_drep_details(self, drep_id: str, session: ClientSession) -> Dict[str, float | bool]:
is_drep_registered = res.get("isRegisteredAsDRep", False)
return {"voting_power": voting_power, "is_drep_registered": is_drep_registered}
except:
raise HTTPException(status_code=400, content="Error fetching agent Drep details")
logger.error("Error fetching agent Drep details , DB Sync upstream service error")
raise HTTPException(
status_code=500, content="Error fetching agent Drep details , DB Sync upstream service error"
)

async def fetch_stake_address_details(self, stake_address: str, session: ClientSession):
async with session.get(f"{api_settings.DB_SYNC_API}/stake-address?address={stake_address}") as response:
@@ -235,7 +243,11 @@ async def fetch_stake_address_details(self, stake_address: str, session: ClientSession):
last_registered = res.get("registration", {}).get("time", None) if res.get("registration", {}) else None
return {"last_registered": last_registered, "is_stake_registered": is_stake_registered}
except:
raise HTTPException(status_code=400, content="Error fetching agent Drep details")
logger.error("Error fetching agent stake address details , DB Sync upstream service error")
raise HTTPException(
status_code=500,
content="Error fetching agent stake address details , DB Sync upstream service error",
)

async def fetch_delegation_details(self, stake_address: str, session: ClientSession):
async with session.get(f"{api_settings.DB_SYNC_API}/delegation?address={stake_address}") as response:
@@ -245,7 +257,10 @@ async def fetch_delegation_details(self, stake_address: str, session: ClientSession):
pool_id = res.get("pool", {}).get("pool_id") if res.get("pool") else None
return Delegation(pool_id=pool_id, drep_id=drep_id)
except:
raise HTTPException(status_code=400, content="Error fetching agent Drep details")
logger.error("Error fetching agent Delegation details , DB Sync upstream service error")
raise HTTPException(
status_code=500, content="Error fetching agent Delegation details , DB Sync upstream service error"
)

async def return_agent_with_wallet_details(self, agent: AgentResponse):
wallets = await self.agent_instance_wallet_service.get_wallets(agent_id=agent.id)
@@ -260,15 +275,22 @@ async def return_agent_with_wallet_details(self, agent: AgentResponse):
stake_key_hash=agent_keys.stake_verification_key_hash,
instance_index=0,
)
async with aiohttp.ClientSession() as session:
async with asyncio.TaskGroup() as group:
agent_configurations = group.create_task(self.trigger_service.list_triggers_by_agent_id(agent.id))
wallet_balance = group.create_task(self.fetch_balance(wallet.stake_key_hash, session))
drep_details = group.create_task(self.fetch_drep_details(wallet.stake_key_hash, session))
delegation_details = group.create_task(self.fetch_delegation_details(wallet.stake_key_hash, session))
stake_address_details = group.create_task(
self.fetch_stake_address_details(wallet.stake_key_hash, session)
)

try:
async with aiohttp.ClientSession() as session:
async with asyncio.TaskGroup() as group:
agent_configurations = group.create_task(self.trigger_service.list_triggers_by_agent_id(agent.id))
wallet_balance = group.create_task(self.fetch_balance(wallet.stake_key_hash, session))
drep_details = group.create_task(self.fetch_drep_details(wallet.stake_key_hash, session))
delegation_details = group.create_task(
self.fetch_delegation_details(wallet.stake_key_hash, session)
)
stake_address_details = group.create_task(
self.fetch_stake_address_details(wallet.stake_key_hash, session)
)
except* Exception as exception:
logger.error("Error fetching agent wallet details , DB Sync upstream service error")
exception # TaskGroup returns the HTTP Exception itself. https://docs.python.org/3/library/asyncio-task.html#asyncio.Task

return AgentResponseWithWalletDetails(
**agent.dict(),
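A note on the `except*` handling added above: `asyncio.TaskGroup` does not raise a failing task's exception directly; it cancels the sibling tasks and raises an `ExceptionGroup` wrapping the original exceptions, which is why the handler uses `except*` and must re-raise to propagate the failure. A minimal sketch of the pattern, assuming Python 3.11+ (`UpstreamError` is a stand-in for the backend's `HTTPException`):

```python
import asyncio


class UpstreamError(Exception):
    """Stand-in for the backend's HTTPException."""


async def failing_fetch():
    raise UpstreamError("DB Sync upstream service error")


async def main():
    async with asyncio.TaskGroup() as group:
        group.create_task(failing_fetch())
        group.create_task(asyncio.sleep(0.1))  # sibling work is cancelled when a task fails


try:
    asyncio.run(main())
except* UpstreamError as group_exc:
    # TaskGroup raises an ExceptionGroup; the original exception
    # objects are available on .exceptions.
    for exc in group_exc.exceptions:
        print(f"caught: {exc!r}")
```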
110 changes: 61 additions & 49 deletions api/backend/app/services/drep_service.py
@@ -10,6 +10,8 @@
from backend.app.repositories.agent_repository import AgentRepository
from backend.config.api_settings import api_settings
from backend.config.database import prisma_connection
from backend.app.exceptions import HTTPException
from backend.config.logger import logger


class DrepService:
Expand All @@ -25,7 +27,8 @@ async def fetch_internal_dreps(self, page: int, page_size: int, search: str | No
)
agents = [
await self.db.prisma.agent.find_first(
include={"wallet_details": True}, where={"id": internalDrep.agent_id, "deleted_at": None}
include={"wallet_details": True},
where={"id": internalDrep.agent_id, "deleted_at": None},
)
]
except:
@@ -40,33 +43,38 @@
order={"last_active": "desc"},
),
)
async with aiohttp.ClientSession() as session:
async with asyncio.TaskGroup() as tg:
for index, agent in enumerate(agents):
tg.create_task(self.fetch_metadata(agent, index, agents, session))

try:
async with aiohttp.ClientSession() as session:
async with asyncio.TaskGroup() as tg:
for index, agent in enumerate(agents):
tg.create_task(self.fetch_metadata(agent, index, agents, session))
except* Exception as exception:
logger.error(exception)  # the task group raises an ExceptionGroup carrying the task's exception object itself

return [agent for agent in agents if agent]

async def fetch_metadata(self, agent: Agent, index: int, agents: [Any], session: ClientSession):
async def fetch_metadata(self, agent: Agent, index: int, agents: list[Any], session: ClientSession):
drep_dict = {}
drep_id = convert_base64_to_hex(agent.wallet_details[0].stake_key_hash)
async with session.get(f"{api_settings.DB_SYNC_API}/drep?dRepId={drep_id}") as response:
response_json = await response.json()
if response_json["items"]:
if drep_id == response_json["items"][0]["drepId"]:
drep_dict = response_json["items"][0]
if drep_dict.get("metadataHash") and drep_dict.get("url"):
url = drep_dict.get("url")
metadata_hash = drep_dict.get("metadataHash")
try:
async with session.get(
f"{api_settings.METADATA_API}/metadata?url={url}&hash={metadata_hash}"
) as metadata_resp:
metadata_resp_json = await metadata_resp.json()
if "hash" in metadata_resp_json:
drep_dict["givenName"] = metadata_resp_json["metadata"]["body"]["givenName"]
except:
pass
if drep_dict.get("metadataHash") and drep_dict.get("url"):
url = drep_dict.get("url")
metadata_hash = drep_dict.get("metadataHash")
try:
async with session.get(
f"{api_settings.METADATA_API}/metadata?url={url}&hash={metadata_hash}"
) as metadata_resp:
metadata_resp_json = await metadata_resp.json()
if "hash" in metadata_resp_json:
drep_dict["givenName"] = metadata_resp_json["metadata"]["body"]["givenName"]
except:
pass
# raise HTTPException(status_code = 500,content='MetaData Service Error')
if drep_dict:
drep_dict = drep_dict | {"agentId": agent.id, "agentName": agent.name}
@@ -75,43 +83,47 @@ async def fetch_metadata(self, agent: Agent, index: int, agents: [Any], session: ClientSession):
agents[index] = ""

async def fetch_external_dreps(self, page: int, page_size: int, search: str | None):

if search:
fetchUrl = f"{api_settings.DB_SYNC_API}/drep?dRepId={search}"
else:
fetchUrl = f"{api_settings.DB_SYNC_API}/drep?page={page}&size={page_size}"

async with aiohttp.ClientSession() as session:
async with session.get(fetchUrl) as response:
if response.status != 200:
logger.error("Error fetching external Dreps, DB Sync upstream service error")
raise HTTPException(
status_code=500, content="Error fetching external Dreps, DB Sync upstream service error"
)
response_json = await response.json()

async with asyncio.TaskGroup() as tg:
async with session.get(fetchUrl) as response:
response_json = await response.json()
for drep in response_json["items"]:
if drep["metadataHash"] and drep["url"]:
url = drep["url"]
metadata_hash = drep["metadataHash"]
tg.create_task(self.fetch_metadata_for_drep(metadata_hash, url, drep))

try:
internalDrep = await self.db.prisma.agentwalletdetails.find_first(
where={"stake_key_hash": convert_string_to_base64(drep["drepId"])}
)
except Exception as e:
internalDrep = False

if internalDrep:
agent = await self.db.prisma.agent.find_first(
where={"id": internalDrep.agent_id, "deleted_at": None}
)
if agent:
drep["agentId"] = agent.id
drep["agentName"] = agent.name
return {
"items": response_json["items"],
"total": response_json["total"],
"page": max(response_json["page"], 1),
"size": response_json["size"],
"pages": max(int(response_json["total"]) // int(response_json["size"]), 1),
}
for drep in response_json["items"]:
if drep["metadataHash"] and drep["url"]:
url = drep["url"]
metadata_hash = drep["metadataHash"]
tg.create_task(self.fetch_metadata_for_drep(metadata_hash, url, drep))

try:
internalDrep = await self.db.prisma.agentwalletdetails.find_first(
where={"stake_key_hash": convert_string_to_base64(drep["drepId"])}
)
except Exception as e:
internalDrep = False

if internalDrep:
agent = await self.db.prisma.agent.find_first(where={"id": internalDrep.agent_id, "deleted_at": None})
if agent:
drep["agentId"] = agent.id
drep["agentName"] = agent.name

return {
"items": response_json["items"],
"total": response_json["total"],
"page": max(response_json["page"], 1),
"size": response_json["size"],
"pages": max(int(response_json["total"]) // int(response_json["size"]), 1),
}

async def fetch_metadata_for_drep(self, metadata_hash: str, url: str, drep: Any):
try:
@@ -126,7 +138,7 @@ async def fetch_metadata_for_drep(self, metadata_hash: str, url: str, drep: Any):
)
except:
pass
# raise HTTPException(status_code = 500, content='MetaData Service Error')
# raise HTTPException(status_code = 500, content='MetaData Service Error')


def convert_string_to_base64(string):
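The recurring fix in this file mirrors the one in agent_service.py: check the upstream status before calling response.json(), because a DB Sync error page is usually not JSON and json() then fails with an opaque ContentTypeError instead of a meaningful HTTP error. A minimal sketch of the guard under stated assumptions — DB_SYNC_API is a hypothetical constant and HTTPException a local stand-in; the real code uses api_settings.DB_SYNC_API and backend.app.exceptions.HTTPException:

```python
import aiohttp

DB_SYNC_API = "http://localhost:8080"  # hypothetical; the service reads this from api_settings


class HTTPException(Exception):
    """Stand-in for backend.app.exceptions.HTTPException."""

    def __init__(self, status_code: int, content: str):
        super().__init__(content)
        self.status_code = status_code
        self.content = content


async def fetch_dreps(session: aiohttp.ClientSession, page: int, size: int) -> dict:
    url = f"{DB_SYNC_API}/drep?page={page}&size={size}"
    async with session.get(url) as response:
        # Guard before parsing: on an upstream failure, response.json()
        # would raise aiohttp.ContentTypeError rather than surfacing a
        # meaningful error to the client.
        if response.status != 200:
            raise HTTPException(
                status_code=500,
                content="Error fetching external Dreps, DB Sync upstream service error",
            )
        return await response.json()
```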
44 changes: 28 additions & 16 deletions api/backend/app/services/proposal_service.py
@@ -8,6 +8,7 @@
from backend.app.models.trigger_history.trigger_history_dto import TriggerHistoryDto
from backend.config.api_settings import api_settings
from backend.config.database import prisma_connection
from backend.config.logger import logger


class ProposalService:
@@ -63,7 +64,7 @@ async def get_internal_proposals(self, page: int = 1, pageSize: int = 10, search
)

async def add_metadata_and_agent_detail_in_internal_proposal(
self, proposal: TriggerHistoryDto, index: int, results: [Any]
self, proposal: TriggerHistoryDto, index: int, results: list[Any]
):
url = f"{api_settings.DB_SYNC_API}/proposal?proposal={proposal.txHash}"
proposal_data = await self._fetch_proposal_data(url)
@@ -85,23 +86,28 @@ async def get_external_proposals(self, page: int, pageSize: int, sort: str, sear
search_url = f"{api_settings.DB_SYNC_API}/proposal?page={page}&size={pageSize}&sort={sort}"
if search:
search_url = f"{api_settings.DB_SYNC_API}/proposal?proposal={search}"

async with aiohttp.ClientSession() as session:
async with session.get(search_url) as response:
async with asyncio.TaskGroup() as tg:
response_json = await response.json()
for index, proposal in enumerate(response_json["items"]):
tg.create_task(self.add_agent_in_external_proposals(index, proposal, response_json["items"]))
if proposal.get("metadataHash") and proposal.get("url"):
tg.create_task(
self._fetch_metadata(proposal.get("metadataHash"), proposal.get("url"), proposal)
)
return Page(
items=response_json["items"],
total=response_json["totalCount"],
page=max(response_json["page"] | 0, 1),
size=response_json["size"],
pages=max(int(response_json["totalCount"]) // int(response_json["size"]) | 0, 1),
)
if response.status != 200:
logger.error("Error fetching External Proposals , DB Sync upstream service error")
raise HTTPException(
status_code=500, content="Error fetching External Proposals , DB Sync upstream service error"
)
response_json = await response.json()

async with asyncio.TaskGroup() as tg:
for index, proposal in enumerate(response_json["items"]):
tg.create_task(self.add_agent_in_external_proposals(index, proposal, response_json["items"]))
if proposal.get("metadataHash") and proposal.get("url"):
tg.create_task(self._fetch_metadata(proposal.get("metadataHash"), proposal.get("url"), proposal))
return Page(
items=response_json["items"],
total=response_json["totalCount"],
page=max(response_json["page"] | 0, 1),
size=response_json["size"],
pages=max(int(response_json["totalCount"]) // int(response_json["size"]) | 0, 1),
)

async def add_agent_in_external_proposals(self, index: int, proposal: Any, proposals: List[Any]):
if proposal["txHash"]:
@@ -112,6 +118,7 @@ async def add_agent_in_external_proposals(self, index: int, proposal: Any, proposals: List[Any]):
proposal["agentId"] = agent.id
proposal["agentName"] = agent.name
except:
logger.error("Proposal with given id not found")
raise HTTPException(status_code=500, content="Proposal with given id not found")
proposals[index] = proposal

@@ -133,6 +140,11 @@ async def _fetch_metadata(self, metadata_hash: str, url: str, proposal_dict: Any):
async def _fetch_proposal_data(self, url: str):
async with aiohttp.ClientSession() as session:
async with session.get(url) as response:
if response.status == 500:
logger.error("Error fetching proposal MetaData , DB Sync Upstream service error")
raise HTTPException(
status_code=500, content="Error fetching proposal MetaData , DB Sync Upstream service error"
)
if response.status == 200:
data = await response.json()
if "items" in data and data["items"]:
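One thing this commit leaves untouched is the page arithmetic, e.g. max(int(response_json["totalCount"]) // int(response_json["size"]) | 0, 1). The | 0 looks like a carry-over of JavaScript's || 0 fallback, but Python's // binds tighter than |, so the expression bitwise-ORs the quotient with zero and changes nothing; floor division also undercounts when the last page is partial. An illustrative sketch of the behaviour and one possible correction, not part of the commit:

```python
total, size = 10, 3

# `//` binds tighter than `|`, so this is (total // size) | 0 -- a no-op.
print(total // size | 0)  # 3, same as plain total // size

# A missing-value fallback is spelled with `or` in Python, and ceiling
# division counts the final partial page:
page_count = max(-(-total // size), 1)  # ceil(10 / 3) = 4, floored at 1
print(page_count)  # 4
```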
