Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rework replication to correctly handle locks #2174

Merged
merged 2 commits into from
Dec 9, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 13 additions & 8 deletions packages/core/core/src/types/runtime.ts
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@ export type ReplicationState = {
iterator: SerializableMap | null
successEvent: OldEvent | null
locked: boolean
lockId: string | null
}

export type EventStoreDescription = {
Expand Down Expand Up @@ -224,20 +225,24 @@ export type Eventstore = {
}>
>

replicateEvents: (events: OldEvent[]) => Promise<void>
replicateEvents: (lockId: string, events: OldEvent[]) => Promise<boolean>
replicateSecrets: (
lockId: string,
existingSecrets: OldSecretRecord[],
deletedSecrets: Array<OldSecretRecord['id']>
) => Promise<void>
setReplicationStatus: (state: {
statusAndData: ReplicationStatusAndData
lastEvent?: OldEvent
iterator?: ReplicationState['iterator']
}) => Promise<void>
) => Promise<boolean>
setReplicationStatus: (
lockId: string,
state: {
statusAndData: ReplicationStatusAndData
lastEvent?: OldEvent
iterator?: ReplicationState['iterator']
}
) => Promise<ReplicationState | null>
setReplicationPaused: (pause: boolean) => Promise<void>
getReplicationState: () => Promise<ReplicationState>
resetReplication: () => Promise<void>
setReplicationLock: (lockDuration: number) => Promise<boolean>
setReplicationLock: (lockId: string, lockDuration: number) => Promise<boolean>

describe: (
options?: EventStoreDescribeOptions
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,22 @@ import type { ResolveRequest, ResolveResponse } from '@resolve-js/core'
import respondWithError from './respond-with-error'

const handler = async (req: ResolveRequest, res: ResolveResponse) => {
const duration = +req.query.duration
// Parse the body defensively: JSON.parse throws a SyntaxError on an empty or
// malformed body, and this code runs before the handler's try block, so the
// exception would otherwise escape instead of producing a 400 response.
let occupyInfo: any = null
try {
  occupyInfo = JSON.parse(req.body ?? '')
} catch (error) {
  // Treat an unparsable body the same as a body without a lockId.
  occupyInfo = null
}
if (occupyInfo == null || typeof occupyInfo.lockId !== 'string') {
  res.status(400)
  res.end('Expected lockId')
  return
}

const duration = +occupyInfo.duration
if (isNaN(duration) || duration <= 0) {
res.status(400)
res.end('Invalid duration provided')
return
}
try {
const success = await req.resolve.eventstoreAdapter.setReplicationLock(
occupyInfo.lockId,
duration
)
if (success) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,16 @@ import respondWithError from './respond-with-error'

const handler = async (req: ResolveRequest, res: ResolveResponse) => {
try {
await req.resolve.eventstoreAdapter.setReplicationLock(0)
const releaseInfo = JSON.parse(req.body ?? '')
if (releaseInfo == null || typeof releaseInfo.lockId !== 'string') {
res.status(400)
res.end('Expected lockId')
return
}
await req.resolve.eventstoreAdapter.setReplicationLock(
releaseInfo.lockId,
0
)
res.status(200)
res.end()
} catch (error) {
Expand Down
196 changes: 153 additions & 43 deletions packages/modules/module-replication/src/api-handlers/replicate.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,14 @@
import type { ResolveRequest, ResolveResponse } from '@resolve-js/core'
import type {
ReplicationState,
ResolveRequest,
ResolveResponse,
} from '@resolve-js/core'
import { getLog } from './get-log'

const checkInput = (input: any) => {
if (typeof input.lockId !== 'string') {
throw new Error('LockId must be provided and be string')
}
if (!Array.isArray(input.events)) {
throw new Error('Events must be array')
}
Expand All @@ -26,7 +33,7 @@ function shouldSaveError(error: any) {
}

const handler = async (req: ResolveRequest, res: ResolveResponse) => {
let input
let input: any
try {
input = JSON.parse(req.body ?? '')
checkInput(input)
Expand All @@ -36,9 +43,10 @@ const handler = async (req: ResolveRequest, res: ResolveResponse) => {
return
}

const lockId: string = input.lockId
const log = getLog('replicate')
try {
await req.resolve.eventstoreAdapter.setReplicationStatus({
await req.resolve.eventstoreAdapter.setReplicationStatus(lockId, {
statusAndData: {
status: 'batchInProgress',
data: {
Expand All @@ -48,53 +56,147 @@ const handler = async (req: ResolveRequest, res: ResolveResponse) => {
iterator: input.iterator,
})

res.status(202)
res.end('Replication has been started')
} catch (error) {
if (shouldSaveError(error)) {
type ReplicationOperationResult = {
status: 'timeout' | 'success' | 'error'
message: string
state?: ReplicationState
}

const replicateData = async (): Promise<ReplicationOperationResult> => {
let result: ReplicationOperationResult = {
status: 'error',
message: 'Uninitialized error',
}
try {
await req.resolve.eventstoreAdapter.setReplicationStatus({
statusAndData: {
status: 'criticalError',
data: {
name: error.name ?? 'Error',
message: error.message ?? 'Unknown error',
},
},
})
} catch (e) {
error.message += '\n'
error.message += e.message
const myLock =
(await req.resolve.eventstoreAdapter.replicateSecrets(
lockId,
input.secretsToSet,
input.secretsToDelete
)) &&
(await req.resolve.eventstoreAdapter.replicateEvents(
lockId,
input.events
))
if (!myLock) {
result = {
status: 'error',
message: `Can't replicate using lock id "${lockId}": someone else occupied the replication lock or database is locked due to reset`,
}
} else {
const state = await req.resolve.eventstoreAdapter.setReplicationStatus(
lockId,
{
statusAndData: {
status: 'batchDone',
data: {
appliedEventsCount: input.events.length,
},
},
lastEvent: input.events[input.events.length - 1],
}
)

if (state) {
result = {
status: 'success',
message: `Completed replication of ${input.events.length} events`,
state,
}
} else {
result = {
status: 'error',
message: `Can't set batchDone status using lock id "${lockId}": someone else occupied the replication lock`,
}
}
}
} catch (error) {
result.message = error.message
if (shouldSaveError(error)) {
try {
await req.resolve.eventstoreAdapter.setReplicationStatus(lockId, {
statusAndData: {
status: 'criticalError',
data: {
name: error.name ?? 'Error',
message: error.message ?? 'Unknown error',
},
},
})
} catch (e) {
log.error(e)
}
} else {
log.debug(error)
}
} finally {
try {
if (result.status === 'success') await req.resolve.broadcastEvent()
} catch (error) {
log.error('broadcastEvent error: ', error)
}
}
} else {
log.debug(error)
return result
}

res.status(500)
res.end(error.message)
return
}
type TimerInfo = {
timeout: NodeJS.Timeout | null
timeoutResolve: ((value: ReplicationOperationResult) => void) | null
timeoutPromise: Promise<ReplicationOperationResult>
}

try {
await req.resolve.eventstoreAdapter.replicateSecrets(
input.secretsToSet,
input.secretsToDelete
)
await req.resolve.eventstoreAdapter.replicateEvents(input.events)
await req.resolve.eventstoreAdapter.setReplicationStatus({
statusAndData: {
status: 'batchDone',
data: {
appliedEventsCount: input.events.length,
},
},
lastEvent: input.events[input.events.length - 1],
})
await req.resolve.broadcastEvent()
const makeTimer = (): TimerInfo => {
const timerInfo: Partial<TimerInfo> = {
timeout: null,
timeoutResolve: null,
}

timerInfo.timeoutPromise = new Promise<ReplicationOperationResult>(
(resolve) => {
timerInfo.timeoutResolve = resolve
timerInfo.timeout = setTimeout(
() =>
resolve({
status: 'timeout',
message: `Batch of ${input.events.length} events took too long to process to respond in place. Continuing replication in background.`,
}),
5000
)
}
)
return timerInfo as TimerInfo
}
const timerInfo = makeTimer()

const result = await Promise.race([
timerInfo.timeoutPromise,
replicateData(),
])
if (result.status !== 'timeout') {
if (timerInfo.timeout !== null) {
clearTimeout(timerInfo.timeout)
timerInfo.timeout = null
}
if (timerInfo.timeoutResolve !== null) {
timerInfo.timeoutResolve({
status: 'timeout',
message: 'Resolving timeout promise',
})
timerInfo.timeoutResolve = null
}
}
if (result.status === 'timeout') {
res.status(202)
} else if (result.status === 'success') {
res.status(200)
} else if (result.status === 'error') {
res.status(500)
}
res.json(result)
} catch (error) {
if (shouldSaveError(error)) {
try {
await req.resolve.eventstoreAdapter.setReplicationStatus({
await req.resolve.eventstoreAdapter.setReplicationStatus(lockId, {
statusAndData: {
status: 'criticalError',
data: {
Expand All @@ -104,11 +206,19 @@ const handler = async (req: ResolveRequest, res: ResolveResponse) => {
},
})
} catch (e) {
log.error(e)
error.message += '\n'
error.message += e.message
}
} else {
log.debug(error)
}

res.status(500)
res.json({
status: 'error',
message: error.message,
})
return
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ const handler = async (req: ResolveRequest, res: ResolveResponse) => {
successEvent: null,
paused: false,
locked: false,
lockId: null,
}
res.json(result)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ export function getInitialReplicationState(): ReplicationState {
paused: false,
successEvent: null,
locked: false,
lockId: null,
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,8 @@ const getReplicationState = async (

const rows = await executeStatement(
`SELECT "Status", "StatusData", "Iterator", "IsPaused", "SuccessEvent",
("LockExpirationTime" > CAST(strftime('%s','now') || substr(strftime('%f','now'),4) AS INTEGER)) as "Locked" FROM ${escapeId(
replicationStateTableName
)}`
("LockExpirationTime" > CAST(strftime('%s','now') || substr(strftime('%f','now'),4) AS INTEGER)) as "Locked", "LockId"
FROM ${escapeId(replicationStateTableName)}`
)
if (rows.length > 0) {
const row = rows[0]
Expand All @@ -33,6 +32,7 @@ const getReplicationState = async (
iterator: row.Iterator != null ? JSON.parse(row.Iterator) : null,
successEvent: lastEvent,
locked: !!row.Locked,
// The query selects the column as "LockId"; sibling fields (row.Iterator,
// row.Locked) are read using the column's exact casing, so do the same here.
lockId: row.LockId,
}
} else {
return getInitialReplicationState()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,8 @@ const initReplicationStateTable = async (
"Iterator" JSON NULL,
"IsPaused" TINYINT DEFAULT 0 NOT NULL,
"SuccessEvent" JSON NULL,
"LockExpirationTime" BIGINT DEFAULT 0 NOT NULL
"LockExpirationTime" BIGINT DEFAULT 0 NOT NULL,
"LockId" VARCHAR(50) DEFAULT NULL
);
INSERT OR IGNORE INTO ${replicationStateTableNameAsId} DEFAULT VALUES;
COMMIT;
Expand Down
Loading