Skip to content

Commit

Permalink
[azeventhubs] Relinquish ownership of partitions when the Processor i…
Browse files Browse the repository at this point in the history
…s shut down (#21899)

Relinquish ownership of partitions when the Processor is shut down. This makes them immediately available instead of having to wait for them to expire.

Fixes #21868
  • Loading branch information
richardpark-msft authored Nov 7, 2023
1 parent 2ddf1e5 commit c0b4859
Show file tree
Hide file tree
Showing 11 changed files with 401 additions and 116 deletions.
8 changes: 2 additions & 6 deletions sdk/messaging/azeventhubs/CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,14 +1,10 @@
# Release History

## 1.0.2 (Unreleased)

### Features Added

### Breaking Changes
## 1.0.2 (2023-11-07)

### Bugs Fixed

### Other Changes
- Processor now relinquishes ownership of partitions when it shuts down, making them immediately available to other active Processor instances. (PR#TBD)

## 1.0.1 (2023-06-06)

Expand Down
31 changes: 24 additions & 7 deletions sdk/messaging/azeventhubs/checkpoints/blob_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ func (b *BlobStore) ListOwnership(ctx context.Context, fullyQualifiedNamespace s
PartitionID: partitionID,
}

if err := updateOwnership(blob, &o); err != nil {
if err := copyOwnershipPropsFromBlob(blob, &o); err != nil {
return nil, err
}

Expand Down Expand Up @@ -349,18 +349,35 @@ func newCheckpointBlobMetadata(cpd azeventhubs.Checkpoint) map[string]*string {
return m
}

func updateOwnership(b *container.BlobItem, destOwnership *azeventhubs.Ownership) error {
if b == nil || b.Metadata == nil || b.Properties == nil {
func copyOwnershipPropsFromBlob(b *container.BlobItem, destOwnership *azeventhubs.Ownership) error {
if b == nil || b.Properties == nil {
return fmt.Errorf("no ownership metadata for blob")
}

ownerID, ok := b.Metadata["ownerid"]
// there are two states for ownerID
// nil (empty string when mapped across): a partition that was owned but was relinquished.
// a valid string: the owner ID or instanceID of the owning partition client.
//
// By default we'll represent it as relinquished.
var ownerID string

// There's a bug in azblob where it omits metadata keys entirely if
// the value is nil. For now, I'll assume an empty metadata means
// we have a nil ownerid.
// https://github.com/Azure/azure-sdk-for-go/issues/21887
if b.Metadata != nil {
tmpOwnerID, ok := b.Metadata["ownerid"]

if !ok {
return errors.New("ownerid is missing from metadata")
}

if !ok || ownerID == nil {
return errors.New("ownerid is missing from metadata")
if tmpOwnerID != nil {
ownerID = *tmpOwnerID
}
}

destOwnership.OwnerID = *ownerID
destOwnership.OwnerID = ownerID
destOwnership.LastModifiedTime = *b.Properties.LastModified
destOwnership.ETag = b.Properties.ETag
return nil
Expand Down
150 changes: 71 additions & 79 deletions sdk/messaging/azeventhubs/checkpoints/blob_store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,13 @@ import (
)

func TestBlobStore_Checkpoints(t *testing.T) {
testData := getContainerClient(t)
defer testData.Cleanup()
testData := newBlobStoreTestData(t)

cc, err := container.NewClientFromConnectionString(testData.ConnectionString, testData.ContainerName, nil)
require.NoError(t, err)

store, err := checkpoints.NewBlobStore(cc, nil)
require.NoError(t, err)

checkpoints, err := store.ListCheckpoints(context.Background(), "fully-qualified-namespace", "event-hub-name", "consumer-group", nil)
checkpoints, err := testData.BlobStore.ListCheckpoints(context.Background(), "fully-qualified-namespace", "event-hub-name", "consumer-group", nil)
require.NoError(t, err)
require.Empty(t, checkpoints)

err = store.SetCheckpoint(context.Background(), azeventhubs.Checkpoint{
err = testData.BlobStore.SetCheckpoint(context.Background(), azeventhubs.Checkpoint{
ConsumerGroup: "$Default",
EventHubName: "event-hub-name",
FullyQualifiedNamespace: "ns.servicebus.windows.net",
Expand All @@ -43,7 +36,7 @@ func TestBlobStore_Checkpoints(t *testing.T) {
}, nil)
require.NoError(t, err)

checkpoints, err = store.ListCheckpoints(context.Background(), "ns.servicebus.windows.net", "event-hub-name", "$Default", nil)
checkpoints, err = testData.BlobStore.ListCheckpoints(context.Background(), "ns.servicebus.windows.net", "event-hub-name", "$Default", nil)
require.NoError(t, err)

require.Equal(t, azeventhubs.Checkpoint{
Expand All @@ -57,7 +50,7 @@ func TestBlobStore_Checkpoints(t *testing.T) {

// There's a code path to allow updating the blob after it's been created but without an etag
// in which case it just updates it.
err = store.SetCheckpoint(context.Background(), azeventhubs.Checkpoint{
err = testData.BlobStore.SetCheckpoint(context.Background(), azeventhubs.Checkpoint{
ConsumerGroup: "$Default",
EventHubName: "event-hub-name",
FullyQualifiedNamespace: "ns.servicebus.windows.net",
Expand All @@ -67,7 +60,7 @@ func TestBlobStore_Checkpoints(t *testing.T) {
}, nil)
require.NoError(t, err)

checkpoints, err = store.ListCheckpoints(context.Background(), "ns.servicebus.windows.net", "event-hub-name", "$Default", nil)
checkpoints, err = testData.BlobStore.ListCheckpoints(context.Background(), "ns.servicebus.windows.net", "event-hub-name", "$Default", nil)
require.NoError(t, err)

require.Equal(t, azeventhubs.Checkpoint{
Expand All @@ -81,28 +74,21 @@ func TestBlobStore_Checkpoints(t *testing.T) {
}

func TestBlobStore_Ownership(t *testing.T) {
testData := getContainerClient(t)
defer testData.Cleanup()

cc, err := container.NewClientFromConnectionString(testData.ConnectionString, testData.ContainerName, nil)
require.NoError(t, err)

store, err := checkpoints.NewBlobStore(cc, nil)
require.NoError(t, err)
testData := newBlobStoreTestData(t)

ownerships, err := store.ListOwnership(context.Background(), "fully-qualified-namespace", "event-hub-name", "consumer-group", nil)
ownerships, err := testData.BlobStore.ListOwnership(context.Background(), "fully-qualified-namespace", "event-hub-name", "consumer-group", nil)
require.NoError(t, err)
require.Empty(t, ownerships, "no ownerships yet")

ownerships, err = store.ClaimOwnership(context.Background(), nil, nil)
ownerships, err = testData.BlobStore.ClaimOwnership(context.Background(), nil, nil)
require.NoError(t, err)
require.Empty(t, ownerships)

ownerships, err = store.ClaimOwnership(context.Background(), []azeventhubs.Ownership{}, nil)
ownerships, err = testData.BlobStore.ClaimOwnership(context.Background(), []azeventhubs.Ownership{}, nil)
require.NoError(t, err)
require.Empty(t, ownerships)

ownerships, err = store.ClaimOwnership(context.Background(), []azeventhubs.Ownership{
ownerships, err = testData.BlobStore.ClaimOwnership(context.Background(), []azeventhubs.Ownership{
{
ConsumerGroup: "$Default",
EventHubName: "event-hub-name",
Expand All @@ -129,7 +115,7 @@ func TestBlobStore_Ownership(t *testing.T) {

// if we attempt to claim it with a non-matching etag it will fail to claim
// but not fail the call.
ownerships, err = store.ClaimOwnership(context.Background(), []azeventhubs.Ownership{
ownerships, err = testData.BlobStore.ClaimOwnership(context.Background(), []azeventhubs.Ownership{
{
ConsumerGroup: "$Default",
EventHubName: "event-hub-name",
Expand All @@ -143,7 +129,7 @@ func TestBlobStore_Ownership(t *testing.T) {
require.Empty(t, ownerships, "we're out of date (based on the non-matching etag), so no ownerships were claimed")

// now we'll use the actual etag
ownerships, err = store.ClaimOwnership(context.Background(), []azeventhubs.Ownership{
ownerships, err = testData.BlobStore.ClaimOwnership(context.Background(), []azeventhubs.Ownership{
{
ConsumerGroup: "$Default",
EventHubName: "event-hub-name",
Expand Down Expand Up @@ -172,16 +158,9 @@ func TestBlobStore_Ownership(t *testing.T) {

func TestBlobStore_ListAndClaim(t *testing.T) {
// listing ownerships is a slightly different code path
testData := getContainerClient(t)
defer testData.Cleanup()

cc, err := container.NewClientFromConnectionString(testData.ConnectionString, testData.ContainerName, nil)
require.NoError(t, err)
testData := newBlobStoreTestData(t)

store, err := checkpoints.NewBlobStore(cc, nil)
require.NoError(t, err)

claimedOwnerships, err := store.ClaimOwnership(context.Background(), []azeventhubs.Ownership{
claimedOwnerships, err := testData.BlobStore.ClaimOwnership(context.Background(), []azeventhubs.Ownership{
{
ConsumerGroup: "$Default",
EventHubName: "event-hub-name",
Expand All @@ -193,7 +172,7 @@ func TestBlobStore_ListAndClaim(t *testing.T) {
require.NoError(t, err)
require.NotEmpty(t, claimedOwnerships)

listedOwnerships, err := store.ListOwnership(context.Background(), "ns.servicebus.windows.net", "event-hub-name", "$Default", nil)
listedOwnerships, err := testData.BlobStore.ListOwnership(context.Background(), "ns.servicebus.windows.net", "event-hub-name", "$Default", nil)
require.NoError(t, err)

require.Equal(t, "first-client", listedOwnerships[0].OwnerID)
Expand All @@ -206,26 +185,19 @@ func TestBlobStore_ListAndClaim(t *testing.T) {
require.Equal(t, "partition-id", listedOwnerships[0].PartitionID)

// update using the etag
claimedOwnerships, err = store.ClaimOwnership(context.Background(), listedOwnerships, nil)
claimedOwnerships, err = testData.BlobStore.ClaimOwnership(context.Background(), listedOwnerships, nil)
require.NoError(t, err)

require.Equal(t, "partition-id", claimedOwnerships[0].PartitionID)

// try to do it again and it'll fail since we don't have an updated etag
claimedOwnerships, err = store.ClaimOwnership(context.Background(), listedOwnerships, nil)
claimedOwnerships, err = testData.BlobStore.ClaimOwnership(context.Background(), listedOwnerships, nil)
require.NoError(t, err)
require.Empty(t, claimedOwnerships)
}

func TestBlobStore_OnlyOneOwnershipClaimSucceeds(t *testing.T) {
testData := getContainerClient(t)
defer testData.Cleanup()

cc, err := container.NewClientFromConnectionString(testData.ConnectionString, testData.ContainerName, nil)
require.NoError(t, err)

store, err := checkpoints.NewBlobStore(cc, nil)
require.NoError(t, err)
testData := newBlobStoreTestData(t)

// we're going to make multiple calls to the blob store but only _one_ should succeed
// since it's "first one in wins"
Expand All @@ -240,7 +212,7 @@ func TestBlobStore_OnlyOneOwnershipClaimSucceeds(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

ownerships, err := store.ClaimOwnership(ctx, []azeventhubs.Ownership{
ownerships, err := testData.BlobStore.ClaimOwnership(ctx, []azeventhubs.Ownership{
{ConsumerGroup: azeventhubs.DefaultConsumerGroup, EventHubName: "name", FullyQualifiedNamespace: "ns", PartitionID: "0", OwnerID: "ownerID"},
}, nil)

Expand Down Expand Up @@ -275,14 +247,7 @@ func TestBlobStore_OnlyOneOwnershipClaimSucceeds(t *testing.T) {
}

func TestBlobStore_OnlyOneOwnershipUpdateSucceeds(t *testing.T) {
testData := getContainerClient(t)
defer testData.Cleanup()

cc, err := container.NewClientFromConnectionString(testData.ConnectionString, testData.ContainerName, nil)
require.NoError(t, err)

store, err := checkpoints.NewBlobStore(cc, nil)
require.NoError(t, err)
testData := newBlobStoreTestData(t)

// we're going to make multiple calls to the blob store but only _one_ should succeed
// since it's "first one in wins"
Expand All @@ -291,7 +256,7 @@ func TestBlobStore_OnlyOneOwnershipUpdateSucceeds(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

ownerships, err := store.ClaimOwnership(ctx, []azeventhubs.Ownership{
ownerships, err := testData.BlobStore.ClaimOwnership(ctx, []azeventhubs.Ownership{
{ConsumerGroup: azeventhubs.DefaultConsumerGroup, EventHubName: "name", FullyQualifiedNamespace: "ns", PartitionID: "0", OwnerID: "ownerID"},
}, nil)
require.NoError(t, err)
Expand All @@ -305,7 +270,7 @@ func TestBlobStore_OnlyOneOwnershipUpdateSucceeds(t *testing.T) {
for i := 0; i < cap(claimsCh); i++ {
go func() {

ownerships, err := store.ClaimOwnership(ctx, ownerships, nil)
ownerships, err := testData.BlobStore.ClaimOwnership(ctx, ownerships, nil)

if err != nil {
claimsCh <- nil
Expand Down Expand Up @@ -337,22 +302,50 @@ func TestBlobStore_OnlyOneOwnershipUpdateSucceeds(t *testing.T) {
require.Equal(t, cap(claimsCh)-1, numFailedClaims, fmt.Sprintf("One of the 1/%d wins and the rest all fail to claim", cap(claimsCh)))
}

func getContainerClient(t *testing.T) struct {
ConnectionString string
ContainerName string
Cleanup func()
} {
func TestBlobStore_RelinquishClaim(t *testing.T) {
testData := newBlobStoreTestData(t)

initialClaims, err := testData.BlobStore.ClaimOwnership(context.Background(), []azeventhubs.Ownership{
{
ConsumerGroup: azeventhubs.DefaultConsumerGroup,
EventHubName: "eventhubname",
FullyQualifiedNamespace: "fullyQualifiedNamespace",
PartitionID: "partitionID",
OwnerID: "ownerID",
LastModifiedTime: time.Now().UTC(),
},
}, nil)
require.NoError(t, err)
require.Equal(t, "ownerID", initialClaims[0].OwnerID)

// relinquish our ownership claim
initialClaims[0].OwnerID = ""
relinquishedClaims, err := testData.BlobStore.ClaimOwnership(context.Background(), initialClaims, nil)
require.NoError(t, err)
require.Empty(t, relinquishedClaims[0].OwnerID)

// now be some other person and claim it.
relinquishedClaims[0].OwnerID = "new owner!"
lastClaimed, err := testData.BlobStore.ClaimOwnership(context.Background(), relinquishedClaims, nil)
require.NoError(t, err)
require.Equal(t, "new owner!", lastClaimed[0].OwnerID)
}

type blobStoreTestData struct {
CC *container.Client
BlobStore *checkpoints.BlobStore
}

// newBlobStoreTestData creates an Azure Blob storage container
// and returns the associated ContainerClient and BlobStore instance.
func newBlobStoreTestData(t *testing.T) blobStoreTestData {
_ = godotenv.Load("../.env")

storageCS := os.Getenv("CHECKPOINTSTORE_STORAGE_CONNECTION_STRING")

if storageCS == "" {
t.Skipf("CHECKPOINTSTORE_STORAGE_CONNECTION_STRING is not defined in the environment. Skipping blob checkpoint store live tests")
return struct {
ConnectionString string
ContainerName string
Cleanup func()
}{}
return blobStoreTestData{}
}

nano := time.Now().UTC().UnixNano()
Expand All @@ -364,16 +357,15 @@ func getContainerClient(t *testing.T) struct {
_, err = client.Create(context.Background(), nil)
require.NoError(t, err)

return struct {
ConnectionString string
ContainerName string
Cleanup func()
}{
ConnectionString: storageCS,
ContainerName: containerName,
Cleanup: func() {
_, err := client.Delete(context.Background(), nil)
require.NoError(t, err)
},
blobStore, err := checkpoints.NewBlobStore(client, nil)
require.NoError(t, err)

t.Cleanup(func() {

})

return blobStoreTestData{
CC: client,
BlobStore: blobStore,
}
}
Loading

0 comments on commit c0b4859

Please sign in to comment.