Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve jitter in readmodel-postgres adapter #2194

Merged
merged 4 commits into from
Jan 12, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ export type OmitObject<T extends object, U extends object> = {

export type BuildInfo = {
eventsWithCursors?: Array<EventWithCursor>
initiator: 'command' | 'read-model-next'
initiator: 'command' | 'command-foreign' | 'read-model-next'
notificationId: string
sendTime: number
coldStart?: boolean
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,17 +14,11 @@
"jsx": "react",
"outDir": "lib",
"declarationDir": "types",
"composite": true,
"typeRoots": [
"./typings",
"./node_modules/@types",
"../../../../../node_modules/@types"
]
},
"include": ["src", "typings", "test"],
"references": [
{
"path": "../readmodel-base"
}
]
"include": ["src", "typings", "test"]
}
Original file line number Diff line number Diff line change
Expand Up @@ -758,7 +758,7 @@ const build: ExternalMethods['build'] = async (
const getVacantTimeInMillis = () =>
Math.max(inputGetVacantTimeInMillis() - immediatelyStopTimeout, 0)
eventstoreAdapter.establishTimeLimit(getVacantTimeInMillis)
const { eventsWithCursors, ...inputMetricData } = buildInfo
const { eventsWithCursors, retryAttempt, ...inputMetricData } = buildInfo
const metricData = {
...inputMetricData,
pureLedgerTime: 0,
Expand Down Expand Up @@ -950,11 +950,17 @@ const build: ExternalMethods['build'] = async (
}
}
} catch (error) {
const nextArgs: Parameters<typeof next> = [
Math.min(Math.pow(2, ~~retryAttempt) * 100, 10000),
{ retryAttempt: ~~retryAttempt + 1 },
]

if (error === immediatelyStopError) {
try {
await basePool.connection.end()
} catch (e) {}
await next()

await next(...nextArgs)
return
}

Expand Down Expand Up @@ -993,7 +999,7 @@ const build: ExternalMethods['build'] = async (
error.name === 'ServiceBusyError'
) {
log.debug(`PassthroughError is retryable. Going to the next step`)
await next()
await next(...nextArgs)
}
} finally {
log.debug(`Building is finished`)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import type {
OmitObject,
} from './types'

const MAX_DISCONNECT_TIME = 3000
const connect: CurrentConnectMethod = async (imports, pool, options) => {
let {
tablePrefix,
Expand Down Expand Up @@ -70,7 +71,6 @@ const connect: CurrentConnectMethod = async (imports, pool, options) => {
connectionErrorsMap.get(connection)?.push(error)
})
await connection.connect()
await connection.query('SELECT 0 AS "defunct"')
} catch (error) {
connectionErrorsMap.get(connection)?.push(error)
}
Expand Down Expand Up @@ -99,17 +99,27 @@ const connect: CurrentConnectMethod = async (imports, pool, options) => {
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
pool.connection = null!
}

affectedReadModelOperationsSet.clear()
let timeout: NodeJS.Timeout | null = null
try {
affectedReadModelOperationsSet.clear()
await connection.end()
} catch (err) {}
await Promise.race([
new Promise((resolve) => {
timeout = setTimeout(resolve, MAX_DISCONNECT_TIME)
}),
connection.end(),
])
} catch (err) {
} finally {
if (timeout != null) {
clearTimeout(timeout)
}
}

throw summaryError
}
}

const initialConnection = await establishConnection()
await maybeThrowConnectionErrors(initialConnection)

const inlineLedgerRunQuery: InlineLedgerRunQueryMethod = async (
sql,
passthroughRuntimeErrors = false
Expand Down Expand Up @@ -184,7 +194,8 @@ const connect: CurrentConnectMethod = async (imports, pool, options) => {
schemaName: databaseName,
tablePrefix,
inlineLedgerRunQuery,
connection: initialConnection,
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
connection: null!,
activePassthrough: false,
buildMode,
useSqs,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import type { CurrentDisconnectMethod } from './types'

const MAX_DISCONNECT_TIME = 10000
const MAX_DISCONNECT_TIME = 3000

const disconnect: CurrentDisconnectMethod = async (pool) => {
if (pool.connection != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,22 +10,6 @@ Array [
Array [
"connect",
],
Array [
"query",
"SELECT 0 AS \\"defunct\\"",
],
Array [
"on",
"error",
[Function],
],
Array [
"connect",
],
Array [
"query",
"SELECT 0 AS \\"defunct\\"",
],
Array [
"query",
"
Expand Down Expand Up @@ -90,6 +74,14 @@ Array [
\\"AggregateIds\\" = 'null'
",
],
Array [
"on",
"error",
[Function],
],
Array [
"connect",
],
Array [
"query",
"
Expand Down Expand Up @@ -477,10 +469,6 @@ Array [
Array [
"connect",
],
Array [
"query",
"SELECT 0 AS \\"defunct\\"",
],
Array [
"query",
"
Expand Down Expand Up @@ -774,10 +762,6 @@ Array [
Array [
"connect",
],
Array [
"query",
"SELECT 0 AS \\"defunct\\"",
],
Array [
"query",
"
Expand Down Expand Up @@ -1057,10 +1041,6 @@ Array [
Array [
"connect",
],
Array [
"query",
"SELECT 0 AS \\"defunct\\"",
],
Array [
"query",
"
Expand Down Expand Up @@ -1462,10 +1442,6 @@ Array [
Array [
"connect",
],
Array [
"query",
"SELECT 0 AS \\"defunct\\"",
],
Array [
"query",
"
Expand Down Expand Up @@ -1735,10 +1711,6 @@ Array [
Array [
"connect",
],
Array [
"query",
"SELECT 0 AS \\"defunct\\"",
],
Array [
"query",
"
Expand Down
1 change: 1 addition & 0 deletions tests/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
"test:read-model": "jest --config=./jest.config.js --testMatch=**/read-model-store-*/*.test.[jt]s --runInBand",
"test:read-model-mysql": "env TEST_MYSQL=true jest --config=./jest.config.js --testMatch=**/read-model-store-*/*.test.[jt]s",
"test:read-model-postgres": "env TEST_POSTGRES=true jest --config=./jest.config.js --testMatch=**/read-model-store-*/*.test.[jt]s --runInBand",
"test:read-model-builder-postgres": "env TEST_POSTGRES=true jest --config=./jest.config.js --testMatch=**/read-model-builder-postgres/*.test.[jt]s --runInBand",
"test:eventstore-order-events": "jest --config=./jest.config.js --testMatch=**/eventstore-order-events/*.test.[jt]s",
"test:eventstore": "jest --config=./jest.config.js --testMatch=**/eventstore-*/*.test.[jt]s",
"test:eventstore-postgres": "env TEST_POSTGRES=true jest --config=./jest.config.js --testMatch=**/eventstore-*/*.test.[jt]s --runInBand",
Expand Down
162 changes: 162 additions & 0 deletions tests/read-model-builder-postgres/index.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,162 @@
import {
default as factory,
isPostgres,
adapters,
} from '../readmodel-test-utils'
type UnPromise<T> = T extends Promise<T> ? T : never
const maybeDescribe = isPostgres() ? describe : describe.skip
jest.setTimeout(60000)

maybeDescribe('Postgres reconnections with delay', () => {
const uniqueName = 'postgres-reconnections-delay' as const
const readModelName = 'PostgresReconnectionsDelay' as const
const getDelay = (delay: number | null) => (delay != null ? +delay : 0)
const delayFunction = async (delay: number) =>
await new Promise((resolve) => setTimeout(resolve, getDelay(delay)))
const eventType = 'EVENT_TYPE'
const getNotificationObj = (
isNext: boolean,
notificationExtraPayload?: object
) => ({
eventSubscriber: readModelName,
initiator: isNext ? 'read-model-next' : 'command',
notificationId: `NT-${Date.now()}${Math.floor(Math.random() * 1000000)}`,
sendTime: Date.now(),
...(notificationExtraPayload != null ? notificationExtraPayload : {}),
})
const subscriptionOptions = { eventTypes: [eventType], aggregateIds: null }
const currentConnections = new Set()
const currentBuilders = new Set()
let adapter: typeof adapters[typeof uniqueName]
let baseConnection: UnPromise<ReturnType<typeof adapter['connect']>>
const connectionsDelays = new WeakMap<typeof baseConnection, Array<number>>()
let flushWorkers: Function
let buildOnConnection: Function
let performWorkers = true

beforeAll(async () => {
await factory.create(uniqueName)()
adapter = adapters[uniqueName]
baseConnection = await adapter.connect(readModelName)
flushWorkers = async () =>
await adapter.subscribe(
baseConnection,
readModelName,
subscriptionOptions.eventTypes,
subscriptionOptions.aggregateIds,
async () => null
)
const baseSubscribe = flushWorkers
await baseSubscribe()
buildOnConnection = async (
connection: typeof baseConnection,
parameters: any
) => {
if (!performWorkers) {
return
}
let buildPromise
try {
buildPromise = adapter.build(
connection,
readModelName,
connection,
{
acquireInitHandler: async () => delayFunction.bind(null, 100),
acquireEventHandler: async () => delayFunction.bind(null, 5000),
acquireResolver: async () => delayFunction.bind(null, 100),
connectorName: 'default',
name: readModelName,
},
(timeout: number, notificationExtraPayload: object) => {
void (async () => {
if (!connectionsDelays.has(connection)) {
connectionsDelays.set(connection, [])
}
connectionsDelays.get(connection).push(getDelay(timeout))
await buildPromise
await delayFunction(timeout)
await buildOnConnection(
connection,
getNotificationObj(true, notificationExtraPayload)
)
})()
return Promise.resolve(null)
},
{
loadEvents: async ({ cursor }) => ({
events: cursor === 'true' ? [{ type: eventType }] : [],
}),
getNextCursor: (cursor) => (cursor == null ? 'true' : 'false'),
establishTimeLimit: delayFunction,
},
getDelay.bind(null, 60000),
parameters
)
currentBuilders.add(buildPromise)
await buildPromise
} finally {
if (buildPromise != null) {
currentBuilders.delete(buildPromise)
}
}
}
})

afterAll(async () => {
await Promise.all(
Array.from(currentConnections).map(async (connection) => {
try {
await adapter.disconnect(connection, readModelName)
} catch (e) {}
})
)
currentConnections.clear()

await adapter.unsubscribe(baseConnection, readModelName, async () => null)
await adapter.disconnect(baseConnection, readModelName)

await factory.destroy(uniqueName)()
})

test('should perform with correct jitter', async () => {
while (currentConnections.size < 3) {
currentConnections.add(await adapter.connect(readModelName))
}
for (const connection of currentConnections) {
void buildOnConnection(connection, getNotificationObj(false))
}

const stopThrottlingTimestamp = Date.now() + 10000
while (Date.now() < stopThrottlingTimestamp) {
await flushWorkers()
await delayFunction(100)
}

while (currentBuilders.size > 0) {
await delayFunction(500)
}
performWorkers = false

for (const connection of currentConnections) {
const delays = connectionsDelays.get(connection) ?? []
const delaysBase2Degrees = delays
.filter((delay) => delay > 0)
.map((delay) => Math.log2(delay / 100))

for (let index = 0; index < delaysBase2Degrees.length; index++) {
expect(Math.floor(delaysBase2Degrees[index])).toEqual(
delaysBase2Degrees[index]
)
expect(
delaysBase2Degrees[index] === 0 ||
(index > 0
? delaysBase2Degrees[index - 1] + 1 === delaysBase2Degrees[index]
: false)
).toEqual(true)
}
await adapter.disconnect(connection, readModelName)
}
currentConnections.clear()
})
})
Loading