Skip to content

Commit

Permalink
Sparse clean TRX journal / Improve PLV8 (#2030)
Browse files Browse the repository at this point in the history
  • Loading branch information
IhostVlad authored Sep 3, 2021
1 parent 5a05d7b commit c41dbdc
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 60 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,8 @@ export const buildEvents: (
} = pool
const { eventsWithCursors } = buildInfo
const isContinuousMode =
typeof eventstoreAdapter.getCursorUntilEventTypes === 'function'
typeof eventstoreAdapter.getCursorUntilEventTypes === 'function' &&
!!process.env.EXPERIMENTAL_SQS_TRANSPORT
const getContinuousLatestCursor = async (
cursor: ReadModelCursor,
events: Array<ReadModelEvent>,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
import { ReadModelEvent } from '../../readmodel-base/types'
import type {
PassthroughErrorInstance,
ExternalMethods,
ReadModelCursor,
ReadModelProcedureLedger,
ProcedureResult,
EventThreadData,
ReadModelEvent,
} from './types'
import getLog from './get-log'
import { LeveledDebugger } from '@resolve-js/debug-levels'
Expand Down Expand Up @@ -166,7 +166,8 @@ const buildEvents: (
const { eventsWithCursors } = buildInfo
let isProcedural = inputIsProcedural
const isContinuousMode =
typeof eventstoreAdapter.getCursorUntilEventTypes === 'function'
typeof eventstoreAdapter.getCursorUntilEventTypes === 'function' &&
!!process.env.EXPERIMENTAL_SQS_TRANSPORT
const getContinuousLatestCursor = async (
cursor: ReadModelCursor,
events: Array<EventThreadData>,
Expand Down Expand Up @@ -238,7 +239,7 @@ const buildEvents: (
return events
})

let eventsPromise =
let eventsPromise: Promise<ReadModelEvent[]> | Promise<null> =
hotEvents == null
? loadEventsWithMonitoring(
Promise.resolve(cursor),
Expand All @@ -247,7 +248,7 @@ const buildEvents: (
)
: Promise.resolve(hotEvents)

const eventstoreLocalTableNamePromise = (async () => {
const eventstoreLocalResourcePromise = (async () => {
let resourceNames = null
try {
void ({ resourceNames } =
Expand All @@ -259,7 +260,8 @@ const buildEvents: (
if (
resourceNames == null ||
resourceNames?.eventsTableName == null ||
resourceNames?.databaseName == null
resourceNames?.databaseName == null ||
!isProcedural
) {
return null
}
Expand All @@ -285,7 +287,7 @@ const buildEvents: (
SELECT 1/Count("CTE".*) AS "NonZero" FROM "CTE"
`)

return resourceNames.eventsTableName
return resourceNames
} catch (err) {
return null
}
Expand All @@ -312,24 +314,21 @@ const buildEvents: (
true
)

let [events, eventstoreLocalTableName] = await Promise.all([
eventsPromise,
eventstoreLocalTableNamePromise,
])
let isLocalEventsSkipped = false
const eventstoreLocalResourcesNames = await eventstoreLocalResourcePromise
let events =
eventstoreLocalResourcesNames == null ? await eventsPromise : null

for (metricData.eventLoopCount = 0; true; metricData.eventLoopCount++) {
if (events.length === 0) {
if (events != null && events.length === 0) {
throw new PassthroughError(false)
}

log.debug(`Start optimistic events loading`)

let nextCursorPromise: Promise<ReadModelCursor> = getContinuousLatestCursor(
cursor,
events,
eventTypes
)
let nextCursorPromise: Promise<ReadModelCursor> | null =
events != null
? getContinuousLatestCursor(cursor, events, eventTypes)
: null
let appliedEventsCount = 0
let regularWorkflow = true

Expand All @@ -345,8 +344,13 @@ const buildEvents: (
)}(${escapeStr(
JSON.stringify({
maxExecutionTime: getVacantTimeInMillis(),
...(eventstoreLocalTableName != null
? { eventstoreLocalTableName }
...(eventstoreLocalResourcesNames != null
? {
localEventsDatabaseName:
eventstoreLocalResourcesNames.databaseName,
localEventsTableName:
eventstoreLocalResourcesNames.eventsTableName,
}
: { events }),
})
)}) AS "Result"`
Expand Down Expand Up @@ -376,6 +380,7 @@ const buildEvents: (

appliedEventsCount = appliedCount
eventCount += appliedCount

if (status === 'OK_PARTIAL' || status === 'CUSTOM_ERROR') {
nextCursorPromise = getContinuousLatestCursor(
cursor,
Expand All @@ -384,6 +389,14 @@ const buildEvents: (
)
}
if (status === 'OK_ALL' || status === 'OK_PARTIAL') {
if (nextCursorPromise == null) {
nextCursorPromise = nextCursorPromise = getContinuousLatestCursor(
cursor,
appliedEventsThreadData,
eventTypes
)
}

lastSuccessEvent = successEvent
} else if (status === 'CUSTOM_ERROR') {
lastFailedEvent = failureEvent
Expand All @@ -395,9 +408,6 @@ const buildEvents: (
}

regularWorkflow = false
if (eventstoreLocalTableName != null) {
isLocalEventsSkipped = true
}
} catch (err) {
isProcedural = false
if (err instanceof PassthroughError) {
Expand All @@ -413,6 +423,14 @@ const buildEvents: (

await inlineLedgerRunQuery(`ROLLBACK TO SAVEPOINT ${rootSavePointId};`)

if (events == null) {
events = await loadEventsWithMonitoring(
Promise.resolve(cursor),
100,
Date.now()
)
}

nextCursorPromise = getContinuousLatestCursor(
cursor,
events,
Expand All @@ -423,24 +441,19 @@ const buildEvents: (
}

const eventsLoadStartTimestamp = Date.now()
if (regularWorkflow && isLocalEventsSkipped) {
events = await loadEventsWithMonitoring(
nextCursorPromise,
100,
eventsLoadStartTimestamp
)
isLocalEventsSkipped = false
}

if (regularWorkflow || eventstoreLocalTableName == null) {
eventsPromise = loadEventsWithMonitoring(
nextCursorPromise,
1000,
eventsLoadStartTimestamp
)
}
eventsPromise =
regularWorkflow || eventstoreLocalResourcesNames == null
? loadEventsWithMonitoring(
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
nextCursorPromise!,
1000,
eventsLoadStartTimestamp
)
: Promise.resolve(null)

if (regularWorkflow) {
// eslint-disable-next-line @typescript-eslint/no-non-null-assertion
events = events!
log.debug(`Running regular workflow events applying`)
try {
for (const event of events) {
Expand Down Expand Up @@ -731,11 +744,7 @@ const build: ExternalMethods['build'] = async (
AND "IsPaused" = FALSE
AND "Errors" IS NULL
FOR NO KEY UPDATE NOWAIT
), "CleanTrx" AS (
DELETE FROM ${databaseNameAsId}.${trxTableNameAsId}
WHERE "Timestamp" < CAST(extract(epoch from clock_timestamp()) * 1000 AS BIGINT) - 86400000
RETURNING *
), "InsertTrx" AS (
), "InsertTrx" AS (
INSERT INTO ${databaseNameAsId}.${trxTableNameAsId}(
"Timestamp", "XaKey", "XaValue"
) VALUES (
Expand All @@ -749,7 +758,6 @@ const build: ExternalMethods['build'] = async (
SET "XaKey" = ${escapeStr(xaKey)}
WHERE "EventSubscriber" = ${escapeStr(readModelName)}
AND CAST(COALESCE((SELECT LEAST(Count("InsertTrx".*), 0) FROM "InsertTrx"), 0) AS BIGINT) = 0
AND CAST(COALESCE((SELECT LEAST(Count("CleanTrx".*), 0) FROM "CleanTrx"), 0) AS BIGINT) = 0
AND (SELECT Count("MaybeAcquireLock".*) FROM "MaybeAcquireLock") = 1
AND "IsPaused" = FALSE
AND "Errors" IS NULL
Expand Down Expand Up @@ -816,6 +824,18 @@ const build: ExternalMethods['build'] = async (
getVacantTimeInMillis,
buildInfo
)

try {
await inlineLedgerRunQuery(`
DELETE FROM ${databaseNameAsId}.${trxTableNameAsId}
WHERE "Timestamp" < CAST(extract(epoch from clock_timestamp()) * 1000 AS BIGINT) - 86400000
`)
} catch (err) {
if (!(err instanceof PassthroughError)) {
log.debug(`Unknown error is thrown while cleaning TRX journal`)
throw err
}
}
} catch (error) {
if (
error == null ||
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,45 +191,54 @@ const wrapProcedure = (readModel) => (input, options) => {

const {
events: inputEvents,
eventstoreLocalTableName,
localEventsDatabaseName,
localEventsTableName,
maxExecutionTime,
} = input
if (inputEvents != null && eventstoreLocalTableName != null) {
if (
inputEvents != null &&
(localEventsDatabaseName != null || localEventsTableName != null)
) {
throw new Error(
'Providing events and eventstoreLocalTableName is mutually exclusive'
'Providing events and localEventsDatabaseName/localEventsTableName is mutually exclusive'
)
}
let events = inputEvents
if (eventstoreLocalTableName != null) {

let events = null
if (localEventsDatabaseName != null && localEventsTableName != null) {
try {
events = null
const databaseNameAsId = escapeId(options.schemaName)
const ledgerDatabaseNameAsId = escapeId(options.schemaName)
const ledgerTableNameAsId = escapeId(
`${options.tablePrefix}__${options.schemaName}__LEDGER__`
)
const eventsTableAsId = escapeId(eventstoreLocalTableName)
const eventsDatabaseNameAsId = escapeId(localEventsDatabaseName)
const eventsTableAsId = escapeId(localEventsTableName)
const [{ EventTypes, Cursor }] = plv8.execute(`
SELECT "EventTypes", "Cursor" FROM ${databaseNameAsId}.${ledgerTableNameAsId}
SELECT "EventTypes", "Cursor" FROM ${ledgerDatabaseNameAsId}.${ledgerTableNameAsId}
WHERE "EventSubscriber" = ${escapeStr(readModel.name)}
`)

events = plv8.execute(`SELECT * FROM ${databaseNameAsId}.${eventsTableAsId}
events = Array.from(
plv8.execute(`SELECT * FROM ${eventsDatabaseNameAsId}.${eventsTableAsId}
WHERE 1=1 ${
EventTypes != null && EventTypes.length > 0
? `AND "type" IN (${EventTypes.map(escapeStr)})`
: ''
} AND (${cursorStrToHex(Cursor)
.map(
(threadCounter, threadId) =>
`"threadId" = ${+threadId} AND "threadCounter" >= x'${threadCounter}'::INT8 `
)
.join(' OR ')})
.map(
(threadCounter, threadId) =>
`"threadId" = ${+threadId} AND "threadCounter" >= x'${threadCounter}'::INT8 `
)
.join(' OR ')})
ORDER BY "timestamp" ASC, "threadCounter" ASC, "threadId" ASC
LIMIT 1000
`)
)
} catch (err) {
throw new Error('Local events reading failed')
}
} else if (inputEvents != null) {
events = inputEvents
}

if (!Array.isArray(events)) {
Expand Down

0 comments on commit c41dbdc

Please sign in to comment.