Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix fragmented TCP packet handling #487

Merged
merged 8 commits into from
Feb 2, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
"scripts": {
"clean": "rm -r packages/*/lib",
"build": "lerna run build",
"watch": "lerna run --parallel watch",
"watch": "lerna run --parallel --stream watch -- --preserveWatchOutput",
"lint": "tslint --project tsconfig.json -t verbose",
"lint!": "npm run lint -- --fix",
"test": "npm run lint && lerna run --stream test"
Expand Down
24 changes: 24 additions & 0 deletions packages/example/sql/schema.sql
Original file line number Diff line number Diff line change
Expand Up @@ -77,3 +77,27 @@ VALUES (1, 'Black Swan', 1),
INSERT INTO book_comments (user_id, book_id, body)
VALUES (1, 1, 'Fantastic read, recommend it!'),
(1, 2, 'Did not like it, expected much more...');

CREATE TYPE "Iso31661Alpha2" AS ENUM (
'AD', 'AE', 'AF', 'AG', 'AI', 'AL', 'AM', 'AO', 'AQ', 'AR', 'AS', 'AT', 'AU', 'AW', 'AX', 'AZ', 'BA', 'BB', 'BD', 'BE', 'BF',
'BG', 'BH', 'BI', 'BJ', 'BL', 'BM', 'BN', 'BO', 'BQ', 'BR', 'BS', 'BT', 'BV', 'BW', 'BY', 'BZ', 'CA', 'CC', 'CD', 'CF', 'CG',
'CH', 'CI', 'CK', 'CL', 'CM', 'CN', 'CO', 'CR', 'CU', 'CV', 'CW', 'CX', 'CY', 'CZ', 'DE', 'DJ', 'DK', 'DM', 'DO', 'DZ', 'EC',
'EE', 'EG', 'EH', 'ER', 'ES', 'ET', 'FI', 'FJ', 'FK', 'FM', 'FO', 'FR', 'GA', 'GB', 'GD', 'GE', 'GF', 'GG', 'GH', 'GI', 'GL',
'GM', 'GN', 'GP', 'GQ', 'GR', 'GS', 'GT', 'GU', 'GW', 'GY', 'HK', 'HM', 'HN', 'HR', 'HT', 'HU', 'ID', 'IE', 'IL', 'IM', 'IN',
'IQ', 'IR', 'IS', 'IT', 'JE', 'JM', 'JO', 'JP', 'KE', 'KG', 'KH', 'KI', 'KM', 'KN', 'KP', 'KR', 'KW', 'KY', 'KZ', 'LA', 'LB',
'LC', 'LI', 'LK', 'IO', 'LR', 'LS', 'LT', 'LU', 'LV', 'LY', 'MA', 'MC', 'MD', 'ME', 'MF', 'MG', 'MH', 'MK', 'ML', 'MM', 'MN',
'MS', 'MT', 'MU', 'MV', 'MW', 'MX', 'MY', 'MZ', 'NA', 'NC', 'NE', 'NF', 'NG', 'NI', 'NL', 'NO', 'NP', 'NR', 'NU', 'NZ', 'OM',
'MO', 'MP', 'MQ', 'MR', 'PA', 'PE', 'PF', 'PG', 'PH', 'PK', 'PL', 'PM', 'PN', 'PR', 'PS', 'PT', 'PW', 'PY', 'QA', 'RE', 'RO',
'RS', 'RU', 'RW', 'SA', 'SB', 'SC', 'SD', 'SE', 'SG', 'SH', 'SI', 'SJ', 'SK', 'SL', 'SM', 'SN', 'SO', 'SR', 'SS', 'ST', 'SV',
'SX', 'SY', 'SZ', 'TC', 'TD', 'TF', 'TG', 'TH', 'TJ', 'TK', 'TL', 'TM'
-- Will sometime stay hanging when we add these countries
--, 'TN', 'TO', 'TR', 'TT', 'TV'
);

CREATE TABLE book_country (
id SERIAL PRIMARY KEY,
country "Iso31661Alpha2" NOT NULL
);

INSERT INTO book_country (country)
VALUES ('CZ'), ('DE');
28 changes: 28 additions & 0 deletions packages/example/src/books/books.queries.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
/** Types generated for queries found in "src/books/books.sql" */
import { PreparedQuery } from '@pgtyped/runtime';

export type Iso31661Alpha2 = 'AD' | 'AE' | 'AF' | 'AG' | 'AI' | 'AL' | 'AM' | 'AO' | 'AQ' | 'AR' | 'AS' | 'AT' | 'AU' | 'AW' | 'AX' | 'AZ' | 'BA' | 'BB' | 'BD' | 'BE' | 'BF' | 'BG' | 'BH' | 'BI' | 'BJ' | 'BL' | 'BM' | 'BN' | 'BO' | 'BQ' | 'BR' | 'BS' | 'BT' | 'BV' | 'BW' | 'BY' | 'BZ' | 'CA' | 'CC' | 'CD' | 'CF' | 'CG' | 'CH' | 'CI' | 'CK' | 'CL' | 'CM' | 'CN' | 'CO' | 'CR' | 'CU' | 'CV' | 'CW' | 'CX' | 'CY' | 'CZ' | 'DE' | 'DJ' | 'DK' | 'DM' | 'DO' | 'DZ' | 'EC' | 'EE' | 'EG' | 'EH' | 'ER' | 'ES' | 'ET' | 'FI' | 'FJ' | 'FK' | 'FM' | 'FO' | 'FR' | 'GA' | 'GB' | 'GD' | 'GE' | 'GF' | 'GG' | 'GH' | 'GI' | 'GL' | 'GM' | 'GN' | 'GP' | 'GQ' | 'GR' | 'GS' | 'GT' | 'GU' | 'GW' | 'GY' | 'HK' | 'HM' | 'HN' | 'HR' | 'HT' | 'HU' | 'ID' | 'IE' | 'IL' | 'IM' | 'IN' | 'IO' | 'IQ' | 'IR' | 'IS' | 'IT' | 'JE' | 'JM' | 'JO' | 'JP' | 'KE' | 'KG' | 'KH' | 'KI' | 'KM' | 'KN' | 'KP' | 'KR' | 'KW' | 'KY' | 'KZ' | 'LA' | 'LB' | 'LC' | 'LI' | 'LK' | 'LR' | 'LS' | 'LT' | 'LU' | 'LV' | 'LY' | 'MA' | 'MC' | 'MD' | 'ME' | 'MF' | 'MG' | 'MH' | 'MK' | 'ML' | 'MM' | 'MN' | 'MO' | 'MP' | 'MQ' | 'MR' | 'MS' | 'MT' | 'MU' | 'MV' | 'MW' | 'MX' | 'MY' | 'MZ' | 'NA' | 'NC' | 'NE' | 'NF' | 'NG' | 'NI' | 'NL' | 'NO' | 'NP' | 'NR' | 'NU' | 'NZ' | 'OM' | 'PA' | 'PE' | 'PF' | 'PG' | 'PH' | 'PK' | 'PL' | 'PM' | 'PN' | 'PR' | 'PS' | 'PT' | 'PW' | 'PY' | 'QA' | 'RE' | 'RO' | 'RS' | 'RU' | 'RW' | 'SA' | 'SB' | 'SC' | 'SD' | 'SE' | 'SG' | 'SH' | 'SI' | 'SJ' | 'SK' | 'SL' | 'SM' | 'SN' | 'SO' | 'SR' | 'SS' | 'ST' | 'SV' | 'SX' | 'SY' | 'SZ' | 'TC' | 'TD' | 'TF' | 'TG' | 'TH' | 'TJ' | 'TK' | 'TL' | 'TM';

export type category = 'novel' | 'science-fiction' | 'thriller';

export type categoryArray = (category)[];
Expand Down Expand Up @@ -317,3 +319,29 @@ const getBooksIR: any = {"usedParamSet":{},"params":[],"statement":"SELECT id, n
export const getBooks = new PreparedQuery<IGetBooksParams,IGetBooksResult>(getBooksIR);


/** 'GetBookCountries' parameters type */
export type IGetBookCountriesParams = void;

/** 'GetBookCountries' return type */
export interface IGetBookCountriesResult {
country: Iso31661Alpha2;
id: number;
}

/** 'GetBookCountries' query type */
export interface IGetBookCountriesQuery {
params: IGetBookCountriesParams;
result: IGetBookCountriesResult;
}

const getBookCountriesIR: any = {"usedParamSet":{},"params":[],"statement":"SELECT * FROM book_country"};

/**
* Query generated from SQL:
* ```
* SELECT * FROM book_country
* ```
*/
export const getBookCountries = new PreparedQuery<IGetBookCountriesParams,IGetBookCountriesResult>(getBookCountriesIR);


3 changes: 3 additions & 0 deletions packages/example/src/books/books.sql
Original file line number Diff line number Diff line change
Expand Up @@ -58,3 +58,6 @@ SELECT array_agg(email) as "emails!", array_agg(age) = :testAges as ageTest FROM

/* @name GetBooks */
SELECT id, name as "name!" FROM books;

/* @name GetBookCountries */
SELECT * FROM book_country;
21 changes: 13 additions & 8 deletions packages/query/src/actions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -89,15 +89,20 @@ export async function startup(
response: SASLContinueResponse,
});

const finalMessage = await queue.multiMessageReply(
messages.authenticationSASLFinal,
messages.authenticationOk,
messages.parameterStatus,
messages.backendKeyData,
messages.readyForQuery,
);
const finalSASL = await queue.reply(messages.authenticationSASLFinal);
await queue.reply(messages.authenticationOk);
while (true) {
const res = await queue.reply(
messages.parameterStatus,
messages.backendKeyData,
messages.readyForQuery,
);
// break when we get readyForQuery
if ('trxStatus' in res) {
break;
}
}

const finalSASL = finalMessage.AuthenticationSASLFinal;
if ('SASLData' in finalSASL) {
checkServerFinalMessage(finalSASL.SASLData, calculatedServerSignature);
} else {
Expand Down
121 changes: 81 additions & 40 deletions packages/wire/src/protocol.test.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import { messages } from '../src/messages.js';
import {
buildMessage,
IMessagePayload,
parseMessage,
parseMultiple,
parseOneOf,
ParseResult,
} from '../src/protocol.js';

test('buildMessage for StartupMessage works', () => {
Expand Down Expand Up @@ -224,6 +225,7 @@ test('parseMessage for NoData works', () => {
const buf = Buffer.from([0x6e, 0x00, 0x00, 0x00, 0x04]);

const result = parseMessage(messages.noData, buf);
assertParseSuccess(result);

const { bufferOffset } = result;

Expand Down Expand Up @@ -320,6 +322,14 @@ test('parseOneOf results in MessageMismatchError when no message matches buffer'
expect(result.bufferOffset).toBe(buf.length);
});

function assertParseSuccess<A>(
result: ParseResult<A>,
): asserts result is IMessagePayload<A> {
if (result.type !== 'MessagePayload') {
throw new Error('Expected MessagePayload');
}
}

test('parseMultiple works with SASL Authentication example with two parameter statuses', () => {
// prettier-ignore
const buf = Buffer.from([
Expand All @@ -340,44 +350,75 @@ test('parseMultiple works with SASL Authentication example with two parameter st

0x5a, 0x00, 0x00, 0x00, 0x05, 0x49,
]);
let bufOffset = 0;
const results = parseMultiple(
[
let bufferOffset = 0;
{
const result = parseMessage(
messages.authenticationSASLFinal,
messages.authenticationOk,
messages.parameterStatus,
messages.readyForQuery,
],
buf,
bufOffset,
);
const expectedMessageData = [
{
status: null,
SASLData: 'v=RmE40SXBwskKrDeQaFSQTkug/X/p06V20bXnyxRXWbs=',
},
{
status: null,
},
{
name: 'client_encoding',
value: 'UTF8',
},
{
name: 'DateStyle',
value: 'ISO, DMY',
},
{ trxStatus: 'I' },
];
results.map((result, i) => {
if (result.type !== 'MessagePayload') {
throw new Error(`Expected MessagePayload for message at index: ${i}`);
}
const { data, bufferOffset } = result;
expect(data).toEqual(expectedMessageData[i]);

bufOffset = bufferOffset;
});

expect(bufOffset).toBe(buf.length);
buf,
bufferOffset,
);
assertParseSuccess(result);
bufferOffset = result.bufferOffset;
expect(result).toEqual({
messageName: 'AuthenticationSASLFinal',
type: 'MessagePayload',
bufferOffset: 55,
data: {
status: null,
SASLData: 'v=RmE40SXBwskKrDeQaFSQTkug/X/p06V20bXnyxRXWbs=',
},
});
}
{
const result = parseMessage(messages.authenticationOk, buf, bufferOffset);
assertParseSuccess(result);
bufferOffset = result.bufferOffset;
expect(result).toEqual({
messageName: 'AuthenticationOk',
type: 'MessagePayload',
bufferOffset: 64,
data: {
status: null,
},
});
}
{
const result = parseMessage(messages.parameterStatus, buf, bufferOffset);
assertParseSuccess(result);
bufferOffset = result.bufferOffset;
expect(result).toEqual({
messageName: 'ParameterStatus',
type: 'MessagePayload',
bufferOffset: 90,
data: {
name: 'client_encoding',
value: 'UTF8',
},
});
}
{
const result = parseMessage(messages.parameterStatus, buf, bufferOffset);
assertParseSuccess(result);
bufferOffset = result.bufferOffset;
expect(result).toEqual({
messageName: 'ParameterStatus',
type: 'MessagePayload',
bufferOffset: 114,
data: {
name: 'DateStyle',
value: 'ISO, DMY',
},
});
}
{
const result = parseMessage(messages.readyForQuery, buf, bufferOffset);
expect(result).toEqual({
messageName: 'ReadyForQuery',
type: 'MessagePayload',
bufferOffset: 120,
data: {
trxStatus: 'I',
},
});
}
});
36 changes: 14 additions & 22 deletions packages/wire/src/protocol.ts
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,11 @@ interface IMessageMismatchError {
bufferOffset: number;
}

interface IIncompleteMessageError {
type: 'IncompleteMessageError';
messageName: string;
}

interface IServerError {
type: 'ServerError';
severity:
Expand All @@ -95,7 +100,8 @@ interface IServerError {
export type ParseResult<Params> =
| IMessagePayload<Params>
| IMessageMismatchError
| IServerError;
| IServerError
| IIncompleteMessageError;

const errorResponseMessageIndicator =
pgMessages.errorResponse.indicator.charCodeAt(0);
Expand All @@ -119,6 +125,13 @@ export const parseMessage = <Params extends object>(
// Add extra one because message id isnt counted into size
const messageEnd = messageSize + messageOffset + 1;

if (messageEnd > buf.length) {
return {
type: 'IncompleteMessageError',
messageName: message.name,
};
}

if (indicator !== expectedIndicator && !isUnexpectedErrorMessage) {
return {
type: 'MessageMismatchError',
Expand Down Expand Up @@ -272,24 +285,3 @@ export const parseOneOf = (
bufferOffset: lastBufferOffset,
};
};

export const parseMultiple = (
messages: Array<IServerMessage<any>>,
buffer: Buffer,
offset: number,
): ParseResult<object>[] => {
const result: ParseResult<object>[] = [];
const bufferEnd = buffer.byteLength;
let lastBufferOffset = offset;

while (lastBufferOffset < bufferEnd) {
const parseResult = parseOneOf(messages, buffer, lastBufferOffset);
if (parseResult.type !== 'MessageMismatchError') {
result.push(parseResult);
lastBufferOffset = parseResult.bufferOffset;
} else {
return [parseResult];
}
}
return result;
};
Loading