Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Move methods to observable (Issue #194) #362

Merged
merged 7 commits into from
Jul 11, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ Expect active development and potentially significant breaking changes in the `0

### vNEXT

- **Breaking change** Moved refetch(), startPolling(), and stopPolling() methods from QuerySubscription to ObservableQuery. This shouldn't affect anyone using `react-apollo`, but if you were calling those methods on the subscription directly, you need to call them on the query handle/observable instead. The benefit of this is that developers that want to use RxJS for their observable handling can now have access to these methods. [Issue #194] (https://github.com/apollostack/apollo-client/issues/194) and [PR #362] (https://github.com/apollostack/apollo-client/pull/362)

### v0.3.30

- Don't throw on unknown directives, instead just pass them through. This can open the door to implementing `@live`, `@defer`, and `@stream`, if coupled with some changes in the network layer. [PR #372](https://github.com/apollostack/apollo-client/pull/372)
Expand Down
99 changes: 58 additions & 41 deletions src/QueryManager.ts
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,28 @@ import {
QueryScheduler,
} from './scheduler';

import { Observable, Observer, Subscription } from './util/Observable';
import { Observable, Observer, Subscription, SubscriberFunction } from './util/Observable';

export class ObservableQuery extends Observable<GraphQLResult> {
public subscribe(observer: Observer<GraphQLResult>): QuerySubscription {
return super.subscribe(observer) as QuerySubscription;
public refetch: (variables?: any) => Promise<GraphQLResult>;
public stopPolling: () => void;
public startPolling: (p: number) => void;

constructor(options: {
subscriberFunction: SubscriberFunction<GraphQLResult>,
refetch: (variables?: any) => Promise<GraphQLResult>,
stopPolling: () => void,
startPolling: (p: number) => void
}) {
super(options.subscriberFunction);
this.refetch = options.refetch;
this.stopPolling = options.stopPolling;
this.startPolling = options.startPolling;
}

public subscribe(observer: Observer<GraphQLResult>): Subscription {
return super.subscribe(observer);
}

public result(): Promise<GraphQLResult> {
return new Promise((resolve, reject) => {
Expand All @@ -91,12 +106,6 @@ export class ObservableQuery extends Observable<GraphQLResult> {
}
}

export interface QuerySubscription extends Subscription {
refetch(variables?: any): Promise<GraphQLResult>;
stopPolling(): void;
startPolling(pollInterval: number): void;
}

export interface WatchQueryOptions {
query: Document;
variables?: { [key: string]: any };
Expand Down Expand Up @@ -137,7 +146,7 @@ export class QueryManager {
// with them in case of some destabalizing action (e.g. reset of the Apollo store).
private observableQueries: { [queryId: string]: {
observableQuery: ObservableQuery;
subscriptions: QuerySubscription[];
subscriptions: Subscription[];
} };

constructor({
Expand Down Expand Up @@ -310,36 +319,15 @@ export class QueryManager {
public watchQuery(options: WatchQueryOptions, shouldSubscribe = true): ObservableQuery {
// Call just to get errors synchronously
getQueryDefinition(options.query);
const observableQuery = new ObservableQuery((observer) => {
const queryId = this.generateQueryId();
const queryId = this.generateQueryId();

let observableQuery;

const subscriberFunction = (observer) => {
const retQuerySubscription = {
unsubscribe: () => {
this.stopQuery(queryId);
},
refetch: (variables: any): Promise<GraphQLResult> => {
// If no new variables passed, use existing variables
variables = variables || options.variables;

// Use the same options as before, but with new variables and forceFetch true
return this.fetchQuery(queryId, assign(options, {
forceFetch: true,
variables,
}) as WatchQueryOptions);
},
stopPolling: (): void => {
if (this.pollingTimers[queryId]) {
clearInterval(this.pollingTimers[queryId]);
}
},
startPolling: (pollInterval): void => {
this.pollingTimers[queryId] = setInterval(() => {
const pollingOptions = assign({}, options) as WatchQueryOptions;
// subsequent fetches from polling always reqeust new data
pollingOptions.forceFetch = true;
this.fetchQuery(queryId, pollingOptions);
}, pollInterval);
},
};

if (shouldSubscribe) {
Expand Down Expand Up @@ -387,8 +375,40 @@ export class QueryManager {
}
}
});

return retQuerySubscription;
};

const refetch = (variables?: any) => {
// If no new variables passed, use existing variables
variables = variables || options.variables;

// Use the same options as before, but with new variables and forceFetch true
return this.fetchQuery(queryId, assign(options, {
forceFetch: true,
variables,
}) as WatchQueryOptions);
};

const stopPolling = () => {
if (this.pollingTimers[queryId]) {
clearInterval(this.pollingTimers[queryId]);
}
};

const startPolling = (pollInterval) => {
this.pollingTimers[queryId] = setInterval(() => {
const pollingOptions = assign({}, options) as WatchQueryOptions;
// subsequent fetches from polling always reqeust new data
pollingOptions.forceFetch = true;
this.fetchQuery(queryId, pollingOptions);
}, pollInterval);
};

observableQuery = new ObservableQuery({
subscriberFunction,
refetch,
stopPolling,
startPolling,
});

return observableQuery;
Expand Down Expand Up @@ -466,7 +486,7 @@ export class QueryManager {
}

// Associates a query subscription with an ObservableQuery in this.observableQueries
public addQuerySubscription(queryId: string, querySubscription: QuerySubscription) {
public addQuerySubscription(queryId: string, querySubscription: Subscription) {
if (this.observableQueries.hasOwnProperty(queryId)) {
this.observableQueries[queryId].subscriptions.push(querySubscription);
} else {
Expand Down Expand Up @@ -505,10 +525,7 @@ export class QueryManager {
// the promise for it will be rejected and its results will not be written to the
// store.
Object.keys(this.observableQueries).forEach((queryId) => {
const subscriptions = this.observableQueries[queryId].subscriptions;

// we can refetch any one of the subscriptions.
subscriptions[subscriptions.length - 1].refetch();
this.observableQueries[queryId].observableQuery.refetch();
});
}

Expand Down
64 changes: 34 additions & 30 deletions src/scheduler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,6 @@
// At the moment, the QueryScheduler implements the one-polling-instance-at-a-time logic and
// adds queries to the QueryBatcher queue.

import {
GraphQLResult,
} from 'graphql';

import {
ObservableQuery,
WatchQueryOptions,
Expand Down Expand Up @@ -63,9 +59,9 @@ export class QueryScheduler {
queryId?: string): string {
if (!queryId) {
queryId = this.queryManager.generateQueryId();
// Fire an initial fetch before we start the polling query
this.fetchQuery(queryId, options);
}
// Fire an initial fetch before we start the polling query
this.fetchQuery(queryId, options);
this.queryManager.addQueryListener(queryId, listener);

this.pollingTimers[queryId] = setInterval(() => {
Expand Down Expand Up @@ -100,39 +96,47 @@ export class QueryScheduler {
if (!options.pollInterval) {
throw new Error('Tried to register a non-polling query with the scheduler.');
}
const queryId = this.queryManager.generateQueryId();

return new ObservableQuery((observer) => {
const subscriberFunction = (observer) => {
// "Fire" (i.e. add to the QueryBatcher queue)
const queryListener = this.queryManager.queryListenerForObserver(options, observer);
const queryId = this.startPollingQuery(options, queryListener);
this.startPollingQuery(options, queryListener, queryId);

return {
unsubscribe: () => {
this.stopPollingQuery(queryId);
},

refetch: (variables: any): Promise<GraphQLResult> => {
variables = variables || options.variables;
return this.fetchQuery(queryId, assign(options, {
forceFetch: true,
variables,
}) as WatchQueryOptions);
},

startPolling: (pollInterval): void => {
this.pollingTimers[queryId] = setInterval(() => {
const pollingOptions = assign({}, options) as WatchQueryOptions;
pollingOptions.forceFetch = true;
this.fetchQuery(queryId, pollingOptions).then(() => {
this.removeInFlight(queryId);
});
}, pollInterval);
},

stopPolling: (): void => {
this.stopPollingQuery(queryId);
},
};
};

const refetch = (variables: any) => {
variables = variables || options.variables;
return this.fetchQuery(queryId, assign(options, {
forceFetch: true,
variables,
}) as WatchQueryOptions);
};

const startPolling = () => {
this.pollingTimers[queryId] = setInterval(() => {
const pollingOptions = assign({}, options) as WatchQueryOptions;
pollingOptions.forceFetch = true;
this.fetchQuery(queryId, pollingOptions).then(() => {
this.removeInFlight(queryId);
});
}, options.pollInterval);
};

const stopPolling = () => {
this.stopPollingQuery(queryId);
};

return new ObservableQuery({
subscriberFunction,
refetch,
stopPolling,
startPolling,
});
}

Expand Down
1 change: 1 addition & 0 deletions src/util/Observable.ts
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ export class Observable<T> {

constructor(subscriberFunction: SubscriberFunction<T>) {
this.subscriberFunction = subscriberFunction;

}

public [$$observable]() {
Expand Down
Loading