Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Eventstore adapter: saveEvent now returns the next cursor, add getCursorUntilEventTypes #1878

Merged
merged 5 commits into from
May 21, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -6,3 +6,7 @@ export const PARTIAL_SECRET_FLAG = Symbol.for('PARTIAL_SECRET_FLAG')

export const BUFFER_SIZE = 512 * 1024
export const BATCH_SIZE = 200

export const THREAD_COUNT = 256
export const THREAD_COUNTER_BYTE_LENGTH = 6
export const CURSOR_BUFFER_SIZE = THREAD_COUNT * THREAD_COUNTER_BYTE_LENGTH
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ const createAdapter = <
setReplicationStatus,
getReplicationState,
resetReplication,
getCursorUntilEventTypes,
}: AdapterFunctions<ConnectedProps, ConnectionDependencies, Config>,
connectionDependencies: ConnectionDependencies,
options: Config
Expand Down Expand Up @@ -190,6 +191,8 @@ const createAdapter = <
setReplicationStatus: wrapMethod(adapterPool, setReplicationStatus),
getReplicationState: wrapMethod(adapterPool, getReplicationState),
resetReplication: wrapMethod(adapterPool, resetReplication),

getCursorUntilEventTypes: wrapMethod(adapterPool, getCursorUntilEventTypes),
}

Object.assign<AdapterPoolPossiblyUnconnected<ConnectedProps>, Adapter>(
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
import assert from 'assert'
import {
THREAD_COUNT,
CURSOR_BUFFER_SIZE,
THREAD_COUNTER_BYTE_LENGTH,
} from './constants'
import { Cursor } from './types'

const checkThreadCounterHexString = (threadCounter: string): void => {
assert.strictEqual(
threadCounter.length,
12,
'threadCounter in hex form must have length equal to 12'
)
}

export const initThreadArray = (): Array<number> => {
const threadCounters = new Array<number>(THREAD_COUNT)
threadCounters.fill(0)
return threadCounters
}

export const threadArrayToCursor = (threadArray: Array<number>): string => {
assert.strictEqual(
threadArray.length,
THREAD_COUNT,
'Cursor must be represented by array of 256 numbers'
)

const cursorBuffer: Buffer = Buffer.alloc(CURSOR_BUFFER_SIZE)

for (let i = 0; i < threadArray.length; ++i) {
const threadCounter = threadArray[i]
const threadCounterBuffer = threadCounterToBuffer(threadCounter)
threadCounterBuffer.copy(cursorBuffer, i * THREAD_COUNTER_BYTE_LENGTH)
}

return cursorBuffer.toString('base64')
}

export const cursorToThreadArray = (cursor: Cursor): Array<number> => {
if (cursor == null) return initThreadArray()

const cursorBuffer = Buffer.from(cursor, 'base64')

assert.strictEqual(
cursorBuffer.length,
CURSOR_BUFFER_SIZE,
'Wrong size of cursor buffer'
)

const threadCounters = new Array<number>(THREAD_COUNT)
for (let i = 0; i < cursorBuffer.length / THREAD_COUNTER_BYTE_LENGTH; i++) {
threadCounters[i] = hexStringToThreadCounter(
cursorBuffer.slice(i * 6, (i + 1) * 6).toString('hex')
)
}
return threadCounters
}

export const threadCounterToHexString = (threadCounter: number): string =>
threadCounter.toString(16).padStart(12, '0')

export const hexStringToThreadCounter = (threadCounter: string): number => {
checkThreadCounterHexString(threadCounter)
const num = parseInt(threadCounter, 16)
assert(!Number.isNaN(num))
return num
}

export const threadCounterHexStringToBuffer = (
threadCounter: string
): Buffer => {
checkThreadCounterHexString(threadCounter)
const b: Buffer = Buffer.alloc(THREAD_COUNTER_BYTE_LENGTH)

for (
let hexIndex = 0, hexPairIndex = 0;
hexIndex < threadCounter.length;
hexIndex += 2, hexPairIndex++
) {
b[hexPairIndex] = Buffer.from(
threadCounter.substring(hexIndex, hexIndex + 2),
'hex'
)[0]
}

return b
}

export const threadCounterToBuffer = (threadCounter: number): Buffer => {
return threadCounterHexStringToBuffer(threadCounterToHexString(threadCounter))
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ import wrapMethod from './wrap-method'
import wrapEventFilter from './wrap-event-filter'
import wrapDispose from './wrap-dispose'
import validateEventFilter from './validate-event-filter'
import { MAINTENANCE_MODE_AUTO, MAINTENANCE_MODE_MANUAL } from './constants'
import ConcurrentError from './concurrent-error'
import {
ResourceAlreadyExistError,
Expand Down Expand Up @@ -48,6 +47,19 @@ import {
AdapterConfigSchema,
} from './types'

export {
threadArrayToCursor,
cursorToThreadArray,
initThreadArray,
} from './cursor-operations'
export {
MAINTENANCE_MODE_AUTO,
MAINTENANCE_MODE_MANUAL,
THREAD_COUNT,
CURSOR_BUFFER_SIZE,
THREAD_COUNTER_BYTE_LENGTH,
} from './constants'

const wrappedCreateAdapter = <
ConnectedProps extends AdapterPoolConnectedProps,
ConnectionDependencies extends any,
Expand Down Expand Up @@ -99,8 +111,6 @@ export {
AlreadyFrozenError as EventstoreAlreadyFrozenError,
AlreadyUnfrozenError as EventstoreAlreadyUnfrozenError,
ReplicationAlreadyInProgress,
MAINTENANCE_MODE_AUTO,
MAINTENANCE_MODE_MANUAL,
throwBadCursor,
getNextCursor,
snapshotTrigger,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -538,14 +538,19 @@ export interface AdapterFunctions<
ConnectedProps,
NonNullable<Adapter['resetReplication']>
>

getCursorUntilEventTypes?: PoolMethod<
ConnectedProps,
NonNullable<Adapter['getCursorUntilEventTypes']>
>
}

export interface Adapter {
loadEvents: (filter: EventFilter) => Promise<EventsWithCursor>
importEvents: (options?: Partial<ImportOptions>) => ImportEventsStream
exportEvents: (options?: Partial<ExportOptions>) => ExportEventsStream
getLatestEvent: (filter: EventFilter) => Promise<SavedEvent | null>
saveEvent: (event: InputEvent) => Promise<void>
saveEvent: (event: InputEvent) => Promise<string>
init: () => Promise<void>
drop: () => Promise<void>
dispose: () => Promise<void>
Expand Down Expand Up @@ -614,4 +619,9 @@ export interface Adapter {
setReplicationPaused?: (pause: boolean) => Promise<void>
getReplicationState?: () => Promise<ReplicationState>
resetReplication?: () => Promise<void>

getCursorUntilEventTypes?: (
cursor: Cursor,
untilEventTypes: Array<InputEvent['type']>
) => Promise<string>
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
import { AssertionError } from 'assert'
import { cursorToThreadArray, THREAD_COUNT, initThreadArray } from '../src'

describe('calculating array of threadCounters from cursor', () => {
test('should throw AssertionError when cursor has an invalid size', () => {
expect(() => cursorToThreadArray('AAAAAAAAAAAAAAAA')).toThrow(
AssertionError
)
})

test('should return all zero array from null cursor', () => {
const arr = cursorToThreadArray(null)
expect(arr).toEqual(initThreadArray())
})

test('should return all zero array from initial cursor', () => {
const cursor = 'AAAAAAAA'.repeat(THREAD_COUNT)
const arr = cursorToThreadArray(cursor)
expect(arr).toEqual(initThreadArray())
})

test('should return expected results', () => {
const cursor =
// eslint-disable-next-line no-useless-concat
'AAAAAAAB' + 'AAAAAAAa' + 'AAAAAAAA'.repeat(THREAD_COUNT - 3) + 'BAAAAAAA'
const arr = cursorToThreadArray(cursor)
const expectedArr = initThreadArray()
expectedArr[0] = 1
expectedArr[1] = 26
expectedArr[255] = 64 ** 7
expect(arr).toEqual(expectedArr)
})
})
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
import { AssertionError } from 'assert'
import {
threadArrayToCursor,
cursorToThreadArray,
initThreadArray,
} from '../src'

describe('calculating cursor from array of thread counters', () => {
test('should throw AssertionError when array has an invalid size', () => {
const arr = new Array<number>(10)
expect(() => threadArrayToCursor(arr)).toThrow(AssertionError)
})

test('should return expected results', () => {
const arr = initThreadArray()
arr[0] = 1
arr[1] = 61
arr[2] = 64
arr[3] = 64 ** 7
arr[4] = 26
arr[5] = 62
arr[6] = 63
arr[255] = 2

const cursor = threadArrayToCursor(arr)
expect(cursor.substring(0, 8)).toEqual('AAAAAAAB')
expect(cursor.substring(8, 8 * 2)).toEqual('AAAAAAA9')
expect(cursor.substring(8 * 2, 8 * 3)).toEqual('AAAAAABA')
expect(cursor.substring(8 * 3, 8 * 4)).toEqual('BAAAAAAA')
expect(cursor.substring(8 * 4, 8 * 5)).toEqual('AAAAAAAa')
expect(cursor.substring(8 * 5, 8 * 6)).toEqual('AAAAAAA+')
expect(cursor.substring(8 * 6, 8 * 7)).toEqual('AAAAAAA/')
expect(cursor.substring(8 * 7, 8 * 8)).toEqual('AAAAAAAA')
expect(cursor.substring(8 * 255, 8 * 256)).toEqual('AAAAAAAC')

const translatedArr = cursorToThreadArray(cursor)
expect(translatedArr).toEqual(arr)
})
})
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
import { AdapterPool } from './types'
import {
cursorToThreadArray,
SavedEvent,
threadArrayToCursor,
initThreadArray,
Cursor,
} from '@resolve-js/eventstore-base'

const getCursorUntilEventTypes = async (
{ database, escapeId, escape, eventsTableName, shapeEvent }: AdapterPool,
cursor: Cursor,
untilEventTypes: Array<SavedEvent['type']>
): Promise<string> => {
if (untilEventTypes.length < 1) {
throw new Error('Must define at least one event type')
}

const tableNameAsId = escapeId(eventsTableName)

const vectorConditions = cursorToThreadArray(cursor)

const minThreadCounterConditions = `${vectorConditions
.map(
(threadCounter, threadId) =>
`${escapeId('threadId')} = ${+threadId} AND ${escapeId(
'threadCounter'
)} >= ${+threadCounter} `
)
.join(' OR ')}`

const rows = (await database.all(
`SELECT "threadId", MIN("threadCounter") AS "threadCounter" FROM (
SELECT "threadId", MIN("threadCounter") AS "threadCounter" FROM ${tableNameAsId} WHERE type IN
(${untilEventTypes.map((t) => escape(t)).join(', ')})
AND (${minThreadCounterConditions})
GROUP BY "threadId"
UNION ALL
SELECT "threadId", MAX("threadCounter")+1 AS "threadCounter" FROM ${tableNameAsId} GROUP BY "threadId")
GROUP BY "threadId"`
)) as Array<{
threadId: SavedEvent['threadId']
threadCounter: SavedEvent['threadCounter']
}>

const threadCounters = initThreadArray()
for (const row of rows) {
threadCounters[row.threadId] = row.threadCounter
}
return threadArrayToCursor(threadCounters)
}

export default getCursorUntilEventTypes
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import setReplicationIterator from './set-replication-iterator'
import setReplicationPaused from './set-replication-paused'
import getReplicationState from './get-replication-state'
import resetReplication from './reset-replication'
import getCursorUntilEventTypes from './get-cursor-until-event-types'

import type { Adapter } from '@resolve-js/eventstore-base'
import type { ConnectionDependencies, SqliteAdapterConfig } from './types'
Expand Down Expand Up @@ -87,6 +88,7 @@ const createSqliteAdapter = (options: SqliteAdapterConfig): Adapter => {
setReplicationPaused,
getReplicationState,
resetReplication,
getCursorUntilEventTypes,
},
{ sqlite, tmp, os, fs } as ConnectionDependencies,
options
Expand Down
Loading