Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: Ability to use a responseModifier for GraphQL push replication #4385

38 changes: 36 additions & 2 deletions docs-src/replication-graphql.md
Original file line number Diff line number Diff line change
Expand Up @@ -361,7 +361,7 @@ const replicationState: RxGraphQLReplicationState<RxDocType> = replicateGraphQL(
With the `pull.responseModifier` you can modify the whole response from the GraphQL endpoint **before** it is processed by RxDB.
For example if your endpoint is not capable of returning a valid checkpoint, but instead only returns the plain document array, you can use the `responseModifier` to aggregate the checkpoint from the returned documents.

```js
```ts
import {

} from 'rxdb';
Expand Down Expand Up @@ -396,7 +396,42 @@ const replicationState: RxGraphQLReplicationState<RxDocType> = replicateGraphQL(
);
```

### push.responseModifier

It's also possible to modify the response of a push mutation. For example if your server returns more than the just conflicting docs:

```graphql
type PushResponse {
conflicts: [Human]
conflictMessages: [ReplicationConflictMessage]
}

type Mutation {
# Returns a PushResponse type that contains the conflicts along with other information
pushHuman(rows: [HumanInputPushRow!]): PushResponse!
}
```

```ts
import {} from "rxdb";
const replicationState: RxGraphQLReplicationState<RxDocType> = replicateGraphQL(
{
collection: myRxCollection,
url: {/* ... */},
headers: {/* ... */},
push: {
responseModifier: async function (plainResponse) {
/**
* In this example we aggregate the conflicting documents from a response object
*/
return plainResponse.conflicts;
},
},
pull: {/* ... */},
/* ... */
}
);
```

#### Helper Functions

Expand Down Expand Up @@ -442,4 +477,3 @@ See [the fetch spec](https://fetch.spec.whatwg.org/#concept-request-credentials-


**NOTICE:** To play around, check out the full example of the RxDB [GraphQL replication with server and client](https://github.com/pubkey/rxdb/tree/master/examples/graphql)

11 changes: 9 additions & 2 deletions src/plugins/replication-graphql/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -161,8 +161,15 @@ export function replicateGraphQL<RxDocType, CheckpointType>(
if (result.errors) {
throw result.errors;
}
const dataPath = Object.keys(result.data)[0];
const data: any = getProperty(result.data, dataPath);
const dataPath = push.dataPath || Object.keys(result.data)[0];
let data: any = getProperty(result.data, dataPath);

if (push.responseModifier) {
data = await push.responseModifier(
data,
);
}

return data;
},
batchSize: push.batchSize,
Expand Down
9 changes: 8 additions & 1 deletion src/types/plugins/replication-graphql.d.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import { RxReplicationWriteToMasterRow } from '../replication-protocol';
import { ById, MaybePromise } from '../util';
import { ReplicationOptions, ReplicationPullHandlerResult, ReplicationPullOptions, ReplicationPushOptions } from './replication';
import { ReplicationOptions, ReplicationPullHandlerResult, ReplicationPullOptions, ReplicationPushHandlerResult, ReplicationPushOptions } from './replication';

export interface RxGraphQLReplicationQueryBuilderResponseObject {
query: string;
Expand Down Expand Up @@ -43,13 +43,20 @@ export type RxGraphQLPullResponseModifier<RxDocType, CheckpointType> = (
requestCheckpoint?: CheckpointType
) => MaybePromise<ReplicationPullHandlerResult<RxDocType, CheckpointType>>;

export type RxGraphQLPushResponseModifier<RxDocType> = (
// the exact response that was returned from the server
plainResponse: ReplicationPushHandlerResult<RxDocType> | any,
) => MaybePromise<ReplicationPushHandlerResult<RxDocType>>;

export type RxGraphQLReplicationPullStreamQueryBuilder = (headers: { [k: string]: string; }) => RxGraphQLReplicationQueryBuilderResponse;

export type GraphQLSyncPushOptions<RxDocType> = Omit<
ReplicationPushOptions<RxDocType>,
'handler'
> & {
queryBuilder: RxGraphQLReplicationPushQueryBuilder;
dataPath?: string;
responseModifier?: RxGraphQLPushResponseModifier<RxDocType>;
};

export type GraphQLServerUrl = {
Expand Down
2 changes: 2 additions & 0 deletions src/types/plugins/replication.d.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@ export type ReplicationPullHandlerResult<RxDocType, CheckpointType> = {
documents: WithDeleted<RxDocType>[];
};

export type ReplicationPushHandlerResult<RxDocType> = RxDocType[];

export type ReplicationPullHandler<RxDocType, CheckpointType> = (
lastPulledCheckpoint: CheckpointType,
batchSize: number
Expand Down
41 changes: 40 additions & 1 deletion test/unit/replication-graphql.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ import {
buildSchema,
parse as parseQuery
} from 'graphql';
import { RxDocumentData } from '../../src/types';
import { ReplicationPushHandlerResult, RxDocumentData } from '../../src/types';
import { enableKeyCompression } from '../helper/schemas';

declare type WithDeleted<T> = T & { deleted: boolean; };
Expand Down Expand Up @@ -1047,6 +1047,45 @@ describe('replication-graphql.test.ts', () => {
const docsOnServer = server.getDocuments();
assert.strictEqual(docsOnServer.length, batchSize);

server.close();
c.database.destroy();
});
it('should respect the push.responseModifier', async () => {
const [c, server] = await Promise.all([
humansCollection.createHumanWithTimestamp(batchSize),
SpawnServer.spawn()
]);

let responseHaveBeenCalledTimes = 0;
const replicationState = replicateGraphQL({
collection: c,
url: server.url,
push: {
batchSize,
queryBuilder: pushQueryBuilder,
responseModifier(
originalResponse: ReplicationPushHandlerResult<HumanWithTimestampDocumentType>,
) {
responseHaveBeenCalledTimes += 1;
return originalResponse;
}
},
live: false,
retryTime: 1000,
deletedField: 'deleted'
});
ensureReplicationHasNoErrors(replicationState);


await replicationState.awaitInitialReplication();

const docsOnServer = server.getDocuments();
assert.strictEqual(docsOnServer.length, batchSize);
assert.strictEqual(
responseHaveBeenCalledTimes,
1
);

server.close();
c.database.destroy();
});
Expand Down