Skip to content

Commit

Permalink
added possibility to pass graphql-ws options into the replication opt…
Browse files Browse the repository at this point in the history
…ions (#6598)

* added possibility to pass graphql-ws options into the replication options

* added test for the graphql replication ws options

* docs updated with the graphql replication pull wsOptions

* added explicit type for the capturedWSStates

* fixed waits inside the wsOptions test
  • Loading branch information
epodgaysky authored Nov 20, 2024
1 parent 152c55a commit 2dfff79
Show file tree
Hide file tree
Showing 5 changed files with 80 additions and 2 deletions.
7 changes: 7 additions & 0 deletions docs-src/docs/replication-graphql.md
Original file line number Diff line number Diff line change
Expand Up @@ -330,6 +330,13 @@ const replicationState = replicateGraphQL(
queryBuilder: pullQueryBuilder,
streamQueryBuilder: pullStreamQueryBuilder,
includeWsHeaders: false, // Includes headers as connection parameter to Websocket.

// Websocket options that can be passed as a parameter to initialize the subscription
// Can be applied anything from the graphql-ws ClientOptions - https://the-guild.dev/graphql/ws/docs/interfaces/client.ClientOptions
// Excepting parameters: 'url', 'shouldRetry', 'webSocketImpl', 'connectionParams' - locked for the internal usage
wsOptions: {
retryAttempts: 10,
}
},
deletedField: 'deleted'
}
Expand Down
5 changes: 4 additions & 1 deletion src/plugins/replication-graphql/graphql-websocket.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import { Client, createClient } from 'graphql-ws';
import { getFromMapOrCreate, getFromMapOrThrow } from '../../plugins/utils/index.ts';
import ws from 'isomorphic-ws';
import { RxGraphQLPullWSOptions } from '../../types';

const { WebSocket: IsomorphicWebSocket } = ws;

Expand All @@ -15,14 +16,16 @@ export const GRAPHQL_WEBSOCKET_BY_URL: Map<string, WebsocketWithRefCount> = new

export function getGraphQLWebSocket(
url: string,
headers?: { [k: string]: string; }
headers?: { [k: string]: string; },
options: RxGraphQLPullWSOptions = {},
): Client {

const has = getFromMapOrCreate(
GRAPHQL_WEBSOCKET_BY_URL,
url,
() => {
const wsClient = createClient({
...options,
url,
shouldRetry: () => true,
webSocketImpl: IsomorphicWebSocket,
Expand Down
2 changes: 1 addition & 1 deletion src/plugins/replication-graphql/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ export function replicateGraphQL<RxDocType, CheckpointType>(
graphqlReplicationState.start = () => {
if (mustUseSocket) {
const httpHeaders = pull.includeWsHeaders ? mutateableClientState.headers : undefined;
const wsClient = getGraphQLWebSocket(ensureNotFalsy(url.ws), httpHeaders);
const wsClient = getGraphQLWebSocket(ensureNotFalsy(url.ws), httpHeaders, pull.wsOptions);

wsClient.on('connected', () => {
pullStream$.next('RESYNC');
Expand Down
3 changes: 3 additions & 0 deletions src/types/plugins/replication-graphql.d.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import { ClientOptions } from 'graphql-ws';
import { RxReplicationWriteToMasterRow } from '../replication-protocol.ts';
import { ById, MaybePromise } from '../util.ts';
import {
Expand Down Expand Up @@ -27,6 +28,7 @@ export type RxGraphQLReplicationPushQueryBuilder = (
rows: RxReplicationWriteToMasterRow<any>[]
) => RxGraphQLReplicationQueryBuilderResponse;

export type RxGraphQLPullWSOptions = Omit<ClientOptions, 'url' | 'shouldRetry' | 'webSocketImpl' | 'connectionParams'>;

export type RxGraphQLReplicationPullQueryBuilder<CheckpointType> = (
latestPulledCheckpoint: CheckpointType | undefined,
Expand All @@ -41,6 +43,7 @@ export type GraphQLSyncPullOptions<RxDocType, CheckpointType> = Omit<
dataPath?: string;
responseModifier?: RxGraphQLPullResponseModifier<RxDocType, CheckpointType>;
includeWsHeaders?: boolean;
wsOptions?: RxGraphQLPullWSOptions;
};

export type RxGraphQLPullResponseModifier<RxDocType, CheckpointType> = (
Expand Down
65 changes: 65 additions & 0 deletions test/unit/replication-graphql.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1635,6 +1635,71 @@ describe('replication-graphql.test.ts', () => {
await server.close();
await c.database.destroy();
});
it('should respect pull.wsOptions', async () => {
const capturedWSStates: string[] = [];
const [c, server] = await Promise.all([
humansCollection.createHumanWithTimestamp(0),
SpawnServer.spawn()
]);

const replicationState = replicateGraphQL({
replicationIdentifier: randomCouchString(10),
collection: c,
url: server.url,
pull: {
batchSize,
queryBuilder: pullQueryBuilder,
streamQueryBuilder: pullStreamQueryBuilder,
includeWsHeaders: true,
wsOptions: {
on: {
'connected': () => {
capturedWSStates.push('connected');
},
'connecting': () => {
capturedWSStates.push('connecting');
},
'closed': () => {
capturedWSStates.push('closed');
},
'error': () => {
capturedWSStates.push('error');
}
}
},
},
headers: {
token: 'Bearer token'
},
live: true,
deletedField: 'deleted',
});

ensureReplicationHasNoErrors(replicationState);

await replicationState.awaitInitialReplication();

await waitUntil(() => {
return capturedWSStates.length === 2;
});

assert.equal(capturedWSStates.includes('connected'), true);
assert.equal(capturedWSStates.includes('connecting'), true);
assert.equal(capturedWSStates.includes('closed'), false);
assert.equal(capturedWSStates.includes('error'), false);

replicationState.cancel();

await waitUntil(() => {
return capturedWSStates.length === 3;
});

assert.equal(capturedWSStates.includes('closed'), true);
assert.equal(capturedWSStates.includes('error'), false);

server.close();
c.database.destroy();
});
it('should respect the pull.responseModifier', async () => {
const checkpointIterationModeAmount = 5;
const eventObservationModeAmount = 3;
Expand Down

0 comments on commit 2dfff79

Please sign in to comment.