Skip to content

Commit

Permalink
read benchmarks
Browse files Browse the repository at this point in the history
  • Loading branch information
Mike-Dax committed Dec 24, 2024
1 parent 2315717 commit 7662f2d
Showing 1 changed file with 337 additions and 0 deletions.
337 changes: 337 additions & 0 deletions api/test/bench/read.bench.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,337 @@
import { bench, describe } from "vitest";
import {
DuckDBConnection,
DuckDBInstance,
DuckDBPendingResultState,
} from "../../src";

// Shared database handles, (re)assigned by setup() before the benchmarks run.
let instance: DuckDBInstance;
let connection: DuckDBConnection;

// Number of rows in the generated table (one row per millisecond of data),
// and the width of the window each benchmark query selects. BigInts because
// they are bound/compared as BIGINT values.
const TOTAL_SIZE = 1_000_000n;
const SELECTION_SIZE = 100_000n;

/**
 * (Re)creates the DuckDB instance/connection and the `test` table that every
 * benchmark queries; passed as the `setup` option to each bench() below.
 *
 * The table holds TOTAL_SIZE rows — one per millisecond starting at
 * 2025-01-01 — each with a RANDOM()-derived `value`.
 */
async function setup() {
instance = await DuckDBInstance.create();
connection = await instance.connect();

// The row count is bound as a BIGINT parameter ($1) rather than interpolated
// into the SQL text.
const prepared = await connection.prepare(`
CREATE OR REPLACE TABLE test AS
SELECT
TIMESTAMP '2025-01-01' + seq::BIGINT * INTERVAL 1 MILLISECOND AS timestamp,
RANDOM() * 1_000_000 AS value
FROM
range($1) AS seq(seq)
`);

prepared.bindBigInt(1, TOTAL_SIZE);

await prepared.run();
}

// The three query shapes under benchmark. Each factory receives the window's
// start and end offsets (as SQL INTERVAL expressions relative to the table's
// 2025-01-01 epoch) and returns the SQL text for that window over `test`.
const examples: {
name: string;
factory: (start: string, end: string) => string;
}[] = [
// Plain row scan of the selected window.
{
name: "Row Fetching",
factory: (start, end) =>
`SELECT *
FROM test
WHERE timestamp BETWEEN TIMESTAMP '2025-01-01' + ${start}
AND TIMESTAMP '2025-01-01' + ${end};`,
},
// Single-row aggregates over the whole window.
{
name: "Overall Aggregates",
factory: (start, end) =>
`SELECT mean("value"), min("value"), max("value")
FROM test
WHERE timestamp BETWEEN TIMESTAMP '2025-01-01' + ${start}
AND TIMESTAMP '2025-01-01' + ${end};`,
},
// Windowed aggregates over the trailing second of each row.
{
name: "Rolling Aggregates",
factory: (start, end) =>
`SELECT mean("value") OVER previous_second,
min("value") OVER previous_second,
max("value") OVER previous_second
FROM test
WHERE timestamp BETWEEN TIMESTAMP '2025-01-01' + ${start}
AND TIMESTAMP '2025-01-01' + ${end}
WINDOW previous_second AS (
ORDER BY "timestamp" ASC
RANGE BETWEEN INTERVAL 1_000 MILLISECONDS PRECEDING
AND INTERVAL 0 MILLISECONDS FOLLOWING);`,
},
];

/**
* Randomly generate a BigInt amount of milliseconds to use as the query start point, allowing for
* SELECTION_SIZE worth of data
*/
function startMS() {
return BigInt(
Math.floor(Math.random() * Number(TOTAL_SIZE - SELECTION_SIZE))
);
}

// ---------------------------------------------------------------------------
// Benchmark registration. Every example query is measured in two modes (full
// result vs. time-to-first-row) across each result-consumption API exposed by
// the bindings. The repeated per-bench boilerplate is factored into helpers.
// ---------------------------------------------------------------------------

/** Build one randomly-positioned query from an example's SQL factory. */
function buildQuery(factory: (start: string, end: string) => string): string {
  const begin = startMS();
  const finish = begin + SELECTION_SIZE;
  return factory(
    `INTERVAL ${begin} MILLISECONDS`,
    `INTERVAL ${finish} MILLISECONDS`
  );
}

/** Spin a pending result's tasks to completion, yielding between tasks. */
async function drainTasks(pending: {
  runTask(): DuckDBPendingResultState;
}): Promise<void> {
  while (pending.runTask() !== DuckDBPendingResultState.RESULT_READY) {
    // Yield for minimal time
    await Promise.resolve();
  }
}

/** Fetch chunks until the terminal (missing or zero-row) chunk is reached. */
async function fetchAllChunks(result: {
  fetchChunk(): Promise<{ rowCount: number } | null | undefined>;
}): Promise<void> {
  for (;;) {
    const chunk = await result.fetchChunk();
    // Last chunk will have zero rows.
    if (!chunk || chunk.rowCount === 0) {
      return;
    }
  }
}

for (const full of [false, true]) {
  const mode = full ? "Full Result" : "Time to First Row";
  for (const example of examples) {
    const { name, factory } = example;
    describe(`${name} - ${mode}`, () => {
      // High-level one-shot APIs.
      bench(
        `${name} - ${full ? "runAndReadAll()" : "runAndReadUntil(q, 1)"}`,
        async () => {
          const query = buildQuery(factory);
          if (full) {
            await connection.runAndReadAll(query);
          } else {
            await connection.runAndReadUntil(query, 1);
          }
        },
        { setup, iterations: 200 }
      );

      // Pending-result API: materializing reads.
      bench(
        `${name} - ${
          full
            ? "start runTask pending.readAll()"
            : "start runTask pending.readUntil(1)"
        }`,
        async () => {
          const query = buildQuery(factory);
          const pending = await connection.start(query);
          await drainTasks(pending);
          if (full) {
            await pending.readAll();
          } else {
            await pending.readUntil(1);
          }
        },
        { setup, iterations: 200 }
      );

      // Pending-result API: manual chunk fetching.
      bench(
        `${name} - ${
          full
            ? "start runTask fetchChunks loop"
            : "start runTask single fetchChunk"
        }`,
        async () => {
          const query = buildQuery(factory);
          const pending = await connection.start(query);
          await drainTasks(pending);
          const result = await pending.getResult();
          if (full) {
            await fetchAllChunks(result);
          } else {
            await result.fetchChunk();
          }
        },
        { setup, iterations: 200 }
      );

      // Streaming one-shot APIs.
      bench(
        `${name} - ${full ? "streamAndReadAll()" : "streamAndReadUntil(q, 1)"}`,
        async () => {
          const query = buildQuery(factory);
          if (full) {
            await connection.streamAndReadAll(query);
          } else {
            await connection.streamAndReadUntil(query, 1);
          }
        },
        { setup, iterations: 200 }
      );

      // Streaming pending-result API: materializing reads.
      bench(
        `${name} - ${
          full
            ? "startStream runTask pending.readAll()"
            : "startStream runTask pending.readUntil(1)"
        }`,
        async () => {
          const query = buildQuery(factory);
          const pending = await connection.startStream(query);
          await drainTasks(pending);
          if (full) {
            await pending.readAll();
          } else {
            await pending.readUntil(1);
          }
        },
        { setup, iterations: 200 }
      );

      // Streaming pending-result API: manual chunk fetching.
      bench(
        `${name} - ${
          full
            ? "startStream runTask fetchChunks loop"
            : "startStream runTask single fetchChunk"
        }`,
        async () => {
          const query = buildQuery(factory);
          const pending = await connection.startStream(query);
          await drainTasks(pending);
          const result = await pending.getResult();
          if (full) {
            await fetchAllChunks(result);
          } else {
            await result.fetchChunk();
          }
        },
        { setup, iterations: 200 }
      );

      // Direct run(): manual chunk fetching.
      bench(
        `${name} - ${full ? "run fetchChunks loop" : "run single fetchChunk"}`,
        async () => {
          const query = buildQuery(factory);
          const result = await connection.run(query);
          if (full) {
            await fetchAllChunks(result);
          } else {
            // Just fetch one chunk
            await result.fetchChunk();
          }
        },
        { setup, iterations: 200 }
      );

      // Direct stream(): manual chunk fetching.
      bench(
        `${name} - ${
          full ? "stream fetchChunks loop" : "stream single fetchChunk"
        }`,
        async () => {
          const query = buildQuery(factory);
          const result = await connection.stream(query);
          if (full) {
            await fetchAllChunks(result);
          } else {
            // Just fetch one chunk
            await result.fetchChunk();
          }
        },
        { setup, iterations: 200 }
      );
    });
  }
}

0 comments on commit 7662f2d

Please sign in to comment.