Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve readmodel postgresql lightweight optimistic #2118

Merged
merged 2 commits into from
Nov 12, 2021
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -328,12 +328,10 @@ const buildEvents: (
const eventstoreLocalResourcesNames = await eventstoreLocalResourcePromise
let events =
eventstoreLocalResourcesNames == null ? await eventsPromise : null
let isLastEmptyLoop = false

for (metricData.eventLoopCount = 0; true; metricData.eventLoopCount++) {
if (events != null && events.length === 0) {
throw new PassthroughError(false)
}

const isEffectiveEventLoop = events == null || events.length > 0
log.debug(`Start optimistic events loading`)

let nextCursorPromise: Promise<ReadModelCursor> | null =
Expand All @@ -343,7 +341,7 @@ const buildEvents: (
let appliedEventsCount = 0
let regularWorkflow = true

if (isProcedural) {
if (isProcedural && isEffectiveEventLoop) {
log.debug(`Running procedural events applying`)
try {
let procedureResult: Array<{ Result: ProcedureResult }> | null = null
Expand Down Expand Up @@ -462,17 +460,21 @@ const buildEvents: (
}

const eventsLoadStartTimestamp = Date.now()
eventsPromise =
regularWorkflow || eventstoreLocalResourcesNames == null
? loadEventsWithMonitoring(
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
nextCursorPromise!,
1000,
eventsLoadStartTimestamp
)
: Promise.resolve(null)
const getEventsPromise = (regularWorkflow ||
eventstoreLocalResourcesNames == null
? loadEventsWithMonitoring.bind(
null,
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
nextCursorPromise!,
1000,
eventsLoadStartTimestamp
)
: Promise.resolve.bind(null, null)) as () =>
| Promise<ReadModelEvent[]>
| Promise<null>
eventsPromise = getEventsPromise()

if (regularWorkflow) {
if (regularWorkflow && isEffectiveEventLoop) {
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
events = events!
log.debug(`Running regular workflow events applying`)
Expand Down Expand Up @@ -555,7 +557,8 @@ const buildEvents: (
log.debug(`Finish running regular workflow events applying`)
}
const nextCursor = await nextCursorPromise
if (lastError == null) {

if (lastError == null && isEffectiveEventLoop) {
log.debug(`Saving success event into inline ledger`)
await inlineLedgerRunQuery(
`UPDATE ${databaseNameAsId}.${ledgerTableNameAsId} SET
Expand All @@ -572,7 +575,7 @@ const buildEvents: (
COMMIT;
`
)
} else {
} else if (isEffectiveEventLoop) {
log.debug(`Saving error and failed event into inline ledger`)
await inlineLedgerRunQuery(
`UPDATE ${databaseNameAsId}.${ledgerTableNameAsId}
Expand Down Expand Up @@ -629,7 +632,9 @@ const buildEvents: (
eventsApplyStartTimestamp = Date.now()
projectionApplyTime = 0

const isBuildSuccess = lastError == null && appliedEventsCount > 0
const isBuildSuccess: boolean =
lastError == null && (appliedEventsCount > 0 || !isLastEmptyLoop)
isLastEmptyLoop = isBuildSuccess && appliedEventsCount === 0
cursor = nextCursor

if (getVacantTimeInMillis() < 0) {
Expand All @@ -640,7 +645,8 @@ const buildEvents: (
log.debug(`Start transaction for the next step events applying`)
rootSavePointId = generateGuid(xaKey, 'ROOT')

await inlineLedgerRunQuery(
const runNextDatabaseLoop = inlineLedgerRunQuery.bind(
null,
`BEGIN TRANSACTION;
SET TRANSACTION ISOLATION LEVEL REPEATABLE READ;
SAVEPOINT ${rootSavePointId};
Expand All @@ -657,7 +663,14 @@ const buildEvents: (
true
)

events = await eventsPromise
if (!isLastEmptyLoop) {
await runNextDatabaseLoop()
events = await eventsPromise
} else {
eventsPromise = getEventsPromise()
events = await eventsPromise
await runNextDatabaseLoop()
}
} else {
if (isBuildSuccess) {
log.debug(`Going to the next step of building`)
Expand Down