Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

result streaming #77

Merged
merged 2 commits into from
Dec 23, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 50 additions & 30 deletions api/pkgs/@duckdb/node-api/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -94,6 +94,14 @@ prepared.bindInteger(2, 42);
const result = await prepared.run();
```

### Stream Results

Streaming results evaluate lazily when rows are read.

```ts
const result = await connection.stream('from range(10_000)');
```

### Inspect Result

Get column names and types:
Expand All @@ -102,6 +110,38 @@ const columnNames = result.columnNames();
const columnTypes = result.columnTypes();
```

### Result Reader

Run and read all data:
```ts
const reader = await connection.runAndReadAll('from test_all_types()');
const rows = reader.getRows();
// OR: const columns = reader.getColumns();
```

Stream and read up to (at least) some number of rows:
```ts
const reader = await connection.streamAndReadUntil('from range(5000)', 1000);
const rows = reader.getRows();
// rows.length === 2048. (Rows are read in chunks of 2048.)
```

Read rows incrementally:
```ts
const reader = await connection.streamAndRead('from range(5000)');
reader.readUntil(2000);
// reader.currentRowCount === 2048 (Rows are read in chunks of 2048.)
// reader.done === false
reader.readUntil(4000);
// reader.currentRowCount === 4096
// reader.done === false
reader.readUntil(6000);
// reader.currentRowCount === 5000
// reader.done === true
```

### Read chunks

Fetch all chunks:
```ts
const chunks = await result.fetchAllChunks();
Expand All @@ -120,6 +160,16 @@ while (true) {
}
```

For materialized (non-streaming) results, chunks can be read by index:
```ts
const rowCount = result.rowCount;
const chunkCount = result.chunkCount;
for (let i = 0; i < chunkCount; i++) {
const chunk = result.getChunk(i);
// ...
}
```

Read chunk data (column-major):
```ts
// array of columns, each as an array of values
Expand Down Expand Up @@ -148,36 +198,6 @@ for (let columnIndex = 0; columnIndex < columnCount; columnIndex++) {
}
```

### Result Reader

Run and read all data:
```ts
const reader = await connection.runAndReadAll('from test_all_types()');
const rows = reader.getRows();
// OR: const columns = reader.getColumns();
```

Run and read up to (at least) some number of rows:
```ts
const reader = await connection.runAndReadUtil('from range(5000)', 1000);
const rows = reader.getRows();
// rows.length === 2048. (Rows are read in chunks of 2048.)
```

Read rows incrementally:
```ts
const reader = await connection.runAndRead('from range(5000)');
reader.readUntil(2000);
// reader.currentRowCount === 2048 (Rows are read in chunks of 2048.)
// reader.done === false
reader.readUntil(4000);
// reader.currentRowCount === 4096
// reader.done === false
reader.readUntil(6000);
// reader.currentRowCount === 5000
// reader.done === true
```

### Inspect Data Types

```ts
Expand Down
31 changes: 29 additions & 2 deletions api/src/DuckDBConnection.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@ import duckdb from '@duckdb/node-bindings';
import { DuckDBAppender } from './DuckDBAppender';
import { DuckDBExtractedStatements } from './DuckDBExtractedStatements';
import { DuckDBInstance } from './DuckDBInstance';
import { DuckDBMaterializedResult } from './DuckDBMaterializedResult';
import { DuckDBPreparedStatement } from './DuckDBPreparedStatement';
import { DuckDBResult } from './DuckDBResult';
import { DuckDBResultReader } from './DuckDBResultReader';
import { DuckDBPendingResult } from './DuckDBPendingResult';

export class DuckDBConnection {
private readonly connection: duckdb.Connection;
Expand All @@ -22,8 +24,8 @@ export class DuckDBConnection {
public get progress(): duckdb.QueryProgress {
return duckdb.query_progress(this.connection);
}
public async run(sql: string): Promise<DuckDBResult> {
return new DuckDBResult(await duckdb.query(this.connection, sql));
public async run(sql: string): Promise<DuckDBMaterializedResult> {
return new DuckDBMaterializedResult(await duckdb.query(this.connection, sql));
}
public async runAndRead(sql: string): Promise<DuckDBResultReader> {
return new DuckDBResultReader(await this.run(sql));
Expand All @@ -38,6 +40,31 @@ export class DuckDBConnection {
await reader.readUntil(targetRowCount);
return reader;
}
public async stream(sql: string): Promise<DuckDBResult> {
const prepared = await this.prepare(sql);
return prepared.stream();
}
public async streamAndRead(sql: string): Promise<DuckDBResultReader> {
return new DuckDBResultReader(await this.stream(sql));
}
public async streamAndReadAll(sql: string): Promise<DuckDBResultReader> {
const reader = new DuckDBResultReader(await this.stream(sql));
await reader.readAll();
return reader;
}
public async streamAndReadUntil(sql: string, targetRowCount: number): Promise<DuckDBResultReader> {
const reader = new DuckDBResultReader(await this.stream(sql));
await reader.readUntil(targetRowCount);
return reader;
}
public async start(sql: string): Promise<DuckDBPendingResult> {
const prepared = await this.prepare(sql);
return prepared.start();
}
public async startStream(sql: string): Promise<DuckDBPendingResult> {
const prepared = await this.prepare(sql);
return prepared.startStream();
}
public async prepare(sql: string): Promise<DuckDBPreparedStatement> {
return new DuckDBPreparedStatement(
await duckdb.prepare(this.connection, sql)
Expand Down
18 changes: 18 additions & 0 deletions api/src/DuckDBMaterializedResult.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import duckdb from '@duckdb/node-bindings';
import { DuckDBDataChunk } from './DuckDBDataChunk';
import { DuckDBResult } from './DuckDBResult';

export class DuckDBMaterializedResult extends DuckDBResult {
constructor(result: duckdb.Result) {
super(result);
}
public get rowCount(): number {
return duckdb.row_count(this.result);
}
public get chunkCount(): number {
return duckdb.result_chunk_count(this.result);
}
public getChunk(chunkIndex: number): DuckDBDataChunk {
return new DuckDBDataChunk(duckdb.result_get_chunk(this.result, chunkIndex));
}
}
3 changes: 2 additions & 1 deletion api/src/DuckDBPendingResult.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import duckdb from '@duckdb/node-bindings';
import { createResult } from './createResult';
import { DuckDBResult } from './DuckDBResult';
import { DuckDBResultReader } from './DuckDBResultReader';

Expand Down Expand Up @@ -34,7 +35,7 @@ export class DuckDBPendingResult {
}
}
public async getResult(): Promise<DuckDBResult> {
return new DuckDBResult(await duckdb.execute_pending(this.pending_result));
return createResult(await duckdb.execute_pending(this.pending_result));
}
public async read(): Promise<DuckDBResultReader> {
return new DuckDBResultReader(await this.getResult());
Expand Down
28 changes: 24 additions & 4 deletions api/src/DuckDBPreparedStatement.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
import duckdb from '@duckdb/node-bindings';
import { DuckDBMaterializedResult } from './DuckDBMaterializedResult';
import { DuckDBPendingResult } from './DuckDBPendingResult';
import { DuckDBResult } from './DuckDBResult';
import { DuckDBResultReader } from './DuckDBResultReader';
Expand Down Expand Up @@ -111,10 +112,8 @@ export class DuckDBPreparedStatement {
// public bindValue(parameterIndex: number, value: Value) {
// duckdb.bind_value(this.prepared_statement, parameterIndex, value);
// }
public async run(): Promise<DuckDBResult> {
return new DuckDBResult(
await duckdb.execute_prepared(this.prepared_statement)
);
public async run(): Promise<DuckDBMaterializedResult> {
return new DuckDBMaterializedResult(await duckdb.execute_prepared(this.prepared_statement));
}
public async runAndRead(): Promise<DuckDBResultReader> {
return new DuckDBResultReader(await this.run());
Expand All @@ -129,9 +128,30 @@ export class DuckDBPreparedStatement {
await reader.readUntil(targetRowCount);
return reader;
}
public async stream(): Promise<DuckDBResult> {
return new DuckDBResult(await duckdb.execute_prepared_streaming(this.prepared_statement));
}
public async streamAndRead(): Promise<DuckDBResultReader> {
return new DuckDBResultReader(await this.stream());
}
public async streamAndReadAll(): Promise<DuckDBResultReader> {
const reader = new DuckDBResultReader(await this.stream());
await reader.readAll();
return reader;
}
public async streamAndReadUntil(targetRowCount: number): Promise<DuckDBResultReader> {
const reader = new DuckDBResultReader(await this.stream());
await reader.readUntil(targetRowCount);
return reader;
}
public start(): DuckDBPendingResult {
return new DuckDBPendingResult(
duckdb.pending_prepared(this.prepared_statement)
);
}
public startStream(): DuckDBPendingResult {
return new DuckDBPendingResult(
duckdb.pending_prepared_streaming(this.prepared_statement)
);
}
}
12 changes: 8 additions & 4 deletions api/src/DuckDBResult.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ import { ResultReturnType, StatementType } from './enums';
import { DuckDBValue } from './values';

export class DuckDBResult {
private readonly result: duckdb.Result;
protected readonly result: duckdb.Result;
constructor(result: duckdb.Result) {
this.result = result;
}
Expand Down Expand Up @@ -56,17 +56,21 @@ export class DuckDBResult {
}
return columnTypes;
}
public get isStreaming(): boolean {
return duckdb.result_is_streaming(this.result);
}
public get rowsChanged(): number {
return duckdb.rows_changed(this.result);
}
public async fetchChunk(): Promise<DuckDBDataChunk> {
return new DuckDBDataChunk(await duckdb.fetch_chunk(this.result));
public async fetchChunk(): Promise<DuckDBDataChunk | null> {
const chunk = await duckdb.fetch_chunk(this.result);
return chunk ? new DuckDBDataChunk(chunk) : null;
}
public async fetchAllChunks(): Promise<DuckDBDataChunk[]> {
const chunks: DuckDBDataChunk[] = [];
while (true) {
const chunk = await this.fetchChunk();
if (chunk.rowCount === 0) {
if (!chunk || chunk.rowCount === 0) {
return chunks;
}
chunks.push(chunk);
Expand Down
2 changes: 1 addition & 1 deletion api/src/DuckDBResultReader.ts
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ export class DuckDBResultReader {
)
) {
const chunk = await this.result.fetchChunk();
if (chunk.rowCount > 0) {
if (chunk && chunk.rowCount > 0) {
this.updateChunkSizeRuns(chunk);
this.chunks.push(chunk);
this.currentRowCount_ += chunk.rowCount;
Expand Down
3 changes: 3 additions & 0 deletions api/src/DuckDBVector.ts
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,9 @@ class DuckDBValidity {
public static fromVector(vector: duckdb.Vector, itemCount: number): DuckDBValidity {
const bigintCount = Math.ceil(itemCount / 64);
const bytes = duckdb.vector_get_validity(vector, bigintCount * 8);
if (!bytes) {
return new DuckDBValidity(null, 0);
}
const bigints = new BigUint64Array(bytes.buffer, bytes.byteOffset, bigintCount);
return new DuckDBValidity(bigints, 0);
}
Expand Down
11 changes: 11 additions & 0 deletions api/src/createResult.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import duckdb from '@duckdb/node-bindings';
import { DuckDBMaterializedResult } from './DuckDBMaterializedResult';
import { DuckDBResult } from './DuckDBResult';

export function createResult(result: duckdb.Result) {
if (duckdb.result_is_streaming(result)) {
return new DuckDBResult(result);
} else {
return new DuckDBMaterializedResult(result);
}
}
1 change: 1 addition & 0 deletions api/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ export * from './DuckDBDataChunk';
export * from './DuckDBExtractedStatements';
export * from './DuckDBInstance';
export * from './DuckDBLogicalType';
export * from './DuckDBMaterializedResult';
export * from './DuckDBPendingResult';
export * from './DuckDBPreparedStatement';
export * from './DuckDBResult';
Expand Down
Loading
Loading