Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve passthrough errors #1944

Merged
merged 2 commits into from
Jul 12, 2021
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -239,7 +239,7 @@ const buildEvents: (

for (metricData.eventLoopCount = 0; true; metricData.eventLoopCount++) {
if (events.length === 0) {
throw new PassthroughError(false, false)
throw new PassthroughError(false)
}
let nextCursorPromise: Promise<ReadModelCursor> = getContinuousLatestCursor(
cursor,
Expand Down Expand Up @@ -527,7 +527,7 @@ const buildEvents: (
await next()
}

throw new PassthroughError(false, false)
throw new PassthroughError(false)
}
}
}
Expand Down Expand Up @@ -648,7 +648,7 @@ const build: ExternalMethods['build'] = async (

let readModelLedger = rows.length === 1 ? rows[0] : null
if (readModelLedger == null || readModelLedger.Errors != null) {
throw new PassthroughError(false, false)
throw new PassthroughError(false)
}

const eventTypes =
Expand Down Expand Up @@ -697,7 +697,7 @@ const build: ExternalMethods['build'] = async (
try {
await inlineLedgerRunQuery(`ROLLBACK`)
} catch (err) {
if (!(err instanceof PassthroughError && err.isEmptyTransaction)) {
if (!(err instanceof PassthroughError)) {
throw err
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,17 +41,47 @@ const connect: CurrentConnectMethod = async (imports, pool, options) => {
(async () => {
await Promise.resolve()
connectionErrorsMap.set(connection, [])
await connection.connect()
await connection.query('SELECT 0 AS "defunct"')
connection.on('error', (error) => {
try {
connection.on('error', (error) => {
connectionErrorsMap.get(connection)?.push(error)
})
await connection.connect()
await connection.query('SELECT 0 AS "defunct"')
} catch (error) {
connectionErrorsMap.get(connection)?.push(error)
})
}
})()
)
await connectPromiseMap.get(connection)
return connection
}

const maybeThrowConnectionErrors = async (
connection: typeof pool.connection
) => {
const connectionErrors = connectionErrorsMap.get(connection) ?? []
if (connectionErrors.length > 0) {
let summaryError = connectionErrors[0]
if (connectionErrors.length > 1) {
summaryError = new Error(
connectionErrors.map(({ message }) => message).join('\n')
)
summaryError.stack = connectionErrors
.map(({ stack }) => stack)
.join('\n')
}
if (pool.connection === connection) {
pool.connection = null!
}
try {
await connection.end()
} catch (err) {}
throw summaryError
}
}

const initialConnection = await establishConnection()
await maybeThrowConnectionErrors(initialConnection)

const inlineLedgerRunQuery: InlineLedgerRunQueryMethod = async (
sql,
Expand All @@ -61,22 +91,7 @@ const connect: CurrentConnectMethod = async (imports, pool, options) => {
for (;;) {
try {
const connection = await establishConnection()
const connectionErrors = connectionErrorsMap.get(connection) ?? []
if (connectionErrors.length > 0) {
let summaryError = connectionErrors[0]
if (connectionErrors.length > 1) {
summaryError = new Error(
connectionErrors.map(({ message }) => message).join('\n')
)
summaryError.stack = connectionErrors
.map(({ stack }) => stack)
.join('\n')
}
if (pool.connection === connection) {
pool.connection = null!
}
throw summaryError
}
await maybeThrowConnectionErrors(connection)
result = await connection.query(sql)
break
} catch (error) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@ import type { CurrentDisconnectMethod } from './types'

const disconnect: CurrentDisconnectMethod = async (pool) => {
if (pool.connection != null) {
await pool.connection.end()
try {
await pool.connection.end()
} catch (err) {}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,37 +30,24 @@ const checkFuzzyError = (error: PassthroughErrorLike, value: RegExp): boolean =>
value.test(error.message) || value.test(error.stack)

const PassthroughError: PassthroughErrorFactory = Object.assign(
(function (
this: PassthroughErrorInstance,
isRetryable: boolean,
isEmptyTransaction: boolean
): void {
(function (this: PassthroughErrorInstance, isRetryable: boolean): void {
Error.call(this)
this.name = 'PassthroughError'
this.isRetryable = isRetryable
this.isEmptyTransaction = isEmptyTransaction
if (Error.captureStackTrace) {
Error.captureStackTrace(this, PassthroughError)
} else {
this.stack = new Error().stack
}
} as Function) as ExtractNewable<PassthroughErrorFactory>,
{
isEmptyTransactionError(error: PassthroughErrorLike): boolean {
return (
error != null &&
(checkFormalError(error, PostgresErrors.TRANSACTION_ROLLBACK) ||
checkFormalError(error, PostgresErrors.NO_ACTIVE_SQL_TRANSACTION))
)
},
isRetryablePassthroughError(error: PassthroughErrorLike): boolean {
return (
error != null &&
(PassthroughError.isEmptyTransactionError(error) ||
checkFuzzyError(
error,
/terminating connection due to serverless scale event timeout/i
) ||
(checkFuzzyError(
error,
/terminating connection due to serverless scale event timeout/i
) ||
checkFuzzyError(
error,
/terminating connection due to administrator command/i
Expand All @@ -82,6 +69,8 @@ const PassthroughError: PassthroughErrorFactory = Object.assign(
error,
PostgresErrors.SQLSERVER_REJECTED_ESTABLISHMENT_OF_SQLCONNECTION
) ||
checkFormalError(error, PostgresErrors.TRANSACTION_ROLLBACK) ||
checkFormalError(error, PostgresErrors.NO_ACTIVE_SQL_TRANSACTION) ||
checkFormalError(error, PostgresErrors.IN_FAILED_SQL_TRANSACTION) ||
checkFormalError(error, PostgresErrors.SERIALIZATION_FAILURE) ||
checkFormalError(error, PostgresErrors.DEADLOCK_DETECTED))
Expand Down Expand Up @@ -118,9 +107,8 @@ const PassthroughError: PassthroughErrorFactory = Object.assign(
if (!PassthroughError.isPassthroughError(error, includeRuntimeErrors)) {
throw error
}
const isEmptyTransaction = PassthroughError.isEmptyTransactionError(error)
const isRetryable = PassthroughError.isRetryablePassthroughError(error)
throw new PassthroughError(isRetryable, isEmptyTransaction)
throw new PassthroughError(isRetryable)
},
}
)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,6 @@ export type UpdateToSetExpressionMethod = (
export interface PassthroughErrorInstance extends Error {
name: string
isRetryable: boolean
isEmptyTransaction: boolean
}

export type PassthroughErrorLike = Error & {
Expand All @@ -79,10 +78,7 @@ export type PassthroughErrorLike = Error & {
}

export type PassthroughErrorFactory = {
new (
isRetryable: boolean,
isEmptyTransaction: boolean
): PassthroughErrorInstance
new (isRetryable: boolean): PassthroughErrorInstance
} & {
isRetryablePassthroughError: (error: PassthroughErrorLike) => boolean
isRegularFatalPassthroughError: (error: PassthroughErrorLike) => boolean
Expand All @@ -91,7 +87,6 @@ export type PassthroughErrorFactory = {
error: PassthroughErrorLike,
includeRuntimeErrors: boolean
) => boolean
isEmptyTransactionError: (error: PassthroughErrorLike) => boolean
maybeThrowPassthroughError: (
error: PassthroughErrorLike,
includeRuntimeErrors: boolean
Expand Down
26 changes: 21 additions & 5 deletions packages/runtime/runtime/src/cloud/init-subscriber.js
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,28 @@ const initSubscriber = (resolve, lambdaContext) => {
}

resolve.sendSqsMessage = async (destination, parameters) => {
// Send SQS messages within 1 minute with exponential jitter from 64 ms
const getRemainingSendingTime = ((endTime) => endTime - Date.now()).bind(
null,
Date.now() + 60 * 1000
)
const getJitterTime = (attempt) => Math.pow(2, attempt + 7)

const queueUrl = `https://sqs.${region}.amazonaws.com/${accountId}/${destination}`
await sendMessage({
Region: region,
QueueUrl: queueUrl,
MessageBody: JSON.stringify(parameters),
})
for (let attempt = 0; getRemainingSendingTime() > 0; attempt++) {
try {
await sendMessage({
Region: region,
QueueUrl: queueUrl,
MessageBody: JSON.stringify(parameters),
})
break
} catch (err) {
await new Promise((resolve) =>
setTimeout(resolve, getJitterTime(attempt))
)
}
}
}

resolve.invokeBuildAsync = async (parameters) => {
Expand Down