Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Record details about replication action and associated document, if any, upon replication failure. #3630

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 46 additions & 0 deletions docs-src/replication-graphql.md
Original file line number Diff line number Diff line change
Expand Up @@ -349,6 +349,52 @@ replicationState.error$.subscribe(error => {
});
```

GraphQL errors are wrapped in a `RxReplicationError`, which has a `payload` property with information to help you handle the underlying error.
The payload has a `type` field with a value of `"pull"` or `"push"`, corresponding to an error in either a GraphQL pull or push replication operation, respectively.
If the error occurs doing a _push_, the `payload` also contains a `documentData` property, which corresponds to the document data supplied to the push query builder.
**Notice:** You may see errors in this observable that are not `RxReplicationError`.
Replication may fail for reasons unrelated to the GraphQL service.
E.g., your PouchDB or LokiJS database may have issues in which case a general error will be generated and passed on.

As an example, you can try to recover from errors like so:

```js
replicationState.error$.subscribe((error) => {
if (error.payload) {
if (error.payload.type === 'pull') {
console.log('error pulling from GraphQL server', error.innerErrors);
} else if (error.type === 'push') {
if (error.innerErrors && error.innerErrors.length > 0) {
const graphQLError = error.innerErrors[0];

// In this hypothetical case, there's a remote database uniqueness constraint being violated due to two
// clients pushing an object with the same property value. With the document data, you can decide how best
// to resolve the issue. In this case, the client that pushed last "loses" and we delete the object since
// the one it conflicts with will be pulled down during the next pull replication event.
// The `graphQLError` structure is dictated by your remote GraphQL service. The field names are likely
// to be different.
if (graphQLError.code === 'constraint-violation' && graphQLError.constraintName === "unique_profile_name") {
this.db.profiles
.findOne(documentData.id)
.exec()
.then((doc) => {
doc?.remove();
});
}
} else {
console.log('error pushing document to GraphQL server', documentData);
}
} else {
console.log('Unknown replication action', error.payload.type);
}
} else {
// General error occurred. E.g., issue communicating with local database.
console.log('something was wrong');
console.dir(error);
}
});
```

#### .canceled$

An `Observable` that emits `true` when the replication is canceled, `false` if not.
Expand Down
54 changes: 35 additions & 19 deletions src/plugins/replication-graphql/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -58,10 +58,10 @@ import { getDocumentDataOfRxChangeEvent } from '../../rx-change-event';
import {
_handleToStorageInstance
} from '../../rx-collection-helper';
import {RxReplicationError} from '../replication';

addRxPlugin(RxDBLeaderElectionPlugin);


export class RxGraphQLReplicationState<RxDocType> {

constructor(
Expand Down Expand Up @@ -235,15 +235,19 @@ export class RxGraphQLReplicationState<RxDocType> {
result = await this.client.query(pullGraphQL.query, pullGraphQL.variables);
if (result.errors) {
if (typeof result.errors === 'string') {
throw new Error(result.errors);
throw new RxReplicationError(result.errors, { type: 'pull' });
} else {
const err: any = new Error('unknown errors occurred - see innerErrors for more details');
err.innerErrors = result.errors;
throw err;
throw new RxReplicationError('unknown errors occurred in replication pull - see innerErrors for more details', { type: 'pull' }, result.errors);
}
}
} catch (err) {
this._subjects.error.next(err);
} catch (err: any) {
let replicationError = err;

if (!(err instanceof RxReplicationError)) {
replicationError = new RxReplicationError(err.message, { type: 'pull' }, err)
}

this._subjects.error.next(replicationError);
return false;
}

Expand Down Expand Up @@ -359,29 +363,41 @@ export class RxGraphQLReplicationState<RxDocType> {

const pushObj = await this.push.queryBuilder(changeWithDoc.doc);

const result = await this.client.query(pushObj.query, pushObj.variables);

if (result.errors) {
if (typeof result.errors === 'string') {
throw new Error(result.errors);
try {
const result = await this.client.query(pushObj.query, pushObj.variables);

if (result.errors) {
if (typeof result.errors === 'string') {
throw new RxReplicationError(result.errors, {type: 'push', documentData: changeWithDoc.doc});
} else {
throw new RxReplicationError('unknown errors occurred in replication push - see innerErrors for more details', {
type: 'push',
documentData: changeWithDoc.doc
}, result.errors);
}
} else {
const err: any = new Error('unknown errors occurred - see innerErrors for more details');
err.innerErrors = result.errors;
throw err;
this._subjects.send.next(changeWithDoc.doc);
lastSuccessfullChange = changeWithDoc;
}
} else {
this._subjects.send.next(changeWithDoc.doc);
lastSuccessfullChange = changeWithDoc;
} catch (err: any) {
let replicationError = err;

if (!(err instanceof RxReplicationError)) {
replicationError = new RxReplicationError(err.message, { type: 'push', documentData: changeWithDoc.doc }, err)
}

throw replicationError
}
}
} catch (err) {
} catch (err: any) {
if (lastSuccessfullChange) {
await setLastPushSequence(
this.collection,
this.endpointHash,
lastSuccessfullChange.sequence
);
}

this._subjects.error.next(err);
return false;
}
Expand Down
22 changes: 22 additions & 0 deletions src/plugins/replication/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,28 @@ import { _handleToStorageInstance } from '../../rx-collection-helper';
import { newRxError } from '../../rx-error';
import { getDocumentDataOfRxChangeEvent } from '../../rx-change-event';

export type RxReplicationAction = 'pull' | 'push';

interface RxReplicationErrorPullPayload {
type: 'pull'
}

interface RxReplicationErrorPushPayload<RxDocType> {
type: 'push'
documentData: RxDocumentData<RxDocType>
}

export class RxReplicationError<RxDocType> extends Error {
readonly payload: RxReplicationErrorPullPayload | RxReplicationErrorPushPayload<any>
readonly innerErrors?: any;

constructor(message: string, payload: RxReplicationErrorPullPayload | RxReplicationErrorPushPayload<RxDocType>, innerErrors?: any) {
super(message);

this.payload = payload;
this.innerErrors = innerErrors;
}
}

export class RxReplicationStateBase<RxDocType> {
public readonly subs: Subscription[] = [];
Expand Down
47 changes: 47 additions & 0 deletions test/unit/replication-graphql.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1983,6 +1983,53 @@ describe('replication-graphql.test.js', () => {
replicationState.cancel();
c.database.destroy();
});
it('should contain include replication action data in pull request failure', async () => {
const c = await humansCollection.createHumanWithTimestamp(0);
const replicationState = c.syncGraphQL({
url: ERROR_URL,
pull: {
queryBuilder
},
deletedFlag: 'deleted'
});

const error = await replicationState.error$.pipe(
first()
).toPromise();

assert.strictEqual(error.payload.type, 'pull');

replicationState.cancel();
c.database.destroy();
});
it('should contain include replication action data in push request failure', async () => {
const c = await humansCollection.createHumanWithTimestamp(0);
const replicationState = c.syncGraphQL({
url: ERROR_URL,
push: {
queryBuilder: pushQueryBuilder,
},
deletedFlag: 'deleted'
});

const localDoc = schemaObjects.humanWithTimestamp();
await c.insert(localDoc);

const error = await replicationState.error$.pipe(
first()
).toPromise();

const { documentData: actual } = error.payload;

assert.strictEqual(error.payload.type, 'push');
assert.strictEqual(actual.id, localDoc.id);
assert.strictEqual(actual.name, localDoc.name);
assert.strictEqual(actual.age, localDoc.age);
assert.strictEqual(actual.updatedAt, localDoc.updatedAt);

replicationState.cancel();
c.database.destroy();
});
it('should not exit .run() before the batch is inserted and its events have been emitted', async () => {
const c = await humansCollection.createHumanWithTimestamp(0);
const server = await SpawnServer.spawn(getTestData(1));
Expand Down