Skip to content

Commit

Permalink
Handle rate limiting errors for GitHub API
Browse files Browse the repository at this point in the history
* We employ an optimistic concurrency control model to minimize goroutine blocking and reduce rate limit errors, thus preventing integration bans

* Each server replica maintains its own set of rate-limited clients, eliminating the need for synchronization across replicas. If a rate limit error occurs, the client self-blocks and is added to a cache for future requests

* Requests that return rate limit errors are retried following a backoff period

* The current watermill pub-sub implementation being used, i.e. SQL and go channels, are both a single channel with head-of-line blocking, which means that blocking in middleware/handler blocks all processing. So, currently, if a client is blocked, the entire processing will be blocked. This will change when we shift to other pub-sub implementations and have multiple workers

Signed-off-by: Vyom-Yadav <[email protected]>
  • Loading branch information
Vyom-Yadav committed Feb 14, 2024
1 parent afac66b commit ebea814
Show file tree
Hide file tree
Showing 23 changed files with 625 additions and 58 deletions.
6 changes: 3 additions & 3 deletions cmd/dev/app/container/cmd_verify.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func runCmdVerify(cmd *cobra.Command, _ []string) error {

token := viper.GetString("auth.token")

ghcli, err := buildGitHubClient(context.Background(), token)
ghcli, err := buildGitHubClient(token)
if err != nil {
return fmt.Errorf("cannot build github client: %w", err)
}
Expand All @@ -107,7 +107,7 @@ func runCmdVerify(cmd *cobra.Command, _ []string) error {
return nil
}

func buildGitHubClient(ctx context.Context, token string) (provifv1.GitHub, error) {
func buildGitHubClient(token string) (provifv1.GitHub, error) {
pbuild := providers.NewProviderBuilder(
&db.Provider{
Name: "test",
Expand All @@ -126,5 +126,5 @@ func buildGitHubClient(ctx context.Context, token string) (provifv1.GitHub, erro
token,
)

return pbuild.GetGitHub(ctx)
return pbuild.GetGitHub()
}
8 changes: 7 additions & 1 deletion cmd/server/app/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import (
"github.com/stacklok/minder/internal/engine"
"github.com/stacklok/minder/internal/events"
"github.com/stacklok/minder/internal/logger"
"github.com/stacklok/minder/internal/providers/ratecache"
provtelemetry "github.com/stacklok/minder/internal/providers/telemetry"
"github.com/stacklok/minder/internal/reconcilers"
)
Expand Down Expand Up @@ -121,11 +122,13 @@ var serveCmd = &cobra.Command{

serverMetrics := controlplane.NewMetrics()
providerMetrics := provtelemetry.NewProviderMetrics()
restClientCache := ratecache.NewRestClientCache()

s, err := controlplane.NewServer(
store, evt, serverMetrics, cfg, vldtr,
controlplane.WithProviderMetrics(providerMetrics),
controlplane.WithAuthzClient(authzc),
controlplane.WithRestClientCache(restClientCache),
)
if err != nil {
return fmt.Errorf("unable to create server: %w", err)
Expand All @@ -141,14 +144,17 @@ var serveCmd = &cobra.Command{
engine.WithProviderMetrics(providerMetrics),
engine.WithMiddleware(aggr.AggregateMiddleware),
engine.WithMiddleware(tsmdw.TelemetryStoreMiddleware),
engine.WithRestClientCache(restClientCache),
)
if err != nil {
return fmt.Errorf("unable to create executor: %w", err)
}

s.ConsumeEvents(exec)

rec, err := reconcilers.NewReconciler(store, evt, &cfg.Auth, reconcilers.WithProviderMetrics(providerMetrics))
rec, err := reconcilers.NewReconciler(store, evt, &cfg.Auth,
reconcilers.WithProviderMetrics(providerMetrics),
reconcilers.WithRestClientCache(restClientCache))
if err != nil {
return fmt.Errorf("unable to create reconciler: %w", err)
}
Expand Down
10 changes: 6 additions & 4 deletions internal/controlplane/handlers_githubwebhooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ func (s *Server) registerWebhookForRepository(
return nil, fmt.Errorf("provider %s is not supported for github webhook", pbuild.GetName())
}

client, err := pbuild.GetGitHub(ctx)
client, err := pbuild.GetGitHub()
if err != nil {
return nil, fmt.Errorf("error creating github provider: %w", err)
}
Expand Down Expand Up @@ -349,13 +349,14 @@ func (s *Server) deleteWebhookFromRepository(
) error {
pbOpts := []providers.ProviderBuilderOption{
providers.WithProviderMetrics(s.provMt),
providers.WithRestClientCache(s.restClientCache),
}
providerBuilder, err := providers.GetProviderBuilder(ctx, provider, projectID, s.store, s.cryptoEngine, pbOpts...)
if err != nil {
return status.Errorf(codes.Internal, "cannot get provider builder: %v", err)
}

client, err := providerBuilder.GetGitHub(ctx)
client, err := providerBuilder.GetGitHub()
if err != nil {
return status.Errorf(codes.Internal, "cannot create github client: %v", err)
}
Expand Down Expand Up @@ -408,6 +409,7 @@ func (s *Server) parseGithubEventForProcessing(

pbOpts := []providers.ProviderBuilderOption{
providers.WithProviderMetrics(s.provMt),
providers.WithRestClientCache(s.restClientCache),
}
provBuilder, err := providers.GetProviderBuilder(ctx, prov, dbRepo.ProjectID, s.store, s.cryptoEngine, pbOpts...)
if err != nil {
Expand Down Expand Up @@ -471,7 +473,7 @@ func (s *Server) parseArtifactPublishedEvent(
return nil
}

cli, err := prov.GetGitHub(ctx)
cli, err := prov.GetGitHub()
if err != nil {
log.Printf("error creating github provider: %v", err)
return err
Expand Down Expand Up @@ -523,7 +525,7 @@ func parsePullRequestModEvent(
return nil
}

cli, err := prov.GetGitHub(ctx)
cli, err := prov.GetGitHub()
if err != nil {
log.Printf("error creating github provider: %v", err)
return nil
Expand Down
4 changes: 3 additions & 1 deletion internal/controlplane/handlers_repositories.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ func (s *Server) RegisterRepository(ctx context.Context,

pbOpts := []providers.ProviderBuilderOption{
providers.WithProviderMetrics(s.provMt),
providers.WithRestClientCache(s.restClientCache),
}
p, err := providers.GetProviderBuilder(ctx, provider, projectID, s.store, s.cryptoEngine, pbOpts...)
if err != nil {
Expand Down Expand Up @@ -404,6 +405,7 @@ func (s *Server) ListRemoteRepositoriesFromProvider(

pbOpts := []providers.ProviderBuilderOption{
providers.WithProviderMetrics(s.provMt),
providers.WithRestClientCache(s.restClientCache),
}
p, err := providers.GetProviderBuilder(ctx, provider, projectID, s.store, s.cryptoEngine, pbOpts...)
if err != nil {
Expand All @@ -414,7 +416,7 @@ func (s *Server) ListRemoteRepositoriesFromProvider(
return nil, util.UserVisibleError(codes.Unimplemented, "provider does not implement repository listing")
}

client, err := p.GetRepoLister(ctx)
client, err := p.GetRepoLister()
if err != nil {
return nil, status.Errorf(codes.Internal, "cannot create github client: %v", err)
}
Expand Down
33 changes: 21 additions & 12 deletions internal/controlplane/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ import (
"github.com/stacklok/minder/internal/db"
"github.com/stacklok/minder/internal/events"
"github.com/stacklok/minder/internal/logger"
"github.com/stacklok/minder/internal/providers/ratecache"
provtelemetry "github.com/stacklok/minder/internal/providers/telemetry"
"github.com/stacklok/minder/internal/util"
pb "github.com/stacklok/minder/pkg/api/protobuf/go/minder/v1"
Expand All @@ -66,18 +67,19 @@ var (

// Server represents the controlplane server
type Server struct {
store db.Store
cfg *serverconfig.Config
evt *events.Eventer
mt *metrics
provMt provtelemetry.ProviderMetrics
grpcServer *grpc.Server
vldtr auth.JwtValidator
OAuth2 *oauth2.Config
ClientID string
ClientSecret string
authzClient authz.Client
cryptoEngine *crypto.Engine
store db.Store
cfg *serverconfig.Config
evt *events.Eventer
mt *metrics
provMt provtelemetry.ProviderMetrics
grpcServer *grpc.Server
vldtr auth.JwtValidator
OAuth2 *oauth2.Config
ClientID string
ClientSecret string
authzClient authz.Client
cryptoEngine *crypto.Engine
restClientCache ratecache.RestClientCache

// Implementations for service registration
pb.UnimplementedHealthServiceServer
Expand Down Expand Up @@ -107,6 +109,13 @@ func WithAuthzClient(c authz.Client) ServerOption {
}
}

// WithRestClientCache sets the rest client cache for the server
func WithRestClientCache(c ratecache.RestClientCache) ServerOption {
return func(s *Server) {
s.restClientCache = c
}
}

// NewServer creates a new server instance
func NewServer(
store db.Store,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -152,7 +152,7 @@ func NewSecurityAdvisoryAlert(
return nil, fmt.Errorf("cannot parse description template: %w", err)
}
// Get the GitHub client
cli, err := pbuild.GetGitHub(context.Background())
cli, err := pbuild.GetGitHub()
if err != nil {
return nil, fmt.Errorf("cannot get http client: %w", err)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ func NewGhBranchProtectRemediator(
return nil, fmt.Errorf("cannot parse patch template: %w", err)
}

cli, err := pbuild.GetGitHub(context.Background())
cli, err := pbuild.GetGitHub()
if err != nil {
return nil, fmt.Errorf("cannot get http client: %w", err)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func NewPullRequestRemediate(
return nil, fmt.Errorf("cannot parse body template: %w", err)
}

ghCli, err := pbuild.GetGitHub(context.Background())
ghCli, err := pbuild.GetGitHub()
if err != nil {
return nil, fmt.Errorf("failed to get github client: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion internal/engine/actions/remediate/rest/rest.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ func NewRestRemediate(actionType interfaces.ActionType, restCfg *pb.RestType,

method := util.HttpMethodFromString(restCfg.Method, http.MethodPatch)

cli, err := pbuild.GetHTTP(context.Background())
cli, err := pbuild.GetHTTP()
if err != nil {
return nil, fmt.Errorf("cannot get http client: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion internal/engine/eval/trusty/trusty.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func NewTrustyEvaluator(
return nil, fmt.Errorf("endpoint is not set")
}

ghcli, err := pbuild.GetGitHub(context.Background())
ghcli, err := pbuild.GetGitHub()
if err != nil {
return nil, fmt.Errorf("failed to get github client: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion internal/engine/eval/vulncheck/vulncheck.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func NewVulncheckEvaluator(_ *pb.RuleType_Definition_Eval_Vulncheck, pbuild *pro
return nil, fmt.Errorf("provider builder is nil")
}

ghcli, err := pbuild.GetGitHub(context.Background())
ghcli, err := pbuild.GetGitHub()
if err != nil {
return nil, fmt.Errorf("failed to get github client: %w", err)
}
Expand Down
10 changes: 10 additions & 0 deletions internal/engine/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/stacklok/minder/internal/events"
minderlogger "github.com/stacklok/minder/internal/logger"
"github.com/stacklok/minder/internal/providers"
"github.com/stacklok/minder/internal/providers/ratecache"
providertelemetry "github.com/stacklok/minder/internal/providers/telemetry"
pb "github.com/stacklok/minder/pkg/api/protobuf/go/minder/v1"
)
Expand All @@ -55,6 +56,7 @@ type Executor struct {
// terminationcontext is used to terminate the executor
// when the server is shutting down.
terminationcontext context.Context
restClientCache ratecache.RestClientCache
}

// ExecutorOption is a function that modifies an executor
Expand All @@ -74,6 +76,13 @@ func WithMiddleware(mdw message.HandlerMiddleware) ExecutorOption {
}
}

// WithRestClientCache sets the rest client cache for the executor
func WithRestClientCache(cache ratecache.RestClientCache) ExecutorOption {
return func(e *Executor) {
e.restClientCache = cache
}
}

// NewExecutor creates a new executor
func NewExecutor(
ctx context.Context,
Expand Down Expand Up @@ -189,6 +198,7 @@ func (e *Executor) prepAndEvalEntityEvent(ctx context.Context, inf *entities.Ent

pbOpts := []providers.ProviderBuilderOption{
providers.WithProviderMetrics(e.provMt),
providers.WithRestClientCache(e.restClientCache),
}
cli, err := providers.GetProviderBuilder(ctx, provider, *projectID, e.querier, e.crypteng, pbOpts...)
if err != nil {
Expand Down
2 changes: 1 addition & 1 deletion internal/engine/ingester/artifact/artifact.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ func NewArtifactDataIngest(
pbuild *providers.ProviderBuilder,
) (*Ingest, error) {

ghCli, err := pbuild.GetGitHub(context.Background())
ghCli, err := pbuild.GetGitHub()
if err != nil {
return nil, fmt.Errorf("failed to get github client: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion internal/engine/ingester/diff/diff.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func NewDiffIngester(
return nil, fmt.Errorf("provider builder is nil")
}

cli, err := pbuild.GetGitHub(context.Background())
cli, err := pbuild.GetGitHub()
if err != nil {
return nil, fmt.Errorf("failed to get github client: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion internal/engine/ingester/rest/rest.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ func NewRestRuleDataIngest(

method := util.HttpMethodFromString(restCfg.Method, http.MethodGet)

cli, err := pbuild.GetHTTP(context.Background())
cli, err := pbuild.GetHTTP()
if err != nil {
return nil, fmt.Errorf("cannot get http client: %w", err)
}
Expand Down
Loading

0 comments on commit ebea814

Please sign in to comment.