Skip to content

Commit

Permalink
WIP: Handle rate limiting errors for GitHub API
Browse files Browse the repository at this point in the history
* Optimistic concurrency model is followed to block goroutines for the least time possible and reduce rate limit errors leading to an integration ban

* Every server replica would maintain its own set of rate-limited tokens. There is no need to synchronize rate-limited tokens across server replicas. If a rate-limit error is returned, the token will be blocked, and the +1 request won't be a problem on the other replica

* Requests returning rate limit errors are retried after a backoff

Signed-off-by: Vyom-Yadav <[email protected]>
  • Loading branch information
Vyom-Yadav committed Feb 5, 2024
1 parent b6b4581 commit a0e7322
Show file tree
Hide file tree
Showing 25 changed files with 622 additions and 60 deletions.
2 changes: 2 additions & 0 deletions cmd/dev/app/container/cmd_verify.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (

"github.com/stacklok/minder/internal/db"
"github.com/stacklok/minder/internal/providers"
"github.com/stacklok/minder/internal/tokenstatus"
"github.com/stacklok/minder/internal/verifier"
"github.com/stacklok/minder/internal/verifier/sigstore"
"github.com/stacklok/minder/internal/verifier/sigstore/container"
Expand Down Expand Up @@ -125,6 +126,7 @@ func buildGitHubClient(ctx context.Context, token string) (provifv1.GitHub, erro
}`),
},
db.ProviderAccessToken{},
tokenstatus.NewStore(),
token,
)

Expand Down
2 changes: 2 additions & 0 deletions cmd/dev/app/rule_type/rttst.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
"github.com/stacklok/minder/internal/engine/eval/rego"
engif "github.com/stacklok/minder/internal/engine/interfaces"
"github.com/stacklok/minder/internal/providers"
"github.com/stacklok/minder/internal/tokenstatus"
"github.com/stacklok/minder/internal/util/jsonyaml"
minderv1 "github.com/stacklok/minder/pkg/api/protobuf/go/minder/v1"
)
Expand Down Expand Up @@ -132,6 +133,7 @@ func testCmdRun(cmd *cobra.Command, _ []string) error {
}`),
},
db.ProviderAccessToken{},
tokenstatus.NewStore(),
token,
))
inf := &entities.EntityInfoWrapper{
Expand Down
9 changes: 6 additions & 3 deletions cmd/server/app/serve.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"github.com/stacklok/minder/internal/logger"
provtelemetry "github.com/stacklok/minder/internal/providers/telemetry"
"github.com/stacklok/minder/internal/reconcilers"
"github.com/stacklok/minder/internal/tokenstatus"
)

var serveCmd = &cobra.Command{
Expand Down Expand Up @@ -122,8 +123,10 @@ var serveCmd = &cobra.Command{
serverMetrics := controlplane.NewMetrics()
providerMetrics := provtelemetry.NewProviderMetrics()

tokenStatStore := tokenstatus.NewStore()

s, err := controlplane.NewServer(
store, evt, serverMetrics, cfg, vldtr,
store, evt, serverMetrics, cfg, tokenStatStore, vldtr,
controlplane.WithProviderMetrics(providerMetrics),
controlplane.WithAuthzClient(authzc),
)
Expand All @@ -137,7 +140,7 @@ var serveCmd = &cobra.Command{

tsmdw := logger.NewTelemetryStoreWMMiddleware(l)

exec, err := engine.NewExecutor(ctx, store, &cfg.Auth, evt,
exec, err := engine.NewExecutor(ctx, store, &cfg.Auth, tokenStatStore, evt,
engine.WithProviderMetrics(providerMetrics),
engine.WithMiddleware(aggr.AggregateMiddleware),
engine.WithMiddleware(tsmdw.TelemetryStoreMiddleware),
Expand All @@ -148,7 +151,7 @@ var serveCmd = &cobra.Command{

s.ConsumeEvents(exec)

rec, err := reconcilers.NewReconciler(store, evt, &cfg.Auth, reconcilers.WithProviderMetrics(providerMetrics))
rec, err := reconcilers.NewReconciler(store, evt, &cfg.Auth, tokenStatStore, reconcilers.WithProviderMetrics(providerMetrics))
if err != nil {
return fmt.Errorf("unable to create reconciler: %w", err)
}
Expand Down
6 changes: 4 additions & 2 deletions internal/controlplane/handlers_githubwebhooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -367,7 +367,8 @@ func (s *Server) deleteWebhookFromRepository(
pbOpts := []providers.ProviderBuilderOption{
providers.WithProviderMetrics(s.provMt),
}
providerBuilder, err := providers.GetProviderBuilder(ctx, provider, projectID, s.store, s.cryptoEngine, pbOpts...)
providerBuilder, err := providers.GetProviderBuilder(ctx, provider, projectID, s.store,
s.cryptoEngine, s.tokenStatStore, pbOpts...)
if err != nil {
return status.Errorf(codes.Internal, "cannot get provider builder: %v", err)
}
Expand Down Expand Up @@ -426,7 +427,8 @@ func (s *Server) parseGithubEventForProcessing(
pbOpts := []providers.ProviderBuilderOption{
providers.WithProviderMetrics(s.provMt),
}
provBuilder, err := providers.GetProviderBuilder(ctx, prov, dbRepo.ProjectID, s.store, s.cryptoEngine, pbOpts...)
provBuilder, err := providers.GetProviderBuilder(ctx, prov, dbRepo.ProjectID, s.store,
s.cryptoEngine, s.tokenStatStore, pbOpts...)
if err != nil {
return fmt.Errorf("error building client: %w", err)
}
Expand Down
4 changes: 2 additions & 2 deletions internal/controlplane/handlers_repositories.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func (s *Server) RegisterRepository(ctx context.Context,
pbOpts := []providers.ProviderBuilderOption{
providers.WithProviderMetrics(s.provMt),
}
p, err := providers.GetProviderBuilder(ctx, provider, projectID, s.store, s.cryptoEngine, pbOpts...)
p, err := providers.GetProviderBuilder(ctx, provider, projectID, s.store, s.cryptoEngine, s.tokenStatStore, pbOpts...)
if err != nil {
return nil, status.Errorf(codes.Internal, "cannot get provider builder: %v", err)
}
Expand Down Expand Up @@ -405,7 +405,7 @@ func (s *Server) ListRemoteRepositoriesFromProvider(
pbOpts := []providers.ProviderBuilderOption{
providers.WithProviderMetrics(s.provMt),
}
p, err := providers.GetProviderBuilder(ctx, provider, projectID, s.store, s.cryptoEngine, pbOpts...)
p, err := providers.GetProviderBuilder(ctx, provider, projectID, s.store, s.cryptoEngine, s.tokenStatStore, pbOpts...)
if err != nil {
return nil, status.Errorf(codes.Internal, "cannot get provider builder: %v", err)
}
Expand Down
5 changes: 3 additions & 2 deletions internal/controlplane/handlers_user_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"github.com/stacklok/minder/internal/crypto"
"github.com/stacklok/minder/internal/db"
"github.com/stacklok/minder/internal/events"
"github.com/stacklok/minder/internal/tokenstatus"
pb "github.com/stacklok/minder/pkg/api/protobuf/go/minder/v1"
)

Expand Down Expand Up @@ -230,7 +231,7 @@ func TestCreateUser_gRPC(t *testing.T) {
Auth: serverconfig.AuthConfig{
TokenKey: generateTokenKey(t),
},
}, mockJwtValidator)
}, tokenstatus.NewStore(), mockJwtValidator)
require.NoError(t, err, "failed to create test server")

resp, err := server.CreateUser(ctx, tc.req)
Expand Down Expand Up @@ -421,7 +422,7 @@ func TestDeleteUser_gRPC(t *testing.T) {
ClientSecret: "client-secret",
},
},
}, mockJwtValidator)
}, tokenstatus.NewStore(), mockJwtValidator)
require.NoError(t, err, "failed to create test server")

resp, err := server.DeleteUser(ctx, tc.req)
Expand Down
42 changes: 23 additions & 19 deletions internal/controlplane/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ import (
"github.com/stacklok/minder/internal/events"
"github.com/stacklok/minder/internal/logger"
provtelemetry "github.com/stacklok/minder/internal/providers/telemetry"
"github.com/stacklok/minder/internal/tokenstatus"
"github.com/stacklok/minder/internal/util"
pb "github.com/stacklok/minder/pkg/api/protobuf/go/minder/v1"
)
Expand All @@ -66,18 +67,19 @@ var (

// Server represents the controlplane server
type Server struct {
store db.Store
cfg *serverconfig.Config
evt *events.Eventer
mt *metrics
provMt provtelemetry.ProviderMetrics
grpcServer *grpc.Server
vldtr auth.JwtValidator
OAuth2 *oauth2.Config
ClientID string
ClientSecret string
authzClient authz.Client
cryptoEngine *crypto.Engine
store db.Store
cfg *serverconfig.Config
evt *events.Eventer
mt *metrics
provMt provtelemetry.ProviderMetrics
grpcServer *grpc.Server
vldtr auth.JwtValidator
OAuth2 *oauth2.Config
ClientID string
ClientSecret string
authzClient authz.Client
cryptoEngine *crypto.Engine
tokenStatStore *tokenstatus.Store

// Implementations for service registration
pb.UnimplementedHealthServiceServer
Expand Down Expand Up @@ -112,6 +114,7 @@ func NewServer(
evt *events.Eventer,
cpm *metrics,
cfg *serverconfig.Config,
tokenStatStore *tokenstatus.Store,
vldtr auth.JwtValidator,
opts ...ServerOption,
) (*Server, error) {
Expand All @@ -120,13 +123,14 @@ func NewServer(
return nil, fmt.Errorf("failed to create crypto engine: %w", err)
}
s := &Server{
store: store,
cfg: cfg,
evt: evt,
cryptoEngine: eng,
vldtr: vldtr,
mt: cpm,
provMt: provtelemetry.NewNoopMetrics(),
store: store,
cfg: cfg,
evt: evt,
cryptoEngine: eng,
tokenStatStore: tokenStatStore,
vldtr: vldtr,
mt: cpm,
provMt: provtelemetry.NewNoopMetrics(),
// TODO: this currently always returns authorized as a transitionary measure.
// When OpenFGA is fully rolled out, we may want to make this a hard error or set to false.
authzClient: &mock.NoopClient{Authorized: true},
Expand Down
3 changes: 2 additions & 1 deletion internal/controlplane/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (
mockjwt "github.com/stacklok/minder/internal/auth/mock"
serverconfig "github.com/stacklok/minder/internal/config/server"
"github.com/stacklok/minder/internal/events"
"github.com/stacklok/minder/internal/tokenstatus"
pb "github.com/stacklok/minder/pkg/api/protobuf/go/minder/v1"
)

Expand Down Expand Up @@ -87,7 +88,7 @@ func newDefaultServer(t *testing.T, mockStore *mockdb.MockStore) *Server {
defer ctrl.Finish()
mockJwt := mockjwt.NewMockJwtValidator(ctrl)

server, err := NewServer(mockStore, evt, NewMetrics(), c, mockJwt)
server, err := NewServer(mockStore, evt, NewMetrics(), c, tokenstatus.NewStore(), mockJwt)
require.NoError(t, err, "failed to create server")
return server
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/stacklok/minder/internal/engine/interfaces"
"github.com/stacklok/minder/internal/providers"
mock_ghclient "github.com/stacklok/minder/internal/providers/github/mock"
"github.com/stacklok/minder/internal/tokenstatus"
pb "github.com/stacklok/minder/pkg/api/protobuf/go/minder/v1"
provifv1 "github.com/stacklok/minder/pkg/providers/v1"
)
Expand Down Expand Up @@ -65,6 +66,7 @@ func testGithubProviderBuilder(baseURL string) *providers.ProviderBuilder {
Definition: json.RawMessage(definitionJSON),
},
db.ProviderAccessToken{},
tokenstatus.NewStore(),
"token",
)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"github.com/stacklok/minder/internal/engine/interfaces"
"github.com/stacklok/minder/internal/providers"
mock_ghclient "github.com/stacklok/minder/internal/providers/github/mock"
"github.com/stacklok/minder/internal/tokenstatus"
pb "github.com/stacklok/minder/pkg/api/protobuf/go/minder/v1"
provifv1 "github.com/stacklok/minder/pkg/providers/v1"
)
Expand Down Expand Up @@ -108,6 +109,7 @@ func testGithubProviderBuilder() *providers.ProviderBuilder {
Definition: json.RawMessage(definitionJSON),
},
db.ProviderAccessToken{},
tokenstatus.NewStore(),
"token",
)
}
Expand Down
2 changes: 2 additions & 0 deletions internal/engine/actions/remediate/remediate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/stacklok/minder/internal/engine/actions/remediate/rest"
engif "github.com/stacklok/minder/internal/engine/interfaces"
"github.com/stacklok/minder/internal/providers"
"github.com/stacklok/minder/internal/tokenstatus"
pb "github.com/stacklok/minder/pkg/api/protobuf/go/minder/v1"
provifv1 "github.com/stacklok/minder/pkg/providers/v1"
)
Expand All @@ -49,6 +50,7 @@ var (
}`),
},
db.ProviderAccessToken{},
tokenstatus.NewStore(),
"token",
)
)
Expand Down
4 changes: 4 additions & 0 deletions internal/engine/actions/remediate/rest/rest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (
"github.com/stacklok/minder/internal/db"
"github.com/stacklok/minder/internal/engine/interfaces"
"github.com/stacklok/minder/internal/providers"
"github.com/stacklok/minder/internal/tokenstatus"
pb "github.com/stacklok/minder/pkg/api/protobuf/go/minder/v1"
provifv1 "github.com/stacklok/minder/pkg/providers/v1"
)
Expand All @@ -55,6 +56,7 @@ var (
}`),
},
db.ProviderAccessToken{},
tokenstatus.NewStore(),
"token",
)
invalidProviderBuilder = providers.NewProviderBuilder(
Expand All @@ -70,6 +72,7 @@ var (
}`),
},
db.ProviderAccessToken{},
tokenstatus.NewStore(),
"token",
)
TestActionTypeValid interfaces.ActionType = "remediate-test"
Expand All @@ -94,6 +97,7 @@ func testGithubProviderBuilder(baseURL string) *providers.ProviderBuilder {
Definition: json.RawMessage(definitionJSON),
},
db.ProviderAccessToken{},
tokenstatus.NewStore(),
"token",
)
}
Expand Down
6 changes: 5 additions & 1 deletion internal/engine/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
minderlogger "github.com/stacklok/minder/internal/logger"
"github.com/stacklok/minder/internal/providers"
providertelemetry "github.com/stacklok/minder/internal/providers/telemetry"
"github.com/stacklok/minder/internal/tokenstatus"
pb "github.com/stacklok/minder/pkg/api/protobuf/go/minder/v1"
)

Expand All @@ -55,6 +56,7 @@ type Executor struct {
// terminationcontext is used to terminate the executor
// when the server is shutting down.
terminationcontext context.Context
tokenStatStore *tokenstatus.Store
}

// ExecutorOption is a function that modifies an executor
Expand All @@ -79,6 +81,7 @@ func NewExecutor(
ctx context.Context,
querier db.Store,
authCfg *serverconfig.AuthConfig,
tokenStatStore *tokenstatus.Store,
evt *events.Eventer,
opts ...ExecutorOption,
) (*Executor, error) {
Expand All @@ -91,6 +94,7 @@ func NewExecutor(
querier: querier,
crypteng: crypteng,
provMt: providertelemetry.NewNoopMetrics(),
tokenStatStore: tokenStatStore,
evt: evt,
executions: &sync.WaitGroup{},
terminationcontext: ctx,
Expand Down Expand Up @@ -190,7 +194,7 @@ func (e *Executor) prepAndEvalEntityEvent(ctx context.Context, inf *entities.Ent
pbOpts := []providers.ProviderBuilderOption{
providers.WithProviderMetrics(e.provMt),
}
cli, err := providers.GetProviderBuilder(ctx, provider, *projectID, e.querier, e.crypteng, pbOpts...)
cli, err := providers.GetProviderBuilder(ctx, provider, *projectID, e.querier, e.crypteng, e.tokenStatStore, pbOpts...)
if err != nil {
return fmt.Errorf("error building client: %w", err)
}
Expand Down
3 changes: 2 additions & 1 deletion internal/engine/executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ import (
"github.com/stacklok/minder/internal/engine/entities"
"github.com/stacklok/minder/internal/events"
"github.com/stacklok/minder/internal/logger"
"github.com/stacklok/minder/internal/tokenstatus"
"github.com/stacklok/minder/internal/util/testqueue"
minderv1 "github.com/stacklok/minder/pkg/api/protobuf/go/minder/v1"
)
Expand Down Expand Up @@ -292,7 +293,7 @@ default allow = true`,

e, err := engine.NewExecutor(ctx, mockStore, &serverconfig.AuthConfig{
TokenKey: tokenKeyPath,
}, evt)
}, tokenstatus.NewStore(), evt)
require.NoError(t, err, "expected no error")

eiw := entities.NewEntityInfoWrapper().
Expand Down
Loading

0 comments on commit a0e7322

Please sign in to comment.