Skip to content

Commit

Permalink
Stitch context with out of order LSN positions
Browse files Browse the repository at this point in the history
  • Loading branch information
exAspArk committed Dec 16, 2024
1 parent 10a37f4 commit b1cbd0f
Show file tree
Hide file tree
Showing 6 changed files with 171 additions and 131 deletions.
32 changes: 15 additions & 17 deletions core/src/fetched-record-buffer.ts
Original file line number Diff line number Diff line change
@@ -1,35 +1,29 @@
import { FetchedRecord } from './fetched-record'

export class FetchedRecordBuffer {
store: { [subject: string]: { [position: string]: FetchedRecord[] } }
store: { [subject: string]: { [transactionId: string]: FetchedRecord[] } }

constructor() {
this.store = {}
}

static fromStore(store: { [subject: string]: { [position: string]: FetchedRecord[] } }) {
const buffer = new FetchedRecordBuffer()
buffer.store = store
return buffer
}

addFetchedRecord(fetchedRecord: FetchedRecord) {
const newBuffer: FetchedRecordBuffer = Object.assign(Object.create(this), this)
const { subject, changeAttributes } = fetchedRecord
const position = changeAttributes.position.toString()
const existingFetchedRecords = this.store[subject]?.[position]
const transactionId = changeAttributes.transactionId.toString()
const existingFetchedRecords = this.store[subject]?.[transactionId]

if (existingFetchedRecords) {
newBuffer.store = {
...this.store,
[subject]: { ...this.store[subject], [position]: [...existingFetchedRecords, fetchedRecord] },
[subject]: { ...this.store[subject], [transactionId]: [...existingFetchedRecords, fetchedRecord] },
}
return newBuffer
}

newBuffer.store = {
...this.store,
[subject]: { ...(this.store[subject] || []), [position]: [fetchedRecord] },
[subject]: { ...(this.store[subject] || []), [transactionId]: [fetchedRecord] },
}
return newBuffer
}
Expand All @@ -48,17 +42,21 @@ export class FetchedRecordBuffer {
forEach(callback: (subject: string, fetchedRecords: FetchedRecord[]) => void) {
Object.keys(this.store).forEach((subject) => {
const fetchedRecords = Object.values(this.store[subject]).flat()
const sortedFetchedRecords = fetchedRecords.sort(
(a, b) =>
parseInt(a.changeAttributes.position.toString(), 10) - parseInt(b.changeAttributes.position.toString(), 10),
)

// Sort by transactionId, then by streamSequence
const sortedFetchedRecords = fetchedRecords.sort((a, b) => {
if (a.changeAttributes.transactionId !== b.changeAttributes.transactionId) {
return a.changeAttributes.transactionId - b.changeAttributes.transactionId
}
return a.streamSequence - b.streamSequence
})

callback(subject, sortedFetchedRecords)
})
}

fetchedRecordsByPosition(subject: string, position: string) {
return this.store[subject]?.[position] || []
fetchedRecordsByTransactionId(subject: string, transactionId: string) {
return this.store[subject]?.[transactionId] || []
}

sizeBySubject(subject: string) {
Expand Down
26 changes: 12 additions & 14 deletions core/src/specs/fixtures/fetched-records.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,5 @@
import { Operation } from '../../entities/Change'

import { POSITIONS } from './nats-messages'

export const MOCKED_DATE = new Date(1466424490000)

export const CHANGE_ATTRIBUTES = {
Expand All @@ -11,7 +9,7 @@ export const CHANGE_ATTRIBUTES = {
database: 'bemi_dev_source',
context: {},
operation: Operation.CREATE,
position: POSITIONS.CREATE,
position: 35878528,
primaryKey: '2',
queuedAt: MOCKED_DATE,
schema: 'public',
Expand All @@ -26,7 +24,7 @@ export const CHANGE_ATTRIBUTES = {
database: 'bemi_dev_source',
context: { op: 'c' },
operation: Operation.MESSAGE,
position: POSITIONS.CREATE,
position: 35878528,
primaryKey: undefined,
queuedAt: MOCKED_DATE,
schema: '',
Expand All @@ -41,7 +39,7 @@ export const CHANGE_ATTRIBUTES = {
database: 'bemi_dev_source',
context: {},
operation: Operation.UPDATE,
position: POSITIONS.UPDATE,
position: 35878832,
primaryKey: '2',
queuedAt: MOCKED_DATE,
schema: 'public',
Expand All @@ -56,7 +54,7 @@ export const CHANGE_ATTRIBUTES = {
database: 'bemi_dev_source',
context: { op: 'u' },
operation: Operation.MESSAGE,
position: POSITIONS.UPDATE,
position: 35878832,
primaryKey: undefined,
queuedAt: MOCKED_DATE,
schema: '',
Expand All @@ -71,12 +69,12 @@ export const CHANGE_ATTRIBUTES = {
database: 'bemi_dev_source',
context: {},
operation: Operation.DELETE,
position: POSITIONS.DELETE,
position: 35878952,
primaryKey: '2',
queuedAt: MOCKED_DATE,
schema: 'public',
table: 'todo',
transactionId: 767,
transactionId: 770,
before: { id: 2, isCompleted: true, task: 'Test' },
after: {},
},
Expand All @@ -86,12 +84,12 @@ export const CHANGE_ATTRIBUTES = {
database: 'bemi_dev_source',
context: { op: 'd' },
operation: Operation.MESSAGE,
position: POSITIONS.DELETE,
position: 35878952,
primaryKey: undefined,
queuedAt: MOCKED_DATE,
schema: '',
table: '',
transactionId: 767,
transactionId: 770,
before: {},
after: {},
},
Expand All @@ -101,12 +99,12 @@ export const CHANGE_ATTRIBUTES = {
database: 'bemi_dev_source',
context: {},
operation: Operation.MESSAGE,
position: POSITIONS.HEARTBEAT_MESSAGE,
position: 371211568,
primaryKey: undefined,
queuedAt: MOCKED_DATE,
schema: '',
table: '',
transactionId: 769,
transactionId: 771,
before: {},
after: {},
},
Expand All @@ -116,12 +114,12 @@ export const CHANGE_ATTRIBUTES = {
database: 'bemi_dev_source',
context: {},
operation: Operation.TRUNCATE,
position: POSITIONS.TRUNCATE,
position: 35909944,
primaryKey: undefined,
queuedAt: MOCKED_DATE,
schema: 'public',
table: 'todo',
transactionId: 771,
transactionId: 772,
before: {},
after: {},
},
Expand Down
54 changes: 12 additions & 42 deletions core/src/specs/fixtures/nats-messages.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,6 @@ import { JsMsg } from 'nats'

import { encodeData } from '../../nats'

export const POSITIONS = {
CREATE: 35878528,
UPDATE: 35878832,
DELETE: 35878952,
TRUNCATE: 35909944,
HEARTBEAT_MESSAGE: 371211568,
HEARTBEAT_CHANGE: 371211569,
}

export const MESSAGE_DATA = {
CREATE: {
before: null,
Expand All @@ -26,7 +17,7 @@ export const MESSAGE_DATA = {
schema: 'public',
table: 'todo',
txId: 768,
lsn: POSITIONS.CREATE.toString(),
lsn: 35878528,
xmin: null,
},
op: 'c',
Expand All @@ -47,7 +38,7 @@ export const MESSAGE_DATA = {
schema: '',
table: '',
txId: 768,
lsn: POSITIONS.CREATE.toString(),
lsn: 35878528,
xmin: null,
},
message: { prefix: '_bemi', content: 'eyJvcCI6ICJjIn0=' },
Expand All @@ -66,7 +57,7 @@ export const MESSAGE_DATA = {
schema: 'public',
table: 'todo',
txId: 769,
lsn: POSITIONS.UPDATE.toString(),
lsn: 35878832,
xmin: null,
},
op: 'u',
Expand All @@ -87,7 +78,7 @@ export const MESSAGE_DATA = {
schema: '',
table: '',
txId: 769,
lsn: POSITIONS.UPDATE.toString(),
lsn: 35878832,
xmin: null,
},
message: { prefix: '_bemi', content: 'eyJvcCI6ICJ1In0=' },
Expand All @@ -105,8 +96,8 @@ export const MESSAGE_DATA = {
sequence: '[null,"35878952"]',
schema: 'public',
table: 'todo',
txId: 767,
lsn: POSITIONS.DELETE.toString(),
txId: 770,
lsn: 35878952,
xmin: null,
},
op: 'd',
Expand All @@ -126,8 +117,8 @@ export const MESSAGE_DATA = {
sequence: '[null,"35878952"]',
schema: '',
table: '',
txId: 767,
lsn: POSITIONS.DELETE,
txId: 770,
lsn: 35878952,
xmin: null,
},
message: { prefix: '_bemi', content: 'eyJvcCI6ICJkIn0=' },
Expand All @@ -145,8 +136,8 @@ export const MESSAGE_DATA = {
sequence: '["35878976","35909944"]',
schema: 'public',
table: 'todo',
txId: 770,
lsn: POSITIONS.TRUNCATE.toString(),
txId: 772,
lsn: 35909944,
xmin: null,
},
op: 't',
Expand All @@ -166,33 +157,12 @@ export const MESSAGE_DATA = {
sequence: '["371211440","371211568"]',
schema: '',
table: '',
txId: 4215,
lsn: POSITIONS.HEARTBEAT_MESSAGE.toString(),
txId: 771,
lsn: 371211568,
xmin: null,
},
message: { prefix: '_bemi_heartbeat', content: '' },
},
HEARTBEAT_CHANGE: {
before: null,
after: { id: 1, last_heartbeat_at: '2024-04-18T20:40:29.086091Z' },
source: {
version: '2.5.0-SNAPSHOT',
connector: 'postgresql',
name: 'prefix',
ts_ms: 1713472829086,
snapshot: 'false',
db: 'bemi_dev_source',
sequence: '["24830282128","24897389232"]',
schema: '_bemi',
table: 'heartbeats',
txId: 3497,
lsn: 371211569,
xmin: null,
},
op: 'u',
ts_ms: 1713472829090,
transaction: null,
},
NON_BEMI_MESSAGE: {
op: 'm',
ts_ms: 1706128742602,
Expand Down
Loading

0 comments on commit b1cbd0f

Please sign in to comment.