Skip to content

Commit

Permalink
Ability to delay next in readmodel build (#2090)
Browse files Browse the repository at this point in the history
* TImeout in next event listener

* Use timeout for next in replicator

Co-authored-by: VladIhost <[email protected]>
  • Loading branch information
FreeSlave and IhostVlad authored Oct 27, 2021
1 parent 1d858ee commit 46b0bc6
Show file tree
Hide file tree
Showing 7 changed files with 175 additions and 81 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,10 @@ export type ReadModelLedger = {
IsPaused: boolean
}

export type MethodNext = () => Promise<void>
export type MethodNext = (
timeout?: number,
notificationExtraPayload?: object
) => Promise<void>
export type MethodGetRemainingTime = () => number
export type MethodGetEncryption = () => (
event: ReadModelEvent
Expand Down Expand Up @@ -289,6 +292,7 @@ export type BuildInfo = {
notificationId: string
sendTime: number
coldStart?: boolean
[key: string]: any
}

export type AdapterConnection<
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,13 @@ function statusDataToError(
}
}

const isHTTPServiceError = (name: string) =>
name === 'FetchError' || name === 'AbortError' || name === 'ServiceError'

const getBuildDelay = (iterationNumber: number) => {
return 30000 * 2 ** iterationNumber
}

const BATCH_PROCESSING_POLL_MS = 50

const build: ExternalMethods['build'] = async (
Expand All @@ -61,15 +68,30 @@ const build: ExternalMethods['build'] = async (
modelInterop,
next,
eventstoreAdapter,
getVacantTimeInMillis
getVacantTimeInMillis,
buildInfo
) => {
const log = getLog('build')

await eventstoreAdapter.establishTimeLimit(getVacantTimeInMillis)
let iterationNumber = buildInfo.iterationNumber ?? 0

const delayNext = async (delay: number, error: any) => {
log.debug(
`Delaying next for ${delay}ms due to service error ${
error ? error.name + ': ' + error.message : ''
}`
)
await next(delay, { iterationNumber: iterationNumber + 1 })
}

const state = await basePool.getReplicationState(basePool)
if (state.status === 'error') {
log.error('Refuse to start or continue replication with error state')
log.error(
`Refuse to start or continue replication with error state: ${state.statusData}`
)
return
} else if (state.status === 'serviceError') {
await delayNext(getBuildDelay(iterationNumber), state.statusData)
return
}
if (state.paused) {
Expand All @@ -78,18 +100,21 @@ const build: ExternalMethods['build'] = async (
}

const timeLeft = getVacantTimeInMillis()
const occupationResult = await basePool.occupyReplication(basePool, timeLeft)
if (!occupationResult.success) {
log.debug(
`Could not occupy replication process. ${occupationResult.message ?? ''}`
)
try {
const result = await basePool.occupyReplication(basePool, timeLeft)
if (!result.success) {
log.error(`Could not occupy replication process: ${result.message}`)
return
}
} catch (error) {
await delayNext(getBuildDelay(iterationNumber), error)
return
}

let iterator = state.iterator
let localContinue = true
const sleepAfterServiceErrorMs = 3000

await eventstoreAdapter.establishTimeLimit(getVacantTimeInMillis)
let eventLoader: EventLoader

const onExit = async () => {
Expand All @@ -101,7 +126,7 @@ const build: ExternalMethods['build'] = async (
try {
await basePool.releaseReplication(basePool)
} catch (error) {
log.error(error)
if (!isHTTPServiceError(error.name)) log.error(error)
}
}

Expand All @@ -119,18 +144,14 @@ const build: ExternalMethods['build'] = async (
)
} catch (error) {
await onExit()
if (RequestTimeoutError.is(error) || ServiceBusyError.is(error)) {
log.debug(
`Got non-fatal error, continuing on the next step. ${error.message}`
)
if (RequestTimeoutError.is(error)) {
await next()
return
} else if (ConnectionError.is(error)) {
log.error(error)
return
} else if (ServiceBusyError.is(error)) {
await delayNext(getBuildDelay(iterationNumber), error)
} else {
throw error
log.error(error)
}
return
}

log.debug('Starting or continuing replication process')
Expand All @@ -150,16 +171,15 @@ const build: ExternalMethods['build'] = async (
loadEventsResult.events
)
} catch (error) {
if (RequestTimeoutError.is(error) || ServiceBusyError.is(error)) {
log.debug(
`Got non-fatal error, continuing on the next step. ${error.message}`
)
await onExit()
return
await onExit()
if (RequestTimeoutError.is(error)) {
await next()
} else if (ServiceBusyError.is(error)) {
await delayNext(getBuildDelay(iterationNumber), error)
} else {
await onExit()
throw error
log.error(error)
}
return
}
const { cursor: nextCursor, events } = loadEventsResult
const { existingSecrets, deletedSecrets } = gatheredSecrets
Expand Down Expand Up @@ -194,6 +214,7 @@ const build: ExternalMethods['build'] = async (
? (state.statusData.appliedEventsCount as number)
: 0
wasPaused = state.paused
iterationNumber = 0
break
} else if (state.status === 'serviceError') {
lastError = statusDataToError(state.statusData, {
Expand All @@ -217,43 +238,44 @@ const build: ExternalMethods['build'] = async (
lastError = error
}

const isBuildSuccess = lastError == null && appliedEventsCount > 0

if (isBuildSuccess) {
if (appliedEventsCount > 0) {
log.verbose(`Replicated batch of ${appliedEventsCount} events`)
}

let delay = 0
let shouldContinue = appliedEventsCount > 0
if (lastError) {
log.error(lastError)
if (
lastError.name === 'ServiceError' ||
lastError.name === 'AbortError' ||
lastError.name === 'FetchError'
) {
const vacantTime = getVacantTimeInMillis()
if (vacantTime > 0) {
await sleep(Math.min(vacantTime, sleepAfterServiceErrorMs))
}
if (isHTTPServiceError(lastError.name)) {
delay = getBuildDelay(iterationNumber)
shouldContinue = true
localContinue = false
} else {
shouldContinue = false
}
}

if (getVacantTimeInMillis() < 0) {
localContinue = false
}

if (isBuildSuccess && wasPaused) {
if (wasPaused) {
log.debug('Pausing replication as requested')
await onExit()
return
}

if (isBuildSuccess && localContinue) {
if (shouldContinue && localContinue && delay === 0) {
log.verbose('Continuing replication in the local build loop')
} else {
await onExit()
if (isBuildSuccess) {
log.debug('Calling next in build')
await next()
if (lastError)
log.error(`Exiting replication loop due to error ${lastError.message}`)
if (shouldContinue) {
if (delay > 0) {
await delayNext(delay, lastError)
} else {
await next()
}
}
return
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,13 @@ const getReplicationState: InternalMethods['getReplicationState'] = async ({
return checkResult
}

const defaultValues = {
paused: false,
iterator: null,
successEvent: null,
locked: false,
}

try {
const response = await fetch(
`${targetApplicationUrl}${REPLICATION_STATE.endpoint}`
Expand All @@ -25,10 +32,7 @@ const getReplicationState: InternalMethods['getReplicationState'] = async ({
name: response.statusText,
message: (state as any).message ?? response.statusText,
},
paused: false,
iterator: null,
successEvent: null,
locked: false,
...defaultValues,
}
}
return state
Expand All @@ -45,13 +49,18 @@ const getReplicationState: InternalMethods['getReplicationState'] = async ({
message: error.message as string,
stack: error.stack ? (error.stack as string) : null,
},
paused: false,
iterator: null,
successEvent: null,
locked: false,
...defaultValues,
}
} else {
throw error
return {
status: 'error',
statusData: {
name: error.name as string,
message: error.message as string,
stack: error.stack ? (error.stack as string) : null,
},
...defaultValues,
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import STS from 'aws-sdk/clients/sts'
import type { EventSubscriberNotifier } from '@resolve-js/runtime-base'
import {
createEventSubscriberNotification,
Expand Down Expand Up @@ -26,6 +27,8 @@ type NotifierRuntime = {
invokeLambdaAsync: Function
}

export const LAMBDA_TO_STEP_FUNCTION_COST_EXPENSE_THRESHOLD_MS = 3000

export const waitForSubscriber = async (isSaga = false) =>
await new Promise((resolve) => setTimeout(resolve, isSaga ? 10000 : 1000))

Expand Down Expand Up @@ -136,16 +139,45 @@ export const eventSubscriberNotifierFactory = async (
? `arn:aws:sqs:${region}:${accountId}:${userId}-${eventSubscriberScope}-${eventSubscriber}`
: functionArn

const invokeBuildAsync = async (parameters: { eventSubscriber: string }) =>
useSqs
? await sendSqsMessage(
`${userId}-${eventSubscriberScope}-${parameters.eventSubscriber}`,
parameters
)
: await invokeLambdaAsync(functionName, {
resolveSource: 'BuildEventSubscriber',
...parameters,
})
const invokeBuildAsync = async (
parameters: { eventSubscriber: string },
timeout?: number
) => {
if (useSqs) {
return await sendSqsMessage(
`${userId}-${eventSubscriberScope}-${parameters.eventSubscriber}`,
parameters
)
}
const lambdaEvent = {
resolveSource: 'BuildEventSubscriber',
...parameters,
}
if (
timeout == null ||
timeout < LAMBDA_TO_STEP_FUNCTION_COST_EXPENSE_THRESHOLD_MS
) {
await new Promise((resolve) => setTimeout(resolve, timeout))
await invokeLambdaAsync(functionName, lambdaEvent)
} else {
const { Arn } = await new STS().getCallerIdentity().promise()
await invokeFunction({
Region: process.env.AWS_REGION as string,
FunctionName: process.env.RESOLVE_SCHEDULER_LAMBDA_ARN as string,
Payload: {
functionName: process.env.AWS_LAMBDA_FUNCTION_NAME,
event: lambdaEvent,
date: new Date(Date.now() + timeout).toISOString(),
validationRoleArn: Arn,
principial: {
accessKeyId: process.env.AWS_ACCESS_KEY_ID,
secretAccessKey: process.env.AWS_SECRET_ACCESS_KEY,
sessionToken: process.env.AWS_SESSION_TOKEN,
},
},
})
}
}

const ensureQueue = async (name?: string) => {
if (!useSqs) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,17 +110,33 @@ const serializeState = async ({ state }: { state: any }): Promise<string> => {
const next = async (
pool: ReadModelPool,
eventSubscriber: string,
timeout?: number,
notificationExtraPayload?: object,
...args: any[]
) => {
if (args.length > 0) {
throw new TypeError('Next should be invoked with no arguments')
}
await pool.invokeBuildAsync({
eventSubscriber,
initiator: 'read-model-next',
notificationId: `NT-${Date.now()}${Math.floor(Math.random() * 1000000)}`,
sendTime: Date.now(),
})
if (timeout != null && (isNaN(+timeout) || +timeout < 0)) {
throw new TypeError('Timeout should be non-negative integer')
}
if (
notificationExtraPayload != null &&
notificationExtraPayload.constructor !== Object
) {
throw new TypeError('Notification extra payload should be plain object')
}

await pool.invokeBuildAsync(
{
eventSubscriber,
initiator: 'read-model-next',
notificationId: `NT-${Date.now()}${Math.floor(Math.random() * 1000000)}`,
sendTime: Date.now(),
...notificationExtraPayload,
},
timeout != null ? Math.floor(+timeout) : timeout
)
}

const updateCustomReadModel = async (
Expand Down
4 changes: 3 additions & 1 deletion packages/runtime/runtimes/runtime-base/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -126,10 +126,12 @@ export type EventSubscriberNotification = {
sendTime: number
event?: Event
cursor?: string
[key: string]: any
}

export type InvokeBuildAsync = (
parameters: EventSubscriberNotification
parameters: EventSubscriberNotification,
timeout?: number
) => Promise<void>

export type { BuildTimeConstants }
Expand Down
Loading

0 comments on commit 46b0bc6

Please sign in to comment.