Move watchWriteCheckpoint to SyncRulesBucketStorage.
We now always watch checkpoints per sync rules instance.
rkistner committed Feb 7, 2025
1 parent 5d03a36 commit df5de48
Showing 10 changed files with 453 additions and 513 deletions.
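For context, here is a rough, illustrative sketch of how a caller might use the relocated API: obtain the active sync-rules storage from the factory and watch write checkpoints on that instance, rather than on the factory itself. The option names (user_id, signal) and the fields on the yielded value mirror the pre-move code in the diff below; the exact signature on the new SyncRulesBucketStorage is an assumption and may differ.

// Illustrative sketch only, not part of this commit. Assumes the relocated
// watchWriteCheckpoint keeps an options shape similar to the pre-move code
// (user_id, signal; other options such as parseOptions omitted for brevity).
import { MongoBucketStorage } from './storage/MongoBucketStorage.js'; // import path assumed

async function watchUserCheckpoints(factory: MongoBucketStorage, userId: string, signal: AbortSignal) {
  // Checkpoints are now watched per sync-rules instance rather than instance-wide.
  const active = await factory.getActiveStorage();
  if (active == null) {
    return; // no active sync rules deployed yet
  }

  for await (const event of active.watchWriteCheckpoint({ user_id: userId, signal })) {
    // The yielded value mirrors the old WriteCheckpoint shape: a base checkpoint
    // plus the per-user write checkpoint and the owning bucket storage instance.
    console.log('checkpoint', event.base.checkpoint, 'write checkpoint', event.writeCheckpoint);
  }
}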
284 changes: 9 additions & 275 deletions modules/module-mongodb-storage/src/storage/MongoBucketStorage.ts
@@ -1,18 +1,16 @@
import { SqlSyncRules } from '@powersync/service-sync-rules';
import { wrapWithAbort } from 'ix/asynciterable/operators/withabort.js';
import { LRUCache } from 'lru-cache/min';
import * as timers from 'timers/promises';

import { storage, sync, utils, WatchWriteCheckpointOptions } from '@powersync/service-core';
import { storage } from '@powersync/service-core';

import { DisposableObserver, ErrorCode, logger, ServiceError } from '@powersync/lib-services-framework';
import { DisposableObserver, logger } from '@powersync/lib-services-framework';
import { v4 as uuid } from 'uuid';

import * as lib_mongo from '@powersync/lib-service-mongodb';
import { mongo } from '@powersync/lib-service-mongodb';

import { PowerSyncMongo } from './implementation/db.js';
import { SyncRuleCheckpointState, SyncRuleDocument } from './implementation/models.js';
import { SyncRuleDocument } from './implementation/models.js';
import { MongoPersistedSyncRulesContent } from './implementation/MongoPersistedSyncRulesContent.js';
import { MongoSyncBucketStorage } from './implementation/MongoSyncBucketStorage.js';
import { generateSlotName } from './implementation/util.js';
@@ -272,19 +270,13 @@ export class MongoBucketStorage
});
}

async getActiveCheckpoint(): Promise<storage.ActiveCheckpoint> {
const doc = await this.db.sync_rules.findOne(
{
state: storage.SyncRuleState.ACTIVE
},
{
sort: { _id: -1 },
limit: 1,
projection: { _id: 1, last_checkpoint: 1, last_checkpoint_lsn: 1 }
}
);
async getActiveStorage(): Promise<MongoSyncBucketStorage | null> {
const content = await this.getActiveSyncRulesContent();
if (content == null) {
return null;
}

return this.makeActiveCheckpoint(doc);
return this.getInstance(content);
}

async getStorageMetrics(): Promise<storage.StorageMetrics> {
@@ -370,262 +362,4 @@ export class MongoBucketStorage

return instance!._id;
}

private makeActiveCheckpoint(doc: SyncRuleCheckpointState | null) {
return {
checkpoint: utils.timestampToOpId(doc?.last_checkpoint ?? 0n),
lsn: doc?.last_checkpoint_lsn ?? null,
hasSyncRules() {
return doc != null;
},
getBucketStorage: async () => {
if (doc == null) {
return null;
}
return (await this.storageCache.fetch(doc._id)) ?? null;
}
} satisfies storage.ActiveCheckpoint;
}

/**
* Instance-wide watch on the latest available checkpoint (op_id + lsn).
*/
private async *watchActiveCheckpoint(signal: AbortSignal): AsyncIterable<storage.ActiveCheckpoint> {
const pipeline: mongo.Document[] = [
{
$match: {
operationType: { $in: ['insert', 'update', 'replace'] }
}
},
{
$project: {
operationType: 1,
'documentKey._id': 1,
// For update
'updateDescription.updatedFields.state': 1,
'updateDescription.updatedFields.last_checkpoint': 1,
'updateDescription.updatedFields.last_checkpoint_lsn': 1,
// For insert/replace
'fullDocument._id': 1,
'fullDocument.state': 1,
'fullDocument.last_checkpoint': 1,
'fullDocument.last_checkpoint_lsn': 1
}
}
];

// Use this form instead of (doc: SyncRuleCheckpointState | null = null),
// otherwise we get weird "doc: never" issues.
let doc = null as SyncRuleCheckpointState | null;
let clusterTime = null as mongo.Timestamp | null;

await this.client.withSession(async (session) => {
doc = await this.db.sync_rules.findOne(
{
state: storage.SyncRuleState.ACTIVE
},
{
session,
sort: { _id: -1 },
limit: 1,
projection: {
_id: 1,
last_checkpoint: 1,
last_checkpoint_lsn: 1
}
}
);
const time = session.clusterTime?.clusterTime ?? null;
clusterTime = time;
});
if (clusterTime == null) {
throw new ServiceError(ErrorCode.PSYNC_S2401, 'Could not get clusterTime');
}

if (signal.aborted) {
return;
}

if (doc) {
yield this.makeActiveCheckpoint(doc);
}

const stream = this.db.sync_rules.watch(pipeline, {
// Start at the cluster time where we got the initial doc, to make sure
// we don't skip any updates.
// This may result in the first operation being a duplicate, but we filter
// it out anyway.
startAtOperationTime: clusterTime
});

signal.addEventListener(
'abort',
() => {
stream.close();
},
{ once: true }
);

let lastOp: storage.ActiveCheckpoint | null = null;
let lastDoc: SyncRuleCheckpointState | null = doc;

for await (const update of stream.stream()) {
if (signal.aborted) {
break;
}
if (update.operationType != 'insert' && update.operationType != 'update' && update.operationType != 'replace') {
continue;
}

const doc = await this.getOperationDoc(lastDoc, update);
if (doc == null) {
// Irrelevant update
continue;
}

lastDoc = doc;

const op = this.makeActiveCheckpoint(doc);
// Check for LSN / checkpoint changes - ignore other metadata changes
if (lastOp == null || op.lsn != lastOp.lsn || op.checkpoint != lastOp.checkpoint) {
lastOp = op;
yield op;
}
}
}

// Nothing is done here until a subscriber starts to iterate
private readonly sharedIter = new sync.BroadcastIterable((signal) => {
return this.watchActiveCheckpoint(signal);
});

/**
* User-specific watch on the latest checkpoint and/or write checkpoint.
*/
async *watchWriteCheckpoint(options: WatchWriteCheckpointOptions): AsyncIterable<storage.WriteCheckpoint> {
const { user_id, signal, filter } = options;
let lastCheckpoint: utils.OpId | null = null;
let lastWriteCheckpoint: bigint | null = null;

const iter = wrapWithAbort(this.sharedIter, signal);
for await (const cp of iter) {
const { checkpoint, lsn } = cp;

// lsn changes are not important by themselves.
// What is important is:
// 1. checkpoint (op_id) changes.
// 2. write checkpoint changes for the specific user
const bucketStorage = await cp.getBucketStorage();
if (!bucketStorage) {
continue;
}

const lsnFilters: Record<string, string> = lsn ? { 1: lsn } : {};

const currentWriteCheckpoint = await bucketStorage.lastWriteCheckpoint({
user_id,
heads: {
...lsnFilters
}
});

if (currentWriteCheckpoint == lastWriteCheckpoint && checkpoint == lastCheckpoint) {
// No change - wait for next one
// In some cases, many LSNs may be produced in a short time.
// Add a delay to throttle the write checkpoint lookup a bit.
await timers.setTimeout(20 + 10 * Math.random());
continue;
}

lastWriteCheckpoint = currentWriteCheckpoint;
lastCheckpoint = checkpoint;

const syncRules = bucketStorage.getParsedSyncRules(options.parseOptions);

// We do not track individual bucket updates yet, always send an invalidation event.
filter?.({
syncRules,
invalidate: true
});

yield {
base: cp,
writeCheckpoint: currentWriteCheckpoint,
storage: bucketStorage,
syncRules
};
}
}

private async getOperationDoc(
lastDoc: SyncRuleCheckpointState | null,
update: lib_mongo.mongo.ChangeStreamDocument<SyncRuleDocument>
): Promise<SyncRuleCheckpointState | null> {
if (update.operationType == 'insert' || update.operationType == 'replace') {
if (update.fullDocument.state != storage.SyncRuleState.ACTIVE) {
// We only want to keep track of the active docs
return null;
}
return update.fullDocument;
} else if (update.operationType == 'update') {
const updatedFields = update.updateDescription.updatedFields ?? {};
if (lastDoc == null || lastDoc._id != update.documentKey._id) {
// SyncRulesDocument that we haven't seen before.
// We need to lookup the full document.
// This is kind of like fullDocument: 'updateLookup', except that we do the lookup ourselves,
// and only in this specific case.
const lookupDoc: SyncRuleCheckpointState | null = await this.db.sync_rules.findOne(
{
_id: update.documentKey._id,
state: storage.SyncRuleState.ACTIVE
},
{
limit: 1,
projection: {
_id: 1,
last_checkpoint: 1,
last_checkpoint_lsn: 1
}
}
);
if (lookupDoc == null) {
return null;
}

// These updates may be slightly out of order due to having to do a document lookup.
// Make sure we pick the latest one.
return getMergedCheckpointState(lookupDoc, updatedFields);
} else {
// Update to lastDoc - merge the changes
return getMergedCheckpointState(lastDoc, updatedFields);
}
} else {
// Unknown event type
return null;
}
}
}

function getMergedCheckpointState(lastDoc: SyncRuleCheckpointState, update: Partial<SyncRuleCheckpointState>) {
const mergedDoc: SyncRuleCheckpointState = {
_id: lastDoc._id,
last_checkpoint: update.last_checkpoint ?? lastDoc.last_checkpoint,
last_checkpoint_lsn: update.last_checkpoint_lsn ?? lastDoc.last_checkpoint_lsn
};

// These updates may be slightly out of order due to having to do a document lookup in some cases.
// Make sure we pick the latest one.
if (
mergedDoc.last_checkpoint != null &&
(lastDoc.last_checkpoint == null || mergedDoc.last_checkpoint > lastDoc.last_checkpoint)
) {
return mergedDoc;
} else if (
mergedDoc.last_checkpoint_lsn != null &&
(lastDoc.last_checkpoint_lsn == null || mergedDoc.last_checkpoint_lsn > lastDoc.last_checkpoint_lsn)
) {
return mergedDoc;
} else {
return lastDoc;
}
}
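
The removed watchActiveCheckpoint relies on a general MongoDB change-stream pattern: read the current state inside a session, record the session's cluster time, and start the change stream at that time so no update between the initial read and the watch is missed. A minimal standalone sketch of that pattern follows; the collection name and document shape are hypothetical, not from this repository.

// Standalone sketch of the read-then-watch pattern used by the removed
// watchActiveCheckpoint. Collection name and document shape are hypothetical.
import { MongoClient, Timestamp, Document } from 'mongodb';

interface ExampleDoc extends Document {
  _id: number;
  value: number;
}

async function* watchLatest(client: MongoClient, signal: AbortSignal): AsyncIterable<ExampleDoc> {
  const collection = client.db('example').collection<ExampleDoc>('docs');

  let initial = null as ExampleDoc | null;
  let clusterTime = null as Timestamp | null;
  await client.withSession(async (session) => {
    // Read the current document and the cluster time in the same session.
    initial = await collection.findOne({}, { session, sort: { _id: -1 } });
    clusterTime = session.clusterTime?.clusterTime ?? null;
  });
  if (clusterTime == null) {
    throw new Error('Could not get clusterTime');
  }
  if (signal.aborted) {
    return;
  }
  if (initial != null) {
    yield initial;
  }

  // Start the stream at the recorded cluster time. This may replay the update we
  // already read, but guarantees no gap between the initial read and the stream.
  const stream = collection.watch([], {
    startAtOperationTime: clusterTime,
    fullDocument: 'updateLookup'
  });
  signal.addEventListener('abort', () => stream.close(), { once: true });

  for await (const change of stream.stream()) {
    if (signal.aborted) {
      break;
    }
    if ('fullDocument' in change && change.fullDocument != null) {
      yield change.fullDocument;
    }
  }
}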