Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Function Arguments for all Client Methods #37

Merged
merged 6 commits into from
Mar 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
208 changes: 83 additions & 125 deletions src/api/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -14,21 +14,14 @@ import {
ExecutionResponseCSV,
SuccessResponse,
} from "../types";
import { ageInHours, sleep } from "../utils";
import { sleep } from "../utils";
import log from "loglevel";
import { logPrefix } from "../utils";
import { ExecutionAPI } from "./execution";
import {
MAX_NUM_ROWS_PER_BATCH,
POLL_FREQUENCY_SECONDS,
THREE_MONTHS_IN_HOURS,
} from "../constants";
import {
ExecutionParams,
ExecutionPerformance,
UploadCSVParams,
} from "../types/requestPayload";
import { POLL_FREQUENCY_SECONDS } from "../constants";
import { ExecutionParams, UploadCSVArgs } from "../types/requestPayload";
import { QueryAPI } from "./query";
import { RunQueryArgs, RunSqlArgs } from "../types/client";

/// Various states of query execution that are "terminal".
const TERMINAL_STATES = [
Expand All @@ -55,37 +48,27 @@ export class DuneClient {

/**
* Runs an existing query by ID via execute, await, return results.
* @param queryID id of the query to be executed
* @param params execution parameters (includes query parameters and execution performance)
* @param batchSize puts a limit on the number of results
* @param pingFrequency how frequently should we check execution status (default: 1s)
*
* @param {RunQueryArgs} args
* @returns Execution Results
*/
async runQuery(
queryID: number,
params?: ExecutionParams,
batchSize: number = MAX_NUM_ROWS_PER_BATCH,
pingFrequency: number = POLL_FREQUENCY_SECONDS,
): Promise<ResultsResponse> {
const { state, execution_id: jobID } = await this._runInner(
queryID,
async runQuery(args: RunQueryArgs): Promise<ResultsResponse> {
const { queryId, params, opts } = args;
const { state, execution_id } = await this._runInner(
queryId,
params,
pingFrequency,
opts?.pingFrequency,
);
if (state === ExecutionState.COMPLETED) {
const result = await this.getLatestResult(
queryID,
params?.query_parameters,
batchSize,
);
if (result.execution_id !== jobID) {
const result = await this.getLatestResult(args);
if (result.execution_id !== execution_id) {
throw new DuneError(
`invalid execution ID: expected ${jobID}, got ${result.execution_id}`,
`invalid execution ID: expected ${execution_id}, got ${result.execution_id}`,
);
}
return result;
} else {
const message = `refresh (execution ${jobID}) yields incomplete terminal state ${state}`;
const message = `refresh (execution ${execution_id}) yields incomplete terminal state ${state}`;
// TODO - log the error in constructor
log.error(logPrefix, message);
throw new DuneError(message);
Expand All @@ -94,28 +77,25 @@ export class DuneClient {

/**
* Runs an existing query by ID via execute, await, return Result CSV.
* @param queryID id of the query to be executed
* @param params execution parameters (includes query parameters and execution performance)
* @param pingFrequency how frequently should we check execution status (default: 1s)
*
* @param {RunQueryArgs} args
* @returns Execution Results as CSV
*/
async runQueryCSV(
queryID: number,
params?: ExecutionParams,
pingFrequency: number = POLL_FREQUENCY_SECONDS,
): Promise<ExecutionResponseCSV> {
const { state, execution_id: jobID } = await this._runInner(
queryID,
async runQueryCSV(args: RunQueryArgs): Promise<ExecutionResponseCSV> {
const { queryId, params, opts } = args;
const { state, execution_id } = await this._runInner(
queryId,
params,
pingFrequency,
opts?.pingFrequency,
);
if (state === ExecutionState.COMPLETED) {
// we can't assert that the execution ids agree here, so we use max age hours as a "safe guard"
return this.exec.getResultCSV(jobID, {
// we can't assert that the execution ids agree here,
// so we use max age hours as a "safe guard"
return this.exec.getResultCSV(execution_id, {
query_parameters: params?.query_parameters,
});
} else {
const message = `refresh (execution ${jobID}) yields incomplete terminal state ${state}`;
const message = `refresh (execution ${execution_id}) yields incomplete terminal state ${state}`;
// TODO - log the error in constructor
log.error(logPrefix, message);
throw new DuneError(message);
Expand All @@ -125,62 +105,55 @@ export class DuneClient {
/**
* Goes a bit beyond the internal call which returns that last execution results.
* Here contains additional logic to refresh the results if they are too old.
* @param queryId - query to get results of.
* @param parameters - parameters for which they were called.
* @param limit - the number of rows to retrieve
* @param maxAgeHours - oldest acceptable results (if expired results are refreshed)
*
* @param {RunQueryArgs} args
* @returns Latest execution results for the given parameters.
*/
async getLatestResult(
queryId: number,
parameters: QueryParameter[] = [],
batchSize: number = MAX_NUM_ROWS_PER_BATCH,
maxAgeHours: number = THREE_MONTHS_IN_HOURS,
): Promise<ResultsResponse> {
let results = await this.exec.getLastExecutionResults(queryId, {
query_parameters: parameters,
limit: batchSize,
});
const lastRun: Date = results.execution_ended_at!;
if (lastRun !== undefined && ageInHours(lastRun) > maxAgeHours) {
log.info(
logPrefix,
`results (from ${lastRun}) older than ${maxAgeHours} hours, re-running query.`,
);
results = await this.runQuery(queryId, { query_parameters: parameters }, batchSize);
async getLatestResult(args: RunQueryArgs): Promise<ResultsResponse> {
const { queryId, params, opts } = args;
const lastestResults = await this.exec.getLastExecutionResults(
queryId,
{
query_parameters: params?.query_parameters,
limit: opts?.batchSize,
},
opts?.maxAgeHours,
);
let results: ResultsResponse;
if (lastestResults.isExpired) {
log.info(logPrefix, `results expired, re-running query.`);
results = await this.runQuery({ queryId, params, opts });
} else {
results = lastestResults.results;
}
return results;
}

/**
* Get the lastest execution results in CSV format and saves to disk.
* @param queryId - query to get results of.
*
* @param {RunQueryArgs} args
* @param outFile - location to save CSV.
* @param parameters - parameters for which they were called.
* @param batchSize - the page size when retriving results.
* @param maxAgeHours - oldest acceptable results (if expired results are refreshed)
* @returns Latest execution results for the given parameters.
*/
async downloadCSV(
queryId: number,
outFile: string,
parameters: QueryParameter[] = [],
batchSize: number = MAX_NUM_ROWS_PER_BATCH,
maxAgeHours: number = THREE_MONTHS_IN_HOURS,
): Promise<void> {
const params = { query_parameters: parameters, limit: batchSize };
const lastResults = await this.exec.getLastExecutionResults(queryId, params);
const lastRun: Date = lastResults.execution_ended_at!;
async downloadCSV(args: RunQueryArgs, outFile: string): Promise<void> {
const { queryId, params, opts } = args;
const { isExpired } = await this.exec.getLastExecutionResults(
queryId,
{
query_parameters: params?.query_parameters,
limit: opts?.batchSize,
},
args.opts?.maxAgeHours,
);
let results: Promise<ExecutionResponseCSV>;
if (lastRun !== undefined && ageInHours(lastRun) > maxAgeHours) {
log.info(
logPrefix,
`results (from ${lastRun}) older than ${maxAgeHours} hours, re-running query.`,
);
results = this.runQueryCSV(queryId, { query_parameters: parameters }, batchSize);
if (isExpired) {
results = this.runQueryCSV(args);
} else {
// TODO (user cost savings): transform the lastResults into CSV instead of refetching
results = this.exec.getLastResultCSV(queryId, params);
results = this.exec.getLastResultCSV(args.queryId, {
query_parameters: args.params?.query_parameters,
limit: args.opts?.batchSize,
});
}
// Wait for the results promise to resolve and then write the CSV data to the specified outFile
const csvData = (await results).data;
Expand All @@ -193,59 +166,40 @@ export class DuneClient {
* - create, run, get results with optional archive/delete.
* - Query is by default made private and archived after execution.
* Requires Plus subscription!
* @param query_sql - raw sql of query to run
* @param params - query parameters
* @param isPrivate - whether the created query should be private
* @param archiveAfter - whether the created query should be archived immediately after execution
* @param performance - performance tier of execution engine
* @param batchSize - the page size when retriving results.
* @param pingFrequency - how frequently should we check execution status
* @param name - name of the query
* @returns
*
* @returns {Promise<ResultsResponse>}
*/
public async runSql(
query_sql: string,
params?: QueryParameter[],
isPrivate: boolean = true,
archiveAfter: boolean = true,
performance?: ExecutionPerformance,
batchSize: number = MAX_NUM_ROWS_PER_BATCH,
pingFrequency: number = POLL_FREQUENCY_SECONDS,
name: string = "API Query",
): Promise<ResultsResponse> {
const queryID = await this.query.createQuery({
name,
public async runSql(args: RunSqlArgs): Promise<ResultsResponse> {
const { name, query_sql, params, isPrivate, archiveAfter, opts } = args;
const queryId = await this.query.createQuery({
name: name ? name : "API Query",
query_sql,
query_parameters: params,
query_parameters: params?.query_parameters,
is_private: isPrivate,
});
let results: ResultsResponse;

try {
results = await this.runQuery(
queryID,
{ query_parameters: params, performance },
batchSize,
pingFrequency,
);
results = await this.runQuery({ queryId, params, opts });
} finally {
if (archiveAfter) {
this.query.archiveQuery(queryID);
this.query.archiveQuery(queryId);
}
}

return results;
}

/**
* Allows for anyone to upload a CSV as a table in Dune.
* The size limit per upload is currently 200MB.
* Allows for anyone to upload a CSV as a table in Dune.
* The size limit per upload is currently 200MB.
* Storage is limited by plan, 1MB on free, 15GB on plus, and 50GB on premium.
* @param params UploadCSVParams relevant fields related to dataset upload.
*
* @param args UploadCSVParams relevant fields related to dataset upload.
* @returns boolean representing if upload was successful.
*/
async uploadCsv(params: UploadCSVParams): Promise<boolean> {
const response = await this.exec.post<SuccessResponse>("table/upload/csv", params);
async uploadCsv(args: UploadCSVArgs): Promise<boolean> {
const response = await this.exec.post<SuccessResponse>("table/upload/csv", args);
try {
return Boolean(response.success);
} catch (err) {
Expand Down Expand Up @@ -283,8 +237,12 @@ export class DuneClient {
async refresh(
queryID: number,
parameters: QueryParameter[] = [],
pingFrequency: number = 1,
pingFrequency?: number,
): Promise<ResultsResponse> {
return this.runQuery(queryID, { query_parameters: parameters }, pingFrequency);
return this.runQuery({
queryId: queryID,
params: { query_parameters: parameters },
opts: { pingFrequency },
});
}
}
30 changes: 21 additions & 9 deletions src/api/execution.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,10 @@ import {
concatResultResponse,
concatResultCSV,
SuccessResponse,
LatestResultsResponse,
} from "../types";
import log from "loglevel";
import { logPrefix } from "../utils";
import { ageInHours, logPrefix, withDefaults } from "../utils";
import { Router } from "./router";
import {
ExecutionParams,
Expand All @@ -20,6 +21,8 @@ import {
DEFAULT_GET_PARAMS,
DUNE_CSV_NEXT_OFFSET_HEADER,
DUNE_CSV_NEXT_URI_HEADER,
MAX_NUM_ROWS_PER_BATCH,
THREE_MONTHS_IN_HOURS,
} from "../constants";

/**
Expand Down Expand Up @@ -85,7 +88,7 @@ export class ExecutionAPI extends Router {
* Retrieve results of a query execution by executionID:
* https://docs.dune.com/api-reference/executions/endpoint/get-execution-result
* @param {string} executionId string representig ID of query execution
* @param {GetResultParams} params including limit, offset and expectedID.
* @param {GetResultParams} params including limit, offset
* @returns {ResultsResponse} response containing execution results.
*/
async getExecutionResults(
Expand All @@ -94,7 +97,7 @@ export class ExecutionAPI extends Router {
): Promise<ResultsResponse> {
const response: ResultsResponse = await this._get(
`execution/${executionId}/results`,
params,
withDefaults(params, { limit: MAX_NUM_ROWS_PER_BATCH }),
);
log.debug(logPrefix, `get_result response ${JSON.stringify(response)}`);
return response as ResultsResponse;
Expand All @@ -104,7 +107,7 @@ export class ExecutionAPI extends Router {
* Retrieve results of a query execution (in CSV format) by executionID:
* https://docs.dune.com/api-reference/executions/endpoint/get-execution-result-csv
* @param {string} executionId string representig ID of query execution.
* @param {GetResultParams} params including limit, offset and expectedID.
* @param {GetResultParams} params including limit, offset
* @returns {ExecutionResponseCSV} execution results as CSV.
*/
async getResultCSV(
Expand All @@ -124,15 +127,24 @@ export class ExecutionAPI extends Router {
* Retrieves results from query's last execution
* @param {number} queryID id of query to get results for.
* @param {GetResultParams} params parameters for retrieval.
* @returns {ResultsResponse} response containing execution results.
* @param {number} expiryAgeHours What is considered to be an expired result set.
* @returns {LatestResultsResponse} response containing execution results and boolean field
*/
async getLastExecutionResults(
queryId: number,
params: GetResultParams = DEFAULT_GET_PARAMS,
): Promise<ResultsResponse> {
/// What is considered to be an expired result set.
expiryAgeHours: number = THREE_MONTHS_IN_HOURS,
): Promise<LatestResultsResponse> {
// The first bit might only return a page.
const results = await this._get<ResultsResponse>(`query/${queryId}/results`, params);
return this._fetchEntireResult(results);
const results = await this._get<ResultsResponse>(
`query/${queryId}/results`,
withDefaults(params, { limit: MAX_NUM_ROWS_PER_BATCH }),
);
const lastRun: Date = results.execution_ended_at!;
const maxAge = expiryAgeHours;
const isExpired = lastRun !== undefined && ageInHours(lastRun) > maxAge;
return { results: await this._fetchEntireResult(results), isExpired };
}

/**
Expand All @@ -147,7 +159,7 @@ export class ExecutionAPI extends Router {
): Promise<ExecutionResponseCSV> {
const response = await this._get<Response>(
`query/${queryId}/results/csv`,
params,
withDefaults(params, { limit: MAX_NUM_ROWS_PER_BATCH }),
true,
);
return this._fetchEntireResultCSV(await this.buildCSVResponse(response));
Expand Down
Loading