-
Notifications
You must be signed in to change notification settings - Fork 1
/
Copy pathDX.Threading.pas
288 lines (252 loc) · 6.78 KB
/
DX.Threading.pas
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
unit DX.Threading;
interface
uses
System.Classes, System.SysUtils,
System.Generics.Defaults, System.Generics.Collections,
System.SyncObjs,
DX.Threading.Command,
FMX.Forms;
type
/// <summary>
/// <para>
/// TAsyncCommandQueue basically provides a queue-like structure which takes anonymous methods - "commands" - (from calls
/// to SaveAsync, RefreshAsync etc.) and processes them in an internal thread in the order they were added to the
/// queue.
/// </para>
/// <para>
/// If one of the commands fails then its error handler is executed and the next command in queue will be
/// processed.
/// </para>
/// </summary>
TAsyncCommandQueue = class(TObject)
private type
// Alias for a parameterless anonymous procedure. NOTE(review): not referenced anywhere else in this unit.
TCommand = TProc;
/// <summary>
/// List of pending commands. Every method declared here takes the list's own monitor (TMonitor.Enter(Self))
/// while it touches the list.
/// </summary>
TCommandBuffer = class(TList<TAsyncCommand>)
private
public
/// <summary>
/// Appends ACommand to the (end of the) list. A nil ACommand is ignored.
/// </summary>
procedure Run(ACommand: TAsyncCommand);
/// <summary>
/// Returns a reference to the first (head) command and sets its Owned flag to true.
/// Returns nil if the list is empty. The command stays in the list.
/// </summary>
function First: TAsyncCommand;
/// <summary>
/// Returns true if no commands are in queue.
/// </summary>
function IsEmpty: Boolean;
/// <summary>
/// Removes the first occurrence of ACommand from the list (searching from the beginning).
/// The command itself is not freed. A nil ACommand is ignored.
/// </summary>
procedure Remove(ACommand: TAsyncCommand);
/// <summary>
/// Calls Terminate on every command and removes it from the list, working from the end of the list
/// to the front. Commands whose Owned flag is not set are freed as well.
/// </summary>
procedure Cancel;
end;
/// <summary>
/// Worker thread which takes the commands from FCommands one at a time, starts each one and waits for it.
/// </summary>
TCommandThread = class(TThread)
protected
procedure Execute; override;
/// <summary>
/// Raises FCommandException if one has been recorded.
/// </summary>
procedure CheckForException;
public
// Buffer to read commands from; assigned by TAsyncCommandQueue.Create and freed by TAsyncCommandQueue.Destroy.
FCommands: TCommandBuffer;
// Exception recorded by Execute while processing a command; nil if none was recorded.
FCommandException: TObject;
end;
private
// Pending commands, shared with FCommandThread.
FCommands: TCommandBuffer;
// Internal worker thread, started by the constructor.
FCommandThread: TCommandThread;
public
constructor Create; virtual;
destructor Destroy; override;
/// <summary>
/// Appends a command to be executed to the command queue.
/// </summary>
/// <param name="AExecutionProc">
/// The command to be executed
/// </param>
/// <param name="ADoneProc">
/// An anonymous method to be executed after the asynchronous operation completes successfully.
/// </param>
/// <param name="AErrorProc">
/// An anonymous method to be executed after the asynchronous operation completes unsuccessfully.
/// </param>
procedure Run(AExecutionProc: TProc; ADoneProc: TProc = nil; AErrorProc: TErrorProc = nil);
/// <summary>
/// Cancels and removes all commands currently in the queue.
/// </summary>
procedure Cancel;
/// <summary>
/// Blocks until the command queue is empty and then raises any exception recorded by the worker thread.
/// When called from the main thread, pending Synchronize calls and application messages keep being
/// processed while waiting; on any other thread the queue is polled with a short sleep.
/// </summary>
procedure WaitForDone;
end;
/// <summary>
/// Returns the shared TAsyncCommandQueue instance (singleton), creating it on first use.
/// </summary>
function GAsyncCommandQueue: TAsyncCommandQueue;
implementation
{ TAsyncCommandQueue }
uses
{$IF Defined(MSWINDOWS)}
Winapi.Windows,
{$ENDIF}
REST.Exception;
var
GQueue: TAsyncCommandQueue = nil;
/// <summary>
///   Invokes the System.Classes.WakeMainThread hook, if one is installed.
/// </summary>
procedure DoRunLoop;
begin
  // No hook installed: nothing to do.
  if not Assigned(WakeMainThread) then
    Exit;
  WakeMainThread(nil);
end;
/// <summary>
///   Accessor for the unit-wide queue instance; the instance is created lazily on the first call.
/// </summary>
function GAsyncCommandQueue: TAsyncCommandQueue;
begin
  // First call: build the shared instance.
  if GQueue = nil then
    GQueue := TAsyncCommandQueue.Create;
  Result := GQueue;
end;
/// <summary>
///   Wraps the given procedures in a TAsyncCommand and appends it to the command buffer.
/// </summary>
procedure TAsyncCommandQueue.Run(AExecutionProc: TProc; ADoneProc: TProc = nil; AErrorProc: TErrorProc = nil);
begin
  // The meaning of the trailing False argument is defined by TAsyncCommand.Create
  // (DX.Threading.Command) - it is passed through unchanged here.
  FCommands.Run(TAsyncCommand.Create(AExecutionProc, ADoneProc, AErrorProc, False));
end;
/// <summary>
///   Cancels everything that is currently queued by delegating to the command buffer.
/// </summary>
procedure TAsyncCommandQueue.Cancel;
begin
  FCommands.Cancel;
end;
/// <summary>
///   Builds the command buffer and the worker thread, wires them together and starts the worker.
/// </summary>
constructor TAsyncCommandQueue.Create;
begin
  inherited Create;
  FCommands := TCommandBuffer.Create;
  // The worker is created suspended so that its buffer reference is in place before Execute runs.
  FCommandThread := TCommandThread.Create(True);
  FCommandThread.FCommands := FCommands;
  FCommandThread.Start;
end;
/// <summary>
///   Stops the worker thread, releases any commands that are still queued and frees the buffer.
/// </summary>
destructor TAsyncCommandQueue.Destroy;
begin
  // Freeing the worker first: TThread's destructor signals termination and waits for Execute to
  // return, so a command that is currently running is allowed to finish (as before).
  FreeAndNil(FCommandThread);

  // The buffer is a plain TList and does not own its items, so commands that were queued but never
  // processed would leak. Cancel terminates them and frees those not marked as Owned.
  // FCommands may be nil if the constructor failed part-way.
  if Assigned(FCommands) then
    FCommands.Cancel;
  FreeAndNil(FCommands);

  inherited Destroy;
end;
/// <summary>
///   Polls until the command buffer is empty, then lets the worker thread raise any exception it
///   has recorded.
/// </summary>
procedure TAsyncCommandQueue.WaitForDone;
var
  LOnMainThread: Boolean;
begin
  // The calling thread cannot change during this call, so determine it once.
  LOnMainThread := TThread.CurrentThread.ThreadID = MainThreadID;

  while not FCommands.IsEmpty do
  begin
    if not LOnMainThread then
      // Background thread: plain polling with a short sleep.
      Sleep(10)
    else if not CheckSynchronize(10) then
      // Main thread: Synchronize calls must still get through; when none was pending, keep the
      // message loop of a GUI application running as well.
      Application.ProcessMessages;
  end;

  // If the worker recorded an exception while processing, it is raised here.
  FCommandThread.CheckForException;
end;
/// <summary>
///   Worker loop: takes the head command from the buffer, starts it, waits for it and then removes
///   and frees it. Repeats until the thread is terminated.
/// </summary>
/// <remarks>
///   A command is removed from the buffer and freed even when starting or waiting for it raises, so
///   a failing command can neither block the queue nor be started a second time. The exception
///   object is kept in FCommandException (only the most recent one is kept).
/// </remarks>
procedure TAsyncCommandQueue.TCommandThread.Execute;
var
  LCommand: TAsyncCommand;
begin
  while not Terminated do
  begin
    LCommand := FCommands.First;
    if not Assigned(LCommand) then
    begin
      // Technically worst case: 100ms until a new command in the queue will be executed.
      // Todo: use TMonitor.Pulse (when a new command gets appended) instead of continuous Sleep().
      Sleep(100);
      Continue;
    end;

    try
      try
        // Execute the command and block until it has finished.
        LCommand.Start;
        LCommand.WaitFor;
      except
        on Exception do
          // Take ownership of the exception object: without this the RTL frees it as soon as the
          // handler ends and the stored reference would dangle. A previously stored object is simply
          // overwritten, because it cannot be told here whether it has already been raised.
          FCommandException := TObject(AcquireExceptionObject);
      end;
    finally
      // Always drop the command, otherwise the same instance would be fetched again by First
      // and the buffer would never become empty.
      FCommands.Remove(LCommand);
      FreeAndNil(LCommand);
    end;
  end;
end;
{ TAsyncCommandQueue.TCommandBuffer }
/// <summary>
///   Adds ACommand at the end of the list while holding the list's monitor. Nil is ignored.
/// </summary>
procedure TAsyncCommandQueue.TCommandBuffer.Run(ACommand: TAsyncCommand);
begin
  if ACommand = nil then
    Exit;

  TMonitor.Enter(Self);
  try
    Add(ACommand);
  finally
    TMonitor.Exit(Self);
  end;
end;
/// <summary>
///   Returns the head command and flags it as Owned; returns nil when the list is empty.
///   The command is left in the list.
/// </summary>
function TAsyncCommandQueue.TCommandBuffer.First: TAsyncCommand;
begin
  TMonitor.Enter(Self);
  try
    Result := nil;
    if Count > 0 then
    begin
      Result := Items[0];
      Result.Owned := True;
    end;
  finally
    TMonitor.Exit(Self);
  end;
end;
/// <summary>
///   Reports, under the list's monitor, whether the list currently holds no commands.
/// </summary>
function TAsyncCommandQueue.TCommandBuffer.IsEmpty: Boolean;
begin
  TMonitor.Enter(Self);
  try
    // Exit inside try..finally still runs the finally part, so the monitor is released.
    Exit(Count = 0);
  finally
    TMonitor.Exit(Self);
  end;
end;
/// <summary>
///   Takes the first occurrence of ACommand out of the list (searching from the front) while
///   holding the list's monitor. The command is not freed. Nil is ignored.
/// </summary>
procedure TAsyncCommandQueue.TCommandBuffer.Remove(ACommand: TAsyncCommand);
begin
  if ACommand = nil then
    Exit;

  TMonitor.Enter(Self);
  try
    RemoveItem(ACommand, TDirection.FromBeginning);
  finally
    TMonitor.Exit(Self);
  end;
end;
/// <summary>
///   Empties the list from the back to the front: each command gets Terminate called and is taken
///   out of the list; commands not flagged as Owned are freed too.
/// </summary>
procedure TAsyncCommandQueue.TCommandBuffer.Cancel;
var
  LCommand: TAsyncCommand;
begin
  TMonitor.Enter(Self);
  try
    while Count > 0 do
    begin
      LCommand := Last;
      LCommand.Terminate;
      Delete(Count - 1);
      // An Owned command has been handed out by First and is left alone here.
      if not LCommand.Owned then
        LCommand.Free;
    end;
  finally
    TMonitor.Exit(Self);
  end;
end;
/// <summary>
///   Raises the exception recorded in FCommandException, if there is one. The field is cleared
///   first, so each recorded exception is raised at most once.
/// </summary>
procedure TAsyncCommandQueue.TCommandThread.CheckForException;
var
  LPending: TObject;
begin
  LPending := FCommandException;
  if not Assigned(LPending) then
    Exit;

  // Once raised, the object belongs to the exception machinery and is freed when handled.
  // Keeping the reference would make a later call raise an already freed object.
  FCommandException := nil;
  raise LPending;
end;
initialization

finalization
  // FreeAndNil instead of Free: GAsyncCommandQueue tests GQueue for nil, so a stale reference left
  // behind here would be handed out to any code calling it after this unit has been finalized.
  FreeAndNil(GQueue);

end.