Skip to content

Commit

Permalink
Fix tests to use the new apis.
Browse files Browse the repository at this point in the history
  • Loading branch information
rkistner committed Feb 7, 2025
1 parent df5de48 commit 8a6f571
Show file tree
Hide file tree
Showing 4 changed files with 36 additions and 34 deletions.
13 changes: 7 additions & 6 deletions modules/module-mongodb/test/src/change_stream_utils.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
import { mongo } from '@powersync/lib-service-mongodb';
import { ActiveCheckpoint, BucketStorageFactory, OpId, SyncRulesBucketStorage } from '@powersync/service-core';
import { BucketStorageFactory, OpId, ReplicationCheckpoint, SyncRulesBucketStorage } from '@powersync/service-core';
import { test_utils } from '@powersync/service-core-tests';

import { ChangeStream, ChangeStreamOptions } from '@module/replication/ChangeStream.js';
Expand Down Expand Up @@ -138,7 +138,7 @@ export class ChangeStreamTestContext {
export async function getClientCheckpoint(
client: mongo.MongoClient,
db: mongo.Db,
bucketStorage: BucketStorageFactory,
storageFactory: BucketStorageFactory,
options?: { timeout?: number }
): Promise<OpId> {
const start = Date.now();
Expand All @@ -147,14 +147,15 @@ export async function getClientCheckpoint(
// Since we don't use LSNs anymore, the only way to get that is to wait.

const timeout = options?.timeout ?? 50_000;
let lastCp: ActiveCheckpoint | null = null;
let lastCp: ReplicationCheckpoint | null = null;

while (Date.now() - start < timeout) {
const cp = await bucketStorage.getActiveCheckpoint();
lastCp = cp;
if (!cp.hasSyncRules()) {
const storage = await storageFactory.getActiveStorage();
const cp = await storage?.getCheckpoint();
if (cp == null) {
throw new Error('No sync rules available');
}
lastCp = cp;
if (cp.lsn && cp.lsn >= lsn) {
return cp.checkpoint;
}
Expand Down
14 changes: 7 additions & 7 deletions modules/module-mysql/test/src/BinlogStreamUtils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ import { BinLogStream, BinLogStreamOptions } from '@module/replication/BinLogStr
import { MySQLConnectionManager } from '@module/replication/MySQLConnectionManager.js';
import { logger } from '@powersync/lib-services-framework';
import {
ActiveCheckpoint,
BucketStorageFactory,
OpId,
OplogEntry,
ReplicationCheckpoint,
storage,
SyncRulesBucketStorage
} from '@powersync/service-core';
Expand Down Expand Up @@ -148,7 +148,7 @@ export class BinlogStreamTestContext {

export async function getClientCheckpoint(
connection: mysqlPromise.Connection,
bucketStorage: BucketStorageFactory,
storageFactory: BucketStorageFactory,
options?: { timeout?: number }
): Promise<OpId> {
const start = Date.now();
Expand All @@ -157,16 +157,16 @@ export async function getClientCheckpoint(
// Since we don't use LSNs anymore, the only way to get that is to wait.

const timeout = options?.timeout ?? 50_000;
let lastCp: ActiveCheckpoint | null = null;
let lastCp: ReplicationCheckpoint | null = null;

logger.info('Expected Checkpoint: ' + gtid.comparable);
while (Date.now() - start < timeout) {
const cp = await bucketStorage.getActiveCheckpoint();
lastCp = cp;
//logger.info('Last Checkpoint: ' + lastCp.lsn);
if (!cp.hasSyncRules()) {
const storage = await storageFactory.getActiveStorage();
const cp = await storage?.getCheckpoint();
if (cp == null) {
throw new Error('No sync rules available');
}
lastCp = cp;
if (cp.lsn && cp.lsn >= gtid.comparable) {
return cp.checkpoint;
}
Expand Down
7 changes: 4 additions & 3 deletions modules/module-postgres/test/src/util.ts
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ export function connectPgPool() {

export async function getClientCheckpoint(
db: pgwire.PgClient,
bucketStorage: BucketStorageFactory,
storageFactory: BucketStorageFactory,
options?: { timeout?: number }
): Promise<OpId> {
const start = Date.now();
Expand All @@ -77,8 +77,9 @@ export async function getClientCheckpoint(

logger.info(`Waiting for LSN checkpoint: ${lsn}`);
while (Date.now() - start < timeout) {
const cp = await bucketStorage.getActiveCheckpoint();
if (!cp.hasSyncRules()) {
const storage = await storageFactory.getActiveStorage();
const cp = await storage?.getCheckpoint();
if (cp == null) {
throw new Error('No sync rules available');
}
if (cp.lsn && cp.lsn >= lsn) {
Expand Down
36 changes: 18 additions & 18 deletions packages/service-core-tests/src/tests/register-sync-tests.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,13 @@ export function registerSyncTests(factory: storage.TestStorageFactory) {
});

const stream = sync.streamResponse({
storage: f,
bucketStorage: bucketStorage,
syncRules: bucketStorage.getParsedSyncRules(test_utils.PARSE_OPTIONS),
params: {
buckets: [],
include_checksum: true,
raw_data: true
},
parseOptions: test_utils.PARSE_OPTIONS,
tracker,
syncParams: new RequestParameters({ sub: '' }, {}),
token: { exp: Date.now() / 1000 + 10 } as any
Expand Down Expand Up @@ -109,13 +109,13 @@ export function registerSyncTests(factory: storage.TestStorageFactory) {
});

const stream = sync.streamResponse({
storage: f,
bucketStorage: bucketStorage,
syncRules: bucketStorage.getParsedSyncRules(test_utils.PARSE_OPTIONS),
params: {
buckets: [],
include_checksum: true,
raw_data: false
},
parseOptions: test_utils.PARSE_OPTIONS,
tracker,
syncParams: new RequestParameters({ sub: '' }, {}),
token: { exp: Date.now() / 1000 + 10 } as any
Expand All @@ -134,17 +134,17 @@ export function registerSyncTests(factory: storage.TestStorageFactory) {
content: BASIC_SYNC_RULES
});

const storage = await f.getInstance(syncRules);
await storage.autoActivate();
const bucketStorage = await f.getInstance(syncRules);
await bucketStorage.autoActivate();

const stream = sync.streamResponse({
storage: f,
bucketStorage: bucketStorage,
syncRules: bucketStorage.getParsedSyncRules(test_utils.PARSE_OPTIONS),
params: {
buckets: [],
include_checksum: true,
raw_data: true
},
parseOptions: test_utils.PARSE_OPTIONS,
tracker,
syncParams: new RequestParameters({ sub: '' }, {}),
token: { exp: 0 } as any
Expand All @@ -165,13 +165,13 @@ export function registerSyncTests(factory: storage.TestStorageFactory) {
await bucketStorage.autoActivate();

const stream = sync.streamResponse({
storage: f,
bucketStorage: bucketStorage,
syncRules: bucketStorage.getParsedSyncRules(test_utils.PARSE_OPTIONS),
params: {
buckets: [],
include_checksum: true,
raw_data: true
},
parseOptions: test_utils.PARSE_OPTIONS,
tracker,
syncParams: new RequestParameters({ sub: '' }, {}),
token: { exp: Date.now() / 1000 + 10 } as any
Expand Down Expand Up @@ -222,19 +222,19 @@ export function registerSyncTests(factory: storage.TestStorageFactory) {
content: BASIC_SYNC_RULES
});

const storage = await f.getInstance(syncRules);
await storage.autoActivate();
const bucketStorage = await f.getInstance(syncRules);
await bucketStorage.autoActivate();

const exp = Date.now() / 1000 + 0.1;

const stream = sync.streamResponse({
storage: f,
bucketStorage: bucketStorage,
syncRules: bucketStorage.getParsedSyncRules(test_utils.PARSE_OPTIONS),
params: {
buckets: [],
include_checksum: true,
raw_data: true
},
parseOptions: test_utils.PARSE_OPTIONS,
tracker,
syncParams: new RequestParameters({ sub: '' }, {}),
token: { exp: exp } as any
Expand Down Expand Up @@ -288,13 +288,13 @@ export function registerSyncTests(factory: storage.TestStorageFactory) {
});

const stream = sync.streamResponse({
storage: f,
bucketStorage: bucketStorage,
syncRules: bucketStorage.getParsedSyncRules(test_utils.PARSE_OPTIONS),
params: {
buckets: [],
include_checksum: true,
raw_data: true
},
parseOptions: test_utils.PARSE_OPTIONS,
tracker,
syncParams: new RequestParameters({ sub: '' }, {}),
token: { exp: Date.now() / 1000 + 10 } as any
Expand Down Expand Up @@ -411,13 +411,13 @@ export function registerSyncTests(factory: storage.TestStorageFactory) {
});

const params: sync.SyncStreamParameters = {
storage: f,
bucketStorage: bucketStorage,
syncRules: bucketStorage.getParsedSyncRules(test_utils.PARSE_OPTIONS),
params: {
buckets: [],
include_checksum: true,
raw_data: true
},
parseOptions: test_utils.PARSE_OPTIONS,
tracker,
syncParams: new RequestParameters({ sub: 'test' }, {}),
token: { sub: 'test', exp: Date.now() / 1000 + 10 } as any
Expand Down

0 comments on commit 8a6f571

Please sign in to comment.