Skip to content

Commit

Permalink
Add injectEvents, run import-export tests on postgres too (#1947)
Browse files Browse the repository at this point in the history
* Add injectEvents, run import-export tests on postgres too

* Fix eslint
  • Loading branch information
FreeSlave authored Jul 13, 2021
1 parent b3d4871 commit 7ef0761
Show file tree
Hide file tree
Showing 13 changed files with 424 additions and 104 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ const createAdapter = <
dropFinal,
dispose,
injectEvent,
injectEvents,
freeze,
unfreeze,
shapeEvent,
Expand Down Expand Up @@ -137,6 +138,7 @@ const createAdapter = <

const connectedProps: Partial<ConnectedProps> = {
injectEvent: wrapMethod(adapterPool, injectEvent),
injectEvents: wrapMethod(adapterPool, injectEvents),
injectSecret: wrapMethod(adapterPool, injectSecret),
loadEventsByCursor: wrapMethod(adapterPool, loadEventsByCursor),
loadEventsByTimestamp: wrapMethod(adapterPool, loadEventsByTimestamp),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,15 @@ import {
} from './constants'

import { ResourceNotExistError } from './resource-errors'
import {
import type {
AdapterPoolConnectedProps,
AdapterPoolPossiblyUnconnected,
ImportOptions,
ImportEventsStream,
} from './types'

const MAX_EVENTS_BATCH_BYTE_SIZE = 32768

export const getStringifiedEvent = (params: {
buffer: Buffer
bufferSize: number
Expand Down Expand Up @@ -53,6 +55,23 @@ export const getStringifiedEvent = (params: {
}
}

async function flushEvents(stream: any) {
const eventsToInject = stream.eventsToInject
stream.eventsToInject = []
stream.currentBatchByteSize = 0

if (eventsToInject.length === 0) {
return
}

await stream.pool
.injectEvents(eventsToInject)
.then(() => {
stream.savedEventsCount += eventsToInject.length
})
.catch(stream.saveEventErrors.push.bind(stream.saveEventErrors))
}

const EventStream = function (
this: any,
{ pool, maintenanceMode, byteOffset }: any
Expand All @@ -65,14 +84,14 @@ const EventStream = function (
this.beginPosition = 0
this.endPosition = 0
this.vacantSize = BUFFER_SIZE
this.saveEventPromiseSet = new Set()
this.saveEventErrors = []
this.timestamp = 0
this.maintenanceMode = maintenanceMode
this.isMaintenanceInProgress = false
this.parsedEventsCount = 0
this.bypassMode = false
this.savedEventsCount = 0
this.currentBatchByteSize = 0
this.eventsToInject = []

this.on('timeout', () => {
this.externalTimeout = true
Expand All @@ -96,7 +115,7 @@ EventStream.prototype._write = async function (
try {
await this.pool.waitConnect()

const { dropEvents, initEvents, freeze, injectEvent }: any = this.pool
const { dropEvents, initEvents, freeze }: any = this.pool

if (
this.maintenanceMode === MAINTENANCE_MODE_AUTO &&
Expand Down Expand Up @@ -176,26 +195,20 @@ EventStream.prototype._write = async function (

this.timestamp = Math.max(this.timestamp, event.timestamp)

const saveEventPromise = injectEvent(event)
.then(() => {
this.savedEventsCount++
})
.catch(this.saveEventErrors.push.bind(this.saveEventErrors))
void saveEventPromise.then(
this.saveEventPromiseSet.delete.bind(
this.saveEventPromiseSet,
saveEventPromise
)
)
this.saveEventPromiseSet.add(saveEventPromise)
if (
this.currentBatchByteSize + eventByteLength >
MAX_EVENTS_BATCH_BYTE_SIZE ||
this.eventsToInject.length >= BATCH_SIZE
) {
await flushEvents(this)

if (this.parsedEventsCount++ >= BATCH_SIZE) {
await Promise.all([...this.saveEventPromiseSet])
if (this.externalTimeout === true) {
this.bypassMode = true
}
this.parsedEventsCount = 0
}

this.eventsToInject.push(event)
this.currentBatchByteSize += eventByteLength
}

callback()
Expand All @@ -207,7 +220,7 @@ EventStream.prototype._write = async function (
EventStream.prototype._final = async function (callback: any): Promise<void> {
if (this.bypassMode) {
try {
await Promise.all([...this.saveEventPromiseSet])
await flushEvents(this)
callback()
} catch (err) {
callback(err)
Expand All @@ -219,28 +232,16 @@ EventStream.prototype._final = async function (callback: any): Promise<void> {

try {
await this.pool.waitConnect()
const { unfreeze, injectEvent } = this.pool
const { unfreeze } = this.pool

if (this.vacantSize !== BUFFER_SIZE) {
let stringifiedEvent = null
let eventByteLength = 0

if (this.beginPosition < this.endPosition) {
stringifiedEvent = this.buffer
.slice(this.beginPosition, this.endPosition)
.toString(this.encoding)

eventByteLength += this.endPosition - this.beginPosition
} else {
stringifiedEvent = this.buffer
.slice(this.beginPosition, BUFFER_SIZE)
.toString(this.encoding)
stringifiedEvent += this.buffer
.slice(0, this.endPosition)
.toString(this.encoding)

eventByteLength += BUFFER_SIZE - this.beginPosition + this.endPosition
}
const { stringifiedEvent, eventByteLength } = getStringifiedEvent({
buffer: this.buffer,
bufferSize: BUFFER_SIZE,
encoding: this.encoding,
beginPosition: this.beginPosition,
endPosition: this.endPosition,
})

let event: any = PARTIAL_EVENT_FLAG
try {
Expand All @@ -252,22 +253,12 @@ EventStream.prototype._final = async function (callback: any): Promise<void> {

this.byteOffset += eventByteLength

const saveEventPromise: any = injectEvent(event)
.then(() => {
this.savedEventsCount++
})
.catch(this.saveEventErrors.push.bind(this.saveEventErrors))
void saveEventPromise.then(
this.saveEventPromiseSet.delete.bind(
this.saveEventPromiseSet,
saveEventPromise
)
)
this.saveEventPromiseSet.add(saveEventPromise)
this.eventsToInject.push(event)
this.currentBatchByteSize += eventByteLength
}
}

await Promise.all([...this.saveEventPromiseSet])
await flushEvents(this)

if (
this.maintenanceMode === MAINTENANCE_MODE_AUTO &&
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,7 @@ export type AdapterPoolPrimalProps = {

export type AdapterPoolConnectedProps = Adapter & {
injectEvent: (event: SavedEvent) => Promise<void>
injectEvents: (events: SavedEvent[]) => Promise<void>
injectSecret?: (secretRecord: SecretRecord) => Promise<void>

loadEventsByTimestamp: (filter: TimestampFilter) => Promise<EventsWithCursor>
Expand Down Expand Up @@ -452,6 +453,10 @@ export interface AdapterFunctions<
ConnectedProps,
AdapterPoolConnectedProps['injectEvent']
>
injectEvents: PoolMethod<
ConnectedProps,
AdapterPoolConnectedProps['injectEvents']
>
loadEventsByCursor: PoolMethod<
ConnectedProps,
AdapterPoolConnectedProps['loadEventsByCursor']
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import getLatestEvent from './get-latest-event'
import getSecret from './get-secret'
import initEvents from './init-events'
import injectEvent from './inject-event'
import injectEvents from './inject-events'
import loadEventsByCursor from './load-events-by-cursor'
import loadEventsByTimestamp from './load-events-by-timestamp'
import loadSnapshot from './load-snapshot'
Expand Down Expand Up @@ -69,6 +70,7 @@ const createSqliteAdapter = (options: SqliteAdapterConfig): Adapter => {
initSecrets,
initFinal,
injectEvent,
injectEvents,
loadEventsByCursor,
loadEventsByTimestamp,
loadSnapshot,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
import { AdapterPool } from './types'
import { SavedEvent } from '@resolve-js/eventstore-base'

const injectEvents = async function (
{ eventsTableName, database, escapeId, escape }: AdapterPool,
events: SavedEvent[]
): Promise<void> {
if (events.length === 0) {
return
}

for (const event of events) {
const missingFields = []
if (event.threadId == null) {
missingFields.push(`"threadId"`)
}
if (event.threadCounter == null) {
missingFields.push(`"threadCounter"`)
}
if (event.timestamp == null) {
missingFields.push(`"timestamp"`)
}
if (missingFields.length > 0) {
throw new Error(
`The field ${missingFields.join(', ')} is required in ${JSON.stringify(
event
)}`
)
}
}

const eventsTableNameAsId = escapeId(eventsTableName)

// prettier-ignore
await database.exec(`INSERT INTO ${eventsTableNameAsId}(
"threadId",
"threadCounter",
"timestamp",
"aggregateId",
"aggregateVersion",
"type",
"payload"
) VALUES ${events
.map(
(event) => `(
${+event.threadId},
${+event.threadCounter},
${+event.timestamp},
${escape(event.aggregateId)},
${+event.aggregateVersion},
${escape(event.type)},
json(CAST(${
event.payload != null
? escape(JSON.stringify(event.payload))
: escape('null')
} AS BLOB))
)`
)
.join(',')}`)
}

export default injectEvents
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import getEventSubscribers from './get-event-subscribers'
import getLatestEvent from './get-latest-event'
import saveEvent from './save-event'
import injectEvent from './inject-event'
import injectEvents from './inject-events'
import freeze from './freeze'
import unfreeze from './unfreeze'
import shapeEvent from './shape-event'
Expand Down Expand Up @@ -56,6 +57,7 @@ const createMysqlAdapter = (options: MysqlAdapterConfig): Adapter => {
dropFinal,
dispose,
injectEvent,
injectEvents,
freeze,
unfreeze,
shapeEvent,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import { AdapterPool } from './types'
import { SavedEvent } from '@resolve-js/eventstore-base'

const injectEvents = async function (
pool: AdapterPool,
events: SavedEvent[]
): Promise<void> {
return
}

export default injectEvents
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import saveEvent from './save-event'
import fullJitter from './full-jitter'
import executeStatement from './execute-statement'
import injectEvent from './inject-event'
import injectEvents from './inject-events'
import coercer from './coercer'
import escapeId from './escape-id'
import escape from './escape'
Expand Down Expand Up @@ -94,6 +95,7 @@ const createPostgresqlServerlessAdapter = (
removeEventSubscriber,
getEventSubscribers,
injectEvent,
injectEvents,
injectSecret,
loadSecrets,
replicateEvents,
Expand Down
Loading

0 comments on commit 7ef0761

Please sign in to comment.