From a81834b3bc6d68e6784781d7a330ba6e170589ac Mon Sep 17 00:00:00 2001 From: Pavel Krymets Date: Fri, 4 Dec 2020 09:16:17 -0800 Subject: [PATCH 1/8] Add legacy checkpoint reading support to EventHubs --- .../BlobsCheckpointStoreTests.cs | 147 +++++++++++++++++- .../BlobsCheckpointStore.cs | 141 ++++++++++++++++- 2 files changed, 277 insertions(+), 11 deletions(-) diff --git a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/CheckpointStore/BlobsCheckpointStoreTests.cs b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/CheckpointStore/BlobsCheckpointStoreTests.cs index c98d6c523242..1f35a556e418 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/CheckpointStore/BlobsCheckpointStoreTests.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/CheckpointStore/BlobsCheckpointStoreTests.cs @@ -5,6 +5,7 @@ using System.Collections.Generic; using System.IO; using System.Linq; +using System.Text; using System.Threading; using System.Threading.Tasks; using Azure.Messaging.EventHubs.Consumer; @@ -433,6 +434,135 @@ public async Task ListCheckpointsConsidersDataInvalidWithNoOffsetOrSequenceNumbe mockLogger.Verify(log => log.InvalidCheckpointFound(partitionId, FullyQualifiedNamespace, EventHubName, ConsumerGroup)); } + /// + /// Verifies basic functionality of ListCheckpointsAsync and ensures the starting position is set correctly. + /// + /// + [Test] + public async Task ListCheckpointsUsesOffsetAsTheStartingPositionWhenPresentInLegacyCheckpoint() + { + var blobList = new List{ + BlobsModelFactory.BlobItem($"{FullyQualifiedNamespace}/{EventHubName}/{ConsumerGroup}/0", + false, + BlobsModelFactory.BlobItemProperties(true, lastModified: DateTime.UtcNow, eTag: new ETag(MatchingEtag)), + "snapshot") + }; + + var containerClient = new MockBlobContainerClient() { Blobs = blobList }; + containerClient.BlobContent = Encoding.UTF8.GetBytes("{" + + "\"PartitionId\":\"0\"," + + "\"Owner\":\"681d365b-de1b-4288-9733-76294e17daf0\"," + + "\"Token\":\"2d0c4276-827d-4ca4-a345-729caeca3b82\"," + + "\"Epoch\":386," + + "\"Offset\":\"13\"," + + "\"SequenceNumber\":960180}"); + + var target = new BlobsCheckpointStore(containerClient, new BasicRetryPolicy(new EventHubsRetryOptions()), readLegacyCheckpoints: true); + var checkpoints = await target.ListCheckpointsAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, new CancellationToken()); + + Assert.That(checkpoints, Is.Not.Null, "A set of checkpoints should have been returned."); + Assert.That(checkpoints.Single().StartingPosition, Is.EqualTo(EventPosition.FromOffset(13, false))); + Assert.That(checkpoints.Single().PartitionId, Is.EqualTo("0")); + } + + /// + /// Verifies basic functionality of ListCheckpointsAsync and ensures the starting position is set correctly. + /// + /// + [Test] + public async Task ListCheckpointsUsesSequenceNumberAsTheStartingPositionWhenNoOffsetIsPresentInLegacyCheckpoint() + { + var blobList = new List{ + BlobsModelFactory.BlobItem($"{FullyQualifiedNamespace}/{EventHubName}/{ConsumerGroup}/0", + false, + BlobsModelFactory.BlobItemProperties(true, lastModified: DateTime.UtcNow, eTag: new ETag(MatchingEtag)), + "snapshot") + }; + + var containerClient = new MockBlobContainerClient() { Blobs = blobList }; + containerClient.BlobContent = Encoding.UTF8.GetBytes("{" + + "\"PartitionId\":\"0\"," + + "\"Owner\":\"681d365b-de1b-4288-9733-76294e17daf0\"," + + "\"Token\":\"2d0c4276-827d-4ca4-a345-729caeca3b82\"," + + "\"Epoch\":386," + + "\"SequenceNumber\":960180}"); + + var target = new BlobsCheckpointStore(containerClient, new BasicRetryPolicy(new EventHubsRetryOptions()), readLegacyCheckpoints: true); + var checkpoints = await target.ListCheckpointsAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, new CancellationToken()); + + Assert.That(checkpoints, Is.Not.Null, "A set of checkpoints should have been returned."); + Assert.That(checkpoints.Single().StartingPosition, Is.EqualTo(EventPosition.FromSequenceNumber(960180, false))); + Assert.That(checkpoints.Single().PartitionId, Is.EqualTo("0")); + } + + /// + /// Verifies basic functionality of ListCheckpointsAsync and ensures the starting position is set correctly. + /// + /// + [Test] + public async Task ListCheckpointsConsidersDataInvalidWithNoOffsetOrSequenceNumberLegacyCheckpoint() + { + var blobList = new List{ + BlobsModelFactory.BlobItem($"{FullyQualifiedNamespace}/{EventHubName}/{ConsumerGroup}/0", + false, + BlobsModelFactory.BlobItemProperties(true, lastModified: DateTime.UtcNow, eTag: new ETag(MatchingEtag)), + "snapshot") + }; + + var containerClient = new MockBlobContainerClient() { Blobs = blobList }; + containerClient.BlobContent = Encoding.UTF8.GetBytes("{" + + "\"PartitionId\":\"0\"," + + "\"Owner\":\"681d365b-de1b-4288-9733-76294e17daf0\"," + + "\"Token\":\"2d0c4276-827d-4ca4-a345-729caeca3b82\"," + + "\"Epoch\":386}"); + + + var mockLogger = new Mock(); + var target = new BlobsCheckpointStore(containerClient, new BasicRetryPolicy(new EventHubsRetryOptions()), readLegacyCheckpoints: true); + + target.Logger = mockLogger.Object; + + var checkpoints = await target.ListCheckpointsAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, new CancellationToken()); + + Assert.That(checkpoints, Is.Not.Null, "A set of checkpoints should have been returned."); + Assert.That(checkpoints.Any(), Is.False, "No valid checkpoints should exist."); + + mockLogger.Verify(log => log.InvalidCheckpointFound("0", FullyQualifiedNamespace, EventHubName, ConsumerGroup)); + } + + /// + /// Verifies basic functionality of ListCheckpointsAsync and ensures the starting position is set correctly. + /// + /// + [TestCase("")] + [TestCase("{\"PartitionId\":\"0\",\"Owner\":\"681d365b-de1b-4288-9733-76294e17daf0\",")] + [TestCase("\0\0\0")] + public async Task ListCheckpointsConsidersDataInvalidWithLegacyCheckpointBlobContainingInvalidJson(string json) + { + var blobList = new List{ + BlobsModelFactory.BlobItem($"{FullyQualifiedNamespace}/{EventHubName}/{ConsumerGroup}/0", + false, + BlobsModelFactory.BlobItemProperties(true, lastModified: DateTime.UtcNow, eTag: new ETag(MatchingEtag)), + "snapshot") + }; + + var containerClient = new MockBlobContainerClient() { Blobs = blobList }; + containerClient.BlobContent = Encoding.UTF8.GetBytes(json); + + + var mockLogger = new Mock(); + var target = new BlobsCheckpointStore(containerClient, new BasicRetryPolicy(new EventHubsRetryOptions()), readLegacyCheckpoints: true); + + target.Logger = mockLogger.Object; + + var checkpoints = await target.ListCheckpointsAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, new CancellationToken()); + + Assert.That(checkpoints, Is.Not.Null, "A set of checkpoints should have been returned."); + Assert.That(checkpoints.Any(), Is.False, "No valid checkpoints should exist."); + + mockLogger.Verify(log => log.InvalidCheckpointFound("0", FullyQualifiedNamespace, EventHubName, ConsumerGroup)); + } + /// /// Verifies basic functionality of ListCheckpointsAsync and ensures the appropriate events are emitted when errors occur. /// @@ -1438,6 +1568,7 @@ private class MockBlobContainerClient : BlobContainerClient public override string Name { get; } internal IEnumerable Blobs; internal BlobInfo BlobInfo; + internal byte[] BlobContent; internal Exception BlobClientUploadBlobException; internal Exception BlobClientSetMetadataException; internal Exception GetBlobsAsyncException; @@ -1466,12 +1597,12 @@ public override AsyncPageable GetBlobsAsync(BlobTraits traits = BlobTr throw GetBlobsAsyncException; } - return new MockAsyncPageable(Blobs); + return new MockAsyncPageable(Blobs.Where(b => prefix == null || b.Name.StartsWith(prefix, StringComparison.Ordinal))); } public override BlobClient GetBlobClient(string blobName) { - return new MockBlobClient(blobName, BlobInfo, BlobClientUploadBlobException, BlobClientSetMetadataException, BlobClientUploadAsyncCallback, BlobClientSetMetadataAsyncCallback); + return new MockBlobClient(blobName, BlobInfo, BlobClientUploadBlobException, BlobClientSetMetadataException, BlobClientUploadAsyncCallback, BlobClientSetMetadataAsyncCallback, BlobContent); } } @@ -1481,6 +1612,8 @@ private class MockBlobClient : BlobClient internal BlobInfo BlobInfo; internal Exception BlobClientUploadBlobException; internal Exception BlobClientSetMetadataException; + private byte[] Content; + private Action, BlobRequestConditions, IProgress, AccessTier?, StorageTransferOptions, CancellationToken> UploadAsyncCallback; private Action, BlobRequestConditions, CancellationToken> SetMetadataAsyncCallback; @@ -1489,7 +1622,8 @@ public MockBlobClient(string blobName, Exception blobClientUploadBlobException = null, Exception blobClientSetMetadataException = null, Action, BlobRequestConditions, IProgress, AccessTier?, StorageTransferOptions, CancellationToken> uploadAsyncCallback = null, - Action, BlobRequestConditions, CancellationToken> setMetadataAsyncCallback = null) + Action, BlobRequestConditions, CancellationToken> setMetadataAsyncCallback = null, + byte[] content = null) { BlobClientUploadBlobException = blobClientUploadBlobException; BlobClientSetMetadataException = blobClientSetMetadataException; @@ -1497,6 +1631,7 @@ public MockBlobClient(string blobName, SetMetadataAsyncCallback = setMetadataAsyncCallback; Name = blobName; BlobInfo = blobInfo; + Content = content; } public override Task> SetMetadataAsync(IDictionary metadata, BlobRequestConditions conditions = null, CancellationToken cancellationToken = default(CancellationToken)) @@ -1540,6 +1675,12 @@ public override Task> UploadAsync(Stream content, Blob BlobsModelFactory.BlobContentInfo(new ETag("etag"), DateTime.UtcNow, new byte[] { }, string.Empty, 0L), Mock.Of())); } + + public override async Task DownloadToAsync(Stream destination, CancellationToken cancellationToken) + { + await destination.WriteAsync(Content, 0, Content.Length, cancellationToken); + return Mock.Of(); + } } private class MockAsyncPageable : AsyncPageable diff --git a/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/BlobCheckpointStore/BlobsCheckpointStore.cs b/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/BlobCheckpointStore/BlobsCheckpointStore.cs index 2e73deaa5f37..4d604580f6f5 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/BlobCheckpointStore/BlobsCheckpointStore.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/BlobCheckpointStore/BlobsCheckpointStore.cs @@ -5,6 +5,7 @@ using System.Collections.Generic; using System.Globalization; using System.IO; +using System.Text.Json; using System.Text.RegularExpressions; using System.Threading; using System.Threading.Tasks; @@ -14,6 +15,7 @@ using Azure.Messaging.EventHubs.Primitives; using Azure.Storage.Blobs; using Azure.Storage.Blobs.Models; +using Azure.Storage.Blobs.Specialized; namespace Azure.Messaging.EventHubs.Processor { @@ -41,6 +43,13 @@ internal partial class BlobsCheckpointStore : StorageManager /// private const string CheckpointPrefix = "{0}/{1}/{2}/checkpoint/"; + /// + /// Specifies a string that filters the results to return only legacy checkpoint blobs whose name begins + /// with the specified prefix. + /// + /// + private const string LegacyCheckpointPrefix = "{0}/{1}/{2}/"; + /// /// Specifies a string that filters the results to return only ownership blobs whose name begins /// with the specified prefix. @@ -61,21 +70,30 @@ internal partial class BlobsCheckpointStore : StorageManager /// private EventHubsRetryPolicy RetryPolicy { get; } + /// + /// Indicates whether to read legacy checkpoints when no current version checkpoints are available. + /// + /// + private bool ReadLegacyCheckpoints { get; } + /// /// Initializes a new instance of the class. /// /// /// The client used to interact with the Azure Blob Storage service. /// The retry policy to use as the basis for interacting with the Storage Blobs service. + /// Indicates whether to read legacy checkpoints when no current version checkpoints are available. /// public BlobsCheckpointStore(BlobContainerClient blobContainerClient, - EventHubsRetryPolicy retryPolicy) + EventHubsRetryPolicy retryPolicy, + bool readLegacyCheckpoints = false) { Argument.AssertNotNull(blobContainerClient, nameof(blobContainerClient)); Argument.AssertNotNull(retryPolicy, nameof(retryPolicy)); ContainerClient = blobContainerClient; RetryPolicy = retryPolicy; + ReadLegacyCheckpoints = readLegacyCheckpoints; BlobsCheckpointStoreCreated(nameof(BlobsCheckpointStore), blobContainerClient.AccountName, blobContainerClient.Name); } @@ -275,11 +293,9 @@ public override async Task> ListCheckpoint cancellationToken.ThrowIfCancellationRequested(); ListCheckpointsStart(fullyQualifiedNamespace, eventHubName, consumerGroup); - var prefix = string.Format(CultureInfo.InvariantCulture, CheckpointPrefix, fullyQualifiedNamespace.ToLowerInvariant(), eventHubName.ToLowerInvariant(), consumerGroup.ToLowerInvariant()); - var checkpointCount = 0; - - async Task> listCheckpointsAsync(CancellationToken listCheckpointsToken) + async Task> listCheckpointsAsync(CancellationToken listCheckpointsToken) { + var prefix = string.Format(CultureInfo.InvariantCulture, CheckpointPrefix, fullyQualifiedNamespace.ToLowerInvariant(), eventHubName.ToLowerInvariant(), consumerGroup.ToLowerInvariant()); var checkpoints = new List(); await foreach (BlobItem blob in ContainerClient.GetBlobsAsync(traits: BlobTraits.Metadata, prefix: prefix, cancellationToken: listCheckpointsToken).ConfigureAwait(false)) @@ -322,13 +338,68 @@ async Task> listCheckpointsAsync(Cancellat } } - checkpointCount = checkpoints.Count; return checkpoints; }; + async Task> listLegacyCheckpointsAsync(CancellationToken listCheckpointsToken) + { + var legacyPrefix = string.Format(CultureInfo.InvariantCulture, LegacyCheckpointPrefix, fullyQualifiedNamespace.ToLowerInvariant(), eventHubName.ToLowerInvariant(), consumerGroup.ToLowerInvariant()); + var checkpoints = new List(); + + await foreach (BlobItem blob in ContainerClient.GetBlobsAsync(prefix: legacyPrefix, cancellationToken: listCheckpointsToken).ConfigureAwait(false)) + { + var partitionId = blob.Name.Substring(legacyPrefix.Length); + var startingPosition = default(EventPosition?); + + BlobBaseClient blobClient = ContainerClient.GetBlobClient(blob.Name); + using var memoryStream = new MemoryStream(); + await blobClient.DownloadToAsync(memoryStream, listCheckpointsToken).ConfigureAwait(false); + + TryReadLegacyCheckpoint( + memoryStream.GetBuffer().AsSpan(0, (int) memoryStream.Length), + out long? offset, + out long? sequenceNumber); + + if (offset.HasValue) + { + startingPosition = EventPosition.FromOffset(offset.Value, false); + } + else if (sequenceNumber.HasValue) + { + startingPosition = EventPosition.FromSequenceNumber(sequenceNumber.Value, false); + } + + if (startingPosition.HasValue) + { + checkpoints.Add(new BlobStorageCheckpoint + { + FullyQualifiedNamespace = fullyQualifiedNamespace, + EventHubName = eventHubName, + ConsumerGroup = consumerGroup, + PartitionId = partitionId, + StartingPosition = startingPosition.Value, + Offset = offset, + SequenceNumber = sequenceNumber + }); + } + else + { + InvalidCheckpointFound(partitionId, fullyQualifiedNamespace, eventHubName, consumerGroup); + } + } + + return checkpoints; + }; + + IList checkpoints = Array.Empty(); try { - return await ApplyRetryPolicy(listCheckpointsAsync, cancellationToken).ConfigureAwait(false); + checkpoints = await ApplyRetryPolicy(listCheckpointsAsync, cancellationToken).ConfigureAwait(false); + if (checkpoints.Count == 0 && ReadLegacyCheckpoints) + { + checkpoints = await ApplyRetryPolicy(listLegacyCheckpointsAsync, cancellationToken).ConfigureAwait(false); + } + return checkpoints; } catch (RequestFailedException ex) when (ex.ErrorCode == BlobErrorCode.ContainerNotFound) { @@ -342,7 +413,7 @@ async Task> listCheckpointsAsync(Cancellat } finally { - ListCheckpointsComplete(fullyQualifiedNamespace, eventHubName, consumerGroup, checkpointCount); + ListCheckpointsComplete(fullyQualifiedNamespace, eventHubName, consumerGroup, checkpoints.Count); } } @@ -493,6 +564,60 @@ async Task wrapper(CancellationToken token) return result; } + /// + /// Attempts to read a legacy checkpoint JSON format + /// {"PartitionId":"0","Owner":"681d365b-de1b-4288-9733-76294e17daf0","Token":"2d0c4276-827d-4ca4-a345-729caeca3b82","Epoch":386,"Offset":"8591964920","SequenceNumber":960180} + /// + /// true if either offset or sequence number was extracted from the checkpoint + private static bool TryReadLegacyCheckpoint(Span data, out long? offset, out long? sequenceNumber) + { + offset = null; + sequenceNumber = null; + + var jsonReader = new Utf8JsonReader(data); + try + { + if (!jsonReader.Read() || jsonReader.TokenType != JsonTokenType.StartObject) return false; + + while (jsonReader.Read() && jsonReader.TokenType == JsonTokenType.PropertyName) + { + switch (jsonReader.GetString()) + { + case "Offset": + jsonReader.Read(); + string offsetString = jsonReader.GetString(); + + if (offsetString == null || !long.TryParse(offsetString, out long offsetValue)) + { + return false; + } + + offset = offsetValue; + break; + case "SequenceNumber": + jsonReader.Read(); + + if (!jsonReader.TryGetInt64(out long sequenceNumberValue)) + { + return false; + } + + sequenceNumber = sequenceNumberValue; + break; + default: + jsonReader.Skip(); + break; + } + } + } + catch (JsonException) + { + return false; + } + + return offset != null || sequenceNumber != null; + } + /// /// Indicates that an attempt to retrieve a list of ownership has completed. /// From 7c14f4c81a2ba7d781e5b802596aad19b096c053 Mon Sep 17 00:00:00 2001 From: Pavel Krymets Date: Fri, 4 Dec 2020 09:24:25 -0800 Subject: [PATCH 2/8] cleanup parsing a bit --- .../BlobsCheckpointStoreTests.cs | 2 +- .../BlobsCheckpointStore.cs | 27 ++++++++----------- 2 files changed, 12 insertions(+), 17 deletions(-) diff --git a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/CheckpointStore/BlobsCheckpointStoreTests.cs b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/CheckpointStore/BlobsCheckpointStoreTests.cs index 1f35a556e418..08d124c93edf 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/CheckpointStore/BlobsCheckpointStoreTests.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/CheckpointStore/BlobsCheckpointStoreTests.cs @@ -1612,8 +1612,8 @@ private class MockBlobClient : BlobClient internal BlobInfo BlobInfo; internal Exception BlobClientUploadBlobException; internal Exception BlobClientSetMetadataException; - private byte[] Content; + private byte[] Content; private Action, BlobRequestConditions, IProgress, AccessTier?, StorageTransferOptions, CancellationToken> UploadAsyncCallback; private Action, BlobRequestConditions, CancellationToken> SetMetadataAsyncCallback; diff --git a/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/BlobCheckpointStore/BlobsCheckpointStore.cs b/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/BlobCheckpointStore/BlobsCheckpointStore.cs index 4d604580f6f5..2c01054e484f 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/BlobCheckpointStore/BlobsCheckpointStore.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/BlobCheckpointStore/BlobsCheckpointStore.cs @@ -565,11 +565,10 @@ async Task wrapper(CancellationToken token) } /// - /// Attempts to read a legacy checkpoint JSON format + /// Attempts to read a legacy checkpoint JSON format and extract an offset and a sequence number /// {"PartitionId":"0","Owner":"681d365b-de1b-4288-9733-76294e17daf0","Token":"2d0c4276-827d-4ca4-a345-729caeca3b82","Epoch":386,"Offset":"8591964920","SequenceNumber":960180} /// - /// true if either offset or sequence number was extracted from the checkpoint - private static bool TryReadLegacyCheckpoint(Span data, out long? offset, out long? sequenceNumber) + private static void TryReadLegacyCheckpoint(Span data, out long? offset, out long? sequenceNumber) { offset = null; sequenceNumber = null; @@ -577,29 +576,27 @@ private static bool TryReadLegacyCheckpoint(Span data, out long? offset, o var jsonReader = new Utf8JsonReader(data); try { - if (!jsonReader.Read() || jsonReader.TokenType != JsonTokenType.StartObject) return false; + if (!jsonReader.Read() || jsonReader.TokenType != JsonTokenType.StartObject) return; while (jsonReader.Read() && jsonReader.TokenType == JsonTokenType.PropertyName) { switch (jsonReader.GetString()) { case "Offset": - jsonReader.Read(); - string offsetString = jsonReader.GetString(); - - if (offsetString == null || !long.TryParse(offsetString, out long offsetValue)) + if (!jsonReader.Read() || + jsonReader.GetString() is not string offsetString || + !long.TryParse(offsetString, out long offsetValue)) { - return false; + return; } offset = offsetValue; break; case "SequenceNumber": - jsonReader.Read(); - - if (!jsonReader.TryGetInt64(out long sequenceNumberValue)) + if (!jsonReader.Read() || + !jsonReader.TryGetInt64(out long sequenceNumberValue)) { - return false; + return; } sequenceNumber = sequenceNumberValue; @@ -612,10 +609,8 @@ private static bool TryReadLegacyCheckpoint(Span data, out long? offset, o } catch (JsonException) { - return false; + // ignore } - - return offset != null || sequenceNumber != null; } /// From b35090f283ccc5ded5782eaf543bbc462e5e95eb Mon Sep 17 00:00:00 2001 From: Pavel Krymets Date: Mon, 7 Dec 2020 10:56:39 -0800 Subject: [PATCH 3/8] Fancy mock blob client --- .../BlobsCheckpointStoreTests.cs | 385 ++++++++++-------- .../BlobsCheckpointStore.cs | 44 +- 2 files changed, 253 insertions(+), 176 deletions(-) diff --git a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/CheckpointStore/BlobsCheckpointStoreTests.cs b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/CheckpointStore/BlobsCheckpointStoreTests.cs index 08d124c93edf..90ab292ed104 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/CheckpointStore/BlobsCheckpointStoreTests.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/CheckpointStore/BlobsCheckpointStoreTests.cs @@ -144,7 +144,8 @@ public async Task ClaimOwnershipLogsStartAndComplete() } }; - var target = new BlobsCheckpointStore(new MockBlobContainerClient(), + var mockBlobContainerClient = new MockBlobContainerClient().AddBlobClient("fqns/name/group/ownership/1", _ => { }); + var target = new BlobsCheckpointStore(mockBlobContainerClient, new BasicRetryPolicy(new EventHubsRetryOptions())); var mockLog = new Mock(); target.Logger = mockLog.Object; @@ -178,7 +179,7 @@ public void ClaimOwnershipLogsErrors() var expectedException = new DllNotFoundException("BOOM!"); var mockLog = new Mock(); - var mockContainerClient = new MockBlobContainerClient() { BlobClientUploadBlobException = expectedException }; + var mockContainerClient = new MockBlobContainerClient().AddBlobClient("fqns/name/group/ownership/1", client => client.UploadBlobException = expectedException); var target = new BlobsCheckpointStore(mockContainerClient, new BasicRetryPolicy(new EventHubsRetryOptions())); target.Logger = mockLog.Object; @@ -207,7 +208,8 @@ public async Task ClaimOwnershipForNewPartitionLogsOwnershipClaimed() } }; - var target = new BlobsCheckpointStore(new MockBlobContainerClient(), + var mockBlobContainerClient = new MockBlobContainerClient().AddBlobClient("fqns/name/group/ownership/1", _ => { }); + var target = new BlobsCheckpointStore(mockBlobContainerClient, new BasicRetryPolicy(new EventHubsRetryOptions())); var mockLog = new Mock(); target.Logger = mockLog.Object; @@ -241,7 +243,8 @@ public async Task ClaimOwnershipForExistingPartitionLogsOwnershipClaimed() } }; - var target = new BlobsCheckpointStore(new MockBlobContainerClient { BlobInfo = blobInfo }, + var mockContainerClient = new MockBlobContainerClient().AddBlobClient("fqns/name/group/ownership/1", client => client.BlobInfo = blobInfo); + var target = new BlobsCheckpointStore(mockContainerClient, new BasicRetryPolicy(new EventHubsRetryOptions())); var mockLog = new Mock(); target.Logger = mockLog.Object; @@ -275,7 +278,8 @@ public async Task ClaimOwnershipForExistingPartitionWithWrongEtagLogsOwnershipNo } }; - var target = new BlobsCheckpointStore(new MockBlobContainerClient { BlobInfo = blobInfo }, + var mockContainerClient = new MockBlobContainerClient().AddBlobClient("fqns/name/group/ownership/1", client => client.BlobInfo = blobInfo); + var target = new BlobsCheckpointStore(mockContainerClient, new BasicRetryPolicy(new EventHubsRetryOptions())); var mockLog = new Mock(); target.Logger = mockLog.Object; @@ -307,8 +311,10 @@ public void ClaimOwnershipForMissingPartitionThrowsAndLogsOwnershipNotClaimable( } }; - var target = new BlobsCheckpointStore(new MockBlobContainerClient(), + var mockBlobContainerClient = new MockBlobContainerClient().AddBlobClient("fqns/name/group/ownership/1", _ => { }); + var target = new BlobsCheckpointStore(mockBlobContainerClient, new BasicRetryPolicy(new EventHubsRetryOptions())); + var mockLog = new Mock(); target.Logger = mockLog.Object; @@ -449,13 +455,16 @@ public async Task ListCheckpointsUsesOffsetAsTheStartingPositionWhenPresentInLeg }; var containerClient = new MockBlobContainerClient() { Blobs = blobList }; - containerClient.BlobContent = Encoding.UTF8.GetBytes("{" + - "\"PartitionId\":\"0\"," + - "\"Owner\":\"681d365b-de1b-4288-9733-76294e17daf0\"," + - "\"Token\":\"2d0c4276-827d-4ca4-a345-729caeca3b82\"," + - "\"Epoch\":386," + - "\"Offset\":\"13\"," + - "\"SequenceNumber\":960180}"); + containerClient.AddBlobClient($"{FullyQualifiedNamespace}/{EventHubName}/{ConsumerGroup}/0", client => + { + client.Content = Encoding.UTF8.GetBytes("{" + + "\"PartitionId\":\"0\"," + + "\"Owner\":\"681d365b-de1b-4288-9733-76294e17daf0\"," + + "\"Token\":\"2d0c4276-827d-4ca4-a345-729caeca3b82\"," + + "\"Epoch\":386," + + "\"Offset\":\"13\"," + + "\"SequenceNumber\":960180}"); + }); var target = new BlobsCheckpointStore(containerClient, new BasicRetryPolicy(new EventHubsRetryOptions()), readLegacyCheckpoints: true); var checkpoints = await target.ListCheckpointsAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, new CancellationToken()); @@ -480,12 +489,15 @@ public async Task ListCheckpointsUsesSequenceNumberAsTheStartingPositionWhenNoOf }; var containerClient = new MockBlobContainerClient() { Blobs = blobList }; - containerClient.BlobContent = Encoding.UTF8.GetBytes("{" + - "\"PartitionId\":\"0\"," + - "\"Owner\":\"681d365b-de1b-4288-9733-76294e17daf0\"," + - "\"Token\":\"2d0c4276-827d-4ca4-a345-729caeca3b82\"," + - "\"Epoch\":386," + - "\"SequenceNumber\":960180}"); + containerClient.AddBlobClient($"{FullyQualifiedNamespace}/{EventHubName}/{ConsumerGroup}/0", client => + { + client.Content = Encoding.UTF8.GetBytes("{" + + "\"PartitionId\":\"0\"," + + "\"Owner\":\"681d365b-de1b-4288-9733-76294e17daf0\"," + + "\"Token\":\"2d0c4276-827d-4ca4-a345-729caeca3b82\"," + + "\"Epoch\":386," + + "\"SequenceNumber\":960180}"); + }); var target = new BlobsCheckpointStore(containerClient, new BasicRetryPolicy(new EventHubsRetryOptions()), readLegacyCheckpoints: true); var checkpoints = await target.ListCheckpointsAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, new CancellationToken()); @@ -510,11 +522,14 @@ public async Task ListCheckpointsConsidersDataInvalidWithNoOffsetOrSequenceNumbe }; var containerClient = new MockBlobContainerClient() { Blobs = blobList }; - containerClient.BlobContent = Encoding.UTF8.GetBytes("{" + - "\"PartitionId\":\"0\"," + - "\"Owner\":\"681d365b-de1b-4288-9733-76294e17daf0\"," + - "\"Token\":\"2d0c4276-827d-4ca4-a345-729caeca3b82\"," + - "\"Epoch\":386}"); + containerClient.AddBlobClient($"{FullyQualifiedNamespace}/{EventHubName}/{ConsumerGroup}/0", client => + { + client.Content = Encoding.UTF8.GetBytes("{" + + "\"PartitionId\":\"0\"," + + "\"Owner\":\"681d365b-de1b-4288-9733-76294e17daf0\"," + + "\"Token\":\"2d0c4276-827d-4ca4-a345-729caeca3b82\"," + + "\"Epoch\":386}"); + }); var mockLogger = new Mock(); @@ -547,8 +562,11 @@ public async Task ListCheckpointsConsidersDataInvalidWithLegacyCheckpointBlobCon }; var containerClient = new MockBlobContainerClient() { Blobs = blobList }; - containerClient.BlobContent = Encoding.UTF8.GetBytes(json); + containerClient.AddBlobClient($"{FullyQualifiedNamespace}/{EventHubName}/{ConsumerGroup}/0", client => + { + client.Content = Encoding.UTF8.GetBytes(json); + }); var mockLogger = new Mock(); var target = new BlobsCheckpointStore(containerClient, new BasicRetryPolicy(new EventHubsRetryOptions()), readLegacyCheckpoints: true); @@ -648,7 +666,15 @@ public async Task UpdateCheckpointLogsStartAndCompleteWhenTheBlobExists() "snapshot", new Dictionary {{BlobMetadataKey.OwnerIdentifier, Guid.NewGuid().ToString()}}) }; - var target = new BlobsCheckpointStore(new MockBlobContainerClient() { Blobs = blobList, BlobInfo = blobInfo, BlobClientUploadBlobException = new Exception("Upload should not be called") }, + + var mockContainerClient = new MockBlobContainerClient() { Blobs = blobList }; + mockContainerClient.AddBlobClient("fqns/name/group/checkpoint/1", client => + { + client.BlobInfo = blobInfo; + client.UploadBlobException = new Exception("Upload should not be called"); + }); + + var target = new BlobsCheckpointStore(mockContainerClient, new BasicRetryPolicy(new EventHubsRetryOptions())); var mockLog = new Mock(); target.Logger = mockLog.Object; @@ -680,7 +706,9 @@ public async Task UpdateCheckpointLogsStartAndCompleteWhenTheBlobDoesNotExist() "snapshot", new Dictionary {{BlobMetadataKey.OwnerIdentifier, Guid.NewGuid().ToString()}}) }; - var target = new BlobsCheckpointStore(new MockBlobContainerClient() { Blobs = blobList }, + var mockBlobContainerClient = new MockBlobContainerClient() { Blobs = blobList }; + mockBlobContainerClient.AddBlobClient("fqns/name/group/checkpoint/1", _ => { }); + var target = new BlobsCheckpointStore(mockBlobContainerClient, new BasicRetryPolicy(new EventHubsRetryOptions())); var mockLog = new Mock(); target.Logger = mockLog.Object; @@ -708,7 +736,13 @@ public void UpdateCheckpointLogsErrorsWhenTheBlobExists() var expectedException = new DllNotFoundException("BOOM!"); var mockLog = new Mock(); - var mockContainerClient = new MockBlobContainerClient() { BlobClientSetMetadataException = expectedException, BlobClientUploadBlobException = new Exception("Upload should not be called") }; + + var mockContainerClient = new MockBlobContainerClient().AddBlobClient("fqns/name/group/checkpoint/1", client => + { + client.BlobClientSetMetadataException = expectedException; + client.UploadBlobException = new Exception("Upload should not be called"); + }); + var target = new BlobsCheckpointStore(mockContainerClient, new BasicRetryPolicy(new EventHubsRetryOptions())); target.Logger = mockLog.Object; @@ -735,7 +769,10 @@ public void UpdateCheckpointLogsErrorsWhenTheBlobDoesNotExist() var expectedException = new DllNotFoundException("BOOM!"); var mockLog = new Mock(); - var mockContainerClient = new MockBlobContainerClient() { BlobClientUploadBlobException = expectedException }; + var mockContainerClient = new MockBlobContainerClient().AddBlobClient("fqns/name/group/checkpoint/1", client => + { + client.UploadBlobException = expectedException; + }); var target = new BlobsCheckpointStore(mockContainerClient, new BasicRetryPolicy(new EventHubsRetryOptions())); target.Logger = mockLog.Object; @@ -760,7 +797,8 @@ public void UpdateCheckpointForMissingContainerThrowsAndLogsCheckpointUpdateErro }; var ex = new RequestFailedException(404, BlobErrorCode.ContainerNotFound.ToString(), BlobErrorCode.ContainerNotFound.ToString(), null); - var target = new BlobsCheckpointStore(new MockBlobContainerClient(blobClientUploadBlobException: ex), + var mockBlobContainerClient = new MockBlobContainerClient().AddBlobClient("fqns/name/group/checkpoint/1", client => client.UploadBlobException = ex); + var target = new BlobsCheckpointStore(mockBlobContainerClient, new BasicRetryPolicy(new EventHubsRetryOptions())); var mockLog = new Mock(); target.Logger = mockLog.Object; @@ -903,10 +941,7 @@ public void ClaimOwnershipAsyncRetriesAndSurfacesRetriableExceptionsWhenVersionI var expectedServiceCalls = (maximumRetries + 1); var serviceCalls = 0; - var mockRetryPolicy = new Mock(); - var mockContainerClient = new MockBlobContainerClient(); - var checkpointStore = new BlobsCheckpointStore(mockContainerClient, mockRetryPolicy.Object); var ownership = new EventProcessorPartitionOwnership { @@ -921,11 +956,16 @@ public void ClaimOwnershipAsyncRetriesAndSurfacesRetriableExceptionsWhenVersionI .Setup(policy => policy.CalculateRetryDelay(It.Is(value => value == exception), It.Is(value => value <= maximumRetries))) .Returns(TimeSpan.FromMilliseconds(5)); - mockContainerClient.BlobClientUploadAsyncCallback = (content, httpHeaders, metadata, conditions, progressHandler, accessTier, transferOptions, token) => + + var mockContainerClient = new MockBlobContainerClient().AddBlobClient("ns/eh/cg/ownership/pid", client => { - serviceCalls++; - throw exception; - }; + client.UploadAsyncCallback = (content, httpHeaders, metadata, conditions, progressHandler, accessTier, transferOptions, token) => + { + serviceCalls++; + throw exception; + }; + }); + var checkpointStore = new BlobsCheckpointStore(mockContainerClient, mockRetryPolicy.Object); // To ensure that the test does not hang for the duration, set a timeout to force completion // after a shorter period of time. @@ -950,9 +990,6 @@ public void ClaimOwnershipAsyncSurfacesNonRetriableExceptionsWhenVersionIsNull(E var expectedServiceCalls = 1; var serviceCalls = 0; - var mockContainerClient = new MockBlobContainerClient(); - var checkpointStore = new BlobsCheckpointStore(mockContainerClient, new BasicRetryPolicy(new EventHubsRetryOptions())); - var ownership = new EventProcessorPartitionOwnership { FullyQualifiedNamespace = "ns", @@ -962,11 +999,15 @@ public void ClaimOwnershipAsyncSurfacesNonRetriableExceptionsWhenVersionIsNull(E PartitionId = "pid" }; - mockContainerClient.BlobClientUploadAsyncCallback = (content, httpHeaders, metadata, conditions, progressHandler, accessTier, transferOptions, token) => + var mockContainerClient = new MockBlobContainerClient().AddBlobClient("ns/eh/cg/ownership/pid", client => { - serviceCalls++; - throw exception; - }; + client.UploadAsyncCallback = (content, httpHeaders, metadata, conditions, progressHandler, accessTier, transferOptions, token) => + { + serviceCalls++; + throw exception; + }; + }); + var checkpointStore = new BlobsCheckpointStore(mockContainerClient, new BasicRetryPolicy(new EventHubsRetryOptions())); // To ensure that the test does not hang for the duration, set a timeout to force completion // after a shorter period of time. @@ -994,9 +1035,6 @@ public void ClaimOwnershipAsyncRetriesAndSurfacesRetriableExceptionsWhenVersionI var serviceCalls = 0; var mockRetryPolicy = new Mock(); - var mockContainerClient = new MockBlobContainerClient(); - var checkpointStore = new BlobsCheckpointStore(mockContainerClient, mockRetryPolicy.Object); - var ownership = new EventProcessorPartitionOwnership { FullyQualifiedNamespace = "ns", @@ -1011,13 +1049,15 @@ public void ClaimOwnershipAsyncRetriesAndSurfacesRetriableExceptionsWhenVersionI .Setup(policy => policy.CalculateRetryDelay(It.Is(value => value == exception), It.Is(value => value <= maximumRetries))) .Returns(TimeSpan.FromMilliseconds(5)); - mockContainerClient.BlobInfo = Mock.Of(); - - mockContainerClient.BlobClientSetMetadataAsyncCallback = (metadata, conditions, token) => + var mockContainerClient = new MockBlobContainerClient().AddBlobClient("ns/eh/cg/ownership/pid", client => { - serviceCalls++; - throw exception; - }; + client.SetMetadataAsyncCallback = (metadata, conditions, token) => + { + serviceCalls++; + throw exception; + }; + }); + var checkpointStore = new BlobsCheckpointStore(mockContainerClient, mockRetryPolicy.Object); // To ensure that the test does not hang for the duration, set a timeout to force completion // after a shorter period of time. @@ -1041,10 +1081,6 @@ public void ClaimOwnershipAsyncSurfacesNonRetriableExceptionsWhenVersionIsNotNul { var expectedServiceCalls = 1; var serviceCalls = 0; - - var mockContainerClient = new MockBlobContainerClient(); - var checkpointStore = new BlobsCheckpointStore(mockContainerClient, new BasicRetryPolicy(new EventHubsRetryOptions())); - var ownership = new EventProcessorPartitionOwnership { FullyQualifiedNamespace = "ns", @@ -1055,13 +1091,15 @@ public void ClaimOwnershipAsyncSurfacesNonRetriableExceptionsWhenVersionIsNotNul Version = "eTag" }; - mockContainerClient.BlobInfo = Mock.Of(); - - mockContainerClient.BlobClientSetMetadataAsyncCallback = (metadata, conditions, token) => + var mockContainerClient = new MockBlobContainerClient().AddBlobClient("ns/eh/cg/ownership/pid", client => { - serviceCalls++; - throw exception; - }; + client.SetMetadataAsyncCallback = (metadata, conditions, token) => + { + serviceCalls++; + throw exception; + }; + }); + var checkpointStore = new BlobsCheckpointStore(mockContainerClient, new BasicRetryPolicy(new EventHubsRetryOptions())); // To ensure that the test does not hang for the duration, set a timeout to force completion // after a shorter period of time. @@ -1084,9 +1122,6 @@ public void ClaimOwnershipAsyncSurfacesNonRetriableExceptionsWhenVersionIsNotNul [TestCase("eTag")] public async Task ClaimOwnershipAsyncDelegatesTheCancellationToken(string version) { - var mockContainerClient = new MockBlobContainerClient(); - var checkpointStore = new BlobsCheckpointStore(mockContainerClient, new BasicRetryPolicy(new EventHubsRetryOptions())); - var ownership = new EventProcessorPartitionOwnership { FullyQualifiedNamespace = "ns", @@ -1103,32 +1138,37 @@ public async Task ClaimOwnershipAsyncDelegatesTheCancellationToken(string versio // UploadAsync will be called if eTag is null; SetMetadataAsync is used otherwise. - if (version == null) + + var mockContainerClient = new MockBlobContainerClient().AddBlobClient("ns/eh/cg/ownership/pid", client => { - mockContainerClient.BlobClientUploadAsyncCallback = (content, httpHeaders, metadata, conditions, progressHandler, accessTier, transferOptions, token) => + if (version == null) { - if (!stateBeforeCancellation.HasValue) + client.UploadAsyncCallback = (content, httpHeaders, metadata, conditions, progressHandler, accessTier, transferOptions, token) => { - stateBeforeCancellation = token.IsCancellationRequested; - cancellationSource.Cancel(); - stateAfterCancellation = token.IsCancellationRequested; - } - }; - } - else - { - mockContainerClient.BlobInfo = Mock.Of(); - - mockContainerClient.BlobClientSetMetadataAsyncCallback = (metadata, conditions, token) => + if (!stateBeforeCancellation.HasValue) + { + stateBeforeCancellation = token.IsCancellationRequested; + cancellationSource.Cancel(); + stateAfterCancellation = token.IsCancellationRequested; + } + }; + } + else { - if (!stateBeforeCancellation.HasValue) + client.BlobInfo = Mock.Of(); + + client.SetMetadataAsyncCallback = (metadata, conditions, token) => { - stateBeforeCancellation = token.IsCancellationRequested; - cancellationSource.Cancel(); - stateAfterCancellation = token.IsCancellationRequested; - } - }; - } + if (!stateBeforeCancellation.HasValue) + { + stateBeforeCancellation = token.IsCancellationRequested; + cancellationSource.Cancel(); + stateAfterCancellation = token.IsCancellationRequested; + } + }; + } + }); + var checkpointStore = new BlobsCheckpointStore(mockContainerClient, new BasicRetryPolicy(new EventHubsRetryOptions())); await checkpointStore.ClaimOwnershipAsync(new List() { ownership }, cancellationSource.Token); @@ -1289,7 +1329,18 @@ public void UpdateCheckpointAsyncRetriesAndSurfacesRetriableExceptionsWhenTheBlo var serviceCalls = 0; var blobInfo = BlobsModelFactory.BlobInfo(new ETag($@"""{MatchingEtag}"""), DateTime.UtcNow); - var mockContainerClient = new MockBlobContainerClient { BlobInfo = blobInfo, BlobClientUploadBlobException = new Exception("Upload should not be called") }; + + var mockContainerClient = new MockBlobContainerClient().AddBlobClient("ns/eh/cg/checkpoint/pid", client => + { + client.BlobInfo = blobInfo; + client.UploadBlobException = new Exception("Upload should not be called"); + client.SetMetadataAsyncCallback = (metadata, conditions, token) => + { + serviceCalls++; + throw exception; + }; + }); + var mockRetryPolicy = new Mock(); var checkpointStore = new BlobsCheckpointStore(mockContainerClient, mockRetryPolicy.Object); @@ -1305,12 +1356,6 @@ public void UpdateCheckpointAsyncRetriesAndSurfacesRetriableExceptionsWhenTheBlo .Setup(policy => policy.CalculateRetryDelay(It.Is(value => value == exception), It.Is(value => value <= maximumRetries))) .Returns(TimeSpan.FromMilliseconds(5)); - mockContainerClient.BlobClientSetMetadataAsyncCallback = (metadata, conditions, token) => - { - serviceCalls++; - throw exception; - }; - // To ensure that the test does not hang for the duration, set a timeout to force completion // after a shorter period of time. @@ -1336,9 +1381,7 @@ public void UpdateCheckpointAsyncRetriesAndSurfacesRetriableExceptionsWhenTheBlo var expectedServiceCalls = (maximumRetries + 1); var serviceCalls = 0; - var mockContainerClient = new MockBlobContainerClient(); var mockRetryPolicy = new Mock(); - var checkpointStore = new BlobsCheckpointStore(mockContainerClient, mockRetryPolicy.Object); var checkpoint = new EventProcessorCheckpoint { @@ -1356,11 +1399,16 @@ public void UpdateCheckpointAsyncRetriesAndSurfacesRetriableExceptionsWhenTheBlo .Setup(policy => policy.CalculateRetryDelay(It.Is(value => value == exception), It.Is(value => value <= maximumRetries))) .Returns(TimeSpan.FromMilliseconds(5)); - mockContainerClient.BlobClientUploadAsyncCallback = (content, httpHeaders, metadata, conditions, progressHandler, accessTier, transferOptions, token) => + + var mockContainerClient = new MockBlobContainerClient().AddBlobClient("ns/eh/cg/checkpoint/pid", client => { - serviceCalls++; - throw exception; - }; + client.UploadAsyncCallback = (content, httpHeaders, metadata, conditions, progressHandler, accessTier, transferOptions, token) => + { + serviceCalls++; + throw exception; + }; + }); + var checkpointStore = new BlobsCheckpointStore(mockContainerClient, mockRetryPolicy.Object); // To ensure that the test does not hang for the duration, set a timeout to force completion // after a shorter period of time. @@ -1386,7 +1434,16 @@ public void UpdateCheckpointAsyncSurfacesNonRetriableExceptionsWhenTheBlobExists var serviceCalls = 0; var blobInfo = BlobsModelFactory.BlobInfo(new ETag($@"""{MatchingEtag}"""), DateTime.UtcNow); - var mockContainerClient = new MockBlobContainerClient { BlobInfo = blobInfo, BlobClientUploadBlobException = new Exception("Upload should not be called") }; + var mockContainerClient = new MockBlobContainerClient().AddBlobClient("ns/eh/cg/checkpoint/pid", client => + { + client.BlobInfo = blobInfo; + client.UploadBlobException = new Exception("Upload should not be called"); + client.SetMetadataAsyncCallback = (metadata, conditions, token) => + { + serviceCalls++; + throw exception; + }; + }); var checkpointStore = new BlobsCheckpointStore(mockContainerClient, new BasicRetryPolicy(new EventHubsRetryOptions())); var checkpoint = new EventProcessorCheckpoint @@ -1397,12 +1454,6 @@ public void UpdateCheckpointAsyncSurfacesNonRetriableExceptionsWhenTheBlobExists PartitionId = "pid" }; - mockContainerClient.BlobClientSetMetadataAsyncCallback = (metadata, conditions, token) => - { - serviceCalls++; - throw exception; - }; - // To ensure that the test does not hang for the duration, set a timeout to force completion // after a shorter period of time. @@ -1426,9 +1477,6 @@ public void UpdateCheckpointAsyncSurfacesNonRetriableExceptionsWhenTheBlobDoesNo var expectedServiceCalls = 1; var serviceCalls = 0; - var mockContainerClient = new MockBlobContainerClient(); - var checkpointStore = new BlobsCheckpointStore(mockContainerClient, new BasicRetryPolicy(new EventHubsRetryOptions())); - var checkpoint = new EventProcessorCheckpoint { FullyQualifiedNamespace = "ns", @@ -1437,11 +1485,15 @@ public void UpdateCheckpointAsyncSurfacesNonRetriableExceptionsWhenTheBlobDoesNo PartitionId = "pid" }; - mockContainerClient.BlobClientUploadAsyncCallback = (content, httpHeaders, metadata, conditions, progressHandler, accessTier, transferOptions, token) => + var mockContainerClient = new MockBlobContainerClient().AddBlobClient("ns/eh/cg/checkpoint/pid", client => { - serviceCalls++; - throw exception; - }; + client.UploadAsyncCallback = (content, httpHeaders, metadata, conditions, progressHandler, accessTier, transferOptions, token) => + { + serviceCalls++; + throw exception; + }; + }); + var checkpointStore = new BlobsCheckpointStore(mockContainerClient, new BasicRetryPolicy(new EventHubsRetryOptions())); // To ensure that the test does not hang for the duration, set a timeout to force completion // after a shorter period of time. @@ -1463,7 +1515,25 @@ public void UpdateCheckpointAsyncSurfacesNonRetriableExceptionsWhenTheBlobDoesNo public async Task UpdateCheckpointAsyncDelegatesTheCancellationTokenWhenTheBlobExists() { var blobInfo = BlobsModelFactory.BlobInfo(new ETag($@"""{MatchingEtag}"""), DateTime.UtcNow); - var mockContainerClient = new MockBlobContainerClient { BlobInfo = blobInfo, BlobClientUploadBlobException = new Exception("Upload should not be called") }; + + using var cancellationSource = new CancellationTokenSource(); + var stateBeforeCancellation = default(bool?); + var stateAfterCancellation = default(bool?); + + var mockContainerClient = new MockBlobContainerClient().AddBlobClient("ns/eh/cg/checkpoint/pid", client => + { + client.BlobInfo = blobInfo; + client.UploadBlobException = new Exception("Upload should not be called"); + client.SetMetadataAsyncCallback = (metadata, conditions, token) => + { + if (!stateBeforeCancellation.HasValue) + { + stateBeforeCancellation = token.IsCancellationRequested; + cancellationSource.Cancel(); + stateAfterCancellation = token.IsCancellationRequested; + } + }; + }); var checkpointStore = new BlobsCheckpointStore(mockContainerClient, new BasicRetryPolicy(new EventHubsRetryOptions())); var checkpoint = new EventProcessorCheckpoint @@ -1474,19 +1544,6 @@ public async Task UpdateCheckpointAsyncDelegatesTheCancellationTokenWhenTheBlobE PartitionId = "pid" }; - using var cancellationSource = new CancellationTokenSource(); - var stateBeforeCancellation = default(bool?); - var stateAfterCancellation = default(bool?); - - mockContainerClient.BlobClientSetMetadataAsyncCallback = (metadata, conditions, token) => - { - if (!stateBeforeCancellation.HasValue) - { - stateBeforeCancellation = token.IsCancellationRequested; - cancellationSource.Cancel(); - stateAfterCancellation = token.IsCancellationRequested; - } - }; await checkpointStore.UpdateCheckpointAsync(checkpoint, new EventData(Array.Empty()), cancellationSource.Token); @@ -1504,9 +1561,6 @@ public async Task UpdateCheckpointAsyncDelegatesTheCancellationTokenWhenTheBlobE [Test] public async Task UpdateCheckpointAsyncDelegatesTheCancellationTokenWhenTheBlobDoesNotExist() { - var mockContainerClient = new MockBlobContainerClient(); - var checkpointStore = new BlobsCheckpointStore(mockContainerClient, new BasicRetryPolicy(new EventHubsRetryOptions())); - var checkpoint = new EventProcessorCheckpoint { FullyQualifiedNamespace = "ns", @@ -1519,15 +1573,21 @@ public async Task UpdateCheckpointAsyncDelegatesTheCancellationTokenWhenTheBlobD var stateBeforeCancellation = default(bool?); var stateAfterCancellation = default(bool?); - mockContainerClient.BlobClientUploadAsyncCallback = (content, httpHeaders, metadata, conditions, progressHandler, accessTier, transferOptions, token) => + + var mockContainerClient = new MockBlobContainerClient().AddBlobClient("ns/eh/cg/checkpoint/pid", client => { - if (!stateBeforeCancellation.HasValue) + client.UploadAsyncCallback = (content, httpHeaders, metadata, conditions, progressHandler, accessTier, transferOptions, token) => { - stateBeforeCancellation = token.IsCancellationRequested; - cancellationSource.Cancel(); - stateAfterCancellation = token.IsCancellationRequested; - } - }; + if (!stateBeforeCancellation.HasValue) + { + stateBeforeCancellation = token.IsCancellationRequested; + cancellationSource.Cancel(); + stateAfterCancellation = token.IsCancellationRequested; + } + }; + }); + + var checkpointStore = new BlobsCheckpointStore(mockContainerClient, new BasicRetryPolicy(new EventHubsRetryOptions())); await checkpointStore.UpdateCheckpointAsync(checkpoint, new EventData(Array.Empty()), cancellationSource.Token); @@ -1567,22 +1627,15 @@ private class MockBlobContainerClient : BlobContainerClient public override string AccountName { get; } public override string Name { get; } internal IEnumerable Blobs; - internal BlobInfo BlobInfo; - internal byte[] BlobContent; - internal Exception BlobClientUploadBlobException; - internal Exception BlobClientSetMetadataException; internal Exception GetBlobsAsyncException; internal Action GetBlobsAsyncCallback; - internal Action, BlobRequestConditions, IProgress, AccessTier?, StorageTransferOptions, CancellationToken> BlobClientUploadAsyncCallback; - internal Action, BlobRequestConditions, CancellationToken> BlobClientSetMetadataAsyncCallback; + internal Dictionary BlobClients = new (); public MockBlobContainerClient(string accountName = "blobAccount", string containerName = "container", - Exception getBlobsAsyncException = null, - Exception blobClientUploadBlobException = null) + Exception getBlobsAsyncException = null) { GetBlobsAsyncException = getBlobsAsyncException; - BlobClientUploadBlobException = blobClientUploadBlobException; Blobs = Enumerable.Empty(); AccountName = accountName; Name = containerName; @@ -1602,36 +1655,32 @@ public override AsyncPageable GetBlobsAsync(BlobTraits traits = BlobTr public override BlobClient GetBlobClient(string blobName) { - return new MockBlobClient(blobName, BlobInfo, BlobClientUploadBlobException, BlobClientSetMetadataException, BlobClientUploadAsyncCallback, BlobClientSetMetadataAsyncCallback, BlobContent); + return BlobClients[blobName]; + } + + internal MockBlobContainerClient AddBlobClient(string name, Action configure) + { + var client = new MockBlobClient(name); + configure(client); + BlobClients[name] = client; + return this; } } private class MockBlobClient : BlobClient { public override string Name { get; } + internal BlobInfo BlobInfo; - internal Exception BlobClientUploadBlobException; + internal Exception UploadBlobException; internal Exception BlobClientSetMetadataException; + internal byte[] Content; + internal Action, BlobRequestConditions, IProgress, AccessTier?, StorageTransferOptions, CancellationToken> UploadAsyncCallback; + internal Action, BlobRequestConditions, CancellationToken> SetMetadataAsyncCallback; - private byte[] Content; - private Action, BlobRequestConditions, IProgress, AccessTier?, StorageTransferOptions, CancellationToken> UploadAsyncCallback; - private Action, BlobRequestConditions, CancellationToken> SetMetadataAsyncCallback; - - public MockBlobClient(string blobName, - BlobInfo blobInfo = null, - Exception blobClientUploadBlobException = null, - Exception blobClientSetMetadataException = null, - Action, BlobRequestConditions, IProgress, AccessTier?, StorageTransferOptions, CancellationToken> uploadAsyncCallback = null, - Action, BlobRequestConditions, CancellationToken> setMetadataAsyncCallback = null, - byte[] content = null) + public MockBlobClient(string blobName) { - BlobClientUploadBlobException = blobClientUploadBlobException; - BlobClientSetMetadataException = blobClientSetMetadataException; - UploadAsyncCallback = uploadAsyncCallback; - SetMetadataAsyncCallback = setMetadataAsyncCallback; Name = blobName; - BlobInfo = blobInfo; - Content = content; } public override Task> SetMetadataAsync(IDictionary metadata, BlobRequestConditions conditions = null, CancellationToken cancellationToken = default(CancellationToken)) @@ -1660,9 +1709,9 @@ public override Task> UploadAsync(Stream content, Blob { UploadAsyncCallback?.Invoke(content, httpHeaders, metadata, conditions, progressHandler, accessTier, transferOptions, cancellationToken); - if (BlobClientUploadBlobException != null) + if (UploadBlobException != null) { - throw BlobClientUploadBlobException; + throw UploadBlobException; } if (BlobInfo != null) diff --git a/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/BlobCheckpointStore/BlobsCheckpointStore.cs b/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/BlobCheckpointStore/BlobsCheckpointStore.cs index 2c01054e484f..7c043f6b89f2 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/BlobCheckpointStore/BlobsCheckpointStore.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/BlobCheckpointStore/BlobsCheckpointStore.cs @@ -5,6 +5,7 @@ using System.Collections.Generic; using System.Globalization; using System.IO; +using System.Linq; using System.Text.Json; using System.Text.RegularExpressions; using System.Threading; @@ -341,14 +342,27 @@ async Task> listCheckpointsAsync(CancellationToke return checkpoints; }; - async Task> listLegacyCheckpointsAsync(CancellationToken listCheckpointsToken) + async Task> listLegacyCheckpointsAsync(List existingCheckpoints, CancellationToken listCheckpointsToken) { var legacyPrefix = string.Format(CultureInfo.InvariantCulture, LegacyCheckpointPrefix, fullyQualifiedNamespace.ToLowerInvariant(), eventHubName.ToLowerInvariant(), consumerGroup.ToLowerInvariant()); var checkpoints = new List(); await foreach (BlobItem blob in ContainerClient.GetBlobsAsync(prefix: legacyPrefix, cancellationToken: listCheckpointsToken).ConfigureAwait(false)) { + // Skip empty blobs + if (blob.Properties.ContentLength == 0) + { + continue; + } + var partitionId = blob.Name.Substring(legacyPrefix.Length); + + // Check whether there is already a checkpoint for this partition id + if (existingCheckpoints.Any(existingCheckpoint => string.Equals(existingCheckpoint.PartitionId, partitionId, StringComparison.Ordinal))) + { + continue; + } + var startingPosition = default(EventPosition?); BlobBaseClient blobClient = ContainerClient.GetBlobClient(blob.Name); @@ -356,7 +370,7 @@ async Task> listLegacyCheckpointsAsync(Cancellati await blobClient.DownloadToAsync(memoryStream, listCheckpointsToken).ConfigureAwait(false); TryReadLegacyCheckpoint( - memoryStream.GetBuffer().AsSpan(0, (int) memoryStream.Length), + memoryStream.GetBuffer().AsSpan(0, (int)memoryStream.Length), out long? offset, out long? sequenceNumber); @@ -391,13 +405,14 @@ async Task> listLegacyCheckpointsAsync(Cancellati return checkpoints; }; - IList checkpoints = Array.Empty(); + List checkpoints = null; try { + checkpoints = await ApplyRetryPolicy(listCheckpointsAsync, cancellationToken).ConfigureAwait(false); - if (checkpoints.Count == 0 && ReadLegacyCheckpoints) + if (ReadLegacyCheckpoints) { - checkpoints = await ApplyRetryPolicy(listLegacyCheckpointsAsync, cancellationToken).ConfigureAwait(false); + checkpoints.AddRange(await ApplyRetryPolicy(ct => listLegacyCheckpointsAsync(checkpoints, ct), cancellationToken).ConfigureAwait(false)); } return checkpoints; } @@ -413,7 +428,7 @@ async Task> listLegacyCheckpointsAsync(Cancellati } finally { - ListCheckpointsComplete(fullyQualifiedNamespace, eventHubName, consumerGroup, checkpoints.Count); + ListCheckpointsComplete(fullyQualifiedNamespace, eventHubName, consumerGroup, checkpoints?.Count ?? 0); } } @@ -566,8 +581,21 @@ async Task wrapper(CancellationToken token) /// /// Attempts to read a legacy checkpoint JSON format and extract an offset and a sequence number - /// {"PartitionId":"0","Owner":"681d365b-de1b-4288-9733-76294e17daf0","Token":"2d0c4276-827d-4ca4-a345-729caeca3b82","Epoch":386,"Offset":"8591964920","SequenceNumber":960180} /// + /// The binary representation of the checkpoint JSON. + /// The parsed offset. null if not found. + /// The parsed sequence number. null if not found. + /// + /// Sample checkpoint JSON: + /// { + /// "PartitionId":"0", + /// "Owner":"681d365b-de1b-4288-9733-76294e17daf0", + /// "Token":"2d0c4276-827d-4ca4-a345-729caeca3b82", + /// "Epoch":386, + /// "Offset":"8591964920", + /// "SequenceNumber":960180 + /// } + /// private static void TryReadLegacyCheckpoint(Span data, out long? offset, out long? sequenceNumber) { offset = null; @@ -609,7 +637,7 @@ private static void TryReadLegacyCheckpoint(Span data, out long? offset, o } catch (JsonException) { - // ignore + // Ignore this because if the data is malformed, it will be treated as if the checkpoint didn't exist. } } From 364934cb68f69840c4cc81442c23f42ef7e912c6 Mon Sep 17 00:00:00 2001 From: Pavel Krymets Date: Mon, 7 Dec 2020 11:15:04 -0800 Subject: [PATCH 4/8] implement per-parition reading --- .../BlobsCheckpointStoreTests.cs | 95 +++++++++++++++++++ .../BlobsCheckpointStore.cs | 4 +- 2 files changed, 97 insertions(+), 2 deletions(-) diff --git a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/CheckpointStore/BlobsCheckpointStoreTests.cs b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/CheckpointStore/BlobsCheckpointStoreTests.cs index 90ab292ed104..aaae913912f2 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/CheckpointStore/BlobsCheckpointStoreTests.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/CheckpointStore/BlobsCheckpointStoreTests.cs @@ -440,6 +440,101 @@ public async Task ListCheckpointsConsidersDataInvalidWithNoOffsetOrSequenceNumbe mockLogger.Verify(log => log.InvalidCheckpointFound(partitionId, FullyQualifiedNamespace, EventHubName, ConsumerGroup)); } + /// + /// Verifies basic functionality of ListCheckpointsAsync and ensures the starting position is set correctly. + /// + /// + [Test] + public async Task ListCheckpointsPreferredNewCheckpointOverLegacy() + { + string partitionId = Guid.NewGuid().ToString(); + var blobList = new List{ + + BlobsModelFactory.BlobItem($"{FullyQualifiedNamespace}/{EventHubName}/{ConsumerGroup}/checkpoint/{partitionId}", + false, + BlobsModelFactory.BlobItemProperties(true, lastModified: DateTime.UtcNow, eTag: new ETag(MatchingEtag), contentLength: 0), + "snapshot", + new Dictionary + { + {BlobMetadataKey.OwnerIdentifier, Guid.NewGuid().ToString()}, + {BlobMetadataKey.SequenceNumber, "960182"}, + {BlobMetadataKey.Offset, "14"} + }), + BlobsModelFactory.BlobItem($"{FullyQualifiedNamespace}/{EventHubName}/{ConsumerGroup}/{partitionId}", + false, + BlobsModelFactory.BlobItemProperties(true, lastModified: DateTime.UtcNow, eTag: new ETag(MatchingEtag)), + "snapshot") + }; + + var containerClient = new MockBlobContainerClient() { Blobs = blobList }; + containerClient.AddBlobClient($"{FullyQualifiedNamespace}/{EventHubName}/{ConsumerGroup}/{partitionId}", client => + { + client.Content = Encoding.UTF8.GetBytes("{" + + "\"PartitionId\":\"0\"," + + "\"Owner\":\"681d365b-de1b-4288-9733-76294e17daf0\"," + + "\"Token\":\"2d0c4276-827d-4ca4-a345-729caeca3b82\"," + + "\"Epoch\":386," + + "\"Offset\":\"13\"," + + "\"SequenceNumber\":960180}"); + }); + + var target = new BlobsCheckpointStore(containerClient, new BasicRetryPolicy(new EventHubsRetryOptions()), readLegacyCheckpoints: true); + var checkpoints = await target.ListCheckpointsAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, new CancellationToken()); + + Assert.That(checkpoints, Has.One.Items, "A single checkpoint should have been returned."); + Assert.That(checkpoints.Single().StartingPosition, Is.EqualTo(EventPosition.FromOffset(14, false))); + Assert.That(checkpoints.Single().PartitionId, Is.EqualTo(partitionId)); + } + + /// + /// Verifies basic functionality of ListCheckpointsAsync and ensures the starting position is set correctly. + /// + /// + [Test] + public async Task ListCheckpointsMergesNewAndLegacyCheckpoints() + { + string partitionId1 = Guid.NewGuid().ToString(); + string partitionId2 = Guid.NewGuid().ToString(); + var blobList = new List{ + + BlobsModelFactory.BlobItem($"{FullyQualifiedNamespace}/{EventHubName}/{ConsumerGroup}/checkpoint/{partitionId1}", + false, + BlobsModelFactory.BlobItemProperties(true, lastModified: DateTime.UtcNow, eTag: new ETag(MatchingEtag), contentLength: 0), + "snapshot", + new Dictionary + { + {BlobMetadataKey.OwnerIdentifier, Guid.NewGuid().ToString()}, + {BlobMetadataKey.SequenceNumber, "960182"}, + {BlobMetadataKey.Offset, "14"} + }), + BlobsModelFactory.BlobItem($"{FullyQualifiedNamespace}/{EventHubName}/{ConsumerGroup}/{partitionId2}", + false, + BlobsModelFactory.BlobItemProperties(true, lastModified: DateTime.UtcNow, eTag: new ETag(MatchingEtag)), + "snapshot") + }; + + var containerClient = new MockBlobContainerClient() { Blobs = blobList }; + containerClient.AddBlobClient($"{FullyQualifiedNamespace}/{EventHubName}/{ConsumerGroup}/{partitionId2}", client => + { + client.Content = Encoding.UTF8.GetBytes("{" + + "\"PartitionId\":\"0\"," + + "\"Owner\":\"681d365b-de1b-4288-9733-76294e17daf0\"," + + "\"Token\":\"2d0c4276-827d-4ca4-a345-729caeca3b82\"," + + "\"Epoch\":386," + + "\"Offset\":\"13\"," + + "\"SequenceNumber\":960180}"); + }); + + var target = new BlobsCheckpointStore(containerClient, new BasicRetryPolicy(new EventHubsRetryOptions()), readLegacyCheckpoints: true); + var checkpoints = (await target.ListCheckpointsAsync(FullyQualifiedNamespace, EventHubName, ConsumerGroup, new CancellationToken())).ToArray(); + + Assert.That(checkpoints, Has.Exactly(2).Items, "Two checkpoints should have been returned."); + Assert.That(checkpoints[0].StartingPosition, Is.EqualTo(EventPosition.FromOffset(14, false))); + Assert.That(checkpoints[0].PartitionId, Is.EqualTo(partitionId1)); + Assert.That(checkpoints[1].StartingPosition, Is.EqualTo(EventPosition.FromOffset(13, false))); + Assert.That(checkpoints[1].PartitionId, Is.EqualTo(partitionId2)); + } + /// /// Verifies basic functionality of ListCheckpointsAsync and ensures the starting position is set correctly. /// diff --git a/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/BlobCheckpointStore/BlobsCheckpointStore.cs b/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/BlobCheckpointStore/BlobsCheckpointStore.cs index 7c043f6b89f2..989c5d82ecba 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/BlobCheckpointStore/BlobsCheckpointStore.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/BlobCheckpointStore/BlobsCheckpointStore.cs @@ -83,7 +83,7 @@ internal partial class BlobsCheckpointStore : StorageManager /// /// The client used to interact with the Azure Blob Storage service. /// The retry policy to use as the basis for interacting with the Storage Blobs service. - /// Indicates whether to read legacy checkpoints when no current version checkpoints are available. + /// Indicates whether to read legacy checkpoint when no current version checkpoint is available for a partition. /// public BlobsCheckpointStore(BlobContainerClient blobContainerClient, EventHubsRetryPolicy retryPolicy, @@ -349,7 +349,7 @@ async Task> listLegacyCheckpointsAsync(List Date: Mon, 7 Dec 2020 11:16:45 -0800 Subject: [PATCH 5/8] whitespace --- .../tests/CheckpointStore/BlobsCheckpointStoreTests.cs | 6 ------ 1 file changed, 6 deletions(-) diff --git a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/CheckpointStore/BlobsCheckpointStoreTests.cs b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/CheckpointStore/BlobsCheckpointStoreTests.cs index aaae913912f2..3c702ccf1963 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/CheckpointStore/BlobsCheckpointStoreTests.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/CheckpointStore/BlobsCheckpointStoreTests.cs @@ -626,7 +626,6 @@ public async Task ListCheckpointsConsidersDataInvalidWithNoOffsetOrSequenceNumbe "\"Epoch\":386}"); }); - var mockLogger = new Mock(); var target = new BlobsCheckpointStore(containerClient, new BasicRetryPolicy(new EventHubsRetryOptions()), readLegacyCheckpoints: true); @@ -1051,7 +1050,6 @@ public void ClaimOwnershipAsyncRetriesAndSurfacesRetriableExceptionsWhenVersionI .Setup(policy => policy.CalculateRetryDelay(It.Is(value => value == exception), It.Is(value => value <= maximumRetries))) .Returns(TimeSpan.FromMilliseconds(5)); - var mockContainerClient = new MockBlobContainerClient().AddBlobClient("ns/eh/cg/ownership/pid", client => { client.UploadAsyncCallback = (content, httpHeaders, metadata, conditions, progressHandler, accessTier, transferOptions, token) => @@ -1233,7 +1231,6 @@ public async Task ClaimOwnershipAsyncDelegatesTheCancellationToken(string versio // UploadAsync will be called if eTag is null; SetMetadataAsync is used otherwise. - var mockContainerClient = new MockBlobContainerClient().AddBlobClient("ns/eh/cg/ownership/pid", client => { if (version == null) @@ -1494,7 +1491,6 @@ public void UpdateCheckpointAsyncRetriesAndSurfacesRetriableExceptionsWhenTheBlo .Setup(policy => policy.CalculateRetryDelay(It.Is(value => value == exception), It.Is(value => value <= maximumRetries))) .Returns(TimeSpan.FromMilliseconds(5)); - var mockContainerClient = new MockBlobContainerClient().AddBlobClient("ns/eh/cg/checkpoint/pid", client => { client.UploadAsyncCallback = (content, httpHeaders, metadata, conditions, progressHandler, accessTier, transferOptions, token) => @@ -1639,7 +1635,6 @@ public async Task UpdateCheckpointAsyncDelegatesTheCancellationTokenWhenTheBlobE PartitionId = "pid" }; - await checkpointStore.UpdateCheckpointAsync(checkpoint, new EventData(Array.Empty()), cancellationSource.Token); Assert.That(stateBeforeCancellation.HasValue, Is.True, "State before cancellation should have been captured."); @@ -1668,7 +1663,6 @@ public async Task UpdateCheckpointAsyncDelegatesTheCancellationTokenWhenTheBlobD var stateBeforeCancellation = default(bool?); var stateAfterCancellation = default(bool?); - var mockContainerClient = new MockBlobContainerClient().AddBlobClient("ns/eh/cg/checkpoint/pid", client => { client.UploadAsyncCallback = (content, httpHeaders, metadata, conditions, progressHandler, accessTier, transferOptions, token) => From 485f25bbf257f23a71580b3a228b122977e085dc Mon Sep 17 00:00:00 2001 From: Pavel Krymets Date: Mon, 7 Dec 2020 11:19:13 -0800 Subject: [PATCH 6/8] formatting --- .../tests/CheckpointStore/BlobsCheckpointStoreTests.cs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/CheckpointStore/BlobsCheckpointStoreTests.cs b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/CheckpointStore/BlobsCheckpointStoreTests.cs index 3c702ccf1963..c2f52f8e2417 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/CheckpointStore/BlobsCheckpointStoreTests.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/CheckpointStore/BlobsCheckpointStoreTests.cs @@ -88,7 +88,8 @@ public void ConstructorRequiresRetryPolicy() [Test] public async Task ListOwnershipLogsStartAndComplete() { - var blobList = new List{ + var blobList = new List + { BlobsModelFactory.BlobItem($"{FullyQualifiedNamespace}/{EventHubName}/{ConsumerGroup}/ownership/{Guid.NewGuid().ToString()}", false, BlobsModelFactory.BlobItemProperties(true, lastModified: DateTime.UtcNow, eTag: new ETag(MatchingEtag)), From 5627402fb4a1e1392bca8d67b288d008e0a2fafb Mon Sep 17 00:00:00 2001 From: Pavel Krymets Date: Mon, 7 Dec 2020 11:31:30 -0800 Subject: [PATCH 7/8] formatting --- .../CheckpointStore/BlobsCheckpointStoreTests.cs | 12 ++++++++---- .../src/BlobCheckpointStore/BlobsCheckpointStore.cs | 1 - 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/CheckpointStore/BlobsCheckpointStoreTests.cs b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/CheckpointStore/BlobsCheckpointStoreTests.cs index c2f52f8e2417..29a3be5bf1bf 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/CheckpointStore/BlobsCheckpointStoreTests.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/CheckpointStore/BlobsCheckpointStoreTests.cs @@ -476,7 +476,8 @@ public async Task ListCheckpointsPreferredNewCheckpointOverLegacy() "\"Token\":\"2d0c4276-827d-4ca4-a345-729caeca3b82\"," + "\"Epoch\":386," + "\"Offset\":\"13\"," + - "\"SequenceNumber\":960180}"); + "\"SequenceNumber\":960180" + + "}"); }); var target = new BlobsCheckpointStore(containerClient, new BasicRetryPolicy(new EventHubsRetryOptions()), readLegacyCheckpoints: true); @@ -523,7 +524,8 @@ public async Task ListCheckpointsMergesNewAndLegacyCheckpoints() "\"Token\":\"2d0c4276-827d-4ca4-a345-729caeca3b82\"," + "\"Epoch\":386," + "\"Offset\":\"13\"," + - "\"SequenceNumber\":960180}"); + "\"SequenceNumber\":960180" + + "}"); }); var target = new BlobsCheckpointStore(containerClient, new BasicRetryPolicy(new EventHubsRetryOptions()), readLegacyCheckpoints: true); @@ -559,7 +561,8 @@ public async Task ListCheckpointsUsesOffsetAsTheStartingPositionWhenPresentInLeg "\"Token\":\"2d0c4276-827d-4ca4-a345-729caeca3b82\"," + "\"Epoch\":386," + "\"Offset\":\"13\"," + - "\"SequenceNumber\":960180}"); + "\"SequenceNumber\":960180" + + "}"); }); var target = new BlobsCheckpointStore(containerClient, new BasicRetryPolicy(new EventHubsRetryOptions()), readLegacyCheckpoints: true); @@ -592,7 +595,8 @@ public async Task ListCheckpointsUsesSequenceNumberAsTheStartingPositionWhenNoOf "\"Owner\":\"681d365b-de1b-4288-9733-76294e17daf0\"," + "\"Token\":\"2d0c4276-827d-4ca4-a345-729caeca3b82\"," + "\"Epoch\":386," + - "\"SequenceNumber\":960180}"); + "\"SequenceNumber\":960180" + + "}"); }); var target = new BlobsCheckpointStore(containerClient, new BasicRetryPolicy(new EventHubsRetryOptions()), readLegacyCheckpoints: true); diff --git a/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/BlobCheckpointStore/BlobsCheckpointStore.cs b/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/BlobCheckpointStore/BlobsCheckpointStore.cs index 989c5d82ecba..2f27f1cc5455 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/BlobCheckpointStore/BlobsCheckpointStore.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs.Shared/src/BlobCheckpointStore/BlobsCheckpointStore.cs @@ -408,7 +408,6 @@ async Task> listLegacyCheckpointsAsync(List checkpoints = null; try { - checkpoints = await ApplyRetryPolicy(listCheckpointsAsync, cancellationToken).ConfigureAwait(false); if (ReadLegacyCheckpoints) { From 68f916beb612b4cc889ed41c77d29a6c041a3d2d Mon Sep 17 00:00:00 2001 From: Pavel Krymets Date: Mon, 7 Dec 2020 11:44:03 -0800 Subject: [PATCH 8/8] more formatting --- .../BlobsCheckpointStoreTests.cs | 41 ++++++++++++------- 1 file changed, 26 insertions(+), 15 deletions(-) diff --git a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/CheckpointStore/BlobsCheckpointStoreTests.cs b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/CheckpointStore/BlobsCheckpointStoreTests.cs index 29a3be5bf1bf..8fc6b663a657 100644 --- a/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/CheckpointStore/BlobsCheckpointStoreTests.cs +++ b/sdk/eventhub/Azure.Messaging.EventHubs.Processor/tests/CheckpointStore/BlobsCheckpointStoreTests.cs @@ -329,7 +329,8 @@ public void ClaimOwnershipForMissingPartitionThrowsAndLogsOwnershipNotClaimable( [Test] public async Task ListCheckpointsLogsStartAndComplete() { - var blobList = new List{ + var blobList = new List + { BlobsModelFactory.BlobItem($"{FullyQualifiedNamespace}/{EventHubName}/{ConsumerGroup}/checkpoint/{Guid.NewGuid().ToString()}", false, BlobsModelFactory.BlobItemProperties(true, lastModified: DateTime.UtcNow, eTag: new ETag(MatchingEtag)), @@ -361,7 +362,8 @@ public async Task ListCheckpointsUsesOffsetAsTheStartingPositionWhenPresent() var expectedOffset = 13; var expectedStartingPosition = EventPosition.FromOffset(expectedOffset, false); - var blobList = new List{ + var blobList = new List + { BlobsModelFactory.BlobItem($"{FullyQualifiedNamespace}/{EventHubName}/{ConsumerGroup}/checkpoint/{Guid.NewGuid().ToString()}", false, BlobsModelFactory.BlobItemProperties(true, lastModified: DateTime.UtcNow, eTag: new ETag(MatchingEtag)), @@ -390,7 +392,8 @@ public async Task ListCheckpointsUsesSequenceNumberAsTheStartingPositionWhenNoOf var expectedSequence = 133; var expectedStartingPosition = EventPosition.FromSequenceNumber(expectedSequence, false); - var blobList = new List{ + var blobList = new List + { BlobsModelFactory.BlobItem($"{FullyQualifiedNamespace}/{EventHubName}/{ConsumerGroup}/checkpoint/{Guid.NewGuid().ToString()}", false, BlobsModelFactory.BlobItemProperties(true, lastModified: DateTime.UtcNow, eTag: new ETag(MatchingEtag)), @@ -417,7 +420,8 @@ public async Task ListCheckpointsConsidersDataInvalidWithNoOffsetOrSequenceNumbe { var partitionId = "67"; - var blobList = new List{ + var blobList = new List + { BlobsModelFactory.BlobItem($"{FullyQualifiedNamespace}/{EventHubName}/{ConsumerGroup}/checkpoint/{partitionId}", false, BlobsModelFactory.BlobItemProperties(true, lastModified: DateTime.UtcNow, eTag: new ETag(MatchingEtag)), @@ -449,8 +453,8 @@ public async Task ListCheckpointsConsidersDataInvalidWithNoOffsetOrSequenceNumbe public async Task ListCheckpointsPreferredNewCheckpointOverLegacy() { string partitionId = Guid.NewGuid().ToString(); - var blobList = new List{ - + var blobList = new List + { BlobsModelFactory.BlobItem($"{FullyQualifiedNamespace}/{EventHubName}/{ConsumerGroup}/checkpoint/{partitionId}", false, BlobsModelFactory.BlobItemProperties(true, lastModified: DateTime.UtcNow, eTag: new ETag(MatchingEtag), contentLength: 0), @@ -497,8 +501,8 @@ public async Task ListCheckpointsMergesNewAndLegacyCheckpoints() { string partitionId1 = Guid.NewGuid().ToString(); string partitionId2 = Guid.NewGuid().ToString(); - var blobList = new List{ - + var blobList = new List + { BlobsModelFactory.BlobItem($"{FullyQualifiedNamespace}/{EventHubName}/{ConsumerGroup}/checkpoint/{partitionId1}", false, BlobsModelFactory.BlobItemProperties(true, lastModified: DateTime.UtcNow, eTag: new ETag(MatchingEtag), contentLength: 0), @@ -545,7 +549,8 @@ public async Task ListCheckpointsMergesNewAndLegacyCheckpoints() [Test] public async Task ListCheckpointsUsesOffsetAsTheStartingPositionWhenPresentInLegacyCheckpoint() { - var blobList = new List{ + var blobList = new List + { BlobsModelFactory.BlobItem($"{FullyQualifiedNamespace}/{EventHubName}/{ConsumerGroup}/0", false, BlobsModelFactory.BlobItemProperties(true, lastModified: DateTime.UtcNow, eTag: new ETag(MatchingEtag)), @@ -580,7 +585,8 @@ public async Task ListCheckpointsUsesOffsetAsTheStartingPositionWhenPresentInLeg [Test] public async Task ListCheckpointsUsesSequenceNumberAsTheStartingPositionWhenNoOffsetIsPresentInLegacyCheckpoint() { - var blobList = new List{ + var blobList = new List + { BlobsModelFactory.BlobItem($"{FullyQualifiedNamespace}/{EventHubName}/{ConsumerGroup}/0", false, BlobsModelFactory.BlobItemProperties(true, lastModified: DateTime.UtcNow, eTag: new ETag(MatchingEtag)), @@ -614,7 +620,8 @@ public async Task ListCheckpointsUsesSequenceNumberAsTheStartingPositionWhenNoOf [Test] public async Task ListCheckpointsConsidersDataInvalidWithNoOffsetOrSequenceNumberLegacyCheckpoint() { - var blobList = new List{ + var blobList = new List + { BlobsModelFactory.BlobItem($"{FullyQualifiedNamespace}/{EventHubName}/{ConsumerGroup}/0", false, BlobsModelFactory.BlobItemProperties(true, lastModified: DateTime.UtcNow, eTag: new ETag(MatchingEtag)), @@ -653,7 +660,8 @@ public async Task ListCheckpointsConsidersDataInvalidWithNoOffsetOrSequenceNumbe [TestCase("\0\0\0")] public async Task ListCheckpointsConsidersDataInvalidWithLegacyCheckpointBlobContainingInvalidJson(string json) { - var blobList = new List{ + var blobList = new List + { BlobsModelFactory.BlobItem($"{FullyQualifiedNamespace}/{EventHubName}/{ConsumerGroup}/0", false, BlobsModelFactory.BlobItemProperties(true, lastModified: DateTime.UtcNow, eTag: new ETag(MatchingEtag)), @@ -707,7 +715,8 @@ public async Task ListCheckpointsLogsInvalidCheckpoint() { var partitionId = Guid.NewGuid().ToString(); - var blobList = new List{ + var blobList = new List + { BlobsModelFactory.BlobItem($"{FullyQualifiedNamespace}/{EventHubName}/{ConsumerGroup}/checkpoint/{partitionId}", false, BlobsModelFactory.BlobItemProperties(true, lastModified: DateTime.UtcNow, eTag: new ETag(MatchingEtag)), @@ -758,7 +767,8 @@ public async Task UpdateCheckpointLogsStartAndCompleteWhenTheBlobExists() var blobInfo = BlobsModelFactory.BlobInfo(new ETag($@"""{MatchingEtag}"""), DateTime.UtcNow); - var blobList = new List{ + var blobList = new List + { BlobsModelFactory.BlobItem($"{FullyQualifiedNamespace}/{EventHubName}/{ConsumerGroup}/ownership/{Guid.NewGuid().ToString()}", false, BlobsModelFactory.BlobItemProperties(true, lastModified: DateTime.UtcNow, eTag: new ETag(MatchingEtag)), @@ -798,7 +808,8 @@ public async Task UpdateCheckpointLogsStartAndCompleteWhenTheBlobDoesNotExist() PartitionId = PartitionId, }; - var blobList = new List{ + var blobList = new List + { BlobsModelFactory.BlobItem($"{FullyQualifiedNamespace}/{EventHubName}/{ConsumerGroup}/ownership/{Guid.NewGuid().ToString()}", false, BlobsModelFactory.BlobItemProperties(true, lastModified: DateTime.UtcNow, eTag: new ETag(MatchingEtag)),