From 8547db248273d2876c02f188fe7788d45a41f675 Mon Sep 17 00:00:00 2001 From: Vikram Saraph Date: Thu, 9 Aug 2018 10:49:04 -0700 Subject: [PATCH] Network.executeWithEvents for surfacing (lifecycle) events Summary: We want to be able to surface lifecycle events to Relay subscriptions, through the `onNext` callback. In order to do this, we must implement another function, `Network.executeWithEvents`, if we want to avoid passing events through the observable returned by `Network.execute`. Summary of changes: - `StreamPayload` is a union of `ExecutePayload` and `EventPayload`. - The `Network` object is returned by `RelayNetwork.create`, so this function has been modified so that its returned object also contains a `Network.executeWithEvents` function. - `RelayModernEnvironment` also has an `executeWithEvents`, which calls `Network.executeWithEvents`. Its `do` simply ignores events via early return. `RelayStoreTypes` has been updated to include `executeWithEvents` in `Environment`'s interface. - The `SubscribeFunction` type in `RelayNetworkTypes` now has a return type of `Observable`. In `ConvertToExecuteFunction`, there is now a `convertSubcribeWithEvents` function that takes a `SubscribeFunction` and returns a `StreamFunction`. The `convertSubcribe` function still returns an `ExecuteFunction`, though there is extra filter logic to filter out events. - Within `requestRelaySubscription`, `GraphQLSubscriptionConfig` now includes an optional `receiveEvents` field. If present, it calls `executeWithEvents`. Otherwise it defaults to `execute`, which returns an Observable with no events. Reviewed By: kassens Differential Revision: D9209901 fbshipit-source-id: 12aaf37066c4f044d881c477916cd73e9b19be4c --- packages/relay-runtime/index.js | 2 + .../network/ConvertToExecuteFunction.js | 56 ++++++++++--- .../relay-runtime/network/RelayNetwork.js | 44 +++++++++- .../network/RelayNetworkTypes.js | 29 ++++++- .../store/RelayModernEnvironment.js | 82 +++++++++++++++++++ .../relay-runtime/store/RelayStoreTypes.js | 19 ++++- .../__tests__/RelayModernEnvironment-test.js | 6 ++ .../subscription/requestRelaySubscription.js | 57 ++++++++++--- .../RelayModernMockEnvironment.js | 2 + 9 files changed, 270 insertions(+), 27 deletions(-) diff --git a/packages/relay-runtime/index.js b/packages/relay-runtime/index.js index 83ee514cc8a6e..7bdb89478e46e 100644 --- a/packages/relay-runtime/index.js +++ b/packages/relay-runtime/index.js @@ -63,9 +63,11 @@ export type { export type {MutationConfig} from './mutations/commitRelayModernMutation'; export type {RelayNetworkLog} from './network/RelayNetworkLoggerTransaction'; export type { + EventPayload, ExecutePayload, GraphQLResponse, PayloadError, + StreamPayload, UploadableMap, } from './network/RelayNetworkTypes'; export type { diff --git a/packages/relay-runtime/network/ConvertToExecuteFunction.js b/packages/relay-runtime/network/ConvertToExecuteFunction.js index 7837d373ce125..3643dd03c5787 100644 --- a/packages/relay-runtime/network/ConvertToExecuteFunction.js +++ b/packages/relay-runtime/network/ConvertToExecuteFunction.js @@ -22,9 +22,27 @@ import type { ExecutePayload, FetchFunction, GraphQLResponse, + StreamFunction, + StreamPayload, SubscribeFunction, } from './RelayNetworkTypes'; +function filterDataFromStream( + observable: RelayObservable, +): RelayObservable { + return RelayObservable.create(sink => { + return observable.subscribe({ + next(value) { + if (value.kind === 'data') { + sink.next(value); + } + }, + error: sink.error, + complete: sink.complete, + }); + }); +} + /** * Converts a FetchFunction into an ExecuteFunction for use by RelayNetwork. */ @@ -35,11 +53,14 @@ function convertFetch(fn: FetchFunction): ExecuteFunction { // a failure to fetch. To avoid handling this special case throughout the // Relay codebase, it is explicitly handled here. if (result instanceof Error) { - return RelayObservable.create(sink => sink.error(result)); + return filterDataFromStream( + RelayObservable.create(sink => sink.error(result)), + ); } - return RelayObservable.from(result).map(value => - convertToExecutePayload(request, variables, value), - ); + const withEvents = RelayObservable.from(result).map(value => { + return convertToStreamPayload(request, variables, value); + }); + return filterDataFromStream(withEvents); }; } @@ -47,10 +68,19 @@ function convertFetch(fn: FetchFunction): ExecuteFunction { * Converts a SubscribeFunction into an ExecuteFunction for use by RelayNetwork. */ function convertSubscribe(fn: SubscribeFunction): ExecuteFunction { + return function subscribe(operation, variables, cacheConfig) { + const withEvents = RelayObservable.fromLegacy(observer => + fn(operation, variables, cacheConfig, observer), + ).map(value => convertToStreamPayload(operation, variables, value)); + return filterDataFromStream(withEvents); + }; +} + +function convertSubscribeWithEvents(fn: SubscribeFunction): StreamFunction { return function subscribe(operation, variables, cacheConfig) { return RelayObservable.fromLegacy(observer => fn(operation, variables, cacheConfig, observer), - ).map(value => convertToExecutePayload(operation, variables, value)); + ).map(value => convertToStreamPayload(operation, variables, value)); }; } @@ -59,11 +89,11 @@ function convertSubscribe(fn: SubscribeFunction): ExecuteFunction { * an ExecutePayload. A GraphQLResponse may be returned directly from older or * simpler Relay Network implementations. */ -function convertToExecutePayload( +function convertToStreamPayload( request: RequestNode, variables: Variables, - value: GraphQLResponse | ExecutePayload, -): ExecutePayload { + value: GraphQLResponse | StreamPayload, +): StreamPayload { if (!value.data && !value.errors && value.response) { if (!value.operation) { warning( @@ -73,9 +103,14 @@ function convertToExecutePayload( ); return createExecutePayload(request, variables, value.response); } + // assume value is data if value.response defined, but no kind field given + return {...value, kind: 'data'}; + } else if (!value.data && !value.errors && value.kind === 'event') { return value; + } else { + // in this case we have a GraphQLResponse + return createExecutePayload(request, variables, value); } - return createExecutePayload(request, variables, value); } function createExecutePayload(request, variables, response) { @@ -84,10 +119,11 @@ function createExecutePayload(request, variables, response) { 'ConvertToExecuteFunction: Batch request must return ExecutePayload.', ); } - return {operation: request.operation, variables, response}; + return {kind: 'data', operation: request.operation, variables, response}; } module.exports = { convertFetch, convertSubscribe, + convertSubscribeWithEvents, }; diff --git a/packages/relay-runtime/network/RelayNetwork.js b/packages/relay-runtime/network/RelayNetwork.js index f65335101d673..8d60151a0d879 100644 --- a/packages/relay-runtime/network/RelayNetwork.js +++ b/packages/relay-runtime/network/RelayNetwork.js @@ -14,7 +14,11 @@ const RelayObservable = require('./RelayObservable'); const invariant = require('invariant'); -const {convertFetch, convertSubscribe} = require('./ConvertToExecuteFunction'); +const { + convertFetch, + convertSubscribe, + convertSubscribeWithEvents, +} = require('./ConvertToExecuteFunction'); import type {RequestNode} from '../util/RelayConcreteNode'; import type {CacheConfig, Variables} from '../util/RelayRuntimeTypes'; @@ -22,6 +26,7 @@ import type { FetchFunction, Network, ExecutePayload, + StreamPayload, SubscribeFunction, UploadableMap, } from './RelayNetworkTypes'; @@ -39,6 +44,9 @@ function create( const observeSubscribe = subscribeFn ? convertSubscribe(subscribeFn) : undefined; + const observeSubscribeWithEvents = subscribeFn + ? convertSubscribeWithEvents(subscribeFn) + : undefined; function execute( request: RequestNode, @@ -72,7 +80,39 @@ function create( return observeFetch(request, variables, cacheConfig, uploadables); } - return {execute}; + function executeWithEvents( + request: RequestNode, + variables: Variables, + cacheConfig: CacheConfig, + uploadables?: ?UploadableMap, + ): RelayObservable { + if (request.operationKind === 'subscription') { + invariant( + observeSubscribeWithEvents, + 'RelayNetwork: This network layer does not support Subscriptions. ' + + 'To use Subscriptions, provide a custom network layer.', + ); + + invariant( + !uploadables, + 'RelayNetwork: Cannot provide uploadables while subscribing.', + ); + return observeSubscribeWithEvents(request, variables, cacheConfig); + } + + const pollInterval = cacheConfig.poll; + if (pollInterval != null) { + invariant( + !uploadables, + 'RelayNetwork: Cannot provide uploadables while polling.', + ); + return observeFetch(request, variables, {force: true}).poll(pollInterval); + } + + return observeFetch(request, variables, cacheConfig, uploadables); + } + + return {execute, executeWithEvents}; } module.exports = {create}; diff --git a/packages/relay-runtime/network/RelayNetworkTypes.js b/packages/relay-runtime/network/RelayNetworkTypes.js index b986e7d7451c7..c452c87dde5ae 100644 --- a/packages/relay-runtime/network/RelayNetworkTypes.js +++ b/packages/relay-runtime/network/RelayNetworkTypes.js @@ -24,6 +24,7 @@ import type RelayObservable, {ObservableFromValue} from './RelayObservable'; */ export type Network = {| execute: ExecuteFunction, + executeWithEvents: StreamFunction, |}; export type PayloadData = {[key: string]: mixed}; @@ -55,6 +56,7 @@ export type GraphQLResponse = * raw GraphQL network response as well as any related client metadata. */ export type ExecutePayload = {| + kind: 'data', // The operation executed operation: ConcreteOperation, // The variables which were used during this execution. @@ -65,6 +67,20 @@ export type ExecutePayload = {| isOptimistic?: boolean, |}; +/** + * Events sent over a GraphQL stream operation (such as subscriptions). + * Only received if executeWithEvents is called instead of execute. + */ +export type EventPayload = {| + kind: 'event', + event: string, +|}; + +/** + * A stream consists of data and events. + */ +export type StreamPayload = ExecutePayload | EventPayload; + /** * A function that returns an Observable representing the response of executing * a GraphQL operation. @@ -76,6 +92,17 @@ export type ExecuteFunction = ( uploadables?: ?UploadableMap, ) => RelayObservable; +/** + * A function that returns an Observable representing the stream of data and + * events pushed by a GraphQL subscription + */ +export type StreamFunction = ( + request: RequestNode, + variables: Variables, + cacheConfig: CacheConfig, + uploadables?: ?UploadableMap, +) => RelayObservable; + /** * A function that executes a GraphQL operation with request/response semantics. * @@ -102,7 +129,7 @@ export type SubscribeFunction = ( cacheConfig: CacheConfig, observer?: LegacyObserver, ) => - | RelayObservable + | RelayObservable | RelayObservable | Disposable; diff --git a/packages/relay-runtime/store/RelayModernEnvironment.js b/packages/relay-runtime/store/RelayModernEnvironment.js index c35e08b441bcb..8c2b8cf4cd330 100644 --- a/packages/relay-runtime/store/RelayModernEnvironment.js +++ b/packages/relay-runtime/store/RelayModernEnvironment.js @@ -29,6 +29,7 @@ import type { Network, PayloadData, PayloadError, + StreamPayload, UploadableMap, } from '../network/RelayNetworkTypes'; import type RelayObservable from '../network/RelayObservable'; @@ -253,6 +254,87 @@ class RelayModernEnvironment implements Environment { }); } + /** + * Returns an Observable of StreamPayload. Similar to .execute({...}), + * except the stream can also return events, which is especially useful when + * executing a GraphQL subscription. However, events are not commited to + * the publish queue, they are simply ignored in the .do({...}) stream. + */ + executeWithEvents({ + operation, + cacheConfig, + updater, + }: { + operation: OperationSelector, + cacheConfig?: ?CacheConfig, + updater?: ?SelectorStoreUpdater, + }): RelayObservable { + let optimisticResponse; + return this._network + .executeWithEvents(operation.node, operation.variables, cacheConfig || {}) + .do({ + next: executePayload => { + if (executePayload.kind !== 'data') { + return; + } + const responsePayload = normalizePayload(executePayload); + const {source, fieldPayloads, deferrableSelections} = responsePayload; + for (const selectionKey of deferrableSelections || new Set()) { + this._deferrableSelections.add(selectionKey); + } + if (executePayload.isOptimistic) { + invariant( + optimisticResponse == null, + 'environment.execute: only support one optimistic respnose per ' + + 'execute.', + ); + optimisticResponse = { + source: source, + fieldPayloads: fieldPayloads, + }; + this._publishQueue.applyUpdate(optimisticResponse); + this._publishQueue.run(); + } else { + if (optimisticResponse) { + this._publishQueue.revertUpdate(optimisticResponse); + optimisticResponse = undefined; + } + const writeSelector = createOperationSelector( + operation.node, + executePayload.variables, + executePayload.operation, + ); + if (executePayload.operation.kind === 'DeferrableOperation') { + const fragmentKey = deferrableFragmentKey( + executePayload.variables[ + executePayload.operation.rootFieldVariable + ], + executePayload.operation.fragmentName, + getOperationVariables( + executePayload.operation, + executePayload.variables, + ), + ); + this._deferrableSelections.delete(fragmentKey); + } + this._publishQueue.commitPayload( + writeSelector, + responsePayload, + updater, + ); + this._publishQueue.run(); + } + }, + }) + .finally(() => { + if (optimisticResponse) { + this._publishQueue.revertUpdate(optimisticResponse); + optimisticResponse = undefined; + this._publishQueue.run(); + } + }); + } + /** * Returns an Observable of ExecutePayload resulting from executing the * provided Mutation operation, the result of which is then normalized and diff --git a/packages/relay-runtime/store/RelayStoreTypes.js b/packages/relay-runtime/store/RelayStoreTypes.js index 152a03b29b698..2b5e37328e4ef 100644 --- a/packages/relay-runtime/store/RelayStoreTypes.js +++ b/packages/relay-runtime/store/RelayStoreTypes.js @@ -13,6 +13,7 @@ import type { ExecutePayload, PayloadError, + StreamPayload, UploadableMap, } from '../network/RelayNetworkTypes'; import type {PayloadData} from '../network/RelayNetworkTypes'; @@ -26,7 +27,12 @@ import type { RequestNode, ConcreteOperation, } from '../util/RelayConcreteNode'; -import type {DataID, Disposable, Variables} from '../util/RelayRuntimeTypes'; +import type { + CacheConfig, + DataID, + Disposable, + Variables, +} from '../util/RelayRuntimeTypes'; import type {RecordState} from './RelayRecordState'; import type { CEnvironment, @@ -238,6 +244,17 @@ export interface Environment */ getStore(): Store; + /** + * Returns an Observable that can also possibly include event payloads. + * Contrast this with .execute({...}), which will ignore any events pushed + * to the Observable. + */ + executeWithEvents({| + operation: OperationSelector, + cacheConfig?: ?CacheConfig, + updater?: ?SelectorStoreUpdater, + |}): RelayObservable; + /** * Returns an Observable of ExecutePayload resulting from executing the * provided Mutation operation, the result of which is then normalized and diff --git a/packages/relay-runtime/store/__tests__/RelayModernEnvironment-test.js b/packages/relay-runtime/store/__tests__/RelayModernEnvironment-test.js index ec557fcdfb38f..f21054d9d1707 100644 --- a/packages/relay-runtime/store/__tests__/RelayModernEnvironment-test.js +++ b/packages/relay-runtime/store/__tests__/RelayModernEnvironment-test.js @@ -572,6 +572,7 @@ describe('RelayModernEnvironment', () => { expect(next.mock.calls.length).toBe(1); expect(next).toBeCalledWith({ + kind: 'data', response: payload, variables, operation: operation.node.operation, @@ -723,6 +724,7 @@ describe('RelayModernEnvironment', () => { expect(next.mock.calls.length).toBe(1); expect(next).toBeCalledWith({ + kind: 'data', response: payload, variables, operation: operation.node.operation, @@ -805,6 +807,7 @@ describe('RelayModernEnvironment', () => { }, }; dataSource.next({ + kind: 'data', operation: query.operation, variables, response: payload, @@ -855,6 +858,7 @@ describe('RelayModernEnvironment', () => { }; dataSource.next({ + kind: 'data', operation: query.operation, variables, response: optimisticResponse, @@ -863,6 +867,7 @@ describe('RelayModernEnvironment', () => { jest.runAllTimers(); dataSource.next({ + kind: 'data', operation: query.operation, variables, response: realResponse, @@ -906,6 +911,7 @@ describe('RelayModernEnvironment', () => { }, }; dataSource.next({ + kind: 'data', operation: query.operation, variables, response: payload, diff --git a/packages/relay-runtime/subscription/requestRelaySubscription.js b/packages/relay-runtime/subscription/requestRelaySubscription.js index fc77fb66902b1..7d63b4f56e389 100644 --- a/packages/relay-runtime/subscription/requestRelaySubscription.js +++ b/packages/relay-runtime/subscription/requestRelaySubscription.js @@ -27,6 +27,7 @@ export type GraphQLSubscriptionConfig = {| onError?: ?(error: Error) => void, onNext?: ?(response: ?Object) => void, updater?: ?SelectorStoreUpdater, + receiveEvents?: boolean, |}; function requestRelaySubscription( @@ -40,7 +41,14 @@ function requestRelaySubscription( 'requestRelaySubscription: Must use Subscription operation', ); } - const {configs, onCompleted, onError, onNext, variables} = config; + const { + configs, + onCompleted, + onError, + onNext, + variables, + receiveEvents, + } = config; const operation = createOperationSelector(subscription, variables); warning( @@ -57,18 +65,41 @@ function requestRelaySubscription( ) : config; - return environment - .execute({ - operation, - updater, - cacheConfig: {force: true}, - }) - .map(() => environment.lookup(operation.fragment).data) - .subscribeLegacy({ - onNext, - onError, - onCompleted, - }); + if (receiveEvents) { + return environment + .executeWithEvents({ + operation, + updater, + cacheConfig: {force: true}, + }) + .subscribeLegacy({ + onNext: payload => { + if (onNext) { + if (payload.kind === 'event') { + onNext(payload); + } else { + const data = environment.lookup(operation.fragment).data; + onNext({kind: 'data', ...data}); + } + } + }, + onError, + onCompleted, + }); + } else { + return environment + .execute({ + operation, + updater, + cacheConfig: {force: true}, + }) + .map(() => environment.lookup(operation.fragment).data) + .subscribeLegacy({ + onNext, + onError, + onCompleted, + }); + } } module.exports = requestRelaySubscription; diff --git a/packages/relay-test-utils/RelayModernMockEnvironment.js b/packages/relay-test-utils/RelayModernMockEnvironment.js index ad90f3d611b0b..3b5d1c17fa90b 100644 --- a/packages/relay-test-utils/RelayModernMockEnvironment.js +++ b/packages/relay-test-utils/RelayModernMockEnvironment.js @@ -190,6 +190,7 @@ function createMockEnvironment(options: { const nextValue = (request, payload) => { const {sink, variables} = getRequest(request); sink.next({ + kind: 'data', operation: request.operation, variables: variables, response: ensureValidPayload(payload), @@ -203,6 +204,7 @@ function createMockEnvironment(options: { const resolve = (request, payload) => { const {sink, variables} = getRequest(request); sink.next({ + kind: 'data', operation: request.operation, variables: variables, response: ensureValidPayload(payload),