Showing 1 changed file with 338 additions and 0 deletions.
@@ -0,0 +1,338 @@
import { bench, describe } from "vitest";
import {
  DuckDBConnection,
  DuckDBInstance,
  DuckDBPendingResultState,
} from "../../src";

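// Benchmarks comparing time-to-first-row and full-result latency across the
// result-reading APIs used below: run/stream, runAndReadAll/runAndReadUntil,
// streamAndReadAll/streamAndReadUntil, and the incremental pending-result
// interface (start/startStream + runTask).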
let instance: DuckDBInstance;
let connection: DuckDBConnection;

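// The test table holds 1,000,000 rows at one-millisecond spacing; each
// benchmarked query selects a random 100,000-row window of it.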
const TOTAL_SIZE = 1_000_000n;
const SELECTION_SIZE = 100_000n;

async function setup() {
  instance = await DuckDBInstance.create();
  connection = await instance.connect();

  const prepared = await connection.prepare(`
    CREATE OR REPLACE TABLE test AS
    SELECT
      TIMESTAMP '2025-01-01' + seq::BIGINT * INTERVAL 1 MILLISECOND AS timestamp,
      RANDOM() * 1_000_000 AS value
    FROM
      range($1) AS seq(seq)
  `);

  prepared.bindBigInt(1, TOTAL_SIZE);

  await prepared.run();
}

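// Three query shapes of increasing per-row cost: plain row fetching, a single
// set of aggregates over the selected window, and rolling aggregates over a
// one-second trailing window.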
const examples: {
  name: string;
  factory: (start: string, end: string) => string;
}[] = [
  {
    name: "Row Fetching",
    factory: (start, end) =>
      `SELECT *
      FROM test
      WHERE timestamp BETWEEN TIMESTAMP '2025-01-01' + ${start}
        AND TIMESTAMP '2025-01-01' + ${end};`,
  },
  {
    name: "Overall Aggregates",
    factory: (start, end) =>
      `SELECT mean("value"), min("value"), max("value")
      FROM test
      WHERE timestamp BETWEEN TIMESTAMP '2025-01-01' + ${start}
        AND TIMESTAMP '2025-01-01' + ${end};`,
  },
  {
    name: "Rolling Aggregates",
    factory: (start, end) =>
      `SELECT mean("value") OVER previous_second,
        min("value") OVER previous_second,
        max("value") OVER previous_second
      FROM test
      WHERE timestamp BETWEEN TIMESTAMP '2025-01-01' + ${start}
        AND TIMESTAMP '2025-01-01' + ${end}
      WINDOW previous_second AS (
        ORDER BY "timestamp" ASC
        RANGE BETWEEN INTERVAL 1_000 MILLISECONDS PRECEDING
          AND INTERVAL 0 MILLISECONDS FOLLOWING);`,
  },
];

/**
 * Randomly generate a BigInt millisecond offset to use as the query start
 * point, leaving room for SELECTION_SIZE worth of rows after it.
 */
function startMS() {
  return BigInt(
    Math.floor(Math.random() * Number(TOTAL_SIZE - SELECTION_SIZE))
  );
}

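// Each example is benchmarked twice: once measuring the full result
// ("Full Result") and once measuring only the first row or chunk
// ("Time to First Row"), across every result-reading API below.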
for (const full of [false, true]) {
  for (const example of examples) {
    describe(`${example.name} - ${
      full ? "Full Result" : "Time to First Row"
    }`, () => {
      bench(
        `${example.name} - ${
          full ? "runAndReadAll()" : "runAndReadUntil(q, 1)"
        }`,
        async () => {
          const s = startMS();
          const e = s + SELECTION_SIZE;
          const startInterval = `INTERVAL ${s} MILLISECONDS`;
          const endInterval = `INTERVAL ${e} MILLISECONDS`;

          const query = example.factory(startInterval, endInterval);

          if (full) {
            await connection.runAndReadAll(query);
          } else {
            await connection.runAndReadUntil(query, 1);
          }
        },
        {
          setup,
          iterations: 200,
        }
      );
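      // The pending-result interface executes the query incrementally:
      // runTask() advances one unit of work at a time until it reports
      // RESULT_READY, letting the loop yield to the event loop between tasks
      // instead of blocking until the query completes.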
      bench(
        `${example.name} - ${
          full
            ? "start runTask pending.readAll()"
            : "start runTask pending.readUntil(1)"
        }`,
        async () => {
          const s = startMS();
          const e = s + SELECTION_SIZE;
          const startInterval = `INTERVAL ${s} MILLISECONDS`;
          const endInterval = `INTERVAL ${e} MILLISECONDS`;

          const query = example.factory(startInterval, endInterval);

          const pending = await connection.start(query);

          while (pending.runTask() !== DuckDBPendingResultState.RESULT_READY) {
            // Yield for minimal time
            await Promise.resolve();
          }

          if (full) {
            await pending.readAll();
          } else {
            await pending.readUntil(1);
          }
        },
        {
          setup,
          iterations: 200,
        }
      );
      bench(
        `${example.name} - ${
          full
            ? "start runTask fetchChunks loop"
            : "start runTask single fetchChunk"
        }`,
        async () => {
          const s = startMS();
          const e = s + SELECTION_SIZE;
          const startInterval = `INTERVAL ${s} MILLISECONDS`;
          const endInterval = `INTERVAL ${e} MILLISECONDS`;

          const query = example.factory(startInterval, endInterval);

          const pending = await connection.start(query);

          while (pending.runTask() !== DuckDBPendingResultState.RESULT_READY) {
            // Yield for minimal time
            await Promise.resolve();
          }

          const result = await pending.getResult();

          if (full) {
            while (true) {
              const chunk = await result.fetchChunk();
              // Last chunk will have zero rows.
              if (!chunk || chunk.rowCount === 0) {
                break;
              }
            }
          } else {
            await result.fetchChunk();
          }
        },
        {
          setup,
          iterations: 200,
        }
      );

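      // Streaming variants: the same benchmarks as above, but the result is
      // produced as a stream of chunks rather than fully materialized up
      // front.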
      bench(
        `${example.name} - ${
          full ? "streamAndReadAll()" : "streamAndReadUntil(q, 1)"
        }`,
        async () => {
          const s = startMS();
          const e = s + SELECTION_SIZE;
          const startInterval = `INTERVAL ${s} MILLISECONDS`;
          const endInterval = `INTERVAL ${e} MILLISECONDS`;

          const query = example.factory(startInterval, endInterval);
          if (full) {
            await connection.streamAndReadAll(query);
          } else {
            await connection.streamAndReadUntil(query, 1);
          }
        },
        {
          setup,
          iterations: 200,
        }
      );
      bench(
        `${example.name} - ${
          full
            ? "startStream runTask pending.readAll()"
            : "startStream runTask pending.readUntil(1)"
        }`,
        async () => {
          const s = startMS();
          const e = s + SELECTION_SIZE;
          const startInterval = `INTERVAL ${s} MILLISECONDS`;
          const endInterval = `INTERVAL ${e} MILLISECONDS`;

          const query = example.factory(startInterval, endInterval);

          const pending = await connection.startStream(query);

          while (pending.runTask() !== DuckDBPendingResultState.RESULT_READY) {
            // Yield for minimal time
            await Promise.resolve();
          }

          if (full) {
            await pending.readAll();
          } else {
            await pending.readUntil(1);
          }
        },
        {
          setup,
          iterations: 200,
        }
      );
      bench(
        `${example.name} - ${
          full
            ? "startStream runTask fetchChunks loop"
            : "startStream runTask single fetchChunk"
        }`,
        async () => {
          const s = startMS();
          const e = s + SELECTION_SIZE;
          const startInterval = `INTERVAL ${s} MILLISECONDS`;
          const endInterval = `INTERVAL ${e} MILLISECONDS`;

          const query = example.factory(startInterval, endInterval);

          const pending = await connection.startStream(query);

          while (pending.runTask() !== DuckDBPendingResultState.RESULT_READY) {
            // Yield for minimal time
            await Promise.resolve();
          }

          const result = await pending.getResult();

          if (full) {
            while (true) {
              const chunk = await result.fetchChunk();
              // Last chunk will have zero rows.
              if (!chunk || chunk.rowCount === 0) {
                break;
              }
            }
          } else {
            await result.fetchChunk();
          }
        },
        {
          setup,
          iterations: 200,
        }
      );
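      // Baseline variants using run()/stream() directly, reading chunks off
      // the result without the pending-result task loop.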
      bench(
        `${example.name} - ${
          full ? "run fetchChunks loop" : "run single fetchChunk"
        }`,
        async () => {
          const s = startMS();
          const e = s + SELECTION_SIZE;
          const startInterval = `INTERVAL ${s} MILLISECONDS`;
          const endInterval = `INTERVAL ${e} MILLISECONDS`;

          const query = example.factory(startInterval, endInterval);
          const result = await connection.run(query);

          if (full) {
            while (true) {
              const chunk = await result.fetchChunk();
              // Last chunk will have zero rows.
              if (!chunk || chunk.rowCount === 0) {
                break;
              }
            }
          } else {
            // Just fetch one chunk
            await result.fetchChunk();
          }
        },
        {
          setup,
          iterations: 200,
        }
      );
      bench(
        `${example.name} - ${
          full ? "stream fetchChunks loop" : "stream single fetchChunk"
        }`,
        async () => {
          const s = startMS();
          const e = s + SELECTION_SIZE;
          const startInterval = `INTERVAL ${s} MILLISECONDS`;
          const endInterval = `INTERVAL ${e} MILLISECONDS`;

          const query = example.factory(startInterval, endInterval);
          const result = await connection.stream(query);

          if (full) {
            while (true) {
              const chunk = await result.fetchChunk();
              // Last chunk will have zero rows.
              if (!chunk || chunk.rowCount === 0) {
                break;
              }
            }
          } else {
            // Just fetch one chunk
            await result.fetchChunk();
          }
        },
        {
          setup,
          iterations: 200,
        }
      );
    });
  }
}