-
Notifications
You must be signed in to change notification settings - Fork 345
/
Copy pathActorProxy.cs
312 lines (285 loc) · 15 KB
/
ActorProxy.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
// ------------------------------------------------------------
// Copyright (c) Microsoft Corporation.
// Licensed under the MIT License.
// ------------------------------------------------------------
namespace Dapr.Actors.Client
{
using System;
using System.IO;
using System.Text;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using Dapr.Actors.Communication;
using Dapr.Actors.Communication.Client;
/// <summary>
/// Provides the base implementation for the proxy to the remote actor objects implementing <see cref="IActor"/> interfaces.
/// The proxy object can be used for client-to-actor and actor-to-actor communication.
/// </summary>
public class ActorProxy : IActorProxy
{
/// <summary>
/// The default factory used to create an actor proxy. The static <c>Create</c> methods of this class delegate to it.
/// </summary>
public static IActorProxyFactory DefaultProxyFactory { get; } = new ActorProxyFactory();
// Client used by the protected remoting InvokeMethodAsync overload; assigned by the remoting Initialize overload.
private ActorRemotingClient actorRemotingClient;
// Client used by the public JSON (non-remoting) InvokeMethodAsync overloads; assigned by the non-remoting Initialize overload.
private ActorNonRemotingClient actorNonRemotingClient;
/// <summary>
/// Initializes a new instance of the <see cref="ActorProxy"/> class without binding it to an actor;
/// the actor identity and client are supplied afterwards through one of the <c>Initialize</c> overloads.
/// The constructor is <c>protected</c> so generated proxy classes deriving from <see cref="ActorProxy"/> can call it
/// for remoting calls, and <c>internal</c> so ActorProxyFactory can call it for non-remoting calls.
/// </summary>
protected internal ActorProxy() { }
/// <inheritdoc/>
public ActorId ActorId { get; private set; }
/// <inheritdoc/>
public string ActorType { get; private set; }
/// <summary>
/// Gets or sets the factory used by <c>CreateRequestMessageBody</c> to build remoting request bodies.
/// It is assigned from the remoting client when the proxy is initialized for remoting.
/// </summary>
internal IActorMessageBodyFactory ActorMessageBodyFactory { get; set; }
/// <summary>
/// Gets or sets the JSON options used by the non-remoting <c>InvokeMethodAsync</c> overloads to serialize
/// request data and deserialize responses.
/// </summary>
internal JsonSerializerOptions JsonSerializerOptions { get; set; }
/// <summary>
/// The Dapr API token; populated from the proxy options when the proxy is initialized for remoting.
/// </summary>
internal string DaprApiToken;
/// <summary>
/// Builds a strongly typed proxy for a remote actor by delegating to <see cref="DefaultProxyFactory"/>.
/// </summary>
/// <typeparam name="TActorInterface">
/// The actor interface implemented by the remote actor object; the returned proxy implements this interface.
/// </typeparam>
/// <param name="actorId">ID of the actor that will receive the requests made through the returned proxy.</param>
/// <param name="actorType">Type of actor implementation.</param>
/// <param name="options">Optional <see cref="ActorProxyOptions" /> applied when creating the proxy.</param>
/// <returns>Proxy to the actor object.</returns>
public static TActorInterface Create<TActorInterface>(ActorId actorId, string actorType, ActorProxyOptions options = null)
    where TActorInterface : IActor
    => DefaultProxyFactory.CreateActorProxy<TActorInterface>(actorId, actorType, options);
/// <summary>
/// Builds a proxy for a remote actor whose interface is only known at run time, by delegating to
/// <see cref="DefaultProxyFactory"/>.
/// </summary>
/// <param name="actorId">ID of the actor that will receive the requests made through the returned proxy.</param>
/// <param name="actorInterfaceType">
/// The actor interface type implemented by the remote actor object; the returned proxy implements this interface.
/// </param>
/// <param name="actorType">Type of actor implementation.</param>
/// <param name="options">Optional <see cref="ActorProxyOptions" /> applied when creating the proxy.</param>
/// <returns>Proxy to the actor object.</returns>
/// <exception cref="ArgumentException">
/// Thrown when <paramref name="actorInterfaceType"/> is not assignable to <see cref="IActor"/>.
/// </exception>
public static object Create(ActorId actorId, Type actorInterfaceType, string actorType, ActorProxyOptions options = null)
{
    var implementsActor = typeof(IActor).IsAssignableFrom(actorInterfaceType);
    if (implementsActor)
    {
        return DefaultProxyFactory.CreateActorProxy(actorId, actorInterfaceType, actorType, options);
    }

    // Reject anything that is not an actor interface before reaching the factory.
    throw new ArgumentException("The interface must implement IActor.", nameof(actorInterfaceType));
}
/// <summary>
/// Builds an <see cref="ActorProxy"/> for making calls without remoting, by delegating to
/// <see cref="DefaultProxyFactory"/>.
/// </summary>
/// <param name="actorId">Actor Id.</param>
/// <param name="actorType">Type of actor.</param>
/// <param name="options">Optional <see cref="ActorProxyOptions" /> applied when creating the proxy.</param>
/// <returns>Actor proxy to interact with the remote actor object.</returns>
public static ActorProxy Create(ActorId actorId, string actorType, ActorProxyOptions options = null)
    => DefaultProxyFactory.Create(actorId, actorType, options);
/// <summary>
/// Invokes the specified actor method without remoting. <paramref name="data"/> is serialized to JSON and sent as
/// the request payload; the response is deserialized from JSON into <typeparamref name="TResponse"/>.
/// </summary>
/// <typeparam name="TRequest">The data type of the object that will be serialized.</typeparam>
/// <typeparam name="TResponse">Return type of method.</typeparam>
/// <param name="method">Actor method name.</param>
/// <param name="data">Object argument for actor method.</param>
/// <param name="cancellationToken">
/// Cancellation token observed while serializing the request, invoking the actor and deserializing the response.
/// </param>
/// <returns>Response from the server, deserialized as <typeparamref name="TResponse"/>.</returns>
public async Task<TResponse> InvokeMethodAsync<TRequest, TResponse>(string method, TRequest data, CancellationToken cancellationToken = default)
{
    using var stream = new MemoryStream();
    await JsonSerializer.SerializeAsync<TRequest>(stream, data, JsonSerializerOptions, cancellationToken);
    await stream.FlushAsync(cancellationToken);
    var jsonPayload = Encoding.UTF8.GetString(stream.ToArray());

    // The raw response stream is fully consumed here and never handed to the caller, so dispose it when done.
    using var response = await this.actorNonRemotingClient.InvokeActorMethodWithoutRemotingAsync(this.ActorType, this.ActorId.ToString(), method, jsonPayload, cancellationToken);
    return await JsonSerializer.DeserializeAsync<TResponse>(response, JsonSerializerOptions, cancellationToken);
}
/// <summary>
/// Invokes the specified actor method without remoting and without reading a result.
/// <paramref name="data"/> is serialized to JSON and sent as the request payload.
/// </summary>
/// <typeparam name="TRequest">The data type of the object that will be serialized.</typeparam>
/// <param name="method">Actor method name.</param>
/// <param name="data">Object argument for actor method.</param>
/// <param name="cancellationToken">
/// Cancellation token observed while serializing the request and invoking the actor.
/// </param>
/// <returns>A task that completes when the actor invocation has completed.</returns>
public async Task InvokeMethodAsync<TRequest>(string method, TRequest data, CancellationToken cancellationToken = default)
{
    using var stream = new MemoryStream();
    await JsonSerializer.SerializeAsync<TRequest>(stream, data, JsonSerializerOptions, cancellationToken);
    await stream.FlushAsync(cancellationToken);
    var jsonPayload = Encoding.UTF8.GetString(stream.ToArray());

    // The response content is not needed, but the stream returned by the client must still be released.
    using var response = await this.actorNonRemotingClient.InvokeActorMethodWithoutRemotingAsync(this.ActorType, this.ActorId.ToString(), method, jsonPayload, cancellationToken);
}
/// <summary>
/// Invokes the specified actor method without remoting and without a request payload. The response is
/// deserialized from JSON into <typeparamref name="TResponse"/>.
/// </summary>
/// <typeparam name="TResponse">Return type of method.</typeparam>
/// <param name="method">Actor method name.</param>
/// <param name="cancellationToken">
/// Cancellation token observed while invoking the actor and deserializing the response.
/// </param>
/// <returns>Response from the server, deserialized as <typeparamref name="TResponse"/>.</returns>
public async Task<TResponse> InvokeMethodAsync<TResponse>(string method, CancellationToken cancellationToken = default)
{
    // The raw response stream is fully consumed here and never handed to the caller, so dispose it when done.
    using var response = await this.actorNonRemotingClient.InvokeActorMethodWithoutRemotingAsync(this.ActorType, this.ActorId.ToString(), method, null, cancellationToken);
    return await JsonSerializer.DeserializeAsync<TResponse>(response, JsonSerializerOptions, cancellationToken);
}
/// <summary>
/// Invokes the specified actor method without remoting, without a request payload and without reading a result.
/// </summary>
/// <param name="method">Actor method name.</param>
/// <param name="cancellationToken">Cancellation token passed to the actor invocation.</param>
/// <returns>A task that completes when the actor invocation has completed.</returns>
public async Task InvokeMethodAsync(string method, CancellationToken cancellationToken = default)
{
    // Await the call instead of returning the client's task directly so the response stream it produces
    // can be released here; the caller only ever sees a plain Task and has no way to dispose it.
    using var response = await this.actorNonRemotingClient.InvokeActorMethodWithoutRemotingAsync(this.ActorType, this.ActorId.ToString(), method, null, cancellationToken);
}
/// <summary>
/// Binds this proxy to an actor for remoting calls: stores the remoting client and actor identity, takes the
/// message body factory from the client, and captures the JSON options and Dapr API token from
/// <paramref name="options"/>.
/// </summary>
/// <param name="client">Remoting client used to send requests.</param>
/// <param name="actorId">ID of the target actor.</param>
/// <param name="actorType">Type of the target actor.</param>
/// <param name="options">
/// Proxy options; may be null, in which case web-default JSON options are used and the API token is null.
/// </param>
internal void Initialize(
    ActorRemotingClient client,
    ActorId actorId,
    string actorType,
    ActorProxyOptions options)
{
    this.actorRemotingClient = client;
    this.ActorId = actorId;
    this.ActorType = actorType;
    this.ActorMessageBodyFactory = client.GetRemotingMessageBodyFactory();

    // Fall back to the web defaults when the caller did not supply serializer options.
    var serializerOptions = options?.JsonSerializerOptions;
    if (serializerOptions is null)
    {
        serializerOptions = new JsonSerializerOptions(JsonSerializerDefaults.Web);
    }

    this.JsonSerializerOptions = serializerOptions;
    this.DaprApiToken = options is null ? null : options.DaprApiToken;
}
/// <summary>
/// Binds this proxy to an actor for non-remoting calls: stores the non-remoting client and actor identity, and
/// captures the JSON options and Dapr API token from <paramref name="options"/>.
/// </summary>
/// <param name="client">Non-remoting client used to send requests.</param>
/// <param name="actorId">ID of the target actor.</param>
/// <param name="actorType">Type of the target actor.</param>
/// <param name="options">
/// Proxy options; may be null. Values it does not supply leave any previously assigned value in place.
/// </param>
internal void Initialize(
    ActorNonRemotingClient client,
    ActorId actorId,
    string actorType,
    ActorProxyOptions options)
{
    this.actorNonRemotingClient = client;
    this.ActorId = actorId;
    this.ActorType = actorType;

    // Prefer the caller's options, then any value already assigned, and finally the same web defaults the
    // remoting Initialize overload uses, so the JSON InvokeMethodAsync overloads never run with null options.
    this.JsonSerializerOptions = options?.JsonSerializerOptions
        ?? this.JsonSerializerOptions
        ?? new JsonSerializerOptions(JsonSerializerDefaults.Web);

    // Capture the API token as the remoting overload does, without discarding a value that is already set.
    this.DaprApiToken = options?.DaprApiToken ?? this.DaprApiToken;
}
/// <summary>
/// Sends a remoting request for the given interface method to the actor this proxy is bound to and returns
/// the body of the response.
/// </summary>
/// <param name="interfaceId">Interface ID.</param>
/// <param name="methodId">Method ID.</param>
/// <param name="methodName">Method Name.</param>
/// <param name="requestMsgBodyValue">Request Message Body Value.</param>
/// <param name="cancellationToken">Cancellation Token.</param>
/// <returns>
/// A <see cref="Task{TResult}"/> whose result is the response message body, or null when the remoting client
/// returned no response message.
/// </returns>
protected async Task<IActorResponseMessageBody> InvokeMethodAsync(
    int interfaceId,
    int methodId,
    string methodName,
    IActorRequestMessageBody requestMsgBodyValue,
    CancellationToken cancellationToken)
{
    // The header identifies the target actor and the interface method being called.
    var requestHeader = new ActorRequestMessageHeader
    {
        ActorId = this.ActorId,
        ActorType = this.ActorType,
        InterfaceId = interfaceId,
        MethodId = methodId,
        CallContext = Actors.Helper.GetCallContext(),
        MethodName = methodName,
    };
    var requestMessage = new ActorRequestMessage(requestHeader, requestMsgBodyValue);

    var reply = await this.actorRemotingClient.InvokeAsync(requestMessage, cancellationToken);
    if (reply is null)
    {
        return null;
    }

    return reply.GetBody();
}
/// <summary>
/// Creates the actor request message body by delegating to the message body factory of this proxy.
/// </summary>
/// <param name="interfaceName">Full name of the service interface for which this call is invoked.</param>
/// <param name="methodName">Name of the service interface method for which this call is invoked.</param>
/// <param name="parameterCount">Number of parameters of the service interface method.</param>
/// <param name="wrappedRequest">Wrapped request object.</param>
/// <returns>A request message body.</returns>
protected IActorRequestMessageBody CreateRequestMessageBody(
    string interfaceName,
    string methodName,
    int parameterCount,
    object wrappedRequest)
    => this.ActorMessageBodyFactory.CreateRequestMessageBody(interfaceName, methodName, parameterCount, wrappedRequest);
/// <summary>
/// Helper for the generated proxy type: exposes a <see cref="Task{TResult}"/> of <see cref="object"/> as a
/// plain <see cref="Task"/> for method invocations that have no return value. The same task instance is returned.
/// </summary>
/// <param name="task">A task returned from the method that contains null return value.</param>
/// <returns>A task that represents the asynchronous remote method call, without the return value.</returns>
protected Task ContinueWith(Task<object> task) => task;
/// <summary>
/// Produces the return value of a remote method call from the value of a wrapped response; called by
/// <c>ContinueWithResult</c> when the response body is a wrapped message. This base implementation ignores
/// its arguments and returns <see cref="Task.CompletedTask"/>; the method is virtual so that derived proxy
/// types can supply the actual conversion.
/// </summary>
/// <param name="interfaceId">Interface Id for the actor interface.</param>
/// <param name="methodId">Method Id for the actor method.</param>
/// <param name="responseBody">Response body.</param>
/// <returns>Return value of method call as <see cref="object"/>.</returns>
protected virtual object GetReturnValue(int interfaceId, int methodId, object responseBody) => Task.CompletedTask;
/// <summary>
/// Called by the generated proxy class to extract the typed result from the response body of a remote call.
/// </summary>
/// <typeparam name="TRetval"><see cref="System.Type"/> of the remote method return value.</typeparam>
/// <param name="interfaceId">InterfaceId of the remoting interface.</param>
/// <param name="methodId">MethodId of the remoting Method.</param>
/// <param name="task">A task that represents the asynchronous operation for remote method call.</param>
/// <returns>A task that represents the asynchronous operation for remote method call.
/// Its result is the remote method return value cast to <typeparamref name="TRetval"/>.</returns>
protected async Task<TRetval> ContinueWithResult<TRetval>(
    int interfaceId,
    int methodId,
    Task<IActorResponseMessageBody> task)
{
    var body = await task;

    // Non-wrapped bodies expose the typed value directly.
    if (!(body is WrappedMessage wrapped))
    {
        return (TRetval)body.Get(typeof(TRetval));
    }

    // Wrapped bodies are converted through GetReturnValue, which derived proxy types may override.
    object unwrapped = this.GetReturnValue(interfaceId, methodId, wrapped.Value);
    return (TRetval)unwrapped;
}
/// <summary>
/// Checks whether the given request message body is a wrapped actor message.
/// </summary>
/// <param name="requestMessageBody">Actor Request Message Body.</param>
/// <returns>true when the body is a <c>WrappedMessage</c>; otherwise false.</returns>
protected bool CheckIfItsWrappedRequest(IActorRequestMessageBody requestMessageBody)
    => requestMessageBody is WrappedMessage;
}
}