diff --git a/package.json b/package.json index 791f08a2..48bb0aa2 100644 --- a/package.json +++ b/package.json @@ -7,7 +7,7 @@ "scripts": { "clean": "rm -r packages/*/lib", "build": "lerna run build", - "watch": "lerna run --parallel watch", + "watch": "lerna run --parallel --stream watch -- --preserveWatchOutput", "lint": "tslint --project tsconfig.json -t verbose", "lint!": "npm run lint -- --fix", "test": "npm run lint && lerna run --stream test" diff --git a/packages/example/sql/schema.sql b/packages/example/sql/schema.sql index 6a79672b..ddfa9ceb 100644 --- a/packages/example/sql/schema.sql +++ b/packages/example/sql/schema.sql @@ -77,3 +77,27 @@ VALUES (1, 'Black Swan', 1), INSERT INTO book_comments (user_id, book_id, body) VALUES (1, 1, 'Fantastic read, recommend it!'), (1, 2, 'Did not like it, expected much more...'); + +CREATE TYPE "Iso31661Alpha2" AS ENUM ( + 'AD', 'AE', 'AF', 'AG', 'AI', 'AL', 'AM', 'AO', 'AQ', 'AR', 'AS', 'AT', 'AU', 'AW', 'AX', 'AZ', 'BA', 'BB', 'BD', 'BE', 'BF', + 'BG', 'BH', 'BI', 'BJ', 'BL', 'BM', 'BN', 'BO', 'BQ', 'BR', 'BS', 'BT', 'BV', 'BW', 'BY', 'BZ', 'CA', 'CC', 'CD', 'CF', 'CG', + 'CH', 'CI', 'CK', 'CL', 'CM', 'CN', 'CO', 'CR', 'CU', 'CV', 'CW', 'CX', 'CY', 'CZ', 'DE', 'DJ', 'DK', 'DM', 'DO', 'DZ', 'EC', + 'EE', 'EG', 'EH', 'ER', 'ES', 'ET', 'FI', 'FJ', 'FK', 'FM', 'FO', 'FR', 'GA', 'GB', 'GD', 'GE', 'GF', 'GG', 'GH', 'GI', 'GL', + 'GM', 'GN', 'GP', 'GQ', 'GR', 'GS', 'GT', 'GU', 'GW', 'GY', 'HK', 'HM', 'HN', 'HR', 'HT', 'HU', 'ID', 'IE', 'IL', 'IM', 'IN', + 'IQ', 'IR', 'IS', 'IT', 'JE', 'JM', 'JO', 'JP', 'KE', 'KG', 'KH', 'KI', 'KM', 'KN', 'KP', 'KR', 'KW', 'KY', 'KZ', 'LA', 'LB', + 'LC', 'LI', 'LK', 'IO', 'LR', 'LS', 'LT', 'LU', 'LV', 'LY', 'MA', 'MC', 'MD', 'ME', 'MF', 'MG', 'MH', 'MK', 'ML', 'MM', 'MN', + 'MS', 'MT', 'MU', 'MV', 'MW', 'MX', 'MY', 'MZ', 'NA', 'NC', 'NE', 'NF', 'NG', 'NI', 'NL', 'NO', 'NP', 'NR', 'NU', 'NZ', 'OM', + 'MO', 'MP', 'MQ', 'MR', 'PA', 'PE', 'PF', 'PG', 'PH', 'PK', 'PL', 'PM', 'PN', 'PR', 'PS', 'PT', 'PW', 'PY', 'QA', 'RE', 'RO', + 'RS', 'RU', 'RW', 'SA', 'SB', 'SC', 'SD', 'SE', 'SG', 'SH', 'SI', 'SJ', 'SK', 'SL', 'SM', 'SN', 'SO', 'SR', 'SS', 'ST', 'SV', + 'SX', 'SY', 'SZ', 'TC', 'TD', 'TF', 'TG', 'TH', 'TJ', 'TK', 'TL', 'TM' + -- Will sometime stay hanging when we add these countries + --, 'TN', 'TO', 'TR', 'TT', 'TV' +); + +CREATE TABLE book_country ( + id SERIAL PRIMARY KEY, + country "Iso31661Alpha2" NOT NULL +); + +INSERT INTO book_country (country) +VALUES ('CZ'), ('DE'); diff --git a/packages/example/src/books/books.queries.ts b/packages/example/src/books/books.queries.ts index 439e401b..383a9a42 100644 --- a/packages/example/src/books/books.queries.ts +++ b/packages/example/src/books/books.queries.ts @@ -1,6 +1,8 @@ /** Types generated for queries found in "src/books/books.sql" */ import { PreparedQuery } from '@pgtyped/runtime'; +export type Iso31661Alpha2 = 'AD' | 'AE' | 'AF' | 'AG' | 'AI' | 'AL' | 'AM' | 'AO' | 'AQ' | 'AR' | 'AS' | 'AT' | 'AU' | 'AW' | 'AX' | 'AZ' | 'BA' | 'BB' | 'BD' | 'BE' | 'BF' | 'BG' | 'BH' | 'BI' | 'BJ' | 'BL' | 'BM' | 'BN' | 'BO' | 'BQ' | 'BR' | 'BS' | 'BT' | 'BV' | 'BW' | 'BY' | 'BZ' | 'CA' | 'CC' | 'CD' | 'CF' | 'CG' | 'CH' | 'CI' | 'CK' | 'CL' | 'CM' | 'CN' | 'CO' | 'CR' | 'CU' | 'CV' | 'CW' | 'CX' | 'CY' | 'CZ' | 'DE' | 'DJ' | 'DK' | 'DM' | 'DO' | 'DZ' | 'EC' | 'EE' | 'EG' | 'EH' | 'ER' | 'ES' | 'ET' | 'FI' | 'FJ' | 'FK' | 'FM' | 'FO' | 'FR' | 'GA' | 'GB' | 'GD' | 'GE' | 'GF' | 'GG' | 'GH' | 'GI' | 'GL' | 'GM' | 'GN' | 'GP' | 'GQ' | 'GR' | 'GS' | 'GT' | 'GU' | 'GW' | 'GY' | 'HK' | 'HM' | 'HN' | 'HR' | 'HT' | 'HU' | 'ID' | 'IE' | 'IL' | 'IM' | 'IN' | 'IO' | 'IQ' | 'IR' | 'IS' | 'IT' | 'JE' | 'JM' | 'JO' | 'JP' | 'KE' | 'KG' | 'KH' | 'KI' | 'KM' | 'KN' | 'KP' | 'KR' | 'KW' | 'KY' | 'KZ' | 'LA' | 'LB' | 'LC' | 'LI' | 'LK' | 'LR' | 'LS' | 'LT' | 'LU' | 'LV' | 'LY' | 'MA' | 'MC' | 'MD' | 'ME' | 'MF' | 'MG' | 'MH' | 'MK' | 'ML' | 'MM' | 'MN' | 'MO' | 'MP' | 'MQ' | 'MR' | 'MS' | 'MT' | 'MU' | 'MV' | 'MW' | 'MX' | 'MY' | 'MZ' | 'NA' | 'NC' | 'NE' | 'NF' | 'NG' | 'NI' | 'NL' | 'NO' | 'NP' | 'NR' | 'NU' | 'NZ' | 'OM' | 'PA' | 'PE' | 'PF' | 'PG' | 'PH' | 'PK' | 'PL' | 'PM' | 'PN' | 'PR' | 'PS' | 'PT' | 'PW' | 'PY' | 'QA' | 'RE' | 'RO' | 'RS' | 'RU' | 'RW' | 'SA' | 'SB' | 'SC' | 'SD' | 'SE' | 'SG' | 'SH' | 'SI' | 'SJ' | 'SK' | 'SL' | 'SM' | 'SN' | 'SO' | 'SR' | 'SS' | 'ST' | 'SV' | 'SX' | 'SY' | 'SZ' | 'TC' | 'TD' | 'TF' | 'TG' | 'TH' | 'TJ' | 'TK' | 'TL' | 'TM'; + export type category = 'novel' | 'science-fiction' | 'thriller'; export type categoryArray = (category)[]; @@ -317,3 +319,29 @@ const getBooksIR: any = {"usedParamSet":{},"params":[],"statement":"SELECT id, n export const getBooks = new PreparedQuery(getBooksIR); +/** 'GetBookCountries' parameters type */ +export type IGetBookCountriesParams = void; + +/** 'GetBookCountries' return type */ +export interface IGetBookCountriesResult { + country: Iso31661Alpha2; + id: number; +} + +/** 'GetBookCountries' query type */ +export interface IGetBookCountriesQuery { + params: IGetBookCountriesParams; + result: IGetBookCountriesResult; +} + +const getBookCountriesIR: any = {"usedParamSet":{},"params":[],"statement":"SELECT * FROM book_country"}; + +/** + * Query generated from SQL: + * ``` + * SELECT * FROM book_country + * ``` + */ +export const getBookCountries = new PreparedQuery(getBookCountriesIR); + + diff --git a/packages/example/src/books/books.sql b/packages/example/src/books/books.sql index 769de41b..8c04208c 100644 --- a/packages/example/src/books/books.sql +++ b/packages/example/src/books/books.sql @@ -58,3 +58,6 @@ SELECT array_agg(email) as "emails!", array_agg(age) = :testAges as ageTest FROM /* @name GetBooks */ SELECT id, name as "name!" FROM books; + +/* @name GetBookCountries */ +SELECT * FROM book_country; diff --git a/packages/query/src/actions.ts b/packages/query/src/actions.ts index e46dc68d..9330a35f 100644 --- a/packages/query/src/actions.ts +++ b/packages/query/src/actions.ts @@ -89,15 +89,20 @@ export async function startup( response: SASLContinueResponse, }); - const finalMessage = await queue.multiMessageReply( - messages.authenticationSASLFinal, - messages.authenticationOk, - messages.parameterStatus, - messages.backendKeyData, - messages.readyForQuery, - ); + const finalSASL = await queue.reply(messages.authenticationSASLFinal); + await queue.reply(messages.authenticationOk); + while (true) { + const res = await queue.reply( + messages.parameterStatus, + messages.backendKeyData, + messages.readyForQuery, + ); + // break when we get readyForQuery + if ('trxStatus' in res) { + break; + } + } - const finalSASL = finalMessage.AuthenticationSASLFinal; if ('SASLData' in finalSASL) { checkServerFinalMessage(finalSASL.SASLData, calculatedServerSignature); } else { diff --git a/packages/wire/src/protocol.test.ts b/packages/wire/src/protocol.test.ts index f947fea6..8dc958d4 100644 --- a/packages/wire/src/protocol.test.ts +++ b/packages/wire/src/protocol.test.ts @@ -1,9 +1,10 @@ import { messages } from '../src/messages.js'; import { buildMessage, + IMessagePayload, parseMessage, - parseMultiple, parseOneOf, + ParseResult, } from '../src/protocol.js'; test('buildMessage for StartupMessage works', () => { @@ -224,6 +225,7 @@ test('parseMessage for NoData works', () => { const buf = Buffer.from([0x6e, 0x00, 0x00, 0x00, 0x04]); const result = parseMessage(messages.noData, buf); + assertParseSuccess(result); const { bufferOffset } = result; @@ -320,6 +322,14 @@ test('parseOneOf results in MessageMismatchError when no message matches buffer' expect(result.bufferOffset).toBe(buf.length); }); +function assertParseSuccess( + result: ParseResult, +): asserts result is IMessagePayload { + if (result.type !== 'MessagePayload') { + throw new Error('Expected MessagePayload'); + } +} + test('parseMultiple works with SASL Authentication example with two parameter statuses', () => { // prettier-ignore const buf = Buffer.from([ @@ -340,44 +350,75 @@ test('parseMultiple works with SASL Authentication example with two parameter st 0x5a, 0x00, 0x00, 0x00, 0x05, 0x49, ]); - let bufOffset = 0; - const results = parseMultiple( - [ + let bufferOffset = 0; + { + const result = parseMessage( messages.authenticationSASLFinal, - messages.authenticationOk, - messages.parameterStatus, - messages.readyForQuery, - ], - buf, - bufOffset, - ); - const expectedMessageData = [ - { - status: null, - SASLData: 'v=RmE40SXBwskKrDeQaFSQTkug/X/p06V20bXnyxRXWbs=', - }, - { - status: null, - }, - { - name: 'client_encoding', - value: 'UTF8', - }, - { - name: 'DateStyle', - value: 'ISO, DMY', - }, - { trxStatus: 'I' }, - ]; - results.map((result, i) => { - if (result.type !== 'MessagePayload') { - throw new Error(`Expected MessagePayload for message at index: ${i}`); - } - const { data, bufferOffset } = result; - expect(data).toEqual(expectedMessageData[i]); - - bufOffset = bufferOffset; - }); - - expect(bufOffset).toBe(buf.length); + buf, + bufferOffset, + ); + assertParseSuccess(result); + bufferOffset = result.bufferOffset; + expect(result).toEqual({ + messageName: 'AuthenticationSASLFinal', + type: 'MessagePayload', + bufferOffset: 55, + data: { + status: null, + SASLData: 'v=RmE40SXBwskKrDeQaFSQTkug/X/p06V20bXnyxRXWbs=', + }, + }); + } + { + const result = parseMessage(messages.authenticationOk, buf, bufferOffset); + assertParseSuccess(result); + bufferOffset = result.bufferOffset; + expect(result).toEqual({ + messageName: 'AuthenticationOk', + type: 'MessagePayload', + bufferOffset: 64, + data: { + status: null, + }, + }); + } + { + const result = parseMessage(messages.parameterStatus, buf, bufferOffset); + assertParseSuccess(result); + bufferOffset = result.bufferOffset; + expect(result).toEqual({ + messageName: 'ParameterStatus', + type: 'MessagePayload', + bufferOffset: 90, + data: { + name: 'client_encoding', + value: 'UTF8', + }, + }); + } + { + const result = parseMessage(messages.parameterStatus, buf, bufferOffset); + assertParseSuccess(result); + bufferOffset = result.bufferOffset; + expect(result).toEqual({ + messageName: 'ParameterStatus', + type: 'MessagePayload', + bufferOffset: 114, + data: { + name: 'DateStyle', + value: 'ISO, DMY', + }, + }); + } + { + const result = parseMessage(messages.readyForQuery, buf, bufferOffset); + expect(result).toEqual({ + messageName: 'ReadyForQuery', + type: 'MessagePayload', + bufferOffset: 120, + data: { + trxStatus: 'I', + }, + }); + } }); diff --git a/packages/wire/src/protocol.ts b/packages/wire/src/protocol.ts index 3cf7d6cc..1a8bf0ac 100644 --- a/packages/wire/src/protocol.ts +++ b/packages/wire/src/protocol.ts @@ -77,6 +77,11 @@ interface IMessageMismatchError { bufferOffset: number; } +interface IIncompleteMessageError { + type: 'IncompleteMessageError'; + messageName: string; +} + interface IServerError { type: 'ServerError'; severity: @@ -95,7 +100,8 @@ interface IServerError { export type ParseResult = | IMessagePayload | IMessageMismatchError - | IServerError; + | IServerError + | IIncompleteMessageError; const errorResponseMessageIndicator = pgMessages.errorResponse.indicator.charCodeAt(0); @@ -119,6 +125,13 @@ export const parseMessage = ( // Add extra one because message id isnt counted into size const messageEnd = messageSize + messageOffset + 1; + if (messageEnd > buf.length) { + return { + type: 'IncompleteMessageError', + messageName: message.name, + }; + } + if (indicator !== expectedIndicator && !isUnexpectedErrorMessage) { return { type: 'MessageMismatchError', @@ -272,24 +285,3 @@ export const parseOneOf = ( bufferOffset: lastBufferOffset, }; }; - -export const parseMultiple = ( - messages: Array>, - buffer: Buffer, - offset: number, -): ParseResult[] => { - const result: ParseResult[] = []; - const bufferEnd = buffer.byteLength; - let lastBufferOffset = offset; - - while (lastBufferOffset < bufferEnd) { - const parseResult = parseOneOf(messages, buffer, lastBufferOffset); - if (parseResult.type !== 'MessageMismatchError') { - result.push(parseResult); - lastBufferOffset = parseResult.bufferOffset; - } else { - return [parseResult]; - } - } - return result; -}; diff --git a/packages/wire/src/queue.ts b/packages/wire/src/queue.ts index a53193f7..54e1b66d 100644 --- a/packages/wire/src/queue.ts +++ b/packages/wire/src/queue.ts @@ -4,7 +4,6 @@ import * as tls from 'tls'; import { buildMessage, parseMessage, - parseMultiple, parseOneOf, ParseResult, } from './protocol.js'; @@ -18,16 +17,13 @@ type Box = T extends IServerMessage ? P : any; type Boxified = { [P in keyof T]: Box }; export class AsyncQueue { - public queue: Buffer[] = []; public bufferOffset: number = 0; + public buffer: Buffer = Buffer.alloc(0); public socket: net.Socket; public replyPending: { resolve: (data: any) => any; reject: (data: any) => any; - parser: ( - buf: Buffer, - offset: number, - ) => ParseResult | ParseResult[]; + parser: (buf: Buffer, offset: number) => ParseResult; } | null = null; constructor() { this.socket = new net.Socket({}); @@ -43,7 +39,7 @@ export class AsyncQueue { const attachDataListener = () => { this.socket.on('data', (buffer: Buffer) => { debug('received %o bytes', buffer.length); - this.queue.push(buffer); + this.buffer = Buffer.concat([this.buffer, buffer]); this.processQueue(); }); }; @@ -120,58 +116,32 @@ export class AsyncQueue { } public processQueue() { - if (!this.replyPending || this.queue.length === 0) { + if (!this.replyPending || this.buffer.length === 0) { return; } - const buf = this.queue[0]; - - const parsed = this.replyPending.parser(buf, this.bufferOffset); - - if (Array.isArray(parsed)) { - // Move queue cursor in any case - const lastBufferOffset = parsed[parsed.length - 1].bufferOffset; - if (lastBufferOffset >= buf.length) { - this.bufferOffset = 0; - this.queue.pop(); - } else { - this.bufferOffset = lastBufferOffset; - } + const parsed = this.replyPending.parser(this.buffer, this.bufferOffset); - const res = parsed.reduce( - (acc, result) => ({ - ...acc, - ...(result.type !== 'ServerError' && - result.type !== 'MessageMismatchError' - ? { [result.messageName]: result.data } - : {}), - }), - {}, - ) as Record, IServerMessage>; - - if (!Object.keys(res).length) { - this.replyPending.reject(parsed); - } else { - debug('resolved awaited %o message', res); - this.replyPending.resolve(res); - } + if (parsed.type === 'IncompleteMessageError') { + debug('received incomplete message'); + return; + } + + // Move queue cursor in any case + if (parsed.bufferOffset === this.buffer.length) { + this.bufferOffset = 0; + this.buffer = Buffer.alloc(0); } else { - // Move queue cursor in any case - if (parsed.bufferOffset >= buf.length) { - this.bufferOffset = 0; - this.queue.pop(); - } else { - this.bufferOffset = parsed.bufferOffset; - } + this.bufferOffset = parsed.bufferOffset; + } - if (parsed.type === 'ServerError') { - this.replyPending.reject(parsed); - } else if (parsed.type === 'MessagePayload') { - debug('resolved awaited %o message', parsed.messageName); - this.replyPending.resolve(parsed.data); - } else { - debug('received ignored message'); - this.processQueue(); - } + if (parsed.type === 'ServerError') { + this.replyPending.reject(parsed); + } else if (parsed.type === 'MessagePayload') { + debug('resolved awaited %o message', parsed.messageName); + this.replyPending.resolve(parsed.data); + } else { + debug('received ignored message'); + this.processQueue(); } } /** @@ -199,24 +169,4 @@ export class AsyncQueue { this.processQueue(); }); } - - /** - * Waits for the next buffer consisting of multiple messages to arrive and parses it, resolving with the parsed - * values. - * @param serverMessages The array of messages to match - * @returns The parsed params - */ - public async multiMessageReply>>( - ...serverMessages: Messages - ): Promise> { - return new Promise((resolve, reject) => { - this.replyPending = { - resolve, - reject, - parser: (buf: Buffer, offset: number) => - parseMultiple(serverMessages, buf, offset), - }; - this.processQueue(); - }); - } }