diff --git a/packages/runtime/adapters/eventstore-adapters/eventstore-base/src/constants.ts b/packages/runtime/adapters/eventstore-adapters/eventstore-base/src/constants.ts index 879e1234a6..2abf7c36dd 100644 --- a/packages/runtime/adapters/eventstore-adapters/eventstore-base/src/constants.ts +++ b/packages/runtime/adapters/eventstore-adapters/eventstore-base/src/constants.ts @@ -6,3 +6,7 @@ export const PARTIAL_SECRET_FLAG = Symbol.for('PARTIAL_SECRET_FLAG') export const BUFFER_SIZE = 512 * 1024 export const BATCH_SIZE = 200 + +export const THREAD_COUNT = 256 +export const THREAD_COUNTER_BYTE_LENGTH = 6 +export const CURSOR_BUFFER_SIZE = THREAD_COUNT * THREAD_COUNTER_BYTE_LENGTH diff --git a/packages/runtime/adapters/eventstore-adapters/eventstore-base/src/create-adapter.ts b/packages/runtime/adapters/eventstore-adapters/eventstore-base/src/create-adapter.ts index 68c393a051..c39f329275 100644 --- a/packages/runtime/adapters/eventstore-adapters/eventstore-base/src/create-adapter.ts +++ b/packages/runtime/adapters/eventstore-adapters/eventstore-base/src/create-adapter.ts @@ -86,6 +86,7 @@ const createAdapter = < setReplicationStatus, getReplicationState, resetReplication, + getCursorUntilEventTypes, }: AdapterFunctions, connectionDependencies: ConnectionDependencies, options: Config @@ -190,6 +191,8 @@ const createAdapter = < setReplicationStatus: wrapMethod(adapterPool, setReplicationStatus), getReplicationState: wrapMethod(adapterPool, getReplicationState), resetReplication: wrapMethod(adapterPool, resetReplication), + + getCursorUntilEventTypes: wrapMethod(adapterPool, getCursorUntilEventTypes), } Object.assign, Adapter>( diff --git a/packages/runtime/adapters/eventstore-adapters/eventstore-base/src/cursor-operations.ts b/packages/runtime/adapters/eventstore-adapters/eventstore-base/src/cursor-operations.ts new file mode 100644 index 0000000000..e107685efb --- /dev/null +++ b/packages/runtime/adapters/eventstore-adapters/eventstore-base/src/cursor-operations.ts @@ -0,0 +1,93 @@ +import assert from 'assert' +import { + THREAD_COUNT, + CURSOR_BUFFER_SIZE, + THREAD_COUNTER_BYTE_LENGTH, +} from './constants' +import { Cursor } from './types' + +const checkThreadCounterHexString = (threadCounter: string): void => { + assert.strictEqual( + threadCounter.length, + 12, + 'threadCounter in hex form must have length equal to 12' + ) +} + +export const initThreadArray = (): Array => { + const threadCounters = new Array(THREAD_COUNT) + threadCounters.fill(0) + return threadCounters +} + +export const threadArrayToCursor = (threadArray: Array): string => { + assert.strictEqual( + threadArray.length, + THREAD_COUNT, + 'Cursor must be represented by array of 256 numbers' + ) + + const cursorBuffer: Buffer = Buffer.alloc(CURSOR_BUFFER_SIZE) + + for (let i = 0; i < threadArray.length; ++i) { + const threadCounter = threadArray[i] + const threadCounterBuffer = threadCounterToBuffer(threadCounter) + threadCounterBuffer.copy(cursorBuffer, i * THREAD_COUNTER_BYTE_LENGTH) + } + + return cursorBuffer.toString('base64') +} + +export const cursorToThreadArray = (cursor: Cursor): Array => { + if (cursor == null) return initThreadArray() + + const cursorBuffer = Buffer.from(cursor, 'base64') + + assert.strictEqual( + cursorBuffer.length, + CURSOR_BUFFER_SIZE, + 'Wrong size of cursor buffer' + ) + + const threadCounters = new Array(THREAD_COUNT) + for (let i = 0; i < cursorBuffer.length / THREAD_COUNTER_BYTE_LENGTH; i++) { + threadCounters[i] = hexStringToThreadCounter( + cursorBuffer.slice(i * 6, (i + 1) * 6).toString('hex') + ) + } + return threadCounters +} + +export const threadCounterToHexString = (threadCounter: number): string => + threadCounter.toString(16).padStart(12, '0') + +export const hexStringToThreadCounter = (threadCounter: string): number => { + checkThreadCounterHexString(threadCounter) + const num = parseInt(threadCounter, 16) + assert(!Number.isNaN(num)) + return num +} + +export const threadCounterHexStringToBuffer = ( + threadCounter: string +): Buffer => { + checkThreadCounterHexString(threadCounter) + const b: Buffer = Buffer.alloc(THREAD_COUNTER_BYTE_LENGTH) + + for ( + let hexIndex = 0, hexPairIndex = 0; + hexIndex < threadCounter.length; + hexIndex += 2, hexPairIndex++ + ) { + b[hexPairIndex] = Buffer.from( + threadCounter.substring(hexIndex, hexIndex + 2), + 'hex' + )[0] + } + + return b +} + +export const threadCounterToBuffer = (threadCounter: number): Buffer => { + return threadCounterHexStringToBuffer(threadCounterToHexString(threadCounter)) +} diff --git a/packages/runtime/adapters/eventstore-adapters/eventstore-base/src/index.ts b/packages/runtime/adapters/eventstore-adapters/eventstore-base/src/index.ts index 7b6191722a..fb1496476f 100644 --- a/packages/runtime/adapters/eventstore-adapters/eventstore-base/src/index.ts +++ b/packages/runtime/adapters/eventstore-adapters/eventstore-base/src/index.ts @@ -5,7 +5,6 @@ import wrapMethod from './wrap-method' import wrapEventFilter from './wrap-event-filter' import wrapDispose from './wrap-dispose' import validateEventFilter from './validate-event-filter' -import { MAINTENANCE_MODE_AUTO, MAINTENANCE_MODE_MANUAL } from './constants' import ConcurrentError from './concurrent-error' import { ResourceAlreadyExistError, @@ -48,6 +47,19 @@ import { AdapterConfigSchema, } from './types' +export { + threadArrayToCursor, + cursorToThreadArray, + initThreadArray, +} from './cursor-operations' +export { + MAINTENANCE_MODE_AUTO, + MAINTENANCE_MODE_MANUAL, + THREAD_COUNT, + CURSOR_BUFFER_SIZE, + THREAD_COUNTER_BYTE_LENGTH, +} from './constants' + const wrappedCreateAdapter = < ConnectedProps extends AdapterPoolConnectedProps, ConnectionDependencies extends any, @@ -99,8 +111,6 @@ export { AlreadyFrozenError as EventstoreAlreadyFrozenError, AlreadyUnfrozenError as EventstoreAlreadyUnfrozenError, ReplicationAlreadyInProgress, - MAINTENANCE_MODE_AUTO, - MAINTENANCE_MODE_MANUAL, throwBadCursor, getNextCursor, snapshotTrigger, diff --git a/packages/runtime/adapters/eventstore-adapters/eventstore-base/src/types.ts b/packages/runtime/adapters/eventstore-adapters/eventstore-base/src/types.ts index 9ef9d14c36..353d472fdf 100644 --- a/packages/runtime/adapters/eventstore-adapters/eventstore-base/src/types.ts +++ b/packages/runtime/adapters/eventstore-adapters/eventstore-base/src/types.ts @@ -538,6 +538,11 @@ export interface AdapterFunctions< ConnectedProps, NonNullable > + + getCursorUntilEventTypes?: PoolMethod< + ConnectedProps, + NonNullable + > } export interface Adapter { @@ -545,7 +550,7 @@ export interface Adapter { importEvents: (options?: Partial) => ImportEventsStream exportEvents: (options?: Partial) => ExportEventsStream getLatestEvent: (filter: EventFilter) => Promise - saveEvent: (event: InputEvent) => Promise + saveEvent: (event: InputEvent) => Promise init: () => Promise drop: () => Promise dispose: () => Promise @@ -614,4 +619,9 @@ export interface Adapter { setReplicationPaused?: (pause: boolean) => Promise getReplicationState?: () => Promise resetReplication?: () => Promise + + getCursorUntilEventTypes?: ( + cursor: Cursor, + untilEventTypes: Array + ) => Promise } diff --git a/packages/runtime/adapters/eventstore-adapters/eventstore-base/test/cursor-to-thread-array.test.ts b/packages/runtime/adapters/eventstore-adapters/eventstore-base/test/cursor-to-thread-array.test.ts new file mode 100644 index 0000000000..fabecaabf1 --- /dev/null +++ b/packages/runtime/adapters/eventstore-adapters/eventstore-base/test/cursor-to-thread-array.test.ts @@ -0,0 +1,33 @@ +import { AssertionError } from 'assert' +import { cursorToThreadArray, THREAD_COUNT, initThreadArray } from '../src' + +describe('calculating array of threadCounters from cursor', () => { + test('should throw AssertionError when cursor has an invalid size', () => { + expect(() => cursorToThreadArray('AAAAAAAAAAAAAAAA')).toThrow( + AssertionError + ) + }) + + test('should return all zero array from null cursor', () => { + const arr = cursorToThreadArray(null) + expect(arr).toEqual(initThreadArray()) + }) + + test('should return all zero array from initial cursor', () => { + const cursor = 'AAAAAAAA'.repeat(THREAD_COUNT) + const arr = cursorToThreadArray(cursor) + expect(arr).toEqual(initThreadArray()) + }) + + test('should return expected results', () => { + const cursor = + // eslint-disable-next-line no-useless-concat + 'AAAAAAAB' + 'AAAAAAAa' + 'AAAAAAAA'.repeat(THREAD_COUNT - 3) + 'BAAAAAAA' + const arr = cursorToThreadArray(cursor) + const expectedArr = initThreadArray() + expectedArr[0] = 1 + expectedArr[1] = 26 + expectedArr[255] = 64 ** 7 + expect(arr).toEqual(expectedArr) + }) +}) diff --git a/packages/runtime/adapters/eventstore-adapters/eventstore-base/test/thread-array-to-cursor.test.ts b/packages/runtime/adapters/eventstore-adapters/eventstore-base/test/thread-array-to-cursor.test.ts new file mode 100644 index 0000000000..07c6f8aefc --- /dev/null +++ b/packages/runtime/adapters/eventstore-adapters/eventstore-base/test/thread-array-to-cursor.test.ts @@ -0,0 +1,39 @@ +import { AssertionError } from 'assert' +import { + threadArrayToCursor, + cursorToThreadArray, + initThreadArray, +} from '../src' + +describe('calculating cursor from array of thread counters', () => { + test('should throw AssertionError when array has an invalid size', () => { + const arr = new Array(10) + expect(() => threadArrayToCursor(arr)).toThrow(AssertionError) + }) + + test('should return expected results', () => { + const arr = initThreadArray() + arr[0] = 1 + arr[1] = 61 + arr[2] = 64 + arr[3] = 64 ** 7 + arr[4] = 26 + arr[5] = 62 + arr[6] = 63 + arr[255] = 2 + + const cursor = threadArrayToCursor(arr) + expect(cursor.substring(0, 8)).toEqual('AAAAAAAB') + expect(cursor.substring(8, 8 * 2)).toEqual('AAAAAAA9') + expect(cursor.substring(8 * 2, 8 * 3)).toEqual('AAAAAABA') + expect(cursor.substring(8 * 3, 8 * 4)).toEqual('BAAAAAAA') + expect(cursor.substring(8 * 4, 8 * 5)).toEqual('AAAAAAAa') + expect(cursor.substring(8 * 5, 8 * 6)).toEqual('AAAAAAA+') + expect(cursor.substring(8 * 6, 8 * 7)).toEqual('AAAAAAA/') + expect(cursor.substring(8 * 7, 8 * 8)).toEqual('AAAAAAAA') + expect(cursor.substring(8 * 255, 8 * 256)).toEqual('AAAAAAAC') + + const translatedArr = cursorToThreadArray(cursor) + expect(translatedArr).toEqual(arr) + }) +}) diff --git a/packages/runtime/adapters/eventstore-adapters/eventstore-lite/src/get-cursor-until-event-types.ts b/packages/runtime/adapters/eventstore-adapters/eventstore-lite/src/get-cursor-until-event-types.ts new file mode 100644 index 0000000000..b4dac321eb --- /dev/null +++ b/packages/runtime/adapters/eventstore-adapters/eventstore-lite/src/get-cursor-until-event-types.ts @@ -0,0 +1,53 @@ +import { AdapterPool } from './types' +import { + cursorToThreadArray, + SavedEvent, + threadArrayToCursor, + initThreadArray, + Cursor, +} from '@resolve-js/eventstore-base' + +const getCursorUntilEventTypes = async ( + { database, escapeId, escape, eventsTableName, shapeEvent }: AdapterPool, + cursor: Cursor, + untilEventTypes: Array +): Promise => { + if (untilEventTypes.length < 1) { + throw new Error('Must define at least one event type') + } + + const tableNameAsId = escapeId(eventsTableName) + + const vectorConditions = cursorToThreadArray(cursor) + + const minThreadCounterConditions = `${vectorConditions + .map( + (threadCounter, threadId) => + `${escapeId('threadId')} = ${+threadId} AND ${escapeId( + 'threadCounter' + )} >= ${+threadCounter} ` + ) + .join(' OR ')}` + + const rows = (await database.all( + `SELECT "threadId", MIN("threadCounter") AS "threadCounter" FROM ( + SELECT "threadId", MIN("threadCounter") AS "threadCounter" FROM ${tableNameAsId} WHERE type IN + (${untilEventTypes.map((t) => escape(t)).join(', ')}) + AND (${minThreadCounterConditions}) + GROUP BY "threadId" + UNION ALL + SELECT "threadId", MAX("threadCounter")+1 AS "threadCounter" FROM ${tableNameAsId} GROUP BY "threadId") + GROUP BY "threadId"` + )) as Array<{ + threadId: SavedEvent['threadId'] + threadCounter: SavedEvent['threadCounter'] + }> + + const threadCounters = initThreadArray() + for (const row of rows) { + threadCounters[row.threadId] = row.threadCounter + } + return threadArrayToCursor(threadCounters) +} + +export default getCursorUntilEventTypes diff --git a/packages/runtime/adapters/eventstore-adapters/eventstore-lite/src/index.ts b/packages/runtime/adapters/eventstore-adapters/eventstore-lite/src/index.ts index 70f409dd8f..d6beecb3db 100644 --- a/packages/runtime/adapters/eventstore-adapters/eventstore-lite/src/index.ts +++ b/packages/runtime/adapters/eventstore-adapters/eventstore-lite/src/index.ts @@ -42,6 +42,7 @@ import setReplicationIterator from './set-replication-iterator' import setReplicationPaused from './set-replication-paused' import getReplicationState from './get-replication-state' import resetReplication from './reset-replication' +import getCursorUntilEventTypes from './get-cursor-until-event-types' import type { Adapter } from '@resolve-js/eventstore-base' import type { ConnectionDependencies, SqliteAdapterConfig } from './types' @@ -87,6 +88,7 @@ const createSqliteAdapter = (options: SqliteAdapterConfig): Adapter => { setReplicationPaused, getReplicationState, resetReplication, + getCursorUntilEventTypes, }, { sqlite, tmp, os, fs } as ConnectionDependencies, options diff --git a/packages/runtime/adapters/eventstore-adapters/eventstore-lite/src/load-events-by-cursor.ts b/packages/runtime/adapters/eventstore-adapters/eventstore-lite/src/load-events-by-cursor.ts index 41ecfba10c..8f193a8012 100644 --- a/packages/runtime/adapters/eventstore-adapters/eventstore-lite/src/load-events-by-cursor.ts +++ b/packages/runtime/adapters/eventstore-adapters/eventstore-lite/src/load-events-by-cursor.ts @@ -1,7 +1,13 @@ -import { EventsWithCursor, CursorFilter } from '@resolve-js/eventstore-base' +import { + EventsWithCursor, + CursorFilter, + SavedEvent, + cursorToThreadArray, + threadArrayToCursor, +} from '@resolve-js/eventstore-base' import { AdapterPool } from './types' -const split2RegExp = /.{1,2}(?=(.{2})+(?!.))|.{1,2}$/g +//const split2RegExp = /.{1,2}(?=(.{2})+(?!.))|.{1,2}$/g const loadEventsByCursor = async ( { database, escapeId, escape, eventsTableName, shapeEvent }: AdapterPool, @@ -9,16 +15,8 @@ const loadEventsByCursor = async ( ): Promise => { const { eventTypes, aggregateIds, cursor, limit } = filter const injectString = (value: any): string => `${escape(value)}` - const injectNumber = (value: any): string => `${+value}` - const cursorBuffer: Buffer = - cursor != null ? Buffer.from(cursor, 'base64') : Buffer.alloc(1536, 0) - const vectorConditions = [] - for (let i = 0; i < cursorBuffer.length / 6; i++) { - vectorConditions.push( - `0x${cursorBuffer.slice(i * 6, (i + 1) * 6).toString('hex')}` - ) - } + const vectorConditions = cursorToThreadArray(cursor) const queryConditions = [] if (eventTypes != null) { @@ -40,15 +38,15 @@ const loadEventsByCursor = async ( ${vectorConditions .map( (threadCounter, threadId) => - `${escapeId('threadId')} = ${injectNumber(threadId)} AND ${escapeId( + `${escapeId('threadId')} = ${+threadId} AND ${escapeId( 'threadCounter' - )} >= ${threadCounter} ` + )} >= ${+threadCounter} ` ) .join(' OR ')} ${queryConditions.length > 0 ? ')' : ''}` const tableNameAsId = escapeId(eventsTableName) - const events: any[] = [] + const events: SavedEvent[] = [] const rows = await database.all( `SELECT * FROM ${tableNameAsId} @@ -60,35 +58,16 @@ const loadEventsByCursor = async ( for (const event of rows) { const threadId = +event.threadId const threadCounter = +event.threadCounter - const oldThreadCounter = parseInt( - vectorConditions[threadId].substring(2), - 16 - ) - - vectorConditions[threadId] = `0x${Math.max( - threadCounter + 1, - oldThreadCounter - ) - .toString(16) - .padStart(12, '0')}` + const oldThreadCounter = vectorConditions[threadId] + vectorConditions[threadId] = Math.max(threadCounter + 1, oldThreadCounter) events.push(shapeEvent(event)) } - const nextConditionsBuffer = Buffer.alloc(1536) - let byteIndex = 0 - - for (const threadCounter of vectorConditions) { - const threadCounterBytes = threadCounter.substring(2).match(split2RegExp) - if (Array.isArray(threadCounterBytes)) { - for (const byteHex of threadCounterBytes) { - nextConditionsBuffer[byteIndex++] = Buffer.from(byteHex, 'hex')[0] - } - } - } + const nextConditions = threadArrayToCursor(vectorConditions) return { - cursor: nextConditionsBuffer.toString('base64'), + cursor: nextConditions, events, } } diff --git a/packages/runtime/adapters/eventstore-adapters/eventstore-lite/src/save-event.ts b/packages/runtime/adapters/eventstore-adapters/eventstore-lite/src/save-event.ts index 023d5992e0..5b66c7a4be 100644 --- a/packages/runtime/adapters/eventstore-adapters/eventstore-lite/src/save-event.ts +++ b/packages/runtime/adapters/eventstore-adapters/eventstore-lite/src/save-event.ts @@ -1,11 +1,16 @@ -import { ConcurrentError, InputEvent } from '@resolve-js/eventstore-base' +import { + ConcurrentError, + InputEvent, + SavedEvent, + threadArrayToCursor, +} from '@resolve-js/eventstore-base' import { AdapterPool } from './types' import { EventstoreFrozenError } from '@resolve-js/eventstore-base' const saveEvent = async ( pool: AdapterPool, event: InputEvent -): Promise => { +): Promise => { const { eventsTableName, database, escapeId, escape } = pool try { const currentThreadId = Math.floor(Math.random() * 256) @@ -53,10 +58,29 @@ const saveEvent = async ( ${+event.aggregateVersion}, ${escape(event.type)}, json(CAST(${serializedPayload} AS BLOB)) - ); - - COMMIT;` + );` ) + + const rows = (await database.all( + `SELECT "threadId", MAX("threadCounter") AS "threadCounter" FROM + ${eventsTableNameAsId} GROUP BY "threadId" ORDER BY "threadId" ASC` + )) as Array<{ + threadId: SavedEvent['threadId'] + threadCounter: SavedEvent['threadCounter'] + }> + + const threadCounters = new Array(256) + threadCounters.fill(-1) + for (const row of rows) { + threadCounters[row.threadId] = row.threadCounter + } + for (let i = 0; i < threadCounters.length; ++i) { + threadCounters[i]++ + } + + const cursor = threadArrayToCursor(threadCounters) + await database.exec('COMMIT;') + return cursor } catch (error) { const errorMessage = error != null && error.message != null ? error.message : '' diff --git a/packages/runtime/adapters/eventstore-adapters/eventstore-mysql/src/save-event.ts b/packages/runtime/adapters/eventstore-adapters/eventstore-mysql/src/save-event.ts index 313167ef74..f173fd5a36 100644 --- a/packages/runtime/adapters/eventstore-adapters/eventstore-mysql/src/save-event.ts +++ b/packages/runtime/adapters/eventstore-adapters/eventstore-mysql/src/save-event.ts @@ -10,7 +10,7 @@ import { AdapterPool } from './types' const saveEvent = async ( pool: AdapterPool, event: InputEvent -): Promise => { +): Promise => { const { eventsTableName, connection, database, escapeId, escape } = pool try { const eventsTableNameAsId: string = escapeId(eventsTableName) @@ -69,6 +69,7 @@ const saveEvent = async ( COMMIT;` ) + return '' } catch (error) { const errno = error != null && error.errno != null ? error.errno : 0 const message = error != null && error.message != null ? error.message : '' diff --git a/packages/runtime/adapters/eventstore-adapters/eventstore-postgresql-serverless/src/get-cursor-until-event-types.ts b/packages/runtime/adapters/eventstore-adapters/eventstore-postgresql-serverless/src/get-cursor-until-event-types.ts new file mode 100644 index 0000000000..4cdd3278cf --- /dev/null +++ b/packages/runtime/adapters/eventstore-adapters/eventstore-postgresql-serverless/src/get-cursor-until-event-types.ts @@ -0,0 +1,63 @@ +import { INT8_SQL_TYPE } from './constants' +import { AdapterPool } from './types' +import { + cursorToThreadArray, + SavedEvent, + threadArrayToCursor, + initThreadArray, + Cursor, +} from '@resolve-js/eventstore-base' + +const getCursorUntilEventTypes = async ( + { + executeStatement, + escapeId, + escape, + eventsTableName, + shapeEvent, + databaseName, + }: AdapterPool, + cursor: Cursor, + untilEventTypes: Array +): Promise => { + if (untilEventTypes.length < 1) { + throw new Error('Must define at least one event type') + } + + const databaseNameAsId: string = escapeId(databaseName) + const eventsTableAsId: string = escapeId(eventsTableName) + const threadsTableAsId: string = escapeId(`${eventsTableName}-threads`) + + const vectorConditions = cursorToThreadArray(cursor) + + const minThreadCounterConditions = `${vectorConditions + .map( + (threadCounter, threadId) => + `${escapeId('threadId')} = ${+threadId} AND ${escapeId( + 'threadCounter' + )} >= ${+threadCounter}::${INT8_SQL_TYPE} ` + ) + .join(' OR ')}` + + const rows = (await executeStatement( + `SELECT "threadId", MIN("threadCounter") AS "threadCounter" FROM ( + SELECT "threadId", MIN("threadCounter") AS "threadCounter" FROM ${databaseNameAsId}.${eventsTableAsId} WHERE type IN + (${untilEventTypes.map((t) => escape(t)).join(', ')}) + AND (${minThreadCounterConditions}) + GROUP BY "threadId" + UNION ALL + SELECT "threadId", "threadCounter" FROM ${databaseNameAsId}.${threadsTableAsId}) AS "union_table" + GROUP BY "threadId"` + )) as Array<{ + threadId: SavedEvent['threadId'] + threadCounter: SavedEvent['threadCounter'] + }> + + const threadCounters = initThreadArray() + for (const row of rows) { + threadCounters[row.threadId] = row.threadCounter + } + return threadArrayToCursor(threadCounters) +} + +export default getCursorUntilEventTypes diff --git a/packages/runtime/adapters/eventstore-adapters/eventstore-postgresql-serverless/src/index.ts b/packages/runtime/adapters/eventstore-adapters/eventstore-postgresql-serverless/src/index.ts index d68ad4dbd6..ddffb141af 100644 --- a/packages/runtime/adapters/eventstore-adapters/eventstore-postgresql-serverless/src/index.ts +++ b/packages/runtime/adapters/eventstore-adapters/eventstore-postgresql-serverless/src/index.ts @@ -47,6 +47,7 @@ import setReplicationIterator from './set-replication-iterator' import setReplicationPaused from './set-replication-paused' import getReplicationState from './get-replication-state' import resetReplication from './reset-replication' +import getCursorUntilEventTypes from './get-cursor-until-event-types' import _createResource from './resource/create' import _destroyResource from './resource/destroy' @@ -102,6 +103,7 @@ const createPostgresqlServerlessAdapter = ( setReplicationPaused, getReplicationState, resetReplication, + getCursorUntilEventTypes, }, { RDSDataService, diff --git a/packages/runtime/adapters/eventstore-adapters/eventstore-postgresql-serverless/src/load-events-by-cursor.ts b/packages/runtime/adapters/eventstore-adapters/eventstore-postgresql-serverless/src/load-events-by-cursor.ts index 6cb5ec9bba..e76352c824 100644 --- a/packages/runtime/adapters/eventstore-adapters/eventstore-postgresql-serverless/src/load-events-by-cursor.ts +++ b/packages/runtime/adapters/eventstore-adapters/eventstore-postgresql-serverless/src/load-events-by-cursor.ts @@ -1,5 +1,11 @@ import { INT8_SQL_TYPE } from './constants' import { AdapterPool } from './types' +import { + CursorFilter, + SavedEvent, + cursorToThreadArray, + threadArrayToCursor, +} from '@resolve-js/eventstore-base' const split2RegExp = /.{1,2}(?=(.{2})+(?!.))|.{1,2}$/g // Although documentation describes a 1 MB limit, the actual limit is 512 KB @@ -22,7 +28,7 @@ const loadEventsByCursor = async ( cursor, limit: inputLimit, eventsSizeLimit: inputEventsSizeLimit, - }: any + }: CursorFilter ) => { const eventsSizeLimit = inputEventsSizeLimit != null @@ -30,24 +36,10 @@ const loadEventsByCursor = async ( : MAX_RDS_DATA_API_RESPONSE_SIZE const limit = Math.min(inputLimit, 0x7fffffff) - const makeBigIntLiteral = (numStr: any): string => - `x'${numStr}'::${INT8_SQL_TYPE}` - const parseBigIntString = (str: any): string => - str.substring(2, str.length - (INT8_SQL_TYPE.length + 3)) - - const injectBigInt = (value: any): string => - makeBigIntLiteral((+value).toString(16).padStart(12, '0')) const injectString = (value: any): string => `${escape(value)}` const injectNumber = (value: any): string => `${+value}` - const cursorBuffer: Buffer = - cursor != null ? Buffer.from(cursor, 'base64') : Buffer.alloc(1536, 0) - const vectorConditions = [] - for (let i = 0; i < cursorBuffer.length / 6; i++) { - vectorConditions.push( - makeBigIntLiteral(cursorBuffer.slice(i * 6, (i + 1) * 6).toString('hex')) - ) - } + const vectorConditions = cursorToThreadArray(cursor) const queryConditions: string[] = ['1=1'] if (eventTypes != null) { @@ -63,7 +55,7 @@ const loadEventsByCursor = async ( (threadCounter, threadId) => `"threadId"=${injectNumber( threadId - )} AND "threadCounter">=${threadCounter}` + )} AND "threadCounter">=${threadCounter}::${INT8_SQL_TYPE}` ) .join(' OR ') const resultTimestampConditions: string = vectorConditions @@ -71,7 +63,7 @@ const loadEventsByCursor = async ( (threadCounter, threadId) => `"threadId"=${injectNumber( threadId - )} AND "threadCounter"=${threadCounter}` + )} AND "threadCounter"=${threadCounter}::${INT8_SQL_TYPE}` ) .join(' OR ') @@ -156,9 +148,9 @@ const loadEventsByCursor = async ( } requestCursors[batchIndex].push( - `"threadId"= ${+threadId} AND "threadCounter" BETWEEN ${injectBigInt( - threadCounterStart - )} AND ${injectBigInt(threadCounterEnd)}` + `"threadId"= ${+threadId} AND "threadCounter" BETWEEN + ${threadCounterStart}::${INT8_SQL_TYPE} AND + ${threadCounterEnd}::${INT8_SQL_TYPE}` ) } @@ -266,31 +258,16 @@ const loadEventsByCursor = async ( for (const event of events) { const threadId = +event.threadId const threadCounter = +event.threadCounter - const oldThreadCounter = parseInt( - parseBigIntString(vectorConditions[threadId]), - 16 - ) - vectorConditions[threadId] = injectBigInt( - Math.max(threadCounter + 1, oldThreadCounter) - ) + const oldThreadCounter = vectorConditions[threadId] + vectorConditions[threadId] = Math.max(threadCounter + 1, oldThreadCounter) resultEvents.push(shapeEvent(event)) } - const nextConditionsBuffer: Buffer = Buffer.alloc(1536) - let byteIndex = 0 - - for (const threadCounter of vectorConditions) { - const threadCounterBytes = parseBigIntString(threadCounter).match( - split2RegExp - ) - for (const byteHex of threadCounterBytes as any) { - nextConditionsBuffer[byteIndex++] = parseInt(byteHex, 16) - } - } + const nextConditions = threadArrayToCursor(vectorConditions) return { - cursor: nextConditionsBuffer.toString('base64'), + cursor: nextConditions, events: resultEvents, } } diff --git a/packages/runtime/adapters/eventstore-adapters/eventstore-postgresql-serverless/src/save-event.ts b/packages/runtime/adapters/eventstore-adapters/eventstore-postgresql-serverless/src/save-event.ts index 2c0d5e41f7..1c0acc4510 100644 --- a/packages/runtime/adapters/eventstore-adapters/eventstore-postgresql-serverless/src/save-event.ts +++ b/packages/runtime/adapters/eventstore-adapters/eventstore-postgresql-serverless/src/save-event.ts @@ -2,15 +2,19 @@ import { ConcurrentError, InputEvent, EventstoreFrozenError, + SavedEvent, + threadArrayToCursor, + initThreadArray, } from '@resolve-js/eventstore-base' import { RESERVED_EVENT_SIZE, LONG_NUMBER_SQL_TYPE } from './constants' import { AdapterPool } from './types' +import assert from 'assert' const saveEvent = async ( pool: AdapterPool, event: InputEvent -): Promise => { +): Promise => { const { databaseName, eventsTableName, @@ -44,7 +48,7 @@ const saveEvent = async ( savingEvent: while (true) { try { // prettier-ignore - await executeStatement( + const rows = (await executeStatement( `WITH "freeze_check" AS ( SELECT 0 AS "freeze_zero" WHERE ( (SELECT 1 AS "EventStoreIsFrozen") @@ -79,7 +83,7 @@ const saveEvent = async ( SELECT "threadId" FROM "vector_id" LIMIT 1 ) RETURNING * - ) INSERT INTO ${databaseNameAsId}.${eventsTableAsId}( + ), "insert_event" AS (INSERT INTO ${databaseNameAsId}.${eventsTableAsId}( "threadId", "threadCounter", "timestamp", @@ -98,9 +102,21 @@ const saveEvent = async ( ), ${serializedEvent}, ${byteLength} - )` - ) - break + )) SELECT "threadId", "threadCounter", + (CASE WHEN (SELECT "threadId" FROM "vector_id" LIMIT 1) = "threadId" THEN "threadCounter"+1 + ELSE "threadCounter" END) AS "newThreadCounter" + FROM ${databaseNameAsId}.${threadsTableAsId} + ORDER BY "threadId" ASC` + )) as Array<{ + threadId: SavedEvent['threadId'] + newThreadCounter: SavedEvent['threadCounter'] + }> + assert.strictEqual(rows.length, 256, 'Thread table must have 256 rows') + const threadCounters = initThreadArray() + for (const row of rows) { + threadCounters[row.threadId] = row.newThreadCounter + } + return threadArrayToCursor(threadCounters) } catch (error) { if (isTimeoutError(error)) { while (true) { diff --git a/packages/runtime/adapters/eventstore-adapters/eventstore-postgresql-serverless/test/save-event.unit.test.ts b/packages/runtime/adapters/eventstore-adapters/eventstore-postgresql-serverless/test/save-event.unit.test.ts deleted file mode 100644 index e477301cc1..0000000000 --- a/packages/runtime/adapters/eventstore-adapters/eventstore-postgresql-serverless/test/save-event.unit.test.ts +++ /dev/null @@ -1,173 +0,0 @@ -import saveEvent from '../src/save-event' -import escapeId from '../src/escape-id' -import escape from '../src/escape' -import isTimeoutError from '../src/is-timeout-error' -import { AdapterPool } from '../src/types' - -const databaseName = 'databaseName' -const eventsTableName = 'eventsTableName' - -const event = { - aggregateId: 'aggregateId', - aggregateVersion: 1, - type: 'TEST', - payload: { key: 'value' }, - timestamp: 1, -} - -test('method "saveEvent" should throw an exception "ConcurrentError"', async () => { - const executeStatement = jest - .fn() - .mockRejectedValue(new Error('Conflict "aggregateIdAndVersion"')) - - const pool: AdapterPool = { - databaseName, - eventsTableName, - executeStatement, - isTimeoutError, - escapeId, - escape, - } as any - - try { - await saveEvent(pool, event) - return Promise.reject('Test failed') - } catch (error) { - expect(error.name).toEqual('ConcurrentError') - } -}) - -test('method "saveEvent" should throw an exception "Event store is frozen"', async () => { - const executeStatement = jest - .fn() - .mockRejectedValue(new Error('subquery used as an expression')) - - const pool: AdapterPool = { - databaseName, - eventsTableName, - executeStatement, - isTimeoutError, - escapeId, - escape, - } as any - - try { - await saveEvent(pool, event) - return Promise.reject('Test failed') - } catch (error) { - expect(error.message).toEqual('Event store is frozen') - } -}) - -test('method "saveEvent" should save an event after StatementTimeoutException', async () => { - let insertCounter = 0 - let selectCounter = 0 - const executeStatement = jest.fn().mockImplementation(async (sql) => { - if (/INSERT/i.test(sql)) { - if (insertCounter++ < 3) { - throw new Error('StatementTimeoutException') - } - } else if (/SELECT/i.test(sql)) { - if (selectCounter++ < 3) { - throw new Error('StatementTimeoutException') - } - return [ - { - aggregateId: event.aggregateId, - aggregateVersion: event.aggregateVersion, - type: event.type, - payload: JSON.stringify(event.payload), - }, - ] - } else { - throw new Error(sql) - } - }) - - const pool: AdapterPool = { - databaseName, - eventsTableName, - executeStatement, - isTimeoutError, - escapeId, - escape, - } as any - - await saveEvent(pool, event) - - expect(executeStatement).toBeCalled() -}) - -test('method "saveEvent" should throw an exception "ConcurrentError" after StatementTimeoutException', async () => { - let insertCounter = 0 - let selectCounter = 0 - const executeStatement = jest.fn().mockImplementation(async (sql) => { - if (/INSERT/i.test(sql)) { - if (insertCounter++ < 3) { - throw new Error('StatementTimeoutException') - } - } else if (/SELECT/i.test(sql)) { - if (selectCounter++ < 3) { - throw new Error('StatementTimeoutException') - } - return [ - { - aggregateId: event.aggregateId, - aggregateVersion: event.aggregateVersion, - type: 'ANOTHER_TYPE', - payload: 'another-payload', - }, - ] - } else { - throw new Error(sql) - } - }) - - const pool: AdapterPool = { - databaseName, - eventsTableName, - executeStatement, - isTimeoutError, - escapeId, - escape, - } as any - - try { - await saveEvent(pool, event) - return Promise.reject('Test failed') - } catch (error) { - expect(error.name).toEqual('ConcurrentError') - } -}) - -test('method "saveEvent" should throw an exception "Event store is frozen" after StatementTimeoutException', async () => { - let insertCounter = 0 - const executeStatement = jest.fn().mockImplementation(async (sql) => { - if (/INSERT/i.test(sql)) { - if (insertCounter++ < 3) { - throw new Error('StatementTimeoutException') - } - throw new Error('subquery used as an expression') - } else if (/SELECT/i.test(sql)) { - return [] - } else { - throw new Error(sql) - } - }) - - const pool: AdapterPool = { - databaseName, - eventsTableName, - executeStatement, - isTimeoutError, - escapeId, - escape, - } as any - - try { - await saveEvent(pool, event) - return Promise.reject('Test failed') - } catch (error) { - expect(error.message).toEqual('Event store is frozen') - } -}) diff --git a/packages/runtime/adapters/eventstore-adapters/eventstore-postgresql/src/get-cursor-until-event-types.ts b/packages/runtime/adapters/eventstore-adapters/eventstore-postgresql/src/get-cursor-until-event-types.ts new file mode 100644 index 0000000000..4cdd3278cf --- /dev/null +++ b/packages/runtime/adapters/eventstore-adapters/eventstore-postgresql/src/get-cursor-until-event-types.ts @@ -0,0 +1,63 @@ +import { INT8_SQL_TYPE } from './constants' +import { AdapterPool } from './types' +import { + cursorToThreadArray, + SavedEvent, + threadArrayToCursor, + initThreadArray, + Cursor, +} from '@resolve-js/eventstore-base' + +const getCursorUntilEventTypes = async ( + { + executeStatement, + escapeId, + escape, + eventsTableName, + shapeEvent, + databaseName, + }: AdapterPool, + cursor: Cursor, + untilEventTypes: Array +): Promise => { + if (untilEventTypes.length < 1) { + throw new Error('Must define at least one event type') + } + + const databaseNameAsId: string = escapeId(databaseName) + const eventsTableAsId: string = escapeId(eventsTableName) + const threadsTableAsId: string = escapeId(`${eventsTableName}-threads`) + + const vectorConditions = cursorToThreadArray(cursor) + + const minThreadCounterConditions = `${vectorConditions + .map( + (threadCounter, threadId) => + `${escapeId('threadId')} = ${+threadId} AND ${escapeId( + 'threadCounter' + )} >= ${+threadCounter}::${INT8_SQL_TYPE} ` + ) + .join(' OR ')}` + + const rows = (await executeStatement( + `SELECT "threadId", MIN("threadCounter") AS "threadCounter" FROM ( + SELECT "threadId", MIN("threadCounter") AS "threadCounter" FROM ${databaseNameAsId}.${eventsTableAsId} WHERE type IN + (${untilEventTypes.map((t) => escape(t)).join(', ')}) + AND (${minThreadCounterConditions}) + GROUP BY "threadId" + UNION ALL + SELECT "threadId", "threadCounter" FROM ${databaseNameAsId}.${threadsTableAsId}) AS "union_table" + GROUP BY "threadId"` + )) as Array<{ + threadId: SavedEvent['threadId'] + threadCounter: SavedEvent['threadCounter'] + }> + + const threadCounters = initThreadArray() + for (const row of rows) { + threadCounters[row.threadId] = row.threadCounter + } + return threadArrayToCursor(threadCounters) +} + +export default getCursorUntilEventTypes diff --git a/packages/runtime/adapters/eventstore-adapters/eventstore-postgresql/src/index.ts b/packages/runtime/adapters/eventstore-adapters/eventstore-postgresql/src/index.ts index 5f4d87166a..23a305bb57 100644 --- a/packages/runtime/adapters/eventstore-adapters/eventstore-postgresql/src/index.ts +++ b/packages/runtime/adapters/eventstore-adapters/eventstore-postgresql/src/index.ts @@ -46,6 +46,7 @@ import setReplicationIterator from './set-replication-iterator' import setReplicationPaused from './set-replication-paused' import getReplicationState from './get-replication-state' import resetReplication from './reset-replication' +import getCursorUntilEventTypes from './get-cursor-until-event-types' import type { Adapter } from '@resolve-js/eventstore-base' import type { ConnectionDependencies, PostgresqlAdapterConfig } from './types' @@ -94,6 +95,7 @@ const createPostgresqlAdapter = (options: PostgresqlAdapterConfig): Adapter => { setReplicationPaused, getReplicationState, resetReplication, + getCursorUntilEventTypes, }, { Postgres, diff --git a/packages/runtime/adapters/eventstore-adapters/eventstore-postgresql/src/load-events-by-cursor.ts b/packages/runtime/adapters/eventstore-adapters/eventstore-postgresql/src/load-events-by-cursor.ts index 1f424f398a..a58ff61f1c 100644 --- a/packages/runtime/adapters/eventstore-adapters/eventstore-postgresql/src/load-events-by-cursor.ts +++ b/packages/runtime/adapters/eventstore-adapters/eventstore-postgresql/src/load-events-by-cursor.ts @@ -1,8 +1,11 @@ import { INT8_SQL_TYPE } from './constants' import { AdapterPool } from './types' -import { CursorFilter } from '@resolve-js/eventstore-base' - -const split2RegExp = /.{1,2}(?=(.{2})+(?!.))|.{1,2}$/g +import { + CursorFilter, + SavedEvent, + cursorToThreadArray, + threadArrayToCursor, +} from '@resolve-js/eventstore-base' const loadEventsByCursor = async ( { @@ -18,16 +21,7 @@ const loadEventsByCursor = async ( const injectString = (value: any): string => `${escape(value)}` const injectNumber = (value: any): string => `${+value}` - const cursorBuffer: Buffer = - cursor != null ? Buffer.from(cursor, 'base64') : Buffer.alloc(1536, 0) - const vectorConditions = [] - for (let i = 0; i < cursorBuffer.length / 6; i++) { - vectorConditions.push( - `x'${cursorBuffer - .slice(i * 6, (i + 1) * 6) - .toString('hex')}'::${INT8_SQL_TYPE}` - ) - } + const vectorConditions = cursorToThreadArray(cursor) const queryConditions: any[] = [] if (eventTypes != null) { @@ -45,7 +39,7 @@ const loadEventsByCursor = async ( (threadCounter, threadId) => `"threadId" = ${injectNumber( threadId - )} AND "threadCounter" >= ${threadCounter} ` + )} AND "threadCounter" >= ${threadCounter}::${INT8_SQL_TYPE} ` ) .join(' OR ')} ${queryConditions.length > 0 ? ')' : ''}` @@ -61,43 +55,21 @@ const loadEventsByCursor = async ( ].join('\n') const rows: any[] = await executeStatement(sqlQuery) - const events: any[] = [] + const events: SavedEvent[] = [] for (const event of rows) { const threadId = +event.threadId const threadCounter = +event.threadCounter - const oldThreadCounter = parseInt( - vectorConditions[threadId].substring( - 2, - vectorConditions[threadId].length - (INT8_SQL_TYPE.length + 3) - ), - 16 - ) - - vectorConditions[threadId] = `x'${Math.max( - threadCounter + 1, - oldThreadCounter - ) - .toString(16) - .padStart(12, '0')}'::${INT8_SQL_TYPE}` + const oldThreadCounter = vectorConditions[threadId] + vectorConditions[threadId] = Math.max(threadCounter + 1, oldThreadCounter) events.push(shapeEvent(event)) } - const nextConditionsBuffer: Buffer = Buffer.alloc(1536) - let byteIndex = 0 - - for (const threadCounter of vectorConditions) { - const threadCounterBytes = threadCounter - .substring(2, threadCounter.length - (INT8_SQL_TYPE.length + 3)) - .match(split2RegExp) - for (const byteHex of threadCounterBytes as any) { - nextConditionsBuffer[byteIndex++] = Buffer.from(byteHex, 'hex')[0] - } - } + const nextConditions = threadArrayToCursor(vectorConditions) return { - cursor: nextConditionsBuffer.toString('base64'), + cursor: nextConditions, events, } } diff --git a/packages/runtime/adapters/eventstore-adapters/eventstore-postgresql/src/save-event.ts b/packages/runtime/adapters/eventstore-adapters/eventstore-postgresql/src/save-event.ts index cece0381eb..4365ef8531 100644 --- a/packages/runtime/adapters/eventstore-adapters/eventstore-postgresql/src/save-event.ts +++ b/packages/runtime/adapters/eventstore-adapters/eventstore-postgresql/src/save-event.ts @@ -2,10 +2,14 @@ import { ConcurrentError, InputEvent, EventstoreFrozenError, + SavedEvent, + threadArrayToCursor, + initThreadArray, } from '@resolve-js/eventstore-base' import { RESERVED_EVENT_SIZE, LONG_NUMBER_SQL_TYPE } from './constants' import { AdapterPool } from './types' +import assert from 'assert' const saveEvent = async ( { @@ -16,7 +20,7 @@ const saveEvent = async ( escape, }: AdapterPool, event: InputEvent -): Promise => { +): Promise => { while (true) { try { const serializedEvent = [ @@ -36,7 +40,7 @@ const saveEvent = async ( const threadsTableAsId = escapeId(`${eventsTableName}-threads`) const eventsTableAsId = escapeId(eventsTableName) - await executeStatement( + const rows = (await executeStatement( `WITH "freeze_check" AS ( SELECT 0 AS "freeze_zero" WHERE ( (SELECT 1 AS "EventStoreIsFrozen") @@ -71,7 +75,7 @@ const saveEvent = async ( SELECT "threadId" FROM "vector_id" LIMIT 1 ) RETURNING * - ) INSERT INTO ${databaseNameAsId}.${eventsTableAsId}( + ), "insert_event" AS (INSERT INTO ${databaseNameAsId}.${eventsTableAsId}( "threadId", "threadCounter", "timestamp", @@ -90,10 +94,22 @@ const saveEvent = async ( ), ${serializedEvent}, ${byteLength} - )` - ) + )) SELECT "threadId", "threadCounter", + (CASE WHEN (SELECT "threadId" FROM "vector_id" LIMIT 1) = "threadId" THEN "threadCounter"+1 + ELSE "threadCounter" END) AS "newThreadCounter" + FROM ${databaseNameAsId}.${threadsTableAsId} + ORDER BY "threadId" ASC` + )) as Array<{ + threadId: SavedEvent['threadId'] + newThreadCounter: SavedEvent['threadCounter'] + }> - break + assert.strictEqual(rows.length, 256, 'Thread table must have 256 rows') + const threadCounters = initThreadArray() + for (const row of rows) { + threadCounters[row.threadId] = row.newThreadCounter + } + return threadArrayToCursor(threadCounters) } catch (error) { const errorMessage = error != null && error.message != null ? error.message : '' diff --git a/tests/eventstore-save-load/index.test.ts b/tests/eventstore-save-load/index.test.ts index 4459967d2d..98fa4ad216 100644 --- a/tests/eventstore-save-load/index.test.ts +++ b/tests/eventstore-save-load/index.test.ts @@ -1,29 +1,184 @@ -import { adapterFactory, adapters, jestTimeout } from '../eventstore-test-utils' +import { + adapterFactory, + adapters, + jestTimeout, + makeTestEvent, +} from '../eventstore-test-utils' + +import { + threadArrayToCursor, + cursorToThreadArray, + THREAD_COUNT, +} from '@resolve-js/eventstore-base' jest.setTimeout(jestTimeout()) -beforeAll(adapterFactory.create('save_and_load_testing')) -afterAll(adapterFactory.destroy('save_and_load_testing')) - -const adapter = adapters['save_and_load_testing'] - -test(`${adapterFactory.name}. Eventstore adapter should be able to save and load an event`, async () => { - await adapter.saveEvent({ - aggregateVersion: 1, - aggregateId: 'ID_1', - type: 'TYPE_1', - payload: { message: 'hello' }, - timestamp: 1, - }) - const { events, cursor } = await adapter.loadEvents({ - eventTypes: null, - aggregateIds: null, - limit: 1, - cursor: null, - }) - expect(events).toHaveLength(1) - expect(events[0].type).toEqual('TYPE_1') - expect(events[0].payload).toEqual({ message: 'hello' }) - expect(events[0].timestamp).toBeGreaterThan(0) - expect(typeof cursor).toBe('string') +describe(`${adapterFactory.name}. Eventstore adapter events saving and loading`, () => { + beforeAll(adapterFactory.create('save_and_load_testing')) + afterAll(adapterFactory.destroy('save_and_load_testing')) + + const adapter = adapters['save_and_load_testing'] + + test('should be able to save and load an event', async () => { + const returnedCursor = await adapter.saveEvent({ + aggregateVersion: 1, + aggregateId: 'ID_1', + type: 'TYPE_1', + payload: { message: 'hello' }, + timestamp: 1, + }) + const { events, cursor } = await adapter.loadEvents({ + eventTypes: null, + aggregateIds: null, + limit: 1, + cursor: null, + }) + expect(events).toHaveLength(1) + expect(events[0].type).toEqual('TYPE_1') + expect(events[0].payload).toEqual({ message: 'hello' }) + expect(events[0].timestamp).toBeGreaterThan(0) + expect(typeof cursor).toBe('string') + expect(returnedCursor).toEqual(cursor) + }) + + test('should be able to save many events and returned cursors must match the subsequent loadEvents cursor', async () => { + const checkCount = 16 + + for (let i = 0; i < checkCount; ++i) { + const event = makeTestEvent(i) + const nextCursor = await adapter.saveEvent(event) + const { events, cursor } = await adapter.loadEvents({ + limit: checkCount + 1, + cursor: null, + }) + expect(nextCursor).toEqual(cursor) + expect(events).toHaveLength(i + 2) + } + }) +}) + +describe(`${adapterFactory.name}. Eventstore adapter getCursorUntilEventTypes`, () => { + beforeAll(adapterFactory.create('until_event_type_testing')) + afterAll(adapterFactory.destroy('until_event_type_testing')) + + const adapter = adapters['until_event_type_testing'] + + test('should return initial cursor if event-store is empty', async () => { + const cursor = await adapter.getCursorUntilEventTypes(null, ['TYPE_1']) + const arr = new Array(THREAD_COUNT) + arr.fill(0) + expect(cursor).toEqual(threadArrayToCursor(arr)) + }) + + let firstStopEventCursor: string | null + + test('should return cursor past the last event if no events of stop type in event-store and start with null cursor', async () => { + await adapter.saveEvent({ + aggregateVersion: 1, + aggregateId: 'ID_1', + type: 'TYPE_1', + payload: { message: 'hello' }, + timestamp: 1, + }) + + const endCursor = await adapter.saveEvent({ + aggregateVersion: 1, + aggregateId: 'ID_2', + type: 'TYPE_1', + payload: { message: 'hello' }, + timestamp: 1, + }) + + const cursor = await adapter.getCursorUntilEventTypes(null, ['TYPE_2']) + expect(cursor).toEqual(endCursor) + }) + + test('should return cursor to the event of stop type when starting with null cursor', async () => { + const { cursor: endCursor } = await adapter.loadEvents({ + limit: 2, + cursor: null, + }) + + firstStopEventCursor = await adapter.saveEvent({ + aggregateVersion: 1, + aggregateId: 'ID_3', + type: 'TYPE_2', + payload: { message: 'hello' }, + timestamp: 1, + }) + const cursor = await adapter.getCursorUntilEventTypes(null, ['TYPE_2']) + expect(cursor).toEqual(endCursor) + }) + + test('should return cursor to the event of stop type when starting with non-null cursor', async () => { + const { cursor: startCursor } = await adapter.loadEvents({ + limit: 2, + cursor: null, + }) + + const cursor = await adapter.getCursorUntilEventTypes(startCursor, [ + 'TYPE_2', + ]) + expect(cursor).toEqual(startCursor) + }) + + test('should return cursor past the last event if starting with non-null cursor past the event of stop type', async () => { + await adapter.saveEvent({ + aggregateVersion: 1, + aggregateId: 'ID_4', + type: 'TYPE_1', + payload: { message: 'hello' }, + timestamp: 1, + }) + + await adapter.saveEvent({ + aggregateVersion: 1, + aggregateId: 'ID_5', + type: 'TYPE_1', + payload: { message: 'hello' }, + timestamp: 1, + }) + + const { cursor: endCursor } = await adapter.loadEvents({ + limit: 5, + cursor: null, + }) + + const cursor = await adapter.getCursorUntilEventTypes( + firstStopEventCursor, + ['TYPE_2'] + ) + expect(cursor).toEqual(endCursor) + }) + + test('should return cursor to the next event of stop type with cursor after the previous event of stop type', async () => { + const { cursor: endCursor } = await adapter.loadEvents({ + limit: 5, + cursor: null, + }) + + await adapter.saveEvent({ + aggregateVersion: 1, + aggregateId: 'ID_6', + type: 'TYPE_2', + payload: { message: 'hello' }, + timestamp: 1, + }) + + const cursor = await adapter.getCursorUntilEventTypes( + firstStopEventCursor, + ['TYPE_2'] + ) + expect(cursor).toEqual(endCursor) + }) + + test('should return to cursor that can be used to find all next events of stop type', async () => { + const cursor = await adapter.getCursorUntilEventTypes(null, ['TYPE_2']) + const { events } = await adapter.loadEvents({ + cursor: cursor, + eventTypes: ['TYPE_2'], + limit: 8, + }) + expect(events).toHaveLength(2) + }) }) diff --git a/tests/package.json b/tests/package.json index 26ff009209..4186c040cc 100644 --- a/tests/package.json +++ b/tests/package.json @@ -17,7 +17,8 @@ "test:import-export": "jest --config=./jest.config.js --testMatch=**/import-export-sample/*.test.ts", "test:secrets": "jest --config=./jest.config.js --testMatch=**/eventstore-secrets/*.test.ts", "test:replication": "jest --config=./jest.config.js --testMatch=**/eventstore-replication/*.test.ts", - "test:freeze": "jest --config=./jest.config.js --testMatch=**/eventstore-freeze-unfreeze/*.test.ts" + "test:freeze": "jest --config=./jest.config.js --testMatch=**/eventstore-freeze-unfreeze/*.test.ts", + "test:save-load": "jest --config=./jest.config.js --testMatch=**/eventstore-save-load/*.test.ts" }, "devDependencies": { "@babel/runtime": "7.9.6",