-
Notifications
You must be signed in to change notification settings - Fork 4.9k
/
Copy pathAmqpClient.cs
609 lines (533 loc) · 29.9 KB
/
AmqpClient.cs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.
using System;
using System.Diagnostics;
using System.Globalization;
using System.Runtime.ExceptionServices;
using System.Security.Authentication;
using System.Threading;
using System.Threading.Tasks;
using Azure.Core;
using Azure.Core.Diagnostics;
using Azure.Messaging.EventHubs.Authorization;
using Azure.Messaging.EventHubs.Consumer;
using Azure.Messaging.EventHubs.Core;
using Azure.Messaging.EventHubs.Diagnostics;
using Azure.Messaging.EventHubs.Producer;
using Microsoft.Azure.Amqp;
namespace Azure.Messaging.EventHubs.Amqp
{
/// <summary>
/// A transport client abstraction responsible for brokering operations for AMQP-based connections.
/// It is intended that the public <see cref="EventHubConnection" /> make use of an instance via containment
/// and delegate operations to it.
/// </summary>
///
/// <seealso cref="Azure.Messaging.EventHubs.Core.TransportClient" />
///
internal class AmqpClient : TransportClient
{
/// <summary>
/// The buffer to apply when considering a credential refresh; a cached token whose expiration falls
/// within this duration of the current UTC time is treated as expired and a new one is requested.
/// </summary>
///
private static TimeSpan CredentialRefreshBuffer { get; } = TimeSpan.FromMinutes(5);
/// <summary>Indicates whether or not this instance has been closed; set when a close begins and reset if that close fails.</summary>
private volatile bool _closed;
/// <summary>The most recently acquired token used for authorization with the Event Hubs service; its default value has no token text, which triggers acquisition on first use.</summary>
private AccessToken _accessToken;
/// <summary>
/// Indicates whether or not this client has been closed.
/// </summary>
///
/// <value>
/// <c>true</c> if the client is closed; otherwise, <c>false</c>.
/// </value>
///
public override bool IsClosed => _closed;
/// <summary>
/// The endpoint for the Event Hubs service to which the client is associated.
/// </summary>
///
public override Uri ServiceEndpoint { get; }
/// <summary>
/// The endpoint to use when establishing a connection to the Event Hubs service. This is the
/// <see cref="ServiceEndpoint" /> unless a custom endpoint address was supplied in the connection options,
/// in which case the custom host and port are used with the service endpoint's scheme.
/// </summary>
///
public Uri ConnectionEndpoint { get; }
/// <summary>
/// The name of the Event Hub to which the client is bound.
/// </summary>
///
private string EventHubName { get; }
/// <summary>
/// Gets the credential to use for authorization with the Event Hubs service.
/// </summary>
///
private EventHubTokenCredential Credential { get; }
/// <summary>
/// The converter to use for translating between AMQP messages and client library
/// types.
/// </summary>
///
private AmqpMessageConverter MessageConverter { get; }
/// <summary>
/// The AMQP connection scope responsible for managing transport constructs for this instance.
/// </summary>
///
private AmqpConnectionScope ConnectionScope { get; }
/// <summary>
/// The AMQP link intended for use with management operations, such as querying Event Hub and
/// partition properties.
/// </summary>
///
private FaultTolerantAmqpObject<RequestResponseAmqpLink> ManagementLink { get; }
/// <summary>
/// Initializes a new instance of the <see cref="AmqpClient"/> class.
/// </summary>
///
/// <param name="host">The fully qualified host name for the Event Hubs namespace. This is likely to be similar to <c>{yournamespace}.servicebus.windows.net</c>.</param>
/// <param name="eventHubName">The name of the specific Event Hub to connect the client to.</param>
/// <param name="operationTimeout">The amount of time to allow for an AMQP operation using the link to complete before attempting to cancel it.</param>
/// <param name="credential">The Azure managed identity credential to use for authorization. Access controls may be specified by the Event Hubs namespace or the requested Event Hub, depending on Azure configuration.</param>
/// <param name="clientOptions">A set of options to apply when configuring the client.</param>
/// <param name="useTls"><c>true</c> if the client should secure the connection using TLS; otherwise, <c>false</c>.</param>
///
/// <remarks>
/// As an internal type, this class performs only basic sanity checks against its arguments. It
/// is assumed that callers are trusted and have performed deep validation.
///
/// Any parameters passed are assumed to be owned by this instance and safe to mutate or dispose;
/// creation of clones or otherwise protecting the parameters is assumed to be the purview of the
/// caller.
/// </remarks>
///
public AmqpClient(string host,
                  string eventHubName,
                  TimeSpan operationTimeout,
                  EventHubTokenCredential credential,
                  EventHubConnectionOptions clientOptions,
                  bool useTls = true)
    : this(
        host: host,
        eventHubName: eventHubName,
        useTls: useTls,
        operationTimeout: operationTimeout,
        credential: credential,
        clientOptions: clientOptions,
        connectionScope: null,
        messageConverter: null)
{
    // All initialization is delegated; passing null for the scope and converter asks the
    // delegated constructor to create its own instances.
}
/// <summary>
/// Initializes a new instance of the <see cref="AmqpClient"/> class.
/// </summary>
///
/// <param name="host">The fully qualified host name for the Event Hubs namespace. This is likely to be similar to <c>{yournamespace}.servicebus.windows.net</c>.</param>
/// <param name="eventHubName">The name of the specific Event Hub to connect the client to.</param>
/// <param name="useTls"><c>true</c> if the client should secure the connection using TLS; otherwise, <c>false</c>.</param>
/// <param name="operationTimeout">The amount of time to allow for an AMQP operation using the link to complete before attempting to cancel it.</param>
/// <param name="credential">The Azure managed identity credential to use for authorization. Access controls may be specified by the Event Hubs namespace or the requested Event Hub, depending on Azure configuration.</param>
/// <param name="clientOptions">A set of options to apply when configuring the client.</param>
/// <param name="connectionScope">The optional scope to use for AMQP connection management. If <c>null</c>, a new scope will be created.</param>
/// <param name="messageConverter">The optional converter to use for transforming AMQP message-related types. If <c>null</c>, a new converter will be created.</param>
///
/// <remarks>
/// As an internal type, this class performs only basic sanity checks against its arguments. It
/// is assumed that callers are trusted and have performed deep validation.
///
/// Any parameters passed are assumed to be owned by this instance and safe to mutate or dispose;
/// creation of clones or otherwise protecting the parameters is assumed to be the purview of the
/// caller.
/// </remarks>
///
protected AmqpClient(string host,
                     string eventHubName,
                     bool useTls,
                     TimeSpan operationTimeout,
                     EventHubTokenCredential credential,
                     EventHubConnectionOptions clientOptions,
                     AmqpConnectionScope connectionScope,
                     AmqpMessageConverter messageConverter)
{
    // Only basic sanity checks are performed; callers are trusted to have validated deeply.
    Argument.AssertNotNullOrEmpty(host, nameof(host));
    Argument.AssertNotNullOrEmpty(eventHubName, nameof(eventHubName));
    Argument.AssertNotNegative(operationTimeout, nameof(operationTimeout));
    Argument.AssertNotNull(credential, nameof(credential));
    Argument.AssertNotNull(clientOptions, nameof(clientOptions));

    try
    {
        EventHubsEventSource.Log.EventHubClientCreateStart(host, eventHubName);

        // The service endpoint is derived from the host and the scheme implied by the transport/TLS choice.
        var serviceEndpointBuilder = new UriBuilder();
        serviceEndpointBuilder.Scheme = clientOptions.TransportType.GetUriScheme(useTls);
        serviceEndpointBuilder.Host = host;
        ServiceEndpoint = serviceEndpointBuilder.Uri;

        // Connect directly to the service unless a custom endpoint address was configured, in which
        // case only its host and port are honored; the scheme always follows the service endpoint.
        var customEndpoint = clientOptions.CustomEndpointAddress;

        if (customEndpoint is null)
        {
            ConnectionEndpoint = ServiceEndpoint;
        }
        else
        {
            var connectionEndpointBuilder = new UriBuilder();
            connectionEndpointBuilder.Scheme = ServiceEndpoint.Scheme;
            connectionEndpointBuilder.Host = customEndpoint.Host;
            connectionEndpointBuilder.Port = customEndpoint.IsDefaultPort ? -1 : customEndpoint.Port;
            ConnectionEndpoint = connectionEndpointBuilder.Uri;
        }

        EventHubName = eventHubName;
        Credential = credential;
        MessageConverter = messageConverter ?? new AmqpMessageConverter();

        // A scope is created only when the caller did not supply one.
        connectionScope ??= new AmqpConnectionScope(
            ServiceEndpoint,
            ConnectionEndpoint,
            eventHubName,
            credential,
            clientOptions.TransportType,
            clientOptions.Proxy,
            clientOptions.ConnectionIdleTimeout,
            null,
            clientOptions.SendBufferSizeInBytes,
            clientOptions.ReceiveBufferSizeInBytes,
            clientOptions.CertificateValidationCallback);

        ConnectionScope = connectionScope;

        // The management link is wrapped so that it is opened through CreateManagementLinkAsync and,
        // when closed, has both its session and the link itself safely closed and the closure logged.
        ManagementLink = new FaultTolerantAmqpObject<RequestResponseAmqpLink>(
            timeout => CreateManagementLinkAsync(operationTimeout, timeout, CancellationToken.None),
            managementLink =>
            {
                managementLink.Session?.SafeClose();
                managementLink.SafeClose();
                EventHubsEventSource.Log.FaultTolerantAmqpObjectClose(nameof(RequestResponseAmqpLink), "", EventHubName, "", "", managementLink.TerminalException?.Message);
            });
    }
    finally
    {
        EventHubsEventSource.Log.EventHubClientCreateComplete(host, eventHubName);
    }
}
/// <summary>
/// Retrieves information about an Event Hub, including the number of partitions present
/// and their identifiers.
/// </summary>
///
/// <param name="retryPolicy">The retry policy to use as the basis for retrieving the information.</param>
/// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request to cancel the operation.</param>
///
/// <returns>The set of information for the Event Hub that this client is associated with.</returns>
///
public override async Task<EventHubProperties> GetPropertiesAsync(EventHubsRetryPolicy retryPolicy,
                                                                  CancellationToken cancellationToken)
{
    Argument.AssertNotClosed(_closed, nameof(AmqpClient));
    Argument.AssertNotNull(retryPolicy, nameof(retryPolicy));

    var failedAttemptCount = 0;
    var retryDelay = default(TimeSpan?);
    var link = default(RequestResponseAmqpLink);

    try
    {
        // The try timeout is a per-attempt budget. The stopwatch measures how much of that budget
        // the current attempt has used, so it is restarted whenever a new attempt is scheduled.
        var tryTimeout = retryPolicy.CalculateTryTimeout(0);
        var stopWatch = ValueStopwatch.StartNew();

        while (!cancellationToken.IsCancellationRequested)
        {
            try
            {
                EventHubsEventSource.Log.GetPropertiesStart(EventHubName);

                // Create the request message and the management link.

                var token = await AcquireAccessTokenAsync(cancellationToken).ConfigureAwait(false);
                using AmqpMessage request = MessageConverter.CreateEventHubPropertiesRequest(EventHubName, token);

                if (!ManagementLink.TryGetOpenedObject(out link))
                {
                    link = await ManagementLink.GetOrCreateAsync(tryTimeout.CalculateRemaining(stopWatch.GetElapsedTime()), cancellationToken).ConfigureAwait(false);
                }

                cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();

                // Send the request and process the response.

                using AmqpMessage response = await link.RequestAsync(request, cancellationToken).ConfigureAwait(false);
                AmqpError.ThrowIfErrorResponse(response, EventHubName);

                return MessageConverter.CreateEventHubPropertiesFromResponse(response);
            }
            catch (Exception ex)
            {
                Exception activeEx = ex.TranslateServiceException(EventHubName);

                // Determine if there should be a retry for the next attempt; if so enforce the delay but do not quit the loop.
                // Otherwise, surface the failure to the caller.

                ++failedAttemptCount;
                retryDelay = retryPolicy.CalculateRetryDelay(activeEx, failedAttemptCount);

                if ((retryDelay.HasValue) && (!ConnectionScope.IsDisposed) && (!_closed) && (!cancellationToken.IsCancellationRequested))
                {
                    EventHubsEventSource.Log.GetPropertiesError(EventHubName, activeEx.Message);
                    await Task.Delay(retryDelay.Value, cancellationToken).ConfigureAwait(false);

                    // Start the next attempt with a fresh budget; without restarting the stopwatch, the time
                    // spent on earlier attempts and on the back-off delay would be deducted from the new timeout.

                    tryTimeout = retryPolicy.CalculateTryTimeout(failedAttemptCount);
                    stopWatch = ValueStopwatch.StartNew();
                }
                else if (ex is AmqpException)
                {
                    // AMQP failures are surfaced as the translated exception rather than the raw transport exception.

                    ExceptionDispatchInfo.Capture(activeEx).Throw();
                }
                else
                {
                    throw;
                }
            }
        }

        // If no value has been returned nor exception thrown by this point,
        // then cancellation has been requested.

        throw new TaskCanceledException();
    }
    catch (Exception ex)
    {
        EventHubsEventSource.Log.GetPropertiesError(EventHubName, ex.Message);
        throw;
    }
    finally
    {
        EventHubsEventSource.Log.GetPropertiesComplete(EventHubName);
    }
}
/// <summary>
/// Retrieves information about a specific partition for an Event Hub, including elements that describe the available
/// events in the partition event stream.
/// </summary>
///
/// <param name="partitionId">The unique identifier of a partition associated with the Event Hub.</param>
/// <param name="retryPolicy">The retry policy to use as the basis for retrieving the information.</param>
/// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request to cancel the operation.</param>
///
/// <returns>The set of information for the requested partition under the Event Hub this client is associated with.</returns>
///
public override async Task<PartitionProperties> GetPartitionPropertiesAsync(string partitionId,
                                                                            EventHubsRetryPolicy retryPolicy,
                                                                            CancellationToken cancellationToken)
{
    Argument.AssertNotClosed(_closed, nameof(AmqpClient));
    Argument.AssertNotNullOrEmpty(partitionId, nameof(partitionId));
    Argument.AssertNotNull(retryPolicy, nameof(retryPolicy));

    var failedAttemptCount = 0;
    var retryDelay = default(TimeSpan?);
    var link = default(RequestResponseAmqpLink);

    try
    {
        // The try timeout is a per-attempt budget. The stopwatch measures how much of that budget
        // the current attempt has used, so it is restarted whenever a new attempt is scheduled.
        var tryTimeout = retryPolicy.CalculateTryTimeout(0);
        var stopWatch = ValueStopwatch.StartNew();

        while (!cancellationToken.IsCancellationRequested)
        {
            try
            {
                EventHubsEventSource.Log.GetPartitionPropertiesStart(EventHubName, partitionId);

                // Create the request message and the management link.

                var token = await AcquireAccessTokenAsync(cancellationToken).ConfigureAwait(false);
                using AmqpMessage request = MessageConverter.CreatePartitionPropertiesRequest(EventHubName, partitionId, token);

                if (!ManagementLink.TryGetOpenedObject(out link))
                {
                    link = await ManagementLink.GetOrCreateAsync(tryTimeout.CalculateRemaining(stopWatch.GetElapsedTime()), cancellationToken).ConfigureAwait(false);
                }

                cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();

                // Send the request and process the response.

                using AmqpMessage response = await link.RequestAsync(request, cancellationToken).ConfigureAwait(false);
                AmqpError.ThrowIfErrorResponse(response, EventHubName);

                return MessageConverter.CreatePartitionPropertiesFromResponse(response);
            }
            catch (Exception ex)
            {
                Exception activeEx = ex.TranslateServiceException(EventHubName);

                // Determine if there should be a retry for the next attempt; if so enforce the delay but do not quit the loop.
                // Otherwise, surface the failure to the caller.

                ++failedAttemptCount;
                retryDelay = retryPolicy.CalculateRetryDelay(activeEx, failedAttemptCount);

                if ((retryDelay.HasValue) && (!ConnectionScope.IsDisposed) && (!_closed) && (!cancellationToken.IsCancellationRequested))
                {
                    EventHubsEventSource.Log.GetPartitionPropertiesError(EventHubName, partitionId, activeEx.Message);
                    await Task.Delay(retryDelay.Value, cancellationToken).ConfigureAwait(false);

                    // Start the next attempt with a fresh budget; without restarting the stopwatch, the time
                    // spent on earlier attempts and on the back-off delay would be deducted from the new timeout.

                    tryTimeout = retryPolicy.CalculateTryTimeout(failedAttemptCount);
                    stopWatch = ValueStopwatch.StartNew();
                }
                else if (ex is AmqpException)
                {
                    // AMQP failures are surfaced as the translated exception rather than the raw transport exception.

                    ExceptionDispatchInfo.Capture(activeEx).Throw();
                }
                else
                {
                    throw;
                }
            }
        }

        // If no value has been returned nor exception thrown by this point,
        // then cancellation has been requested.

        throw new TaskCanceledException();
    }
    catch (Exception ex)
    {
        EventHubsEventSource.Log.GetPartitionPropertiesError(EventHubName, partitionId, ex.Message);
        throw;
    }
    finally
    {
        EventHubsEventSource.Log.GetPartitionPropertiesComplete(EventHubName, partitionId);
    }
}
/// <summary>
/// Creates a producer strongly aligned with the active protocol and transport,
/// responsible for publishing <see cref="EventData" /> to the Event Hub.
/// </summary>
///
/// <param name="partitionId">The identifier of the partition to which the transport producer should be bound; if <c>null</c>, the producer is unbound.</param>
/// <param name="producerIdentifier">The identifier to associate with the producer; if <c>null</c> or <see cref="string.Empty" />, a random identifier will be generated.</param>
/// <param name="requestedFeatures">The flags specifying the set of special transport features that should be opted-into.</param>
/// <param name="partitionOptions">The set of options, if any, that should be considered when initializing the producer.</param>
/// <param name="retryPolicy">The policy which governs retry behavior and try timeouts.</param>
///
/// <returns>A <see cref="TransportProducer"/> configured in the requested manner.</returns>
///
public override TransportProducer CreateProducer(string partitionId,
                                                 string producerIdentifier,
                                                 TransportProducerFeatures requestedFeatures,
                                                 PartitionPublishingOptions partitionOptions,
                                                 EventHubsRetryPolicy retryPolicy)
{
    // Producers may not be created once the client has been closed.
    Argument.AssertNotClosed(_closed, nameof(AmqpClient));

    // The producer shares this client's connection scope and message converter.
    TransportProducer producer = new AmqpProducer(
        EventHubName,
        partitionId,
        producerIdentifier,
        ConnectionScope,
        MessageConverter,
        retryPolicy,
        requestedFeatures,
        partitionOptions);

    return producer;
}
/// <summary>
/// Creates a consumer strongly aligned with the active protocol and transport, responsible
/// for reading <see cref="EventData" /> from a specific Event Hub partition, in the context
/// of a specific consumer group.
///
/// A consumer may be exclusive, which asserts ownership over the partition for the consumer
/// group to ensure that only one consumer from that group is reading from the partition.
/// These exclusive consumers are sometimes referred to as "Epoch Consumers."
///
/// A consumer may also be non-exclusive, allowing multiple consumers from the same consumer
/// group to be actively reading events from the partition. These non-exclusive consumers are
/// sometimes referred to as "Non-epoch Consumers."
///
/// Designating a consumer as exclusive may be specified by setting the <paramref name="ownerLevel" />.
/// When <c>null</c>, consumers are created as non-exclusive.
/// </summary>
///
/// <param name="consumerGroup">The name of the consumer group this consumer is associated with. Events are read in the context of this group.</param>
/// <param name="partitionId">The identifier of the Event Hub partition from which events will be received.</param>
/// <param name="consumerIdentifier">The identifier to associate with the consumer; if <c>null</c> or <see cref="string.Empty" />, a random identifier will be generated.</param>
/// <param name="eventPosition">The position within the partition where the consumer should begin reading events.</param>
/// <param name="retryPolicy">The policy which governs retry behavior and try timeouts.</param>
/// <param name="trackLastEnqueuedEventProperties">Indicates whether information on the last enqueued event on the partition is sent as events are received.</param>
/// <param name="invalidateConsumerWhenPartitionStolen">Indicates whether or not the consumer should consider itself invalid when a partition is stolen.</param>
/// <param name="ownerLevel">The relative priority to associate with the link; for a non-exclusive link, this value should be <c>null</c>.</param>
/// <param name="prefetchCount">Controls the number of events received and queued locally without regard to whether an operation was requested. If <c>null</c> a default will be used.</param>
/// <param name="prefetchSizeInBytes">The cache size of the prefetch queue. When set, the link makes a best effort to ensure prefetched messages fit into the specified size.</param>
///
/// <returns>A <see cref="TransportConsumer" /> configured in the requested manner.</returns>
///
public override TransportConsumer CreateConsumer(string consumerGroup,
                                                 string partitionId,
                                                 string consumerIdentifier,
                                                 EventPosition eventPosition,
                                                 EventHubsRetryPolicy retryPolicy,
                                                 bool trackLastEnqueuedEventProperties,
                                                 bool invalidateConsumerWhenPartitionStolen,
                                                 long? ownerLevel,
                                                 uint? prefetchCount,
                                                 long? prefetchSizeInBytes)
{
    // Consumers may not be created once the client has been closed.
    Argument.AssertNotClosed(_closed, nameof(AmqpClient));

    // The consumer shares this client's connection scope and message converter.
    TransportConsumer consumer = new AmqpConsumer(
        EventHubName,
        consumerGroup,
        partitionId,
        consumerIdentifier,
        eventPosition,
        trackLastEnqueuedEventProperties,
        invalidateConsumerWhenPartitionStolen,
        ownerLevel,
        prefetchCount,
        prefetchSizeInBytes,
        ConnectionScope,
        MessageConverter,
        retryPolicy);

    return consumer;
}
/// <summary>
/// Closes the connection to the transport client instance.
/// </summary>
///
/// <param name="cancellationToken">An optional <see cref="CancellationToken"/> instance to signal the request to cancel the operation.</param>
///
public override async Task CloseAsync(CancellationToken cancellationToken)
{
    // Closing an already-closed client is a no-op.
    if (_closed)
    {
        return;
    }

    cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();

    // Flag the client as closed before tearing anything down, so members guarded by the
    // closed check reject new work; the flag is rolled back below if the close fails.
    _closed = true;

    var clientType = GetType().Name;
    var clientId = GetHashCode().ToString(CultureInfo.InvariantCulture);

    try
    {
        EventHubsEventSource.Log.ClientCloseStart(clientType, EventHubName, clientId);

        // Only a link that is currently open needs an explicit close; disposal happens regardless.
        var managementLinkIsOpen = ManagementLink?.TryGetOpenedObject(out _) ?? false;

        if (managementLinkIsOpen)
        {
            await ManagementLink.CloseAsync(CancellationToken.None).ConfigureAwait(false);
        }

        ManagementLink?.Dispose();
        ConnectionScope?.Dispose();
    }
    catch (Exception closeError)
    {
        _closed = false;
        EventHubsEventSource.Log.ClientCloseError(clientType, EventHubName, clientId, closeError.Message);
        throw;
    }
    finally
    {
        EventHubsEventSource.Log.ClientCloseComplete(clientType, EventHubName, clientId);
    }
}
/// <summary>
/// Creates the AMQP link to be used for management operations and ensures
/// that any corresponding state has been updated based on the link configuration.
/// </summary>
///
/// <param name="operationTimeout">The timeout to apply to management operations using the link.</param>
/// <param name="linkTimeout">The timeout to apply for creating the link.</param>
/// <param name="cancellationToken">The cancellation token to consider when creating the link.</param>
///
/// <returns>The AMQP link to use for management operations.</returns>
///
private async Task<RequestResponseAmqpLink> CreateManagementLinkAsync(TimeSpan operationTimeout,
                                                                      TimeSpan linkTimeout,
                                                                      CancellationToken cancellationToken)
{
    var link = default(RequestResponseAmqpLink);

    try
    {
        // Forward the caller's token so that link creation actually observes cancellation, as the
        // parameter documentation promises; callers that do not want cancellation pass CancellationToken.None.
        link = await ConnectionScope.OpenManagementLinkAsync(operationTimeout, linkTimeout, cancellationToken).ConfigureAwait(false);
    }
    catch (Exception ex)
    {
        // Rethrow the translated exception. Throw() never returns, but the compiler cannot see
        // that, which is why the link is returned after the try/catch rather than inside it.
        ExceptionDispatchInfo.Capture(ex.TranslateConnectionCloseDuringLinkCreationException(EventHubName)).Throw();
    }

    return link;
}
/// <summary>
/// Acquires an access token for authorization with the Event Hubs service.
/// </summary>
///
/// <returns>The token to use for service authorization.</returns>
///
private async Task<string> AcquireAccessTokenAsync(CancellationToken cancellationToken)
{
    cancellationToken.ThrowIfCancellationRequested<TaskCanceledException>();

    AccessToken cachedToken = _accessToken;

    // The cached token is reused only when it exists and will not expire within the refresh buffer.
    var cachedTokenIsUsable = (!string.IsNullOrEmpty(cachedToken.Token))
        && (cachedToken.ExpiresOn > DateTimeOffset.UtcNow.Add(CredentialRefreshBuffer));

    if (cachedTokenIsUsable)
    {
        return cachedToken.Token;
    }

    // Request a replacement. Concurrent callers may each request a token here; that race is benign,
    // as whichever token is stored last simply becomes the cached one and no coordination is attempted.
    AccessToken refreshedToken = await Credential.GetTokenUsingDefaultScopeAsync(cancellationToken).ConfigureAwait(false);

    if (string.IsNullOrEmpty(refreshedToken.Token))
    {
        throw new AuthenticationException(Resources.CouldNotAcquireAccessToken);
    }

    _accessToken = refreshedToken;
    return refreshedToken.Token;
}
}
}