From 10bc22d6f2f08ed75a782d7dfd7fec0f3652d409 Mon Sep 17 00:00:00 2001 From: moisses89 <7888669+moisses89@users.noreply.github.com> Date: Wed, 10 Jan 2024 14:24:18 +0100 Subject: [PATCH] Add message endpoints requests to safe transaction client --- gnosis/safe/api/transaction_service_api.py | 71 ++++++++++++++++++++++ 1 file changed, 71 insertions(+) diff --git a/gnosis/safe/api/transaction_service_api.py b/gnosis/safe/api/transaction_service_api.py index 078e1b7fb..6505ba3e2 100644 --- a/gnosis/safe/api/transaction_service_api.py +++ b/gnosis/safe/api/transaction_service_api.py @@ -275,3 +275,74 @@ def post_transaction(self, safe_tx: SafeTx) -> bool: if not response.ok: raise SafeAPIException(f"Error posting transaction: {response.content}") return True + + def post_message( + self, + safe_address: ChecksumAddress, + message: Union[str, Dict], + signature: bytes, + safe_app_id: Optional[int] = 0, + ) -> bool: + """ + Create safe message on transaction service for provided Safe address + + :param safe_address: + :param message: + :param signature: + :return: + """ + payload = { + "message": message, + "safeAppId": safe_app_id, + "signature": HexBytes(signature).hex(), + } + response = self._post_request( + f"/api/v1/safes/{safe_address}/messages/", payload + ) + if not response.ok: + raise SafeAPIException(f"Error posting message: {response.content}") + return True + + def get_messages(self, safe_address: ChecksumAddress) -> List[Dict[str, Any]]: + """ + + :param safe_address: + :return: list of messages for provided Safe address + """ + response = self._get_request(f"/api/v1/safes/{safe_address}/messages/") + if not response.ok: + raise SafeAPIException(f"Cannot get messages: {response.content}") + return response.json().get("results", []) + + def post_message_signature( + self, safe_message_hash: bytes, signature: bytes + ) -> bool: + """ + Add a new confirmation for provided Safe message hash + + :param safe_message_hash: + :param signature: + :return: + """ + payload = {"signature": HexBytes(signature).hex()} + response = self._post_request( + f"/api/v1/messages/{HexBytes(safe_message_hash).hex()}/signatures/", payload + ) + if not response.ok: + raise SafeAPIException( + f"Error posting message signature: {response.content}" + ) + return True + + def get_message(self, safe_message_hash: bytes) -> Dict[str, Any]: + """ + + :param safe_message_hash: + :return: Safe message for provided Safe message hash + """ + response = self._get_request( + f"/api/v1/messages/{HexBytes(safe_message_hash).hex()}/" + ) + if not response.ok: + raise SafeAPIException(f"Cannot get messages: {response.content}") + return response.json()