Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds concurrency to hydration requests #304

Merged
merged 8 commits into from
Mar 22, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ require (
github.com/urfave/cli/v2 v2.25.0
golang.org/x/exp v0.0.0-20230314191032-db074128a8ec
golang.org/x/mod v0.9.0
golang.org/x/sync v0.1.0
golang.org/x/term v0.6.0
golang.org/x/tools v0.7.0
golang.org/x/vuln v0.0.0-20230303230808-d3042fecc4e3
Expand Down
1 change: 1 addition & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ golang.org/x/net v0.8.0/go.mod h1:QVkue5JL9kW//ek3r6jTKnTFis1tRmNAW2P1shuFdJc=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o=
golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20191026070338-33540a1f6037/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200302150141-5c8b2ff67527/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
Expand Down
54 changes: 44 additions & 10 deletions pkg/osv/osv.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,17 @@ package osv

import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"log"
"net/http"
"time"

"github.com/google/osv-scanner/pkg/lockfile"
"github.com/google/osv-scanner/pkg/models"
"golang.org/x/sync/semaphore"
)

const (
Expand All @@ -21,7 +24,8 @@ const (
BaseVulnerabilityURL = "https://osv.dev/"
// maxQueriesPerRequest splits up querybatch into multiple requests if
// number of queries exceed this number
maxQueriesPerRequest = 1000
maxQueriesPerRequest = 1000
maxConcurrentRequests = 25
)

// Package represents a package identifier for OSV.
Expand Down Expand Up @@ -188,20 +192,50 @@ func Get(id string) (*models.Vulnerability, error) {
// Hydrate fills the results of the batched response with the full
// Vulnerability details.
func Hydrate(resp *BatchedResponse) (*HydratedBatchedResponse, error) {
// TODO(ochang): Parallelize requests, or implement batch GET.
hydrated := HydratedBatchedResponse{}
ctx := context.TODO()
// Preallocate the array to avoid slice reallocations when inserting later
hydrated.Results = make([]Response, len(resp.Results))
for idx := range hydrated.Results {
hydrated.Results[idx].Vulns =
make([]models.Vulnerability, len(resp.Results[idx].Vulns))
}

errChan := make(chan error)
rateLimiter := semaphore.NewWeighted(maxConcurrentRequests)

for _, response := range resp.Results {
result := Response{}
for _, vuln := range response.Vulns {
vuln, err := Get(vuln.ID)
if err != nil {
return nil, err
for batchIdx, response := range resp.Results {
for resultIdx, vuln := range response.Vulns {
if err := rateLimiter.Acquire(ctx, 1); err != nil {
log.Panicf("Failed to acquire semaphore: %v", err)
}

result.Vulns = append(result.Vulns, *vuln)
go func(id string, batchIdx int, resultIdx int) {
vuln, err := Get(id)
if err != nil {
errChan <- err
} else {
hydrated.Results[batchIdx].Vulns[resultIdx] = *vuln
}

rateLimiter.Release(1)
}(vuln.ID, batchIdx, resultIdx)
}
}

// Close error channel when all semaphores are released
go func() {
if err := rateLimiter.Acquire(ctx, maxConcurrentRequests); err != nil {
log.Panicf("Failed to acquire semaphore: %v", err)
}
hydrated.Results = append(hydrated.Results, result)
// Always close the error channel
close(errChan)
}()

// Range will exit when channel is closed.
// Channel will be closed when all semaphores are freed.
for err := range errChan {
return nil, err
}

return &hydrated, nil
Expand Down