Skip to content

Commit

Permalink
Merge pull request #9 from powersync-ja/improve-logging
Browse files Browse the repository at this point in the history
Improve logging & reduce sync rule update delay
  • Loading branch information
rkistner authored Jun 10, 2024
2 parents 97b8b2b + 28d4873 commit 511f6d7
Show file tree
Hide file tree
Showing 8 changed files with 45 additions and 18 deletions.
2 changes: 1 addition & 1 deletion packages/rsocket-router/src/router/types.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import {router as micro_router} from '@journeyapps-platform/micro';
import { router as micro_router } from '@journeyapps-platform/micro';
import * as t from 'ts-codec';
import { OnExtensionSubscriber, OnNextSubscriber, OnTerminalSubscriber } from 'rsocket-core';
import { SocketRouterObserver } from './SocketRouterListener.js';
Expand Down
8 changes: 5 additions & 3 deletions packages/service-core/src/metrics/Metrics.ts
Original file line number Diff line number Diff line change
Expand Up @@ -146,14 +146,16 @@ export class Metrics {
}
micro.logger.info('Configuring telemetry.');

micro.logger.info(`
micro.logger.info(
`
Attention:
PowerSync collects completely anonymous telemetry regarding usage.
This information is used to shape our roadmap to better serve our customers.
You can learn more, including how to opt-out if you'd not like to participate in this anonymous program, by visiting the following URL:
https://docs.powersync.com/self-hosting/telemetry
Anonymous telemetry is currently: ${options.disable_telemetry_sharing ? 'disabled' : 'enabled'}
`.trim());
`.trim()
);

const configuredExporters: MetricReader[] = [];

Expand All @@ -167,7 +169,7 @@ Anonymous telemetry is currently: ${options.disable_telemetry_sharing ? 'disable
exporter: new OTLPMetricExporter({
url: options.internal_metrics_endpoint
}),
exportIntervalMillis: 1000 * 60 * 5 // 5 minutes
exportIntervalMillis: 1000 * 60 * 5 // 5 minutes
});

configuredExporters.push(periodicExporter);
Expand Down
3 changes: 2 additions & 1 deletion packages/service-core/src/replication/ErrorRateLimiter.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ export class DefaultErrorRateLimiter implements ErrorRateLimiter {

async waitUntilAllowed(options?: { signal?: AbortSignal | undefined } | undefined): Promise<void> {
const delay = Math.max(0, this.nextAllowed - Date.now());
this.setDelay(5_000);
// Minimum delay between connections, even without errors
this.setDelay(500);
await setTimeout(delay, undefined, { signal: options?.signal });
}

Expand Down
4 changes: 2 additions & 2 deletions packages/service-core/src/storage/MongoBucketStorage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -350,12 +350,12 @@ export class MongoBucketStorage implements BucketStorageFactory {

if (!instance) {
const manager = locks.createMongoLockManager(this.db.locks, {
name: `instance-id-insertion-lock`
name: `instance-id-insertion-lock`
});

await manager.lock(async () => {
await this.db.instance.insertOne({
_id: uuid(),
_id: uuid()
});
});

Expand Down
6 changes: 2 additions & 4 deletions packages/service-core/src/storage/mongo/MongoBucketBatch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -463,10 +463,8 @@ export class MongoBucketBatch implements BucketStorageBatch {

await this.withTransaction(async () => {
flushTry += 1;
if (flushTry == 1) {
micro.logger.info(`${this.slot_name} ${description}`);
} else if (flushTry % 10 == 0) {
micro.logger.info(`${this.slot_name} ${description} ops - try ${flushTry}`);
if (flushTry % 10 == 0) {
micro.logger.info(`${this.slot_name} ${description} - try ${flushTry}`);
}
if (flushTry > 20 && Date.now() > lastTry) {
throw new Error('Max transaction tries exceeded');
Expand Down
27 changes: 25 additions & 2 deletions packages/service-core/src/storage/mongo/PersistedBatch.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ import { JSONBig } from '@powersync/service-jsonbig';
import { EvaluatedParameters, EvaluatedRow } from '@powersync/service-sync-rules';
import * as bson from 'bson';
import * as mongo from 'mongodb';
import * as micro from '@journeyapps-platform/micro';

import * as util from '@/util/util-index.js';
import { SourceTable } from '../SourceTable.js';
Expand Down Expand Up @@ -42,6 +43,11 @@ export class PersistedBatch {
bucketParameters: mongo.AnyBulkWriteOperation<BucketParameterDocument>[] = [];
currentData: mongo.AnyBulkWriteOperation<CurrentDataDocument>[] = [];

/**
* For debug logging only.
*/
debugLastOpId: bigint | null = null;

/**
* Very rough estimate of transaction size.
*/
Expand Down Expand Up @@ -75,13 +81,16 @@ export class PersistedBatch {
const checksum = util.hashData(k.table, k.id, recordData);
this.currentSize += recordData.length + 200;

const op_id = options.op_seq.next();
this.debugLastOpId = op_id;

this.bucketData.push({
insertOne: {
document: {
_id: {
g: this.group_id,
b: k.bucket,
o: options.op_seq.next()
o: op_id
},
op: 'PUT',
source_table: options.table.id,
Expand All @@ -97,13 +106,17 @@ export class PersistedBatch {

for (let bd of remaining_buckets.values()) {
// REMOVE

const op_id = options.op_seq.next();
this.debugLastOpId = op_id;

this.bucketData.push({
insertOne: {
document: {
_id: {
g: this.group_id,
b: bd.bucket,
o: options.op_seq.next()
o: op_id
},
op: 'REMOVE',
source_table: options.table.id,
Expand Down Expand Up @@ -145,7 +158,9 @@ export class PersistedBatch {
const binLookup = serializeLookup(result.lookup);
const hex = binLookup.toString('base64');
remaining_lookups.delete(hex);

const op_id = data.op_seq.next();
this.debugLastOpId = op_id;
this.bucketParameters.push({
insertOne: {
document: {
Expand All @@ -167,6 +182,7 @@ export class PersistedBatch {
// 2. "REMOVE" entries for any lookup not touched.
for (let lookup of remaining_lookups.values()) {
const op_id = data.op_seq.next();
this.debugLastOpId = op_id;
this.bucketParameters.push({
insertOne: {
document: {
Expand Down Expand Up @@ -237,9 +253,16 @@ export class PersistedBatch {
});
}

micro.logger.info(
`powersync_${this.group_id} Flushed ${this.bucketData.length} + ${this.bucketParameters.length} + ${
this.currentData.length
} updates, ${Math.round(this.currentSize / 1024)}kb. Last op_id: ${this.debugLastOpId}`
);

this.bucketData = [];
this.bucketParameters = [];
this.currentData = [];
this.currentSize = 0;
this.debugLastOpId = null;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,8 @@ export class CompoundConfigCollector {
migrations: baseConfig.migrations,
telemetry: {
disable_telemetry_sharing: baseConfig.telemetry?.disable_telemetry_sharing ?? false,
internal_service_endpoint: baseConfig.telemetry?.internal_service_endpoint ?? 'https://pulse.journeyapps.com/v1/metrics'
internal_service_endpoint:
baseConfig.telemetry?.internal_service_endpoint ?? 'https://pulse.journeyapps.com/v1/metrics'
},
slot_name_prefix: connections[0]?.slot_name_prefix ?? 'powersync_'
};
Expand Down
10 changes: 6 additions & 4 deletions packages/types/src/config/PowerSyncConfig.ts
Original file line number Diff line number Diff line change
Expand Up @@ -142,10 +142,12 @@ export const powerSyncConfig = t.object({
})
.optional(),

telemetry: t.object({
disable_telemetry_sharing: t.boolean,
internal_service_endpoint: t.string.optional()
}).optional()
telemetry: t
.object({
disable_telemetry_sharing: t.boolean,
internal_service_endpoint: t.string.optional()
})
.optional()
});

export type PowerSyncConfig = t.Decoded<typeof powerSyncConfig>;
Expand Down

0 comments on commit 511f6d7

Please sign in to comment.