From c0b4859eb9a47ac79b03b2e83e7bd6c9bc42ef5f Mon Sep 17 00:00:00 2001 From: Richard Park <51494936+richardpark-msft@users.noreply.github.com> Date: Mon, 6 Nov 2023 17:23:50 -0800 Subject: [PATCH] [azeventhubs] Relinquish ownership of partitions when the Processor is shut down (#21899) Relinquish ownership of partitions when the Processor is shut down. This makes them immediately available instead of having to wait for them to expire. Fixes #21868 --- sdk/messaging/azeventhubs/CHANGELOG.md | 8 +- .../azeventhubs/checkpoints/blob_store.go | 31 +++- .../checkpoints/blob_store_test.go | 150 +++++++++--------- .../checkpoints/blob_store_unit_test.go | 91 +++++++++++ sdk/messaging/azeventhubs/go.mod | 12 +- sdk/messaging/azeventhubs/go.sum | 29 ++-- .../inmemory_checkpoint_store_test.go | 11 ++ sdk/messaging/azeventhubs/processor.go | 42 ++++- .../azeventhubs/processor_load_balancer.go | 7 +- .../processor_load_balancers_test.go | 34 ++++ sdk/messaging/azeventhubs/processor_test.go | 102 ++++++++++++ 11 files changed, 401 insertions(+), 116 deletions(-) create mode 100644 sdk/messaging/azeventhubs/checkpoints/blob_store_unit_test.go diff --git a/sdk/messaging/azeventhubs/CHANGELOG.md b/sdk/messaging/azeventhubs/CHANGELOG.md index fe84abe83435..6e9f3d28c5fb 100644 --- a/sdk/messaging/azeventhubs/CHANGELOG.md +++ b/sdk/messaging/azeventhubs/CHANGELOG.md @@ -1,14 +1,10 @@ # Release History -## 1.0.2 (Unreleased) - -### Features Added - -### Breaking Changes +## 1.0.2 (2023-11-07) ### Bugs Fixed -### Other Changes +- Processor now relinquishes ownership of partitions when it shuts down, making them immediately available to other active Processor instances. (PR#TBD) ## 1.0.1 (2023-06-06) diff --git a/sdk/messaging/azeventhubs/checkpoints/blob_store.go b/sdk/messaging/azeventhubs/checkpoints/blob_store.go index 5bb0fe17068b..ece760739e0d 100644 --- a/sdk/messaging/azeventhubs/checkpoints/blob_store.go +++ b/sdk/messaging/azeventhubs/checkpoints/blob_store.go @@ -172,7 +172,7 @@ func (b *BlobStore) ListOwnership(ctx context.Context, fullyQualifiedNamespace s PartitionID: partitionID, } - if err := updateOwnership(blob, &o); err != nil { + if err := copyOwnershipPropsFromBlob(blob, &o); err != nil { return nil, err } @@ -349,18 +349,35 @@ func newCheckpointBlobMetadata(cpd azeventhubs.Checkpoint) map[string]*string { return m } -func updateOwnership(b *container.BlobItem, destOwnership *azeventhubs.Ownership) error { - if b == nil || b.Metadata == nil || b.Properties == nil { +func copyOwnershipPropsFromBlob(b *container.BlobItem, destOwnership *azeventhubs.Ownership) error { + if b == nil || b.Properties == nil { return fmt.Errorf("no ownership metadata for blob") } - ownerID, ok := b.Metadata["ownerid"] + // there are two states for ownerID + // nil (empty string when mapped across): a partition that was owned but was relinquished. + // a valid string: the owner ID or instanceID of the owning partition client. + // + // By default we'll represent it as relinquished. + var ownerID string + + // There's a bug in azblob where it omits metadata keys entirely if + // the value is nil. For now, I'll assume an empty metadata means + // we have a nil ownerid. 
+ // https://github.com/Azure/azure-sdk-for-go/issues/21887 + if b.Metadata != nil { + tmpOwnerID, ok := b.Metadata["ownerid"] + + if !ok { + return errors.New("ownerid is missing from metadata") + } - if !ok || ownerID == nil { - return errors.New("ownerid is missing from metadata") + if tmpOwnerID != nil { + ownerID = *tmpOwnerID + } } - destOwnership.OwnerID = *ownerID + destOwnership.OwnerID = ownerID destOwnership.LastModifiedTime = *b.Properties.LastModified destOwnership.ETag = b.Properties.ETag return nil diff --git a/sdk/messaging/azeventhubs/checkpoints/blob_store_test.go b/sdk/messaging/azeventhubs/checkpoints/blob_store_test.go index 49923e1f46a5..cccc6c2fe5a9 100644 --- a/sdk/messaging/azeventhubs/checkpoints/blob_store_test.go +++ b/sdk/messaging/azeventhubs/checkpoints/blob_store_test.go @@ -20,20 +20,13 @@ import ( ) func TestBlobStore_Checkpoints(t *testing.T) { - testData := getContainerClient(t) - defer testData.Cleanup() + testData := newBlobStoreTestData(t) - cc, err := container.NewClientFromConnectionString(testData.ConnectionString, testData.ContainerName, nil) - require.NoError(t, err) - - store, err := checkpoints.NewBlobStore(cc, nil) - require.NoError(t, err) - - checkpoints, err := store.ListCheckpoints(context.Background(), "fully-qualified-namespace", "event-hub-name", "consumer-group", nil) + checkpoints, err := testData.BlobStore.ListCheckpoints(context.Background(), "fully-qualified-namespace", "event-hub-name", "consumer-group", nil) require.NoError(t, err) require.Empty(t, checkpoints) - err = store.SetCheckpoint(context.Background(), azeventhubs.Checkpoint{ + err = testData.BlobStore.SetCheckpoint(context.Background(), azeventhubs.Checkpoint{ ConsumerGroup: "$Default", EventHubName: "event-hub-name", FullyQualifiedNamespace: "ns.servicebus.windows.net", @@ -43,7 +36,7 @@ func TestBlobStore_Checkpoints(t *testing.T) { }, nil) require.NoError(t, err) - checkpoints, err = store.ListCheckpoints(context.Background(), "ns.servicebus.windows.net", "event-hub-name", "$Default", nil) + checkpoints, err = testData.BlobStore.ListCheckpoints(context.Background(), "ns.servicebus.windows.net", "event-hub-name", "$Default", nil) require.NoError(t, err) require.Equal(t, azeventhubs.Checkpoint{ @@ -57,7 +50,7 @@ func TestBlobStore_Checkpoints(t *testing.T) { // There's a code path to allow updating the blob after it's been created but without an etag // in which case it just updates it. 
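For readers skimming the diff, the ownership encoding that the reworked copyOwnershipPropsFromBlob earlier in this patch decodes boils down to a small set of rules. The sketch below restates them as a standalone, hypothetical helper (the function name and the trimmed-down types are not part of the SDK); it only illustrates the mapping between blob metadata and the resulting owner ID.

    package main

    import "fmt"

    // ownerIDFromMetadata mirrors the decoding rules described in the patch (hypothetical helper):
    //   - metadata missing entirely  -> relinquished (empty owner ID), working around the
    //     azblob issue where nil-valued keys are omitted (Azure/azure-sdk-for-go#21887)
    //   - "ownerid" present but nil  -> relinquished (empty owner ID)
    //   - "ownerid" present, non-nil -> that value is the owning client's instance ID
    //   - metadata present but without an "ownerid" key -> malformed blob, error
    func ownerIDFromMetadata(md map[string]*string) (string, error) {
        if md == nil {
            return "", nil
        }
        v, ok := md["ownerid"]
        if !ok {
            return "", fmt.Errorf("ownerid is missing from metadata")
        }
        if v == nil {
            return "", nil
        }
        return *v, nil
    }

    func main() {
        owner := "processor-instance-1"
        for _, md := range []map[string]*string{nil, {"ownerid": nil}, {"ownerid": &owner}} {
            id, err := ownerIDFromMetadata(md)
            fmt.Printf("ownerID=%q err=%v\n", id, err)
        }
    }

An empty owner ID is what lets a relinquished partition look "available" to the load balancer changes further down in this patch.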
- err = store.SetCheckpoint(context.Background(), azeventhubs.Checkpoint{ + err = testData.BlobStore.SetCheckpoint(context.Background(), azeventhubs.Checkpoint{ ConsumerGroup: "$Default", EventHubName: "event-hub-name", FullyQualifiedNamespace: "ns.servicebus.windows.net", @@ -67,7 +60,7 @@ func TestBlobStore_Checkpoints(t *testing.T) { }, nil) require.NoError(t, err) - checkpoints, err = store.ListCheckpoints(context.Background(), "ns.servicebus.windows.net", "event-hub-name", "$Default", nil) + checkpoints, err = testData.BlobStore.ListCheckpoints(context.Background(), "ns.servicebus.windows.net", "event-hub-name", "$Default", nil) require.NoError(t, err) require.Equal(t, azeventhubs.Checkpoint{ @@ -81,28 +74,21 @@ func TestBlobStore_Checkpoints(t *testing.T) { } func TestBlobStore_Ownership(t *testing.T) { - testData := getContainerClient(t) - defer testData.Cleanup() - - cc, err := container.NewClientFromConnectionString(testData.ConnectionString, testData.ContainerName, nil) - require.NoError(t, err) - - store, err := checkpoints.NewBlobStore(cc, nil) - require.NoError(t, err) + testData := newBlobStoreTestData(t) - ownerships, err := store.ListOwnership(context.Background(), "fully-qualified-namespace", "event-hub-name", "consumer-group", nil) + ownerships, err := testData.BlobStore.ListOwnership(context.Background(), "fully-qualified-namespace", "event-hub-name", "consumer-group", nil) require.NoError(t, err) require.Empty(t, ownerships, "no ownerships yet") - ownerships, err = store.ClaimOwnership(context.Background(), nil, nil) + ownerships, err = testData.BlobStore.ClaimOwnership(context.Background(), nil, nil) require.NoError(t, err) require.Empty(t, ownerships) - ownerships, err = store.ClaimOwnership(context.Background(), []azeventhubs.Ownership{}, nil) + ownerships, err = testData.BlobStore.ClaimOwnership(context.Background(), []azeventhubs.Ownership{}, nil) require.NoError(t, err) require.Empty(t, ownerships) - ownerships, err = store.ClaimOwnership(context.Background(), []azeventhubs.Ownership{ + ownerships, err = testData.BlobStore.ClaimOwnership(context.Background(), []azeventhubs.Ownership{ { ConsumerGroup: "$Default", EventHubName: "event-hub-name", @@ -129,7 +115,7 @@ func TestBlobStore_Ownership(t *testing.T) { // if we attempt to claim it with a non-matching etag it will fail to claim // but not fail the call. 
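The comment above captures the claim semantics the rest of these tests rely on: ClaimOwnership uses blob etags for optimistic concurrency, and a claim that loses the race (or carries a stale etag) is simply absent from the returned slice rather than surfacing as an error. A minimal sketch of how a caller can tell which of its requested claims were actually granted; the function and variable names here are illustrative, only the CheckpointStore calls come from the package.

    package ownershipdemo

    import (
        "context"
        "log"

        "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
    )

    // claimAndReport asks the store for the given ownerships and reports which claims
    // were actually granted. ClaimOwnership only returns an error for genuine failures
    // (auth, transport, etc.); losing a claim race just means the ownership is absent
    // from the returned slice.
    func claimAndReport(ctx context.Context, store azeventhubs.CheckpointStore, requested []azeventhubs.Ownership) ([]azeventhubs.Ownership, error) {
        granted, err := store.ClaimOwnership(ctx, requested, nil)
        if err != nil {
            return nil, err
        }

        grantedIDs := make(map[string]bool, len(granted))
        for _, o := range granted {
            grantedIDs[o.PartitionID] = true
        }

        for _, o := range requested {
            if !grantedIDs[o.PartitionID] {
                // Not an error: another client claimed it first, or our etag was stale.
                log.Printf("did not win partition %s", o.PartitionID)
            }
        }

        return granted, nil
    }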
- ownerships, err = store.ClaimOwnership(context.Background(), []azeventhubs.Ownership{ + ownerships, err = testData.BlobStore.ClaimOwnership(context.Background(), []azeventhubs.Ownership{ { ConsumerGroup: "$Default", EventHubName: "event-hub-name", @@ -143,7 +129,7 @@ func TestBlobStore_Ownership(t *testing.T) { require.Empty(t, ownerships, "we're out of date (based on the non-matching etag), so no ownerships were claimed") // now we'll use the actual etag - ownerships, err = store.ClaimOwnership(context.Background(), []azeventhubs.Ownership{ + ownerships, err = testData.BlobStore.ClaimOwnership(context.Background(), []azeventhubs.Ownership{ { ConsumerGroup: "$Default", EventHubName: "event-hub-name", @@ -172,16 +158,9 @@ func TestBlobStore_Ownership(t *testing.T) { func TestBlobStore_ListAndClaim(t *testing.T) { // listing ownerships is a slightly different code path - testData := getContainerClient(t) - defer testData.Cleanup() - - cc, err := container.NewClientFromConnectionString(testData.ConnectionString, testData.ContainerName, nil) - require.NoError(t, err) + testData := newBlobStoreTestData(t) - store, err := checkpoints.NewBlobStore(cc, nil) - require.NoError(t, err) - - claimedOwnerships, err := store.ClaimOwnership(context.Background(), []azeventhubs.Ownership{ + claimedOwnerships, err := testData.BlobStore.ClaimOwnership(context.Background(), []azeventhubs.Ownership{ { ConsumerGroup: "$Default", EventHubName: "event-hub-name", @@ -193,7 +172,7 @@ func TestBlobStore_ListAndClaim(t *testing.T) { require.NoError(t, err) require.NotEmpty(t, claimedOwnerships) - listedOwnerships, err := store.ListOwnership(context.Background(), "ns.servicebus.windows.net", "event-hub-name", "$Default", nil) + listedOwnerships, err := testData.BlobStore.ListOwnership(context.Background(), "ns.servicebus.windows.net", "event-hub-name", "$Default", nil) require.NoError(t, err) require.Equal(t, "first-client", listedOwnerships[0].OwnerID) @@ -206,26 +185,19 @@ func TestBlobStore_ListAndClaim(t *testing.T) { require.Equal(t, "partition-id", listedOwnerships[0].PartitionID) // update using the etag - claimedOwnerships, err = store.ClaimOwnership(context.Background(), listedOwnerships, nil) + claimedOwnerships, err = testData.BlobStore.ClaimOwnership(context.Background(), listedOwnerships, nil) require.NoError(t, err) require.Equal(t, "partition-id", claimedOwnerships[0].PartitionID) // try to do it again and it'll fail since we don't have an updated etag - claimedOwnerships, err = store.ClaimOwnership(context.Background(), listedOwnerships, nil) + claimedOwnerships, err = testData.BlobStore.ClaimOwnership(context.Background(), listedOwnerships, nil) require.NoError(t, err) require.Empty(t, claimedOwnerships) } func TestBlobStore_OnlyOneOwnershipClaimSucceeds(t *testing.T) { - testData := getContainerClient(t) - defer testData.Cleanup() - - cc, err := container.NewClientFromConnectionString(testData.ConnectionString, testData.ContainerName, nil) - require.NoError(t, err) - - store, err := checkpoints.NewBlobStore(cc, nil) - require.NoError(t, err) + testData := newBlobStoreTestData(t) // we're going to make multiple calls to the blob store but only _one_ should succeed // since it's "first one in wins" @@ -240,7 +212,7 @@ func TestBlobStore_OnlyOneOwnershipClaimSucceeds(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() - ownerships, err := store.ClaimOwnership(ctx, []azeventhubs.Ownership{ + ownerships, err := testData.BlobStore.ClaimOwnership(ctx, 
[]azeventhubs.Ownership{ {ConsumerGroup: azeventhubs.DefaultConsumerGroup, EventHubName: "name", FullyQualifiedNamespace: "ns", PartitionID: "0", OwnerID: "ownerID"}, }, nil) @@ -275,14 +247,7 @@ func TestBlobStore_OnlyOneOwnershipClaimSucceeds(t *testing.T) { } func TestBlobStore_OnlyOneOwnershipUpdateSucceeds(t *testing.T) { - testData := getContainerClient(t) - defer testData.Cleanup() - - cc, err := container.NewClientFromConnectionString(testData.ConnectionString, testData.ContainerName, nil) - require.NoError(t, err) - - store, err := checkpoints.NewBlobStore(cc, nil) - require.NoError(t, err) + testData := newBlobStoreTestData(t) // we're going to make multiple calls to the blob store but only _one_ should succeed // since it's "first one in wins" @@ -291,7 +256,7 @@ func TestBlobStore_OnlyOneOwnershipUpdateSucceeds(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) defer cancel() - ownerships, err := store.ClaimOwnership(ctx, []azeventhubs.Ownership{ + ownerships, err := testData.BlobStore.ClaimOwnership(ctx, []azeventhubs.Ownership{ {ConsumerGroup: azeventhubs.DefaultConsumerGroup, EventHubName: "name", FullyQualifiedNamespace: "ns", PartitionID: "0", OwnerID: "ownerID"}, }, nil) require.NoError(t, err) @@ -305,7 +270,7 @@ func TestBlobStore_OnlyOneOwnershipUpdateSucceeds(t *testing.T) { for i := 0; i < cap(claimsCh); i++ { go func() { - ownerships, err := store.ClaimOwnership(ctx, ownerships, nil) + ownerships, err := testData.BlobStore.ClaimOwnership(ctx, ownerships, nil) if err != nil { claimsCh <- nil @@ -337,22 +302,50 @@ func TestBlobStore_OnlyOneOwnershipUpdateSucceeds(t *testing.T) { require.Equal(t, cap(claimsCh)-1, numFailedClaims, fmt.Sprintf("One of the 1/%d wins and the rest all fail to claim", cap(claimsCh))) } -func getContainerClient(t *testing.T) struct { - ConnectionString string - ContainerName string - Cleanup func() -} { +func TestBlobStore_RelinquishClaim(t *testing.T) { + testData := newBlobStoreTestData(t) + + initialClaims, err := testData.BlobStore.ClaimOwnership(context.Background(), []azeventhubs.Ownership{ + { + ConsumerGroup: azeventhubs.DefaultConsumerGroup, + EventHubName: "eventhubname", + FullyQualifiedNamespace: "fullyQualifiedNamespace", + PartitionID: "partitionID", + OwnerID: "ownerID", + LastModifiedTime: time.Now().UTC(), + }, + }, nil) + require.NoError(t, err) + require.Equal(t, "ownerID", initialClaims[0].OwnerID) + + // relinquish our ownership claim + initialClaims[0].OwnerID = "" + relinquishedClaims, err := testData.BlobStore.ClaimOwnership(context.Background(), initialClaims, nil) + require.NoError(t, err) + require.Empty(t, relinquishedClaims[0].OwnerID) + + // now be some other person and claim it. + relinquishedClaims[0].OwnerID = "new owner!" + lastClaimed, err := testData.BlobStore.ClaimOwnership(context.Background(), relinquishedClaims, nil) + require.NoError(t, err) + require.Equal(t, "new owner!", lastClaimed[0].OwnerID) +} + +type blobStoreTestData struct { + CC *container.Client + BlobStore *checkpoints.BlobStore +} + +// newBlobStoreTestData creates an Azure Blob storage container +// and returns the associated ContainerClient and BlobStore instance. +func newBlobStoreTestData(t *testing.T) blobStoreTestData { _ = godotenv.Load("../.env") storageCS := os.Getenv("CHECKPOINTSTORE_STORAGE_CONNECTION_STRING") if storageCS == "" { t.Skipf("CHECKPOINTSTORE_STORAGE_CONNECTION_STRING is not defined in the environment. 
Skipping blob checkpoint store live tests") - return struct { - ConnectionString string - ContainerName string - Cleanup func() - }{} + return blobStoreTestData{} } nano := time.Now().UTC().UnixNano() @@ -364,16 +357,15 @@ func getContainerClient(t *testing.T) struct { _, err = client.Create(context.Background(), nil) require.NoError(t, err) - return struct { - ConnectionString string - ContainerName string - Cleanup func() - }{ - ConnectionString: storageCS, - ContainerName: containerName, - Cleanup: func() { - _, err := client.Delete(context.Background(), nil) - require.NoError(t, err) - }, + blobStore, err := checkpoints.NewBlobStore(client, nil) + require.NoError(t, err) + + t.Cleanup(func() { + + }) + + return blobStoreTestData{ + CC: client, + BlobStore: blobStore, } } diff --git a/sdk/messaging/azeventhubs/checkpoints/blob_store_unit_test.go b/sdk/messaging/azeventhubs/checkpoints/blob_store_unit_test.go new file mode 100644 index 000000000000..606813f4f834 --- /dev/null +++ b/sdk/messaging/azeventhubs/checkpoints/blob_store_unit_test.go @@ -0,0 +1,91 @@ +// Copyright (c) Microsoft Corporation. All rights reserved. +// Licensed under the MIT License. +package checkpoints + +import ( + "testing" + "time" + + "github.com/Azure/azure-sdk-for-go/sdk/azcore" + "github.com/Azure/azure-sdk-for-go/sdk/azcore/to" + "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs" + "github.com/Azure/azure-sdk-for-go/sdk/storage/azblob/container" + "github.com/stretchr/testify/require" +) + +func TestBlobStore_copyOwnershipPropsFromBlob(t *testing.T) { + t.Run("MetadataWorkaround", func(t *testing.T) { + now := time.Now() + blobItem := container.BlobItem{ + Properties: &container.BlobProperties{ + ETag: to.Ptr(azcore.ETag([]byte{1, 2, 3})), + LastModified: &now, + }, + } + ownership := &azeventhubs.Ownership{} + err := copyOwnershipPropsFromBlob(&blobItem, ownership) + require.NoError(t, err) + + // this is the workaround - if the metadata dictionary is empty then we + // just give you back an empty owner ID + require.Empty(t, ownership.OwnerID) + require.Equal(t, ownership.ETag, to.Ptr(azcore.ETag([]byte{1, 2, 3}))) + require.Equal(t, now, ownership.LastModifiedTime) + }) + + t.Run("WithMetadataAndOwnerID", func(t *testing.T) { + now := time.Now() + blobItem := container.BlobItem{ + Properties: &container.BlobProperties{ + ETag: to.Ptr(azcore.ETag([]byte{1, 2, 3})), + LastModified: &now, + }, + Metadata: map[string]*string{ + "ownerid": to.Ptr("owner id"), + }, + } + ownership := &azeventhubs.Ownership{} + err := copyOwnershipPropsFromBlob(&blobItem, ownership) + require.NoError(t, err) + + require.Equal(t, "owner id", ownership.OwnerID) + require.Equal(t, ownership.ETag, to.Ptr(azcore.ETag([]byte{1, 2, 3}))) + require.Equal(t, now, ownership.LastModifiedTime) + }) + + t.Run("WithMetadataNilOwnerID", func(t *testing.T) { + now := time.Now() + blobItem := container.BlobItem{ + Properties: &container.BlobProperties{ + ETag: to.Ptr(azcore.ETag([]byte{1, 2, 3})), + LastModified: &now, + }, + Metadata: map[string]*string{ + // In the future this is what I'd expect to see. 
+ "ownerid": nil, + }, + } + ownership := &azeventhubs.Ownership{} + err := copyOwnershipPropsFromBlob(&blobItem, ownership) + require.NoError(t, err) + + require.Empty(t, ownership.OwnerID) + require.Equal(t, ownership.ETag, to.Ptr(azcore.ETag([]byte{1, 2, 3}))) + require.Equal(t, now, ownership.LastModifiedTime) + }) + + t.Run("WithMetadataNoOwnerIDFails", func(t *testing.T) { + now := time.Now() + blobItem := container.BlobItem{ + Properties: &container.BlobProperties{ + ETag: to.Ptr(azcore.ETag([]byte{1, 2, 3})), + LastModified: &now, + }, + Metadata: map[string]*string{}, // having metadata but no ownerid is incorrectly formed + } + ownership := &azeventhubs.Ownership{} + err := copyOwnershipPropsFromBlob(&blobItem, ownership) + require.EqualError(t, err, "ownerid is missing from metadata") + }) + +} diff --git a/sdk/messaging/azeventhubs/go.mod b/sdk/messaging/azeventhubs/go.mod index 027603efe4ac..9b8f2f53752b 100644 --- a/sdk/messaging/azeventhubs/go.mod +++ b/sdk/messaging/azeventhubs/go.mod @@ -4,10 +4,10 @@ go 1.18 require ( github.com/Azure/azure-sdk-for-go/sdk/azcore v1.8.0 - github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.1.0 + github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.3.0 github.com/Azure/azure-sdk-for-go/sdk/internal v1.4.0 github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/eventhub/armeventhub v1.0.0 - github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.0.0 + github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.2.0 github.com/Azure/go-amqp v1.0.2 github.com/golang/mock v1.6.0 github.com/joho/godotenv v1.4.0 @@ -17,14 +17,14 @@ require ( require ( code.cloudfoundry.org/clock v0.0.0-20180518195852-02e53af36e6c // indirect - github.com/AzureAD/microsoft-authentication-library-for-go v0.5.1 // indirect + github.com/AzureAD/microsoft-authentication-library-for-go v1.0.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/gofrs/uuid v3.3.0+incompatible // indirect - github.com/golang-jwt/jwt v3.2.1+incompatible // indirect - github.com/google/uuid v1.1.1 // indirect + github.com/golang-jwt/jwt/v4 v4.5.0 // indirect + github.com/google/uuid v1.3.0 // indirect github.com/klauspost/compress v1.10.3 // indirect github.com/kylelemons/godebug v1.1.0 // indirect - github.com/pkg/browser v0.0.0-20210115035449-ce105d075bb4 // indirect + github.com/pkg/browser v0.0.0-20210911075715-681adbf594b8 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect golang.org/x/crypto v0.14.0 // indirect golang.org/x/net v0.17.0 // indirect diff --git a/sdk/messaging/azeventhubs/go.sum b/sdk/messaging/azeventhubs/go.sum index 6b91a92aed99..1dfe268854db 100644 --- a/sdk/messaging/azeventhubs/go.sum +++ b/sdk/messaging/azeventhubs/go.sum @@ -2,18 +2,19 @@ code.cloudfoundry.org/clock v0.0.0-20180518195852-02e53af36e6c h1:5eeuG0BHx1+DHe code.cloudfoundry.org/clock v0.0.0-20180518195852-02e53af36e6c/go.mod h1:QD9Lzhd/ux6eNQVUDVRJX/RKTigpewimNYBi7ivZKY8= github.com/Azure/azure-sdk-for-go/sdk/azcore v1.8.0 h1:9kDVnTz3vbfweTqAUmk/a/pH5pWFCHtvRpHYC0G/dcA= github.com/Azure/azure-sdk-for-go/sdk/azcore v1.8.0/go.mod h1:3Ug6Qzto9anB6mGlEdgYMDF5zHQ+wwhEaYR4s17PHMw= -github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.1.0 h1:QkAcEIAKbNL4KoFr4SathZPhDhF4mVwpBMFlYjyAqy8= -github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.1.0/go.mod h1:bhXu1AjYL+wutSL/kpSq6s7733q2Rb0yuot9Zgfqa/0= +github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.3.0 h1:vcYCAze6p19qBW7MhZybIsqD8sMV8js0NyQM8JDnVtg= 
+github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.3.0/go.mod h1:OQeznEEkTZ9OrhHJoDD8ZDq51FHgXjqtP9z6bEwBq9U= github.com/Azure/azure-sdk-for-go/sdk/internal v1.4.0 h1:TuEMD+E+1aTjjLICGQOW6vLe8UWES7kopac9mUXL56Y= github.com/Azure/azure-sdk-for-go/sdk/internal v1.4.0/go.mod h1:s4kgfzA0covAXNicZHDMN58jExvcng2mC/DepXiF1EI= github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/eventhub/armeventhub v1.0.0 h1:BWeAAEzkCnL0ABVJqs+4mYudNch7oFGPtTlSmIWL8ms= github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/eventhub/armeventhub v1.0.0/go.mod h1:Y3gnVwfaz8h6L1YHar+NfWORtBoVUSB5h4GlGkdeF7Q= -github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.0.0 h1:u/LLAOFgsMv7HmNL4Qufg58y+qElGOt5qv0z1mURkRY= -github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.0.0/go.mod h1:2e8rMJtl2+2j+HXbTBwnyGpm5Nou7KhvSfxOq8JpTag= +github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/storage/armstorage v1.2.0 h1:Ma67P/GGprNwsslzEH6+Kb8nybI8jpDTm4Wmzu2ReK8= +github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.2.0 h1:gggzg0SUMs6SQbEw+3LoSsYf9YMjkupeAnHMX8O9mmY= +github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.2.0/go.mod h1:+6KLcKIVgxoBDMqMO/Nvy7bZ9a0nbU3I1DtFQK3YvB4= github.com/Azure/go-amqp v1.0.2 h1:zHCHId+kKC7fO8IkwyZJnWMvtRXhYC0VJtD0GYkHc6M= github.com/Azure/go-amqp v1.0.2/go.mod h1:vZAogwdrkbyK3Mla8m/CxSc/aKdnTZ4IbPxl51Y5WZE= -github.com/AzureAD/microsoft-authentication-library-for-go v0.5.1 h1:BWe8a+f/t+7KY7zH2mqygeUD0t8hNFXe08p1Pb3/jKE= -github.com/AzureAD/microsoft-authentication-library-for-go v0.5.1/go.mod h1:Vt9sXTKwMyGcOxSmLDMnGPgqsUg7m8pe215qMLrDXw4= +github.com/AzureAD/microsoft-authentication-library-for-go v1.0.0 h1:OBhqkivkhkMqLPymWEppkm7vgPQY2XsHoEkaMQ0AdZY= +github.com/AzureAD/microsoft-authentication-library-for-go v1.0.0/go.mod h1:kgDmCTgBzIEPFElEF+FK0SdjAor06dRq2Go927dnQ6o= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= @@ -39,9 +40,8 @@ github.com/gobwas/ws v1.0.2 h1:CoAavW/wd/kulfZmSIBt6p24n4j7tHgNVCjsfHVNUbo= github.com/gobwas/ws v1.0.2/go.mod h1:szmBTxLgaFppYjEmNtny/v3w89xOydFnnZMcgRRu/EM= github.com/gofrs/uuid v3.3.0+incompatible h1:8K4tyRfvU1CYPgJsveYFQMhpFd/wXNM7iK6rR7UHz84= github.com/gofrs/uuid v3.3.0+incompatible/go.mod h1:b2aQJv3Z4Fp6yNu3cdSllBxTCLRxnplIgP/c0N/04lM= -github.com/golang-jwt/jwt v3.2.1+incompatible h1:73Z+4BJcrTC+KczS6WvTPvRGOp1WmfEP4Q1lOd9Z/+c= -github.com/golang-jwt/jwt v3.2.1+incompatible/go.mod h1:8pz2t5EyA70fFQQSrl6XZXzqecmYZeUEB8OUGHkxJ+I= -github.com/golang-jwt/jwt/v4 v4.2.0 h1:besgBTC8w8HjP6NzQdxwKH9Z5oQMZ24ThTrHp3cZ8eU= +github.com/golang-jwt/jwt/v4 v4.5.0 h1:7cYmW1XlMY7h7ii7UhUyChSgS5wUJEnm9uZVTGqOWzg= +github.com/golang-jwt/jwt/v4 v4.5.0/go.mod h1:m21LjoU+eqJr34lmDMbreY2eSTRJ1cv77w39/MY0Ch0= github.com/golang/mock v1.6.0 h1:ErTB+efbowRARo13NNdxyJji2egdxLGQhRaY+DUumQc= github.com/golang/mock v1.6.0/go.mod h1:p6yTPP+5HYm5mzsMV8JkE6ZKdX+/wYM6Hr+LicevLPs= github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= @@ -51,8 +51,8 @@ github.com/golang/protobuf v1.3.5/go.mod h1:6O5/vntMXwX2lRkT1hjjk0nAC1IDOTvTlVgj github.com/google/go-cmp v0.4.0/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.5.9 h1:O2Tfq5qg4qc4AmwVlvv0oLiVAGB7enBSJ2x2DqQFi38= github.com/google/gofuzz v1.0.0/go.mod 
h1:dBl0BpW6vV/+mYPU4Po3pmUjxk6FQPldtuIdl/M65Eg= -github.com/google/uuid v1.1.1 h1:Gkbcsh/GbpXz7lPftLA3P6TYMwjCLYm83jiFQZF/3gY= -github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= +github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I= +github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo= github.com/gorilla/websocket v1.4.1 h1:q7AeDBpnBk8AogcD4DSag/Ukw/KV+YhzLj2bP5HvKCM= github.com/gorilla/websocket v1.4.1/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= @@ -79,12 +79,11 @@ github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421 h1:ZqeYNhU3OH github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742 h1:Esafd1046DLDQ0W1YjYsBW+p8U2u7vzgW2SQVmlNazg= github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= -github.com/montanaflynn/stats v0.6.6/go.mod h1:etXPPgVO6n31NxCd9KQUMvCM+ve0ruNzt6R8Bnaayow= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/ginkgo v1.8.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= github.com/onsi/gomega v1.5.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= -github.com/pkg/browser v0.0.0-20210115035449-ce105d075bb4 h1:Qj1ukM4GlMWXNdMBuXcXfz/Kw9s1qm0CLY32QxuSImI= -github.com/pkg/browser v0.0.0-20210115035449-ce105d075bb4/go.mod h1:N6UoU20jOqggOuDwUaBQpluzLNDqif3kq9z2wpdYEfQ= +github.com/pkg/browser v0.0.0-20210911075715-681adbf594b8 h1:KoWmjvw+nsYOo29YJK9vDA65RGE3NrOnUtO7a+RF9HU= +github.com/pkg/browser v0.0.0-20210911075715-681adbf594b8/go.mod h1:HKlIX3XHQyzLZPlr7++PzdhaXEj94dEiJgZDTsxEqUI= github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= @@ -117,9 +116,9 @@ golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5h golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20201119102817-f84b799fce68/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.0.0-20210124154548-22da62e12c0c/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210330210617-4fbd30eecc44/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20210616045830-e2b7044e8c71/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.13.0 h1:Af8nKPmuFypiUBjVoU9V20FiaFXOcuZI21p0ycVYYGE= golang.org/x/sys v0.13.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= diff --git a/sdk/messaging/azeventhubs/inmemory_checkpoint_store_test.go b/sdk/messaging/azeventhubs/inmemory_checkpoint_store_test.go index 44aacfa626d0..e0b0f50fa65c 100644 --- 
a/sdk/messaging/azeventhubs/inmemory_checkpoint_store_test.go +++ b/sdk/messaging/azeventhubs/inmemory_checkpoint_store_test.go @@ -187,6 +187,17 @@ func (cps *testCheckpointStore) ExpireOwnership(o Ownership) { cps.ownerships[key] = oldO } +func (cps *testCheckpointStore) ReqlinquishOwnership(o Ownership) { + key := strings.Join([]string{o.FullyQualifiedNamespace, o.EventHubName, o.ConsumerGroup, o.PartitionID}, "/") + + cps.ownershipMu.Lock() + defer cps.ownershipMu.Unlock() + + oldO := cps.ownerships[key] + oldO.OwnerID = "" + cps.ownerships[key] = oldO +} + func (cps *testCheckpointStore) ClaimOwnership(ctx context.Context, partitionOwnership []Ownership, options *ClaimOwnershipOptions) ([]Ownership, error) { var owned []Ownership diff --git a/sdk/messaging/azeventhubs/processor.go b/sdk/messaging/azeventhubs/processor.go index 5eae14183d4c..f6ca6173e0f5 100644 --- a/sdk/messaging/azeventhubs/processor.go +++ b/sdk/messaging/azeventhubs/processor.go @@ -8,6 +8,7 @@ import ( "fmt" "math/rand" "sync" + "sync/atomic" "time" "github.com/Azure/azure-sdk-for-go/sdk/azcore/to" @@ -100,6 +101,10 @@ type Processor struct { runCalled chan struct{} lb *processorLoadBalancer + + // claimedOwnerships is set to whatever our current ownerships are. The underlying + // value is a []Ownership. + currentOwnerships *atomic.Value } type consumerClientForProcessor interface { @@ -154,6 +159,9 @@ func newProcessorImpl(consumerClient consumerClientForProcessor, checkpointStore return nil, fmt.Errorf("invalid load balancing strategy '%s'", strategy) } + currentOwnerships := &atomic.Value{} + currentOwnerships.Store([]Ownership{}) + return &Processor{ ownershipUpdateInterval: updateInterval, consumerClient: consumerClient, @@ -167,6 +175,8 @@ func newProcessorImpl(consumerClient consumerClientForProcessor, checkpointStore consumerClientDetails: consumerClient.getDetails(), runCalled: make(chan struct{}), lb: newProcessorLoadBalancer(checkpointStore, consumerClient.getDetails(), strategy, partitionDurationExpiration), + currentOwnerships: currentOwnerships, + // `nextClients` will be initialized when the user calls Run() since it needs to query the # // of partitions on the Event Hub. }, nil @@ -224,7 +234,11 @@ func (p *Processor) Run(ctx context.Context) error { func (p *Processor) runImpl(ctx context.Context) error { consumers := &sync.Map{} - defer closeConsumers(ctx, consumers) + defer func() { + ctx, cancel := context.WithTimeout(context.Background(), time.Minute) + defer cancel() + p.closeConsumers(ctx, consumers) + }() // size the channel to the # of partitions. We can never exceed this size since // we'll never reclaim a partition that we already have ownership of. @@ -292,6 +306,12 @@ func (p *Processor) dispatch(ctx context.Context, eventHubProperties EventHubPro wg := sync.WaitGroup{} + // store off the set of ownerships we claimed this round - when the processor + // shuts down we'll clear them (if we still own them). + tmpOwnerships := make([]Ownership, len(ownerships)) + copy(tmpOwnerships, ownerships) + p.currentOwnerships.Store(tmpOwnerships) + for _, ownership := range ownerships { wg.Add(1) @@ -323,6 +343,8 @@ func (p *Processor) addPartitionClient(ctx context.Context, ownership Ownership, }, } + // RP: I don't want to accidentally end up doing this logic because the user was closing it as we + // were doing our next load balance. 
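One detail worth calling out in the processor.go changes above: the deferred cleanup builds a fresh context with a one-minute timeout instead of reusing the Run context, which is usually already canceled by the time the deferred function executes, so any relinquish call made with it would fail immediately. A stripped-down sketch of that shutdown pattern, with placeholder names and no Event Hubs types:

    package shutdowndemo

    import (
        "context"
        "log"
        "time"
    )

    // run is a stand-in for a long-running loop driven by ctx (e.g. Processor.Run).
    // The deferred cleanup deliberately does not reuse ctx: a fresh, bounded context
    // lets shutdown work (like relinquishing partition ownership) reach the checkpoint
    // store, while the timeout still guarantees run eventually returns.
    func run(ctx context.Context, cleanup func(context.Context) error) error {
        defer func() {
            cleanupCtx, cancel := context.WithTimeout(context.Background(), time.Minute)
            defer cancel()

            if err := cleanup(cleanupCtx); err != nil {
                log.Printf("cleanup failed: %s", err)
            }
        }()

        <-ctx.Done() // stand-in for the real work loop
        return nil
    }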
if _, alreadyExists := consumers.LoadOrStore(ownership.PartitionID, processorPartClient); alreadyExists { return nil } @@ -407,7 +429,7 @@ func (p *Processor) getCheckpointsMap(ctx context.Context) (map[string]Checkpoin return m, nil } -func closeConsumers(ctx context.Context, consumersMap *sync.Map) { +func (p *Processor) closeConsumers(ctx context.Context, consumersMap *sync.Map) { consumersMap.Range(func(key, value any) bool { client := value.(*ProcessorPartitionClient) @@ -417,4 +439,20 @@ func closeConsumers(ctx context.Context, consumersMap *sync.Map) { return true }) + + currentOwnerships := p.currentOwnerships.Load().([]Ownership) + + for i := 0; i < len(currentOwnerships); i++ { + currentOwnerships[i].OwnerID = relinquishedOwnershipID + } + + _, err := p.checkpointStore.ClaimOwnership(ctx, currentOwnerships, nil) + + if err != nil { + azlog.Writef(EventConsumer, "Failed to relinquish ownerships. New processors will have to wait for ownerships to expire: %s", err.Error()) + } } + +// relinquishedOwnershipID indicates that a partition is immediately available, similar to +// how we treat an ownership that is expired as available. +const relinquishedOwnershipID = "" diff --git a/sdk/messaging/azeventhubs/processor_load_balancer.go b/sdk/messaging/azeventhubs/processor_load_balancer.go index e708c6dd514b..7ab002a421f5 100644 --- a/sdk/messaging/azeventhubs/processor_load_balancer.go +++ b/sdk/messaging/azeventhubs/processor_load_balancer.go @@ -152,7 +152,7 @@ func (lb *processorLoadBalancer) getAvailablePartitions(ctx context.Context, par var unownedOrExpired []Ownership // split out partitions by whether they're currently owned - // and if they're expired. + // and if they're expired/relinquished. for _, o := range ownerships { alreadyAdded[o.PartitionID] = true @@ -161,6 +161,11 @@ func (lb *processorLoadBalancer) getAvailablePartitions(ctx context.Context, par continue } + if o.OwnerID == relinquishedOwnershipID { + unownedOrExpired = append(unownedOrExpired, o) + continue + } + groupedByOwner[o.OwnerID] = append(groupedByOwner[o.OwnerID], o) } diff --git a/sdk/messaging/azeventhubs/processor_load_balancers_test.go b/sdk/messaging/azeventhubs/processor_load_balancers_test.go index ee9b56a86ded..40df95fc14d4 100644 --- a/sdk/messaging/azeventhubs/processor_load_balancers_test.go +++ b/sdk/messaging/azeventhubs/processor_load_balancers_test.go @@ -294,6 +294,40 @@ func TestProcessorLoadBalancers_InvalidStrategy(t *testing.T) { require.EqualError(t, err, "invalid load balancing strategy 'super-greedy'") } +func TestProcessorLoadBalancers_AnyStrategy_GrabRelinquishedPartition(t *testing.T) { + for _, strategy := range []ProcessorStrategy{ProcessorStrategyBalanced, ProcessorStrategyGreedy} { + t.Run(string(strategy), func(t *testing.T) { + cps := newCheckpointStoreForTest() + + const clientA = "clientA" + const clientB = "clientB" + const clientCWithExpiredPartition = "clientC" + + middleOwnership := newTestOwnership("2", clientCWithExpiredPartition) + + _, err := cps.ClaimOwnership(context.Background(), []Ownership{ + newTestOwnership("0", clientA), + newTestOwnership("1", clientA), + middleOwnership, + newTestOwnership("3", clientB), + newTestOwnership("4", clientB), + }, nil) + require.NoError(t, err) + + // expire the middle partition (simulating that ClientC died, so nobody's updated it's ownership in awhile) + cps.ReqlinquishOwnership(middleOwnership) + + lb := newProcessorLoadBalancer(cps, newTestConsumerDetails(clientB), strategy, time.Hour) + + ownerships, err := 
lb.LoadBalance(context.Background(), []string{"0", "1", "2", "3", "4"}) + require.NoError(t, err) + require.NotEmpty(t, mapToStrings(ownerships, extractPartitionID)) + + requireBalanced(t, cps, 5, 2) + }) + } +} + func mapToStrings[T any](src []T, fn func(t T) string) []string { var dest []string diff --git a/sdk/messaging/azeventhubs/processor_test.go b/sdk/messaging/azeventhubs/processor_test.go index e08771110439..73bdb5c2a785 100644 --- a/sdk/messaging/azeventhubs/processor_test.go +++ b/sdk/messaging/azeventhubs/processor_test.go @@ -21,6 +21,38 @@ import ( "github.com/stretchr/testify/require" ) +func TestProcessor_PartitionsAreReqlinquished(t *testing.T) { + res := mustCreateProcessorForTest(t, TestProcessorArgs{ + Prefix: "loadbalance", + ProcessorOptions: &azeventhubs.ProcessorOptions{ + LoadBalancingStrategy: azeventhubs.ProcessorStrategyGreedy, + }, + }) + + hubProps, err := res.Consumer.GetEventHubProperties(context.Background(), nil) + require.NoError(t, err) + + ctx, stopProcessor := context.WithCancel(context.Background()) + defer stopProcessor() + processorClosed := make(chan struct{}) + + go func() { + err := res.Processor.Run(ctx) + require.NoError(t, err) + close(processorClosed) + }() + + // we expect to own all the partitions so we'll just wait until they're all claimed. + for i := 0; i < len(hubProps.PartitionIDs); i++ { + _ = res.Processor.NextPartitionClient(context.Background()) + } + + stopProcessor() + <-processorClosed + + requireAllOwnershipsRelinquished(t, res) +} + func TestProcessor_Balanced(t *testing.T) { testWithLoadBalancer(t, azeventhubs.ProcessorStrategyBalanced) } @@ -466,3 +498,73 @@ func printOwnerships(ctx context.Context, t *testing.T, cps azeventhubs.Checkpoi max, sb.String()) } + +type TestProcessorArgs struct { + Prefix string + ProcessorOptions *azeventhubs.ProcessorOptions + ConsumerOptions *azeventhubs.ConsumerClientOptions +} + +type TestProcessorResult struct { + ContainerName string + TestParams test.ConnectionParamsForTest + ContainerClient *container.Client + CheckpointStore azeventhubs.CheckpointStore + Processor *azeventhubs.Processor + Consumer *azeventhubs.ConsumerClient +} + +func mustCreateProcessorForTest(t *testing.T, args TestProcessorArgs) TestProcessorResult { + require.NotEmpty(t, args.Prefix) + + testParams := test.GetConnectionParamsForTest(t) + + containerName := test.RandomString(args.Prefix, 10) + cc, err := container.NewClientFromConnectionString(testParams.StorageConnectionString, containerName, nil) + require.NoError(t, err) + + t.Logf("Creating storage container %s", containerName) + _, err = cc.Create(context.Background(), nil) + require.NoError(t, err) + + t.Cleanup(func() { + t.Logf("Deleting storage container") + _, err = cc.Delete(context.Background(), nil) + require.NoError(t, err) + }) + + // Create the checkpoint store + // NOTE: the container must exist before the checkpoint store can be used. 
+ t.Logf("Checkpoint store created") + checkpointStore, err := checkpoints.NewBlobStore(cc, nil) + require.NoError(t, err) + + t.Logf("Consumer client created") + consumerClient, err := azeventhubs.NewConsumerClientFromConnectionString(testParams.ConnectionString, testParams.EventHubName, azeventhubs.DefaultConsumerGroup, args.ConsumerOptions) + require.NoError(t, err) + + t.Cleanup(func() { test.RequireClose(t, consumerClient) }) + + t.Logf("Processor created") + processor, err := azeventhubs.NewProcessor(consumerClient, checkpointStore, args.ProcessorOptions) + require.NoError(t, err) + + return TestProcessorResult{ + CheckpointStore: checkpointStore, + ContainerClient: cc, + ContainerName: containerName, + TestParams: testParams, + Consumer: consumerClient, + Processor: processor, + } +} + +func requireAllOwnershipsRelinquished(t *testing.T, res TestProcessorResult) { + // now check that the ownerships exist but were all cleared out. + ownerships, err := res.CheckpointStore.ListOwnership(context.Background(), res.TestParams.EventHubNamespace, res.TestParams.EventHubName, azeventhubs.DefaultConsumerGroup, nil) + require.NoError(t, err) + + for _, o := range ownerships { + require.Empty(t, o.OwnerID) + } +}