Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

GraphQL Replication Plugin: Allow syncing revisions #2000

Merged
merged 6 commits into from
Apr 13, 2020
Merged
36 changes: 33 additions & 3 deletions src/plugins/replication-graphql/crawling-checkpoint.ts
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,9 @@ export async function setLastPushSequence(
export async function getChangesSinceLastPushSequence(
collection: RxCollection,
endpointHash: string,
lastPulledRevField: string,
batchSize = 10,
syncRevisions: boolean = false,
): Promise<{ results: { id: string, seq: number, changes: { rev: string }[] }[], last_seq: number }> {
let lastPushSequence = await getLastPushSequence(
collection,
Expand All @@ -94,9 +96,11 @@ export async function getChangesSinceLastPushSequence(
changes = await collection.pouch.changes({
since: lastPushSequence,
limit: batchSize,
include_docs: true
});
const useResults = changes.results.filter((change: any) => {
include_docs: true,
// style: 'all_docs'
} as any);

const filteredResults = changes.results.filter((change: any) => {
/**
* filter out changes with revisions resulting from the pull-stream
* so that they will not be upstreamed again
Expand All @@ -106,6 +110,7 @@ export async function getChangesSinceLastPushSequence(
change.doc._rev
)) return false;

if (change.doc[lastPulledRevField] === change.doc._rev) return false;
/**
* filter out internal docs
* that are used for views or indexes in pouchdb
Expand All @@ -115,6 +120,31 @@ export async function getChangesSinceLastPushSequence(
return true;
});

let useResults = filteredResults;

if (filteredResults.length > 0 && syncRevisions) {
const docsSearch = filteredResults.map((result: any) => {
return {
id: result.id,
rev: result.doc._rev
};
});

const bulkGetDocs = await collection.pouch.bulkGet({
docs: docsSearch,
revs: true,
latest: true
});

useResults = bulkGetDocs.results.map((result: any) => {
return {
id: result.id,
doc: result.docs[0]['ok'],
deleted: result.docs[0]['ok']._deleted
};
});
}

if (useResults.length === 0 && changes.results.length === batchSize) {
// no pushable docs found but also not reached the end -> re-run
lastPushSequence = changes.last_seq;
Expand Down
75 changes: 46 additions & 29 deletions src/plugins/replication-graphql/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -69,9 +69,11 @@ export class RxGraphQLReplicationState {
public pull: GraphQLSyncPullOptions,
public push: GraphQLSyncPushOptions,
public deletedFlag: string,
public lastPulledRevField: string,
public live: boolean,
public liveInterval: number,
public retryTime: number
public retryTime: number,
public syncRevisions: boolean
) {
this.client = GraphQLClient({
url,
Expand Down Expand Up @@ -195,6 +197,7 @@ export class RxGraphQLReplicationState {
let result;
try {
result = await this.client.query(pullGraphQL.query, pullGraphQL.variables);

if (result.errors) {
throw new Error(result.errors);
}
Expand Down Expand Up @@ -249,16 +252,22 @@ export class RxGraphQLReplicationState {
const changes = await getChangesSinceLastPushSequence(
this.collection,
this.endpointHash,
this.push.batchSize
this.lastPulledRevField,
this.push.batchSize,
this.syncRevisions
);

const changesWithDocs = changes.results.map((change: any) => {
let doc = change['doc'];

doc[this.deletedFlag] = !!change['deleted'];
delete doc._rev;
delete doc._deleted;
delete doc._attachments;
delete doc[this.lastPulledRevField];

if (!this.syncRevisions) {
delete doc._rev;
}

doc = (this.push as any).modifier(doc);

Expand All @@ -282,7 +291,7 @@ export class RxGraphQLReplicationState {
const pushObj = this.push.queryBuilder(changeWithDoc.doc);
const result = await this.client.query(pushObj.query, pushObj.variables);
if (result.errors) {
throw new Error(result.errors);
throw new Error(JSON.stringify(result.errors));
} else {
this._subjects.send.next(changeWithDoc.doc);
lastSuccessfullChange = changeWithDoc;
Expand Down Expand Up @@ -330,35 +339,39 @@ export class RxGraphQLReplicationState {
// console.log('handleDocumentFromRemote(' + toPouch._id + ') start');
toPouch._deleted = deletedValue;
delete toPouch[this.deletedFlag];
const primaryValue = toPouch._id;

const pouchState = docsWithRevisions[primaryValue];
let newRevision = createRevisionForPulledDocument(
this.endpointHash,
toPouch
);
if (pouchState) {
const newRevisionHeight = pouchState.revisions.start + 1;
const revisionId = newRevision;
newRevision = newRevisionHeight + '-' + newRevision;
toPouch._revisions = {
start: newRevisionHeight,
ids: pouchState.revisions.ids
};
toPouch._revisions.ids.unshift(revisionId);
if (!this.syncRevisions) {
const primaryValue = toPouch._id;

const pouchState = docsWithRevisions[primaryValue];
let newRevision = createRevisionForPulledDocument(
this.endpointHash,
toPouch
);
if (pouchState) {
const newRevisionHeight = pouchState.revisions.start + 1;
const revisionId = newRevision;
newRevision = newRevisionHeight + '-' + newRevision;
toPouch._revisions = {
start: newRevisionHeight,
ids: pouchState.revisions.ids
};
toPouch._revisions.ids.unshift(revisionId);
} else {
newRevision = '1-' + newRevision;
}

toPouch._rev = newRevision;
} else {
newRevision = '1-' + newRevision;
toPouch[this.lastPulledRevField] = toPouch._rev;
}
toPouch._rev = newRevision;

await this.collection.pouch.bulkDocs(
[
toPouch
],
{
new_edits: false
}
);
], {
new_edits: false
});

/**
* because bulkDocs with new_edits: false
Expand All @@ -367,14 +380,14 @@ export class RxGraphQLReplicationState {
* so other instances get informed about it
*/
const originalDoc = flatClone(toPouch);

if (deletedValue) {
originalDoc._deleted = deletedValue;
} else {
delete originalDoc._deleted;
}
delete originalDoc[this.deletedFlag];
delete originalDoc._revisions;
originalDoc._rev = newRevision;

const cE = changeEventfromPouchChange(
originalDoc,
Expand All @@ -401,10 +414,12 @@ export function syncGraphQL(
pull,
push,
deletedFlag,
lastPulledRevField = 'last_pulled_rev',
live = false,
liveInterval = 1000 * 10, // in ms
retryTime = 1000 * 5, // in ms
autoStart = true // if this is false, the replication does nothing at start
autoStart = true, // if this is false, the replication does nothing at start
syncRevisions = false,
}: any
) {
const collection = this;
Expand All @@ -427,9 +442,11 @@ export function syncGraphQL(
pull,
push,
deletedFlag,
lastPulledRevField,
live,
liveInterval,
retryTime
retryTime,
syncRevisions
);

if (!autoStart) return replicationState;
Expand Down
1 change: 1 addition & 0 deletions src/types/plugins/replication-graphql.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,4 +24,5 @@ export type SyncOptionsGraphQL = {
liveInterval?: number; // time in ms
retryTime?: number; // time in ms
autoStart?: boolean; // if this is false, the replication does nothing at start
syncRevisions?: boolean;
};
14 changes: 13 additions & 1 deletion test/helper/graphql-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -81,19 +81,31 @@ export async function spawn<T>(
type Mutation {
setHuman(human: HumanInput): Human
}
input RevisionInput {
start: Int!,
ids: [String!]!
}
input HumanInput {
id: ID!,
name: String!,
age: Int!,
updatedAt: Int!,
deleted: Boolean!
deleted: Boolean!,
_rev: String,
_revisions: RevisionInput,
}
type Revision {
start: Int!,
ids: [String!]!
}
type Human {
id: ID!,
name: String!,
age: Int!,
updatedAt: Int!,
deleted: Boolean!
_rev: String,
_revisions: Revision,
}
type Subscription {
humanChanged: Human
Expand Down
6 changes: 6 additions & 0 deletions test/helper/schema-objects.ts
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,12 @@ export interface HumanWithTimestampDocumentType {
name: string;
age: number;
updatedAt: number;
_rev?: string;
_revisions?: {
start: number;
ids: [string];
},
last_pulled_rev?: string;
}
export function humanWithTimestamp(): HumanWithTimestampDocumentType {
const now = new Date().getTime() / 1000;
Expand Down
6 changes: 6 additions & 0 deletions test/helper/schemas.ts
Original file line number Diff line number Diff line change
Expand Up @@ -844,6 +844,9 @@ export const humanWithTimestamp: RxJsonSchema<HumanWithTimestampDocumentType> =
},
updatedAt: {
type: 'number'
},
last_pulled_rev: {
type: 'string'
}
},
required: ['id', 'name', 'age', 'updatedAt']
Expand Down Expand Up @@ -872,6 +875,9 @@ export const humanWithTimestampAllIndex: RxJsonSchema<HumanWithTimestampDocument
updatedAt: {
type: 'number',
index: true
},
last_pulled_rev: {
type: 'string'
}
},
required: ['id', 'name', 'age', 'updatedAt']
Expand Down
Loading