Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ability to delay next in readmodel build #2090

Merged
merged 3 commits into from
Oct 27, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -247,7 +247,10 @@ export type ReadModelLedger = {
IsPaused: boolean
}

export type MethodNext = () => Promise<void>
export type MethodNext = (
timeout?: number,
notificationExtraPayload?: object
) => Promise<void>
export type MethodGetRemainingTime = () => number
export type MethodGetEncryption = () => (
event: ReadModelEvent
Expand Down Expand Up @@ -289,6 +292,7 @@ export type BuildInfo = {
notificationId: string
sendTime: number
coldStart?: boolean
[key: string]: any
}

export type AdapterConnection<
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,13 @@ function statusDataToError(
}
}

const isHTTPServiceError = (name: string) =>
name === 'FetchError' || name === 'AbortError' || name === 'ServiceError'

const getBuildDelay = (iterationNumber: number) => {
return 30000 * 2 ** iterationNumber
}

const BATCH_PROCESSING_POLL_MS = 50

const build: ExternalMethods['build'] = async (
Expand All @@ -61,15 +68,30 @@ const build: ExternalMethods['build'] = async (
modelInterop,
next,
eventstoreAdapter,
getVacantTimeInMillis
getVacantTimeInMillis,
buildInfo
) => {
const log = getLog('build')

await eventstoreAdapter.establishTimeLimit(getVacantTimeInMillis)
let iterationNumber = buildInfo.iterationNumber ?? 0

const delayNext = async (delay: number, error: any) => {
log.debug(
`Delaying next for ${delay}ms due to service error ${
error ? error.name + ': ' + error.message : ''
}`
)
await next(delay, { iterationNumber: iterationNumber + 1 })
}

const state = await basePool.getReplicationState(basePool)
if (state.status === 'error') {
log.error('Refuse to start or continue replication with error state')
log.error(
`Refuse to start or continue replication with error state: ${state.statusData}`
)
return
} else if (state.status === 'serviceError') {
await delayNext(getBuildDelay(iterationNumber), state.statusData)
return
}
if (state.paused) {
Expand All @@ -78,18 +100,21 @@ const build: ExternalMethods['build'] = async (
}

const timeLeft = getVacantTimeInMillis()
const occupationResult = await basePool.occupyReplication(basePool, timeLeft)
if (!occupationResult.success) {
log.debug(
`Could not occupy replication process. ${occupationResult.message ?? ''}`
)
try {
const result = await basePool.occupyReplication(basePool, timeLeft)
if (!result.success) {
log.error(`Could not occupy replication process: ${result.message}`)
return
}
} catch (error) {
await delayNext(getBuildDelay(iterationNumber), error)
return
}

let iterator = state.iterator
let localContinue = true
const sleepAfterServiceErrorMs = 3000

await eventstoreAdapter.establishTimeLimit(getVacantTimeInMillis)
let eventLoader: EventLoader

const onExit = async () => {
Expand All @@ -101,7 +126,7 @@ const build: ExternalMethods['build'] = async (
try {
await basePool.releaseReplication(basePool)
} catch (error) {
log.error(error)
if (!isHTTPServiceError(error.name)) log.error(error)
}
}

Expand All @@ -119,18 +144,14 @@ const build: ExternalMethods['build'] = async (
)
} catch (error) {
await onExit()
if (RequestTimeoutError.is(error) || ServiceBusyError.is(error)) {
log.debug(
`Got non-fatal error, continuing on the next step. ${error.message}`
)
if (RequestTimeoutError.is(error)) {
await next()
return
} else if (ConnectionError.is(error)) {
log.error(error)
return
} else if (ServiceBusyError.is(error)) {
await delayNext(getBuildDelay(iterationNumber), error)
} else {
throw error
log.error(error)
}
return
}

log.debug('Starting or continuing replication process')
Expand All @@ -150,16 +171,15 @@ const build: ExternalMethods['build'] = async (
loadEventsResult.events
)
} catch (error) {
if (RequestTimeoutError.is(error) || ServiceBusyError.is(error)) {
log.debug(
`Got non-fatal error, continuing on the next step. ${error.message}`
)
await onExit()
return
await onExit()
if (RequestTimeoutError.is(error)) {
await next()
} else if (ServiceBusyError.is(error)) {
await delayNext(getBuildDelay(iterationNumber), error)
} else {
await onExit()
throw error
log.error(error)
}
return
}
const { cursor: nextCursor, events } = loadEventsResult
const { existingSecrets, deletedSecrets } = gatheredSecrets
Expand Down Expand Up @@ -194,6 +214,7 @@ const build: ExternalMethods['build'] = async (
? (state.statusData.appliedEventsCount as number)
: 0
wasPaused = state.paused
iterationNumber = 0
break
} else if (state.status === 'serviceError') {
lastError = statusDataToError(state.statusData, {
Expand All @@ -217,43 +238,44 @@ const build: ExternalMethods['build'] = async (
lastError = error
}

const isBuildSuccess = lastError == null && appliedEventsCount > 0

if (isBuildSuccess) {
if (appliedEventsCount > 0) {
log.verbose(`Replicated batch of ${appliedEventsCount} events`)
}

let delay = 0
let shouldContinue = appliedEventsCount > 0
if (lastError) {
log.error(lastError)
if (
lastError.name === 'ServiceError' ||
lastError.name === 'AbortError' ||
lastError.name === 'FetchError'
) {
const vacantTime = getVacantTimeInMillis()
if (vacantTime > 0) {
await sleep(Math.min(vacantTime, sleepAfterServiceErrorMs))
}
if (isHTTPServiceError(lastError.name)) {
delay = getBuildDelay(iterationNumber)
shouldContinue = true
localContinue = false
} else {
shouldContinue = false
}
}

if (getVacantTimeInMillis() < 0) {
localContinue = false
}

if (isBuildSuccess && wasPaused) {
if (wasPaused) {
log.debug('Pausing replication as requested')
await onExit()
return
}

if (isBuildSuccess && localContinue) {
if (shouldContinue && localContinue && delay === 0) {
log.verbose('Continuing replication in the local build loop')
} else {
await onExit()
if (isBuildSuccess) {
log.debug('Calling next in build')
await next()
if (lastError)
log.error(`Exiting replication loop due to error ${lastError.message}`)
if (shouldContinue) {
if (delay > 0) {
await delayNext(delay, lastError)
} else {
await next()
}
}
return
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,13 @@ const getReplicationState: InternalMethods['getReplicationState'] = async ({
return checkResult
}

const defaultValues = {
paused: false,
iterator: null,
successEvent: null,
locked: false,
}

try {
const response = await fetch(
`${targetApplicationUrl}${REPLICATION_STATE.endpoint}`
Expand All @@ -25,10 +32,7 @@ const getReplicationState: InternalMethods['getReplicationState'] = async ({
name: response.statusText,
message: (state as any).message ?? response.statusText,
},
paused: false,
iterator: null,
successEvent: null,
locked: false,
...defaultValues,
}
}
return state
Expand All @@ -45,13 +49,18 @@ const getReplicationState: InternalMethods['getReplicationState'] = async ({
message: error.message as string,
stack: error.stack ? (error.stack as string) : null,
},
paused: false,
iterator: null,
successEvent: null,
locked: false,
...defaultValues,
}
} else {
throw error
return {
status: 'error',
statusData: {
name: error.name as string,
message: error.message as string,
stack: error.stack ? (error.stack as string) : null,
},
...defaultValues,
}
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import STS from 'aws-sdk/clients/sts'
import type { EventSubscriberNotifier } from '@resolve-js/runtime-base'
import {
createEventSubscriberNotification,
Expand Down Expand Up @@ -26,6 +27,8 @@ type NotifierRuntime = {
invokeLambdaAsync: Function
}

export const LAMBDA_TO_STEP_FUNCTION_COST_EXPENSE_THRESHOLD_MS = 3000

export const waitForSubscriber = async (isSaga = false) =>
await new Promise((resolve) => setTimeout(resolve, isSaga ? 10000 : 1000))

Expand Down Expand Up @@ -136,16 +139,45 @@ export const eventSubscriberNotifierFactory = async (
? `arn:aws:sqs:${region}:${accountId}:${userId}-${eventSubscriberScope}-${eventSubscriber}`
: functionArn

const invokeBuildAsync = async (parameters: { eventSubscriber: string }) =>
useSqs
? await sendSqsMessage(
`${userId}-${eventSubscriberScope}-${parameters.eventSubscriber}`,
parameters
)
: await invokeLambdaAsync(functionName, {
resolveSource: 'BuildEventSubscriber',
...parameters,
})
const invokeBuildAsync = async (
parameters: { eventSubscriber: string },
timeout?: number
) => {
if (useSqs) {
return await sendSqsMessage(
`${userId}-${eventSubscriberScope}-${parameters.eventSubscriber}`,
parameters
)
}
const lambdaEvent = {
resolveSource: 'BuildEventSubscriber',
...parameters,
}
if (
timeout == null ||
timeout < LAMBDA_TO_STEP_FUNCTION_COST_EXPENSE_THRESHOLD_MS
) {
await new Promise((resolve) => setTimeout(resolve, timeout))
await invokeLambdaAsync(functionName, lambdaEvent)
} else {
const { Arn } = await new STS().getCallerIdentity().promise()
await invokeFunction({
Region: process.env.AWS_REGION as string,
FunctionName: process.env.RESOLVE_SCHEDULER_LAMBDA_ARN as string,
Payload: {
functionName: process.env.AWS_LAMBDA_FUNCTION_NAME,
event: lambdaEvent,
date: new Date(Date.now() + timeout).toISOString(),
validationRoleArn: Arn,
principial: {
accessKeyId: process.env.AWS_ACCESS_KEY_ID,
secretAccessKey: process.env.AWS_SECRET_ACCESS_KEY,
sessionToken: process.env.AWS_SESSION_TOKEN,
},
},
})
}
}

const ensureQueue = async (name?: string) => {
if (!useSqs) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,17 +110,33 @@ const serializeState = async ({ state }: { state: any }): Promise<string> => {
const next = async (
pool: ReadModelPool,
eventSubscriber: string,
timeout?: number,
notificationExtraPayload?: object,
...args: any[]
) => {
if (args.length > 0) {
throw new TypeError('Next should be invoked with no arguments')
}
await pool.invokeBuildAsync({
eventSubscriber,
initiator: 'read-model-next',
notificationId: `NT-${Date.now()}${Math.floor(Math.random() * 1000000)}`,
sendTime: Date.now(),
})
if (timeout != null && (isNaN(+timeout) || +timeout < 0)) {
throw new TypeError('Timeout should be non-negative integer')
}
if (
notificationExtraPayload != null &&
notificationExtraPayload.constructor !== Object
) {
throw new TypeError('Notification extra payload should be plain object')
}

await pool.invokeBuildAsync(
{
eventSubscriber,
initiator: 'read-model-next',
notificationId: `NT-${Date.now()}${Math.floor(Math.random() * 1000000)}`,
sendTime: Date.now(),
...notificationExtraPayload,
},
timeout != null ? Math.floor(+timeout) : timeout
)
}

const updateCustomReadModel = async (
Expand Down
4 changes: 3 additions & 1 deletion packages/runtime/runtimes/runtime-base/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -126,10 +126,12 @@ export type EventSubscriberNotification = {
sendTime: number
event?: Event
cursor?: string
[key: string]: any
}

export type InvokeBuildAsync = (
parameters: EventSubscriberNotification
parameters: EventSubscriberNotification,
timeout?: number
) => Promise<void>

export type { BuildTimeConstants }
Expand Down
Loading