From 09d2dddf839fe753552bda3d10012f0a7fb343f2 Mon Sep 17 00:00:00 2001 From: Jeff Raymakers Date: Sun, 22 Dec 2024 16:15:06 -0800 Subject: [PATCH 1/2] result streaming --- api/src/DuckDBResult.ts | 7 +- api/src/DuckDBResultReader.ts | 2 +- api/src/DuckDBVector.ts | 3 + api/test/api.test.ts | 583 +++++++++--------- api/test/bench/util/runSql.ts | 2 +- .../pkgs/@duckdb/node-bindings/duckdb.d.ts | 20 +- bindings/src/duckdb_node_bindings.cpp | 115 +++- bindings/test/appender.test.ts | 29 +- bindings/test/config.test.ts | 6 + bindings/test/extracted_statements.test.ts | 8 + bindings/test/pending.test.ts | 22 + bindings/test/prepared_statements.test.ts | 31 + bindings/test/query.test.ts | 12 +- bindings/test/utils/ExpectedResult.ts | 3 + bindings/test/utils/ExpectedVector.ts | 10 +- bindings/test/utils/expectResult.ts | 18 +- bindings/test/utils/expectValidity.ts | 2 +- bindings/test/utils/expectVector.ts | 15 +- bindings/test/utils/expectedVectors.ts | 10 +- bindings/test/utils/getValue.ts | 4 +- bindings/test/utils/isValid.ts | 5 +- 21 files changed, 580 insertions(+), 327 deletions(-) diff --git a/api/src/DuckDBResult.ts b/api/src/DuckDBResult.ts index 86f65034..19f128be 100644 --- a/api/src/DuckDBResult.ts +++ b/api/src/DuckDBResult.ts @@ -59,14 +59,15 @@ export class DuckDBResult { public get rowsChanged(): number { return duckdb.rows_changed(this.result); } - public async fetchChunk(): Promise { - return new DuckDBDataChunk(await duckdb.fetch_chunk(this.result)); + public async fetchChunk(): Promise { + const chunk = await duckdb.fetch_chunk(this.result); + return chunk ? new DuckDBDataChunk(chunk) : null; } public async fetchAllChunks(): Promise { const chunks: DuckDBDataChunk[] = []; while (true) { const chunk = await this.fetchChunk(); - if (chunk.rowCount === 0) { + if (!chunk || chunk.rowCount === 0) { return chunks; } chunks.push(chunk); diff --git a/api/src/DuckDBResultReader.ts b/api/src/DuckDBResultReader.ts index 20ec5c29..a4869758 100644 --- a/api/src/DuckDBResultReader.ts +++ b/api/src/DuckDBResultReader.ts @@ -124,7 +124,7 @@ export class DuckDBResultReader { ) ) { const chunk = await this.result.fetchChunk(); - if (chunk.rowCount > 0) { + if (chunk && chunk.rowCount > 0) { this.updateChunkSizeRuns(chunk); this.chunks.push(chunk); this.currentRowCount_ += chunk.rowCount; diff --git a/api/src/DuckDBVector.ts b/api/src/DuckDBVector.ts index 13b18aaa..552d3d8f 100644 --- a/api/src/DuckDBVector.ts +++ b/api/src/DuckDBVector.ts @@ -244,6 +244,9 @@ class DuckDBValidity { public static fromVector(vector: duckdb.Vector, itemCount: number): DuckDBValidity { const bigintCount = Math.ceil(itemCount / 64); const bytes = duckdb.vector_get_validity(vector, bigintCount * 8); + if (!bytes) { + return new DuckDBValidity(null, 0); + } const bigints = new BigUint64Array(bytes.buffer, bytes.byteOffset, bigintCount); return new DuckDBValidity(bigints, 0); } diff --git a/api/test/api.test.ts b/api/test/api.test.ts index 3b40e467..eb201934 100644 --- a/api/test/api.test.ts +++ b/api/test/api.test.ts @@ -336,9 +336,12 @@ describe('api', () => { const result = await connection.run('select 42 as num'); assertColumns(result, [{ name: 'num', type: DuckDBIntegerType.instance }]); const chunk = await result.fetchChunk(); - assert.strictEqual(chunk.columnCount, 1); - assert.strictEqual(chunk.rowCount, 1); - assertValues(chunk, 0, DuckDBIntegerVector, [42]); + assert.isDefined(chunk); + if (chunk) { + assert.strictEqual(chunk.columnCount, 1); + assert.strictEqual(chunk.rowCount, 1); + assertValues(chunk, 0, DuckDBIntegerVector, [42]); + } }); test('should support running prepared statements', async () => { await withConnection(async (connection) => { @@ -360,12 +363,15 @@ describe('api', () => { { name: 'd', type: DuckDBIntegerType.instance }, ]); const chunk = await result.fetchChunk(); - assert.strictEqual(chunk.columnCount, 4); - assert.strictEqual(chunk.rowCount, 1); - assertValues(chunk, 0, DuckDBIntegerVector, [10]); - assertValues(chunk, 1, DuckDBVarCharVector, ['abc']); - assertValues(chunk, 2, DuckDBBooleanVector, [true]); - assertValues(chunk, 3, DuckDBIntegerVector, [null]); + assert.isDefined(chunk); + if (chunk) { + assert.strictEqual(chunk.columnCount, 4); + assert.strictEqual(chunk.rowCount, 1); + assertValues(chunk, 0, DuckDBIntegerVector, [10]); + assertValues(chunk, 1, DuckDBVarCharVector, ['abc']); + assertValues(chunk, 2, DuckDBBooleanVector, [true]); + assertValues(chunk, 3, DuckDBIntegerVector, [null]); + } }); }); test('should support starting prepared statements and running them incrementally', async () => { @@ -386,9 +392,12 @@ describe('api', () => { { name: 'int', type: DuckDBIntegerType.instance }, ]); const chunk = await result.fetchChunk(); - assert.strictEqual(chunk.columnCount, 1); - assert.strictEqual(chunk.rowCount, 3); - assertValues(chunk, 0, DuckDBIntegerVector, [DuckDBIntegerType.Min, DuckDBIntegerType.Max, null]); + assert.isDefined(chunk); + if (chunk) { + assert.strictEqual(chunk.columnCount, 1); + assert.strictEqual(chunk.rowCount, 3); + assertValues(chunk, 0, DuckDBIntegerVector, [DuckDBIntegerType.Min, DuckDBIntegerType.Max, null]); + } }); }); test('should support streaming results from prepared statements', async () => { @@ -402,7 +411,7 @@ describe('api', () => { const chunks: DuckDBDataChunk[] = []; let currentChunk: DuckDBDataChunk | null = null; currentChunk = await result.fetchChunk(); - while (currentChunk.rowCount > 0) { + while (currentChunk && currentChunk.rowCount > 0) { chunks.push(currentChunk); currentChunk = await result.fetchChunk(); } @@ -497,280 +506,283 @@ describe('api', () => { ]); const chunk = await result.fetchChunk(); - assert.strictEqual(chunk.columnCount, 54); - assert.strictEqual(chunk.rowCount, 3); - - assertValues(chunk, 0, DuckDBBooleanVector, [false, true, null]); - assertValues(chunk, 1, DuckDBTinyIntVector, [DuckDBTinyIntType.Min, DuckDBTinyIntType.Max, null]); - assertValues(chunk, 2, DuckDBSmallIntVector, [DuckDBSmallIntType.Min, DuckDBSmallIntType.Max, null]); - assertValues(chunk, 3, DuckDBIntegerVector, [DuckDBIntegerType.Min, DuckDBIntegerType.Max, null]); - assertValues(chunk, 4, DuckDBBigIntVector, [DuckDBBigIntType.Min, DuckDBBigIntType.Max, null]); - assertValues(chunk, 5, DuckDBHugeIntVector, [DuckDBHugeIntType.Min, DuckDBHugeIntType.Max, null]); - assertValues(chunk, 6, DuckDBUHugeIntVector, [DuckDBUHugeIntType.Min, DuckDBUHugeIntType.Max, null]); - assertValues(chunk, 7, DuckDBUTinyIntVector, [DuckDBUTinyIntType.Min, DuckDBUTinyIntType.Max, null]); - assertValues(chunk, 8, DuckDBUSmallIntVector, [DuckDBUSmallIntType.Min, DuckDBUSmallIntType.Max, null]); - assertValues(chunk, 9, DuckDBUIntegerVector, [DuckDBUIntegerType.Min, DuckDBUIntegerType.Max, null]); - assertValues(chunk, 10, DuckDBUBigIntVector, [DuckDBUBigIntType.Min, DuckDBUBigIntType.Max, null]); - assertValues(chunk, 11, DuckDBVarIntVector, [DuckDBVarIntType.Min, DuckDBVarIntType.Max, null]); - assertValues(chunk, 12, DuckDBDateVector, [DuckDBDateValue.Min, DuckDBDateValue.Max, null]); - assertValues(chunk, 13, DuckDBTimeVector, [DuckDBTimeValue.Min, DuckDBTimeValue.Max, null]); - assertValues(chunk, 14, DuckDBTimestampVector, - [DuckDBTimestampValue.Min, DuckDBTimestampValue.Max, null]); - assertValues(chunk, 15, DuckDBTimestampSecondsVector, - [DuckDBTimestampSecondsValue.Min, DuckDBTimestampSecondsValue.Max, null]); - assertValues(chunk, 16, DuckDBTimestampMillisecondsVector, - [DuckDBTimestampMillisecondsValue.Min, DuckDBTimestampMillisecondsValue.Max, null]); - assertValues(chunk, 17, DuckDBTimestampNanosecondsVector, - [DuckDBTimestampNanosecondsValue.Min, DuckDBTimestampNanosecondsValue.Max, null]); - assertValues(chunk, 18, DuckDBTimeTZVector, [DuckDBTimeTZValue.Min, DuckDBTimeTZValue.Max, null]); - assertValues(chunk, 19, DuckDBTimestampTZVector, - [DuckDBTimestampTZValue.Min, DuckDBTimestampTZValue.Max, null]); - assertValues(chunk, 20, DuckDBFloatVector, [DuckDBFloatType.Min, DuckDBFloatType.Max, null]); - assertValues(chunk, 21, DuckDBDoubleVector, [DuckDBDoubleType.Min, DuckDBDoubleType.Max, null]); - assertValues(chunk, 22, DuckDBDecimal2Vector, [ - decimalValue(-9999n, 4, 1), - decimalValue( 9999n, 4, 1), - null, - ]); - assertValues(chunk, 23, DuckDBDecimal4Vector, [ - decimalValue(-999999999n, 9, 4), - decimalValue( 999999999n, 9, 4), - null, - ]); - assertValues(chunk, 24, DuckDBDecimal8Vector, [ - decimalValue(-BI_18_9s, 18, 6), - decimalValue( BI_18_9s, 18, 6), - null, - ]); - assertValues(chunk, 25, DuckDBDecimal16Vector, [ - decimalValue(-BI_38_9s, 38, 10), - decimalValue( BI_38_9s, 38, 10), - null, - ]); - assertValues(chunk, 26, DuckDBUUIDVector, [DuckDBUUIDValue.Min, DuckDBUUIDValue.Max, null]); - assertValues(chunk, 27, DuckDBIntervalVector, [ - intervalValue(0, 0, 0n), - intervalValue(999, 999, 999999999n), - null, - ]); - assertValues(chunk, 28, DuckDBVarCharVector, ['🦆🦆🦆🦆🦆🦆', 'goo\0se', null]); - assertValues(chunk, 29, DuckDBBlobVector, [ - DuckDBBlobValue.fromString('thisisalongblob\x00withnullbytes'), - DuckDBBlobValue.fromString('\x00\x00\x00a'), - null, - ]); - assertValues(chunk, 30, DuckDBBitVector, [ - bitValue('0010001001011100010101011010111'), - bitValue('10101'), - null, - ]); - assertValues(chunk, 31, DuckDBEnum1Vector, [ - smallEnumValues[0], - smallEnumValues[smallEnumValues.length - 1], - null, - ]); - assertValues(chunk, 32, DuckDBEnum2Vector, [ - mediumEnumValues[0], - mediumEnumValues[mediumEnumValues.length - 1], - null, - ]); - assertValues(chunk, 33, DuckDBEnum4Vector, [ - largeEnumValues[0], - largeEnumValues[largeEnumValues.length - 1], - null, - ]); - // int_array - assertValues(chunk, 34, DuckDBListVector, [ - listValue([]), - listValue([42, 999, null, null, -42]), - null, - ]); - // double_array - assertValues(chunk, 35, DuckDBListVector, [ - listValue([]), - listValue([42.0, NaN, Infinity, -Infinity, null, -42.0]), - null, - ]); - // date_array - assertValues(chunk, 36, DuckDBListVector, [ - listValue([]), - listValue([dateValue(0), DuckDBDateValue.PosInf, DuckDBDateValue.NegInf, null, dateValue(19124)]), - null, - ]); - // timestamp_array - assertValues(chunk, 37, DuckDBListVector, [ - listValue([]), - listValue([ - DuckDBTimestampValue.Epoch, - DuckDBTimestampValue.PosInf, - DuckDBTimestampValue.NegInf, + assert.isDefined(chunk); + if (chunk) { + assert.strictEqual(chunk.columnCount, 54); + assert.strictEqual(chunk.rowCount, 3); + + assertValues(chunk, 0, DuckDBBooleanVector, [false, true, null]); + assertValues(chunk, 1, DuckDBTinyIntVector, [DuckDBTinyIntType.Min, DuckDBTinyIntType.Max, null]); + assertValues(chunk, 2, DuckDBSmallIntVector, [DuckDBSmallIntType.Min, DuckDBSmallIntType.Max, null]); + assertValues(chunk, 3, DuckDBIntegerVector, [DuckDBIntegerType.Min, DuckDBIntegerType.Max, null]); + assertValues(chunk, 4, DuckDBBigIntVector, [DuckDBBigIntType.Min, DuckDBBigIntType.Max, null]); + assertValues(chunk, 5, DuckDBHugeIntVector, [DuckDBHugeIntType.Min, DuckDBHugeIntType.Max, null]); + assertValues(chunk, 6, DuckDBUHugeIntVector, [DuckDBUHugeIntType.Min, DuckDBUHugeIntType.Max, null]); + assertValues(chunk, 7, DuckDBUTinyIntVector, [DuckDBUTinyIntType.Min, DuckDBUTinyIntType.Max, null]); + assertValues(chunk, 8, DuckDBUSmallIntVector, [DuckDBUSmallIntType.Min, DuckDBUSmallIntType.Max, null]); + assertValues(chunk, 9, DuckDBUIntegerVector, [DuckDBUIntegerType.Min, DuckDBUIntegerType.Max, null]); + assertValues(chunk, 10, DuckDBUBigIntVector, [DuckDBUBigIntType.Min, DuckDBUBigIntType.Max, null]); + assertValues(chunk, 11, DuckDBVarIntVector, [DuckDBVarIntType.Min, DuckDBVarIntType.Max, null]); + assertValues(chunk, 12, DuckDBDateVector, [DuckDBDateValue.Min, DuckDBDateValue.Max, null]); + assertValues(chunk, 13, DuckDBTimeVector, [DuckDBTimeValue.Min, DuckDBTimeValue.Max, null]); + assertValues(chunk, 14, DuckDBTimestampVector, + [DuckDBTimestampValue.Min, DuckDBTimestampValue.Max, null]); + assertValues(chunk, 15, DuckDBTimestampSecondsVector, + [DuckDBTimestampSecondsValue.Min, DuckDBTimestampSecondsValue.Max, null]); + assertValues(chunk, 16, DuckDBTimestampMillisecondsVector, + [DuckDBTimestampMillisecondsValue.Min, DuckDBTimestampMillisecondsValue.Max, null]); + assertValues(chunk, 17, DuckDBTimestampNanosecondsVector, + [DuckDBTimestampNanosecondsValue.Min, DuckDBTimestampNanosecondsValue.Max, null]); + assertValues(chunk, 18, DuckDBTimeTZVector, [DuckDBTimeTZValue.Min, DuckDBTimeTZValue.Max, null]); + assertValues(chunk, 19, DuckDBTimestampTZVector, + [DuckDBTimestampTZValue.Min, DuckDBTimestampTZValue.Max, null]); + assertValues(chunk, 20, DuckDBFloatVector, [DuckDBFloatType.Min, DuckDBFloatType.Max, null]); + assertValues(chunk, 21, DuckDBDoubleVector, [DuckDBDoubleType.Min, DuckDBDoubleType.Max, null]); + assertValues(chunk, 22, DuckDBDecimal2Vector, [ + decimalValue(-9999n, 4, 1), + decimalValue( 9999n, 4, 1), null, - // 1652372625 is 2022-05-12 16:23:45 - timestampValue(1652372625n * 1000n * 1000n), - ]), - null, - ]); - // timestamptz_array - assertValues(chunk, 38, DuckDBListVector, [ - listValue([]), - listValue([ - DuckDBTimestampTZValue.Epoch, - DuckDBTimestampTZValue.PosInf, - DuckDBTimestampTZValue.NegInf, + ]); + assertValues(chunk, 23, DuckDBDecimal4Vector, [ + decimalValue(-999999999n, 9, 4), + decimalValue( 999999999n, 9, 4), null, - // 1652397825 = 1652372625 + 25200, 25200 = 7 * 60 * 60 = 7 hours in seconds - // This 7 hour difference is hard coded into test_all_types (value is 2022-05-12 16:23:45-07) - timestampTZValue(1652397825n * 1000n * 1000n), - ]), - null, - ]); - // varchar_array - assertValues(chunk, 39, DuckDBListVector, [ - listValue([]), - // Note that the string 'goose' in varchar_array does NOT have an embedded null character. - listValue(['🦆🦆🦆🦆🦆🦆', 'goose', null, '']), - null, - ]); - // nested_int_array - assertValues(chunk, 40, DuckDBListVector, [ - listValue([]), - listValue([ + ]); + assertValues(chunk, 24, DuckDBDecimal8Vector, [ + decimalValue(-BI_18_9s, 18, 6), + decimalValue( BI_18_9s, 18, 6), + null, + ]); + assertValues(chunk, 25, DuckDBDecimal16Vector, [ + decimalValue(-BI_38_9s, 38, 10), + decimalValue( BI_38_9s, 38, 10), + null, + ]); + assertValues(chunk, 26, DuckDBUUIDVector, [DuckDBUUIDValue.Min, DuckDBUUIDValue.Max, null]); + assertValues(chunk, 27, DuckDBIntervalVector, [ + intervalValue(0, 0, 0n), + intervalValue(999, 999, 999999999n), + null, + ]); + assertValues(chunk, 28, DuckDBVarCharVector, ['🦆🦆🦆🦆🦆🦆', 'goo\0se', null]); + assertValues(chunk, 29, DuckDBBlobVector, [ + DuckDBBlobValue.fromString('thisisalongblob\x00withnullbytes'), + DuckDBBlobValue.fromString('\x00\x00\x00a'), + null, + ]); + assertValues(chunk, 30, DuckDBBitVector, [ + bitValue('0010001001011100010101011010111'), + bitValue('10101'), + null, + ]); + assertValues(chunk, 31, DuckDBEnum1Vector, [ + smallEnumValues[0], + smallEnumValues[smallEnumValues.length - 1], + null, + ]); + assertValues(chunk, 32, DuckDBEnum2Vector, [ + mediumEnumValues[0], + mediumEnumValues[mediumEnumValues.length - 1], + null, + ]); + assertValues(chunk, 33, DuckDBEnum4Vector, [ + largeEnumValues[0], + largeEnumValues[largeEnumValues.length - 1], + null, + ]); + // int_array + assertValues(chunk, 34, DuckDBListVector, [ listValue([]), listValue([42, 999, null, null, -42]), null, + ]); + // double_array + assertValues(chunk, 35, DuckDBListVector, [ listValue([]), - listValue([42, 999, null, null, -42]), - ]), - null, - ]); - assertValues(chunk, 41, DuckDBStructVector, [ - structValue({ 'a': null, 'b': null }), - structValue({ 'a': 42, 'b': '🦆🦆🦆🦆🦆🦆' }), - null, - ]); - // struct_of_arrays - assertValues(chunk, 42, DuckDBStructVector, [ - structValue({ 'a': null, 'b': null }), - structValue({ - 'a': listValue([42, 999, null, null, -42]), - 'b': listValue(['🦆🦆🦆🦆🦆🦆', 'goose', null, '']), - }), - null, - ]); - // array_of_structs - assertValues(chunk, 43, DuckDBListVector, [ - listValue([]), - listValue([ - structValue({ 'a': null, 'b': null }), - structValue({ 'a': 42, 'b': '🦆🦆🦆🦆🦆🦆' }), + listValue([42.0, NaN, Infinity, -Infinity, null, -42.0]), null, - ]), - null, - ]); - assertValues(chunk, 44, DuckDBMapVector, [ - mapValue([]), - mapValue([{ key: 'key1', value: '🦆🦆🦆🦆🦆🦆' }, { key: 'key2', value: 'goose' }]), - null, - ]); - assertValues(chunk, 45, DuckDBUnionVector, [ - unionValue('name', 'Frank'), - unionValue('age', 5), - null, - ]); - // fixed_int_array - assertValues(chunk, 46, DuckDBArrayVector, [ - arrayValue([null, 2, 3]), - arrayValue([4, 5, 6]), - null, - ]); - // fixed_varchar_array - assertValues(chunk, 47, DuckDBArrayVector, [ - arrayValue(['a', null, 'c']), - arrayValue(['d', 'e', 'f']), - null, - ]); - // fixed_nested_int_array - assertValues(chunk, 48, DuckDBArrayVector, [ - arrayValue([ - arrayValue([null, 2, 3]), + ]); + // date_array + assertValues(chunk, 36, DuckDBListVector, [ + listValue([]), + listValue([dateValue(0), DuckDBDateValue.PosInf, DuckDBDateValue.NegInf, null, dateValue(19124)]), null, - arrayValue([null, 2, 3]), - ]), - arrayValue([ - arrayValue([4, 5, 6]), - arrayValue([null, 2, 3]), - arrayValue([4, 5, 6]), - ]), - null, - ]); - // fixed_nested_varchar_array - assertValues(chunk, 49, DuckDBArrayVector, [ - arrayValue([ - arrayValue(['a', null, 'c']), + ]); + // timestamp_array + assertValues(chunk, 37, DuckDBListVector, [ + listValue([]), + listValue([ + DuckDBTimestampValue.Epoch, + DuckDBTimestampValue.PosInf, + DuckDBTimestampValue.NegInf, + null, + // 1652372625 is 2022-05-12 16:23:45 + timestampValue(1652372625n * 1000n * 1000n), + ]), null, - arrayValue(['a', null, 'c']), - ]), - arrayValue([ - arrayValue(['d', 'e', 'f']), - arrayValue(['a', null, 'c']), - arrayValue(['d', 'e', 'f']), - ]), - null, - ]); - // fixed_struct_array - assertValues(chunk, 50, DuckDBArrayVector, [ - arrayValue([ - structValue({ 'a': null, 'b': null }), - structValue({ 'a': 42, 'b': '🦆🦆🦆🦆🦆🦆' }), + ]); + // timestamptz_array + assertValues(chunk, 38, DuckDBListVector, [ + listValue([]), + listValue([ + DuckDBTimestampTZValue.Epoch, + DuckDBTimestampTZValue.PosInf, + DuckDBTimestampTZValue.NegInf, + null, + // 1652397825 = 1652372625 + 25200, 25200 = 7 * 60 * 60 = 7 hours in seconds + // This 7 hour difference is hard coded into test_all_types (value is 2022-05-12 16:23:45-07) + timestampTZValue(1652397825n * 1000n * 1000n), + ]), + null, + ]); + // varchar_array + assertValues(chunk, 39, DuckDBListVector, [ + listValue([]), + // Note that the string 'goose' in varchar_array does NOT have an embedded null character. + listValue(['🦆🦆🦆🦆🦆🦆', 'goose', null, '']), + null, + ]); + // nested_int_array + assertValues(chunk, 40, DuckDBListVector, [ + listValue([]), + listValue([ + listValue([]), + listValue([42, 999, null, null, -42]), + null, + listValue([]), + listValue([42, 999, null, null, -42]), + ]), + null, + ]); + assertValues(chunk, 41, DuckDBStructVector, [ structValue({ 'a': null, 'b': null }), - ]), - arrayValue([ structValue({ 'a': 42, 'b': '🦆🦆🦆🦆🦆🦆' }), + null, + ]); + // struct_of_arrays + assertValues(chunk, 42, DuckDBStructVector, [ structValue({ 'a': null, 'b': null }), - structValue({ 'a': 42, 'b': '🦆🦆🦆🦆🦆🦆' }), - ]), - null, - ]); - // struct_of_fixed_array - assertValues(chunk, 51, DuckDBStructVector, [ - structValue({ - 'a': arrayValue([null, 2, 3]), - 'b': arrayValue(['a', null, 'c']), - }), - structValue({ - 'a': arrayValue([4, 5, 6]), - 'b': arrayValue(['d', 'e', 'f']), - }), - null, - ]); - // fixed_array_of_int_list - assertValues(chunk, 52, DuckDBArrayVector, [ - arrayValue([ - listValue([]), - listValue([42, 999, null, null, -42]), - listValue([]), - ]), - arrayValue([ - listValue([42, 999, null, null, -42]), + structValue({ + 'a': listValue([42, 999, null, null, -42]), + 'b': listValue(['🦆🦆🦆🦆🦆🦆', 'goose', null, '']), + }), + null, + ]); + // array_of_structs + assertValues(chunk, 43, DuckDBListVector, [ listValue([]), - listValue([42, 999, null, null, -42]), - ]), - null, - ]); - // list_of_fixed_int_array - assertValues(chunk, 53, DuckDBListVector, [ - listValue([ - arrayValue([null, 2, 3]), - arrayValue([4, 5, 6]), - arrayValue([null, 2, 3]), - ]), - listValue([ - arrayValue([4, 5, 6]), + listValue([ + structValue({ 'a': null, 'b': null }), + structValue({ 'a': 42, 'b': '🦆🦆🦆🦆🦆🦆' }), + null, + ]), + null, + ]); + assertValues(chunk, 44, DuckDBMapVector, [ + mapValue([]), + mapValue([{ key: 'key1', value: '🦆🦆🦆🦆🦆🦆' }, { key: 'key2', value: 'goose' }]), + null, + ]); + assertValues(chunk, 45, DuckDBUnionVector, [ + unionValue('name', 'Frank'), + unionValue('age', 5), + null, + ]); + // fixed_int_array + assertValues(chunk, 46, DuckDBArrayVector, [ arrayValue([null, 2, 3]), arrayValue([4, 5, 6]), - ]), - null, - ]); + null, + ]); + // fixed_varchar_array + assertValues(chunk, 47, DuckDBArrayVector, [ + arrayValue(['a', null, 'c']), + arrayValue(['d', 'e', 'f']), + null, + ]); + // fixed_nested_int_array + assertValues(chunk, 48, DuckDBArrayVector, [ + arrayValue([ + arrayValue([null, 2, 3]), + null, + arrayValue([null, 2, 3]), + ]), + arrayValue([ + arrayValue([4, 5, 6]), + arrayValue([null, 2, 3]), + arrayValue([4, 5, 6]), + ]), + null, + ]); + // fixed_nested_varchar_array + assertValues(chunk, 49, DuckDBArrayVector, [ + arrayValue([ + arrayValue(['a', null, 'c']), + null, + arrayValue(['a', null, 'c']), + ]), + arrayValue([ + arrayValue(['d', 'e', 'f']), + arrayValue(['a', null, 'c']), + arrayValue(['d', 'e', 'f']), + ]), + null, + ]); + // fixed_struct_array + assertValues(chunk, 50, DuckDBArrayVector, [ + arrayValue([ + structValue({ 'a': null, 'b': null }), + structValue({ 'a': 42, 'b': '🦆🦆🦆🦆🦆🦆' }), + structValue({ 'a': null, 'b': null }), + ]), + arrayValue([ + structValue({ 'a': 42, 'b': '🦆🦆🦆🦆🦆🦆' }), + structValue({ 'a': null, 'b': null }), + structValue({ 'a': 42, 'b': '🦆🦆🦆🦆🦆🦆' }), + ]), + null, + ]); + // struct_of_fixed_array + assertValues(chunk, 51, DuckDBStructVector, [ + structValue({ + 'a': arrayValue([null, 2, 3]), + 'b': arrayValue(['a', null, 'c']), + }), + structValue({ + 'a': arrayValue([4, 5, 6]), + 'b': arrayValue(['d', 'e', 'f']), + }), + null, + ]); + // fixed_array_of_int_list + assertValues(chunk, 52, DuckDBArrayVector, [ + arrayValue([ + listValue([]), + listValue([42, 999, null, null, -42]), + listValue([]), + ]), + arrayValue([ + listValue([42, 999, null, null, -42]), + listValue([]), + listValue([42, 999, null, null, -42]), + ]), + null, + ]); + // list_of_fixed_int_array + assertValues(chunk, 53, DuckDBListVector, [ + listValue([ + arrayValue([null, 2, 3]), + arrayValue([4, 5, 6]), + arrayValue([null, 2, 3]), + ]), + listValue([ + arrayValue([4, 5, 6]), + arrayValue([null, 2, 3]), + arrayValue([4, 5, 6]), + ]), + null, + ]); + } }); }); test('values toString', () => { @@ -979,9 +991,12 @@ describe('api', () => { const result = await connection.run(`select current_setting('duckdb_api') as duckdb_api`); assertColumns(result, [{ name: 'duckdb_api', type: DuckDBVarCharType.instance }]); const chunk = await result.fetchChunk(); - assert.strictEqual(chunk.columnCount, 1); - assert.strictEqual(chunk.rowCount, 1); - assertValues(chunk, 0, DuckDBVarCharVector, ['node-neo-api']); + assert.isDefined(chunk); + if (chunk) { + assert.strictEqual(chunk.columnCount, 1); + assert.strictEqual(chunk.rowCount, 1); + assertValues(chunk, 0, DuckDBVarCharVector, ['node-neo-api']); + } }); test('default duckdb_api with explicit options', async () => { const instance = await DuckDBInstance.create(undefined, {}); @@ -989,9 +1004,12 @@ describe('api', () => { const result = await connection.run(`select current_setting('duckdb_api') as duckdb_api`); assertColumns(result, [{ name: 'duckdb_api', type: DuckDBVarCharType.instance }]); const chunk = await result.fetchChunk(); - assert.strictEqual(chunk.columnCount, 1); - assert.strictEqual(chunk.rowCount, 1); - assertValues(chunk, 0, DuckDBVarCharVector, ['node-neo-api']); + assert.isDefined(chunk); + if (chunk) { + assert.strictEqual(chunk.columnCount, 1); + assert.strictEqual(chunk.rowCount, 1); + assertValues(chunk, 0, DuckDBVarCharVector, ['node-neo-api']); + } }); test('overriding duckdb_api', async () => { const instance = await DuckDBInstance.create(undefined, { 'duckdb_api': 'custom-duckdb-api' }); @@ -999,8 +1017,11 @@ describe('api', () => { const result = await connection.run(`select current_setting('duckdb_api') as duckdb_api`); assertColumns(result, [{ name: 'duckdb_api', type: DuckDBVarCharType.instance }]); const chunk = await result.fetchChunk(); - assert.strictEqual(chunk.columnCount, 1); - assert.strictEqual(chunk.rowCount, 1); - assertValues(chunk, 0, DuckDBVarCharVector, ['custom-duckdb-api']); + assert.isDefined(chunk); + if (chunk) { + assert.strictEqual(chunk.columnCount, 1); + assert.strictEqual(chunk.rowCount, 1); + assertValues(chunk, 0, DuckDBVarCharVector, ['custom-duckdb-api']); + } }); }); diff --git a/api/test/bench/util/runSql.ts b/api/test/bench/util/runSql.ts index 02be4add..885744f3 100644 --- a/api/test/bench/util/runSql.ts +++ b/api/test/bench/util/runSql.ts @@ -5,7 +5,7 @@ export async function runSql(connection: DuckDBConnection, sql: string): Promise let valueCount = 0; let nullCount = 0; let chunk = await result.fetchChunk(); - while (chunk.rowCount > 0) { + while (chunk && chunk.rowCount > 0) { const col0 = chunk.getColumnVector(0); for (let i = 0; i < col0.itemCount; i++) { if (col0.getItem(i) === null) { diff --git a/bindings/pkgs/@duckdb/node-bindings/duckdb.d.ts b/bindings/pkgs/@duckdb/node-bindings/duckdb.d.ts index 628fe9a6..f75518ff 100644 --- a/bindings/pkgs/@duckdb/node-bindings/duckdb.d.ts +++ b/bindings/pkgs/@duckdb/node-bindings/duckdb.d.ts @@ -288,7 +288,10 @@ export function column_logical_type(result: Result, column_index: number): Logic export function column_count(result: Result): number; // #ifndef DUCKDB_API_NO_DEPRECATED + // DUCKDB_API idx_t duckdb_row_count(duckdb_result *result); +export function row_count(result: Result): number; + // #endif // DUCKDB_API idx_t duckdb_rows_changed(duckdb_result *result); @@ -306,9 +309,16 @@ export function rows_changed(result: Result): number; // not exposed: query, execute_prepared, and execute_pending reject promise with error // #ifndef DUCKDB_API_NO_DEPRECATED + // DUCKDB_API duckdb_data_chunk duckdb_result_get_chunk(duckdb_result result, idx_t chunk_index); +export function result_get_chunk(result: Result, chunkIndex: number): DataChunk; + // DUCKDB_API bool duckdb_result_is_streaming(duckdb_result result); +export function result_is_streaming(result: Result): boolean; + // DUCKDB_API idx_t duckdb_result_chunk_count(duckdb_result result); +export function result_chunk_count(result: Result): number; + // #endif // DUCKDB_API duckdb_result_type duckdb_result_return_type(duckdb_result result); @@ -510,7 +520,10 @@ export function bind_null(prepared_statement: PreparedStatement, index: number): export function execute_prepared(prepared_statement: PreparedStatement): Promise; // #ifndef DUCKDB_API_NO_DEPRECATED + // DUCKDB_API duckdb_state duckdb_execute_prepared_streaming(duckdb_prepared_statement prepared_statement, duckdb_result *out_result); +export function execute_prepared_streaming(prepared_statement: PreparedStatement): Promise; + // #endif // DUCKDB_API idx_t duckdb_extract_statements(duckdb_connection connection, const char *query, duckdb_extracted_statements *out_extracted_statements); @@ -529,7 +542,10 @@ export function extract_statements_error(extracted_statements: ExtractedStatemen export function pending_prepared(prepared_statement: PreparedStatement): PendingResult; // #ifndef DUCKDB_API_NO_DEPRECATED + // DUCKDB_API duckdb_state duckdb_pending_prepared_streaming(duckdb_prepared_statement prepared_statement, duckdb_pending_result *out_result); +export function pending_prepared_streaming(prepared_statement: PreparedStatement): PendingResult; + // #endif // DUCKDB_API void duckdb_destroy_pending(duckdb_pending_result *pending_result); @@ -845,7 +861,7 @@ export function struct_vector_get_child(vector: Vector, index: number): Vector; export function array_vector_get_child(vector: Vector): Vector; // DUCKDB_API bool duckdb_validity_row_is_valid(uint64_t *validity, idx_t row); -export function validity_row_is_valid(validity: Uint8Array, row_index: number): boolean; +export function validity_row_is_valid(validity: Uint8Array | null, row_index: number): boolean; // DUCKDB_API void duckdb_validity_set_row_validity(uint64_t *validity, idx_t row, bool valid); export function validity_set_row_validity(validity: Uint8Array, row_index: number, valid: boolean): void; @@ -1072,7 +1088,7 @@ export function append_data_chunk(appender: Appender, chunk: DataChunk): void; // #endif // DUCKDB_API duckdb_data_chunk duckdb_fetch_chunk(duckdb_result result); -export function fetch_chunk(result: Result): Promise; +export function fetch_chunk(result: Result): Promise; // DUCKDB_API duckdb_cast_function duckdb_create_cast_function(); // DUCKDB_API void duckdb_cast_function_set_source_type(duckdb_cast_function cast_function, duckdb_logical_type source_type); diff --git a/bindings/src/duckdb_node_bindings.cpp b/bindings/src/duckdb_node_bindings.cpp index 9fa17920..f518a77c 100644 --- a/bindings/src/duckdb_node_bindings.cpp +++ b/bindings/src/duckdb_node_bindings.cpp @@ -652,6 +652,37 @@ class ExecutePreparedWorker : public PromiseWorker { }; +class ExecutePreparedStreamingWorker : public PromiseWorker { + +public: + + ExecutePreparedStreamingWorker(Napi::Env env, duckdb_prepared_statement prepared_statement) + : PromiseWorker(env), prepared_statement_(prepared_statement) { + } + +protected: + + void Execute() override { + result_ptr_ = reinterpret_cast(duckdb_malloc(sizeof(duckdb_result))); + if (duckdb_execute_prepared_streaming(prepared_statement_, result_ptr_)) { + SetError(duckdb_result_error(result_ptr_)); + duckdb_destroy_result(result_ptr_); + duckdb_free(result_ptr_); + result_ptr_ = nullptr; + } + } + + Napi::Value Result() override { + return CreateExternalForResult(Env(), result_ptr_); + } + +private: + + duckdb_prepared_statement prepared_statement_; + duckdb_result *result_ptr_; + +}; + class ExtractStatementsWorker : public PromiseWorker { public: @@ -755,6 +786,9 @@ class FetchWorker : public PromiseWorker { void Execute() override { data_chunk_ = duckdb_fetch_chunk(*result_ptr_); + // if (!data_chunk_) { + // SetError("Failed to fetch chunk"); + // } } Napi::Value Result() override { @@ -901,7 +935,11 @@ class DuckDBNodeAddon : public Napi::Addon { InstanceMethod("result_statement_type", &DuckDBNodeAddon::result_statement_type), InstanceMethod("column_logical_type", &DuckDBNodeAddon::column_logical_type), InstanceMethod("column_count", &DuckDBNodeAddon::column_count), + InstanceMethod("row_count", &DuckDBNodeAddon::row_count), InstanceMethod("rows_changed", &DuckDBNodeAddon::rows_changed), + InstanceMethod("result_get_chunk", &DuckDBNodeAddon::result_get_chunk), + InstanceMethod("result_is_streaming", &DuckDBNodeAddon::result_is_streaming), + InstanceMethod("result_chunk_count", &DuckDBNodeAddon::result_chunk_count), InstanceMethod("result_return_type", &DuckDBNodeAddon::result_return_type), InstanceMethod("vector_size", &DuckDBNodeAddon::vector_size), @@ -955,12 +993,14 @@ class DuckDBNodeAddon : public Napi::Addon { InstanceMethod("bind_blob", &DuckDBNodeAddon::bind_blob), InstanceMethod("bind_null", &DuckDBNodeAddon::bind_null), InstanceMethod("execute_prepared", &DuckDBNodeAddon::execute_prepared), + InstanceMethod("execute_prepared_streaming", &DuckDBNodeAddon::execute_prepared_streaming), InstanceMethod("extract_statements", &DuckDBNodeAddon::extract_statements), InstanceMethod("prepare_extracted_statement", &DuckDBNodeAddon::prepare_extracted_statement), InstanceMethod("extract_statements_error", &DuckDBNodeAddon::extract_statements_error), InstanceMethod("pending_prepared", &DuckDBNodeAddon::pending_prepared), + InstanceMethod("pending_prepared_streaming", &DuckDBNodeAddon::pending_prepared_streaming), InstanceMethod("pending_error", &DuckDBNodeAddon::pending_error), InstanceMethod("pending_execute_task", &DuckDBNodeAddon::pending_execute_task), InstanceMethod("pending_execute_check_state", &DuckDBNodeAddon::pending_execute_check_state), @@ -1288,7 +1328,16 @@ class DuckDBNodeAddon : public Napi::Addon { } // #ifndef DUCKDB_API_NO_DEPRECATED + // DUCKDB_API idx_t duckdb_row_count(duckdb_result *result); + // function row_count(result: Result): number + Napi::Value row_count(const Napi::CallbackInfo& info) { + auto env = info.Env(); + auto result_ptr = GetResultFromExternal(env, info[0]); + auto row_count = duckdb_row_count(result_ptr); + return Napi::Number::New(env, row_count); + } + // #endif // DUCKDB_API idx_t duckdb_rows_changed(duckdb_result *result); @@ -1312,9 +1361,38 @@ class DuckDBNodeAddon : public Napi::Addon { // not exposed: query, execute_prepared, and execute_pending reject promise with error // #ifndef DUCKDB_API_NO_DEPRECATED + // DUCKDB_API duckdb_data_chunk duckdb_result_get_chunk(duckdb_result result, idx_t chunk_index); + // function result_get_chunk(result: Result, chunkIndex: number): DataChunk + Napi::Value result_get_chunk(const Napi::CallbackInfo& info) { + auto env = info.Env(); + auto result_ptr = GetResultFromExternal(env, info[0]); + auto chunk_index = info[1].As().Uint32Value(); + auto chunk = duckdb_result_get_chunk(*result_ptr, chunk_index); + if (!chunk) { + throw Napi::Error::New(env, "Failed to get data chunk. Only supported for materialized results."); + } + return CreateExternalForDataChunk(env, chunk); + } + // DUCKDB_API bool duckdb_result_is_streaming(duckdb_result result); + // function result_is_streaming(result: Result): boolean + Napi::Value result_is_streaming(const Napi::CallbackInfo& info) { + auto env = info.Env(); + auto result_ptr = GetResultFromExternal(env, info[0]); + auto is_streaming = duckdb_result_is_streaming(*result_ptr); + return Napi::Boolean::New(env, is_streaming); + } + // DUCKDB_API idx_t duckdb_result_chunk_count(duckdb_result result); + // function result_chunk_count(result: Result): number + Napi::Value result_chunk_count(const Napi::CallbackInfo& info) { + auto env = info.Env(); + auto result_ptr = GetResultFromExternal(env, info[0]); + auto chunk_count = duckdb_result_chunk_count(*result_ptr); + return Napi::Number::New(env, chunk_count); + } + // #endif // DUCKDB_API duckdb_result_type duckdb_result_return_type(duckdb_result result); @@ -1936,7 +2014,17 @@ class DuckDBNodeAddon : public Napi::Addon { } // #ifndef DUCKDB_API_NO_DEPRECATED + // DUCKDB_API duckdb_state duckdb_execute_prepared_streaming(duckdb_prepared_statement prepared_statement, duckdb_result *out_result); + // function execute_prepared_streaming(prepared_statement: PreparedStatement): Promise + Napi::Value execute_prepared_streaming(const Napi::CallbackInfo& info) { + auto env = info.Env(); + auto prepared_statement = GetPreparedStatementFromExternal(env, info[0]); + auto worker = new ExecutePreparedStreamingWorker(env, prepared_statement); + worker->Queue(); + return worker->Promise(); + } + // #endif // DUCKDB_API idx_t duckdb_extract_statements(duckdb_connection connection, const char *query, duckdb_extracted_statements *out_extracted_statements); @@ -1989,7 +2077,21 @@ class DuckDBNodeAddon : public Napi::Addon { } // #ifndef DUCKDB_API_NO_DEPRECATED + // DUCKDB_API duckdb_state duckdb_pending_prepared_streaming(duckdb_prepared_statement prepared_statement, duckdb_pending_result *out_result); + // function pending_prepared_streaming(prepared_statement: PreparedStatement): PendingResult + Napi::Value pending_prepared_streaming(const Napi::CallbackInfo& info) { + auto env = info.Env(); + auto prepared_statement = GetPreparedStatementFromExternal(env, info[0]); + duckdb_pending_result pending_result; + if (duckdb_pending_prepared_streaming(prepared_statement, &pending_result)) { + std::string error = duckdb_pending_error(pending_result); + duckdb_destroy_pending(&pending_result); + throw Napi::Error::New(env, error); + } + return CreateExternalForPendingResult(env, pending_result); + } + // #endif // DUCKDB_API void duckdb_destroy_pending(duckdb_pending_result *pending_result); @@ -2903,6 +3005,9 @@ class DuckDBNodeAddon : public Napi::Addon { auto vector = GetVectorFromExternal(env, info[0]); auto byte_count = info[1].As().Uint32Value(); uint64_t *data = duckdb_vector_get_validity(vector); + if (!data) { + return env.Null(); + } return Napi::Buffer::NewOrCopy(env, reinterpret_cast(data), byte_count); } @@ -2987,10 +3092,10 @@ class DuckDBNodeAddon : public Napi::Addon { } // DUCKDB_API bool duckdb_validity_row_is_valid(uint64_t *validity, idx_t row); - // function validity_row_is_valid(validity: Uint8Array, row_index: number): boolean + // function validity_row_is_valid(validity: Uint8Array | null, row_index: number): boolean Napi::Value validity_row_is_valid(const Napi::CallbackInfo& info) { auto env = info.Env(); - auto validity = reinterpret_cast(info[0].As().Data()); + auto validity = info[0].IsNull() ? nullptr : reinterpret_cast(info[0].As().Data()); auto row_index = info[1].As().Uint32Value(); auto valid = duckdb_validity_row_is_valid(validity, row_index); return Napi::Boolean::New(env, valid); @@ -3501,7 +3606,7 @@ class DuckDBNodeAddon : public Napi::Addon { // #endif // DUCKDB_API duckdb_data_chunk duckdb_fetch_chunk(duckdb_result result); - // function fetch_chunk(result: Result): Promise + // function fetch_chunk(result: Result): Promise Napi::Value fetch_chunk(const Napi::CallbackInfo& info) { auto env = info.Env(); auto result_ptr = GetResultFromExternal(env, info[0]); @@ -3546,7 +3651,7 @@ NODE_API_ADDON(DuckDBNodeAddon) --- 372 total functions - 200 instance methods + 206 instance methods 1 unimplemented logical type functions 13 unimplemented scalar function functions 4 unimplemented scalar function set functions @@ -3562,7 +3667,7 @@ NODE_API_ADDON(DuckDBNodeAddon) 8 unimplemented tasks functions 12 unimplemented cast function functions 26 functions not exposed -+ 47 deprecated functions ++ 41 unimplemented deprecated functions (of 47) --- 372 functions accounted for diff --git a/bindings/test/appender.test.ts b/bindings/test/appender.test.ts index 0e160619..c8bc8ef3 100644 --- a/bindings/test/appender.test.ts +++ b/bindings/test/appender.test.ts @@ -19,12 +19,12 @@ suite('appender', () => { await expectResult(createResult, { statementType: duckdb.StatementType.CREATE, resultType: duckdb.ResultType.NOTHING, + chunkCount: 0, + rowCount: 0, columns: [ { name: 'Count', logicalType: BIGINT }, ], - chunks: [ - { columnCount: 0, rowCount: 0, vectors: [] }, - ], + chunks: [], }); const appender = duckdb.appender_create(connection, 'main', 'appender_target'); @@ -43,6 +43,8 @@ suite('appender', () => { const result = await duckdb.query(connection, 'from appender_target'); await expectResult(result, { + chunkCount: 1, + rowCount: 3, columns: [ { name: 'i', logicalType: INTEGER }, ], @@ -81,12 +83,12 @@ suite('appender', () => { await expectResult(createResult, { statementType: duckdb.StatementType.CREATE, resultType: duckdb.ResultType.NOTHING, + chunkCount: 0, + rowCount: 0, columns: [ { name: 'Count', logicalType: BIGINT }, ], - chunks: [ - { columnCount: 0, rowCount: 0, vectors: [] }, - ], + chunks: [], }); @@ -150,6 +152,8 @@ suite('appender', () => { const result = await duckdb.query(connection, 'from appender_target'); await expectResult(result, { + chunkCount: 1, + rowCount: 1, columns: [ { name: 'bool', logicalType: BOOLEAN }, { name: 'int8', logicalType: TINYINT }, @@ -210,12 +214,12 @@ suite('appender', () => { await expectResult(createResult, { statementType: duckdb.StatementType.CREATE, resultType: duckdb.ResultType.NOTHING, + chunkCount: 0, + rowCount: 0, columns: [ { name: 'Count', logicalType: BIGINT }, ], - chunks: [ - { columnCount: 0, rowCount: 0, vectors: [] }, - ], + chunks: [], }); const appender = duckdb.appender_create(connection, 'main', 'appender_target'); @@ -223,11 +227,16 @@ suite('appender', () => { const source_result = await duckdb.query(connection, 'select int, varchar from test_all_types()'); const source_chunk = await duckdb.fetch_chunk(source_result); - duckdb.append_data_chunk(appender, source_chunk); + expect(source_chunk).toBeDefined(); + if (source_chunk) { + duckdb.append_data_chunk(appender, source_chunk); + } duckdb.appender_flush(appender); const result = await duckdb.query(connection, 'from appender_target'); await expectResult(result, { + chunkCount: 1, + rowCount: 3, columns: [ { name: 'i', logicalType: INTEGER }, { name: 'v', logicalType: VARCHAR }, diff --git a/bindings/test/config.test.ts b/bindings/test/config.test.ts index bcbda458..1207b9aa 100644 --- a/bindings/test/config.test.ts +++ b/bindings/test/config.test.ts @@ -24,6 +24,8 @@ suite('config', () => { const connection = await duckdb.connect(db); const result = await duckdb.query(connection, `select current_setting('duckdb_api') as duckdb_api`); await expectResult(result, { + chunkCount: 1, + rowCount: 1, columns: [ { name: 'duckdb_api', logicalType: { typeId: duckdb.Type.VARCHAR } }, ], @@ -38,6 +40,8 @@ suite('config', () => { const connection = await duckdb.connect(db); const result = await duckdb.query(connection, `select current_setting('duckdb_api') as duckdb_api`); await expectResult(result, { + chunkCount: 1, + rowCount: 1, columns: [ { name: 'duckdb_api', logicalType: { typeId: duckdb.Type.VARCHAR } }, ], @@ -53,6 +57,8 @@ suite('config', () => { const connection = await duckdb.connect(db); const result = await duckdb.query(connection, `select current_setting('duckdb_api') as duckdb_api`); await expectResult(result, { + chunkCount: 1, + rowCount: 1, columns: [ { name: 'duckdb_api', logicalType: { typeId: duckdb.Type.VARCHAR } }, ], diff --git a/bindings/test/extracted_statements.test.ts b/bindings/test/extracted_statements.test.ts index 046ebda0..d6854ba5 100644 --- a/bindings/test/extracted_statements.test.ts +++ b/bindings/test/extracted_statements.test.ts @@ -26,6 +26,8 @@ suite('extracted statements', () => { const prepared = await duckdb.prepare_extracted_statement(connection, extracted_statements, 0); const result = await duckdb.execute_prepared(prepared); await expectResult(result, { + chunkCount: 1, + rowCount: 1, columns: [ { name: 'a', logicalType: INTEGER }, ], @@ -45,6 +47,8 @@ suite('extracted statements', () => { const prepared0 = await duckdb.prepare_extracted_statement(connection, extracted_statements, 0); const result0 = await duckdb.execute_prepared(prepared0); await expectResult(result0, { + chunkCount: 1, + rowCount: 1, columns: [ { name: 'a', logicalType: INTEGER }, ], @@ -56,6 +60,8 @@ suite('extracted statements', () => { const prepared1 = await duckdb.prepare_extracted_statement(connection, extracted_statements, 1); const result1 = await duckdb.execute_prepared(prepared1); await expectResult(result1, { + chunkCount: 1, + rowCount: 1, columns: [ { name: 'b', logicalType: INTEGER }, ], @@ -67,6 +73,8 @@ suite('extracted statements', () => { const prepared2 = await duckdb.prepare_extracted_statement(connection, extracted_statements, 2); const result2 = await duckdb.execute_prepared(prepared2); await expectResult(result2, { + chunkCount: 1, + rowCount: 1, columns: [ { name: 'c', logicalType: INTEGER }, ], diff --git a/bindings/test/pending.test.ts b/bindings/test/pending.test.ts index ceac8d03..1981e667 100644 --- a/bindings/test/pending.test.ts +++ b/bindings/test/pending.test.ts @@ -13,6 +13,8 @@ suite('pending', () => { const pending = duckdb.pending_prepared(prepared); const result = await duckdb.execute_pending(pending); await expectResult(result, { + chunkCount: 1, + rowCount: 1, columns: [ { name: 'a', logicalType: INTEGER }, ], @@ -22,6 +24,24 @@ suite('pending', () => { }); }); }); + test('streaming', async () => { + await withConnection(async (connection) => { + const prepared = await duckdb.prepare(connection, 'select n::integer as int from range(5000) t(n)'); + const pending = duckdb.pending_prepared_streaming(prepared); + const result = await duckdb.execute_pending(pending); + await expectResult(result, { + isStreaming: true, + columns: [ + { name: 'int', logicalType: INTEGER }, + ], + chunks: [ + { rowCount: 2048, vectors: [data(4, null, Array.from({ length: 2048 }).map((_, i) => i))]}, + { rowCount: 2048, vectors: [data(4, null, Array.from({ length: 2048 }).map((_, i) => 2048 + i))]}, + { rowCount: 904, vectors: [data(4, null, Array.from({ length: 904 }).map((_, i) => 4096 + i))]}, + ], + }); + }); + }); test('tasks', async () => { await withConnection(async (connection) => { const prepared = await duckdb.prepare(connection, 'select count(*) as count from range(10_000)'); @@ -33,6 +53,8 @@ suite('pending', () => { } const result = await duckdb.execute_pending(pending); await expectResult(result, { + chunkCount: 1, + rowCount: 1, columns: [ { name: 'count', logicalType: BIGINT }, ], diff --git a/bindings/test/prepared_statements.test.ts b/bindings/test/prepared_statements.test.ts index 1da2f90a..a2648669 100644 --- a/bindings/test/prepared_statements.test.ts +++ b/bindings/test/prepared_statements.test.ts @@ -39,6 +39,8 @@ suite('prepared statements', () => { expect(duckdb.prepared_statement_type(prepared)).toBe(duckdb.StatementType.SELECT); const result = await duckdb.execute_prepared(prepared); await expectResult(result, { + chunkCount: 1, + rowCount: 1, columns: [ { name: 'seventeen', logicalType: INTEGER }, ], @@ -66,6 +68,8 @@ suite('prepared statements', () => { const result = await duckdb.execute_prepared(prepared); await expectResult(result, { + chunkCount: 1, + rowCount: 1, columns: [ { name: 'a', logicalType: INTEGER }, { name: 'b', logicalType: INTEGER }, @@ -94,6 +98,8 @@ suite('prepared statements', () => { const result = await duckdb.execute_prepared(prepared); await expectResult(result, { + chunkCount: 1, + rowCount: 1, columns: [ { name: 'two', logicalType: INTEGER }, { name: 'one', logicalType: INTEGER }, @@ -124,6 +130,8 @@ suite('prepared statements', () => { const result = await duckdb.execute_prepared(prepared); await expectResult(result, { + chunkCount: 1, + rowCount: 1, columns: [ { name: 'a', logicalType: INTEGER }, { name: 'b', logicalType: INTEGER }, @@ -153,6 +161,8 @@ suite('prepared statements', () => { const result = await duckdb.execute_prepared(prepared); await expectResult(result, { + chunkCount: 1, + rowCount: 1, columns: [ { name: 'a', logicalType: INTEGER }, { name: 'b', logicalType: INTEGER }, @@ -259,6 +269,8 @@ suite('prepared statements', () => { const result = await duckdb.execute_prepared(prepared); await expectResult(result, { + chunkCount: 1, + rowCount: 1, columns: [ { name: 'boolean', logicalType: BOOLEAN }, { name: 'int8', logicalType: TINYINT }, @@ -346,6 +358,8 @@ suite('prepared statements', () => { const result = await duckdb.execute_prepared(prepared); await expectResult(result, { + chunkCount: 1, + rowCount: 1, columns: [ { name: 'struct', logicalType: STRUCT(ENTRY('a', INTEGER), ENTRY('b', VARCHAR)) }, { name: 'list', logicalType: LIST(INTEGER) }, @@ -364,4 +378,21 @@ suite('prepared statements', () => { }); }); }); + test('streaming', async () => { + await withConnection(async (connection) => { + const prepared = await duckdb.prepare(connection, 'select n::integer as int from range(5000) t(n)'); + const result = await duckdb.execute_prepared_streaming(prepared); + await expectResult(result, { + isStreaming: true, + columns: [ + { name: 'int', logicalType: INTEGER }, + ], + chunks: [ + { rowCount: 2048, vectors: [data(4, null, Array.from({ length: 2048 }).map((_, i) => i))]}, + { rowCount: 2048, vectors: [data(4, null, Array.from({ length: 2048 }).map((_, i) => 2048 + i))]}, + { rowCount: 904, vectors: [data(4, null, Array.from({ length: 904 }).map((_, i) => 4096 + i))]}, + ], + }); + }); + }); }); diff --git a/bindings/test/query.test.ts b/bindings/test/query.test.ts index 2db38f2b..f76f6ba8 100644 --- a/bindings/test/query.test.ts +++ b/bindings/test/query.test.ts @@ -49,6 +49,8 @@ suite('query', () => { await withConnection(async (connection) => { const result = await duckdb.query(connection, 'select 17 as seventeen'); await expectResult(result, { + chunkCount: 1, + rowCount: 1, columns: [ { name: 'seventeen', logicalType: { typeId: duckdb.Type.INTEGER } }, ], @@ -68,6 +70,8 @@ suite('query', () => { const result = await duckdb.query(connection, `from test_all_types(use_large_enum=${useLargeEnum})`); const validity = [true, true, false]; await expectResult(result, { + chunkCount: 1, + rowCount: 3, columns: [ { name: 'bool', logicalType: BOOLEAN }, { name: 'tinyint', logicalType: TINYINT }, @@ -264,17 +268,19 @@ suite('query', () => { await expectResult(createResult, { statementType: duckdb.StatementType.CREATE, resultType: duckdb.ResultType.NOTHING, + chunkCount: 0, + rowCount: 0, columns: [ { name: 'Count', logicalType: BIGINT }, ], - chunks: [ - { columnCount: 0, rowCount: 0, vectors: [] }, - ], + chunks: [], }); const insertResult = await duckdb.query(connection, 'insert into test_create_and_insert from range(17)'); await expectResult(insertResult, { statementType: duckdb.StatementType.INSERT, resultType: duckdb.ResultType.CHANGED_ROWS, + chunkCount: 1, + rowCount: 1, rowsChanged: 17, columns: [ { name: 'Count', logicalType: BIGINT }, diff --git a/bindings/test/utils/ExpectedResult.ts b/bindings/test/utils/ExpectedResult.ts index 4da58d72..9b307d4f 100644 --- a/bindings/test/utils/ExpectedResult.ts +++ b/bindings/test/utils/ExpectedResult.ts @@ -16,6 +16,9 @@ export interface ExpectedChunk { export interface ExpectedResult { statementType?: duckdb.StatementType; resultType?: duckdb.ResultType; + isStreaming?: boolean; + chunkCount?: number; + rowCount?: number; rowsChanged?: number; columns: ExpectedColumn[]; chunks: ExpectedChunk[]; diff --git a/bindings/test/utils/ExpectedVector.ts b/bindings/test/utils/ExpectedVector.ts index 4c50c7f4..2970a424 100644 --- a/bindings/test/utils/ExpectedVector.ts +++ b/bindings/test/utils/ExpectedVector.ts @@ -1,13 +1,13 @@ export interface ExpectedArrayVector { kind: 'array'; itemCount: number; - validity: boolean[]; + validity: boolean[] | null; child: ExpectedVector; } export interface ExpectedDataVector { kind: 'data'; - validity: boolean[]; + validity: boolean[] | null; itemBytes: number; values: any[]; } @@ -16,7 +16,7 @@ export type ExpectedListEntry = [bigint, bigint] | null; export interface ExpectedListVector { kind: 'list'; - validity: boolean[]; + validity: boolean[] | null; entries: (ExpectedListEntry | null)[]; childItemCount: number; child: ExpectedVector; @@ -24,7 +24,7 @@ export interface ExpectedListVector { export interface ExpectedMapVector { kind: 'map'; - validity: boolean[]; + validity: boolean[] | null; entries: (ExpectedListEntry | null)[]; keys: ExpectedVector; values: ExpectedVector; @@ -33,7 +33,7 @@ export interface ExpectedMapVector { export interface ExpectedStructVector { kind: 'struct'; itemCount: number; - validity: boolean[]; + validity: boolean[] | null; children: ExpectedVector[]; } diff --git a/bindings/test/utils/expectResult.ts b/bindings/test/utils/expectResult.ts index 1ee7049b..991232c0 100644 --- a/bindings/test/utils/expectResult.ts +++ b/bindings/test/utils/expectResult.ts @@ -7,6 +7,13 @@ import { expectLogicalType } from './expectLogicalType'; export async function expectResult(result: duckdb.Result, expectedResult: ExpectedResult) { expect(duckdb.result_statement_type(result)).toBe(expectedResult.statementType ?? duckdb.StatementType.SELECT); expect(duckdb.result_return_type(result)).toBe(expectedResult.resultType ?? duckdb.ResultType.QUERY_RESULT); + expect(duckdb.result_is_streaming(result)).toBe(!!expectedResult.isStreaming); + if (expectedResult.chunkCount != undefined) { + expect(duckdb.result_chunk_count(result)).toBe(expectedResult.chunkCount); + } + if (expectedResult.rowCount != undefined) { + expect(duckdb.row_count(result)).toBe(expectedResult.rowCount); + } expect(duckdb.rows_changed(result)).toBe(expectedResult.rowsChanged ?? 0); expect(duckdb.column_count(result)).toBe(expectedResult.columns.length); for (let col = 0; col < expectedResult.columns.length; col++) { @@ -15,8 +22,17 @@ export async function expectResult(result: duckdb.Result, expectedResult: Expect expect(duckdb.column_type(result, col), `${col}`).toBe(expectedColumn.logicalType.typeId); expectLogicalType(duckdb.column_logical_type(result, col), expectedColumn.logicalType, `col ${col}`); } + if (expectedResult.chunkCount != undefined && expectedResult.chunkCount > 0) { + for (let chunkIndex = 0; chunkIndex < expectedResult.chunkCount; chunkIndex++) { + const chunk = duckdb.result_get_chunk(result, chunkIndex); + expectChunk(chunk, expectedResult.chunks[chunkIndex], expectedResult.columns); + } + } for (const expectedChunk of expectedResult.chunks) { const chunk = await duckdb.fetch_chunk(result); - expectChunk(chunk, expectedChunk, expectedResult.columns); + expect(chunk).toBeDefined(); + if (chunk) { + expectChunk(chunk, expectedChunk, expectedResult.columns); + } } } diff --git a/bindings/test/utils/expectValidity.ts b/bindings/test/utils/expectValidity.ts index 8c62f5f3..41e1e459 100644 --- a/bindings/test/utils/expectValidity.ts +++ b/bindings/test/utils/expectValidity.ts @@ -2,7 +2,7 @@ import duckdb from '@duckdb/node-bindings'; import { expect } from 'vitest'; import { isValid } from './isValid'; -export function expectValidity(validity_bytes: Uint8Array, validity: BigUint64Array, bit: number, expected: boolean, vectorName: string) { +export function expectValidity(validity_bytes: Uint8Array | null, validity: BigUint64Array | null, bit: number, expected: boolean, vectorName: string) { expect(duckdb.validity_row_is_valid(validity_bytes, bit), `${vectorName} validity_bytes_bit[${bit}]`).toBe(expected); expect(isValid(validity, bit), `${vectorName} validity_bit[${bit}]`).toBe(expected); } diff --git a/bindings/test/utils/expectVector.ts b/bindings/test/utils/expectVector.ts index 9a1be25a..6ca69eec 100644 --- a/bindings/test/utils/expectVector.ts +++ b/bindings/test/utils/expectVector.ts @@ -37,10 +37,13 @@ export function expectVector(vector: duckdb.Vector, expectedVector: ExpectedVect } } -function getVectorValidity(vector: duckdb.Vector, itemCount: number): { validity: BigUint64Array, validityBytes: Uint8Array } { +function getVectorValidity(vector: duckdb.Vector, itemCount: number): { validity: BigUint64Array | null, validityBytes: Uint8Array | null } { const validityUInt64Count = Math.ceil(itemCount / 64); const validityByteCount = validityUInt64Count * 8; const validityBytes = duckdb.vector_get_validity(vector, validityByteCount); + if (!validityBytes) { + return { validity: null, validityBytes: null }; + } const validity = new BigUint64Array(validityBytes.buffer, 0, validityUInt64Count); return { validity, validityBytes }; } @@ -59,7 +62,7 @@ function expectArrayVector(vector: duckdb.Vector, expectedVector: ExpectedArrayV const itemCount = expectedVector.itemCount; const { validity, validityBytes } = getVectorValidity(vector, itemCount); for (let row = 0; row < itemCount; row++) { - expectValidity(validityBytes, validity, row, expectedVector.validity[row], `${vectorName} row[${row}]`); + expectValidity(validityBytes, validity, row, expectedVector.validity ? expectedVector.validity[row] : true, `${vectorName} row[${row}]`); } const childVector = duckdb.array_vector_get_child(vector); @@ -71,7 +74,7 @@ function expectDataVector(vector: duckdb.Vector, expectedVector: ExpectedDataVec const { validity, validityBytes } = getVectorValidity(vector, itemCount); const dv = getVectorData(vector, itemCount, expectedVector.itemBytes); for (let row = 0; row < itemCount; row++) { - expectValidity(validityBytes, validity, row, expectedVector.validity[row], `${vectorName} row[${row}]`); + expectValidity(validityBytes, validity, row, expectedVector.validity ? expectedVector.validity[row] : true, `${vectorName} row[${row}]`); expect(getValue(expectedLogicalType, validity, dv, row), `${vectorName} row[${row}]`).toStrictEqual(expectedVector.values[row]); } } @@ -86,7 +89,7 @@ function expectListVector(vector: duckdb.Vector, expectedVector: ExpectedListVec const { validity, validityBytes } = getVectorValidity(vector, itemCount); const entriesDV = getVectorData(vector, itemCount, 16); for (let row = 0; row < itemCount; row++) { - expectValidity(validityBytes, validity, row, expectedVector.validity[row], `${vectorName} row[${row}]`); + expectValidity(validityBytes, validity, row, expectedVector.validity ? expectedVector.validity[row] : true, `${vectorName} row[${row}]`); expect(getListEntry(validity, entriesDV, row)).toStrictEqual(expectedVector.entries[row]); } @@ -106,7 +109,7 @@ function expectMapVector(vector: duckdb.Vector, expectedVector: ExpectedMapVecto const { validity, validityBytes } = getVectorValidity(vector, itemCount); const entriesDV = getVectorData(vector, itemCount, 16); for (let row = 0; row < itemCount; row++) { - expectValidity(validityBytes, validity, row, expectedVector.validity[row], `${vectorName} row[${row}]`); + expectValidity(validityBytes, validity, row, expectedVector.validity ? expectedVector.validity[row] : true, `${vectorName} row[${row}]`); expect(getListEntry(validity, entriesDV, row)).toStrictEqual(expectedVector.entries[row]); } @@ -128,7 +131,7 @@ function expectStructVector(vector: duckdb.Vector, expectedVector: ExpectedStruc const itemCount = expectedVector.itemCount; const { validity, validityBytes } = getVectorValidity(vector, itemCount); for (let row = 0; row < itemCount; row++) { - expectValidity(validityBytes, validity, row, expectedVector.validity[row], `${vectorName} row[${row}]`); + expectValidity(validityBytes, validity, row, expectedVector.validity ? expectedVector.validity[row] : true, `${vectorName} row[${row}]`); } for (let i = 0; i < expectedVector.children.length; i++) { diff --git a/bindings/test/utils/expectedVectors.ts b/bindings/test/utils/expectedVectors.ts index 1f61cb12..41f1e700 100644 --- a/bindings/test/utils/expectedVectors.ts +++ b/bindings/test/utils/expectedVectors.ts @@ -11,7 +11,7 @@ import { export function array( itemCount: number, - validity: boolean[], + validity: boolean[] | null, child: ExpectedVector ): ExpectedArrayVector { return { @@ -24,7 +24,7 @@ export function array( export function data( itemBytes: number, - validity: boolean[], + validity: boolean[] | null, values: any[] ): ExpectedDataVector { return { @@ -36,7 +36,7 @@ export function data( } export function list( - validity: boolean[], + validity: boolean[] | null, entries: (ExpectedListEntry | null)[], childItemCount: number, child: ExpectedVector @@ -51,7 +51,7 @@ export function list( } export function map( - validity: boolean[], + validity: boolean[] | null, entries: (ExpectedListEntry | null)[], keys: ExpectedVector, values: ExpectedVector @@ -67,7 +67,7 @@ export function map( export function struct( itemCount: number, - validity: boolean[], + validity: boolean[] | null, children: ExpectedVector[] ): ExpectedStructVector { return { diff --git a/bindings/test/utils/getValue.ts b/bindings/test/utils/getValue.ts index 30e59e02..a74c71ea 100644 --- a/bindings/test/utils/getValue.ts +++ b/bindings/test/utils/getValue.ts @@ -84,7 +84,7 @@ function getBuffer(dv: DataView, offset: number): Buffer { return Buffer.from(getStringBytes(dv, offset)); } -export function getValue(logicalType: ExpectedLogicalType, validity: BigUint64Array, dv: DataView, index: number): any { +export function getValue(logicalType: ExpectedLogicalType, validity: BigUint64Array | null, dv: DataView, index: number): any { if (!isValid(validity, index)) { return null; } @@ -200,7 +200,7 @@ export function getValue(logicalType: ExpectedLogicalType, validity: BigUint64Ar } } -export function getListEntry(validity: BigUint64Array, dv: DataView, index: number): [bigint, bigint] | null { +export function getListEntry(validity: BigUint64Array | null, dv: DataView, index: number): [bigint, bigint] | null { if (!isValid(validity, index)) { return null; } diff --git a/bindings/test/utils/isValid.ts b/bindings/test/utils/isValid.ts index e915775f..25487469 100644 --- a/bindings/test/utils/isValid.ts +++ b/bindings/test/utils/isValid.ts @@ -1,3 +1,6 @@ -export function isValid(validity: BigUint64Array, bit: number): boolean { +export function isValid(validity: BigUint64Array | null, bit: number): boolean { + if (!validity) { + return true; + } return (validity[Math.floor(bit / 64)] & (1n << BigInt(bit % 64))) !== 0n; } From f9743b74c0e39888c8e42f6f74a8b0d7efaf4cf6 Mon Sep 17 00:00:00 2001 From: Jeff Raymakers Date: Sun, 22 Dec 2024 17:16:56 -0800 Subject: [PATCH 2/2] add result streaming to api --- api/pkgs/@duckdb/node-api/README.md | 80 ++++++++++++++++++----------- api/src/DuckDBConnection.ts | 31 ++++++++++- api/src/DuckDBMaterializedResult.ts | 18 +++++++ api/src/DuckDBPendingResult.ts | 3 +- api/src/DuckDBPreparedStatement.ts | 28 ++++++++-- api/src/DuckDBResult.ts | 5 +- api/src/createResult.ts | 11 ++++ api/src/index.ts | 1 + 8 files changed, 139 insertions(+), 38 deletions(-) create mode 100644 api/src/DuckDBMaterializedResult.ts create mode 100644 api/src/createResult.ts diff --git a/api/pkgs/@duckdb/node-api/README.md b/api/pkgs/@duckdb/node-api/README.md index b993c500..47dc6b1b 100644 --- a/api/pkgs/@duckdb/node-api/README.md +++ b/api/pkgs/@duckdb/node-api/README.md @@ -94,6 +94,14 @@ prepared.bindInteger(2, 42); const result = await prepared.run(); ``` +### Stream Results + +Streaming results evaluate lazily when rows are read. + +```ts +const result = await connection.stream('from range(10_000)'); +``` + ### Inspect Result Get column names and types: @@ -102,6 +110,38 @@ const columnNames = result.columnNames(); const columnTypes = result.columnTypes(); ``` +### Result Reader + +Run and read all data: +```ts +const reader = await connection.runAndReadAll('from test_all_types()'); +const rows = reader.getRows(); +// OR: const columns = reader.getColumns(); +``` + +Stream and read up to (at least) some number of rows: +```ts +const reader = await connection.streamAndReadUntil('from range(5000)', 1000); +const rows = reader.getRows(); +// rows.length === 2048. (Rows are read in chunks of 2048.) +``` + +Read rows incrementally: +```ts +const reader = await connection.streamAndRead('from range(5000)'); +reader.readUntil(2000); +// reader.currentRowCount === 2048 (Rows are read in chunks of 2048.) +// reader.done === false +reader.readUntil(4000); +// reader.currentRowCount === 4096 +// reader.done === false +reader.readUntil(6000); +// reader.currentRowCount === 5000 +// reader.done === true +``` + +### Read chunks + Fetch all chunks: ```ts const chunks = await result.fetchAllChunks(); @@ -120,6 +160,16 @@ while (true) { } ``` +For materialized (non-streaming) results, chunks can be read by index: +```ts +const rowCount = result.rowCount; +const chunkCount = result.chunkCount; +for (let i = 0; i < chunkCount; i++) { + const chunk = result.getChunk(i); + // ... +} +``` + Read chunk data (column-major): ```ts // array of columns, each as an array of values @@ -148,36 +198,6 @@ for (let columnIndex = 0; columnIndex < columnCount; columnIndex++) { } ``` -### Result Reader - -Run and read all data: -```ts -const reader = await connection.runAndReadAll('from test_all_types()'); -const rows = reader.getRows(); -// OR: const columns = reader.getColumns(); -``` - -Run and read up to (at least) some number of rows: -```ts -const reader = await connection.runAndReadUtil('from range(5000)', 1000); -const rows = reader.getRows(); -// rows.length === 2048. (Rows are read in chunks of 2048.) -``` - -Read rows incrementally: -```ts -const reader = await connection.runAndRead('from range(5000)'); -reader.readUntil(2000); -// reader.currentRowCount === 2048 (Rows are read in chunks of 2048.) -// reader.done === false -reader.readUntil(4000); -// reader.currentRowCount === 4096 -// reader.done === false -reader.readUntil(6000); -// reader.currentRowCount === 5000 -// reader.done === true -``` - ### Inspect Data Types ```ts diff --git a/api/src/DuckDBConnection.ts b/api/src/DuckDBConnection.ts index 1bc28665..f52c0412 100644 --- a/api/src/DuckDBConnection.ts +++ b/api/src/DuckDBConnection.ts @@ -2,9 +2,11 @@ import duckdb from '@duckdb/node-bindings'; import { DuckDBAppender } from './DuckDBAppender'; import { DuckDBExtractedStatements } from './DuckDBExtractedStatements'; import { DuckDBInstance } from './DuckDBInstance'; +import { DuckDBMaterializedResult } from './DuckDBMaterializedResult'; import { DuckDBPreparedStatement } from './DuckDBPreparedStatement'; import { DuckDBResult } from './DuckDBResult'; import { DuckDBResultReader } from './DuckDBResultReader'; +import { DuckDBPendingResult } from './DuckDBPendingResult'; export class DuckDBConnection { private readonly connection: duckdb.Connection; @@ -22,8 +24,8 @@ export class DuckDBConnection { public get progress(): duckdb.QueryProgress { return duckdb.query_progress(this.connection); } - public async run(sql: string): Promise { - return new DuckDBResult(await duckdb.query(this.connection, sql)); + public async run(sql: string): Promise { + return new DuckDBMaterializedResult(await duckdb.query(this.connection, sql)); } public async runAndRead(sql: string): Promise { return new DuckDBResultReader(await this.run(sql)); @@ -38,6 +40,31 @@ export class DuckDBConnection { await reader.readUntil(targetRowCount); return reader; } + public async stream(sql: string): Promise { + const prepared = await this.prepare(sql); + return prepared.stream(); + } + public async streamAndRead(sql: string): Promise { + return new DuckDBResultReader(await this.stream(sql)); + } + public async streamAndReadAll(sql: string): Promise { + const reader = new DuckDBResultReader(await this.stream(sql)); + await reader.readAll(); + return reader; + } + public async streamAndReadUntil(sql: string, targetRowCount: number): Promise { + const reader = new DuckDBResultReader(await this.stream(sql)); + await reader.readUntil(targetRowCount); + return reader; + } + public async start(sql: string): Promise { + const prepared = await this.prepare(sql); + return prepared.start(); + } + public async startStream(sql: string): Promise { + const prepared = await this.prepare(sql); + return prepared.startStream(); + } public async prepare(sql: string): Promise { return new DuckDBPreparedStatement( await duckdb.prepare(this.connection, sql) diff --git a/api/src/DuckDBMaterializedResult.ts b/api/src/DuckDBMaterializedResult.ts new file mode 100644 index 00000000..d5e57269 --- /dev/null +++ b/api/src/DuckDBMaterializedResult.ts @@ -0,0 +1,18 @@ +import duckdb from '@duckdb/node-bindings'; +import { DuckDBDataChunk } from './DuckDBDataChunk'; +import { DuckDBResult } from './DuckDBResult'; + +export class DuckDBMaterializedResult extends DuckDBResult { + constructor(result: duckdb.Result) { + super(result); + } + public get rowCount(): number { + return duckdb.row_count(this.result); + } + public get chunkCount(): number { + return duckdb.result_chunk_count(this.result); + } + public getChunk(chunkIndex: number): DuckDBDataChunk { + return new DuckDBDataChunk(duckdb.result_get_chunk(this.result, chunkIndex)); + } +} diff --git a/api/src/DuckDBPendingResult.ts b/api/src/DuckDBPendingResult.ts index 48627c35..d81b55b5 100644 --- a/api/src/DuckDBPendingResult.ts +++ b/api/src/DuckDBPendingResult.ts @@ -1,4 +1,5 @@ import duckdb from '@duckdb/node-bindings'; +import { createResult } from './createResult'; import { DuckDBResult } from './DuckDBResult'; import { DuckDBResultReader } from './DuckDBResultReader'; @@ -34,7 +35,7 @@ export class DuckDBPendingResult { } } public async getResult(): Promise { - return new DuckDBResult(await duckdb.execute_pending(this.pending_result)); + return createResult(await duckdb.execute_pending(this.pending_result)); } public async read(): Promise { return new DuckDBResultReader(await this.getResult()); diff --git a/api/src/DuckDBPreparedStatement.ts b/api/src/DuckDBPreparedStatement.ts index 7f14253b..c29a0dfa 100644 --- a/api/src/DuckDBPreparedStatement.ts +++ b/api/src/DuckDBPreparedStatement.ts @@ -1,4 +1,5 @@ import duckdb from '@duckdb/node-bindings'; +import { DuckDBMaterializedResult } from './DuckDBMaterializedResult'; import { DuckDBPendingResult } from './DuckDBPendingResult'; import { DuckDBResult } from './DuckDBResult'; import { DuckDBResultReader } from './DuckDBResultReader'; @@ -111,10 +112,8 @@ export class DuckDBPreparedStatement { // public bindValue(parameterIndex: number, value: Value) { // duckdb.bind_value(this.prepared_statement, parameterIndex, value); // } - public async run(): Promise { - return new DuckDBResult( - await duckdb.execute_prepared(this.prepared_statement) - ); + public async run(): Promise { + return new DuckDBMaterializedResult(await duckdb.execute_prepared(this.prepared_statement)); } public async runAndRead(): Promise { return new DuckDBResultReader(await this.run()); @@ -129,9 +128,30 @@ export class DuckDBPreparedStatement { await reader.readUntil(targetRowCount); return reader; } + public async stream(): Promise { + return new DuckDBResult(await duckdb.execute_prepared_streaming(this.prepared_statement)); + } + public async streamAndRead(): Promise { + return new DuckDBResultReader(await this.stream()); + } + public async streamAndReadAll(): Promise { + const reader = new DuckDBResultReader(await this.stream()); + await reader.readAll(); + return reader; + } + public async streamAndReadUntil(targetRowCount: number): Promise { + const reader = new DuckDBResultReader(await this.stream()); + await reader.readUntil(targetRowCount); + return reader; + } public start(): DuckDBPendingResult { return new DuckDBPendingResult( duckdb.pending_prepared(this.prepared_statement) ); } + public startStream(): DuckDBPendingResult { + return new DuckDBPendingResult( + duckdb.pending_prepared_streaming(this.prepared_statement) + ); + } } diff --git a/api/src/DuckDBResult.ts b/api/src/DuckDBResult.ts index 19f128be..0c1553c9 100644 --- a/api/src/DuckDBResult.ts +++ b/api/src/DuckDBResult.ts @@ -8,7 +8,7 @@ import { ResultReturnType, StatementType } from './enums'; import { DuckDBValue } from './values'; export class DuckDBResult { - private readonly result: duckdb.Result; + protected readonly result: duckdb.Result; constructor(result: duckdb.Result) { this.result = result; } @@ -56,6 +56,9 @@ export class DuckDBResult { } return columnTypes; } + public get isStreaming(): boolean { + return duckdb.result_is_streaming(this.result); + } public get rowsChanged(): number { return duckdb.rows_changed(this.result); } diff --git a/api/src/createResult.ts b/api/src/createResult.ts new file mode 100644 index 00000000..2c51651f --- /dev/null +++ b/api/src/createResult.ts @@ -0,0 +1,11 @@ +import duckdb from '@duckdb/node-bindings'; +import { DuckDBMaterializedResult } from './DuckDBMaterializedResult'; +import { DuckDBResult } from './DuckDBResult'; + +export function createResult(result: duckdb.Result) { + if (duckdb.result_is_streaming(result)) { + return new DuckDBResult(result); + } else { + return new DuckDBMaterializedResult(result); + } +} diff --git a/api/src/index.ts b/api/src/index.ts index 6720ce90..d5995bc5 100644 --- a/api/src/index.ts +++ b/api/src/index.ts @@ -10,6 +10,7 @@ export * from './DuckDBDataChunk'; export * from './DuckDBExtractedStatements'; export * from './DuckDBInstance'; export * from './DuckDBLogicalType'; +export * from './DuckDBMaterializedResult'; export * from './DuckDBPendingResult'; export * from './DuckDBPreparedStatement'; export * from './DuckDBResult';