diff --git a/packages/runtime/adapters/readmodel-adapters/readmodel-base/src/types.ts b/packages/runtime/adapters/readmodel-adapters/readmodel-base/src/types.ts index 4d04d440b..c7c2d1cd6 100644 --- a/packages/runtime/adapters/readmodel-adapters/readmodel-base/src/types.ts +++ b/packages/runtime/adapters/readmodel-adapters/readmodel-base/src/types.ts @@ -288,7 +288,7 @@ export type OmitObject = { export type BuildInfo = { eventsWithCursors?: Array - initiator: 'command' | 'read-model-next' + initiator: 'command' | 'command-foreign' | 'read-model-next' notificationId: string sendTime: number coldStart?: boolean diff --git a/packages/runtime/adapters/readmodel-adapters/readmodel-mysql/tsconfig.json b/packages/runtime/adapters/readmodel-adapters/readmodel-mysql/tsconfig.json index 3ab5d2cc8..5ddc772f8 100644 --- a/packages/runtime/adapters/readmodel-adapters/readmodel-mysql/tsconfig.json +++ b/packages/runtime/adapters/readmodel-adapters/readmodel-mysql/tsconfig.json @@ -14,17 +14,11 @@ "jsx": "react", "outDir": "lib", "declarationDir": "types", - "composite": true, "typeRoots": [ "./typings", "./node_modules/@types", "../../../../../node_modules/@types" ] }, - "include": ["src", "typings", "test"], - "references": [ - { - "path": "../readmodel-base" - } - ] + "include": ["src", "typings", "test"] } diff --git a/packages/runtime/adapters/readmodel-adapters/readmodel-postgresql/src/build.ts b/packages/runtime/adapters/readmodel-adapters/readmodel-postgresql/src/build.ts index 40777d7d5..7926f0d4e 100644 --- a/packages/runtime/adapters/readmodel-adapters/readmodel-postgresql/src/build.ts +++ b/packages/runtime/adapters/readmodel-adapters/readmodel-postgresql/src/build.ts @@ -758,7 +758,7 @@ const build: ExternalMethods['build'] = async ( const getVacantTimeInMillis = () => Math.max(inputGetVacantTimeInMillis() - immediatelyStopTimeout, 0) eventstoreAdapter.establishTimeLimit(getVacantTimeInMillis) - const { eventsWithCursors, ...inputMetricData } = buildInfo + const { eventsWithCursors, retryAttempt, ...inputMetricData } = buildInfo const metricData = { ...inputMetricData, pureLedgerTime: 0, @@ -950,11 +950,17 @@ const build: ExternalMethods['build'] = async ( } } } catch (error) { + const nextArgs: Parameters = [ + Math.min(Math.pow(2, ~~retryAttempt) * 100, 10000), + { retryAttempt: ~~retryAttempt + 1 }, + ] + if (error === immediatelyStopError) { try { await basePool.connection.end() } catch (e) {} - await next() + + await next(...nextArgs) return } @@ -993,7 +999,7 @@ const build: ExternalMethods['build'] = async ( error.name === 'ServiceBusyError' ) { log.debug(`PassthroughError is retryable. Going to the next step`) - await next() + await next(...nextArgs) } } finally { log.debug(`Building is finished`) diff --git a/packages/runtime/adapters/readmodel-adapters/readmodel-postgresql/src/connect.ts b/packages/runtime/adapters/readmodel-adapters/readmodel-postgresql/src/connect.ts index 6d2694c21..eecb8b646 100644 --- a/packages/runtime/adapters/readmodel-adapters/readmodel-postgresql/src/connect.ts +++ b/packages/runtime/adapters/readmodel-adapters/readmodel-postgresql/src/connect.ts @@ -7,6 +7,7 @@ import type { OmitObject, } from './types' +const MAX_DISCONNECT_TIME = 3000 const connect: CurrentConnectMethod = async (imports, pool, options) => { let { tablePrefix, @@ -70,7 +71,6 @@ const connect: CurrentConnectMethod = async (imports, pool, options) => { connectionErrorsMap.get(connection)?.push(error) }) await connection.connect() - await connection.query('SELECT 0 AS "defunct"') } catch (error) { connectionErrorsMap.get(connection)?.push(error) } @@ -99,17 +99,27 @@ const connect: CurrentConnectMethod = async (imports, pool, options) => { // eslint-disable-next-line @typescript-eslint/no-non-null-assertion pool.connection = null! } + + affectedReadModelOperationsSet.clear() + let timeout: NodeJS.Timeout | null = null try { - affectedReadModelOperationsSet.clear() - await connection.end() - } catch (err) {} + await Promise.race([ + new Promise((resolve) => { + timeout = setTimeout(resolve, MAX_DISCONNECT_TIME) + }), + connection.end(), + ]) + } catch (err) { + } finally { + if (timeout != null) { + clearTimeout(timeout) + } + } + throw summaryError } } - const initialConnection = await establishConnection() - await maybeThrowConnectionErrors(initialConnection) - const inlineLedgerRunQuery: InlineLedgerRunQueryMethod = async ( sql, passthroughRuntimeErrors = false @@ -184,7 +194,8 @@ const connect: CurrentConnectMethod = async (imports, pool, options) => { schemaName: databaseName, tablePrefix, inlineLedgerRunQuery, - connection: initialConnection, + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + connection: null!, activePassthrough: false, buildMode, useSqs, diff --git a/packages/runtime/adapters/readmodel-adapters/readmodel-postgresql/src/disconnect.ts b/packages/runtime/adapters/readmodel-adapters/readmodel-postgresql/src/disconnect.ts index d97764816..b6b5e2634 100644 --- a/packages/runtime/adapters/readmodel-adapters/readmodel-postgresql/src/disconnect.ts +++ b/packages/runtime/adapters/readmodel-adapters/readmodel-postgresql/src/disconnect.ts @@ -1,6 +1,6 @@ import type { CurrentDisconnectMethod } from './types' -const MAX_DISCONNECT_TIME = 10000 +const MAX_DISCONNECT_TIME = 3000 const disconnect: CurrentDisconnectMethod = async (pool) => { if (pool.connection != null) { diff --git a/packages/runtime/adapters/readmodel-adapters/readmodel-postgresql/test/__snapshots__/index.test.ts.snap b/packages/runtime/adapters/readmodel-adapters/readmodel-postgresql/test/__snapshots__/index.test.ts.snap index bdec2688a..3b9b5a60a 100644 --- a/packages/runtime/adapters/readmodel-adapters/readmodel-postgresql/test/__snapshots__/index.test.ts.snap +++ b/packages/runtime/adapters/readmodel-adapters/readmodel-postgresql/test/__snapshots__/index.test.ts.snap @@ -10,22 +10,6 @@ Array [ Array [ "connect", ], - Array [ - "query", - "SELECT 0 AS \\"defunct\\"", - ], - Array [ - "on", - "error", - [Function], - ], - Array [ - "connect", - ], - Array [ - "query", - "SELECT 0 AS \\"defunct\\"", - ], Array [ "query", " @@ -90,6 +74,14 @@ Array [ \\"AggregateIds\\" = 'null' ", ], + Array [ + "on", + "error", + [Function], + ], + Array [ + "connect", + ], Array [ "query", " @@ -477,10 +469,6 @@ Array [ Array [ "connect", ], - Array [ - "query", - "SELECT 0 AS \\"defunct\\"", - ], Array [ "query", " @@ -774,10 +762,6 @@ Array [ Array [ "connect", ], - Array [ - "query", - "SELECT 0 AS \\"defunct\\"", - ], Array [ "query", " @@ -1057,10 +1041,6 @@ Array [ Array [ "connect", ], - Array [ - "query", - "SELECT 0 AS \\"defunct\\"", - ], Array [ "query", " @@ -1462,10 +1442,6 @@ Array [ Array [ "connect", ], - Array [ - "query", - "SELECT 0 AS \\"defunct\\"", - ], Array [ "query", " @@ -1735,10 +1711,6 @@ Array [ Array [ "connect", ], - Array [ - "query", - "SELECT 0 AS \\"defunct\\"", - ], Array [ "query", " diff --git a/tests/package.json b/tests/package.json index 19034f4bc..ff26181e4 100644 --- a/tests/package.json +++ b/tests/package.json @@ -10,6 +10,7 @@ "test:read-model": "jest --config=./jest.config.js --testMatch=**/read-model-store-*/*.test.[jt]s --runInBand", "test:read-model-mysql": "env TEST_MYSQL=true jest --config=./jest.config.js --testMatch=**/read-model-store-*/*.test.[jt]s", "test:read-model-postgres": "env TEST_POSTGRES=true jest --config=./jest.config.js --testMatch=**/read-model-store-*/*.test.[jt]s --runInBand", + "test:read-model-builder-postgres": "env TEST_POSTGRES=true jest --config=./jest.config.js --testMatch=**/read-model-builder-postgres/*.test.[jt]s --runInBand", "test:eventstore-order-events": "jest --config=./jest.config.js --testMatch=**/eventstore-order-events/*.test.[jt]s", "test:eventstore": "jest --config=./jest.config.js --testMatch=**/eventstore-*/*.test.[jt]s", "test:eventstore-postgres": "env TEST_POSTGRES=true jest --config=./jest.config.js --testMatch=**/eventstore-*/*.test.[jt]s --runInBand", diff --git a/tests/read-model-builder-postgres/index.test.ts b/tests/read-model-builder-postgres/index.test.ts new file mode 100644 index 000000000..1d870f4c4 --- /dev/null +++ b/tests/read-model-builder-postgres/index.test.ts @@ -0,0 +1,162 @@ +import { + default as factory, + isPostgres, + adapters, +} from '../readmodel-test-utils' +type UnPromise = T extends Promise ? T : never +const maybeDescribe = isPostgres() ? describe : describe.skip +jest.setTimeout(60000) + +maybeDescribe('Postgres reconnections with delay', () => { + const uniqueName = 'postgres-reconnections-delay' as const + const readModelName = 'PostgresReconnectionsDelay' as const + const getDelay = (delay: number | null) => (delay != null ? +delay : 0) + const delayFunction = async (delay: number) => + await new Promise((resolve) => setTimeout(resolve, getDelay(delay))) + const eventType = 'EVENT_TYPE' + const getNotificationObj = ( + isNext: boolean, + notificationExtraPayload?: object + ) => ({ + eventSubscriber: readModelName, + initiator: isNext ? 'read-model-next' : 'command', + notificationId: `NT-${Date.now()}${Math.floor(Math.random() * 1000000)}`, + sendTime: Date.now(), + ...(notificationExtraPayload != null ? notificationExtraPayload : {}), + }) + const subscriptionOptions = { eventTypes: [eventType], aggregateIds: null } + const currentConnections = new Set() + const currentBuilders = new Set() + let adapter: typeof adapters[typeof uniqueName] + let baseConnection: UnPromise> + const connectionsDelays = new WeakMap>() + let flushWorkers: Function + let buildOnConnection: Function + let performWorkers = true + + beforeAll(async () => { + await factory.create(uniqueName)() + adapter = adapters[uniqueName] + baseConnection = await adapter.connect(readModelName) + flushWorkers = async () => + await adapter.subscribe( + baseConnection, + readModelName, + subscriptionOptions.eventTypes, + subscriptionOptions.aggregateIds, + async () => null + ) + const baseSubscribe = flushWorkers + await baseSubscribe() + buildOnConnection = async ( + connection: typeof baseConnection, + parameters: any + ) => { + if (!performWorkers) { + return + } + let buildPromise + try { + buildPromise = adapter.build( + connection, + readModelName, + connection, + { + acquireInitHandler: async () => delayFunction.bind(null, 100), + acquireEventHandler: async () => delayFunction.bind(null, 5000), + acquireResolver: async () => delayFunction.bind(null, 100), + connectorName: 'default', + name: readModelName, + }, + (timeout: number, notificationExtraPayload: object) => { + void (async () => { + if (!connectionsDelays.has(connection)) { + connectionsDelays.set(connection, []) + } + connectionsDelays.get(connection).push(getDelay(timeout)) + await buildPromise + await delayFunction(timeout) + await buildOnConnection( + connection, + getNotificationObj(true, notificationExtraPayload) + ) + })() + return Promise.resolve(null) + }, + { + loadEvents: async ({ cursor }) => ({ + events: cursor === 'true' ? [{ type: eventType }] : [], + }), + getNextCursor: (cursor) => (cursor == null ? 'true' : 'false'), + establishTimeLimit: delayFunction, + }, + getDelay.bind(null, 60000), + parameters + ) + currentBuilders.add(buildPromise) + await buildPromise + } finally { + if (buildPromise != null) { + currentBuilders.delete(buildPromise) + } + } + } + }) + + afterAll(async () => { + await Promise.all( + Array.from(currentConnections).map(async (connection) => { + try { + await adapter.disconnect(connection, readModelName) + } catch (e) {} + }) + ) + currentConnections.clear() + + await adapter.unsubscribe(baseConnection, readModelName, async () => null) + await adapter.disconnect(baseConnection, readModelName) + + await factory.destroy(uniqueName)() + }) + + test('should perform with correct jitter', async () => { + while (currentConnections.size < 3) { + currentConnections.add(await adapter.connect(readModelName)) + } + for (const connection of currentConnections) { + void buildOnConnection(connection, getNotificationObj(false)) + } + + const stopThrottlingTimestamp = Date.now() + 10000 + while (Date.now() < stopThrottlingTimestamp) { + await flushWorkers() + await delayFunction(100) + } + + while (currentBuilders.size > 0) { + await delayFunction(500) + } + performWorkers = false + + for (const connection of currentConnections) { + const delays = connectionsDelays.get(connection) ?? [] + const delaysBase2Degrees = delays + .filter((delay) => delay > 0) + .map((delay) => Math.log2(delay / 100)) + + for (let index = 0; index < delaysBase2Degrees.length; index++) { + expect(Math.floor(delaysBase2Degrees[index])).toEqual( + delaysBase2Degrees[index] + ) + expect( + delaysBase2Degrees[index] === 0 || + (index > 0 + ? delaysBase2Degrees[index - 1] + 1 === delaysBase2Degrees[index] + : false) + ).toEqual(true) + } + await adapter.disconnect(connection, readModelName) + } + currentConnections.clear() + }) +}) diff --git a/tests/readmodel-test-utils/index.ts b/tests/readmodel-test-utils/index.ts index 19a629300..c20eae5d0 100644 --- a/tests/readmodel-test-utils/index.ts +++ b/tests/readmodel-test-utils/index.ts @@ -147,7 +147,7 @@ export const adapterFactory = isPostgres() } : isMySQL() ? { - name: '@resolve-js/readmodel-postgresql', + name: '@resolve-js/readmodel-mysql', create(uniqueName: string) { return async () => { const options = getMySQLOptions(uniqueName)