Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BugReport] DexieStorage and array of string #3778

Closed
wants to merge 10 commits into from
11 changes: 11 additions & 0 deletions docs-src/replication.md
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ The deleted field must always be exactly `_deleted`. If your remote endpoint use
## The replication cycle

The replication works in cycles. A cycle is triggered when:
- When calling `replicateRxCollection()` (if `autoStart` is `true` as by default)
- Automatically on writes to non-[local](./rx-local-document.md) documents.
- When `liveInterval` is reached from the time of last `run()` cycle.
- The `run()` method is called manually.
Expand Down Expand Up @@ -124,6 +125,16 @@ const replicationState = await replicateRxCollection({
* (optional), default is 5 seconds.
*/
retryTime: number,
/**
* Trigger or not a first replication
* if `false`, the first replication should be trigged by :
* - `replicationState.run()`
* - a write to non-[local](./rx-local-document.md) document
* Used with `liveInterval` greater than `0`, the polling for remote changes starts
* after the first triggered replication.
* (optional), only needed when live=true, default is true.
*/
autoStart: true,
/**
* Optional,
* only needed when you want to replicate remote changes to the local state.
Expand Down
4 changes: 3 additions & 1 deletion src/plugins/replication-graphql/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -106,6 +106,7 @@ export function syncGraphQL<RxDocType>(
live = false,
liveInterval = 1000 * 10, // in ms
retryTime = 1000 * 5, // in ms
autoStart = true,
}: SyncOptionsGraphQL<RxDocType>
): RxGraphQLReplicationState<RxDocType> {
const collection = this;
Expand Down Expand Up @@ -253,7 +254,8 @@ export function syncGraphQL<RxDocType>(
waitForLeadership,
live,
liveInterval,
retryTime
retryTime,
autoStart
});

const graphqlReplicationState = new RxGraphQLReplicationState(
Expand Down
119 changes: 57 additions & 62 deletions src/plugins/replication/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ export class RxReplicationStateBase<RxDocType> {
*/
public pendingRetries = 0;

public liveInterval: number;

constructor(
/**
* hash of the identifier, used to flag revisions
Expand All @@ -91,8 +93,9 @@ export class RxReplicationStateBase<RxDocType> {
public readonly pull?: ReplicationPullOptions<RxDocType>,
public readonly push?: ReplicationPushOptions<RxDocType>,
public readonly live?: boolean,
public liveInterval?: number,
liveInterval?: number,
public retryTime?: number,
public autoStart?: boolean,
) {
let replicationStates = REPLICATION_STATE_BY_COLLECTION.get(collection);
if (!replicationStates) {
Expand All @@ -115,6 +118,16 @@ export class RxReplicationStateBase<RxDocType> {
}
});
});
this.liveInterval = liveInterval !== void 0 ? ensureInteger(liveInterval) : 1000 * 10;
}

async continuePolling() {
await this.collection.promiseWait(this.liveInterval);
await this.run(
// do not retry on liveInterval-runs because they might stack up
// when failing
false
);
}

isStopped(): boolean {
Expand All @@ -137,7 +150,7 @@ export class RxReplicationStateBase<RxDocType> {

/**
* Returns a promise that resolves when:
* - All local data is repliacted with the remote
* - All local data is replicated with the remote
* - No replication cycle is running or in retry-state
*/
async awaitInSync(): Promise<true> {
Expand Down Expand Up @@ -194,6 +207,9 @@ export class RxReplicationStateBase<RxDocType> {
}
this.runQueueCount--;
});
if (this.live && this.pull && this.liveInterval > 0 && this.pendingRetries < 1) {
this.runningPromise.then(() => this.continuePolling());
}
return this.runningPromise;
}

Expand Down Expand Up @@ -534,7 +550,8 @@ export function replicateRxCollection<RxDocType>(
live = false,
liveInterval = 1000 * 10,
retryTime = 1000 * 5,
waitForLeadership
waitForLeadership,
autoStart = true,
}: ReplicationOptions<RxDocType>
): RxReplicationState<RxDocType> {

Expand All @@ -555,8 +572,9 @@ export function replicateRxCollection<RxDocType>(
live,
liveInterval,
retryTime,
autoStart
);

ensureInteger(replicationState.liveInterval);
/**
* Always await this Promise to ensure that the current instance
* is leader when waitForLeadership=true
Expand All @@ -567,65 +585,42 @@ export function replicateRxCollection<RxDocType>(
if (replicationState.isStopped()) {
return;
}
if (autoStart) {
replicationState.run();
}
if (replicationState.live && push) {
/**
* When a non-local document is written to the collection,
* we have to run the replication run() once to ensure
* that the change is pushed to the remote.
*/
const changeEventsSub = collection.$.pipe(
filter(cE => !cE.isLocal)
).subscribe(changeEvent => {
if (replicationState.isStopped()) {
return;
}
const doc = getDocumentDataOfRxChangeEvent(changeEvent);

// trigger run() once
replicationState.run();

/**
* Start sync-interval and listeners
* if it is a live replication.
*/
if (replicationState.live) {
const liveInterval: number = ensureInteger(replicationState.liveInterval);
if (pull && liveInterval > 0) {
(async () => {
while (!replicationState.isStopped()) {
await collection.promiseWait(liveInterval);
if (replicationState.isStopped()) {
return;
}
await replicationState.run(
// do not retry on liveInterval-runs because they might stack up
// when failing
false
);
}
})();
}
if (push) {
/**
* When a non-local document is written to the collection,
* we have to run the replication run() once to ensure
* that the change is pushed to the remote.
*/
const changeEventsSub = collection.$.pipe(
filter(cE => !cE.isLocal)
).subscribe(changeEvent => {
if (replicationState.isStopped()) {
return;
}
const doc = getDocumentDataOfRxChangeEvent(changeEvent);

if (
/**
* Do not run() if the change
* was from a pull-replication cycle.
*/
!wasLastWriteFromPullReplication(
replicationState.replicationIdentifierHash,
doc
) ||
/**
* If the event is a delete, we still have to run the replication
* because wasLastWriteFromPullReplication() will give the wrong answer.
*/
changeEvent.operation === 'DELETE'
) {
replicationState.run();
}
});
replicationState.subs.push(changeEventsSub);
}
if (
/**
* Do not run() if the change
* was from a pull-replication cycle.
*/
!wasLastWriteFromPullReplication(
replicationState.replicationIdentifierHash,
doc
) ||
/**
* If the event is a delete, we still have to run the replication
* because wasLastWriteFromPullReplication() will give the wrong answer.
*/
changeEvent.operation === 'DELETE'
) {
replicationState.run();
}
});
replicationState.subs.push(changeEventsSub);
}
});
return replicationState as any;
Expand Down
1 change: 1 addition & 0 deletions src/types/plugins/replication-graphql.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -45,4 +45,5 @@ export type SyncOptionsGraphQL<RxDocType> = {
live?: boolean; // default=false
liveInterval?: number; // time in milliseconds
retryTime?: number; // time in milliseconds
autoStart?: boolean; // default=true
};
6 changes: 6 additions & 0 deletions src/types/plugins/replication.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -104,4 +104,10 @@ export type ReplicationOptions<RxDocType> = {
*
*/
waitForLeadership?: boolean; // default=true
/**
* Calling `replicateRxCollection()` implies to run a replication.
* If set to false, it will not run replication on `replicateRxCollection()`.
* This means you need to call replicationState.run() to trigger the first replication.
*/
autoStart?: boolean; // default=true
}
29 changes: 11 additions & 18 deletions test/unit/bug-report.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
* this is a template for a test.
* If you found a bug, edit this test to reproduce it
* and than make a pull-request with that failing test.
* The maintainer will later move your test to the correct possition in the test-suite.
* The maintainer will later move your test to the correct position in the test-suite.
*
* To run this test do:
* - 'npm run test:node' so it runs in nodejs
Expand All @@ -18,7 +18,7 @@ import {
} from '../../';

describe('bug-report.test.js', () => {
it('should fail because it reproduces the bug', async () => {
it.only('should fail because it reproduces the bug', async () => {

/**
* If your test should only run in nodejs or only run in the browser,
Expand All @@ -45,17 +45,12 @@ describe('bug-report.test.js', () => {
type: 'string',
maxLength: 100
},
firstName: {
type: 'string'
names: {
type: 'array',
items:{
type:'string'
}
},
lastName: {
type: 'string'
},
age: {
type: 'integer',
minimum: 0,
maximum: 150
}
}
};

Expand Down Expand Up @@ -83,9 +78,7 @@ describe('bug-report.test.js', () => {
// insert a document
await collections.mycollection.insert({
passportId: 'foobar',
firstName: 'Bob',
lastName: 'Kelso',
age: 56
names: ['Bob', 'Kelso']
});

/**
Expand All @@ -108,15 +101,15 @@ describe('bug-report.test.js', () => {
// find the document in the other tab
const myDocument = await collectionInOtherTab.mycollection
.findOne()
.where('firstName')
.eq('Bob')
.where('passportId')
.eq('foobar')
.exec();

/*
* assert things,
* here your tests should fail to show that there is a bug
*/
assert.strictEqual(myDocument.age, 56);
assert.deepStrictEqual(myDocument.names, ['Bob', 'Kelso']);

// you can also wait for events
const emitted = [];
Expand Down
58 changes: 47 additions & 11 deletions test/unit/replication.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
*/

import assert from 'assert';
import {
import AsyncTestUtil, {
clone,
wait,
waitUntil
Expand Down Expand Up @@ -547,32 +547,68 @@ describe('replication.test.js', () => {
});
it('should push data even if liveInterval is set to 0', async () => {
const {localCollection, remoteCollection} = await getTestCollections({local: 0, remote: 0});
const replicationState = replicateRxCollection({
let callProof = null;
replicateRxCollection({
collection: localCollection,
replicationIdentifier: REPLICATION_IDENTIFIER_TEST,
live: true,
liveInterval: 0,
autoStart: false,
push: {
handler() {
throw new Error();
callProof = 'yeah';
return Promise.resolve();
}
}
});
let error = null;
replicationState.error$.subscribe((err) => {
error = err;
},
});
await replicationState.run();
assert.strictEqual(error, null, 'Throwing pull handler should be called');
// ensure proof is still null once replicateRxCollection()
assert.strictEqual(callProof, null, 'replicateRxCollection should not trigger a push on init.');

// insert a new doc to trigger a push
await localCollection.insert(schemaObjects.humanWithTimestamp());

// wait for storage propagation
await AsyncTestUtil.wait(100);

assert.strictEqual(callProof, 'yeah', 'Throwing pull handler should be called');
localCollection.database.destroy();
remoteCollection.database.destroy();
});
});
config.parallel('other', () => {
describe('autoStart', () => {
it('should run first replication by default', async () => {
const replicationState = replicateRxCollection({
collection: {
database: {},
onDestroy: {then(){}}
} as RxCollection,
replicationIdentifier: REPLICATION_IDENTIFIER_TEST,
live: false,
autoStart: true,
waitForLeadership: false
});
await replicationState.awaitInitialReplication();
assert.strictEqual(replicationState.runCount,1);
});
it('should not run first replication when autoStart is set to false', async () => {
const replicationState = replicateRxCollection({
collection: {
database: {},
onDestroy: {then(){}}
} as RxCollection,
replicationIdentifier: REPLICATION_IDENTIFIER_TEST,
live: false,
autoStart: false,
waitForLeadership: false
});
// by definition awaitInitialReplication would be infinite
assert.strictEqual(replicationState.runCount,0);
});
});
describe('.awaitInSync()', () => {
it('should resolve after some time', async () => {
const { localCollection, remoteCollection } = await getTestCollections({ local: 5, remote: 5 });
const {localCollection, remoteCollection} = await getTestCollections({local: 5, remote: 5});

const replicationState = replicateRxCollection({
collection: localCollection,
Expand Down