Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Enable legacy checkpoint reading in WebJobs #17392

Merged
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,12 @@ namespace Azure.Messaging.EventHubs.Processor.Tests
///
public class BlobsCheckpointStoreTests
{
private const string FullyQualifiedNamespace = "fqns";
private const string EventHubName = "name";
private const string ConsumerGroup = "group";
private const string FullyQualifiedNamespace = "FqNs";
private const string EventHubName = "Name";
private const string ConsumerGroup = "Group";
private const string FullyQualifiedNamespaceLowercase = "fqns";
private const string EventHubNameLowercase = "name";
private const string ConsumerGroupLowercase = "group";
private const string MatchingEtag = "etag";
private const string WrongEtag = "wrongEtag";
private const string PartitionId = "1";
Expand Down Expand Up @@ -90,7 +93,7 @@ public async Task ListOwnershipLogsStartAndComplete()
{
var blobList = new List<BlobItem>
{
BlobsModelFactory.BlobItem($"{FullyQualifiedNamespace}/{EventHubName}/{ConsumerGroup}/ownership/{Guid.NewGuid().ToString()}",
BlobsModelFactory.BlobItem($"{FullyQualifiedNamespaceLowercase}/{EventHubNameLowercase}/{ConsumerGroupLowercase}/ownership/{Guid.NewGuid().ToString()}",
false,
BlobsModelFactory.BlobItemProperties(true, lastModified: DateTime.UtcNow, eTag: new ETag(MatchingEtag)),
"snapshot",
Expand Down Expand Up @@ -331,7 +334,7 @@ public async Task ListCheckpointsLogsStartAndComplete()
{
var blobList = new List<BlobItem>
{
BlobsModelFactory.BlobItem($"{FullyQualifiedNamespace}/{EventHubName}/{ConsumerGroup}/checkpoint/{Guid.NewGuid().ToString()}",
BlobsModelFactory.BlobItem($"{FullyQualifiedNamespaceLowercase}/{EventHubNameLowercase}/{ConsumerGroupLowercase}/checkpoint/{Guid.NewGuid().ToString()}",
false,
BlobsModelFactory.BlobItemProperties(true, lastModified: DateTime.UtcNow, eTag: new ETag(MatchingEtag)),
"snapshot",
Expand Down Expand Up @@ -364,7 +367,7 @@ public async Task ListCheckpointsUsesOffsetAsTheStartingPositionWhenPresent()

var blobList = new List<BlobItem>
{
BlobsModelFactory.BlobItem($"{FullyQualifiedNamespace}/{EventHubName}/{ConsumerGroup}/checkpoint/{Guid.NewGuid().ToString()}",
BlobsModelFactory.BlobItem($"{FullyQualifiedNamespaceLowercase}/{EventHubNameLowercase}/{ConsumerGroupLowercase}/checkpoint/{Guid.NewGuid().ToString()}",
false,
BlobsModelFactory.BlobItemProperties(true, lastModified: DateTime.UtcNow, eTag: new ETag(MatchingEtag)),
"snapshot",
Expand Down Expand Up @@ -394,7 +397,7 @@ public async Task ListCheckpointsUsesSequenceNumberAsTheStartingPositionWhenNoOf

var blobList = new List<BlobItem>
{
BlobsModelFactory.BlobItem($"{FullyQualifiedNamespace}/{EventHubName}/{ConsumerGroup}/checkpoint/{Guid.NewGuid().ToString()}",
BlobsModelFactory.BlobItem($"{FullyQualifiedNamespaceLowercase}/{EventHubNameLowercase}/{ConsumerGroupLowercase}/checkpoint/{Guid.NewGuid().ToString()}",
false,
BlobsModelFactory.BlobItemProperties(true, lastModified: DateTime.UtcNow, eTag: new ETag(MatchingEtag)),
"snapshot",
Expand Down Expand Up @@ -422,7 +425,7 @@ public async Task ListCheckpointsConsidersDataInvalidWithNoOffsetOrSequenceNumbe

var blobList = new List<BlobItem>
{
BlobsModelFactory.BlobItem($"{FullyQualifiedNamespace}/{EventHubName}/{ConsumerGroup}/checkpoint/{partitionId}",
BlobsModelFactory.BlobItem($"{FullyQualifiedNamespaceLowercase}/{EventHubNameLowercase}/{ConsumerGroupLowercase}/checkpoint/{partitionId}",
false,
BlobsModelFactory.BlobItemProperties(true, lastModified: DateTime.UtcNow, eTag: new ETag(MatchingEtag)),
"snapshot",
Expand Down Expand Up @@ -455,7 +458,7 @@ public async Task ListCheckpointsPreferredNewCheckpointOverLegacy()
string partitionId = Guid.NewGuid().ToString();
var blobList = new List<BlobItem>
{
BlobsModelFactory.BlobItem($"{FullyQualifiedNamespace}/{EventHubName}/{ConsumerGroup}/checkpoint/{partitionId}",
BlobsModelFactory.BlobItem($"{FullyQualifiedNamespaceLowercase}/{EventHubNameLowercase}/{ConsumerGroupLowercase}/checkpoint/{partitionId}",
false,
BlobsModelFactory.BlobItemProperties(true, lastModified: DateTime.UtcNow, eTag: new ETag(MatchingEtag), contentLength: 0),
"snapshot",
Expand All @@ -465,14 +468,14 @@ public async Task ListCheckpointsPreferredNewCheckpointOverLegacy()
{BlobMetadataKey.SequenceNumber, "960182"},
{BlobMetadataKey.Offset, "14"}
}),
BlobsModelFactory.BlobItem($"{FullyQualifiedNamespace}/{EventHubName}/{ConsumerGroup}/{partitionId}",
BlobsModelFactory.BlobItem($"{FullyQualifiedNamespaceLowercase}/{EventHubNameLowercase}/{ConsumerGroupLowercase}/{partitionId}",
false,
BlobsModelFactory.BlobItemProperties(true, lastModified: DateTime.UtcNow, eTag: new ETag(MatchingEtag)),
"snapshot")
};

var containerClient = new MockBlobContainerClient() { Blobs = blobList };
containerClient.AddBlobClient($"{FullyQualifiedNamespace}/{EventHubName}/{ConsumerGroup}/{partitionId}", client =>
containerClient.AddBlobClient($"{FullyQualifiedNamespaceLowercase}/{EventHubNameLowercase}/{ConsumerGroupLowercase}/{partitionId}", client =>
{
client.Content = Encoding.UTF8.GetBytes("{" +
"\"PartitionId\":\"0\"," +
Expand Down Expand Up @@ -503,7 +506,7 @@ public async Task ListCheckpointsMergesNewAndLegacyCheckpoints()
string partitionId2 = Guid.NewGuid().ToString();
var blobList = new List<BlobItem>
{
BlobsModelFactory.BlobItem($"{FullyQualifiedNamespace}/{EventHubName}/{ConsumerGroup}/checkpoint/{partitionId1}",
BlobsModelFactory.BlobItem($"{FullyQualifiedNamespaceLowercase}/{EventHubNameLowercase}/{ConsumerGroupLowercase}/checkpoint/{partitionId1}",
false,
BlobsModelFactory.BlobItemProperties(true, lastModified: DateTime.UtcNow, eTag: new ETag(MatchingEtag), contentLength: 0),
"snapshot",
Expand Down Expand Up @@ -717,7 +720,7 @@ public async Task ListCheckpointsLogsInvalidCheckpoint()

var blobList = new List<BlobItem>
{
BlobsModelFactory.BlobItem($"{FullyQualifiedNamespace}/{EventHubName}/{ConsumerGroup}/checkpoint/{partitionId}",
BlobsModelFactory.BlobItem($"{FullyQualifiedNamespaceLowercase}/{EventHubNameLowercase}/{ConsumerGroupLowercase}/checkpoint/{partitionId}",
false,
BlobsModelFactory.BlobItemProperties(true, lastModified: DateTime.UtcNow, eTag: new ETag(MatchingEtag)),
"snapshot",
Expand Down Expand Up @@ -769,7 +772,7 @@ public async Task UpdateCheckpointLogsStartAndCompleteWhenTheBlobExists()

var blobList = new List<BlobItem>
{
BlobsModelFactory.BlobItem($"{FullyQualifiedNamespace}/{EventHubName}/{ConsumerGroup}/ownership/{Guid.NewGuid().ToString()}",
BlobsModelFactory.BlobItem($"{FullyQualifiedNamespaceLowercase}/{EventHubNameLowercase}/{ConsumerGroupLowercase}/ownership/{Guid.NewGuid().ToString()}",
false,
BlobsModelFactory.BlobItemProperties(true, lastModified: DateTime.UtcNow, eTag: new ETag(MatchingEtag)),
"snapshot",
Expand Down Expand Up @@ -810,7 +813,7 @@ public async Task UpdateCheckpointLogsStartAndCompleteWhenTheBlobDoesNotExist()

var blobList = new List<BlobItem>
{
BlobsModelFactory.BlobItem($"{FullyQualifiedNamespace}/{EventHubName}/{ConsumerGroup}/ownership/{Guid.NewGuid().ToString()}",
BlobsModelFactory.BlobItem($"{FullyQualifiedNamespaceLowercase}/{EventHubNameLowercase}/{ConsumerGroupLowercase}/ownership/{Guid.NewGuid().ToString()}",
false,
BlobsModelFactory.BlobItemProperties(true, lastModified: DateTime.UtcNow, eTag: new ETag(MatchingEtag)),
"snapshot",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -344,7 +344,8 @@ async Task<List<EventProcessorCheckpoint>> listCheckpointsAsync(CancellationToke

async Task<List<EventProcessorCheckpoint>> listLegacyCheckpointsAsync(List<EventProcessorCheckpoint> existingCheckpoints, CancellationToken listCheckpointsToken)
{
var legacyPrefix = string.Format(CultureInfo.InvariantCulture, LegacyCheckpointPrefix, fullyQualifiedNamespace.ToLowerInvariant(), eventHubName.ToLowerInvariant(), consumerGroup.ToLowerInvariant());
// Legacy checkpoints are not normalized to lowercase
var legacyPrefix = string.Format(CultureInfo.InvariantCulture, LegacyCheckpointPrefix, fullyQualifiedNamespace, eventHubName, consumerGroup);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Without this change, are the legacy checkpoints skipped? Do we have a test for this?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's an interesting question... I think that Storage is case-sensitive for naming, but I'm not 100% sure.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, I changed the existing tests to use the expected casing both to T1 and T2 checkpoints. Storage is case-sensitive.

var checkpoints = new List<EventProcessorCheckpoint>();

await foreach (BlobItem blob in ContainerClient.GetBlobsAsync(prefix: legacyPrefix, cancellationToken: listCheckpointsToken).ConfigureAwait(false))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ protected BlobsCheckpointStore()
public BlobsCheckpointStore(BlobContainerClient blobContainerClient,
EventHubsRetryPolicy retryPolicy,
string functionId,
ILogger logger): this(blobContainerClient, retryPolicy)
ILogger logger): this(blobContainerClient, retryPolicy, readLegacyCheckpoints: true)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we consider renaming to something like initializeWithLegacyCheckpoints to reflect that the intent is to read them once and then replace them?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, will do.

{
_functionId = functionId;
_logger = logger;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ public async Task ListCheckpointsAsync_LogsOnInvalidCheckpoints()
{
Page<BlobItem>.FromValues(new[]
{
BlobsModelFactory.BlobItem("testnamespace/testeventhubname/testconsumergroup/checkpoint/0", false, BlobsModelFactory.BlobItemProperties(false), metadata: new Dictionary<string, string>())
BlobsModelFactory.BlobItem("testnamespace/testeventhubname/testconsumergroup/checkpoint/0", false, BlobsModelFactory.BlobItemProperties(false, contentLength: 0), metadata: new Dictionary<string, string>())
Copy link
Member

@JoshLove-msft JoshLove-msft Dec 8, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we pass contentLength of 0?

Copy link
Member

@jsquire jsquire Dec 8, 2020

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is no content for a checkpoint blob; its a zero-byte file. The data exists entirely in the metadata. I'm not sure why the text is explicitly setting this, as no logic relies on it, but it does mirror what is expected for the actual service return.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ah, fair enough. I was thinking in terms of the T2 checkpoints. Makes sense in the legacy context.

}, null, Mock.Of<Response>())
}));

Expand Down