Skip to content

Commit

Permalink
notifier: do AffectedManifests calls in chunks
Browse files Browse the repository at this point in the history
Signed-off-by: Hank Donnay <[email protected]>
  • Loading branch information
hdonnay committed Mar 10, 2021
1 parent bd50a95 commit 9962202
Show file tree
Hide file tree
Showing 2 changed files with 71 additions and 21 deletions.
61 changes: 54 additions & 7 deletions notifier/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/quay/claircore/libvuln/driver"
"github.com/quay/claircore/pkg/distlock"
"github.com/rs/zerolog"
"golang.org/x/sync/errgroup"

clairerror "github.com/quay/clair/v4/clair-error"
"github.com/quay/clair/v4/indexer"
Expand Down Expand Up @@ -71,7 +72,7 @@ func (p *Processor) process(ctx context.Context, c <-chan Event) {
for {
select {
case <-ctx.Done():
log.Info().Msg("ctx canceld. ending event processing")
log.Info().Msg("context canceled: ending event processing")
case e := <-c:
uoid := e.uo.Ref.String()
log := zerolog.Ctx(ctx).With().
Expand Down Expand Up @@ -124,14 +125,22 @@ func (p *Processor) create(ctx context.Context, e Event, prev uuid.UUID) error {
return fmt.Errorf("failed to get update diff: %v", err)
}
log.Debug().Int("removed", len(diff.Removed)).Int("added", len(diff.Added)).Msg("diff results")
added, err := p.indexer.AffectedManifests(ctx, diff.Added)
if err != nil {
return fmt.Errorf("failed to get added affected manifests: %v", err)

added := &claircore.AffectedManifests{
Vulnerabilities: make(map[string]*claircore.Vulnerability),
VulnerableManifests: make(map[string][]string),
}
removed, err := p.indexer.AffectedManifests(ctx, diff.Removed)
if err != nil {
return fmt.Errorf("failed to get removed affected manifests: %v", err)
removed := &claircore.AffectedManifests{
Vulnerabilities: make(map[string]*claircore.Vulnerability),
VulnerableManifests: make(map[string][]string),
}
eg, wctx := errgroup.WithContext(ctx)
eg.Go(getAffected(wctx, p.indexer, diff.Added, added))
eg.Go(getAffected(wctx, p.indexer, diff.Removed, removed))
if err := eg.Wait(); err != nil {
return fmt.Errorf("failed to get affected manifests: %v", err)
}

log.Debug().Int("added", len(added.VulnerableManifests)).Int("removed", len(removed.VulnerableManifests)).Msg("affected manifest counts")

if len(added.VulnerableManifests) == 0 && len(removed.VulnerableManifests) == 0 {
Expand Down Expand Up @@ -197,6 +206,44 @@ func (p *Processor) create(ctx context.Context, e Event, prev uuid.UUID) error {
return nil
}

func min(a, b int) int {
if a < b {
return a
}
return b
}

// GetAffected issues AffectedManifest calls in chunks and merges the result.
//
// Its signature is weird to make use in an errgroup a little bit nicer.
func getAffected(ctx context.Context, ic indexer.Service, vs []claircore.Vulnerability, out *claircore.AffectedManifests) func() error {
const chunk = 1000
return func() error {
var s []claircore.Vulnerability
for len(vs) > 0 {
select {
case <-ctx.Done():
return ctx.Err()
default:
}
s = vs[:min(chunk, len(vs))]
vs = vs[len(s):]
a, err := ic.AffectedManifests(ctx, s)
if err != nil {
return err
}
for k, v := range a.Vulnerabilities {
out.Vulnerabilities[k] = v
}
for k, v := range a.VulnerableManifests {
out.VulnerableManifests[k] = append(out.VulnerableManifests[k], v...)
}
}
out.Sort()
return nil
}
}

// safe guards against situations where creating notifications is
// incorrect.
//
Expand Down
31 changes: 17 additions & 14 deletions notifier/processor_create_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,11 @@ import (
"github.com/google/go-cmp/cmp"
"github.com/google/go-cmp/cmp/cmpopts"
"github.com/google/uuid"
"github.com/quay/clair/v4/indexer"
"github.com/quay/clair/v4/matcher"
"github.com/quay/claircore"
"github.com/quay/claircore/libvuln/driver"

"github.com/quay/clair/v4/indexer"
"github.com/quay/clair/v4/matcher"
)

var (
Expand Down Expand Up @@ -84,8 +85,8 @@ func testProcessorStoreErr(t *testing.T) {
mm := &matcher.Mock{
UpdateDiff_: func(context.Context, uuid.UUID, uuid.UUID) (*driver.UpdateDiff, error) {
return &driver.UpdateDiff{
Added: []claircore.Vulnerability{},
Removed: []claircore.Vulnerability{},
Added: []claircore.Vulnerability{*vulnAdd},
Removed: []claircore.Vulnerability{*vulnRemoved},
}, nil
},
}
Expand Down Expand Up @@ -129,8 +130,8 @@ func testProcessorIndexerErr(t *testing.T) {
mm := &matcher.Mock{
UpdateDiff_: func(context.Context, uuid.UUID, uuid.UUID) (*driver.UpdateDiff, error) {
return &driver.UpdateDiff{
Added: []claircore.Vulnerability{},
Removed: []claircore.Vulnerability{},
Added: []claircore.Vulnerability{*vulnAdd},
Removed: []claircore.Vulnerability{*vulnRemoved},
}, nil
},
}
Expand Down Expand Up @@ -209,23 +210,25 @@ func testProcessorCreate(t *testing.T) {
mm := &matcher.Mock{
UpdateDiff_: func(context.Context, uuid.UUID, uuid.UUID) (*driver.UpdateDiff, error) {
return &driver.UpdateDiff{
Added: []claircore.Vulnerability{},
Removed: []claircore.Vulnerability{},
Added: []claircore.Vulnerability{*vulnAdd},
Removed: []claircore.Vulnerability{*vulnRemoved},
}, nil
},
}
count := 0
im := &indexer.Mock{
AffectedManifests_: func(ctx context.Context, vulns []claircore.Vulnerability) (*claircore.AffectedManifests, error) {
switch count {
case 0:
count++
if count > 1 {
return nil, fmt.Errorf("unexpected number of calls")
}
count++
switch vulns[0].ID {
case "0":
return affectedManifestsAdd, nil
case 1:
case "1":
return affectedManifestsRemoved, nil
default:
return nil, fmt.Errorf("unexpected number of calls")
}
return nil, fmt.Errorf("unexpected call")
},
}
// perform bulk of checks in this mock method.
Expand Down

0 comments on commit 9962202

Please sign in to comment.