From d7e00ce47d26b594d4b7fc768fe8f61fb674f34c Mon Sep 17 00:00:00 2001 From: Roman Chistokhodov Date: Mon, 6 Dec 2021 12:44:59 +0300 Subject: [PATCH 1/2] Timer in http replication request --- .../src/api-handlers/replicate.ts | 142 +++++++++++++----- .../replicator-via-api-handler/src/build.ts | 9 +- .../src/call-replicate.ts | 4 +- .../replicator-via-api-handler/src/types.ts | 2 +- 4 files changed, 118 insertions(+), 39 deletions(-) diff --git a/packages/modules/module-replication/src/api-handlers/replicate.ts b/packages/modules/module-replication/src/api-handlers/replicate.ts index 6396581df..676cd928e 100644 --- a/packages/modules/module-replication/src/api-handlers/replicate.ts +++ b/packages/modules/module-replication/src/api-handlers/replicate.ts @@ -26,7 +26,7 @@ function shouldSaveError(error: any) { } const handler = async (req: ResolveRequest, res: ResolveResponse) => { - let input + let input: any try { input = JSON.parse(req.body ?? '') checkInput(input) @@ -48,49 +48,118 @@ const handler = async (req: ResolveRequest, res: ResolveResponse) => { iterator: input.iterator, }) - res.status(202) - res.end('Replication has been started') - } catch (error) { - if (shouldSaveError(error)) { + type ReplicationOperationResult = { + status: 'timeout' | 'success' | 'error' + message: string + } + + const replicateData = async (): Promise => { + let result: ReplicationOperationResult = { + status: 'error', + message: 'Uninitialized error', + } try { + await req.resolve.eventstoreAdapter.replicateSecrets( + input.secretsToSet, + input.secretsToDelete + ) + await req.resolve.eventstoreAdapter.replicateEvents(input.events) await req.resolve.eventstoreAdapter.setReplicationStatus({ statusAndData: { - status: 'criticalError', + status: 'batchDone', data: { - name: error.name ?? 'Error', - message: error.message ?? 'Unknown error', + appliedEventsCount: input.events.length, }, }, + lastEvent: input.events[input.events.length - 1], }) - } catch (e) { - error.message += '\n' - error.message += e.message + result = { + status: 'success', + message: `Completed replication of ${input.events.length} events`, + } + } catch (error) { + result.message = error.message + if (shouldSaveError(error)) { + try { + await req.resolve.eventstoreAdapter.setReplicationStatus({ + statusAndData: { + status: 'criticalError', + data: { + name: error.name ?? 'Error', + message: error.message ?? 'Unknown error', + }, + }, + }) + } catch (e) { + log.error(e) + } + } else { + log.debug(error) + } + } finally { + try { + if (result.status === 'success') await req.resolve.broadcastEvent() + } catch (error) { + log.error('broadcastEvent error: ', error) + } } - } else { - log.debug(error) + return result } - res.status(500) - res.end(error.message) - return - } + type TimerInfo = { + timeout: NodeJS.Timeout | null + timeoutResolve: ((value: ReplicationOperationResult) => void) | null + timeoutPromise: Promise + } - try { - await req.resolve.eventstoreAdapter.replicateSecrets( - input.secretsToSet, - input.secretsToDelete - ) - await req.resolve.eventstoreAdapter.replicateEvents(input.events) - await req.resolve.eventstoreAdapter.setReplicationStatus({ - statusAndData: { - status: 'batchDone', - data: { - appliedEventsCount: input.events.length, - }, - }, - lastEvent: input.events[input.events.length - 1], - }) - await req.resolve.broadcastEvent() + const makeTimer = (): TimerInfo => { + const timerInfo: Partial = { + timeout: null, + timeoutResolve: null, + } + + timerInfo.timeoutPromise = new Promise( + (resolve) => { + timerInfo.timeoutResolve = resolve + timerInfo.timeout = setTimeout( + () => + resolve({ + status: 'timeout', + message: `Batch of ${input.events.length} events took too long to process to respond in place. Continuing replication in background.`, + }), + 5000 + ) + } + ) + return timerInfo as TimerInfo + } + const timerInfo = makeTimer() + + const result = await Promise.race([ + timerInfo.timeoutPromise, + replicateData(), + ]) + if (result.status !== 'timeout') { + if (timerInfo.timeout !== null) { + clearTimeout(timerInfo.timeout) + timerInfo.timeout = null + } + if (timerInfo.timeoutResolve !== null) { + timerInfo.timeoutResolve({ + status: 'timeout', + message: 'Resolving timeout promise', + }) + timerInfo.timeoutResolve = null + } + } + if (result.status === 'timeout') { + res.status(202) + } else if (result.status === 'success') { + res.status(200) + } else if (result.status === 'error') { + res.status(500) + } + res.end(result.message) } catch (error) { if (shouldSaveError(error)) { try { @@ -104,11 +173,16 @@ const handler = async (req: ResolveRequest, res: ResolveResponse) => { }, }) } catch (e) { - log.error(e) + error.message += '\n' + error.message += e.message } } else { log.debug(error) } + + res.status(500) + res.end(error.message) + return } } diff --git a/packages/runtime/adapters/replicators/replicator-via-api-handler/src/build.ts b/packages/runtime/adapters/replicators/replicator-via-api-handler/src/build.ts index ce48a4143..058aecbee 100644 --- a/packages/runtime/adapters/replicators/replicator-via-api-handler/src/build.ts +++ b/packages/runtime/adapters/replicators/replicator-via-api-handler/src/build.ts @@ -4,7 +4,6 @@ import type { CallReplicateResult, } from './types' import type { - ReplicationState, StoredEventBatchPointer, GatheredSecrets, EventLoader, @@ -33,6 +32,7 @@ const getBuildDelay = (iterationNumber: number) => { } const BATCH_PROCESSING_POLL_MS = 50 +const PATCH_PROCESSING_TIME_LIMIT = 60 * 1000 const build: ExternalMethods['build'] = async ( basePool, @@ -195,11 +195,14 @@ const build: ExternalMethods['build'] = async ( name: 'Error', message: result.message, } - } else if (result.type === 'launched') { + } else if (result.type === 'launched' || result.type === 'processed') { while (true) { const state = await basePool.getReplicationState(basePool) if (state.statusAndData.status === 'batchInProgress') { - if (state.statusAndData.data.startedAt + 60 * 1000 < Date.now()) { + if ( + state.statusAndData.data.startedAt + PATCH_PROCESSING_TIME_LIMIT < + Date.now() + ) { lastError = { recoverable: true, name: 'Error', diff --git a/packages/runtime/adapters/replicators/replicator-via-api-handler/src/call-replicate.ts b/packages/runtime/adapters/replicators/replicator-via-api-handler/src/call-replicate.ts index 42ae8ce99..3a6e8e435 100644 --- a/packages/runtime/adapters/replicators/replicator-via-api-handler/src/call-replicate.ts +++ b/packages/runtime/adapters/replicators/replicator-via-api-handler/src/call-replicate.ts @@ -30,8 +30,10 @@ const callReplicate: InternalMethods['callReplicate'] = async ( resultType = 'serverError' } else if (response.status >= 400) { resultType = 'clientError' - } else if (response.status === 202 || response.status === 200) { + } else if (response.status === 202) { resultType = 'launched' + } else if (response.status === 200) { + resultType = 'processed' } return { type: resultType, diff --git a/packages/runtime/adapters/replicators/replicator-via-api-handler/src/types.ts b/packages/runtime/adapters/replicators/replicator-via-api-handler/src/types.ts index 7677f73c3..046bcde82 100644 --- a/packages/runtime/adapters/replicators/replicator-via-api-handler/src/types.ts +++ b/packages/runtime/adapters/replicators/replicator-via-api-handler/src/types.ts @@ -25,7 +25,7 @@ export type GetReplicationState = ( ) => Promise export type CallReplicateResult = { - type: 'launched' | 'unknown' | 'serverError' | 'clientError' + type: 'launched' | 'processed' | 'unknown' | 'serverError' | 'clientError' httpStatus: number message: string } From b176229fd3af3f39e5f5d72e86f1896d2f191b43 Mon Sep 17 00:00:00 2001 From: Roman Chistokhodov Date: Thu, 9 Dec 2021 15:09:35 +0300 Subject: [PATCH 2/2] Correct replication lock --- packages/core/core/src/types/runtime.ts | 21 +-- .../src/api-handlers/occupy_replication.ts | 10 +- .../src/api-handlers/release_replication.ts | 11 +- .../src/api-handlers/replicate.ts | 82 ++++++++--- .../src/api-handlers/replication_state.ts | 1 + .../eventstore-base/src/types.ts | 1 + .../src/get-replication-state.ts | 6 +- .../src/init-replication-state-table.ts | 3 +- .../eventstore-lite/src/replicate-events.ts | 46 +++++- .../eventstore-lite/src/replicate-secrets.ts | 72 ++++++++-- .../eventstore-lite/src/reset-replication.ts | 3 +- .../src/set-replication-lock.ts | 20 ++- .../src/set-replication-status.ts | 33 ++++- .../eventstore-postgresql/src/errors.ts | 2 + .../src/get-replication-state.ts | 4 +- .../src/init-replication-state-table.ts | 3 +- .../src/replicate-events.ts | 124 ++++++++++------ .../src/replicate-secrets.ts | 62 ++++++-- .../src/reset-replication.ts | 20 +-- .../src/set-replication-lock.ts | 18 ++- .../src/set-replication-status.ts | 31 +++- .../replicator-via-api-handler/src/build.ts | 26 +++- .../src/call-replicate.ts | 7 +- .../src/check-target-url.ts | 1 + .../src/get-replication-state.ts | 1 + .../src/occupy-replication.ts | 8 +- .../src/release-replication.ts | 7 +- .../replicator-via-api-handler/src/types.ts | 5 +- tests/eventstore-replication/index.test.ts | 134 +++++++++++++----- 29 files changed, 589 insertions(+), 173 deletions(-) diff --git a/packages/core/core/src/types/runtime.ts b/packages/core/core/src/types/runtime.ts index 16f48303b..782eb6171 100644 --- a/packages/core/core/src/types/runtime.ts +++ b/packages/core/core/src/types/runtime.ts @@ -172,6 +172,7 @@ export type ReplicationState = { iterator: SerializableMap | null successEvent: OldEvent | null locked: boolean + lockId: string | null } export type EventStoreDescription = { @@ -224,20 +225,24 @@ export type Eventstore = { }> > - replicateEvents: (events: OldEvent[]) => Promise + replicateEvents: (lockId: string, events: OldEvent[]) => Promise replicateSecrets: ( + lockId: string, existingSecrets: OldSecretRecord[], deletedSecrets: Array - ) => Promise - setReplicationStatus: (state: { - statusAndData: ReplicationStatusAndData - lastEvent?: OldEvent - iterator?: ReplicationState['iterator'] - }) => Promise + ) => Promise + setReplicationStatus: ( + lockId: string, + state: { + statusAndData: ReplicationStatusAndData + lastEvent?: OldEvent + iterator?: ReplicationState['iterator'] + } + ) => Promise setReplicationPaused: (pause: boolean) => Promise getReplicationState: () => Promise resetReplication: () => Promise - setReplicationLock: (lockDuration: number) => Promise + setReplicationLock: (lockId: string, lockDuration: number) => Promise describe: ( options?: EventStoreDescribeOptions diff --git a/packages/modules/module-replication/src/api-handlers/occupy_replication.ts b/packages/modules/module-replication/src/api-handlers/occupy_replication.ts index f2c65b307..e24dfda91 100644 --- a/packages/modules/module-replication/src/api-handlers/occupy_replication.ts +++ b/packages/modules/module-replication/src/api-handlers/occupy_replication.ts @@ -2,7 +2,14 @@ import type { ResolveRequest, ResolveResponse } from '@resolve-js/core' import respondWithError from './respond-with-error' const handler = async (req: ResolveRequest, res: ResolveResponse) => { - const duration = +req.query.duration + const occupyInfo = JSON.parse(req.body ?? '') + if (occupyInfo == null || typeof occupyInfo.lockId !== 'string') { + res.status(400) + res.end('Expected lockId') + return + } + + const duration = +occupyInfo.duration if (isNaN(duration) || duration <= 0) { res.status(400) res.end('Invalid duration provided') @@ -10,6 +17,7 @@ const handler = async (req: ResolveRequest, res: ResolveResponse) => { } try { const success = await req.resolve.eventstoreAdapter.setReplicationLock( + occupyInfo.lockId, duration ) if (success) { diff --git a/packages/modules/module-replication/src/api-handlers/release_replication.ts b/packages/modules/module-replication/src/api-handlers/release_replication.ts index 58920774f..f264750fb 100644 --- a/packages/modules/module-replication/src/api-handlers/release_replication.ts +++ b/packages/modules/module-replication/src/api-handlers/release_replication.ts @@ -3,7 +3,16 @@ import respondWithError from './respond-with-error' const handler = async (req: ResolveRequest, res: ResolveResponse) => { try { - await req.resolve.eventstoreAdapter.setReplicationLock(0) + const releaseInfo = JSON.parse(req.body ?? '') + if (releaseInfo == null || typeof releaseInfo.lockId !== 'string') { + res.status(400) + res.end('Expected lockId') + return + } + await req.resolve.eventstoreAdapter.setReplicationLock( + releaseInfo.lockId, + 0 + ) res.status(200) res.end() } catch (error) { diff --git a/packages/modules/module-replication/src/api-handlers/replicate.ts b/packages/modules/module-replication/src/api-handlers/replicate.ts index 676cd928e..537a23ec8 100644 --- a/packages/modules/module-replication/src/api-handlers/replicate.ts +++ b/packages/modules/module-replication/src/api-handlers/replicate.ts @@ -1,7 +1,14 @@ -import type { ResolveRequest, ResolveResponse } from '@resolve-js/core' +import type { + ReplicationState, + ResolveRequest, + ResolveResponse, +} from '@resolve-js/core' import { getLog } from './get-log' const checkInput = (input: any) => { + if (typeof input.lockId !== 'string') { + throw new Error('LockId must be provided and be string') + } if (!Array.isArray(input.events)) { throw new Error('Events must be array') } @@ -36,9 +43,10 @@ const handler = async (req: ResolveRequest, res: ResolveResponse) => { return } + const lockId: string = input.lockId const log = getLog('replicate') try { - await req.resolve.eventstoreAdapter.setReplicationStatus({ + await req.resolve.eventstoreAdapter.setReplicationStatus(lockId, { statusAndData: { status: 'batchInProgress', data: { @@ -51,6 +59,7 @@ const handler = async (req: ResolveRequest, res: ResolveResponse) => { type ReplicationOperationResult = { status: 'timeout' | 'success' | 'error' message: string + state?: ReplicationState } const replicateData = async (): Promise => { @@ -59,29 +68,53 @@ const handler = async (req: ResolveRequest, res: ResolveResponse) => { message: 'Uninitialized error', } try { - await req.resolve.eventstoreAdapter.replicateSecrets( - input.secretsToSet, - input.secretsToDelete - ) - await req.resolve.eventstoreAdapter.replicateEvents(input.events) - await req.resolve.eventstoreAdapter.setReplicationStatus({ - statusAndData: { - status: 'batchDone', - data: { - appliedEventsCount: input.events.length, - }, - }, - lastEvent: input.events[input.events.length - 1], - }) - result = { - status: 'success', - message: `Completed replication of ${input.events.length} events`, + const myLock = + (await req.resolve.eventstoreAdapter.replicateSecrets( + lockId, + input.secretsToSet, + input.secretsToDelete + )) && + (await req.resolve.eventstoreAdapter.replicateEvents( + lockId, + input.events + )) + if (!myLock) { + result = { + status: 'error', + message: `Can't replicate using lock id "${lockId}": someone else occupied the replication lock or database is locked due to reset`, + } + } else { + const state = await req.resolve.eventstoreAdapter.setReplicationStatus( + lockId, + { + statusAndData: { + status: 'batchDone', + data: { + appliedEventsCount: input.events.length, + }, + }, + lastEvent: input.events[input.events.length - 1], + } + ) + + if (state) { + result = { + status: 'success', + message: `Completed replication of ${input.events.length} events`, + state, + } + } else { + result = { + status: 'error', + message: `Can't set batchDone status using lock id "${lockId}": someone else occupied the replication lock`, + } + } } } catch (error) { result.message = error.message if (shouldSaveError(error)) { try { - await req.resolve.eventstoreAdapter.setReplicationStatus({ + await req.resolve.eventstoreAdapter.setReplicationStatus(lockId, { statusAndData: { status: 'criticalError', data: { @@ -159,11 +192,11 @@ const handler = async (req: ResolveRequest, res: ResolveResponse) => { } else if (result.status === 'error') { res.status(500) } - res.end(result.message) + res.json(result) } catch (error) { if (shouldSaveError(error)) { try { - await req.resolve.eventstoreAdapter.setReplicationStatus({ + await req.resolve.eventstoreAdapter.setReplicationStatus(lockId, { statusAndData: { status: 'criticalError', data: { @@ -181,7 +214,10 @@ const handler = async (req: ResolveRequest, res: ResolveResponse) => { } res.status(500) - res.end(error.message) + res.json({ + status: 'error', + message: error.message, + }) return } } diff --git a/packages/modules/module-replication/src/api-handlers/replication_state.ts b/packages/modules/module-replication/src/api-handlers/replication_state.ts index 379f73e1a..27c860cf9 100644 --- a/packages/modules/module-replication/src/api-handlers/replication_state.ts +++ b/packages/modules/module-replication/src/api-handlers/replication_state.ts @@ -29,6 +29,7 @@ const handler = async (req: ResolveRequest, res: ResolveResponse) => { successEvent: null, paused: false, locked: false, + lockId: null, } res.json(result) } diff --git a/packages/runtime/adapters/eventstore-adapters/eventstore-base/src/types.ts b/packages/runtime/adapters/eventstore-adapters/eventstore-base/src/types.ts index 876639ac5..8ef1fb55c 100644 --- a/packages/runtime/adapters/eventstore-adapters/eventstore-base/src/types.ts +++ b/packages/runtime/adapters/eventstore-adapters/eventstore-base/src/types.ts @@ -80,6 +80,7 @@ export function getInitialReplicationState(): ReplicationState { paused: false, successEvent: null, locked: false, + lockId: null, } } diff --git a/packages/runtime/adapters/eventstore-adapters/eventstore-lite/src/get-replication-state.ts b/packages/runtime/adapters/eventstore-adapters/eventstore-lite/src/get-replication-state.ts index 22b133180..707814cf4 100644 --- a/packages/runtime/adapters/eventstore-adapters/eventstore-lite/src/get-replication-state.ts +++ b/packages/runtime/adapters/eventstore-adapters/eventstore-lite/src/get-replication-state.ts @@ -12,9 +12,8 @@ const getReplicationState = async ( const rows = await executeStatement( `SELECT "Status", "StatusData", "Iterator", "IsPaused", "SuccessEvent", - ("LockExpirationTime" > CAST(strftime('%s','now') || substr(strftime('%f','now'),4) AS INTEGER)) as "Locked" FROM ${escapeId( - replicationStateTableName - )}` + ("LockExpirationTime" > CAST(strftime('%s','now') || substr(strftime('%f','now'),4) AS INTEGER)) as "Locked", "LockId" + FROM ${escapeId(replicationStateTableName)}` ) if (rows.length > 0) { const row = rows[0] @@ -33,6 +32,7 @@ const getReplicationState = async ( iterator: row.Iterator != null ? JSON.parse(row.Iterator) : null, successEvent: lastEvent, locked: !!row.Locked, + lockId: row.lockId, } } else { return getInitialReplicationState() diff --git a/packages/runtime/adapters/eventstore-adapters/eventstore-lite/src/init-replication-state-table.ts b/packages/runtime/adapters/eventstore-adapters/eventstore-lite/src/init-replication-state-table.ts index 7a773dfa6..ba475c30e 100644 --- a/packages/runtime/adapters/eventstore-adapters/eventstore-lite/src/init-replication-state-table.ts +++ b/packages/runtime/adapters/eventstore-adapters/eventstore-lite/src/init-replication-state-table.ts @@ -18,7 +18,8 @@ const initReplicationStateTable = async ( "Iterator" JSON NULL, "IsPaused" TINYINT DEFAULT 0 NOT NULL, "SuccessEvent" JSON NULL, - "LockExpirationTime" BIGINT DEFAULT 0 NOT NULL + "LockExpirationTime" BIGINT DEFAULT 0 NOT NULL, + "LockId" VARCHAR(50) DEFAULT NULL ); INSERT OR IGNORE INTO ${replicationStateTableNameAsId} DEFAULT VALUES; COMMIT; diff --git a/packages/runtime/adapters/eventstore-adapters/eventstore-lite/src/replicate-events.ts b/packages/runtime/adapters/eventstore-adapters/eventstore-lite/src/replicate-events.ts index 0895b5aa3..b611f8cb4 100644 --- a/packages/runtime/adapters/eventstore-adapters/eventstore-lite/src/replicate-events.ts +++ b/packages/runtime/adapters/eventstore-adapters/eventstore-lite/src/replicate-events.ts @@ -2,12 +2,14 @@ import type { AdapterPool } from './types' import { THREAD_COUNT } from '@resolve-js/eventstore-base' import type { OldEvent, StoredEvent } from '@resolve-js/eventstore-base' import { str as strCRC32 } from 'crc-32' +import isIntegerOverflowError from './integer-overflow-error' export const replicateEvents = async ( pool: AdapterPool, + lockId: string, events: OldEvent[] -): Promise => { - if (events.length === 0) return +): Promise => { + if (events.length === 0) return true const { executeStatement, @@ -17,6 +19,9 @@ export const replicateEvents = async ( escapeId, } = pool const eventsTableNameAsId = escapeId(eventsTableName) + const replicationStateTableNameAsId = escapeId( + `${eventsTableName}-replication-state` + ) const rows = (await executeStatement( `SELECT "threadId", MAX("threadCounter") AS "threadCounter" FROM @@ -44,7 +49,17 @@ export const replicateEvents = async ( eventsToInsert.push({ ...event, threadId, threadCounter }) } - await executeQuery(`INSERT OR IGNORE INTO ${eventsTableNameAsId}( + try { + await executeQuery(` + BEGIN IMMEDIATE; + SELECT ABS("ReplicationIsLocked") AS "lock_zero" FROM ( + SELECT 0 AS "ReplicationIsLocked" + UNION ALL + SELECT -9223372036854775808 AS "ReplicationIsLocked" + FROM ${replicationStateTableNameAsId} + WHERE "LockId" != ${escape(lockId)} + ); + INSERT OR IGNORE INTO ${eventsTableNameAsId}( "threadId", "threadCounter", "timestamp", @@ -68,7 +83,30 @@ export const replicateEvents = async ( } AS BLOB)) )` ) - .join(',')}`) + .join(',')}; + COMMIT;`) + return true + } catch (error) { + try { + await executeQuery('ROLLBACK;') + } catch (rollbackError) { + // ignore + } + + const errorMessage = + error != null && error.message != null ? error.message : '' + + const errorCode = + error != null && error.code != null ? (error.code as string) : '' + + if (errorCode === 'SQLITE_BUSY') { + return false + } else if (isIntegerOverflowError(errorMessage)) { + return false + } else { + throw error + } + } } export default replicateEvents diff --git a/packages/runtime/adapters/eventstore-adapters/eventstore-lite/src/replicate-secrets.ts b/packages/runtime/adapters/eventstore-adapters/eventstore-lite/src/replicate-secrets.ts index e39912117..5b48ced64 100644 --- a/packages/runtime/adapters/eventstore-adapters/eventstore-lite/src/replicate-secrets.ts +++ b/packages/runtime/adapters/eventstore-adapters/eventstore-lite/src/replicate-secrets.ts @@ -1,18 +1,41 @@ import type { AdapterPool } from './types' import type { OldSecretRecord } from '@resolve-js/eventstore-base' +import isIntegerOverflowError from './integer-overflow-error' const replicateSecrets = async ( pool: AdapterPool, + lockId: string, existingSecrets: OldSecretRecord[], deletedSecrets: Array -): Promise => { - const { executeQuery, secretsTableName, escape, escapeId } = pool +): Promise => { + const { + executeQuery, + secretsTableName, + eventsTableName, + escape, + escapeId, + } = pool + const replicationStateTableNameAsId = escapeId( + `${eventsTableName}-replication-state` + ) const secretsTableNameAsId = escapeId(secretsTableName) - if (existingSecrets.length > 0) { - await executeQuery( - `INSERT OR IGNORE INTO ${secretsTableNameAsId}( + if (existingSecrets.length > 0 || deletedSecrets.length > 0) { + try { + await executeQuery( + ` + BEGIN IMMEDIATE; + SELECT ABS("ReplicationIsLocked") AS "lock_zero" FROM ( + SELECT 0 AS "ReplicationIsLocked" + UNION ALL + SELECT -9223372036854775808 AS "ReplicationIsLocked" + FROM ${replicationStateTableNameAsId} + WHERE "LockId" != ${escape(lockId)} + ); + ${ + existingSecrets.length > 0 + ? `INSERT OR IGNORE INTO ${secretsTableNameAsId}( "idx", "id", "secret" @@ -23,13 +46,40 @@ const replicateSecrets = async ( secretRecord.secret != null ? escape(secretRecord.secret) : 'NULL' })` ) - .join(',')}` - ) - } - if (deletedSecrets.length > 0) { - await executeQuery(`UPDATE ${secretsTableNameAsId} SET "secret" = NULL - WHERE "id" IN (${deletedSecrets.map((id) => escape(id)).join(',')})`) + .join(',')};` + : `` + } + ${ + deletedSecrets.length > 0 + ? `UPDATE ${secretsTableNameAsId} SET "secret" = NULL + WHERE "id" IN (${deletedSecrets.map((id) => escape(id)).join(',')});` + : `` + } + COMMIT;` + ) + } catch (error) { + try { + await executeQuery('ROLLBACK;') + } catch (rollbackError) { + // ignore + } + + const errorMessage = + error != null && error.message != null ? error.message : '' + + const errorCode = + error != null && error.code != null ? (error.code as string) : '' + + if (errorCode === 'SQLITE_BUSY') { + return false + } else if (isIntegerOverflowError(errorMessage)) { + return false + } else { + throw error + } + } } + return true } export default replicateSecrets diff --git a/packages/runtime/adapters/eventstore-adapters/eventstore-lite/src/reset-replication.ts b/packages/runtime/adapters/eventstore-adapters/eventstore-lite/src/reset-replication.ts index a7561c013..9c7ba37cc 100644 --- a/packages/runtime/adapters/eventstore-adapters/eventstore-lite/src/reset-replication.ts +++ b/packages/runtime/adapters/eventstore-adapters/eventstore-lite/src/reset-replication.ts @@ -26,7 +26,8 @@ const resetReplication = async (pool: AdapterPool): Promise => { "StatusData" = NULL, "Iterator" = NULL, "SuccessEvent" = NULL, - "LockExpirationTime" = 0; + "LockExpirationTime" = 0, + "LockId" = NULL; COMMIT;`) } diff --git a/packages/runtime/adapters/eventstore-adapters/eventstore-lite/src/set-replication-lock.ts b/packages/runtime/adapters/eventstore-adapters/eventstore-lite/src/set-replication-lock.ts index 35e7e4154..5750ecb75 100644 --- a/packages/runtime/adapters/eventstore-adapters/eventstore-lite/src/set-replication-lock.ts +++ b/packages/runtime/adapters/eventstore-adapters/eventstore-lite/src/set-replication-lock.ts @@ -3,23 +3,31 @@ import initReplicationStateTable from './init-replication-state-table' const setReplicationLock = async ( pool: AdapterPool, + lockId: string, lockDuration: number ): Promise => { - const { executeStatement, executeQuery, escapeId } = pool + const { executeStatement, escapeId } = pool const replicationStateTableName = await initReplicationStateTable(pool) if (lockDuration <= 0) { - await executeQuery( + const rows = await executeStatement( `UPDATE ${escapeId(replicationStateTableName)} - SET "LockExpirationTime" = 0` + SET + "LockExpirationTime" = 0, + "LockId" = NULL + WHERE "LockId" = ${pool.escape(lockId)} + RETURNING "LockExpirationTime", "LockId"` ) - return true + return rows.length !== 0 } else { const rows = await executeStatement( `UPDATE ${escapeId(replicationStateTableName)} - SET "LockExpirationTime" = CAST(strftime('%s','now') || substr(strftime('%f','now'),4) AS INTEGER) + ${+lockDuration} - WHERE "LockExpirationTime" < CAST(strftime('%s','now') || substr(strftime('%f','now'),4) AS INTEGER) RETURNING "LockExpirationTime"` + SET + "LockExpirationTime" = CAST(strftime('%s','now') || substr(strftime('%f','now'),4) AS INTEGER) + ${+lockDuration}, + "LockId" = ${pool.escape(lockId)} + WHERE "LockExpirationTime" < CAST(strftime('%s','now') || substr(strftime('%f','now'),4) AS INTEGER) + RETURNING "LockExpirationTime"` ) return rows.length !== 0 } diff --git a/packages/runtime/adapters/eventstore-adapters/eventstore-lite/src/set-replication-status.ts b/packages/runtime/adapters/eventstore-adapters/eventstore-lite/src/set-replication-status.ts index 449af63ee..b31ef3eaa 100644 --- a/packages/runtime/adapters/eventstore-adapters/eventstore-lite/src/set-replication-status.ts +++ b/packages/runtime/adapters/eventstore-adapters/eventstore-lite/src/set-replication-status.ts @@ -4,6 +4,7 @@ import initReplicationStateTable from './init-replication-state-table' const setReplicationStatus = async ( pool: AdapterPool, + lockId: string, { statusAndData, lastEvent, @@ -13,12 +14,12 @@ const setReplicationStatus = async ( lastEvent?: OldEvent iterator?: ReplicationState['iterator'] } -): Promise => { - const { executeQuery, escapeId, escape } = pool +): Promise => { + const { executeStatement, escapeId, escape } = pool const replicationStateTableName = await initReplicationStateTable(pool) - await executeQuery( + const rows = await executeStatement( `UPDATE ${escapeId(replicationStateTableName)} SET "Status" = ${escape(statusAndData.status)}, @@ -38,8 +39,32 @@ const setReplicationStatus = async ( iterator != null ? escape(JSON.stringify(iterator)) : 'NULL' }` : `` - }` + } + WHERE "LockId" = ${escape(lockId)} + RETURNING "Status", "StatusData", "Iterator", "IsPaused", "SuccessEvent", + ("LockExpirationTime" > CAST(strftime('%s','now') || substr(strftime('%f','now'),4) AS INTEGER)) as "Locked", "LockId"` ) + + if (rows.length === 1) { + const row = rows[0] + let lastEvent: OldEvent | null = null + if (row.SuccessEvent != null) { + lastEvent = JSON.parse(row.SuccessEvent) as OldEvent + } + return { + statusAndData: { + status: row.Status, + data: row.StatusData != null ? JSON.parse(row.StatusData) : null, + } as ReplicationState['statusAndData'], + paused: row.IsPaused !== 0, + iterator: row.Iterator != null ? JSON.parse(row.Iterator) : null, + successEvent: lastEvent, + locked: !!row.Locked, + lockId: row.lockId, + } + } else { + return null + } } export default setReplicationStatus diff --git a/packages/runtime/adapters/eventstore-adapters/eventstore-postgresql/src/errors.ts b/packages/runtime/adapters/eventstore-adapters/eventstore-postgresql/src/errors.ts index ea0c4e079..abb2f7d00 100644 --- a/packages/runtime/adapters/eventstore-adapters/eventstore-postgresql/src/errors.ts +++ b/packages/runtime/adapters/eventstore-adapters/eventstore-postgresql/src/errors.ts @@ -52,6 +52,8 @@ export const isServiceBusyError = (error: any): boolean => { error, /Connection rate is too high, please reduce connection rate/i ) || + checkFormalError(error, '55P03') || + checkFuzzyError(error, /could not obtain lock on relation/) || checkFuzzyError(error, /getaddrinfo/) || checkFuzzyError(error, /SQLState: 08001/) || checkFuzzyError(error, /remaining connection slots are reserved/i)) diff --git a/packages/runtime/adapters/eventstore-adapters/eventstore-postgresql/src/get-replication-state.ts b/packages/runtime/adapters/eventstore-adapters/eventstore-postgresql/src/get-replication-state.ts index 0cdea3f65..2e2edea4e 100644 --- a/packages/runtime/adapters/eventstore-adapters/eventstore-postgresql/src/get-replication-state.ts +++ b/packages/runtime/adapters/eventstore-adapters/eventstore-postgresql/src/get-replication-state.ts @@ -14,7 +14,7 @@ const getReplicationState = async ( const rows = (await executeStatement( `SELECT "Status", "StatusData", "Iterator", "IsPaused", "SuccessEvent", - ("LockExpirationTime" > (CAST(extract(epoch from clock_timestamp()) * 1000 AS ${LONG_NUMBER_SQL_TYPE}))) as "Locked" + ("LockExpirationTime" > (CAST(extract(epoch from clock_timestamp()) * 1000 AS ${LONG_NUMBER_SQL_TYPE}))) as "Locked", "LockId" FROM ${databaseNameAsId}.${escapeId(replicationStateTableName)}` )) as Array<{ Status: ReplicationState['statusAndData']['status'] @@ -23,6 +23,7 @@ const getReplicationState = async ( IsPaused: boolean SuccessEvent: ReplicationState['successEvent'] Locked: boolean + LockId: string | null }> if (rows.length > 0) { const row = rows[0] @@ -41,6 +42,7 @@ const getReplicationState = async ( iterator: row.Iterator, successEvent: lastEvent, locked: row.Locked, + lockId: row.LockId, } } else { return getInitialReplicationState() diff --git a/packages/runtime/adapters/eventstore-adapters/eventstore-postgresql/src/init-replication-state-table.ts b/packages/runtime/adapters/eventstore-adapters/eventstore-postgresql/src/init-replication-state-table.ts index a13fe8482..68726a7de 100644 --- a/packages/runtime/adapters/eventstore-adapters/eventstore-postgresql/src/init-replication-state-table.ts +++ b/packages/runtime/adapters/eventstore-adapters/eventstore-postgresql/src/init-replication-state-table.ts @@ -26,7 +26,8 @@ const initReplicationStateTable = async ( "Iterator" JSONB NULL, "IsPaused" BOOLEAN DEFAULT FALSE NOT NULL, "SuccessEvent" JSON NULL, - "LockExpirationTime" ${LONG_NUMBER_SQL_TYPE} DEFAULT 0 NOT NULL + "LockExpirationTime" ${LONG_NUMBER_SQL_TYPE} DEFAULT 0 NOT NULL, + "LockId" VARCHAR(50) DEFAULT NULL ) `) diff --git a/packages/runtime/adapters/eventstore-adapters/eventstore-postgresql/src/replicate-events.ts b/packages/runtime/adapters/eventstore-adapters/eventstore-postgresql/src/replicate-events.ts index 577c284a0..0de1046d6 100644 --- a/packages/runtime/adapters/eventstore-adapters/eventstore-postgresql/src/replicate-events.ts +++ b/packages/runtime/adapters/eventstore-adapters/eventstore-postgresql/src/replicate-events.ts @@ -15,10 +15,9 @@ type EventWithSize = { export const replicateEvents = async ( pool: AdapterPool, + lockId: string, events: OldEvent[] -): Promise => { - if (events.length === 0) return - +): Promise => { const { executeStatement, eventsTableName, @@ -29,6 +28,9 @@ export const replicateEvents = async ( const eventsTableNameAsId = escapeId(eventsTableName) const threadsTableAsId = escapeId(`${eventsTableName}-threads`) const databaseNameAsId = escapeId(databaseName) + const replicationStateTableNameAsId = escapeId( + `${eventsTableName}-replication-state` + ) const stringRows = (await executeStatement( `SELECT "threadId", MAX("threadCounter") AS "threadCounter" FROM @@ -66,8 +68,6 @@ export const replicateEvents = async ( eventsToInsert.push({ ...event, threadId, threadCounter }) } - if (eventsToInsert.length === 0) return - const calculateEventWithSize = (event: StoredEvent): EventWithSize => { const serializedEvent = [ `${escape(event.aggregateId)},`, @@ -92,7 +92,18 @@ export const replicateEvents = async ( do { shouldRetry = false try { - await executeStatement(`INSERT INTO ${databaseNameAsId}.${eventsTableNameAsId}( + await executeStatement(`BEGIN WORK; + LOCK TABLE ${databaseNameAsId}.${replicationStateTableNameAsId} IN EXCLUSIVE MODE NOWAIT; + WITH "lock_check" AS ( + SELECT 0 AS "lock_zero" WHERE ( + (SELECT 1 AS "ReplicationIsLocked") + UNION ALL + (SELECT 1 AS "ReplicationIsLocked" + FROM ${databaseNameAsId}.${replicationStateTableNameAsId} + WHERE "LockId" != ${escape(lockId)}) + ) = 1 + ) + INSERT INTO ${databaseNameAsId}.${eventsTableNameAsId}( "threadId", "threadCounter", "timestamp", @@ -105,13 +116,14 @@ export const replicateEvents = async ( .map( (eventWithSize) => `(${eventWithSize.event.threadId}, - ${eventWithSize.event.threadCounter}, + ${eventWithSize.event.threadCounter} + (SELECT "lock_zero" FROM "lock_check" LIMIT 1), ${eventWithSize.event.timestamp}, ${eventWithSize.serialized}, ${eventWithSize.size})` ) .join(',')} - ON CONFLICT DO NOTHING`) + ON CONFLICT DO NOTHING; + COMMIT WORK;`) } catch (error) { const errorMessage: string = error.message if (/deadlock detected/.test(errorMessage)) { @@ -126,51 +138,83 @@ export const replicateEvents = async ( let currentBatchSize = 0 const currentEventsBatch: EventWithSize[] = [] - for (const event of eventsToInsert) { - const eventWithSize = calculateEventWithSize(event) + try { + for (const event of eventsToInsert) { + const eventWithSize = calculateEventWithSize(event) - if (eventWithSize.size > MAX_EVENTS_BATCH_BYTE_SIZE) { - await insertEventsBatch([eventWithSize]) - continue + if (eventWithSize.size > MAX_EVENTS_BATCH_BYTE_SIZE) { + await insertEventsBatch([eventWithSize]) + continue + } + + const newCurrentBatchSize = currentBatchSize + eventWithSize.size + if (newCurrentBatchSize > MAX_EVENTS_BATCH_BYTE_SIZE) { + await insertEventsBatch(currentEventsBatch) + currentEventsBatch.length = 0 + currentBatchSize = 0 + } + currentBatchSize += eventWithSize.size + currentEventsBatch.push(eventWithSize) } - const newCurrentBatchSize = currentBatchSize + eventWithSize.size - if (newCurrentBatchSize > MAX_EVENTS_BATCH_BYTE_SIZE) { + if (currentEventsBatch.length) { await insertEventsBatch(currentEventsBatch) - currentEventsBatch.length = 0 - currentBatchSize = 0 } - currentBatchSize += eventWithSize.size - currentEventsBatch.push(eventWithSize) - } - - if (currentEventsBatch.length) { - await insertEventsBatch(currentEventsBatch) - } - type ThreadToUpdate = { - threadId: StoredEvent['threadId'] - threadCounter: StoredEvent['threadCounter'] - } - const threadsToUpdate: ThreadToUpdate[] = [] - for (let i = 0; i < threadCounters.length; ++i) { - if (threadCounters[i] !== undefined) { - threadsToUpdate.push({ - threadId: i, - threadCounter: threadCounters[i] + 1, - }) + type ThreadToUpdate = { + threadId: StoredEvent['threadId'] + threadCounter: StoredEvent['threadCounter'] } - } - if (threadsToUpdate.length > 0) { - await executeStatement(`INSERT INTO ${databaseNameAsId}.${threadsTableAsId} ("threadId","threadCounter") + const threadsToUpdate: ThreadToUpdate[] = [] + for (let i = 0; i < threadCounters.length; ++i) { + if (threadCounters[i] !== undefined) { + threadsToUpdate.push({ + threadId: i, + threadCounter: threadCounters[i] + 1, + }) + } + } + if (threadsToUpdate.length > 0) { + await executeStatement(`BEGIN WORK; + LOCK TABLE ${databaseNameAsId}.${replicationStateTableNameAsId} IN EXCLUSIVE MODE NOWAIT; + WITH "lock_check" AS ( + SELECT 0 AS "lock_zero" WHERE ( + (SELECT 1 AS "ReplicationIsLocked") + UNION ALL + (SELECT 1 AS "ReplicationIsLocked" + FROM ${databaseNameAsId}.${replicationStateTableNameAsId} + WHERE "LockId" != ${escape(lockId)}) + ) = 1 + ) + INSERT INTO ${databaseNameAsId}.${threadsTableAsId} ("threadId","threadCounter") VALUES ${threadsToUpdate .map( (threadToUpdate) => - `(${threadToUpdate.threadId},${threadToUpdate.threadCounter})` + `(${threadToUpdate.threadId},${threadToUpdate.threadCounter} + (SELECT "lock_zero" FROM "lock_check" LIMIT 1))` ) .join( ',' - )} ON CONFLICT ("threadId") DO UPDATE SET "threadCounter" = EXCLUDED."threadCounter"`) + )} ON CONFLICT ("threadId") DO UPDATE SET "threadCounter" = EXCLUDED."threadCounter"; + COMMIT;`) + } + + return true + } catch (error) { + try { + await executeStatement('ROLLBACK;') + } catch (rollbackError) { + // ignore + } + + const errorMessage = + error != null && error.message != null ? error.message : '' + if (errorMessage.indexOf('subquery used as an expression') > -1) { + return false + } else if (errorMessage.indexOf('could not obtain lock on relation') > -1) { + return false + } else { + throw error + } } } diff --git a/packages/runtime/adapters/eventstore-adapters/eventstore-postgresql/src/replicate-secrets.ts b/packages/runtime/adapters/eventstore-adapters/eventstore-postgresql/src/replicate-secrets.ts index 78b828cd5..09006bed9 100644 --- a/packages/runtime/adapters/eventstore-adapters/eventstore-postgresql/src/replicate-secrets.ts +++ b/packages/runtime/adapters/eventstore-adapters/eventstore-postgresql/src/replicate-secrets.ts @@ -3,11 +3,13 @@ import type { OldSecretRecord } from '@resolve-js/eventstore-base' const replicateSecrets = async ( pool: AdapterPool, + lockId: string, existingSecrets: OldSecretRecord[], deletedSecrets: Array -): Promise => { +): Promise => { const { executeStatement, + eventsTableName, secretsTableName, escape, escapeId, @@ -16,10 +18,25 @@ const replicateSecrets = async ( const secretsTableNameAsId = escapeId(secretsTableName) const databaseNameAsId = escapeId(databaseName) + const replicationStateTableNameAsId = escapeId( + `${eventsTableName}-replication-state` + ) - if (existingSecrets.length > 0) { - await executeStatement( - `INSERT INTO ${databaseNameAsId}.${secretsTableNameAsId}( + if (existingSecrets.length > 0 || deletedSecrets.length > 0) { + try { + await executeStatement( + `BEGIN WORK; + LOCK TABLE ${databaseNameAsId}.${replicationStateTableNameAsId} IN EXCLUSIVE MODE NOWAIT; + SELECT 0 AS "lock_zero" WHERE ( + (SELECT 1 AS "ReplicationIsLocked") + UNION ALL + (SELECT 1 AS "ReplicationIsLocked" + FROM ${databaseNameAsId}.${replicationStateTableNameAsId} + WHERE "LockId" != ${escape(lockId)}) + ) = 1; + ${ + existingSecrets.length > 0 + ? `INSERT INTO ${databaseNameAsId}.${secretsTableNameAsId}( "id", "secret" ) VALUES ${existingSecrets @@ -29,13 +46,38 @@ const replicateSecrets = async ( secretRecord.secret != null ? escape(secretRecord.secret) : 'NULL' })` ) - .join(',')} ON CONFLICT DO NOTHING` - ) - } - if (deletedSecrets.length > 0) { - await executeStatement(`UPDATE ${databaseNameAsId}.${secretsTableNameAsId} SET "secret" = NULL - WHERE "id" IN (${deletedSecrets.map((id) => escape(id)).join(',')})`) + .join(',')} ON CONFLICT DO NOTHING;` + : '' + } + ${ + deletedSecrets.length > 0 + ? `UPDATE ${databaseNameAsId}.${secretsTableNameAsId} SET "secret" = NULL + WHERE "id" IN (${deletedSecrets.map((id) => escape(id)).join(',')});` + : '' + } + COMMIT WORK;` + ) + } catch (error) { + try { + await executeStatement('ROLLBACK;') + } catch (rollbackError) { + // ignore + } + + const errorMessage = + error != null && error.message != null ? error.message : '' + if (errorMessage.indexOf('subquery used as an expression') > -1) { + return false + } else if ( + errorMessage.indexOf('could not obtain lock on relation') > -1 + ) { + return false + } else { + throw error + } + } } + return true } export default replicateSecrets diff --git a/packages/runtime/adapters/eventstore-adapters/eventstore-postgresql/src/reset-replication.ts b/packages/runtime/adapters/eventstore-adapters/eventstore-postgresql/src/reset-replication.ts index 65d06eac7..569573aa4 100644 --- a/packages/runtime/adapters/eventstore-adapters/eventstore-postgresql/src/reset-replication.ts +++ b/packages/runtime/adapters/eventstore-adapters/eventstore-postgresql/src/reset-replication.ts @@ -19,20 +19,20 @@ const resetReplication = async (pool: AdapterPool): Promise => { const notStarted: ReplicationState['statusAndData']['status'] = 'notStarted' - const statements = [ - `TRUNCATE ${databaseNameAsId}.${eventsTableNameAsId}`, - `TRUNCATE ${databaseNameAsId}.${secretsTableNameAsId}`, - `UPDATE ${databaseNameAsId}.${threadsTableAsId} SET "threadCounter" = 0`, - `UPDATE ${databaseNameAsId}.${replicationStateTableNameAsId} SET + await executeStatement(` + BEGIN WORK; + LOCK ${databaseNameAsId}.${replicationStateTableNameAsId} IN ACCESS EXCLUSIVE MODE; + TRUNCATE ${databaseNameAsId}.${eventsTableNameAsId}; + TRUNCATE ${databaseNameAsId}.${secretsTableNameAsId}; + UPDATE ${databaseNameAsId}.${threadsTableAsId} SET "threadCounter" = 0; + UPDATE ${databaseNameAsId}.${replicationStateTableNameAsId} SET "Status" = ${escape(notStarted)}, "StatusData" = NULL, "Iterator" = NULL, "SuccessEvent" = NULL, - "LockExpirationTime" = 0`, - ] - for (const statement of statements) { - await executeStatement(statement) - } + "LockExpirationTime" = 0, + "LockId" = NULL; + COMMIT WORK;`) } export default resetReplication diff --git a/packages/runtime/adapters/eventstore-adapters/eventstore-postgresql/src/set-replication-lock.ts b/packages/runtime/adapters/eventstore-adapters/eventstore-postgresql/src/set-replication-lock.ts index 4cce2b1fa..35e083c55 100644 --- a/packages/runtime/adapters/eventstore-adapters/eventstore-postgresql/src/set-replication-lock.ts +++ b/packages/runtime/adapters/eventstore-adapters/eventstore-postgresql/src/set-replication-lock.ts @@ -4,6 +4,7 @@ import { LONG_NUMBER_SQL_TYPE } from './constants' const setReplicationLock = async ( pool: AdapterPool, + lockId: string, lockDuration: number ): Promise => { const { executeStatement, escapeId, databaseName } = pool @@ -12,16 +13,23 @@ const setReplicationLock = async ( const databaseNameAsId = escapeId(databaseName) if (lockDuration <= 0) { - await executeStatement( + const rows = await executeStatement( `UPDATE ${databaseNameAsId}.${escapeId(replicationStateTableName)} - SET "LockExpirationTime" = 0` + SET + "LockExpirationTime" = 0, + "LockId" = NULL + WHERE "LockId" = ${pool.escape(lockId)} + RETURNING "LockExpirationTime", "LockId"` ) - return true + return rows.length !== 0 } else { const rows = await executeStatement( `UPDATE ${databaseNameAsId}.${escapeId(replicationStateTableName)} - SET "LockExpirationTime" = CAST(extract(epoch from clock_timestamp()) * 1000 AS ${LONG_NUMBER_SQL_TYPE}) + ${+lockDuration} - WHERE "LockExpirationTime" < CAST(extract(epoch from clock_timestamp()) * 1000 AS ${LONG_NUMBER_SQL_TYPE}) RETURNING "LockExpirationTime"` + SET + "LockExpirationTime" = CAST(extract(epoch from clock_timestamp()) * 1000 AS ${LONG_NUMBER_SQL_TYPE}) + ${+lockDuration}, + "LockId" = ${pool.escape(lockId)} + WHERE "LockExpirationTime" < CAST(extract(epoch from clock_timestamp()) * 1000 AS ${LONG_NUMBER_SQL_TYPE}) + RETURNING "LockExpirationTime", "LockId"` ) return rows.length !== 0 } diff --git a/packages/runtime/adapters/eventstore-adapters/eventstore-postgresql/src/set-replication-status.ts b/packages/runtime/adapters/eventstore-adapters/eventstore-postgresql/src/set-replication-status.ts index f0a3e0dc1..166689966 100644 --- a/packages/runtime/adapters/eventstore-adapters/eventstore-postgresql/src/set-replication-status.ts +++ b/packages/runtime/adapters/eventstore-adapters/eventstore-postgresql/src/set-replication-status.ts @@ -1,9 +1,11 @@ import type { AdapterPool } from './types' import type { ReplicationState, OldEvent } from '@resolve-js/eventstore-base' +import { LONG_NUMBER_SQL_TYPE } from './constants' import initReplicationStateTable from './init-replication-state-table' const setReplicationStatus = async ( pool: AdapterPool, + lockId: string, { statusAndData, lastEvent, @@ -13,13 +15,13 @@ const setReplicationStatus = async ( lastEvent?: OldEvent iterator?: ReplicationState['iterator'] } -): Promise => { +): Promise => { const { executeStatement, escapeId, escape, databaseName } = pool const replicationStateTableName = await initReplicationStateTable(pool) const databaseNameAsId = escapeId(databaseName) - await executeStatement( + const rows = await executeStatement( `UPDATE ${databaseNameAsId}.${escapeId(replicationStateTableName)} SET "Status" = ${escape(statusAndData.status)}, @@ -39,8 +41,31 @@ const setReplicationStatus = async ( iterator != null ? escape(JSON.stringify(iterator)) : 'NULL' }` : `` - }` + } + WHERE "LockId" = ${pool.escape(lockId)} + RETURNING "Status", "StatusData", "Iterator", "IsPaused", "SuccessEvent", + ("LockExpirationTime" > (CAST(extract(epoch from clock_timestamp()) * 1000 AS ${LONG_NUMBER_SQL_TYPE}))) as "Locked", "LockId"` ) + if (rows.length === 1) { + const row = rows[0] + let lastEvent: OldEvent | null = null + if (row.SuccessEvent != null) { + lastEvent = row.SuccessEvent as OldEvent + } + return { + statusAndData: { + status: row.Status, + data: row.StatusData, + } as ReplicationState['statusAndData'], + paused: row.IsPaused, + iterator: row.Iterator, + successEvent: lastEvent, + locked: row.Locked, + lockId: row.LockId, + } + } else { + return null + } } export default setReplicationStatus diff --git a/packages/runtime/adapters/replicators/replicator-via-api-handler/src/build.ts b/packages/runtime/adapters/replicators/replicator-via-api-handler/src/build.ts index 058aecbee..3c918f519 100644 --- a/packages/runtime/adapters/replicators/replicator-via-api-handler/src/build.ts +++ b/packages/runtime/adapters/replicators/replicator-via-api-handler/src/build.ts @@ -74,14 +74,16 @@ const build: ExternalMethods['build'] = async ( return } + let lockId = `${Date.now()}` const timeLeft = getVacantTimeInMillis() try { - const result = await basePool.occupyReplication(basePool, timeLeft) + const result = await basePool.occupyReplication(basePool, lockId, timeLeft) if (result.status === 'alreadyLocked') { await delayNext(getBuildDelay(iterationNumber), { name: 'Error', message: 'Replication process is already locked', }) + return } else if (result.status === 'serviceError') { await delayNext(getBuildDelay(iterationNumber), { name: 'Error', @@ -109,7 +111,7 @@ const build: ExternalMethods['build'] = async ( log.error(error) } try { - await basePool.releaseReplication(basePool) + await basePool.releaseReplication(basePool, lockId) } catch (error) { if (!isHTTPServiceError(error)) log.error(error) } @@ -177,6 +179,7 @@ const build: ExternalMethods['build'] = async ( const result: CallReplicateResult = await basePool.callReplicate( basePool, + lockId, events, existingSecrets, deletedSecrets, @@ -195,7 +198,24 @@ const build: ExternalMethods['build'] = async ( name: 'Error', message: result.message, } - } else if (result.type === 'launched' || result.type === 'processed') { + } else if (result.type === 'processed') { + if ( + result.state === null || + result.state.statusAndData.status !== 'batchDone' + ) { + lastError = { + recoverable: false, + name: 'Error', + message: 'Reported processed, but status is not batchDone', + } + } else { + iterator = { cursor: nextCursor } + appliedEventsCount = + result.state.statusAndData.data.appliedEventsCount + wasPaused = state.paused + iterationNumber = 0 + } + } else if (result.type === 'launched') { while (true) { const state = await basePool.getReplicationState(basePool) if (state.statusAndData.status === 'batchInProgress') { diff --git a/packages/runtime/adapters/replicators/replicator-via-api-handler/src/call-replicate.ts b/packages/runtime/adapters/replicators/replicator-via-api-handler/src/call-replicate.ts index 3a6e8e435..e0b8383a2 100644 --- a/packages/runtime/adapters/replicators/replicator-via-api-handler/src/call-replicate.ts +++ b/packages/runtime/adapters/replicators/replicator-via-api-handler/src/call-replicate.ts @@ -5,12 +5,14 @@ import { REPLICATE } from '@resolve-js/module-replication' const callReplicate: InternalMethods['callReplicate'] = async ( pool, + lockId, events, secretsToSet, secretsToDelete, iterator ) => { const data = { + lockId, events, secretsToSet, secretsToDelete, @@ -25,7 +27,7 @@ const callReplicate: InternalMethods['callReplicate'] = async ( } ) let resultType: CallReplicateResult['type'] = 'unknown' - const message = await response.text() + const result = await response.json() if (response.status >= 500) { resultType = 'serverError' } else if (response.status >= 400) { @@ -38,7 +40,8 @@ const callReplicate: InternalMethods['callReplicate'] = async ( return { type: resultType, httpStatus: response.status, - message: message, + message: result.message ?? result, + state: result.state ?? null, } } diff --git a/packages/runtime/adapters/replicators/replicator-via-api-handler/src/check-target-url.ts b/packages/runtime/adapters/replicators/replicator-via-api-handler/src/check-target-url.ts index 1f2b7b112..843986646 100644 --- a/packages/runtime/adapters/replicators/replicator-via-api-handler/src/check-target-url.ts +++ b/packages/runtime/adapters/replicators/replicator-via-api-handler/src/check-target-url.ts @@ -21,6 +21,7 @@ const checkTargetUrl = ( iterator: null, successEvent: null, locked: false, + lockId: null, } } return null diff --git a/packages/runtime/adapters/replicators/replicator-via-api-handler/src/get-replication-state.ts b/packages/runtime/adapters/replicators/replicator-via-api-handler/src/get-replication-state.ts index 01ad33608..0cd8c6efe 100644 --- a/packages/runtime/adapters/replicators/replicator-via-api-handler/src/get-replication-state.ts +++ b/packages/runtime/adapters/replicators/replicator-via-api-handler/src/get-replication-state.ts @@ -18,6 +18,7 @@ const getReplicationState: InternalMethods['getReplicationState'] = async ({ iterator: null, successEvent: null, locked: false, + lockId: null, } try { diff --git a/packages/runtime/adapters/replicators/replicator-via-api-handler/src/occupy-replication.ts b/packages/runtime/adapters/replicators/replicator-via-api-handler/src/occupy-replication.ts index 0aa38cfa5..ef6c80db9 100644 --- a/packages/runtime/adapters/replicators/replicator-via-api-handler/src/occupy-replication.ts +++ b/packages/runtime/adapters/replicators/replicator-via-api-handler/src/occupy-replication.ts @@ -5,12 +5,18 @@ import { OCCUPY_REPLICATION } from '@resolve-js/module-replication' const occupyReplication: InternalMethods['occupyReplication'] = async ( pool, + lockId: string, duration: number ) => { const response = await fetch( - `${pool.targetApplicationUrl}${OCCUPY_REPLICATION.endpoint}?duration=${duration}`, + `${pool.targetApplicationUrl}${OCCUPY_REPLICATION.endpoint}`, { method: OCCUPY_REPLICATION.method, + body: JSON.stringify({ + duration, + lockId, + }), + headers: { 'Content-Type': 'application/json' }, } ) switch (response.status) { diff --git a/packages/runtime/adapters/replicators/replicator-via-api-handler/src/release-replication.ts b/packages/runtime/adapters/replicators/replicator-via-api-handler/src/release-replication.ts index 72fff56d7..831b1c274 100644 --- a/packages/runtime/adapters/replicators/replicator-via-api-handler/src/release-replication.ts +++ b/packages/runtime/adapters/replicators/replicator-via-api-handler/src/release-replication.ts @@ -4,10 +4,15 @@ import fetch from 'node-fetch' import { RELEASE_REPLICATION } from '@resolve-js/module-replication' const releaseReplication: InternalMethods['releaseReplication'] = async ( - pool + pool, + lockId: string ) => { await fetch(`${pool.targetApplicationUrl}${RELEASE_REPLICATION.endpoint}`, { method: RELEASE_REPLICATION.method, + body: JSON.stringify({ + lockId, + }), + headers: { 'Content-Type': 'application/json' }, }) } diff --git a/packages/runtime/adapters/replicators/replicator-via-api-handler/src/types.ts b/packages/runtime/adapters/replicators/replicator-via-api-handler/src/types.ts index 046bcde82..c3532d4ec 100644 --- a/packages/runtime/adapters/replicators/replicator-via-api-handler/src/types.ts +++ b/packages/runtime/adapters/replicators/replicator-via-api-handler/src/types.ts @@ -28,10 +28,12 @@ export type CallReplicateResult = { type: 'launched' | 'processed' | 'unknown' | 'serverError' | 'clientError' httpStatus: number message: string + state: ReplicationState | null } export type CallReplicate = ( pool: AdapterPool, + lockId: string, events: OldEvent[], secretsToSet: OldSecretRecord[], secretsToDelete: Array, @@ -55,12 +57,13 @@ export type InternalMethods = { setReplicationPaused: SetReplicationPaused occupyReplication: ( pool: AdapterPool, + lockId: string, duration: number ) => Promise<{ status: 'success' | 'alreadyLocked' | 'serviceError' | 'error' message?: string }> - releaseReplication: (pool: AdapterPool) => Promise + releaseReplication: (pool: AdapterPool, lockId: string) => Promise } export type ArrayOrSingleOrNull = Array | T | null diff --git a/tests/eventstore-replication/index.test.ts b/tests/eventstore-replication/index.test.ts index 414a0fc8b..97e2bc09b 100644 --- a/tests/eventstore-replication/index.test.ts +++ b/tests/eventstore-replication/index.test.ts @@ -40,7 +40,7 @@ describe(`${adapterFactory.name}. eventstore adapter replication state`, () => { const adapter: Adapter = adapters['test_replication'] test('get-replication-state should return default state', async () => { - const state: ReplicationState = await adapter.getReplicationState() + const state = await adapter.getReplicationState() expect(state.statusAndData.status).toEqual('notStarted') expect(state.statusAndData.data).toBeNull() @@ -49,9 +49,67 @@ describe(`${adapterFactory.name}. eventstore adapter replication state`, () => { expect(state.locked).toEqual(false) }) + test('set-replication-status should do nothing if not locked', async () => { + const startedAt = Date.now() + const result = await adapter.setReplicationStatus('lockId', { + statusAndData: { + status: 'batchInProgress', + data: { + startedAt, + }, + }, + }) + expect(result).toBe(null) + const state = await adapter.getReplicationState() + expect(state.statusAndData.status).toEqual('notStarted') + expect(state.locked).toEqual(false) + }) + + test('set-replication-lock should work as expected', async () => { + const lockDuration = 4000 + expect(await adapter.setReplicationLock('shortLock', lockDuration)).toEqual( + true + ) + let state = await adapter.getReplicationState() + expect(state.locked).toEqual(true) + + expect(await adapter.setReplicationLock('shortLock', lockDuration)).toEqual( + false + ) + await new Promise((resolve) => setTimeout(resolve, lockDuration)) + + state = await adapter.getReplicationState() + expect(state.locked).toBe(false) + + expect( + await adapter.setReplicationLock('toRelease', lockDuration * 2) + ).toBe(true) + expect(await adapter.setReplicationLock('wrongId', lockDuration * 2)).toBe( + false + ) + expect(await adapter.setReplicationLock('wrongId', 0)).toBe(false) + expect(await adapter.setReplicationLock('toRelease', 0)).toBe(true) + state = await adapter.getReplicationState() + expect(state.locked).toBe(false) + }) + test('set-replication-status should change status, statusData properties of the state', async () => { + await adapter.setReplicationLock('lockId', jestTimeout()) + const startedAt = Date.now() - await adapter.setReplicationStatus({ + + expect( + await adapter.setReplicationStatus('wrongId', { + statusAndData: { + status: 'batchInProgress', + data: { + startedAt, + }, + }, + }) + ).toBe(null) + + let setResult = await adapter.setReplicationStatus('lockId', { statusAndData: { status: 'batchInProgress', data: { @@ -59,13 +117,20 @@ describe(`${adapterFactory.name}. eventstore adapter replication state`, () => { }, }, }) + expect(setResult).not.toBe(null) + expect(setResult.statusAndData.status).toEqual('batchInProgress') + expect(setResult.statusAndData.data).toEqual({ startedAt }) + let state = await adapter.getReplicationState() expect(state.statusAndData.status).toEqual('batchInProgress') expect(state.statusAndData.data).toEqual({ startedAt }) - await adapter.setReplicationStatus({ + setResult = await adapter.setReplicationStatus('lockId', { statusAndData: { status: 'batchDone', data: { appliedEventsCount: 10 } }, }) + expect(setResult.statusAndData.status).toEqual('batchDone') + expect(setResult.statusAndData.data).toEqual({ appliedEventsCount: 10 }) + state = await adapter.getReplicationState() expect(state.statusAndData.status).toEqual('batchDone') expect(state.statusAndData.data).toEqual({ appliedEventsCount: 10 }) @@ -79,7 +144,7 @@ describe(`${adapterFactory.name}. eventstore adapter replication state`, () => { type: 'type', } - await adapter.setReplicationStatus({ + await adapter.setReplicationStatus('lockId', { statusAndData: { status: 'batchDone', data: { appliedEventsCount: 10 } }, lastEvent: event, iterator: { cursor: 'DEAF' }, @@ -89,7 +154,7 @@ describe(`${adapterFactory.name}. eventstore adapter replication state`, () => { expect(state.successEvent).toEqual(event) expect(state.iterator).toEqual({ cursor: 'DEAF' }) - await adapter.setReplicationStatus({ + await adapter.setReplicationStatus('lockId', { statusAndData: { status: 'criticalError', data: { name: 'Error', message: '' }, @@ -112,30 +177,27 @@ describe(`${adapterFactory.name}. eventstore adapter replication state`, () => { expect(state.paused).toEqual(false) }) - test('set-replication-lock should work as expected', async () => { - const lockDuration = 4000 - expect(await adapter.setReplicationLock(lockDuration)).toEqual(true) - let state = await adapter.getReplicationState() - expect(state.locked).toEqual(true) - - expect(await adapter.setReplicationLock(lockDuration)).toEqual(false) - await new Promise((resolve) => setTimeout(resolve, lockDuration)) - - state = await adapter.getReplicationState() - expect(state.locked).toBe(false) + const secretCount = 36 - expect(await adapter.setReplicationLock(lockDuration * 2)).toBe(true) - expect(await adapter.setReplicationLock(0)).toBe(true) - state = await adapter.getReplicationState() - expect(state.locked).toBe(false) + test('replicate-secrets should return false when using wrong lockId', async () => { + const secretRecords: OldSecretRecord[] = generateSecrets(10) + expect(await adapter.replicateSecrets('wrongId', secretRecords, [])).toBe( + false + ) + expect( + await adapter.replicateSecrets('wrongId', [], [makeIdFromIndex(0)]) + ).toBe(false) + expect( + await adapter.replicateSecrets('wrongId', secretRecords, [ + makeIdFromIndex(0), + ]) + ).toBe(false) }) - const secretCount = 36 - test('replicate-secrets should be able to set secrets', async () => { const secretRecords: OldSecretRecord[] = generateSecrets(secretCount) - await adapter.replicateSecrets(secretRecords, []) + await adapter.replicateSecrets('lockId', secretRecords, []) const { secrets: loadedSecrets } = await adapter.loadSecrets({ limit: secretCount, }) @@ -154,7 +216,7 @@ describe(`${adapterFactory.name}. eventstore adapter replication state`, () => { ) { secretsToDelete.push(makeIdFromIndex(i)) } - await adapter.replicateSecrets([], secretsToDelete) + await adapter.replicateSecrets('lockId', [], secretsToDelete) const { secrets: loadedSecrets } = await adapter.loadSecrets({ limit: secretCount, }) @@ -188,7 +250,7 @@ describe(`${adapterFactory.name}. eventstore adapter replication state`, () => { secretsToDelete.push(makeIdFromIndex(i)) } - await adapter.replicateSecrets(secretsToSet, secretsToDelete) + await adapter.replicateSecrets('lockId', secretsToSet, secretsToDelete) const { secrets: loadedSecrets } = await adapter.loadSecrets({ limit: secretCount * 2, }) @@ -208,7 +270,7 @@ describe(`${adapterFactory.name}. eventstore adapter replication state`, () => { }) } - await adapter.replicateSecrets(secretsToSet, []) + await adapter.replicateSecrets('lockId', secretsToSet, []) const { secrets: loadedSecrets } = await adapter.loadSecrets({ limit: secretCount * 2, }) @@ -222,7 +284,7 @@ describe(`${adapterFactory.name}. eventstore adapter replication state`, () => { const { secrets: loadedSecrets } = await adapter.loadSecrets({ limit: secretCount * 2, }) - await adapter.replicateSecrets([], []) + await adapter.replicateSecrets('lockId', [], []) const { secrets: loadedAgainSecrets } = await adapter.loadSecrets({ limit: secretCount * 2, }) @@ -231,13 +293,21 @@ describe(`${adapterFactory.name}. eventstore adapter replication state`, () => { const eventCount = 2560 + test('replicate-events should return false when using wrong lockId', async () => { + const events: OldEvent[] = [] + for (let i = 0; i < 10; ++i) { + events.push(makeTestEvent(i)) + } + expect(await adapter.replicateEvents('wrongId', events)).toBe(false) + }) + test('replicate-events should insert events', async () => { const events: OldEvent[] = [] for (let i = 0; i < eventCount; ++i) { events.push(makeTestEvent(i)) } - await adapter.replicateEvents(events) + await adapter.replicateEvents('lockId', events) let loadedEventCount = 0 let currentCursor = null @@ -262,13 +332,13 @@ describe(`${adapterFactory.name}. eventstore adapter replication state`, () => { for (let i = eventIndexAfterGap; i < eventCount + addEventCount; ++i) { eventsAfterGap.push(makeTestEvent(i)) } - await adapter.replicateEvents(eventsAfterGap) + await adapter.replicateEvents('lockId', eventsAfterGap) const events: OldEvent[] = [] for (let i = eventCount; i < eventCount + addEventCount; ++i) { events.push(makeTestEvent(i)) } - await adapter.replicateEvents(events) + await adapter.replicateEvents('lockId', events) let loadedEventCount = 0 let currentCursor = null @@ -307,7 +377,7 @@ describe(`${adapterFactory.name}. eventstore adapter replication state`, () => { test('should be able to replicate secrets and events after reset', async () => { const secretRecords: OldSecretRecord[] = generateSecrets(secretCount) - await adapter.replicateSecrets(secretRecords, []) + await adapter.replicateSecrets('lockId', secretRecords, []) const { secrets: loadedSecrets } = await adapter.loadSecrets({ limit: secretCount, }) @@ -319,7 +389,7 @@ describe(`${adapterFactory.name}. eventstore adapter replication state`, () => { events.push(makeTestEvent(i)) } - await adapter.replicateEvents(events) + await adapter.replicateEvents('lockId', events) const { events: loadedEvents } = await adapter.loadEvents({ limit: lessEventCount, cursor: null,