Skip to content

Commit

Permalink
Rework replicaiton to correctly handle locks (#2174)
Browse files Browse the repository at this point in the history
  • Loading branch information
FreeSlave authored Dec 9, 2021
1 parent 95d602f commit 4d28c77
Show file tree
Hide file tree
Showing 29 changed files with 691 additions and 196 deletions.
21 changes: 13 additions & 8 deletions packages/core/core/src/types/runtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ export type ReplicationState = {
iterator: SerializableMap | null
successEvent: OldEvent | null
locked: boolean
lockId: string | null
}

export type EventStoreDescription = {
Expand Down Expand Up @@ -224,20 +225,24 @@ export type Eventstore = {
}>
>

replicateEvents: (events: OldEvent[]) => Promise<void>
replicateEvents: (lockId: string, events: OldEvent[]) => Promise<boolean>
replicateSecrets: (
lockId: string,
existingSecrets: OldSecretRecord[],
deletedSecrets: Array<OldSecretRecord['id']>
) => Promise<void>
setReplicationStatus: (state: {
statusAndData: ReplicationStatusAndData
lastEvent?: OldEvent
iterator?: ReplicationState['iterator']
}) => Promise<void>
) => Promise<boolean>
setReplicationStatus: (
lockId: string,
state: {
statusAndData: ReplicationStatusAndData
lastEvent?: OldEvent
iterator?: ReplicationState['iterator']
}
) => Promise<ReplicationState | null>
setReplicationPaused: (pause: boolean) => Promise<void>
getReplicationState: () => Promise<ReplicationState>
resetReplication: () => Promise<void>
setReplicationLock: (lockDuration: number) => Promise<boolean>
setReplicationLock: (lockId: string, lockDuration: number) => Promise<boolean>

describe: (
options?: EventStoreDescribeOptions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,22 @@ import type { ResolveRequest, ResolveResponse } from '@resolve-js/core'
import respondWithError from './respond-with-error'

const handler = async (req: ResolveRequest, res: ResolveResponse) => {
const duration = +req.query.duration
const occupyInfo = JSON.parse(req.body ?? '')
if (occupyInfo == null || typeof occupyInfo.lockId !== 'string') {
res.status(400)
res.end('Expected lockId')
return
}

const duration = +occupyInfo.duration
if (isNaN(duration) || duration <= 0) {
res.status(400)
res.end('Invalid duration provided')
return
}
try {
const success = await req.resolve.eventstoreAdapter.setReplicationLock(
occupyInfo.lockId,
duration
)
if (success) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,16 @@ import respondWithError from './respond-with-error'

const handler = async (req: ResolveRequest, res: ResolveResponse) => {
try {
await req.resolve.eventstoreAdapter.setReplicationLock(0)
const releaseInfo = JSON.parse(req.body ?? '')
if (releaseInfo == null || typeof releaseInfo.lockId !== 'string') {
res.status(400)
res.end('Expected lockId')
return
}
await req.resolve.eventstoreAdapter.setReplicationLock(
releaseInfo.lockId,
0
)
res.status(200)
res.end()
} catch (error) {
Expand Down
196 changes: 153 additions & 43 deletions packages/modules/module-replication/src/api-handlers/replicate.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,14 @@
import type { ResolveRequest, ResolveResponse } from '@resolve-js/core'
import type {
ReplicationState,
ResolveRequest,
ResolveResponse,
} from '@resolve-js/core'
import { getLog } from './get-log'

const checkInput = (input: any) => {
if (typeof input.lockId !== 'string') {
throw new Error('LockId must be provided and be string')
}
if (!Array.isArray(input.events)) {
throw new Error('Events must be array')
}
Expand All @@ -26,7 +33,7 @@ function shouldSaveError(error: any) {
}

const handler = async (req: ResolveRequest, res: ResolveResponse) => {
let input
let input: any
try {
input = JSON.parse(req.body ?? '')
checkInput(input)
Expand All @@ -36,9 +43,10 @@ const handler = async (req: ResolveRequest, res: ResolveResponse) => {
return
}

const lockId: string = input.lockId
const log = getLog('replicate')
try {
await req.resolve.eventstoreAdapter.setReplicationStatus({
await req.resolve.eventstoreAdapter.setReplicationStatus(lockId, {
statusAndData: {
status: 'batchInProgress',
data: {
Expand All @@ -48,53 +56,147 @@ const handler = async (req: ResolveRequest, res: ResolveResponse) => {
iterator: input.iterator,
})

res.status(202)
res.end('Replication has been started')
} catch (error) {
if (shouldSaveError(error)) {
type ReplicationOperationResult = {
status: 'timeout' | 'success' | 'error'
message: string
state?: ReplicationState
}

const replicateData = async (): Promise<ReplicationOperationResult> => {
let result: ReplicationOperationResult = {
status: 'error',
message: 'Uninitialized error',
}
try {
await req.resolve.eventstoreAdapter.setReplicationStatus({
statusAndData: {
status: 'criticalError',
data: {
name: error.name ?? 'Error',
message: error.message ?? 'Unknown error',
},
},
})
} catch (e) {
error.message += '\n'
error.message += e.message
const myLock =
(await req.resolve.eventstoreAdapter.replicateSecrets(
lockId,
input.secretsToSet,
input.secretsToDelete
)) &&
(await req.resolve.eventstoreAdapter.replicateEvents(
lockId,
input.events
))
if (!myLock) {
result = {
status: 'error',
message: `Can't replicate using lock id "${lockId}": someone else occupied the replication lock or database is locked due to reset`,
}
} else {
const state = await req.resolve.eventstoreAdapter.setReplicationStatus(
lockId,
{
statusAndData: {
status: 'batchDone',
data: {
appliedEventsCount: input.events.length,
},
},
lastEvent: input.events[input.events.length - 1],
}
)

if (state) {
result = {
status: 'success',
message: `Completed replication of ${input.events.length} events`,
state,
}
} else {
result = {
status: 'error',
message: `Can't set batchDone status using lock id "${lockId}": someone else occupied the replication lock`,
}
}
}
} catch (error) {
result.message = error.message
if (shouldSaveError(error)) {
try {
await req.resolve.eventstoreAdapter.setReplicationStatus(lockId, {
statusAndData: {
status: 'criticalError',
data: {
name: error.name ?? 'Error',
message: error.message ?? 'Unknown error',
},
},
})
} catch (e) {
log.error(e)
}
} else {
log.debug(error)
}
} finally {
try {
if (result.status === 'success') await req.resolve.broadcastEvent()
} catch (error) {
log.error('broadcastEvent error: ', error)
}
}
} else {
log.debug(error)
return result
}

res.status(500)
res.end(error.message)
return
}
type TimerInfo = {
timeout: NodeJS.Timeout | null
timeoutResolve: ((value: ReplicationOperationResult) => void) | null
timeoutPromise: Promise<ReplicationOperationResult>
}

try {
await req.resolve.eventstoreAdapter.replicateSecrets(
input.secretsToSet,
input.secretsToDelete
)
await req.resolve.eventstoreAdapter.replicateEvents(input.events)
await req.resolve.eventstoreAdapter.setReplicationStatus({
statusAndData: {
status: 'batchDone',
data: {
appliedEventsCount: input.events.length,
},
},
lastEvent: input.events[input.events.length - 1],
})
await req.resolve.broadcastEvent()
const makeTimer = (): TimerInfo => {
const timerInfo: Partial<TimerInfo> = {
timeout: null,
timeoutResolve: null,
}

timerInfo.timeoutPromise = new Promise<ReplicationOperationResult>(
(resolve) => {
timerInfo.timeoutResolve = resolve
timerInfo.timeout = setTimeout(
() =>
resolve({
status: 'timeout',
message: `Batch of ${input.events.length} events took too long to process to respond in place. Continuing replication in background.`,
}),
5000
)
}
)
return timerInfo as TimerInfo
}
const timerInfo = makeTimer()

const result = await Promise.race([
timerInfo.timeoutPromise,
replicateData(),
])
if (result.status !== 'timeout') {
if (timerInfo.timeout !== null) {
clearTimeout(timerInfo.timeout)
timerInfo.timeout = null
}
if (timerInfo.timeoutResolve !== null) {
timerInfo.timeoutResolve({
status: 'timeout',
message: 'Resolving timeout promise',
})
timerInfo.timeoutResolve = null
}
}
if (result.status === 'timeout') {
res.status(202)
} else if (result.status === 'success') {
res.status(200)
} else if (result.status === 'error') {
res.status(500)
}
res.json(result)
} catch (error) {
if (shouldSaveError(error)) {
try {
await req.resolve.eventstoreAdapter.setReplicationStatus({
await req.resolve.eventstoreAdapter.setReplicationStatus(lockId, {
statusAndData: {
status: 'criticalError',
data: {
Expand All @@ -104,11 +206,19 @@ const handler = async (req: ResolveRequest, res: ResolveResponse) => {
},
})
} catch (e) {
log.error(e)
error.message += '\n'
error.message += e.message
}
} else {
log.debug(error)
}

res.status(500)
res.json({
status: 'error',
message: error.message,
})
return
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ const handler = async (req: ResolveRequest, res: ResolveResponse) => {
successEvent: null,
paused: false,
locked: false,
lockId: null,
}
res.json(result)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ export function getInitialReplicationState(): ReplicationState {
paused: false,
successEvent: null,
locked: false,
lockId: null,
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,8 @@ const getReplicationState = async (

const rows = await executeStatement(
`SELECT "Status", "StatusData", "Iterator", "IsPaused", "SuccessEvent",
("LockExpirationTime" > CAST(strftime('%s','now') || substr(strftime('%f','now'),4) AS INTEGER)) as "Locked" FROM ${escapeId(
replicationStateTableName
)}`
("LockExpirationTime" > CAST(strftime('%s','now') || substr(strftime('%f','now'),4) AS INTEGER)) as "Locked", "LockId"
FROM ${escapeId(replicationStateTableName)}`
)
if (rows.length > 0) {
const row = rows[0]
Expand All @@ -33,6 +32,7 @@ const getReplicationState = async (
iterator: row.Iterator != null ? JSON.parse(row.Iterator) : null,
successEvent: lastEvent,
locked: !!row.Locked,
lockId: row.lockId,
}
} else {
return getInitialReplicationState()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ const initReplicationStateTable = async (
"Iterator" JSON NULL,
"IsPaused" TINYINT DEFAULT 0 NOT NULL,
"SuccessEvent" JSON NULL,
"LockExpirationTime" BIGINT DEFAULT 0 NOT NULL
"LockExpirationTime" BIGINT DEFAULT 0 NOT NULL,
"LockId" VARCHAR(50) DEFAULT NULL
);
INSERT OR IGNORE INTO ${replicationStateTableNameAsId} DEFAULT VALUES;
COMMIT;
Expand Down
Loading

0 comments on commit 4d28c77

Please sign in to comment.