refactor(kvstore): improve key deletion performance #892

Draft: wants to merge 1 commit into base: main
172 changes: 144 additions & 28 deletions fastly/resource_fastly_kvstore.go
@@ -4,8 +4,9 @@
"context"
"fmt"
"log"
"sort"
"sync"

"github.com/fastly/go-fastly/v9/fastly"
gofastly "github.com/fastly/go-fastly/v9/fastly"
"github.com/hashicorp/terraform-plugin-sdk/v2/diag"
"github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema"
@@ -21,7 +22,7 @@
Importer: &schema.ResourceImporter{
StateContext: schema.ImportStatePassthroughContext,
},
Schema: map[string]*schema.Schema{

Check failure on line 25 in fastly/resource_fastly_kvstore.go (GitHub Actions / lint): XS002: schema attributes should be in alphabetical order
"force_destroy": {
Type: schema.TypeBool,
Default: false,
@@ -44,6 +45,18 @@
Description: "A unique name to identify the KV Store. It is important to note that changing this attribute will delete and recreate the KV Store, and discard the current entries. You MUST first delete the associated resource_link block from your service before modifying this field.",
ForceNew: true,
},
"delete_keys_pool_size": {
Type: schema.TypeInt,
Optional: true,
Default: 100,
Description: "Used only with `force_destroy` to define the size of the thread-pool used when deleting keys concurrently.",
},
"delete_keys_max_errors": {
Type: schema.TypeInt,
Optional: true,
Default: 100,
Description: "Used only with `force_destroy` to define the buffer length of a channel holding any errors while deleting keys concurrently.",
},
},
}
}
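One possible hardening, not part of this diff: neither new attribute is validated, so a zero or negative `delete_keys_pool_size` would start no workers and leave the key-producing goroutine blocked once `keysCh` fills, hanging the delete. A minimal sketch using the SDK's `helper/validation` package (a hypothetical addition, assuming `"github.com/hashicorp/terraform-plugin-sdk/v2/helper/validation"` is imported):

			"delete_keys_pool_size": {
				Type:     schema.TypeInt,
				Optional: true,
				Default:  100,
				// Hypothetical addition: reject pool sizes that would stall deletion.
				ValidateFunc: validation.IntAtLeast(1),
				Description:  "Used only with `force_destroy` to define the size of the thread-pool used when deleting keys concurrently.",
			},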
@@ -107,51 +120,154 @@
func resourceFastlyKVStoreDelete(_ context.Context, d *schema.ResourceData, meta any) diag.Diagnostics {
	conn := meta.(*APIClient).conn

-	if !d.Get("force_destroy").(bool) {
-		mayDelete, err := isKVStoreEmpty(d.Id(), conn)
-		if err != nil {
-			return diag.FromErr(err)
-		}
+	storeEmpty, err := isKVStoreEmpty(d.Id(), conn)
+	if err != nil {
+		return diag.FromErr(err)
+	}

-		if !mayDelete {
+	if !storeEmpty {
+		if !d.Get("force_destroy").(bool) {
			return diag.FromErr(fmt.Errorf("cannot delete KV Store (%s), it is not empty. Either delete the entries first, or set force_destroy to true and apply it before making this change", d.Id()))
		}
	}

	// IMPORTANT: We must delete all keys first before we can delete the store.
-	p := conn.NewListKVStoreKeysPaginator(&gofastly.ListKVStoreKeysInput{
-		StoreID: d.Id(),
-	})
-	for p.Next() {
-		keys := p.Keys()
-		sort.Strings(keys)
-		for _, key := range keys {
-			err := conn.DeleteKVStoreKey(&gofastly.DeleteKVStoreKeyInput{
-				StoreID: d.Id(),
-				Key:     key,
-			})
-			if err != nil {
-				return diag.FromErr(fmt.Errorf("error during KV Store key cleanup: %w", err))
-			}
+		maxErrors := d.Get("delete_keys_max_errors").(int)
+		poolSize := d.Get("delete_keys_pool_size").(int)
+		err := deleteAllKVStoreKeys(conn, d.Id(), maxErrors, poolSize)
+		if err != nil {
+			return diag.FromErr(fmt.Errorf("failed to delete all KV Store keys: %w", err))
+		}
	}
-	if err := p.Err(); err != nil {
-		return diag.FromErr(fmt.Errorf("error during KV Store cleanup pagination: %w", err))
-	}

	// p := conn.NewListKVStoreKeysPaginator(&gofastly.ListKVStoreKeysInput{
	// 	StoreID: d.Id(),
	// })
	// for p.Next() {
	// 	keys := p.Keys()
	// 	sort.Strings(keys)
	// 	for _, key := range keys {
	// 		err := conn.DeleteKVStoreKey(&gofastly.DeleteKVStoreKeyInput{
	// 			StoreID: d.Id(),
	// 			Key:     key,
	// 		})
	// 		if err != nil {
	// 			return diag.FromErr(fmt.Errorf("error during KV Store key cleanup: %w", err))
	// 		}
	// 	}
	// }
	// if err := p.Err(); err != nil {
	// 	return diag.FromErr(fmt.Errorf("error during KV Store cleanup pagination: %w", err))
	// }
	//

	input := gofastly.DeleteKVStoreInput{
		StoreID: d.Id(),
	}

	log.Printf("[DEBUG] DELETE: KV Store input: %#v", input)

-	err := conn.DeleteKVStore(&input)
+	err = conn.DeleteKVStore(&input)
	if err != nil {
		return diag.FromErr(err)
	}
	return nil
}

// deleteAllKVStoreKeys deletes all keys within the specified KV Store.
func deleteAllKVStoreKeys(conn *gofastly.Client, storeID string, maxErrors, poolSize int) error {
	p := conn.NewListKVStoreKeysPaginator(&fastly.ListKVStoreKeysInput{
		StoreID: storeID,
	})

	errorsCh := make(chan string, maxErrors)
	keysCh := make(chan string, 1000) // buffer size correlates to the pagination page size

	var (
		failedKeys   []string
		mu           sync.Mutex
		wgProcessing sync.WaitGroup
		wgErrorCh    sync.WaitGroup
	)

	// We have three separate execution flows happening at once:
	//
	// 1. Pushing keys from pagination data into a key channel.
	// 2. Pulling keys from the error channel and appending them to the failedKeys slice.
	// 3. Pulling keys from the key channel and issuing the API DELETE call.
	//
	// The second flow is the tricky one: ranging over a channel only
	// terminates once that channel is closed, so we must ensure errorsCh is
	// closed once the deletion of all the keys has finished.
	//
	// To do that we need two wait groups.
	//
	// The first, wgProcessing, tracks all goroutines that process the
	// pagination data (i.e. the goroutine ranging over the paginator keys,
	// and the goroutines ranging over keysCh as part of the poolSize loop).
	//
	// The second, wgErrorCh, tracks the goroutine that drains errorsCh,
	// plus the goroutine that waits for wgProcessing to complete and then
	// closes errorsCh.
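	//
	// Ordering note: errorsCh is closed only after every worker has
	// returned, so no worker can ever send on a closed channel (which
	// would panic). Closing keysCh earlier is safe because only the
	// pagination goroutine sends on it; the workers merely receive.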

	// The following goroutine finishes once all pagination keys have been
	// pushed into keysCh.
	wgProcessing.Add(1)
	go func() {
		defer wgProcessing.Done()
		defer close(keysCh)
		for p.Next() {
			for _, key := range p.Keys() {
				keysCh <- key
			}
		}
	}()

	// The following goroutine finishes once errorsCh is closed.
	wgErrorCh.Add(1)
	go func() {
		defer wgErrorCh.Done()
		for key := range errorsCh {
			mu.Lock()
			failedKeys = append(failedKeys, key)
			mu.Unlock()
		}
	}()

	// The following goroutines finish once they've drained keysCh.
	for i := 1; i <= poolSize; i++ {
		wgProcessing.Add(1)
		go func() {
			defer wgProcessing.Done()
			for key := range keysCh {
				err := conn.DeleteKVStoreKey(&fastly.DeleteKVStoreKeyInput{StoreID: storeID, Key: key})
				if err != nil {
					select {
					case errorsCh <- key:
					default:
						// errorsCh is full: drop the error. The larger maxErrors
						// is, the less likely we are to drop errors, at the cost
						// of extra memory.
					}
				}
			}
		}()
	}

	// The following goroutine finishes once the 'processing' goroutines are
	// done, at which point it is safe to close errorsCh.
	wgErrorCh.Add(1)
	go func() {
		defer wgErrorCh.Done()
		wgProcessing.Wait() // Wait for all deletion and pagination tasks.
		close(errorsCh)
	}()

	// Wait for the error-handling goroutines to finish.
	wgErrorCh.Wait()

	if len(failedKeys) > 0 {
		return fmt.Errorf("failed to delete %d keys", len(failedKeys))
	}

	return nil
}

func isKVStoreEmpty(storeID string, conn *gofastly.Client) (bool, error) {
	keys, err := conn.ListKVStoreKeys(&gofastly.ListKVStoreKeysInput{
		StoreID: storeID,
(remainder of diff truncated)
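For reviewers who want to poke at the shutdown ordering in isolation, here is a minimal, self-contained sketch of the same fan-out/fan-in shape used by deleteAllKVStoreKeys: buffered key and error channels, a fixed worker pool, and a closer goroutine. The deleteKey stub is hypothetical and simply stands in for the DeleteKVStoreKey API call:

package main

import (
	"fmt"
	"sync"
)

// deleteKey is a hypothetical stand-in for conn.DeleteKVStoreKey; it fails
// on some keys so the error path gets exercised.
func deleteKey(key string) error {
	if len(key)%3 == 0 {
		return fmt.Errorf("delete failed for %s", key)
	}
	return nil
}

func main() {
	const poolSize, maxErrors = 4, 10

	keysCh := make(chan string, 100)
	errorsCh := make(chan string, maxErrors)

	var (
		failedKeys   []string
		wgProcessing sync.WaitGroup
		wgErrorCh    sync.WaitGroup
	)

	// Producer: stands in for the paginator. Closing keysCh is what lets
	// the workers' range loops terminate.
	wgProcessing.Add(1)
	go func() {
		defer wgProcessing.Done()
		defer close(keysCh)
		for i := 0; i < 25; i++ {
			keysCh <- fmt.Sprintf("key-%d", i)
		}
	}()

	// Collector: drains errorsCh until it is closed.
	wgErrorCh.Add(1)
	go func() {
		defer wgErrorCh.Done()
		for key := range errorsCh {
			failedKeys = append(failedKeys, key)
		}
	}()

	// Worker pool: drains keysCh; failed keys go to errorsCh, or are
	// dropped when the buffer is full (mirroring the PR's behaviour).
	for i := 0; i < poolSize; i++ {
		wgProcessing.Add(1)
		go func() {
			defer wgProcessing.Done()
			for key := range keysCh {
				if err := deleteKey(key); err != nil {
					select {
					case errorsCh <- key:
					default: // buffer full: drop the error
					}
				}
			}
		}()
	}

	// Closer: errorsCh may only be closed after the producer and every
	// worker have finished, otherwise a worker could send on a closed
	// channel and panic.
	wgErrorCh.Add(1)
	go func() {
		defer wgErrorCh.Done()
		wgProcessing.Wait()
		close(errorsCh)
	}()

	wgErrorCh.Wait()
	fmt.Printf("failed to delete %d of 25 keys\n", len(failedKeys))
}

The ordering is the point of the exercise: close(keysCh) releases the workers' range loops, wgProcessing.Wait() gates close(errorsCh), and wgErrorCh.Wait() gates the final read of failedKeys. It also suggests the mutex in the PR is arguably belt-and-braces, since only the single collector goroutine writes failedKeys and the wait group already orders that write before the final read.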