Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add injectEvents, run import-export tests on postgres too #1947

Merged
merged 3 commits into from
Jul 13, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@ const createAdapter = <
dropFinal,
dispose,
injectEvent,
injectEvents,
freeze,
unfreeze,
shapeEvent,
Expand Down Expand Up @@ -137,6 +138,7 @@ const createAdapter = <

const connectedProps: Partial<ConnectedProps> = {
injectEvent: wrapMethod(adapterPool, injectEvent),
injectEvents: wrapMethod(adapterPool, injectEvents),
injectSecret: wrapMethod(adapterPool, injectSecret),
loadEventsByCursor: wrapMethod(adapterPool, loadEventsByCursor),
loadEventsByTimestamp: wrapMethod(adapterPool, loadEventsByTimestamp),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,15 @@ import {
} from './constants'

import { ResourceNotExistError } from './resource-errors'
import {
import type {
AdapterPoolConnectedProps,
AdapterPoolPossiblyUnconnected,
ImportOptions,
ImportEventsStream,
} from './types'

const MAX_EVENTS_BATCH_BYTE_SIZE = 32768

export const getStringifiedEvent = (params: {
buffer: Buffer
bufferSize: number
Expand Down Expand Up @@ -53,6 +55,23 @@ export const getStringifiedEvent = (params: {
}
}

async function flushEvents(stream: any) {
const eventsToInject = stream.eventsToInject
stream.eventsToInject = []
stream.currentBatchByteSize = 0

if (eventsToInject.length === 0) {
return
}

await stream.pool
.injectEvents(eventsToInject)
.then(() => {
stream.savedEventsCount += eventsToInject.length
})
.catch(stream.saveEventErrors.push.bind(stream.saveEventErrors))
}

const EventStream = function (
this: any,
{ pool, maintenanceMode, byteOffset }: any
Expand All @@ -65,14 +84,14 @@ const EventStream = function (
this.beginPosition = 0
this.endPosition = 0
this.vacantSize = BUFFER_SIZE
this.saveEventPromiseSet = new Set()
this.saveEventErrors = []
this.timestamp = 0
this.maintenanceMode = maintenanceMode
this.isMaintenanceInProgress = false
this.parsedEventsCount = 0
this.bypassMode = false
this.savedEventsCount = 0
this.currentBatchByteSize = 0
this.eventsToInject = []

this.on('timeout', () => {
this.externalTimeout = true
Expand All @@ -96,7 +115,7 @@ EventStream.prototype._write = async function (
try {
await this.pool.waitConnect()

const { dropEvents, initEvents, freeze, injectEvent }: any = this.pool
const { dropEvents, initEvents, freeze }: any = this.pool

if (
this.maintenanceMode === MAINTENANCE_MODE_AUTO &&
Expand Down Expand Up @@ -176,26 +195,20 @@ EventStream.prototype._write = async function (

this.timestamp = Math.max(this.timestamp, event.timestamp)

const saveEventPromise = injectEvent(event)
.then(() => {
this.savedEventsCount++
})
.catch(this.saveEventErrors.push.bind(this.saveEventErrors))
void saveEventPromise.then(
this.saveEventPromiseSet.delete.bind(
this.saveEventPromiseSet,
saveEventPromise
)
)
this.saveEventPromiseSet.add(saveEventPromise)
if (
this.currentBatchByteSize + eventByteLength >
MAX_EVENTS_BATCH_BYTE_SIZE ||
this.eventsToInject.length >= BATCH_SIZE
) {
await flushEvents(this)

if (this.parsedEventsCount++ >= BATCH_SIZE) {
await Promise.all([...this.saveEventPromiseSet])
if (this.externalTimeout === true) {
this.bypassMode = true
}
this.parsedEventsCount = 0
}

this.eventsToInject.push(event)
this.currentBatchByteSize += eventByteLength
}

callback()
Expand All @@ -207,7 +220,7 @@ EventStream.prototype._write = async function (
EventStream.prototype._final = async function (callback: any): Promise<void> {
if (this.bypassMode) {
try {
await Promise.all([...this.saveEventPromiseSet])
await flushEvents(this)
callback()
} catch (err) {
callback(err)
Expand All @@ -219,28 +232,16 @@ EventStream.prototype._final = async function (callback: any): Promise<void> {

try {
await this.pool.waitConnect()
const { unfreeze, injectEvent } = this.pool
const { unfreeze } = this.pool

if (this.vacantSize !== BUFFER_SIZE) {
let stringifiedEvent = null
let eventByteLength = 0

if (this.beginPosition < this.endPosition) {
stringifiedEvent = this.buffer
.slice(this.beginPosition, this.endPosition)
.toString(this.encoding)

eventByteLength += this.endPosition - this.beginPosition
} else {
stringifiedEvent = this.buffer
.slice(this.beginPosition, BUFFER_SIZE)
.toString(this.encoding)
stringifiedEvent += this.buffer
.slice(0, this.endPosition)
.toString(this.encoding)

eventByteLength += BUFFER_SIZE - this.beginPosition + this.endPosition
}
const { stringifiedEvent, eventByteLength } = getStringifiedEvent({
buffer: this.buffer,
bufferSize: BUFFER_SIZE,
encoding: this.encoding,
beginPosition: this.beginPosition,
endPosition: this.endPosition,
})

let event: any = PARTIAL_EVENT_FLAG
try {
Expand All @@ -252,22 +253,12 @@ EventStream.prototype._final = async function (callback: any): Promise<void> {

this.byteOffset += eventByteLength

const saveEventPromise: any = injectEvent(event)
.then(() => {
this.savedEventsCount++
})
.catch(this.saveEventErrors.push.bind(this.saveEventErrors))
void saveEventPromise.then(
this.saveEventPromiseSet.delete.bind(
this.saveEventPromiseSet,
saveEventPromise
)
)
this.saveEventPromiseSet.add(saveEventPromise)
this.eventsToInject.push(event)
this.currentBatchByteSize += eventByteLength
}
}

await Promise.all([...this.saveEventPromiseSet])
await flushEvents(this)

if (
this.maintenanceMode === MAINTENANCE_MODE_AUTO &&
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,7 @@ export type AdapterPoolPrimalProps = {

export type AdapterPoolConnectedProps = Adapter & {
injectEvent: (event: SavedEvent) => Promise<void>
injectEvents: (events: SavedEvent[]) => Promise<void>
injectSecret?: (secretRecord: SecretRecord) => Promise<void>

loadEventsByTimestamp: (filter: TimestampFilter) => Promise<EventsWithCursor>
Expand Down Expand Up @@ -452,6 +453,10 @@ export interface AdapterFunctions<
ConnectedProps,
AdapterPoolConnectedProps['injectEvent']
>
injectEvents: PoolMethod<
ConnectedProps,
AdapterPoolConnectedProps['injectEvents']
>
loadEventsByCursor: PoolMethod<
ConnectedProps,
AdapterPoolConnectedProps['loadEventsByCursor']
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import getLatestEvent from './get-latest-event'
import getSecret from './get-secret'
import initEvents from './init-events'
import injectEvent from './inject-event'
import injectEvents from './inject-events'
import loadEventsByCursor from './load-events-by-cursor'
import loadEventsByTimestamp from './load-events-by-timestamp'
import loadSnapshot from './load-snapshot'
Expand Down Expand Up @@ -69,6 +70,7 @@ const createSqliteAdapter = (options: SqliteAdapterConfig): Adapter => {
initSecrets,
initFinal,
injectEvent,
injectEvents,
loadEventsByCursor,
loadEventsByTimestamp,
loadSnapshot,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
import { AdapterPool } from './types'
import { SavedEvent } from '@resolve-js/eventstore-base'

const injectEvents = async function (
{ eventsTableName, database, escapeId, escape }: AdapterPool,
events: SavedEvent[]
): Promise<void> {
if (events.length === 0) {
return
}

for (const event of events) {
const missingFields = []
if (event.threadId == null) {
missingFields.push(`"threadId"`)
}
if (event.threadCounter == null) {
missingFields.push(`"threadCounter"`)
}
if (event.timestamp == null) {
missingFields.push(`"timestamp"`)
}
if (missingFields.length > 0) {
throw new Error(
`The field ${missingFields.join(', ')} is required in ${JSON.stringify(
event
)}`
)
}
}

const eventsTableNameAsId = escapeId(eventsTableName)

// prettier-ignore
await database.exec(`INSERT INTO ${eventsTableNameAsId}(
"threadId",
"threadCounter",
"timestamp",
"aggregateId",
"aggregateVersion",
"type",
"payload"
) VALUES ${events
.map(
(event) => `(
${+event.threadId},
${+event.threadCounter},
${+event.timestamp},
${escape(event.aggregateId)},
${+event.aggregateVersion},
${escape(event.type)},
json(CAST(${
event.payload != null
? escape(JSON.stringify(event.payload))
: escape('null')
} AS BLOB))
)`
)
.join(',')}`)
}

export default injectEvents
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import getEventSubscribers from './get-event-subscribers'
import getLatestEvent from './get-latest-event'
import saveEvent from './save-event'
import injectEvent from './inject-event'
import injectEvents from './inject-events'
import freeze from './freeze'
import unfreeze from './unfreeze'
import shapeEvent from './shape-event'
Expand Down Expand Up @@ -56,6 +57,7 @@ const createMysqlAdapter = (options: MysqlAdapterConfig): Adapter => {
dropFinal,
dispose,
injectEvent,
injectEvents,
freeze,
unfreeze,
shapeEvent,
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
import { AdapterPool } from './types'
import { SavedEvent } from '@resolve-js/eventstore-base'

const injectEvents = async function (
pool: AdapterPool,
events: SavedEvent[]
): Promise<void> {
return
}

export default injectEvents
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import saveEvent from './save-event'
import fullJitter from './full-jitter'
import executeStatement from './execute-statement'
import injectEvent from './inject-event'
import injectEvents from './inject-events'
import coercer from './coercer'
import escapeId from './escape-id'
import escape from './escape'
Expand Down Expand Up @@ -94,6 +95,7 @@ const createPostgresqlServerlessAdapter = (
removeEventSubscriber,
getEventSubscribers,
injectEvent,
injectEvents,
injectSecret,
loadSecrets,
replicateEvents,
Expand Down
Loading