From b1cbd0fff85c34f0a1523c3efe2f1e5b2f4f3c95 Mon Sep 17 00:00:00 2001 From: exAspArk Date: Mon, 16 Dec 2024 10:54:01 -0500 Subject: [PATCH] Stitch context with out of order LSN positions --- core/src/fetched-record-buffer.ts | 32 ++-- core/src/specs/fixtures/fetched-records.ts | 26 ++- core/src/specs/fixtures/nats-messages.ts | 54 ++----- core/src/specs/stitching.spec.ts | 177 ++++++++++++++------- core/src/stitching.ts | 8 +- docs/docs/changelog.md | 5 + 6 files changed, 171 insertions(+), 131 deletions(-) diff --git a/core/src/fetched-record-buffer.ts b/core/src/fetched-record-buffer.ts index 2c26940..e8b4883 100644 --- a/core/src/fetched-record-buffer.ts +++ b/core/src/fetched-record-buffer.ts @@ -1,35 +1,29 @@ import { FetchedRecord } from './fetched-record' export class FetchedRecordBuffer { - store: { [subject: string]: { [position: string]: FetchedRecord[] } } + store: { [subject: string]: { [transactionId: string]: FetchedRecord[] } } constructor() { this.store = {} } - static fromStore(store: { [subject: string]: { [position: string]: FetchedRecord[] } }) { - const buffer = new FetchedRecordBuffer() - buffer.store = store - return buffer - } - addFetchedRecord(fetchedRecord: FetchedRecord) { const newBuffer: FetchedRecordBuffer = Object.assign(Object.create(this), this) const { subject, changeAttributes } = fetchedRecord - const position = changeAttributes.position.toString() - const existingFetchedRecords = this.store[subject]?.[position] + const transactionId = changeAttributes.transactionId.toString() + const existingFetchedRecords = this.store[subject]?.[transactionId] if (existingFetchedRecords) { newBuffer.store = { ...this.store, - [subject]: { ...this.store[subject], [position]: [...existingFetchedRecords, fetchedRecord] }, + [subject]: { ...this.store[subject], [transactionId]: [...existingFetchedRecords, fetchedRecord] }, } return newBuffer } newBuffer.store = { ...this.store, - [subject]: { ...(this.store[subject] || []), [position]: [fetchedRecord] }, + [subject]: { ...(this.store[subject] || []), [transactionId]: [fetchedRecord] }, } return newBuffer } @@ -48,17 +42,21 @@ export class FetchedRecordBuffer { forEach(callback: (subject: string, fetchedRecords: FetchedRecord[]) => void) { Object.keys(this.store).forEach((subject) => { const fetchedRecords = Object.values(this.store[subject]).flat() - const sortedFetchedRecords = fetchedRecords.sort( - (a, b) => - parseInt(a.changeAttributes.position.toString(), 10) - parseInt(b.changeAttributes.position.toString(), 10), - ) + + // Sort by transactionId, then by streamSequence + const sortedFetchedRecords = fetchedRecords.sort((a, b) => { + if (a.changeAttributes.transactionId !== b.changeAttributes.transactionId) { + return a.changeAttributes.transactionId - b.changeAttributes.transactionId + } + return a.streamSequence - b.streamSequence + }) callback(subject, sortedFetchedRecords) }) } - fetchedRecordsByPosition(subject: string, position: string) { - return this.store[subject]?.[position] || [] + fetchedRecordsByTransactionId(subject: string, transactionId: string) { + return this.store[subject]?.[transactionId] || [] } sizeBySubject(subject: string) { diff --git a/core/src/specs/fixtures/fetched-records.ts b/core/src/specs/fixtures/fetched-records.ts index dcab0bc..1420a37 100644 --- a/core/src/specs/fixtures/fetched-records.ts +++ b/core/src/specs/fixtures/fetched-records.ts @@ -1,7 +1,5 @@ import { Operation } from '../../entities/Change' -import { POSITIONS } from './nats-messages' - export const MOCKED_DATE = new Date(1466424490000) export const CHANGE_ATTRIBUTES = { @@ -11,7 +9,7 @@ export const CHANGE_ATTRIBUTES = { database: 'bemi_dev_source', context: {}, operation: Operation.CREATE, - position: POSITIONS.CREATE, + position: 35878528, primaryKey: '2', queuedAt: MOCKED_DATE, schema: 'public', @@ -26,7 +24,7 @@ export const CHANGE_ATTRIBUTES = { database: 'bemi_dev_source', context: { op: 'c' }, operation: Operation.MESSAGE, - position: POSITIONS.CREATE, + position: 35878528, primaryKey: undefined, queuedAt: MOCKED_DATE, schema: '', @@ -41,7 +39,7 @@ export const CHANGE_ATTRIBUTES = { database: 'bemi_dev_source', context: {}, operation: Operation.UPDATE, - position: POSITIONS.UPDATE, + position: 35878832, primaryKey: '2', queuedAt: MOCKED_DATE, schema: 'public', @@ -56,7 +54,7 @@ export const CHANGE_ATTRIBUTES = { database: 'bemi_dev_source', context: { op: 'u' }, operation: Operation.MESSAGE, - position: POSITIONS.UPDATE, + position: 35878832, primaryKey: undefined, queuedAt: MOCKED_DATE, schema: '', @@ -71,12 +69,12 @@ export const CHANGE_ATTRIBUTES = { database: 'bemi_dev_source', context: {}, operation: Operation.DELETE, - position: POSITIONS.DELETE, + position: 35878952, primaryKey: '2', queuedAt: MOCKED_DATE, schema: 'public', table: 'todo', - transactionId: 767, + transactionId: 770, before: { id: 2, isCompleted: true, task: 'Test' }, after: {}, }, @@ -86,12 +84,12 @@ export const CHANGE_ATTRIBUTES = { database: 'bemi_dev_source', context: { op: 'd' }, operation: Operation.MESSAGE, - position: POSITIONS.DELETE, + position: 35878952, primaryKey: undefined, queuedAt: MOCKED_DATE, schema: '', table: '', - transactionId: 767, + transactionId: 770, before: {}, after: {}, }, @@ -101,12 +99,12 @@ export const CHANGE_ATTRIBUTES = { database: 'bemi_dev_source', context: {}, operation: Operation.MESSAGE, - position: POSITIONS.HEARTBEAT_MESSAGE, + position: 371211568, primaryKey: undefined, queuedAt: MOCKED_DATE, schema: '', table: '', - transactionId: 769, + transactionId: 771, before: {}, after: {}, }, @@ -116,12 +114,12 @@ export const CHANGE_ATTRIBUTES = { database: 'bemi_dev_source', context: {}, operation: Operation.TRUNCATE, - position: POSITIONS.TRUNCATE, + position: 35909944, primaryKey: undefined, queuedAt: MOCKED_DATE, schema: 'public', table: 'todo', - transactionId: 771, + transactionId: 772, before: {}, after: {}, }, diff --git a/core/src/specs/fixtures/nats-messages.ts b/core/src/specs/fixtures/nats-messages.ts index 035028a..fa86863 100644 --- a/core/src/specs/fixtures/nats-messages.ts +++ b/core/src/specs/fixtures/nats-messages.ts @@ -2,15 +2,6 @@ import { JsMsg } from 'nats' import { encodeData } from '../../nats' -export const POSITIONS = { - CREATE: 35878528, - UPDATE: 35878832, - DELETE: 35878952, - TRUNCATE: 35909944, - HEARTBEAT_MESSAGE: 371211568, - HEARTBEAT_CHANGE: 371211569, -} - export const MESSAGE_DATA = { CREATE: { before: null, @@ -26,7 +17,7 @@ export const MESSAGE_DATA = { schema: 'public', table: 'todo', txId: 768, - lsn: POSITIONS.CREATE.toString(), + lsn: 35878528, xmin: null, }, op: 'c', @@ -47,7 +38,7 @@ export const MESSAGE_DATA = { schema: '', table: '', txId: 768, - lsn: POSITIONS.CREATE.toString(), + lsn: 35878528, xmin: null, }, message: { prefix: '_bemi', content: 'eyJvcCI6ICJjIn0=' }, @@ -66,7 +57,7 @@ export const MESSAGE_DATA = { schema: 'public', table: 'todo', txId: 769, - lsn: POSITIONS.UPDATE.toString(), + lsn: 35878832, xmin: null, }, op: 'u', @@ -87,7 +78,7 @@ export const MESSAGE_DATA = { schema: '', table: '', txId: 769, - lsn: POSITIONS.UPDATE.toString(), + lsn: 35878832, xmin: null, }, message: { prefix: '_bemi', content: 'eyJvcCI6ICJ1In0=' }, @@ -105,8 +96,8 @@ export const MESSAGE_DATA = { sequence: '[null,"35878952"]', schema: 'public', table: 'todo', - txId: 767, - lsn: POSITIONS.DELETE.toString(), + txId: 770, + lsn: 35878952, xmin: null, }, op: 'd', @@ -126,8 +117,8 @@ export const MESSAGE_DATA = { sequence: '[null,"35878952"]', schema: '', table: '', - txId: 767, - lsn: POSITIONS.DELETE, + txId: 770, + lsn: 35878952, xmin: null, }, message: { prefix: '_bemi', content: 'eyJvcCI6ICJkIn0=' }, @@ -145,8 +136,8 @@ export const MESSAGE_DATA = { sequence: '["35878976","35909944"]', schema: 'public', table: 'todo', - txId: 770, - lsn: POSITIONS.TRUNCATE.toString(), + txId: 772, + lsn: 35909944, xmin: null, }, op: 't', @@ -166,33 +157,12 @@ export const MESSAGE_DATA = { sequence: '["371211440","371211568"]', schema: '', table: '', - txId: 4215, - lsn: POSITIONS.HEARTBEAT_MESSAGE.toString(), + txId: 771, + lsn: 371211568, xmin: null, }, message: { prefix: '_bemi_heartbeat', content: '' }, }, - HEARTBEAT_CHANGE: { - before: null, - after: { id: 1, last_heartbeat_at: '2024-04-18T20:40:29.086091Z' }, - source: { - version: '2.5.0-SNAPSHOT', - connector: 'postgresql', - name: 'prefix', - ts_ms: 1713472829086, - snapshot: 'false', - db: 'bemi_dev_source', - sequence: '["24830282128","24897389232"]', - schema: '_bemi', - table: 'heartbeats', - txId: 3497, - lsn: 371211569, - xmin: null, - }, - op: 'u', - ts_ms: 1713472829090, - transaction: null, - }, NON_BEMI_MESSAGE: { op: 'm', ts_ms: 1706128742602, diff --git a/core/src/specs/stitching.spec.ts b/core/src/specs/stitching.spec.ts index 9f93e83..7aefb05 100644 --- a/core/src/specs/stitching.spec.ts +++ b/core/src/specs/stitching.spec.ts @@ -4,7 +4,6 @@ import { stitchFetchedRecords } from '../stitching' import { FetchedRecord, MESSAGE_PREFIX_CONTEXT, MESSAGE_PREFIX_HEARTBEAT } from '../fetched-record' import { FetchedRecordBuffer } from '../fetched-record-buffer' -import { POSITIONS } from './fixtures/nats-messages' import { MOCKED_DATE, CHANGE_ATTRIBUTES } from './fixtures/fetched-records' const findFetchedRecord = (fetchedRecords: FetchedRecord[], streamSequence: number) => @@ -16,6 +15,102 @@ describe('stitchFetchedRecords', () => { }) describe('when messages in the same batch', () => { + test('stitches changes with context when out of order positions', () => { + const subject = 'bemi-subject' + const fetchedRecords = [ + new FetchedRecord({ + subject, + streamSequence: 1, + changeAttributes: { ...CHANGE_ATTRIBUTES.CREATE_MESSAGE, transactionId: 1, position: 141527096 }, + messagePrefix: MESSAGE_PREFIX_CONTEXT, + }), + new FetchedRecord({ + subject, + streamSequence: 2, + changeAttributes: { ...CHANGE_ATTRIBUTES.CREATE, transactionId: 1, position: 141527704 }, + }), + new FetchedRecord({ + subject, + streamSequence: 3, + changeAttributes: { ...CHANGE_ATTRIBUTES.CREATE_MESSAGE, transactionId: 2, position: 141527704 }, + messagePrefix: MESSAGE_PREFIX_CONTEXT, + }), + new FetchedRecord({ + subject, + streamSequence: 4, + changeAttributes: { ...CHANGE_ATTRIBUTES.CREATE, transactionId: 2, position: 141527888 }, + }), + new FetchedRecord({ + subject, + streamSequence: 5, + changeAttributes: { ...CHANGE_ATTRIBUTES.CREATE_MESSAGE, transactionId: 3, position: 141527400 }, + messagePrefix: MESSAGE_PREFIX_CONTEXT, + }), + new FetchedRecord({ + subject, + streamSequence: 6, + changeAttributes: { ...CHANGE_ATTRIBUTES.CREATE, transactionId: 3, position: 141528024 }, + }), + ] + + const result = stitchFetchedRecords({ + fetchedRecordBuffer: new FetchedRecordBuffer().addFetchedRecords(fetchedRecords), + useBuffer: true, + }) + + expect(result).toStrictEqual({ + stitchedFetchedRecords: [ + findFetchedRecord(fetchedRecords, 2).setContext(findFetchedRecord(fetchedRecords, 1).context()), + findFetchedRecord(fetchedRecords, 4).setContext(findFetchedRecord(fetchedRecords, 3).context()), + findFetchedRecord(fetchedRecords, 6).setContext(findFetchedRecord(fetchedRecords, 5).context()), + ], + ackStreamSequence: 6, + newFetchedRecordBuffer: new FetchedRecordBuffer(), + }) + }) + + test('stitches changes with context when out of order stream sequences with 2+ records within the same transaction', () => { + const subject = 'bemi-subject' + const fetchedRecords = [ + new FetchedRecord({ + subject, + streamSequence: 1, + changeAttributes: { ...CHANGE_ATTRIBUTES.CREATE_MESSAGE, transactionId: 1 }, + messagePrefix: MESSAGE_PREFIX_CONTEXT, + }), + new FetchedRecord({ + subject, + streamSequence: 3, + changeAttributes: { ...CHANGE_ATTRIBUTES.CREATE_MESSAGE, transactionId: 1 }, + messagePrefix: MESSAGE_PREFIX_CONTEXT, + }), + new FetchedRecord({ + subject, + streamSequence: 2, + changeAttributes: { ...CHANGE_ATTRIBUTES.CREATE, transactionId: 1 }, + }), + new FetchedRecord({ + subject, + streamSequence: 4, + changeAttributes: { ...CHANGE_ATTRIBUTES.CREATE, transactionId: 1 }, + }), + ] + + const result = stitchFetchedRecords({ + fetchedRecordBuffer: new FetchedRecordBuffer().addFetchedRecords(fetchedRecords), + useBuffer: true, + }) + + expect(result).toStrictEqual({ + stitchedFetchedRecords: [ + findFetchedRecord(fetchedRecords, 2).setContext(findFetchedRecord(fetchedRecords, 1).context()), + findFetchedRecord(fetchedRecords, 4).setContext(findFetchedRecord(fetchedRecords, 3).context()), + ], + ackStreamSequence: 4, + newFetchedRecordBuffer: new FetchedRecordBuffer(), + }) + }) + test('stitches context if it is first, ignores a heartbeat message', () => { const subject = 'bemi-subject' const fetchedRecords = [ @@ -44,11 +139,11 @@ describe('stitchFetchedRecords', () => { findFetchedRecord(fetchedRecords, 2).setContext(findFetchedRecord(fetchedRecords, 1).context()), ], ackStreamSequence: 3, - newFetchedRecordBuffer: FetchedRecordBuffer.fromStore({}), + newFetchedRecordBuffer: new FetchedRecordBuffer(), }) }) - test('stitches context if it is second and pauses on the one before last position', () => { + test('stitches context if it is second and pauses on the one before last sequence', () => { const subject = 'bemi-subject' const fetchedRecords = [ new FetchedRecord({ subject, streamSequence: 1, changeAttributes: CHANGE_ATTRIBUTES.CREATE }), @@ -73,11 +168,7 @@ describe('stitchFetchedRecords', () => { findFetchedRecord(fetchedRecords, 3), ], ackStreamSequence: 3, - newFetchedRecordBuffer: FetchedRecordBuffer.fromStore({ - [subject]: { - [POSITIONS.DELETE]: [findFetchedRecord(fetchedRecords, 4)], - }, - }), + newFetchedRecordBuffer: new FetchedRecordBuffer().addFetchedRecords([findFetchedRecord(fetchedRecords, 4)]), }) }) @@ -87,19 +178,19 @@ describe('stitchFetchedRecords', () => { new FetchedRecord({ subject, streamSequence: 1, - changeAttributes: { ...CHANGE_ATTRIBUTES.HEARTBEAT_MESSAGE, position: 1 }, + changeAttributes: { ...CHANGE_ATTRIBUTES.HEARTBEAT_MESSAGE, transactionId: 1 }, messagePrefix: MESSAGE_PREFIX_HEARTBEAT, }), new FetchedRecord({ subject, streamSequence: 3, - changeAttributes: { ...CHANGE_ATTRIBUTES.HEARTBEAT_MESSAGE, position: 3 }, + changeAttributes: { ...CHANGE_ATTRIBUTES.HEARTBEAT_MESSAGE, transactionId: 3 }, messagePrefix: MESSAGE_PREFIX_HEARTBEAT, }), new FetchedRecord({ subject, streamSequence: 2, - changeAttributes: { ...CHANGE_ATTRIBUTES.HEARTBEAT_MESSAGE, position: 2 }, + changeAttributes: { ...CHANGE_ATTRIBUTES.HEARTBEAT_MESSAGE, transactionId: 2 }, messagePrefix: MESSAGE_PREFIX_HEARTBEAT, }), ] @@ -112,7 +203,7 @@ describe('stitchFetchedRecords', () => { expect(result).toStrictEqual({ stitchedFetchedRecords: [], ackStreamSequence: 3, - newFetchedRecordBuffer: FetchedRecordBuffer.fromStore({}), + newFetchedRecordBuffer: new FetchedRecordBuffer(), }) }) }) @@ -121,7 +212,6 @@ describe('stitchFetchedRecords', () => { test('stitches context across multiple subjects with a heartbeat message and pending context', () => { const subject1 = 'bemi-subject-1' const subject2 = 'bemi-subject-2' - const updateMessagePosition = CHANGE_ATTRIBUTES.HEARTBEAT_MESSAGE.position + 1 const fetchedRecords = [ new FetchedRecord({ subject: subject1, @@ -130,6 +220,7 @@ describe('stitchFetchedRecords', () => { messagePrefix: MESSAGE_PREFIX_CONTEXT, }), new FetchedRecord({ subject: subject1, streamSequence: 2, changeAttributes: CHANGE_ATTRIBUTES.CREATE }), + new FetchedRecord({ subject: subject2, streamSequence: 3, @@ -139,7 +230,10 @@ describe('stitchFetchedRecords', () => { new FetchedRecord({ subject: subject2, streamSequence: 4, - changeAttributes: { ...CHANGE_ATTRIBUTES.UPDATE_MESSAGE, position: updateMessagePosition }, + changeAttributes: { + ...CHANGE_ATTRIBUTES.UPDATE_MESSAGE, + transactionId: CHANGE_ATTRIBUTES.HEARTBEAT_MESSAGE.transactionId + 1, + }, messagePrefix: MESSAGE_PREFIX_CONTEXT, }), ] @@ -154,18 +248,13 @@ describe('stitchFetchedRecords', () => { findFetchedRecord(fetchedRecords, 2).setContext(findFetchedRecord(fetchedRecords, 1).context()), ], ackStreamSequence: 3, - newFetchedRecordBuffer: FetchedRecordBuffer.fromStore({ - [subject2]: { - [updateMessagePosition]: [findFetchedRecord(fetchedRecords, 4)], - }, - }), + newFetchedRecordBuffer: new FetchedRecordBuffer().addFetchedRecords([findFetchedRecord(fetchedRecords, 4)]), }) }) test('stitches context across multiple subjects with a heartbeat message and pending change', () => { const subject1 = 'bemi-subject-1' const subject2 = 'bemi-subject-2' - const updatePosition = CHANGE_ATTRIBUTES.HEARTBEAT_MESSAGE.position + 1 const fetchedRecords = [ new FetchedRecord({ subject: subject1, @@ -174,6 +263,7 @@ describe('stitchFetchedRecords', () => { messagePrefix: MESSAGE_PREFIX_CONTEXT, }), new FetchedRecord({ subject: subject1, streamSequence: 2, changeAttributes: CHANGE_ATTRIBUTES.CREATE }), + new FetchedRecord({ subject: subject2, streamSequence: 3, @@ -183,7 +273,10 @@ describe('stitchFetchedRecords', () => { new FetchedRecord({ subject: subject2, streamSequence: 4, - changeAttributes: { ...CHANGE_ATTRIBUTES.UPDATE, position: updatePosition }, + changeAttributes: { + ...CHANGE_ATTRIBUTES.UPDATE, + transactionId: CHANGE_ATTRIBUTES.HEARTBEAT_MESSAGE.transactionId + 1, + }, }), ] @@ -197,11 +290,7 @@ describe('stitchFetchedRecords', () => { findFetchedRecord(fetchedRecords, 2).setContext(findFetchedRecord(fetchedRecords, 1).context()), ], ackStreamSequence: 3, - newFetchedRecordBuffer: FetchedRecordBuffer.fromStore({ - [subject2]: { - [updatePosition]: [findFetchedRecord(fetchedRecords, 4)], - }, - }), + newFetchedRecordBuffer: new FetchedRecordBuffer().addFetchedRecords([findFetchedRecord(fetchedRecords, 4)]), }) }) @@ -234,7 +323,7 @@ describe('stitchFetchedRecords', () => { findFetchedRecord(fetchedRecords, 2).setContext(findFetchedRecord(fetchedRecords, 1).context()), ], ackStreamSequence: 3, - newFetchedRecordBuffer: FetchedRecordBuffer.fromStore({}), + newFetchedRecordBuffer: new FetchedRecordBuffer(), }) }) }) @@ -262,11 +351,7 @@ describe('stitchFetchedRecords', () => { findFetchedRecord(fetchedRecords1, 1).setContext(findFetchedRecord(fetchedRecords1, 2).context()), ], ackStreamSequence: 1, - newFetchedRecordBuffer: FetchedRecordBuffer.fromStore({ - [subject]: { - [POSITIONS.UPDATE]: [findFetchedRecord(fetchedRecords1, 3)], - }, - }), + newFetchedRecordBuffer: new FetchedRecordBuffer().addFetchedRecords([findFetchedRecord(fetchedRecords1, 3)]), }) const fetchedRecords2 = [ @@ -293,11 +378,7 @@ describe('stitchFetchedRecords', () => { findFetchedRecord(fetchedRecords1, 3).setContext(findFetchedRecord(fetchedRecords2, 4).context()), ], ackStreamSequence: 3, - newFetchedRecordBuffer: FetchedRecordBuffer.fromStore({ - [subject]: { - [POSITIONS.DELETE]: [findFetchedRecord(fetchedRecords2, 5)], - }, - }), + newFetchedRecordBuffer: new FetchedRecordBuffer().addFetchedRecords([findFetchedRecord(fetchedRecords2, 5)]), }) }) @@ -314,11 +395,7 @@ describe('stitchFetchedRecords', () => { expect(result1).toStrictEqual({ stitchedFetchedRecords: [], ackStreamSequence: undefined, - newFetchedRecordBuffer: FetchedRecordBuffer.fromStore({ - [subject]: { - [POSITIONS.CREATE]: [findFetchedRecord(fetchedRecords1, 1)], - }, - }), + newFetchedRecordBuffer: new FetchedRecordBuffer().addFetchedRecords(fetchedRecords1), }) const fetchedRecords2 = [ @@ -342,15 +419,11 @@ describe('stitchFetchedRecords', () => { findFetchedRecord(fetchedRecords2, 3), ], ackStreamSequence: 3, - newFetchedRecordBuffer: FetchedRecordBuffer.fromStore({ - [subject]: { - [POSITIONS.DELETE]: [findFetchedRecord(fetchedRecords2, 4)], - }, - }), + newFetchedRecordBuffer: new FetchedRecordBuffer().addFetchedRecords([findFetchedRecord(fetchedRecords2, 4)]), }) }) - test('saves pending change messages after receiving a heartbeat message with a greater position', () => { + test('saves pending change messages after receiving a heartbeat message with a greater sequence number', () => { const subject = 'bemi-subject' const fetchedRecords1 = [ new FetchedRecord({ subject, streamSequence: 1, changeAttributes: CHANGE_ATTRIBUTES.CREATE }), @@ -363,11 +436,7 @@ describe('stitchFetchedRecords', () => { expect(result1).toStrictEqual({ stitchedFetchedRecords: [], ackStreamSequence: undefined, - newFetchedRecordBuffer: FetchedRecordBuffer.fromStore({ - [subject]: { - [POSITIONS.CREATE]: [findFetchedRecord(fetchedRecords1, 1)], - }, - }), + newFetchedRecordBuffer: new FetchedRecordBuffer().addFetchedRecords(fetchedRecords1), }) const fetchedRecords2 = [ @@ -386,7 +455,7 @@ describe('stitchFetchedRecords', () => { expect(result2).toStrictEqual({ stitchedFetchedRecords: [findFetchedRecord(fetchedRecords1, 1)], ackStreamSequence: 2, - newFetchedRecordBuffer: FetchedRecordBuffer.fromStore({}), + newFetchedRecordBuffer: new FetchedRecordBuffer(), }) }) }) diff --git a/core/src/stitching.ts b/core/src/stitching.ts index 2043777..293bb3d 100644 --- a/core/src/stitching.ts +++ b/core/src/stitching.ts @@ -25,9 +25,9 @@ export const stitchFetchedRecords = ({ let maxSubjectSequence: number | undefined = undefined sortedFetchedRecords.forEach((fetchedRecord) => { - const position = fetchedRecord.changeAttributes.position.toString() - const samePositionFetchedRecords = fetchedRecordBuffer.fetchedRecordsByPosition(subject, position) - const contextFetchedRecord = samePositionFetchedRecords.find((r) => r.isContextMessage()) + const transactionId = fetchedRecord.changeAttributes.transactionId.toString() + const sameTransactionIdFetchedRecords = fetchedRecordBuffer.fetchedRecordsByTransactionId(subject, transactionId) + const contextFetchedRecord = sameTransactionIdFetchedRecords.find((r) => r.isContextMessage()) // If it's a heartbeat message/change, use its sequence number if (fetchedRecord.isHeartbeatMessage()) { @@ -42,7 +42,7 @@ export const stitchFetchedRecords = ({ // Last message without a pair - add it to the buffer if ( useBuffer && - samePositionFetchedRecords.length === 1 && // No-pair change or context + sameTransactionIdFetchedRecords.length === 1 && // No-pair change or context fetchedRecord === sortedFetchedRecords[sortedFetchedRecords.length - 1] // Last message ) { newFetchedRecordBuffer = newFetchedRecordBuffer.addFetchedRecord(fetchedRecord) diff --git a/docs/docs/changelog.md b/docs/docs/changelog.md index f6e3d9d..b63903c 100644 --- a/docs/docs/changelog.md +++ b/docs/docs/changelog.md @@ -9,6 +9,11 @@ keywords: ['Bemi Changelog', 'Bemi New Features', 'Postgres Audit Trails', 'Chan # Changelog +## 2024-12 + +* [Bemi Core](https://github.com/BemiHQ/bemi) + * Stitch context with out of order LSN positions + ## 2024-11 * [Bemi Django](https://github.com/BemiHQ/bemi-django)