diff --git a/fastly/resource_fastly_kvstore.go b/fastly/resource_fastly_kvstore.go index 5e1661c24..c39609e73 100644 --- a/fastly/resource_fastly_kvstore.go +++ b/fastly/resource_fastly_kvstore.go @@ -4,8 +4,9 @@ import ( "context" "fmt" "log" - "sort" + "sync" + "github.com/fastly/go-fastly/v9/fastly" gofastly "github.com/fastly/go-fastly/v9/fastly" "github.com/hashicorp/terraform-plugin-sdk/v2/diag" "github.com/hashicorp/terraform-plugin-sdk/v2/helper/schema" @@ -44,6 +45,18 @@ func resourceFastlyKVStore() *schema.Resource { Description: "A unique name to identify the KV Store. It is important to note that changing this attribute will delete and recreate the KV Store, and discard the current entries. You MUST first delete the associated resource_link block from your service before modifying this field.", ForceNew: true, }, + "delete_keys_pool_size": { + Type: schema.TypeInt, + Optional: true, + Default: 100, + Description: "Used only with `force_destroy` to define the size of the thread-pool used when deleting keys concurrently.", + }, + "delete_keys_max_errors": { + Type: schema.TypeInt, + Optional: true, + Default: 100, + Description: "Used only with `force_destroy` to define the buffer length of a channel holding any errors while deleting keys concurrently.", + }, }, } } @@ -107,37 +120,43 @@ func resourceFastlyKVStoreUpdate(_ context.Context, _ *schema.ResourceData, _ an func resourceFastlyKVStoreDelete(_ context.Context, d *schema.ResourceData, meta any) diag.Diagnostics { conn := meta.(*APIClient).conn - if !d.Get("force_destroy").(bool) { - mayDelete, err := isKVStoreEmpty(d.Id(), conn) - if err != nil { - return diag.FromErr(err) - } + storeEmpty, err := isKVStoreEmpty(d.Id(), conn) + if err != nil { + return diag.FromErr(err) + } - if !mayDelete { + if !storeEmpty { + if !d.Get("force_destroy").(bool) { return diag.FromErr(fmt.Errorf("cannot delete KV Store (%s), it is not empty. Either delete the entries first, or set force_destroy to true and apply it before making this change", d.Id())) } - } - - // IMPORTANT: We must delete all keys first before we can delete the store. - p := conn.NewListKVStoreKeysPaginator(&gofastly.ListKVStoreKeysInput{ - StoreID: d.Id(), - }) - for p.Next() { - keys := p.Keys() - sort.Strings(keys) - for _, key := range keys { - err := conn.DeleteKVStoreKey(&gofastly.DeleteKVStoreKeyInput{ - StoreID: d.Id(), - Key: key, - }) - if err != nil { - return diag.FromErr(fmt.Errorf("error during KV Store key cleanup: %w", err)) - } + maxErrors := d.Get("delete_keys_max_errors").(int) + poolSize := d.Get("delete_keys_pool_size").(int) + err := deleteAllKVStoreKeys(conn, d.Id(), maxErrors, poolSize) + if err != nil { + return diag.FromErr(fmt.Errorf("failed to delete all KV Store keys: %w", err)) } } - if err := p.Err(); err != nil { - return diag.FromErr(fmt.Errorf("error during KV Store cleanup pagination: %w", err)) - } + + // p := conn.NewListKVStoreKeysPaginator(&gofastly.ListKVStoreKeysInput{ + // StoreID: d.Id(), + // }) + // for p.Next() { + // keys := p.Keys() + // sort.Strings(keys) + // for _, key := range keys { + // err := conn.DeleteKVStoreKey(&gofastly.DeleteKVStoreKeyInput{ + // StoreID: d.Id(), + // Key: key, + // }) + // if err != nil { + // return diag.FromErr(fmt.Errorf("error during KV Store key cleanup: %w", err)) + // } + // } + // } + // if err := p.Err(); err != nil { + // return diag.FromErr(fmt.Errorf("error during KV Store cleanup pagination: %w", err)) + // } + // input := gofastly.DeleteKVStoreInput{ StoreID: d.Id(), @@ -145,13 +164,110 @@ func resourceFastlyKVStoreDelete(_ context.Context, d *schema.ResourceData, meta log.Printf("[DEBUG] DELETE: KV Store input: %#v", input) - err := conn.DeleteKVStore(&input) + err = conn.DeleteKVStore(&input) if err != nil { return diag.FromErr(err) } return nil } +// deleteAllKVStoreKeys deletes all keys within the specified KV Store. +func deleteAllKVStoreKeys(conn *gofastly.Client, storeID string, maxErrors, poolSize int) error { + p := conn.NewListKVStoreKeysPaginator(&fastly.ListKVStoreKeysInput{ + StoreID: storeID, + }) + + errorsCh := make(chan string, maxErrors) + keysCh := make(chan string, 1000) // number correlates to pagination page size + + var ( + failedKeys []string + mu sync.Mutex + wgProcessing sync.WaitGroup + wgErrorCh sync.WaitGroup + ) + + // We have three separate execution flows happening at once: + // + // 1. Pushing keys from pagination data into a key channel. + // 2. Pulling keys from error channel and appending to failedKeys slice. + // 3. Pulling keys from key channel and issuing API DELETE call. + // + // The second item is problematic, in that ranging over a channel only + // terminates when the channel is closed. So we need to ensure we close the + // errorsCh once we've finished processing the deletion of all the keys. + // + // To do that we need two sets of wait groups. + // + // The first is wgProcessing which keeps track of all goroutines related to + // processing the pagination data (e.g. the goroutine ranging over the + // paginator keys, and the goroutine ranging over the keysCh as part of the + // poolSize loop). + // + // The second wait group is wgErrorCh which tracks when the first + // (wgProcessing) has completed and then closes errorsCh. + + // The following goroutine finishes once all pagination keys have been + // processed. + wgProcessing.Add(1) + go func() { + defer wgProcessing.Done() + defer close(keysCh) + for p.Next() { + for _, key := range p.Keys() { + keysCh <- key + } + } + }() + + // The following goroutine finishes once the errorsCh is closed. + wgErrorCh.Add(1) + go func() { + defer wgErrorCh.Done() + for err := range errorsCh { + mu.Lock() + failedKeys = append(failedKeys, err) + mu.Unlock() + } + }() + + // The following goroutines close once they've pulled all data from keysCh. + for i := 1; i <= poolSize; i++ { + wgProcessing.Add(1) + go func() { + defer wgProcessing.Done() + for key := range keysCh { + err := conn.DeleteKVStoreKey(&fastly.DeleteKVStoreKeyInput{StoreID: storeID, Key: key}) + if err != nil { + select { + case errorsCh <- key: + default: + continue // the larger we make maxErrors the less likely we'll drop errors (obviously there's a memory trade-off to be made) + } + } + } + }() + } + + // The following goroutine is closed once the 'processing' goroutines are + // finished. + wgErrorCh.Add(1) + go func() { + defer wgErrorCh.Done() + wgProcessing.Wait() // Wait for all deletion and pagination tasks. + close(errorsCh) + }() + + // Wait for the error-handling goroutines to finish processing. + wgErrorCh.Wait() + + if len(failedKeys) > 0 { + return fmt.Errorf("failed to delete %d keys", len(failedKeys)) + } + + return nil +} + func isKVStoreEmpty(storeID string, conn *gofastly.Client) (bool, error) { keys, err := conn.ListKVStoreKeys(&gofastly.ListKVStoreKeysInput{ StoreID: storeID,