Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix asoctl aborting import when an error occurs #3212

Merged
merged 7 commits into from
Aug 29, 2023
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
96 changes: 51 additions & 45 deletions v2/cmd/asoctl/internal/importing/importable_arm_resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package importing
import (
"context"
"fmt"
kerrors "k8s.io/apimachinery/pkg/util/errors"
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

minor: Wrong ordering, should be below with the non-stdlib imports.

"net/http"
"reflect"
"strings"
Expand Down Expand Up @@ -68,7 +69,7 @@ func NewImportableARMResource(
}

// GroupKind returns the GroupKind of the resource being imported.
// (may be empty if the GK can't be determined)
// Returned value may be empty if the GK can't be determined.
func (i *importableARMResource) GroupKind() schema.GroupKind {
gk, _ := FindGroupKindForResourceType(i.armID.ResourceType.String())
return gk
Expand All @@ -94,11 +95,44 @@ func (i *importableARMResource) Resource() genruntime.MetaObject {
// ctx is the context to use for the import.
// Returns a slice of child resources needing to be imported (if any), and/or an error.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

minor: fix docs

// Both are returned to allow returning partial results in the case of a partial failure.
func (i *importableARMResource) Import(ctx context.Context, bar *mpb.Bar) ([]ImportableResource, error) {
var ref genruntime.ResourceReference
ref, err := i.importResource(ctx, i.armID)
func (i *importableARMResource) Import(ctx context.Context, bar *mpb.Bar) error {
// Create an importable blank object into which we capture the current state of the resource
importable, err := i.createImportableObjectFromID(i.owner, i.armID)
if err != nil {
// Error doesn't need additional context
return err
}

loader := i.createImportFunction(importable)
result, err := loader(ctx, importable, i.owner)
if err != nil {
return nil, err
return err
}

if because, skipped := result.Skipped(); skipped {
gk := importable.GetObjectKind().GroupVersionKind().GroupKind()
return NewImportSkippedError(gk, i.armID.Name, because)
}

i.resource = importable
bar.SetCurrent(1)

return nil
}

func (i *importableARMResource) FindChildren(ctx context.Context, bar *mpb.Bar) ([]ImportableResource, error) {
if i.resource == nil {
// Nothing to do
return nil, nil
}

gvk := i.resource.GetObjectKind().GroupVersionKind()

ref := genruntime.ResourceReference{
Group: gvk.Group,
Kind: gvk.Kind,
Name: i.resource.GetName(),
ARMID: i.armID.String(),
}

var result []ImportableResource
Expand All @@ -120,56 +154,28 @@ func (i *importableARMResource) Import(ctx context.Context, bar *mpb.Bar) ([]Imp
total := int64(len(childTypes) + 1)
bar.SetTotal(total, false)

bar.SetCurrent(1)

// While we're looking for subresources, we need to treat any errors that occur as independent.
// Some potential subresource types can have limited accessibility (e.g. the subscriber may not
// be onboarded to a preview API), so we don't want to fail the entire import if we can't import
// a single candidate subresource type.
var errs []error
for _, subType := range childTypes {
subResources, err := i.importChildResources(ctx, ref, subType)
if err != nil {
gk, _ := FindGroupKindForResourceType(subType) // If this was going to error, it would have already
return nil, errors.Wrapf(err, "importing %s/%s for resource %s", gk.Group, gk.Kind, i.armID)
errs = append(errs, errors.Wrapf(err, "importing %s/%s", gk.Group, gk.Kind))
continue
}

result = append(result, subResources...)
bar.Increment()
}

return result, nil
}

// importResource imports the actual resource, returning a reference to the resource
func (i *importableARMResource) importResource(
ctx context.Context,
id *arm.ResourceID,
) (genruntime.ResourceReference, error) {
// Create an importable blank object into which we capture the current state of the resource
importable, err := i.createImportableObjectFromID(i.owner, id)
if err != nil {
// Error doesn't need additional context
return genruntime.ResourceReference{}, err
}

loader := i.createImportFunction(importable)
result, err := loader(ctx, importable, i.owner)
if err != nil {
return genruntime.ResourceReference{}, err
}

if because, skipped := result.Skipped(); skipped {
gk := importable.GetObjectKind().GroupVersionKind().GroupKind()
return genruntime.ResourceReference{}, NewImportSkippedError(gk, id.Name, because)
}

gvk := importable.GetObjectKind().GroupVersionKind()
i.resource = importable

ref := genruntime.ResourceReference{
Group: gvk.Group,
Kind: gvk.Kind,
Name: importable.GetName(),
ARMID: i.armID.String(),
}

return ref, nil
return result,
errors.Wrapf(
kerrors.NewAggregate(errs),
"importing childresources of %s",
i.armID)
}

func (i *importableARMResource) createImportFunction(
Expand Down
8 changes: 7 additions & 1 deletion v2/cmd/asoctl/internal/importing/importable_resource.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,13 @@ type ImportableResource interface {
// Import does the actual import, updating the Spec on the wrapped resource.
// ctx allows for cancellation of the import.
// If there are any additional resources that also need to be imported, they should be returned.
Import(ctx context.Context, bar *mpb.Bar) ([]ImportableResource, error)
Import(ctx context.Context, bar *mpb.Bar) error

// FindChildren returns any child resources that need to be imported.
// ctx allows for cancellation of the import.
// Returns any additional resources that also need to be imported, as well as any errors that occur.
// Partial success is allowed, but the caller should be notified of any errors.
FindChildren(ctx context.Context, bar *mpb.Bar) ([]ImportableResource, error)
}

// importableResource is a core of common data and support methods for implementing ImportableResource
Expand Down
30 changes: 25 additions & 5 deletions v2/cmd/asoctl/internal/importing/resource_importer.go
Original file line number Diff line number Diff line change
Expand Up @@ -185,8 +185,10 @@ func (ri *ResourceImporter) ImportResource(ctx context.Context, rsrc ImportableR
bar.SetTotal(bar.Current(), true)
}()

pending, err := rsrc.Import(ctx, bar)
err := rsrc.Import(ctx, bar)
if err != nil {
// Something went wrong importing this resource.
// If it was simply skipped, we keep track of it but don't need to return an error
var skipped *ImportSkippedError
if errors.As(err, &skipped) {
ri.log.V(1).Info(
Expand All @@ -209,15 +211,25 @@ func (ri *ResourceImporter) ImportResource(ctx context.Context, rsrc ImportableR
return errors.Errorf("failed during import of %s", rsrc.Name())
}

// Successfully imported the resource,
// so now we look for any child resources that need importing too

ri.log.Info(
"Imported",
"kind", gk,
"name", rsrc.Name())

ri.report.AddSuccessfulImport(rsrc)
ri.Complete(rsrc)

ri.Complete(rsrc, pending)
return nil
pending, err := rsrc.FindChildren(ctx, bar)

// We always queue pending resources, even if there was an error looking
// for them, because we don't want an error in one part of a resource tree
// to impair importing other parts
ri.AddPending(pending)

return err
}

// DequeueResource returns the next resource to import.
Expand All @@ -240,13 +252,21 @@ func (ri *ResourceImporter) DequeueResource() (ImportableResource, bool) {
return importer, true
}

func (ri *ResourceImporter) Complete(importer ImportableResource, pending []ImportableResource) {
func (ri *ResourceImporter) Complete(importer ImportableResource) {
// Lock while we're modifying the maps
ri.lock.Lock()
defer ri.lock.Unlock()

// Add it to our map and our queue
// Add it to our map of completed importers
ri.imported[importer.Id()] = importer
}

func (ri *ResourceImporter) AddPending(pending []ImportableResource) {
// Lock while we're modifying the maps
ri.lock.Lock()
defer ri.lock.Unlock()

// Add to our queue of pending imports
for _, p := range pending {
ri.addImpl(p)
}
Expand Down