Skip to content

Commit

Permalink
Network.executeWithEvents for surfacing (lifecycle) events
Browse files Browse the repository at this point in the history
Summary:
We want to be able to surface lifecycle events to Relay subscriptions, through the `onNext` callback. In order to do this, we must implement another function, `Network.executeWithEvents`, if we want to avoid passing events through the observable returned by `Network.execute`.

Summary of changes:
- `StreamPayload` is a union of `ExecutePayload` and `EventPayload`.
- The `Network` object is returned by `RelayNetwork.create`, so this function has been modified so that its returned object also contains a `Network.executeWithEvents` function.
- `RelayModernEnvironment` also has an `executeWithEvents`, which calls `Network.executeWithEvents`. Its `do` simply ignores events via early return. `RelayStoreTypes` has been updated to include `executeWithEvents` in `Environment`'s interface.
- The `SubscribeFunction` type in `RelayNetworkTypes` now has a return type of `Observable<StreamPayload>`. In `ConvertToExecuteFunction`, there is now a `convertSubcribeWithEvents` function that takes a `SubscribeFunction` and returns a `StreamFunction`. The `convertSubcribe` function still returns an `ExecuteFunction`, though there is extra filter logic to filter out events.
- Within `requestRelaySubscription`, `GraphQLSubscriptionConfig` now includes an optional `receiveEvents` field. If present, it calls `executeWithEvents`. Otherwise it defaults to `execute`, which returns an Observable with no events.

Reviewed By: kassens

Differential Revision: D9209901

fbshipit-source-id: 12aaf37066c4f044d881c477916cd73e9b19be4c
  • Loading branch information
Vikram Saraph authored and facebook-github-bot committed Aug 9, 2018
1 parent 8079bd7 commit 8547db2
Show file tree
Hide file tree
Showing 9 changed files with 270 additions and 27 deletions.
2 changes: 2 additions & 0 deletions packages/relay-runtime/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,11 @@ export type {
export type {MutationConfig} from './mutations/commitRelayModernMutation';
export type {RelayNetworkLog} from './network/RelayNetworkLoggerTransaction';
export type {
EventPayload,
ExecutePayload,
GraphQLResponse,
PayloadError,
StreamPayload,
UploadableMap,
} from './network/RelayNetworkTypes';
export type {
Expand Down
56 changes: 46 additions & 10 deletions packages/relay-runtime/network/ConvertToExecuteFunction.js
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,27 @@ import type {
ExecutePayload,
FetchFunction,
GraphQLResponse,
StreamFunction,
StreamPayload,
SubscribeFunction,
} from './RelayNetworkTypes';

function filterDataFromStream(
observable: RelayObservable<StreamPayload>,
): RelayObservable<ExecutePayload> {
return RelayObservable.create(sink => {
return observable.subscribe({
next(value) {
if (value.kind === 'data') {
sink.next(value);
}
},
error: sink.error,
complete: sink.complete,
});
});
}

/**
* Converts a FetchFunction into an ExecuteFunction for use by RelayNetwork.
*/
Expand All @@ -35,22 +53,34 @@ function convertFetch(fn: FetchFunction): ExecuteFunction {
// a failure to fetch. To avoid handling this special case throughout the
// Relay codebase, it is explicitly handled here.
if (result instanceof Error) {
return RelayObservable.create(sink => sink.error(result));
return filterDataFromStream(
RelayObservable.create(sink => sink.error(result)),
);
}
return RelayObservable.from(result).map(value =>
convertToExecutePayload(request, variables, value),
);
const withEvents = RelayObservable.from(result).map(value => {
return convertToStreamPayload(request, variables, value);
});
return filterDataFromStream(withEvents);
};
}

/**
* Converts a SubscribeFunction into an ExecuteFunction for use by RelayNetwork.
*/
function convertSubscribe(fn: SubscribeFunction): ExecuteFunction {
return function subscribe(operation, variables, cacheConfig) {
const withEvents = RelayObservable.fromLegacy(observer =>
fn(operation, variables, cacheConfig, observer),
).map(value => convertToStreamPayload(operation, variables, value));
return filterDataFromStream(withEvents);
};
}

function convertSubscribeWithEvents(fn: SubscribeFunction): StreamFunction {
return function subscribe(operation, variables, cacheConfig) {
return RelayObservable.fromLegacy(observer =>
fn(operation, variables, cacheConfig, observer),
).map(value => convertToExecutePayload(operation, variables, value));
).map(value => convertToStreamPayload(operation, variables, value));
};
}

Expand All @@ -59,11 +89,11 @@ function convertSubscribe(fn: SubscribeFunction): ExecuteFunction {
* an ExecutePayload. A GraphQLResponse may be returned directly from older or
* simpler Relay Network implementations.
*/
function convertToExecutePayload(
function convertToStreamPayload(
request: RequestNode,
variables: Variables,
value: GraphQLResponse | ExecutePayload,
): ExecutePayload {
value: GraphQLResponse | StreamPayload,
): StreamPayload {
if (!value.data && !value.errors && value.response) {
if (!value.operation) {
warning(
Expand All @@ -73,9 +103,14 @@ function convertToExecutePayload(
);
return createExecutePayload(request, variables, value.response);
}
// assume value is data if value.response defined, but no kind field given
return {...value, kind: 'data'};
} else if (!value.data && !value.errors && value.kind === 'event') {
return value;
} else {
// in this case we have a GraphQLResponse
return createExecutePayload(request, variables, value);
}
return createExecutePayload(request, variables, value);
}

function createExecutePayload(request, variables, response) {
Expand All @@ -84,10 +119,11 @@ function createExecutePayload(request, variables, response) {
'ConvertToExecuteFunction: Batch request must return ExecutePayload.',
);
}
return {operation: request.operation, variables, response};
return {kind: 'data', operation: request.operation, variables, response};
}

module.exports = {
convertFetch,
convertSubscribe,
convertSubscribeWithEvents,
};
44 changes: 42 additions & 2 deletions packages/relay-runtime/network/RelayNetwork.js
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,19 @@ const RelayObservable = require('./RelayObservable');

const invariant = require('invariant');

const {convertFetch, convertSubscribe} = require('./ConvertToExecuteFunction');
const {
convertFetch,
convertSubscribe,
convertSubscribeWithEvents,
} = require('./ConvertToExecuteFunction');

import type {RequestNode} from '../util/RelayConcreteNode';
import type {CacheConfig, Variables} from '../util/RelayRuntimeTypes';
import type {
FetchFunction,
Network,
ExecutePayload,
StreamPayload,
SubscribeFunction,
UploadableMap,
} from './RelayNetworkTypes';
Expand All @@ -39,6 +44,9 @@ function create(
const observeSubscribe = subscribeFn
? convertSubscribe(subscribeFn)
: undefined;
const observeSubscribeWithEvents = subscribeFn
? convertSubscribeWithEvents(subscribeFn)
: undefined;

function execute(
request: RequestNode,
Expand Down Expand Up @@ -72,7 +80,39 @@ function create(
return observeFetch(request, variables, cacheConfig, uploadables);
}

return {execute};
function executeWithEvents(
request: RequestNode,
variables: Variables,
cacheConfig: CacheConfig,
uploadables?: ?UploadableMap,
): RelayObservable<StreamPayload> {
if (request.operationKind === 'subscription') {
invariant(
observeSubscribeWithEvents,
'RelayNetwork: This network layer does not support Subscriptions. ' +
'To use Subscriptions, provide a custom network layer.',
);

invariant(
!uploadables,
'RelayNetwork: Cannot provide uploadables while subscribing.',
);
return observeSubscribeWithEvents(request, variables, cacheConfig);
}

const pollInterval = cacheConfig.poll;
if (pollInterval != null) {
invariant(
!uploadables,
'RelayNetwork: Cannot provide uploadables while polling.',
);
return observeFetch(request, variables, {force: true}).poll(pollInterval);
}

return observeFetch(request, variables, cacheConfig, uploadables);
}

return {execute, executeWithEvents};
}

module.exports = {create};
29 changes: 28 additions & 1 deletion packages/relay-runtime/network/RelayNetworkTypes.js
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import type RelayObservable, {ObservableFromValue} from './RelayObservable';
*/
export type Network = {|
execute: ExecuteFunction,
executeWithEvents: StreamFunction,
|};

export type PayloadData = {[key: string]: mixed};
Expand Down Expand Up @@ -55,6 +56,7 @@ export type GraphQLResponse =
* raw GraphQL network response as well as any related client metadata.
*/
export type ExecutePayload = {|
kind: 'data',
// The operation executed
operation: ConcreteOperation,
// The variables which were used during this execution.
Expand All @@ -65,6 +67,20 @@ export type ExecutePayload = {|
isOptimistic?: boolean,
|};

/**
* Events sent over a GraphQL stream operation (such as subscriptions).
* Only received if executeWithEvents is called instead of execute.
*/
export type EventPayload = {|
kind: 'event',
event: string,
|};

/**
* A stream consists of data and events.
*/
export type StreamPayload = ExecutePayload | EventPayload;

/**
* A function that returns an Observable representing the response of executing
* a GraphQL operation.
Expand All @@ -76,6 +92,17 @@ export type ExecuteFunction = (
uploadables?: ?UploadableMap,
) => RelayObservable<ExecutePayload>;

/**
* A function that returns an Observable representing the stream of data and
* events pushed by a GraphQL subscription
*/
export type StreamFunction = (
request: RequestNode,
variables: Variables,
cacheConfig: CacheConfig,
uploadables?: ?UploadableMap,
) => RelayObservable<StreamPayload>;

/**
* A function that executes a GraphQL operation with request/response semantics.
*
Expand All @@ -102,7 +129,7 @@ export type SubscribeFunction = (
cacheConfig: CacheConfig,
observer?: LegacyObserver<GraphQLResponse>,
) =>
| RelayObservable<ExecutePayload>
| RelayObservable<StreamPayload>
| RelayObservable<GraphQLResponse>
| Disposable;

Expand Down
82 changes: 82 additions & 0 deletions packages/relay-runtime/store/RelayModernEnvironment.js
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import type {
Network,
PayloadData,
PayloadError,
StreamPayload,
UploadableMap,
} from '../network/RelayNetworkTypes';
import type RelayObservable from '../network/RelayObservable';
Expand Down Expand Up @@ -253,6 +254,87 @@ class RelayModernEnvironment implements Environment {
});
}

/**
* Returns an Observable of StreamPayload. Similar to .execute({...}),
* except the stream can also return events, which is especially useful when
* executing a GraphQL subscription. However, events are not commited to
* the publish queue, they are simply ignored in the .do({...}) stream.
*/
executeWithEvents({
operation,
cacheConfig,
updater,
}: {
operation: OperationSelector,
cacheConfig?: ?CacheConfig,
updater?: ?SelectorStoreUpdater,
}): RelayObservable<StreamPayload> {
let optimisticResponse;
return this._network
.executeWithEvents(operation.node, operation.variables, cacheConfig || {})
.do({
next: executePayload => {
if (executePayload.kind !== 'data') {
return;
}
const responsePayload = normalizePayload(executePayload);
const {source, fieldPayloads, deferrableSelections} = responsePayload;
for (const selectionKey of deferrableSelections || new Set()) {
this._deferrableSelections.add(selectionKey);
}
if (executePayload.isOptimistic) {
invariant(
optimisticResponse == null,
'environment.execute: only support one optimistic respnose per ' +
'execute.',
);
optimisticResponse = {
source: source,
fieldPayloads: fieldPayloads,
};
this._publishQueue.applyUpdate(optimisticResponse);
this._publishQueue.run();
} else {
if (optimisticResponse) {
this._publishQueue.revertUpdate(optimisticResponse);
optimisticResponse = undefined;
}
const writeSelector = createOperationSelector(
operation.node,
executePayload.variables,
executePayload.operation,
);
if (executePayload.operation.kind === 'DeferrableOperation') {
const fragmentKey = deferrableFragmentKey(
executePayload.variables[
executePayload.operation.rootFieldVariable
],
executePayload.operation.fragmentName,
getOperationVariables(
executePayload.operation,
executePayload.variables,
),
);
this._deferrableSelections.delete(fragmentKey);
}
this._publishQueue.commitPayload(
writeSelector,
responsePayload,
updater,
);
this._publishQueue.run();
}
},
})
.finally(() => {
if (optimisticResponse) {
this._publishQueue.revertUpdate(optimisticResponse);
optimisticResponse = undefined;
this._publishQueue.run();
}
});
}

/**
* Returns an Observable of ExecutePayload resulting from executing the
* provided Mutation operation, the result of which is then normalized and
Expand Down
19 changes: 18 additions & 1 deletion packages/relay-runtime/store/RelayStoreTypes.js
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import type {
ExecutePayload,
PayloadError,
StreamPayload,
UploadableMap,
} from '../network/RelayNetworkTypes';
import type {PayloadData} from '../network/RelayNetworkTypes';
Expand All @@ -26,7 +27,12 @@ import type {
RequestNode,
ConcreteOperation,
} from '../util/RelayConcreteNode';
import type {DataID, Disposable, Variables} from '../util/RelayRuntimeTypes';
import type {
CacheConfig,
DataID,
Disposable,
Variables,
} from '../util/RelayRuntimeTypes';
import type {RecordState} from './RelayRecordState';
import type {
CEnvironment,
Expand Down Expand Up @@ -238,6 +244,17 @@ export interface Environment
*/
getStore(): Store;

/**
* Returns an Observable that can also possibly include event payloads.
* Contrast this with .execute({...}), which will ignore any events pushed
* to the Observable.
*/
executeWithEvents({|
operation: OperationSelector,
cacheConfig?: ?CacheConfig,
updater?: ?SelectorStoreUpdater,
|}): RelayObservable<StreamPayload>;

/**
* Returns an Observable of ExecutePayload resulting from executing the
* provided Mutation operation, the result of which is then normalized and
Expand Down
Loading

0 comments on commit 8547db2

Please sign in to comment.