diff --git a/src/api/client.ts b/src/api/client.ts index bee5088..252aafc 100644 --- a/src/api/client.ts +++ b/src/api/client.ts @@ -14,21 +14,14 @@ import { ExecutionResponseCSV, SuccessResponse, } from "../types"; -import { ageInHours, sleep } from "../utils"; +import { sleep } from "../utils"; import log from "loglevel"; import { logPrefix } from "../utils"; import { ExecutionAPI } from "./execution"; -import { - MAX_NUM_ROWS_PER_BATCH, - POLL_FREQUENCY_SECONDS, - THREE_MONTHS_IN_HOURS, -} from "../constants"; -import { - ExecutionParams, - ExecutionPerformance, - UploadCSVParams, -} from "../types/requestPayload"; +import { POLL_FREQUENCY_SECONDS } from "../constants"; +import { ExecutionParams, UploadCSVArgs } from "../types/requestPayload"; import { QueryAPI } from "./query"; +import { RunQueryArgs, RunSqlArgs } from "../types/client"; /// Various states of query execution that are "terminal". const TERMINAL_STATES = [ @@ -55,37 +48,27 @@ export class DuneClient { /** * Runs an existing query by ID via execute, await, return results. - * @param queryID id of the query to be executed - * @param params execution parameters (includes query parameters and execution performance) - * @param batchSize puts a limit on the number of results - * @param pingFrequency how frequently should we check execution status (default: 1s) + * + * @param {RunQueryArgs} args * @returns Execution Results */ - async runQuery( - queryID: number, - params?: ExecutionParams, - batchSize: number = MAX_NUM_ROWS_PER_BATCH, - pingFrequency: number = POLL_FREQUENCY_SECONDS, - ): Promise { - const { state, execution_id: jobID } = await this._runInner( - queryID, + async runQuery(args: RunQueryArgs): Promise { + const { queryId, params, opts } = args; + const { state, execution_id } = await this._runInner( + queryId, params, - pingFrequency, + opts?.pingFrequency, ); if (state === ExecutionState.COMPLETED) { - const result = await this.getLatestResult( - queryID, - params?.query_parameters, - batchSize, - ); - if (result.execution_id !== jobID) { + const result = await this.getLatestResult(args); + if (result.execution_id !== execution_id) { throw new DuneError( - `invalid execution ID: expected ${jobID}, got ${result.execution_id}`, + `invalid execution ID: expected ${execution_id}, got ${result.execution_id}`, ); } return result; } else { - const message = `refresh (execution ${jobID}) yields incomplete terminal state ${state}`; + const message = `refresh (execution ${execution_id}) yields incomplete terminal state ${state}`; // TODO - log the error in constructor log.error(logPrefix, message); throw new DuneError(message); @@ -94,28 +77,25 @@ export class DuneClient { /** * Runs an existing query by ID via execute, await, return Result CSV. - * @param queryID id of the query to be executed - * @param params execution parameters (includes query parameters and execution performance) - * @param pingFrequency how frequently should we check execution status (default: 1s) + * + * @param {RunQueryArgs} args * @returns Execution Results as CSV */ - async runQueryCSV( - queryID: number, - params?: ExecutionParams, - pingFrequency: number = POLL_FREQUENCY_SECONDS, - ): Promise { - const { state, execution_id: jobID } = await this._runInner( - queryID, + async runQueryCSV(args: RunQueryArgs): Promise { + const { queryId, params, opts } = args; + const { state, execution_id } = await this._runInner( + queryId, params, - pingFrequency, + opts?.pingFrequency, ); if (state === ExecutionState.COMPLETED) { - // we can't assert that the execution ids agree here, so we use max age hours as a "safe guard" - return this.exec.getResultCSV(jobID, { + // we can't assert that the execution ids agree here, + // so we use max age hours as a "safe guard" + return this.exec.getResultCSV(execution_id, { query_parameters: params?.query_parameters, }); } else { - const message = `refresh (execution ${jobID}) yields incomplete terminal state ${state}`; + const message = `refresh (execution ${execution_id}) yields incomplete terminal state ${state}`; // TODO - log the error in constructor log.error(logPrefix, message); throw new DuneError(message); @@ -125,62 +105,55 @@ export class DuneClient { /** * Goes a bit beyond the internal call which returns that last execution results. * Here contains additional logic to refresh the results if they are too old. - * @param queryId - query to get results of. - * @param parameters - parameters for which they were called. - * @param limit - the number of rows to retrieve - * @param maxAgeHours - oldest acceptable results (if expired results are refreshed) + * + * @param {RunQueryArgs} args * @returns Latest execution results for the given parameters. */ - async getLatestResult( - queryId: number, - parameters: QueryParameter[] = [], - batchSize: number = MAX_NUM_ROWS_PER_BATCH, - maxAgeHours: number = THREE_MONTHS_IN_HOURS, - ): Promise { - let results = await this.exec.getLastExecutionResults(queryId, { - query_parameters: parameters, - limit: batchSize, - }); - const lastRun: Date = results.execution_ended_at!; - if (lastRun !== undefined && ageInHours(lastRun) > maxAgeHours) { - log.info( - logPrefix, - `results (from ${lastRun}) older than ${maxAgeHours} hours, re-running query.`, - ); - results = await this.runQuery(queryId, { query_parameters: parameters }, batchSize); + async getLatestResult(args: RunQueryArgs): Promise { + const { queryId, params, opts } = args; + const lastestResults = await this.exec.getLastExecutionResults( + queryId, + { + query_parameters: params?.query_parameters, + limit: opts?.batchSize, + }, + opts?.maxAgeHours, + ); + let results: ResultsResponse; + if (lastestResults.isExpired) { + log.info(logPrefix, `results expired, re-running query.`); + results = await this.runQuery({ queryId, params, opts }); + } else { + results = lastestResults.results; } return results; } /** * Get the lastest execution results in CSV format and saves to disk. - * @param queryId - query to get results of. + * + * @param {RunQueryArgs} args * @param outFile - location to save CSV. - * @param parameters - parameters for which they were called. - * @param batchSize - the page size when retriving results. - * @param maxAgeHours - oldest acceptable results (if expired results are refreshed) - * @returns Latest execution results for the given parameters. */ - async downloadCSV( - queryId: number, - outFile: string, - parameters: QueryParameter[] = [], - batchSize: number = MAX_NUM_ROWS_PER_BATCH, - maxAgeHours: number = THREE_MONTHS_IN_HOURS, - ): Promise { - const params = { query_parameters: parameters, limit: batchSize }; - const lastResults = await this.exec.getLastExecutionResults(queryId, params); - const lastRun: Date = lastResults.execution_ended_at!; + async downloadCSV(args: RunQueryArgs, outFile: string): Promise { + const { queryId, params, opts } = args; + const { isExpired } = await this.exec.getLastExecutionResults( + queryId, + { + query_parameters: params?.query_parameters, + limit: opts?.batchSize, + }, + args.opts?.maxAgeHours, + ); let results: Promise; - if (lastRun !== undefined && ageInHours(lastRun) > maxAgeHours) { - log.info( - logPrefix, - `results (from ${lastRun}) older than ${maxAgeHours} hours, re-running query.`, - ); - results = this.runQueryCSV(queryId, { query_parameters: parameters }, batchSize); + if (isExpired) { + results = this.runQueryCSV(args); } else { // TODO (user cost savings): transform the lastResults into CSV instead of refetching - results = this.exec.getLastResultCSV(queryId, params); + results = this.exec.getLastResultCSV(args.queryId, { + query_parameters: args.params?.query_parameters, + limit: args.opts?.batchSize, + }); } // Wait for the results promise to resolve and then write the CSV data to the specified outFile const csvData = (await results).data; @@ -193,44 +166,24 @@ export class DuneClient { * - create, run, get results with optional archive/delete. * - Query is by default made private and archived after execution. * Requires Plus subscription! - * @param query_sql - raw sql of query to run - * @param params - query parameters - * @param isPrivate - whether the created query should be private - * @param archiveAfter - whether the created query should be archived immediately after execution - * @param performance - performance tier of execution engine - * @param batchSize - the page size when retriving results. - * @param pingFrequency - how frequently should we check execution status - * @param name - name of the query - * @returns + * + * @returns {Promise} */ - public async runSql( - query_sql: string, - params?: QueryParameter[], - isPrivate: boolean = true, - archiveAfter: boolean = true, - performance?: ExecutionPerformance, - batchSize: number = MAX_NUM_ROWS_PER_BATCH, - pingFrequency: number = POLL_FREQUENCY_SECONDS, - name: string = "API Query", - ): Promise { - const queryID = await this.query.createQuery({ - name, + public async runSql(args: RunSqlArgs): Promise { + const { name, query_sql, params, isPrivate, archiveAfter, opts } = args; + const queryId = await this.query.createQuery({ + name: name ? name : "API Query", query_sql, - query_parameters: params, + query_parameters: params?.query_parameters, is_private: isPrivate, }); let results: ResultsResponse; try { - results = await this.runQuery( - queryID, - { query_parameters: params, performance }, - batchSize, - pingFrequency, - ); + results = await this.runQuery({ queryId, params, opts }); } finally { if (archiveAfter) { - this.query.archiveQuery(queryID); + this.query.archiveQuery(queryId); } } @@ -238,14 +191,15 @@ export class DuneClient { } /** - * Allows for anyone to upload a CSV as a table in Dune. - * The size limit per upload is currently 200MB. + * Allows for anyone to upload a CSV as a table in Dune. + * The size limit per upload is currently 200MB. * Storage is limited by plan, 1MB on free, 15GB on plus, and 50GB on premium. - * @param params UploadCSVParams relevant fields related to dataset upload. + * + * @param args UploadCSVParams relevant fields related to dataset upload. * @returns boolean representing if upload was successful. */ - async uploadCsv(params: UploadCSVParams): Promise { - const response = await this.exec.post("table/upload/csv", params); + async uploadCsv(args: UploadCSVArgs): Promise { + const response = await this.exec.post("table/upload/csv", args); try { return Boolean(response.success); } catch (err) { @@ -283,8 +237,12 @@ export class DuneClient { async refresh( queryID: number, parameters: QueryParameter[] = [], - pingFrequency: number = 1, + pingFrequency?: number, ): Promise { - return this.runQuery(queryID, { query_parameters: parameters }, pingFrequency); + return this.runQuery({ + queryId: queryID, + params: { query_parameters: parameters }, + opts: { pingFrequency }, + }); } } diff --git a/src/api/execution.ts b/src/api/execution.ts index 8675719..ccb2371 100644 --- a/src/api/execution.ts +++ b/src/api/execution.ts @@ -7,9 +7,10 @@ import { concatResultResponse, concatResultCSV, SuccessResponse, + LatestResultsResponse, } from "../types"; import log from "loglevel"; -import { logPrefix } from "../utils"; +import { ageInHours, logPrefix, withDefaults } from "../utils"; import { Router } from "./router"; import { ExecutionParams, @@ -20,6 +21,8 @@ import { DEFAULT_GET_PARAMS, DUNE_CSV_NEXT_OFFSET_HEADER, DUNE_CSV_NEXT_URI_HEADER, + MAX_NUM_ROWS_PER_BATCH, + THREE_MONTHS_IN_HOURS, } from "../constants"; /** @@ -85,7 +88,7 @@ export class ExecutionAPI extends Router { * Retrieve results of a query execution by executionID: * https://docs.dune.com/api-reference/executions/endpoint/get-execution-result * @param {string} executionId string representig ID of query execution - * @param {GetResultParams} params including limit, offset and expectedID. + * @param {GetResultParams} params including limit, offset * @returns {ResultsResponse} response containing execution results. */ async getExecutionResults( @@ -94,7 +97,7 @@ export class ExecutionAPI extends Router { ): Promise { const response: ResultsResponse = await this._get( `execution/${executionId}/results`, - params, + withDefaults(params, { limit: MAX_NUM_ROWS_PER_BATCH }), ); log.debug(logPrefix, `get_result response ${JSON.stringify(response)}`); return response as ResultsResponse; @@ -104,7 +107,7 @@ export class ExecutionAPI extends Router { * Retrieve results of a query execution (in CSV format) by executionID: * https://docs.dune.com/api-reference/executions/endpoint/get-execution-result-csv * @param {string} executionId string representig ID of query execution. - * @param {GetResultParams} params including limit, offset and expectedID. + * @param {GetResultParams} params including limit, offset * @returns {ExecutionResponseCSV} execution results as CSV. */ async getResultCSV( @@ -124,15 +127,24 @@ export class ExecutionAPI extends Router { * Retrieves results from query's last execution * @param {number} queryID id of query to get results for. * @param {GetResultParams} params parameters for retrieval. - * @returns {ResultsResponse} response containing execution results. + * @param {number} expiryAgeHours What is considered to be an expired result set. + * @returns {LatestResultsResponse} response containing execution results and boolean field */ async getLastExecutionResults( queryId: number, params: GetResultParams = DEFAULT_GET_PARAMS, - ): Promise { + /// What is considered to be an expired result set. + expiryAgeHours: number = THREE_MONTHS_IN_HOURS, + ): Promise { // The first bit might only return a page. - const results = await this._get(`query/${queryId}/results`, params); - return this._fetchEntireResult(results); + const results = await this._get( + `query/${queryId}/results`, + withDefaults(params, { limit: MAX_NUM_ROWS_PER_BATCH }), + ); + const lastRun: Date = results.execution_ended_at!; + const maxAge = expiryAgeHours; + const isExpired = lastRun !== undefined && ageInHours(lastRun) > maxAge; + return { results: await this._fetchEntireResult(results), isExpired }; } /** @@ -147,7 +159,7 @@ export class ExecutionAPI extends Router { ): Promise { const response = await this._get( `query/${queryId}/results/csv`, - params, + withDefaults(params, { limit: MAX_NUM_ROWS_PER_BATCH }), true, ); return this._fetchEntireResultCSV(await this.buildCSVResponse(response)); diff --git a/src/types/client.ts b/src/types/client.ts new file mode 100644 index 0000000..119ad9c --- /dev/null +++ b/src/types/client.ts @@ -0,0 +1,39 @@ +import { QueryParameter } from "./queryParameter"; +import { ExecutionParams } from "./requestPayload"; + +export interface Options { + /// The page size when retriving results. + batchSize?: number; + /// How frequently should we check execution status + pingFrequency?: number; + maxAgeHours?: number; +} + +export interface RunQueryArgs { + /// ID of the query. + queryId: number; + params?: ExecutionParams; + opts?: Options; +} + +export interface LatestResultArgs { + /// ID of the query. + queryId: number; + parameters?: QueryParameter[]; + opts?: Options; +} + +export interface RunSqlArgs { + /// raw sql of query to run (Trino/DuneSQL syntax) + query_sql: string; + /// Query execution parameters. + params?: ExecutionParams; + /// Name of created query. + name?: string; + /// Whether the created query should be private or not (default = true). + isPrivate?: boolean; + /// Whether the created query should be archived immediately after execution or not (default = true). + archiveAfter?: boolean; + /// Additional options execution options. + opts?: Options; +} diff --git a/src/types/requestPayload.ts b/src/types/requestPayload.ts index 06adf6c..4470b5b 100644 --- a/src/types/requestPayload.ts +++ b/src/types/requestPayload.ts @@ -12,7 +12,7 @@ export enum ExecutionPerformance { Large = "large", } -export type UploadCSVParams = { +export type UploadCSVArgs = { table_name: string; data: string; description?: string; @@ -25,7 +25,7 @@ export type RequestPayload = | ExecuteQueryParams | UpdateQueryParams | CreateQueryParams - | UploadCSVParams; + | UploadCSVArgs; /// Utility method used by router to parse request payloads. export function payloadJSON(payload?: RequestPayload): string { @@ -79,7 +79,6 @@ export interface GetResultParams extends BaseParams { limit?: number; /// Which row to start returning results from offset?: number; - expectedId?: string; } export interface ExecuteQueryParams extends BaseParams { diff --git a/src/types/response.ts b/src/types/response.ts index e579393..a6cfadf 100644 --- a/src/types/response.ts +++ b/src/types/response.ts @@ -138,6 +138,11 @@ export interface ResultsResponse extends TimeData { result?: ExecutionResult; } +export interface LatestResultsResponse { + results: ResultsResponse; + isExpired?: boolean; +} + export function concatResultResponse( left: ResultsResponse, right: ResultsResponse, diff --git a/src/utils.ts b/src/utils.ts index fc0bde0..f8f45d9 100644 --- a/src/utils.ts +++ b/src/utils.ts @@ -24,3 +24,20 @@ export function ageInHours(timestamp: Date | string): number { // Convert milliseconds to hours and return return resultAge / (1000 * 60 * 60); } + +/** + * + * @param obj Used to populate partial payloads with required defaults + * @param defaults + * @returns + */ +export function withDefaults(obj: T, defaults: Partial): T { + // eslint-disable-next-line @typescript-eslint/no-explicit-any + const result: any = { ...obj }; + for (const key in defaults) { + if (result[key] === undefined) { + result[key] = defaults[key]; + } + } + return result as T; +} diff --git a/tests/e2e/client.spec.ts b/tests/e2e/client.spec.ts index a087490..54e852a 100644 --- a/tests/e2e/client.spec.ts +++ b/tests/e2e/client.spec.ts @@ -19,8 +19,11 @@ describe("DuneClient Extensions", () => { it("executes runQuery", async () => { // https://dune.com/queries/1215383 - const results = await client.runQuery(parameterizedQuery, { - query_parameters: [QueryParameter.text("TextField", "Plain Text")], + const results = await client.runQuery({ + queryId: parameterizedQuery, + params: { + query_parameters: [QueryParameter.text("TextField", "Plain Text")], + }, }); expect(results.result?.rows).to.be.deep.equal([ { @@ -32,11 +35,11 @@ describe("DuneClient Extensions", () => { ]); // pagination: - const multiRowResults = await client.runQuery( - multiRowQuery, - { query_parameters: [QueryParameter.number("StartFrom", 10)] }, - 4, - ); + const multiRowResults = await client.runQuery({ + queryId: multiRowQuery, + params: { query_parameters: [QueryParameter.number("StartFrom", 10)] }, + opts: { batchSize: 4 }, + }); expect(multiRowResults.result?.rows).to.be.deep.equal( [10, 11, 12, 13, 14, 15].map((t) => ({ number: t })), ); @@ -44,8 +47,11 @@ describe("DuneClient Extensions", () => { it("executes runQueryCSV", async () => { // https://dune.com/queries/1215383 - const results = await client.runQueryCSV(parameterizedQuery, { - query_parameters: [QueryParameter.text("TextField", "Plain Text")], + const results = await client.runQueryCSV({ + queryId: parameterizedQuery, + params: { + query_parameters: [QueryParameter.text("TextField", "Plain Text")], + }, }); expect(results.data).to.be.equal( [ @@ -55,34 +61,39 @@ describe("DuneClient Extensions", () => { ); // pagination: - const multiRowResults = await client.runQueryCSV( - multiRowQuery, - { query_parameters: [QueryParameter.number("StartFrom", 3)] }, - 4, - ); + const multiRowResults = await client.runQueryCSV({ + queryId: multiRowQuery, + params: { query_parameters: [QueryParameter.number("StartFrom", 3)] }, + opts: { batchSize: 4 }, + }); expect(multiRowResults.data).to.be.deep.equal("number\n3\n4\n5\n6\n7\n8\n"); }); it("getsLatestResults", async () => { // https://dune.com/queries/1215383 - const results = await client.getLatestResult(1215383, [ - QueryParameter.text("TextField", "Plain Text"), - ]); + const results = await client.getLatestResult({ + queryId: 1215383, + params: { query_parameters: [QueryParameter.text("TextField", "Plain Text")] }, + }); expect(results.result?.rows.length).to.be.greaterThan(0); // pagination: - const multiRowResults = await client.getLatestResult( - multiRowQuery, - [QueryParameter.number("StartFrom", 10)], - 4, - ); + const multiRowResults = await client.getLatestResult({ + queryId: multiRowQuery, + params: { query_parameters: [QueryParameter.number("StartFrom", 10)] }, + opts: { batchSize: 4 }, + }); expect(multiRowResults.result?.rows.length).to.be.equal(6); }); - it("downloadCSV", async () => { - await client.downloadCSV(multiRowQuery, "./out.csv", [ - QueryParameter.number("StartFrom", 3), - ]); + it("downloads CSV", async () => { + await client.downloadCSV( + { + queryId: multiRowQuery, + params: { query_parameters: [QueryParameter.number("StartFrom", 3)] }, + }, + "./out.csv", + ); const fileContents = await fs.readFile("./out.csv", { encoding: "utf8" }); // Compare the contents of the CSV file with the expected string expect(fileContents).to.deep.equal("number\n3\n4\n5\n6\n7\n8\n"); @@ -92,7 +103,11 @@ describe("DuneClient Extensions", () => { it("runSQL", async () => { const premiumClient = new DuneClient(PLUS_KEY); - const results = await premiumClient.runSql("select 1", [], true, true); + const results = await premiumClient.runSql({ + query_sql: "select 1", + archiveAfter: true, + isPrivate: true, + }); const queryID = results.query_id; expect(results.result?.rows).to.be.deep.equal([{ _col0: 1 }]); const query = await premiumClient.query.readQuery(queryID); @@ -111,7 +126,7 @@ describe("DuneClient Extensions", () => { const private_success = await premiumClient.uploadCsv({ table_name: "ts_client_test_private", data: "column1,column2\nvalue1,value2\nvalue3,value4", - is_private: true + is_private: true, }); expect(private_success).to.be.equal(true); }); diff --git a/tests/e2e/executionAPI.spec.ts b/tests/e2e/executionAPI.spec.ts index 58574fa..9be2f31 100644 --- a/tests/e2e/executionAPI.spec.ts +++ b/tests/e2e/executionAPI.spec.ts @@ -105,13 +105,13 @@ describe("ExecutionAPI: native routes", () => { expect(resultCSV.data).to.be.eq(expectedRows.join("")); }); - it("getLastResult", async () => { - const result = await client.getLastExecutionResults(testQueryId, { + it("gets LastResult", async () => { + const { results } = await client.getLastExecutionResults(testQueryId, { query_parameters: [QueryParameter.text("TextField", "Plain Text")], }); - expect(result.result?.rows).to.be.deep.equal([ + expect(results.result?.rows).to.be.deep.equal([ { - date_field: "2022-05-04 00:00:00.000", + date_field: "2022-05-04T00:00:00Z", list_field: "Option 1", number_field: "3.1415926535", text_field: "Plain Text", @@ -119,14 +119,14 @@ describe("ExecutionAPI: native routes", () => { ]); }); - it("getLastResultCSV", async () => { + it("gets LastResultCSV", async () => { // https://dune.com/queries/1215383 const resultCSV = await client.getLastResultCSV(testQueryId, { query_parameters: [QueryParameter.text("TextField", "Plain Text")], }); const expectedRows = [ "text_field,number_field,date_field,list_field\n", - "Plain Text,3.1415926535,2022-05-04 00:00:00.000,Option 1\n", + "Plain Text,3.1415926535,2022-05-04T00:00:00Z,Option 1\n", ]; expect(resultCSV.data).to.be.eq(expectedRows.join("")); }); diff --git a/tests/unit/utils.spec.ts b/tests/unit/utils.spec.ts new file mode 100644 index 0000000..d1225db --- /dev/null +++ b/tests/unit/utils.spec.ts @@ -0,0 +1,14 @@ +import { expect } from "chai"; +import { withDefaults } from "../../src/utils"; + +describe("utility methods", () => { + it("withDefaults: inserts defaults (overriding optionals)", async () => { + interface MyType { + a: string; + b?: string; + } + + const x = { a: "1" }; + expect(withDefaults(x, { b: "2" })).to.be.deep.equal({ a: "1", b: "2" }); + }); +});