Skip to content

Commit

Permalink
Add message endpoints requests to safe transaction client
Browse files Browse the repository at this point in the history
  • Loading branch information
moisses89 committed Jan 10, 2024
1 parent 33c987a commit 10bc22d
Showing 1 changed file with 71 additions and 0 deletions.
71 changes: 71 additions & 0 deletions gnosis/safe/api/transaction_service_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -275,3 +275,74 @@ def post_transaction(self, safe_tx: SafeTx) -> bool:
if not response.ok:
raise SafeAPIException(f"Error posting transaction: {response.content}")
return True

def post_message(
self,
safe_address: ChecksumAddress,
message: Union[str, Dict],
signature: bytes,
safe_app_id: Optional[int] = 0,
) -> bool:
"""
Create safe message on transaction service for provided Safe address
:param safe_address:
:param message:
:param signature:
:return:
"""
payload = {
"message": message,
"safeAppId": safe_app_id,
"signature": HexBytes(signature).hex(),
}
response = self._post_request(
f"/api/v1/safes/{safe_address}/messages/", payload
)
if not response.ok:
raise SafeAPIException(f"Error posting message: {response.content}")
return True

def get_messages(self, safe_address: ChecksumAddress) -> List[Dict[str, Any]]:
"""
:param safe_address:
:return: list of messages for provided Safe address
"""
response = self._get_request(f"/api/v1/safes/{safe_address}/messages/")
if not response.ok:
raise SafeAPIException(f"Cannot get messages: {response.content}")
return response.json().get("results", [])

def post_message_signature(
self, safe_message_hash: bytes, signature: bytes
) -> bool:
"""
Add a new confirmation for provided Safe message hash
:param safe_message_hash:
:param signature:
:return:
"""
payload = {"signature": HexBytes(signature).hex()}
response = self._post_request(
f"/api/v1/messages/{HexBytes(safe_message_hash).hex()}/signatures/", payload
)
if not response.ok:
raise SafeAPIException(
f"Error posting message signature: {response.content}"
)
return True

def get_message(self, safe_message_hash: bytes) -> Dict[str, Any]:
"""
:param safe_message_hash:
:return: Safe message for provided Safe message hash
"""
response = self._get_request(
f"/api/v1/messages/{HexBytes(safe_message_hash).hex()}/"
)
if not response.ok:
raise SafeAPIException(f"Cannot get messages: {response.content}")
return response.json()

0 comments on commit 10bc22d

Please sign in to comment.