diff --git a/dune_client/api/base.py b/dune_client/api/base.py index 14c40bb..79abddf 100644 --- a/dune_client/api/base.py +++ b/dune_client/api/base.py @@ -96,6 +96,7 @@ def _build_parameters( sort_by: Optional[List[str]] = None, limit: Optional[int] = None, offset: Optional[int] = None, + allow_partial_results: str = "true", ) -> Dict[str, Union[str, int]]: """ Utility function that builds a dictionary of parameters to be used @@ -111,6 +112,7 @@ def _build_parameters( ), "sampling cannot be combined with filters or pagination" params = params or {} + params["allow_partial_results"] = allow_partial_results if columns is not None and len(columns) > 0: params["columns"] = ",".join(columns) if sample_count is not None: diff --git a/dune_client/api/execution.py b/dune_client/api/execution.py index ba7b244..e15e6bd 100644 --- a/dune_client/api/execution.py +++ b/dune_client/api/execution.py @@ -22,6 +22,7 @@ ResultsResponse, ExecutionResultCSV, DuneError, + ExecutionState, ) from dune_client.query import QueryBase @@ -80,6 +81,7 @@ def get_execution_results( sample_count: Optional[int] = None, filters: Optional[str] = None, sort_by: Optional[List[str]] = None, + allow_partial_results: str = "true", ) -> ResultsResponse: """GET results from Dune API for `job_id` (aka `execution_id`)""" params = self._build_parameters( @@ -89,6 +91,7 @@ def get_execution_results( sort_by=sort_by, limit=limit, offset=offset, + allow_partial_results=allow_partial_results, ) route = f"/execution/{job_id}/results" @@ -126,9 +129,7 @@ def get_execution_results_csv( return self._get_execution_results_csv_by_url(url=url, params=params) def _get_execution_results_by_url( - self, - url: str, - params: Optional[Dict[str, Any]] = None, + self, url: str, params: Optional[Dict[str, Any]] = None ) -> ResultsResponse: """ GET results from Dune API with a given URL. This is particularly useful for pagination. @@ -137,7 +138,13 @@ def _get_execution_results_by_url( response_json = self._get(url=url, params=params) try: - return ResultsResponse.from_dict(response_json) + result = ResultsResponse.from_dict(response_json) + if result.state == ExecutionState.PARTIAL: + self.logger.warning( + f"execution {result.execution_id} resulted in a partial " + f"result set (i.e. results too large)." + ) + return result except KeyError as err: raise DuneError(response_json, "ResultsResponse", err) from err diff --git a/dune_client/api/extensions.py b/dune_client/api/extensions.py index 3aba004..44fda20 100644 --- a/dune_client/api/extensions.py +++ b/dune_client/api/extensions.py @@ -54,6 +54,7 @@ def run_query( sample_count: Optional[int] = None, filters: Optional[str] = None, sort_by: Optional[List[str]] = None, + allow_partial_results: str = "true", ) -> ResultsResponse: """ Executes a Dune `query`, waits until execution completes, @@ -83,6 +84,7 @@ def run_query( filters=filters, sort_by=sort_by, limit=limit, + allow_partial_results=allow_partial_results, ), ) @@ -416,10 +418,11 @@ def _refresh( ) time.sleep(ping_frequency) status = self.get_execution_status(job_id) + if status.state == ExecutionState.PENDING: + self.logger.warning("Partial result set retrieved.") if status.state == ExecutionState.FAILED: self.logger.error(status) raise QueryFailed(f"Error data: {status.error}") - return job_id def _fetch_entire_result( diff --git a/dune_client/client_async.py b/dune_client/client_async.py index 2db989e..7af0b20 100644 --- a/dune_client/client_async.py +++ b/dune_client/client_async.py @@ -175,6 +175,7 @@ async def _get() -> Any: headers=self.default_headers(), params=params, ) + print(response) if raw: return response return await self._handle_response(response) diff --git a/dune_client/models.py b/dune_client/models.py index 270288f..d648f08 100644 --- a/dune_client/models.py +++ b/dune_client/models.py @@ -47,6 +47,7 @@ class ExecutionState(Enum): COMPLETED = "QUERY_STATE_COMPLETED" EXECUTING = "QUERY_STATE_EXECUTING" + PARTIAL = "QUERY_STATE_COMPLETED_PARTIAL" PENDING = "QUERY_STATE_PENDING" CANCELLED = "QUERY_STATE_CANCELLED" FAILED = "QUERY_STATE_FAILED" @@ -57,7 +58,7 @@ def terminal_states(cls) -> set[ExecutionState]: """ Returns the terminal states (i.e. when a query execution is no longer executing """ - return {cls.COMPLETED, cls.CANCELLED, cls.FAILED, cls.EXPIRED} + return {cls.COMPLETED, cls.CANCELLED, cls.FAILED, cls.EXPIRED, cls.PARTIAL} def is_complete(self) -> bool: """Returns True is state is completed, otherwise False.""" @@ -311,6 +312,7 @@ class ResultsResponse: @classmethod def from_dict(cls, data: dict[str, str | int | ResultData]) -> ResultsResponse: """Constructor from dictionary. See unit test for sample input.""" + print(data) assert isinstance(data["execution_id"], str) assert isinstance(data["query_id"], int) assert isinstance(data["state"], str)