From cff42b298fcff5ccd9bb4d7d17e5a29ba7d0ace0 Mon Sep 17 00:00:00 2001 From: Vladislav V Date: Thu, 24 Jun 2021 15:45:31 +0300 Subject: [PATCH] Feature/hot read models (#1910) --- .../src/create-adapter-factory.ts | 129 +++++++++++++++ .../readmodel-base/src/create-adapter.ts | 149 ++++-------------- .../readmodel-base/src/index.ts | 35 +--- .../src/make-split-nested-path.ts | 4 +- .../readmodel-base/src/split-nested-path.ts | 8 + .../readmodel-base/src/types.ts | 33 ++-- .../readmodel-base/test/index.test.ts | 26 +-- .../readmodel-lite/src/build.ts | 6 +- .../readmodel-mysql/src/build.ts | 6 +- .../src/build.ts | 102 ++++++++---- .../test/build-events.test.ts | 22 ++- .../readmodel-postgresql/src/build.ts | 109 +++++++++---- .../scripts/src/get_webpack_client_configs.js | 134 ++++++++-------- 13 files changed, 446 insertions(+), 317 deletions(-) create mode 100644 packages/runtime/adapters/readmodel-adapters/readmodel-base/src/create-adapter-factory.ts create mode 100644 packages/runtime/adapters/readmodel-adapters/readmodel-base/src/split-nested-path.ts diff --git a/packages/runtime/adapters/readmodel-adapters/readmodel-base/src/create-adapter-factory.ts b/packages/runtime/adapters/readmodel-adapters/readmodel-base/src/create-adapter-factory.ts new file mode 100644 index 0000000000..9f49825436 --- /dev/null +++ b/packages/runtime/adapters/readmodel-adapters/readmodel-base/src/create-adapter-factory.ts @@ -0,0 +1,129 @@ +import type { + CommonAdapterPool, + CommonAdapterOptions, + AdapterImplementation, + BaseAdapterImports, + BaseAdapterPool, + AdapterOperations, + AdapterApi, + ObjectKeys, + StoreApi, +} from './types' + +const createAdapter = < + AdapterPool extends CommonAdapterPool, + AdapterOptions extends CommonAdapterOptions +>( + imports: BaseAdapterImports, + implementation: AdapterImplementation, + options: AdapterOptions +): AdapterApi => { + const { + splitNestedPath, + checkEventsContinuity, + withPerformanceTracer, + wrapConnect, + wrapDisconnect, + wrapDispose, + wrapOperation, + } = imports + + const { + connect, + disconnect, + subscribe, + unsubscribe, + resubscribe, + resume, + pause, + reset, + status, + build, + defineTable, + findOne, + find, + count, + insert, + update, + delete: del, + ...restApi + } = implementation + + if (Object.keys(restApi).length > 0) { + throw new Error( + `Read model adapter implementation should not provide extra methods: ${JSON.stringify( + Object.keys(restApi) + )}` + ) + } + + const storeApi: StoreApi = { + defineTable, + findOne, + find, + count, + insert, + update, + delete: del, + } + + const adapterOperations: AdapterOperations = { + subscribe, + unsubscribe, + resubscribe, + resume, + pause, + reset, + status, + build, + } + + for (const key of Object.keys(storeApi) as ObjectKeys< + StoreApi + >) { + if (typeof storeApi[key] !== 'function') { + throw new Error(`Store API method ${key} should be a function`) + } + } + + for (const key of Object.keys(adapterOperations) as ObjectKeys< + AdapterOperations + >) { + if (typeof adapterOperations[key] !== 'function') { + throw new Error(`Adapter operation method ${key} should be a function`) + } + } + + const { performanceTracer, monitoring, ...adapterOptions } = options + + const pool: BaseAdapterPool = { + commonAdapterPool: { + performanceTracer, + checkEventsContinuity, + splitNestedPath, + monitoring, + }, + adapterPoolMap: new Map(), + withPerformanceTracer, + performanceTracer, + monitoring, + } + + const adapter: AdapterApi = { + connect: wrapConnect(pool, connect, storeApi, adapterOptions), + disconnect: wrapDisconnect(pool, disconnect), + dispose: wrapDispose(pool, disconnect), + subscribe: wrapOperation(pool, 'subscribe', subscribe), + unsubscribe: wrapOperation(pool, 'unsubscribe', unsubscribe), + resubscribe: wrapOperation(pool, 'resubscribe', resubscribe), + resume: wrapOperation(pool, 'resume', resume), + pause: wrapOperation(pool, 'pause', pause), + reset: wrapOperation(pool, 'reset', reset), + status: wrapOperation(pool, 'status', status), + build: wrapOperation(pool, 'build', build), + } + + return Object.freeze(adapter) +} + +export default createAdapter diff --git a/packages/runtime/adapters/readmodel-adapters/readmodel-base/src/create-adapter.ts b/packages/runtime/adapters/readmodel-adapters/readmodel-base/src/create-adapter.ts index c735629ee4..68d6db4cf9 100644 --- a/packages/runtime/adapters/readmodel-adapters/readmodel-base/src/create-adapter.ts +++ b/packages/runtime/adapters/readmodel-adapters/readmodel-base/src/create-adapter.ts @@ -1,124 +1,31 @@ -import type { - CommonAdapterPool, - CommonAdapterOptions, - AdapterImplementation, - BaseAdapterImports, - BaseAdapterPool, - AdapterOperations, - AdapterApi, - ObjectKeys, - StoreApi, -} from './types' - -const createAdapter = < - AdapterPool extends CommonAdapterPool, - AdapterOptions extends CommonAdapterOptions ->( - imports: BaseAdapterImports, - implementation: AdapterImplementation, - options: AdapterOptions -): AdapterApi => { - const { - makeSplitNestedPath, - withPerformanceTracer, - wrapConnect, - wrapDisconnect, - wrapDispose, - wrapOperation, - } = imports - const splitNestedPath = makeSplitNestedPath(imports) - - const { - connect, - disconnect, - subscribe, - unsubscribe, - resubscribe, - resume, - pause, - reset, - status, - build, - defineTable, - findOne, - find, - count, - insert, - update, - delete: del, - ...restApi - } = implementation - - if (Object.keys(restApi).length > 0) { - throw new Error( - `Read model adapter implementation should not provide extra methods: ${JSON.stringify( - Object.keys(restApi) - )}` - ) - } - - const storeApi: StoreApi = { - defineTable, - findOne, - find, - count, - insert, - update, - delete: del, - } - - const adapterOperations: AdapterOperations = { - subscribe, - unsubscribe, - resubscribe, - resume, - pause, - reset, - status, - build, - } - - for (const key of Object.keys(storeApi) as ObjectKeys< - StoreApi - >) { - if (typeof storeApi[key] !== 'function') { - throw new Error(`Store API method ${key} should be a function`) - } - } - - for (const key of Object.keys(adapterOperations) as ObjectKeys< - AdapterOperations - >) { - if (typeof adapterOperations[key] !== 'function') { - throw new Error(`Adapter operation method ${key} should be a function`) - } - } - - const { performanceTracer, monitoring, ...adapterOptions } = options - - const pool: BaseAdapterPool = { - commonAdapterPool: { performanceTracer, splitNestedPath, monitoring }, - adapterPoolMap: new Map(), - withPerformanceTracer, - performanceTracer, - monitoring, - } - - const adapter: AdapterApi = { - connect: wrapConnect(pool, connect, storeApi, adapterOptions), - disconnect: wrapDisconnect(pool, disconnect), - dispose: wrapDispose(pool, disconnect), - subscribe: wrapOperation(pool, 'subscribe', subscribe), - unsubscribe: wrapOperation(pool, 'unsubscribe', unsubscribe), - resubscribe: wrapOperation(pool, 'resubscribe', resubscribe), - resume: wrapOperation(pool, 'resume', resume), - pause: wrapOperation(pool, 'pause', pause), - reset: wrapOperation(pool, 'reset', reset), - status: wrapOperation(pool, 'status', status), - build: wrapOperation(pool, 'build', build), - } - - return Object.freeze(adapter) +//eslint-disable-next-line @typescript-eslint/ban-ts-comment +// @ts-ignore +import PathToolkit from 'path-toolkit' +import { checkEventsContinuity } from '@resolve-js/eventstore-base' + +import _createAdapter from './create-adapter-factory' +import makeSplitNestedPath from './make-split-nested-path' +import withPerformanceTracer from './with-performance-tracer' +import wrapConnect from './wrap-connect' +import wrapDisconnect from './wrap-disconnect' +import wrapDispose from './wrap-dispose' +import wrapOperation from './wrap-operation' +import { CreateAdapterMethod } from './types' + +const baseAdapterImports = { + splitNestedPath: makeSplitNestedPath(PathToolkit), + checkEventsContinuity, + makeSplitNestedPath, + withPerformanceTracer, + wrapConnect, + wrapDisconnect, + wrapDispose, + wrapOperation, } +const createAdapter = _createAdapter.bind( + null, + baseAdapterImports +) as CreateAdapterMethod + export default createAdapter diff --git a/packages/runtime/adapters/readmodel-adapters/readmodel-base/src/index.ts b/packages/runtime/adapters/readmodel-adapters/readmodel-base/src/index.ts index 2cdafd87d5..c82796ad88 100644 --- a/packages/runtime/adapters/readmodel-adapters/readmodel-base/src/index.ts +++ b/packages/runtime/adapters/readmodel-adapters/readmodel-base/src/index.ts @@ -1,34 +1,3 @@ -//eslint-disable-next-line @typescript-eslint/ban-ts-comment -// @ts-ignore -import PathToolkit from 'path-toolkit' - -import _createAdapter from './create-adapter' -import makeSplitNestedPath from './make-split-nested-path' -import withPerformanceTracer from './with-performance-tracer' -import wrapConnect from './wrap-connect' -import wrapDisconnect from './wrap-disconnect' -import wrapDispose from './wrap-dispose' -import wrapOperation from './wrap-operation' -import { CreateAdapterMethod } from './types' +export { default as default } from './create-adapter' +export { default as splitNestedPath } from './split-nested-path' export * from './types' - -const baseAdapterImports = { - PathToolkit, - makeSplitNestedPath, - withPerformanceTracer, - wrapConnect, - wrapDisconnect, - wrapDispose, - wrapOperation, -} - -const createAdapter = _createAdapter.bind( - null, - baseAdapterImports -) as CreateAdapterMethod - -const splitNestedPath = makeSplitNestedPath(baseAdapterImports) - -export default createAdapter - -export { splitNestedPath } diff --git a/packages/runtime/adapters/readmodel-adapters/readmodel-base/src/make-split-nested-path.ts b/packages/runtime/adapters/readmodel-adapters/readmodel-base/src/make-split-nested-path.ts index 20d8cff81c..d5decb582a 100644 --- a/packages/runtime/adapters/readmodel-adapters/readmodel-base/src/make-split-nested-path.ts +++ b/packages/runtime/adapters/readmodel-adapters/readmodel-base/src/make-split-nested-path.ts @@ -1,7 +1,7 @@ import { MakeSplitNestedPathMethod, SplitNestedPathMethod } from './types' -const makeSplitNestedPath: MakeSplitNestedPathMethod = (imports) => { - const pathToolkit = new imports.PathToolkit() +const makeSplitNestedPath: MakeSplitNestedPathMethod = (PathToolkit) => { + const pathToolkit = new PathToolkit() pathToolkit.setOptions({ cache: false, force: false, diff --git a/packages/runtime/adapters/readmodel-adapters/readmodel-base/src/split-nested-path.ts b/packages/runtime/adapters/readmodel-adapters/readmodel-base/src/split-nested-path.ts new file mode 100644 index 0000000000..09bad2114c --- /dev/null +++ b/packages/runtime/adapters/readmodel-adapters/readmodel-base/src/split-nested-path.ts @@ -0,0 +1,8 @@ +//eslint-disable-next-line @typescript-eslint/ban-ts-comment +// @ts-ignore +import PathToolkit from 'path-toolkit' +import makeSplitNestedPath from './make-split-nested-path' + +const splitNestedPath = makeSplitNestedPath(PathToolkit) + +export default splitNestedPath diff --git a/packages/runtime/adapters/readmodel-adapters/readmodel-base/src/types.ts b/packages/runtime/adapters/readmodel-adapters/readmodel-base/src/types.ts index cb5e21f8cf..035b4343ad 100644 --- a/packages/runtime/adapters/readmodel-adapters/readmodel-base/src/types.ts +++ b/packages/runtime/adapters/readmodel-adapters/readmodel-base/src/types.ts @@ -1,9 +1,14 @@ -import { +import type { Adapter as EventStoreAdapter, Cursor, SavedEvent, + EventWithCursor as EventStoreEventWithCursor, + checkEventsContinuity, } from '@resolve-js/eventstore-base' +export type CheckEventsContinuityMethod = typeof checkEventsContinuity +export type EventWithCursor = EventStoreEventWithCursor + export type JsonPrimitive = string | number | boolean | null export type JsonMap = { [member: string]: JsonPrimitive | JsonArray | JsonMap @@ -149,13 +154,7 @@ export type MonitoringLike = { export type ReadModelCursor = Cursor // TODO brand type export type ReadModelEvent = SavedEvent -export type EventstoreAdapterLike = { - loadEvents: EventStoreAdapter['loadEvents'] - getNextCursor: EventStoreAdapter['getNextCursor'] - getSecretsManager: EventStoreAdapter['getSecretsManager'] - loadSecrets?: EventStoreAdapter['loadSecrets'] - gatherSecretsFromEvents: EventStoreAdapter['gatherSecretsFromEvents'] -} +export type EventstoreAdapterLike = EventStoreAdapter export type SplitNestedPathMethod = (input: string) => Array @@ -163,6 +162,7 @@ export type CommonAdapterPool = { monitoring?: MonitoringLike performanceTracer?: PerformanceTracerLike splitNestedPath: SplitNestedPathMethod + checkEventsContinuity: CheckEventsContinuityMethod } export type CommonAdapterOptions = { @@ -269,6 +269,14 @@ export type OmitObject = { [K in Exclude]: T[K] } +export type BuildInfo = { + eventsWithCursors?: Array + initiator: 'command' | 'read-model-next' + notificationId: string + sendTime: number + coldStart?: boolean +} + export type AdapterConnection< AdapterPool extends CommonAdapterPool, AdapterOptions extends OmitObject @@ -331,7 +339,8 @@ export type AdapterOperations = { }, next: MethodNext, eventstoreAdapter: EventstoreAdapterLike, - getVacantTimeInMillis: MethodGetRemainingTime + getVacantTimeInMillis: MethodGetRemainingTime, + buildInfo: BuildInfo ): Promise } @@ -428,12 +437,12 @@ export type PathToolkitLib = { } export type MakeSplitNestedPathMethod = ( - imports: BaseAdapterImports + PathToolkitLib: PathToolkitLib ) => SplitNestedPathMethod export type BaseAdapterImports = { - PathToolkit: PathToolkitLib - makeSplitNestedPath: MakeSplitNestedPathMethod + splitNestedPath: SplitNestedPathMethod + checkEventsContinuity: CheckEventsContinuityMethod withPerformanceTracer: WithPerformanceTracerMethod wrapConnect: WrapConnectMethod wrapDisconnect: WrapDisconnectMethod diff --git a/packages/runtime/adapters/readmodel-adapters/readmodel-base/test/index.test.ts b/packages/runtime/adapters/readmodel-adapters/readmodel-base/test/index.test.ts index 2b9a8a902a..1b53e51e45 100644 --- a/packages/runtime/adapters/readmodel-adapters/readmodel-base/test/index.test.ts +++ b/packages/runtime/adapters/readmodel-adapters/readmodel-base/test/index.test.ts @@ -3,7 +3,6 @@ import createReadModelConnector, { CommonAdapterPool, CommonAdapterOptions, } from '../src' -import makeSplitNestedPath from '../src/make-split-nested-path' jest.mock('../src/make-split-nested-path', () => jest.fn()) @@ -31,9 +30,6 @@ test('@resolve-js/readmodel-base should wrap descendant adapter', async () => { delete: jest.fn().mockImplementation(async () => void 0), } - const adapterPool = { - splitNestedPath: makeSplitNestedPath({} as any), - } const eventstoreAdapter = { loadEvents: jest.fn().mockResolvedValue({ cursor: 'CURSOR', events: [] }), getNextCursor: jest.fn().mockReturnValue('CURSOR'), @@ -52,16 +48,13 @@ test('@resolve-js/readmodel-base should wrap descendant adapter', async () => { existingSecrets: [], deletedSecrets: [], }), - } + } as any const adapterOptions = { parameter: 'content', - } + } as const - const adapter = createReadModelConnector(implementation, { - ...adapterPool, - ...adapterOptions, - }) + const adapter = createReadModelConnector(implementation, adapterOptions) const getVacantTimeInMillis = jest.fn().mockReturnValue(15000) @@ -100,6 +93,7 @@ test('@resolve-js/readmodel-base should wrap descendant adapter', async () => { const readModelName = 'ReadModelName' const store = await adapter.connect(readModelName) + const adapterPool = (implementation.connect as any).mock.calls[0][0] expect(implementation.connect).toBeCalledWith(adapterPool, adapterOptions) await adapter.subscribe(store, readModelName, null, null) @@ -110,6 +104,12 @@ test('@resolve-js/readmodel-base should wrap descendant adapter', async () => { null ) + const buildInfo = { + initiator: 'read-model-next', + notificationId: '0', + sendTime: 0, + } as const + const buildStep = jest.fn().mockImplementation(async () => { await new Promise((resolve) => setImmediate(resolve)) await adapter.build( @@ -119,7 +119,8 @@ test('@resolve-js/readmodel-base should wrap descendant adapter', async () => { modelInterop, buildStep, eventstoreAdapter, - getVacantTimeInMillis + getVacantTimeInMillis, + buildInfo ) }) await buildStep() @@ -130,7 +131,8 @@ test('@resolve-js/readmodel-base should wrap descendant adapter', async () => { modelInterop, buildStep, eventstoreAdapter, - getVacantTimeInMillis + getVacantTimeInMillis, + buildInfo ) await modelInterop.acquireInitHandler(store)() diff --git a/packages/runtime/adapters/readmodel-adapters/readmodel-lite/src/build.ts b/packages/runtime/adapters/readmodel-adapters/readmodel-lite/src/build.ts index fb04af2ef6..60a3dd0f94 100644 --- a/packages/runtime/adapters/readmodel-adapters/readmodel-lite/src/build.ts +++ b/packages/runtime/adapters/readmodel-adapters/readmodel-lite/src/build.ts @@ -362,7 +362,8 @@ const build: ExternalMethods['build'] = async ( modelInterop, next, eventstoreAdapter, - getVacantTimeInMillis + getVacantTimeInMillis, + buildInfo ) => { const { PassthroughError, @@ -494,7 +495,8 @@ const build: ExternalMethods['build'] = async ( modelInterop, next, eventstoreAdapter, - getVacantTimeInMillis + getVacantTimeInMillis, + buildInfo ) } catch (error) { if (!(error instanceof PassthroughError)) { diff --git a/packages/runtime/adapters/readmodel-adapters/readmodel-mysql/src/build.ts b/packages/runtime/adapters/readmodel-adapters/readmodel-mysql/src/build.ts index d9fa99f1b4..88239c9708 100644 --- a/packages/runtime/adapters/readmodel-adapters/readmodel-mysql/src/build.ts +++ b/packages/runtime/adapters/readmodel-adapters/readmodel-mysql/src/build.ts @@ -337,7 +337,8 @@ const build: ExternalMethods['build'] = async ( modelInterop, next, eventstoreAdapter, - getVacantTimeInMillis + getVacantTimeInMillis, + buildInfo ) => { const { PassthroughError, @@ -429,7 +430,8 @@ const build: ExternalMethods['build'] = async ( modelInterop, next, eventstoreAdapter, - getVacantTimeInMillis + getVacantTimeInMillis, + buildInfo ) } catch (error) { if (!(error instanceof PassthroughError)) { diff --git a/packages/runtime/adapters/readmodel-adapters/readmodel-postgresql-serverless/src/build.ts b/packages/runtime/adapters/readmodel-adapters/readmodel-postgresql-serverless/src/build.ts index dc3655621f..5aa4cafbc5 100644 --- a/packages/runtime/adapters/readmodel-adapters/readmodel-postgresql-serverless/src/build.ts +++ b/packages/runtime/adapters/readmodel-adapters/readmodel-postgresql-serverless/src/build.ts @@ -165,11 +165,13 @@ export const buildEvents: ( modelInterop, next, eventstoreAdapter, - getVacantTimeInMillis + getVacantTimeInMillis, + buildInfo ) => { const pool = { ...basePool, ...currentPool } const { PassthroughError, + checkEventsContinuity, inlineLedgerExecuteTransaction, inlineLedgerExecuteStatement, generateGuid, @@ -183,6 +185,24 @@ export const buildEvents: ( metricData, monitoring, } = pool + const { eventsWithCursors } = buildInfo + const isContinuousMode = + typeof eventstoreAdapter.getCursorUntilEventTypes === 'function' + const getContinuousLatestCursor = async ( + cursor: ReadModelCursor, + events: Array, + eventTypes: Array | null + ) => { + let nextCursor = await eventstoreAdapter.getNextCursor(cursor, events) + if (isContinuousMode && eventTypes != null) { + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + nextCursor = await eventstoreAdapter.getCursorUntilEventTypes!( + nextCursor, + eventTypes + ) + } + return nextCursor + } let lastSuccessEvent: ReadModelEvent | null = null let lastFailedEvent: ReadModelEvent | null = null @@ -193,6 +213,13 @@ export const buildEvents: ( let eventsApplyStartTimestamp = Date.now() let eventCount = 0 + const hotEvents: Array | null = + isContinuousMode && + Array.isArray(eventsWithCursors) && + checkEventsContinuity(inputCursor, eventsWithCursors) + ? eventsWithCursors.map(({ event }) => event) + : null + let transactionIdPromise: Promise = inlineLedgerExecuteTransaction( pool, 'begin' @@ -200,18 +227,21 @@ export const buildEvents: ( const firstEventsLoadStartTimestamp = Date.now() - let eventsPromise: Promise> = eventstoreAdapter - .loadEvents({ - eventTypes, - eventsSizeLimit: MAX_RDS_DATA_API_RESPONSE_SIZE, - limit: 100, - cursor, - }) - .then((result) => { - metricData.eventBatchLoadTime += - Date.now() - firstEventsLoadStartTimestamp - return result != null ? result.events : [] - }) + let eventsPromise: Promise> = + hotEvents == null + ? eventstoreAdapter + .loadEvents({ + eventTypes, + eventsSizeLimit: MAX_RDS_DATA_API_RESPONSE_SIZE, + limit: 100, + cursor, + }) + .then((result) => { + metricData.eventBatchLoadTime += + Date.now() - firstEventsLoadStartTimestamp + return result != null ? result.events : [] + }) + : Promise.resolve(hotEvents) let transactionId: string = await transactionIdPromise let rootSavePointId: string = generateGuid(transactionId, 'ROOT') @@ -272,18 +302,21 @@ export const buildEvents: ( 'begin' ).then((result) => (result != null ? result : RDS_TRANSACTION_FAILED_KEY)) - let nextCursor: ReadModelCursor = eventstoreAdapter.getNextCursor( + let nextCursorPromise: Promise = getContinuousLatestCursor( cursor, - events + events, + eventTypes ) const eventsLoadStartTimestamp = Date.now() - eventsPromise = eventstoreAdapter - .loadEvents({ - eventTypes, - eventsSizeLimit: MAX_RDS_DATA_API_RESPONSE_SIZE, - limit: 1000, - cursor: nextCursor, - }) + eventsPromise = Promise.resolve(nextCursorPromise) + .then((nextCursor) => + eventstoreAdapter.loadEvents({ + eventTypes, + eventsSizeLimit: MAX_RDS_DATA_API_RESPONSE_SIZE, + limit: 1000, + cursor: nextCursor, + }) + ) .then((result) => { metricData.eventBatchLoadTime += Date.now() - eventsLoadStartTimestamp return result != null ? result.events : [] @@ -322,9 +355,10 @@ export const buildEvents: ( appliedEventsCount++ if (getVacantTimeInMillis() < 0) { - nextCursor = eventstoreAdapter.getNextCursor( + nextCursorPromise = getContinuousLatestCursor( cursor, - events.slice(0, appliedEventsCount) + events.slice(0, appliedEventsCount), + eventTypes ) localContinue = false break @@ -334,9 +368,10 @@ export const buildEvents: ( throw error } - nextCursor = eventstoreAdapter.getNextCursor( + nextCursorPromise = getContinuousLatestCursor( cursor, - events.slice(0, appliedEventsCount) + events.slice(0, appliedEventsCount), + eventTypes ) await inlineLedgerExecuteStatement( @@ -357,7 +392,7 @@ export const buildEvents: ( throw originalError } - nextCursor = cursor + nextCursorPromise = Promise.resolve(cursor) appliedEventsCount = 0 const composedError = new Error( `Fatal inline ledger building error: ${originalError.message}` @@ -375,6 +410,7 @@ export const buildEvents: ( ) } + const nextCursor = await nextCursorPromise if (lastError == null) { await inlineLedgerExecuteStatement( pool, @@ -507,14 +543,17 @@ const build: ExternalMethods['build'] = async ( next, eventstoreAdapter, getVacantTimeInMillis, - ...args + buildInfo ) => { - const metricData: any = { - ...(args as any)[0], + const { eventsWithCursors, ...inputMetricData } = buildInfo + const metricData = { + ...inputMetricData, eventBatchLoadTime: 0, pureProjectionApplyTime: 0, pureLedgerTime: 0, + insideProjection: false, } + void eventsWithCursors const { PassthroughError, @@ -658,7 +697,8 @@ const build: ExternalMethods['build'] = async ( modelInterop, next, eventstoreAdapter, - getVacantTimeInMillis + getVacantTimeInMillis, + buildInfo ) } catch (error) { if (!(error instanceof PassthroughError)) { diff --git a/packages/runtime/adapters/readmodel-adapters/readmodel-postgresql-serverless/test/build-events.test.ts b/packages/runtime/adapters/readmodel-adapters/readmodel-postgresql-serverless/test/build-events.test.ts index d0ced7aacc..7b3fe6b8f1 100644 --- a/packages/runtime/adapters/readmodel-adapters/readmodel-postgresql-serverless/test/build-events.test.ts +++ b/packages/runtime/adapters/readmodel-adapters/readmodel-postgresql-serverless/test/build-events.test.ts @@ -90,7 +90,13 @@ describe('buildEvents', () => { existingSecrets: [], deletedSecrets: [], }), - } + } as any + + const buildInfo = { + initiator: 'read-model-next', + notificationId: '0', + sendTime: 0, + } as const const inlineLedgerExecuteStatement = jest .fn() @@ -135,7 +141,8 @@ describe('buildEvents', () => { projection, next, eventstoreAdapter, - getVacantTimeInMillis + getVacantTimeInMillis, + buildInfo ) throw new Error('Test failed') } catch (error) { @@ -185,6 +192,12 @@ describe('buildEvents', () => { }, } + const buildInfo = { + initiator: 'read-model-next', + notificationId: '0', + sendTime: 0, + } as const + const eventstoreAdapter = { loadEvents: jest .fn() @@ -215,7 +228,7 @@ describe('buildEvents', () => { existingSecrets: [], deletedSecrets: [], }), - } + } as any const inlineLedgerExecuteStatement = jest .fn() @@ -260,7 +273,8 @@ describe('buildEvents', () => { projection, next, eventstoreAdapter, - getVacantTimeInMillis + getVacantTimeInMillis, + buildInfo ) throw new Error('Test failed') } catch (error) { diff --git a/packages/runtime/adapters/readmodel-adapters/readmodel-postgresql/src/build.ts b/packages/runtime/adapters/readmodel-adapters/readmodel-postgresql/src/build.ts index 7f94b73579..4b76e6c8cb 100644 --- a/packages/runtime/adapters/readmodel-adapters/readmodel-postgresql/src/build.ts +++ b/packages/runtime/adapters/readmodel-adapters/readmodel-postgresql/src/build.ts @@ -1,3 +1,4 @@ +import { ReadModelEvent } from '../../readmodel-base/types' import type { PassthroughErrorInstance, ExternalMethods, @@ -125,12 +126,14 @@ const buildEvents: ( modelInterop, next, eventstoreAdapter, - getVacantTimeInMillis + getVacantTimeInMillis, + buildInfo ) => { const pool = { ...basePool, ...currentPool } const { readModelLedger: { IsProcedural: isProcedural }, PassthroughError, + checkEventsContinuity, inlineLedgerRunQuery, generateGuid, escapeStr, @@ -143,6 +146,24 @@ const buildEvents: ( escapeId, xaKey, } = pool + const { eventsWithCursors } = buildInfo + const isContinuousMode = + typeof eventstoreAdapter.getCursorUntilEventTypes === 'function' + const getContinuousLatestCursor = async ( + cursor: ReadModelCursor, + events: Array, + eventTypes: Array | null + ) => { + let nextCursor = await eventstoreAdapter.getNextCursor(cursor, events) + if (isContinuousMode && eventTypes != null) { + // eslint-disable-next-line @typescript-eslint/no-non-null-assertion + nextCursor = await eventstoreAdapter.getCursorUntilEventTypes!( + nextCursor, + eventTypes + ) + } + return nextCursor + } let lastSuccessEvent = null let lastFailedEvent = null @@ -154,18 +175,28 @@ const buildEvents: ( let eventsApplyStartTimestamp = Date.now() let eventCount = 0 - let eventsPromise = eventstoreAdapter - .loadEvents({ - eventTypes, - eventsSizeLimit: 6553600, - limit: 100, - cursor, - }) - .then((result) => { - metricData.eventBatchLoadTime += - Date.now() - firstEventsLoadStartTimestamp - return result != null ? result.events : [] - }) + const hotEvents: Array | null = + isContinuousMode && + Array.isArray(eventsWithCursors) && + checkEventsContinuity(inputCursor, eventsWithCursors) + ? eventsWithCursors.map(({ event }) => event) + : null + + let eventsPromise = + hotEvents == null + ? eventstoreAdapter + .loadEvents({ + eventTypes, + eventsSizeLimit: 6553600, + limit: 100, + cursor, + }) + .then((result) => { + metricData.eventBatchLoadTime += + Date.now() - firstEventsLoadStartTimestamp + return result != null ? result.events : [] + }) + : Promise.resolve(hotEvents) let rootSavePointId = generateGuid(xaKey, 'ROOT') @@ -192,19 +223,22 @@ const buildEvents: ( if (events.length === 0) { throw new PassthroughError(false, false) } - let nextCursor: ReadModelCursor = eventstoreAdapter.getNextCursor( + let nextCursorPromise: Promise = getContinuousLatestCursor( cursor, - events + events, + eventTypes ) const eventsLoadStartTimestamp = Date.now() - eventsPromise = eventstoreAdapter - .loadEvents({ - eventTypes, - eventsSizeLimit: 65536000, - limit: 1000, - cursor: nextCursor, - }) + eventsPromise = Promise.resolve(nextCursorPromise) + .then((nextCursor) => + eventstoreAdapter.loadEvents({ + eventTypes, + eventsSizeLimit: 65536000, + limit: 1000, + cursor: nextCursor, + }) + ) .then((result) => { metricData.eventBatchLoadTime += Date.now() - eventsLoadStartTimestamp return result != null ? result.events : [] @@ -249,9 +283,10 @@ const buildEvents: ( appliedEventsCount = appliedCount eventCount += appliedCount if (status === 'OK_PARTIAL' || status === 'CUSTOM_ERROR') { - nextCursor = eventstoreAdapter.getNextCursor( + nextCursorPromise = getContinuousLatestCursor( cursor, - events.slice(0, appliedCount) + events.slice(0, appliedCount), + eventTypes ) } if (status === 'OK_ALL' || status === 'OK_PARTIAL') { @@ -308,9 +343,10 @@ const buildEvents: ( appliedEventsCount++ if (getVacantTimeInMillis() < 0) { - nextCursor = eventstoreAdapter.getNextCursor( + nextCursorPromise = getContinuousLatestCursor( cursor, - events.slice(0, appliedEventsCount) + events.slice(0, appliedEventsCount), + eventTypes ) localContinue = false break @@ -320,9 +356,10 @@ const buildEvents: ( throw error } - nextCursor = eventstoreAdapter.getNextCursor( + nextCursorPromise = getContinuousLatestCursor( cursor, - events.slice(0, appliedEventsCount) + events.slice(0, appliedEventsCount), + eventTypes ) await inlineLedgerRunQuery( @@ -341,7 +378,7 @@ const buildEvents: ( throw originalError } - nextCursor = cursor + nextCursorPromise = Promise.resolve(cursor) appliedEventsCount = 0 const composedError = new Error( `Fatal inline ledger building error: ${originalError.message}` @@ -357,7 +394,7 @@ const buildEvents: ( ) } } - + const nextCursor = await nextCursorPromise if (lastError == null) { await inlineLedgerRunQuery( `UPDATE ${databaseNameAsId}.${ledgerTableNameAsId} SET @@ -460,14 +497,17 @@ const build: ExternalMethods['build'] = async ( next, eventstoreAdapter, getVacantTimeInMillis, - ...args + buildInfo ) => { - const metricData: any = { - ...(args as any)[0], + const { eventsWithCursors, ...inputMetricData } = buildInfo + const metricData = { + ...inputMetricData, eventBatchLoadTime: 0, pureProjectionApplyTime: 0, pureLedgerTime: 0, + insideProjection: false, } + void eventsWithCursors const { PassthroughError, @@ -606,7 +646,8 @@ const build: ExternalMethods['build'] = async ( modelInterop, next, eventstoreAdapter, - getVacantTimeInMillis + getVacantTimeInMillis, + buildInfo ) } catch (error) { if (!(error instanceof PassthroughError)) { diff --git a/packages/tools/scripts/src/get_webpack_client_configs.js b/packages/tools/scripts/src/get_webpack_client_configs.js index 778d080caa..26c36e9fc8 100644 --- a/packages/tools/scripts/src/get_webpack_client_configs.js +++ b/packages/tools/scripts/src/get_webpack_client_configs.js @@ -92,6 +92,73 @@ const getClientWebpackConfigs = ({ resolveConfig, alias }) => { plugins: [], }) + const getReadModelEntryConfig = (name) => ({ + ...getBaseClientConfig(false), + name: `${OPTIONAL_ASSET_PREFIX} Read-model adapter-inline chunk ${name}`, + entry: { + [`common/${targetMode}-entry/read-model-${name}.js`]: `${path.resolve( + __dirname, + './alias/$resolve.readModelProcedure.js' + )}?readModelName=${name}`, + }, + module: { + ...getBaseClientConfig(false).module, + rules: [ + { + test: /\.js$/, + sideEffects: false, + use: { + loader: require.resolve('babel-loader'), + options: { + sourceType: 'unambiguous', + cacheDirectory: false, + babelrc: false, + presets: [ + [ + '@babel/preset-env', + { + loose: true, + modules: false, + }, + ], + ], + plugins: [ + [ + '@babel/plugin-transform-runtime', + { + corejs: false, + helpers: true, + regenerator: true, + useESModules: false, + }, + ], + ], + }, + }, + exclude: [ + ...Object.values(alias), + /@babel\/runtime/, + /regenerator-runtime/, + ], + }, + ...getBaseClientConfig(false).module.rules, + ], + }, + optimization: { + ...getBaseClientConfig(false).optimization, + noEmitOnErrors: true, + }, + output: { + ...getBaseClientConfig(false).output, + libraryTarget: 'var', + library: '__READ_MODEL_ENTRY__', + }, + plugins: [...getBaseClientConfig(false).plugins], + mode: 'production', + devtool: undefined, + target: 'node', + }) + const clientConfigs = [ { ...getBaseClientConfig(true), @@ -117,70 +184,9 @@ const getClientWebpackConfigs = ({ resolveConfig, alias }) => { }, plugins: [...getBaseClientConfig(true).plugins, new EsmWebpackPlugin()], }, - ...resolveConfig.readModels.map(({ name }) => ({ - ...getBaseClientConfig(false), - name: `${OPTIONAL_ASSET_PREFIX} Read-model adapter-inline chunk ${name}`, - entry: { - [`common/${targetMode}-entry/read-model-${name}.js`]: `${path.resolve( - __dirname, - './alias/$resolve.readModelProcedure.js' - )}?readModelName=${name}`, - }, - module: { - ...getBaseClientConfig(false).module, - rules: [ - { - test: /\.js$/, - use: { - loader: require.resolve('babel-loader'), - options: { - sourceType: 'unambiguous', - cacheDirectory: false, - babelrc: false, - presets: [ - [ - '@babel/preset-env', - { - loose: true, - }, - ], - ], - plugins: [ - [ - '@babel/plugin-transform-runtime', - { - corejs: false, - helpers: true, - regenerator: true, - useESModules: false, - }, - ], - ], - }, - }, - exclude: [ - ...Object.values(alias), - /@babel\/runtime/, - /regenerator-runtime/, - ], - }, - ...getBaseClientConfig(false).module.rules, - ], - }, - optimization: { - ...getBaseClientConfig(false).optimization, - noEmitOnErrors: true, - }, - output: { - ...getBaseClientConfig(false).output, - libraryTarget: 'var', - library: '__READ_MODEL_ENTRY__', - }, - plugins: [...getBaseClientConfig(false).plugins], - mode: 'production', - devtool: undefined, - target: 'node', - })), + ...resolveConfig.readModels.map(({ name }) => + getReadModelEntryConfig(name) + ), ] attachWebpackConfigsClientEntries(