Skip to content

Commit

Permalink
Improve jitter in readmodel-postgres adapter (#2194)
Browse files Browse the repository at this point in the history
  • Loading branch information
IhostVlad authored Jan 12, 2022
1 parent cff6706 commit 3aab783
Show file tree
Hide file tree
Showing 9 changed files with 203 additions and 57 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ export type OmitObject<T extends object, U extends object> = {

export type BuildInfo = {
eventsWithCursors?: Array<EventWithCursor>
initiator: 'command' | 'read-model-next'
initiator: 'command' | 'command-foreign' | 'read-model-next'
notificationId: string
sendTime: number
coldStart?: boolean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,11 @@
"jsx": "react",
"outDir": "lib",
"declarationDir": "types",
"composite": true,
"typeRoots": [
"./typings",
"./node_modules/@types",
"../../../../../node_modules/@types"
]
},
"include": ["src", "typings", "test"],
"references": [
{
"path": "../readmodel-base"
}
]
"include": ["src", "typings", "test"]
}
Original file line number Diff line number Diff line change
Expand Up @@ -758,7 +758,7 @@ const build: ExternalMethods['build'] = async (
const getVacantTimeInMillis = () =>
Math.max(inputGetVacantTimeInMillis() - immediatelyStopTimeout, 0)
eventstoreAdapter.establishTimeLimit(getVacantTimeInMillis)
const { eventsWithCursors, ...inputMetricData } = buildInfo
const { eventsWithCursors, retryAttempt, ...inputMetricData } = buildInfo
const metricData = {
...inputMetricData,
pureLedgerTime: 0,
Expand Down Expand Up @@ -950,11 +950,17 @@ const build: ExternalMethods['build'] = async (
}
}
} catch (error) {
const nextArgs: Parameters<typeof next> = [
Math.min(Math.pow(2, ~~retryAttempt) * 100, 10000),
{ retryAttempt: ~~retryAttempt + 1 },
]

if (error === immediatelyStopError) {
try {
await basePool.connection.end()
} catch (e) {}
await next()

await next(...nextArgs)
return
}

Expand Down Expand Up @@ -993,7 +999,7 @@ const build: ExternalMethods['build'] = async (
error.name === 'ServiceBusyError'
) {
log.debug(`PassthroughError is retryable. Going to the next step`)
await next()
await next(...nextArgs)
}
} finally {
log.debug(`Building is finished`)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import type {
OmitObject,
} from './types'

const MAX_DISCONNECT_TIME = 3000
const connect: CurrentConnectMethod = async (imports, pool, options) => {
let {
tablePrefix,
Expand Down Expand Up @@ -70,7 +71,6 @@ const connect: CurrentConnectMethod = async (imports, pool, options) => {
connectionErrorsMap.get(connection)?.push(error)
})
await connection.connect()
await connection.query('SELECT 0 AS "defunct"')
} catch (error) {
connectionErrorsMap.get(connection)?.push(error)
}
Expand Down Expand Up @@ -99,17 +99,27 @@ const connect: CurrentConnectMethod = async (imports, pool, options) => {
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
pool.connection = null!
}

affectedReadModelOperationsSet.clear()
let timeout: NodeJS.Timeout | null = null
try {
affectedReadModelOperationsSet.clear()
await connection.end()
} catch (err) {}
await Promise.race([
new Promise((resolve) => {
timeout = setTimeout(resolve, MAX_DISCONNECT_TIME)
}),
connection.end(),
])
} catch (err) {
} finally {
if (timeout != null) {
clearTimeout(timeout)
}
}

throw summaryError
}
}

const initialConnection = await establishConnection()
await maybeThrowConnectionErrors(initialConnection)

const inlineLedgerRunQuery: InlineLedgerRunQueryMethod = async (
sql,
passthroughRuntimeErrors = false
Expand Down Expand Up @@ -184,7 +194,8 @@ const connect: CurrentConnectMethod = async (imports, pool, options) => {
schemaName: databaseName,
tablePrefix,
inlineLedgerRunQuery,
connection: initialConnection,
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
connection: null!,
activePassthrough: false,
buildMode,
useSqs,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import type { CurrentDisconnectMethod } from './types'

const MAX_DISCONNECT_TIME = 10000
const MAX_DISCONNECT_TIME = 3000

const disconnect: CurrentDisconnectMethod = async (pool) => {
if (pool.connection != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,22 +10,6 @@ Array [
Array [
"connect",
],
Array [
"query",
"SELECT 0 AS \\"defunct\\"",
],
Array [
"on",
"error",
[Function],
],
Array [
"connect",
],
Array [
"query",
"SELECT 0 AS \\"defunct\\"",
],
Array [
"query",
"
Expand Down Expand Up @@ -90,6 +74,14 @@ Array [
\\"AggregateIds\\" = 'null'
",
],
Array [
"on",
"error",
[Function],
],
Array [
"connect",
],
Array [
"query",
"
Expand Down Expand Up @@ -477,10 +469,6 @@ Array [
Array [
"connect",
],
Array [
"query",
"SELECT 0 AS \\"defunct\\"",
],
Array [
"query",
"
Expand Down Expand Up @@ -774,10 +762,6 @@ Array [
Array [
"connect",
],
Array [
"query",
"SELECT 0 AS \\"defunct\\"",
],
Array [
"query",
"
Expand Down Expand Up @@ -1057,10 +1041,6 @@ Array [
Array [
"connect",
],
Array [
"query",
"SELECT 0 AS \\"defunct\\"",
],
Array [
"query",
"
Expand Down Expand Up @@ -1462,10 +1442,6 @@ Array [
Array [
"connect",
],
Array [
"query",
"SELECT 0 AS \\"defunct\\"",
],
Array [
"query",
"
Expand Down Expand Up @@ -1735,10 +1711,6 @@ Array [
Array [
"connect",
],
Array [
"query",
"SELECT 0 AS \\"defunct\\"",
],
Array [
"query",
"
Expand Down
1 change: 1 addition & 0 deletions tests/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
"test:read-model": "jest --config=./jest.config.js --testMatch=**/read-model-store-*/*.test.[jt]s --runInBand",
"test:read-model-mysql": "env TEST_MYSQL=true jest --config=./jest.config.js --testMatch=**/read-model-store-*/*.test.[jt]s",
"test:read-model-postgres": "env TEST_POSTGRES=true jest --config=./jest.config.js --testMatch=**/read-model-store-*/*.test.[jt]s --runInBand",
"test:read-model-builder-postgres": "env TEST_POSTGRES=true jest --config=./jest.config.js --testMatch=**/read-model-builder-postgres/*.test.[jt]s --runInBand",
"test:eventstore-order-events": "jest --config=./jest.config.js --testMatch=**/eventstore-order-events/*.test.[jt]s",
"test:eventstore": "jest --config=./jest.config.js --testMatch=**/eventstore-*/*.test.[jt]s",
"test:eventstore-postgres": "env TEST_POSTGRES=true jest --config=./jest.config.js --testMatch=**/eventstore-*/*.test.[jt]s --runInBand",
Expand Down
162 changes: 162 additions & 0 deletions tests/read-model-builder-postgres/index.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
import {
default as factory,
isPostgres,
adapters,
} from '../readmodel-test-utils'
type UnPromise<T> = T extends Promise<T> ? T : never
const maybeDescribe = isPostgres() ? describe : describe.skip
jest.setTimeout(60000)

maybeDescribe('Postgres reconnections with delay', () => {
const uniqueName = 'postgres-reconnections-delay' as const
const readModelName = 'PostgresReconnectionsDelay' as const
const getDelay = (delay: number | null) => (delay != null ? +delay : 0)
const delayFunction = async (delay: number) =>
await new Promise((resolve) => setTimeout(resolve, getDelay(delay)))
const eventType = 'EVENT_TYPE'
const getNotificationObj = (
isNext: boolean,
notificationExtraPayload?: object
) => ({
eventSubscriber: readModelName,
initiator: isNext ? 'read-model-next' : 'command',
notificationId: `NT-${Date.now()}${Math.floor(Math.random() * 1000000)}`,
sendTime: Date.now(),
...(notificationExtraPayload != null ? notificationExtraPayload : {}),
})
const subscriptionOptions = { eventTypes: [eventType], aggregateIds: null }
const currentConnections = new Set()
const currentBuilders = new Set()
let adapter: typeof adapters[typeof uniqueName]
let baseConnection: UnPromise<ReturnType<typeof adapter['connect']>>
const connectionsDelays = new WeakMap<typeof baseConnection, Array<number>>()
let flushWorkers: Function
let buildOnConnection: Function
let performWorkers = true

beforeAll(async () => {
await factory.create(uniqueName)()
adapter = adapters[uniqueName]
baseConnection = await adapter.connect(readModelName)
flushWorkers = async () =>
await adapter.subscribe(
baseConnection,
readModelName,
subscriptionOptions.eventTypes,
subscriptionOptions.aggregateIds,
async () => null
)
const baseSubscribe = flushWorkers
await baseSubscribe()
buildOnConnection = async (
connection: typeof baseConnection,
parameters: any
) => {
if (!performWorkers) {
return
}
let buildPromise
try {
buildPromise = adapter.build(
connection,
readModelName,
connection,
{
acquireInitHandler: async () => delayFunction.bind(null, 100),
acquireEventHandler: async () => delayFunction.bind(null, 5000),
acquireResolver: async () => delayFunction.bind(null, 100),
connectorName: 'default',
name: readModelName,
},
(timeout: number, notificationExtraPayload: object) => {
void (async () => {
if (!connectionsDelays.has(connection)) {
connectionsDelays.set(connection, [])
}
connectionsDelays.get(connection).push(getDelay(timeout))
await buildPromise
await delayFunction(timeout)
await buildOnConnection(
connection,
getNotificationObj(true, notificationExtraPayload)
)
})()
return Promise.resolve(null)
},
{
loadEvents: async ({ cursor }) => ({
events: cursor === 'true' ? [{ type: eventType }] : [],
}),
getNextCursor: (cursor) => (cursor == null ? 'true' : 'false'),
establishTimeLimit: delayFunction,
},
getDelay.bind(null, 60000),
parameters
)
currentBuilders.add(buildPromise)
await buildPromise
} finally {
if (buildPromise != null) {
currentBuilders.delete(buildPromise)
}
}
}
})

afterAll(async () => {
await Promise.all(
Array.from(currentConnections).map(async (connection) => {
try {
await adapter.disconnect(connection, readModelName)
} catch (e) {}
})
)
currentConnections.clear()

await adapter.unsubscribe(baseConnection, readModelName, async () => null)
await adapter.disconnect(baseConnection, readModelName)

await factory.destroy(uniqueName)()
})

test('should perform with correct jitter', async () => {
while (currentConnections.size < 3) {
currentConnections.add(await adapter.connect(readModelName))
}
for (const connection of currentConnections) {
void buildOnConnection(connection, getNotificationObj(false))
}

const stopThrottlingTimestamp = Date.now() + 10000
while (Date.now() < stopThrottlingTimestamp) {
await flushWorkers()
await delayFunction(100)
}

while (currentBuilders.size > 0) {
await delayFunction(500)
}
performWorkers = false

for (const connection of currentConnections) {
const delays = connectionsDelays.get(connection) ?? []
const delaysBase2Degrees = delays
.filter((delay) => delay > 0)
.map((delay) => Math.log2(delay / 100))

for (let index = 0; index < delaysBase2Degrees.length; index++) {
expect(Math.floor(delaysBase2Degrees[index])).toEqual(
delaysBase2Degrees[index]
)
expect(
delaysBase2Degrees[index] === 0 ||
(index > 0
? delaysBase2Degrees[index - 1] + 1 === delaysBase2Degrees[index]
: false)
).toEqual(true)
}
await adapter.disconnect(connection, readModelName)
}
currentConnections.clear()
})
})
Loading

0 comments on commit 3aab783

Please sign in to comment.