-
Notifications
You must be signed in to change notification settings - Fork 3k
/
Copy pathSequentialQueue.ts
198 lines (169 loc) · 7.36 KB
/
SequentialQueue.ts
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
import Onyx from 'react-native-onyx';
import * as ActiveClientManager from '@libs/ActiveClientManager';
import * as Request from '@libs/Request';
import * as RequestThrottle from '@libs/RequestThrottle';
import * as PersistedRequests from '@userActions/PersistedRequests';
import * as QueuedOnyxUpdates from '@userActions/QueuedOnyxUpdates';
import CONST from '@src/CONST';
import ONYXKEYS from '@src/ONYXKEYS';
import type OnyxRequest from '@src/types/onyx/Request';
import * as NetworkStore from './NetworkStore';
let resolveIsReadyPromise: ((args?: unknown[]) => void) | undefined;
let isReadyPromise = new Promise((resolve) => {
resolveIsReadyPromise = resolve;
});
// Resolve the isReadyPromise immediately so that the queue starts working as soon as the page loads
resolveIsReadyPromise?.();
let isSequentialQueueRunning = false;
let currentRequest: Promise<void> | null = null;
let isQueuePaused = false;
/**
* Puts the queue into a paused state so that no requests will be processed
*/
function pause() {
if (isQueuePaused) {
return;
}
console.debug('[SequentialQueue] Pausing the queue');
isQueuePaused = true;
}
/**
* Gets the current Onyx queued updates, apply them and clear the queue if the queue is not paused.
*/
function flushOnyxUpdatesQueue() {
// The only situation where the queue is paused is if we found a gap between the app current data state and our server's. If that happens,
// we'll trigger async calls to make the client updated again. While we do that, we don't want to insert anything in Onyx.
if (isQueuePaused) {
return;
}
QueuedOnyxUpdates.flushQueue();
}
/**
* Process any persisted requests, when online, one at a time until the queue is empty.
*
* If a request fails due to some kind of network error, such as a request being throttled or when our backend is down, then we retry it with an exponential back off process until a response
* is successfully returned. The first time a request fails we set a random, small, initial wait time. After waiting, we retry the request. If there are subsequent failures the request wait
* time is doubled creating an exponential back off in the frequency of requests hitting the server. Since the initial wait time is random and it increases exponentially, the load of
* requests to our backend is evenly distributed and it gradually decreases with time, which helps the servers catch up.
*/
function process(): Promise<void> {
// When the queue is paused, return early. This prevents any new requests from happening. The queue will be flushed again when the queue is unpaused.
if (isQueuePaused) {
return Promise.resolve();
}
const persistedRequests = PersistedRequests.getAll();
if (persistedRequests.length === 0 || NetworkStore.isOffline()) {
return Promise.resolve();
}
const requestToProcess = persistedRequests[0];
// Set the current request to a promise awaiting its processing so that getCurrentRequest can be used to take some action after the current request has processed.
currentRequest = Request.processWithMiddleware(requestToProcess, true)
.then((response) => {
// A response might indicate that the queue should be paused. This happens when a gap in onyx updates is detected between the client and the server and
// that gap needs resolved before the queue can continue.
if (response?.shouldPauseQueue) {
pause();
}
PersistedRequests.remove(requestToProcess);
RequestThrottle.clear();
return process();
})
.catch((error) => {
// On sign out we cancel any in flight requests from the user. Since that user is no longer signed in their requests should not be retried.
// Duplicate records don't need to be retried as they just mean the record already exists on the server
if (error.name === CONST.ERROR.REQUEST_CANCELLED || error.message === CONST.ERROR.DUPLICATE_RECORD) {
PersistedRequests.remove(requestToProcess);
RequestThrottle.clear();
return process();
}
return RequestThrottle.sleep()
.then(process)
.catch(() => {
Onyx.update(requestToProcess.failureData ?? []);
PersistedRequests.remove(requestToProcess);
RequestThrottle.clear();
return process();
});
});
return currentRequest;
}
function flush() {
// When the queue is paused, return early. This will keep an requests in the queue and they will get flushed again when the queue is unpaused
if (isQueuePaused) {
return;
}
if (isSequentialQueueRunning || PersistedRequests.getAll().length === 0) {
return;
}
// ONYXKEYS.PERSISTED_REQUESTS is shared across clients, thus every client/tab will have a copy
// It is very important to only process the queue from leader client otherwise requests will be duplicated.
if (!ActiveClientManager.isClientTheLeader()) {
return;
}
isSequentialQueueRunning = true;
// Reset the isReadyPromise so that the queue will be flushed as soon as the request is finished
isReadyPromise = new Promise((resolve) => {
resolveIsReadyPromise = resolve;
});
// Ensure persistedRequests are read from storage before proceeding with the queue
const connectionID = Onyx.connect({
key: ONYXKEYS.PERSISTED_REQUESTS,
callback: () => {
Onyx.disconnect(connectionID);
process().finally(() => {
isSequentialQueueRunning = false;
resolveIsReadyPromise?.();
currentRequest = null;
flushOnyxUpdatesQueue();
});
},
});
}
/**
* Unpauses the queue and flushes all the requests that were in it or were added to it while paused
*/
function unpause() {
if (!isQueuePaused) {
return;
}
const numberOfPersistedRequests = PersistedRequests.getAll().length || 0;
console.debug(`[SequentialQueue] Unpausing the queue and flushing ${numberOfPersistedRequests} requests`);
isQueuePaused = false;
flushOnyxUpdatesQueue();
flush();
}
function isRunning(): boolean {
return isSequentialQueueRunning;
}
function isPaused(): boolean {
return isQueuePaused;
}
// Flush the queue when the connection resumes
NetworkStore.onReconnection(flush);
function push(request: OnyxRequest) {
// Add request to Persisted Requests so that it can be retried if it fails
PersistedRequests.save(request);
// If we are offline we don't need to trigger the queue to empty as it will happen when we come back online
if (NetworkStore.isOffline()) {
return;
}
// If the queue is running this request will run once it has finished processing the current batch
if (isSequentialQueueRunning) {
isReadyPromise.then(flush);
return;
}
flush();
}
function getCurrentRequest(): Promise<void> {
if (currentRequest === null) {
return Promise.resolve();
}
return currentRequest;
}
/**
* Returns a promise that resolves when the sequential queue is done processing all persisted write requests.
*/
function waitForIdle(): Promise<unknown> {
return isReadyPromise;
}
export {flush, getCurrentRequest, isRunning, isPaused, push, waitForIdle, pause, unpause};