From 23b1b30d8a091995e7d5f64915fc59265c6e159b Mon Sep 17 00:00:00 2001 From: Don Browne Date: Tue, 7 May 2024 15:46:05 +0100 Subject: [PATCH] Remove ProviderBuilder from engine Relates to #2845 Refactor engine to use ProviderManager. Various bits of refactoring along the way, including renaming some struct fields and variables for clarity. --- cmd/dev/app/rule_type/rttst.go | 59 ++++---- cmd/dev/app/testserver/testserver.go | 4 +- cmd/server/app/serve.go | 11 +- internal/engine/actions/actions.go | 15 ++- internal/engine/actions/alert/alert.go | 16 ++- .../gh_branch_protect_test.go | 3 +- .../pull_request/pull_request_test.go | 3 +- .../engine/actions/remediate/remediate.go | 40 +++--- .../actions/remediate/remediate_test.go | 87 +++++++----- .../actions/remediate/rest/rest_test.go | 3 +- internal/engine/eval/eval.go | 27 ++-- internal/engine/executor.go | 127 ++++-------------- internal/engine/executor_test.go | 55 +++++++- .../engine/ingester/artifact/artifact_test.go | 3 +- internal/engine/ingester/ingester.go | 21 +-- internal/engine/ingester/ingester_test.go | 44 ++---- internal/engine/ingester/rest/rest_test.go | 3 +- internal/engine/rule_type_engine.go | 66 ++++----- internal/service/service.go | 17 ++- 19 files changed, 292 insertions(+), 312 deletions(-) diff --git a/cmd/dev/app/rule_type/rttst.go b/cmd/dev/app/rule_type/rttst.go index 07ed931719..5cd56177c8 100644 --- a/cmd/dev/app/rule_type/rttst.go +++ b/cmd/dev/app/rule_type/rttst.go @@ -17,7 +17,6 @@ package rule_type import ( "bytes" "context" - "database/sql" "encoding/json" "fmt" "os" @@ -38,8 +37,10 @@ import ( "github.com/stacklok/minder/internal/engine/eval/rego" engif "github.com/stacklok/minder/internal/engine/interfaces" "github.com/stacklok/minder/internal/logger" - "github.com/stacklok/minder/internal/providers" "github.com/stacklok/minder/internal/providers/credentials" + "github.com/stacklok/minder/internal/providers/github/clients" + "github.com/stacklok/minder/internal/providers/ratecache" + "github.com/stacklok/minder/internal/providers/telemetry" "github.com/stacklok/minder/internal/util/jsonyaml" minderv1 "github.com/stacklok/minder/pkg/api/protobuf/go/minder/v1" ) @@ -97,24 +98,24 @@ func testCmdRun(cmd *cobra.Command, _ []string) error { cmd.Println("If the rule you're testing is rego-based, you will not be able to use `print` statements for debugging.") } - rt, err := readRuleTypeFromFile(rtpath.Value.String()) + ruletype, err := readRuleTypeFromFile(rtpath.Value.String()) if err != nil { return fmt.Errorf("error reading rule type from file: %w", err) } provider := "test" rootProject := "00000000-0000-0000-0000-000000000002" - rt.Context = &minderv1.Context{ + ruletype.Context = &minderv1.Context{ Provider: &provider, Project: &rootProject, } - ent, err := readEntityFromFile(epath.Value.String(), minderv1.EntityFromString(rt.Def.InEntity)) + ent, err := readEntityFromFile(epath.Value.String(), minderv1.EntityFromString(ruletype.Def.InEntity)) if err != nil { return fmt.Errorf("error reading entity from file: %w", err) } - p, err := engine.ReadProfileFromFile(ppath.Value.String()) + profile, err := engine.ReadProfileFromFile(ppath.Value.String()) if err != nil { return fmt.Errorf("error reading fragment from file: %w", err) } @@ -148,38 +149,30 @@ func testCmdRun(cmd *cobra.Command, _ []string) error { // Disable actions off := "off" - p.Alert = &off + profile.Alert = &off - rules, err := engine.GetRulesFromProfileOfType(p, rt) + rules, err := engine.GetRulesFromProfileOfType(profile, ruletype) if err != nil { return fmt.Errorf("error getting relevant fragment: %w", err) } - // TODO: Read this from a providers file instead so we can make it pluggable - eng, err := engine.NewRuleTypeEngine(context.Background(), p, rt, providers.NewProviderBuilder( - &db.Provider{ - Name: "test", - Version: "v1", - Implements: []db.ProviderType{ - "rest", - "repo-lister", - "git", - "github", - }, - Definition: json.RawMessage(`{ - "github-app": {} - }`), - }, - sql.NullString{}, - false, + // TODO: Whenever we add more Provider classes, we will need to rethink this + client, err := clients.NewGitHubAppProvider( + &minderv1.GitHubAppProviderConfig{}, + &serverconfig.GitHubAppConfig{AppName: "test"}, + &ratecache.NoopRestClientCache{}, credentials.NewGitHubTokenCredential(token), - &serverconfig.ProviderConfig{ - GitHubApp: &serverconfig.GitHubAppConfig{ - AppName: "test", - }, - }, - nil, // this is unused here - )) + nil, + clients.NewGitHubClientFactory(telemetry.NewNoopMetrics()), + false, + ) + if err != nil { + return fmt.Errorf("error instantiating github provider: %w", err) + } + + // TODO: use cobra context here + eng, err := engine.NewRuleTypeEngine(context.Background(), profile, ruletype, client) + inf := &entities.EntityInfoWrapper{ Entity: ent, ExecutionID: &uuid.Nil, @@ -189,7 +182,7 @@ func testCmdRun(cmd *cobra.Command, _ []string) error { } if len(rules) == 0 { - return fmt.Errorf("no rules found with type %s", rt.Name) + return fmt.Errorf("no rules found with type %s", ruletype.Name) } return runEvaluationForRules(cmd, eng, inf, remediateStatus, remMetadata, rules) diff --git a/cmd/dev/app/testserver/testserver.go b/cmd/dev/app/testserver/testserver.go index ea4f399e4c..15f9ac53a6 100644 --- a/cmd/dev/app/testserver/testserver.go +++ b/cmd/dev/app/testserver/testserver.go @@ -23,6 +23,7 @@ import ( "os" "os/signal" + "github.com/ThreeDotsLabs/watermill/message" "github.com/google/go-github/v61/github" "github.com/rs/zerolog" "github.com/spf13/cobra" @@ -35,7 +36,6 @@ import ( serverconfig "github.com/stacklok/minder/internal/config/server" "github.com/stacklok/minder/internal/controlplane/metrics" "github.com/stacklok/minder/internal/db/embedded" - "github.com/stacklok/minder/internal/engine" "github.com/stacklok/minder/internal/logger" "github.com/stacklok/minder/internal/providers/ratecache" provtelemetry "github.com/stacklok/minder/internal/providers/telemetry" @@ -101,6 +101,6 @@ func runTestServer(cmd *cobra.Command, _ []string) error { &auth.IdentityClient{}, metrics.NewNoopMetrics(), provtelemetry.NewNoopMetrics(), - []engine.ExecutorOption{}, + []message.HandlerMiddleware{}, ) } diff --git a/cmd/server/app/serve.go b/cmd/server/app/serve.go index fab866264b..46151941eb 100644 --- a/cmd/server/app/serve.go +++ b/cmd/server/app/serve.go @@ -21,6 +21,7 @@ import ( "os" "os/signal" + "github.com/ThreeDotsLabs/watermill/message" "github.com/rs/zerolog" "github.com/rs/zerolog/log" "github.com/spf13/cobra" @@ -33,7 +34,6 @@ import ( serverconfig "github.com/stacklok/minder/internal/config/server" cpmetrics "github.com/stacklok/minder/internal/controlplane/metrics" "github.com/stacklok/minder/internal/db" - "github.com/stacklok/minder/internal/engine" "github.com/stacklok/minder/internal/logger" "github.com/stacklok/minder/internal/providers/ratecache" provtelemetry "github.com/stacklok/minder/internal/providers/telemetry" @@ -118,12 +118,7 @@ var serveCmd = &cobra.Command{ restClientCache := ratecache.NewRestClientCache(ctx) defer restClientCache.Close() - tsmdw := logger.NewTelemetryStoreWMMiddleware(l) - executorOpts := []engine.ExecutorOption{ - engine.WithProviderMetrics(providerMetrics), - engine.WithMiddleware(tsmdw.TelemetryStoreMiddleware), - } - + telemetryMiddleware := logger.NewTelemetryStoreWMMiddleware(l) return service.AllInOneServerService( ctx, cfg, @@ -134,7 +129,7 @@ var serveCmd = &cobra.Command{ idClient, cpmetrics.NewMetrics(), providerMetrics, - executorOpts, + []message.HandlerMiddleware{telemetryMiddleware.TelemetryStoreMiddleware}, ) }, } diff --git a/internal/engine/actions/actions.go b/internal/engine/actions/actions.go index 5fc0821c84..5e67f5abf6 100644 --- a/internal/engine/actions/actions.go +++ b/internal/engine/actions/actions.go @@ -33,8 +33,8 @@ import ( "github.com/stacklok/minder/internal/engine/actions/remediate/pull_request" enginerr "github.com/stacklok/minder/internal/engine/errors" engif "github.com/stacklok/minder/internal/engine/interfaces" - "github.com/stacklok/minder/internal/providers" minderv1 "github.com/stacklok/minder/pkg/api/protobuf/go/minder/v1" + provinfv1 "github.com/stacklok/minder/pkg/providers/v1" ) // RuleActionsEngine is the engine responsible for processing all actions i.e., remediation and alerts @@ -44,16 +44,19 @@ type RuleActionsEngine struct { } // NewRuleActions creates a new rule actions engine -func NewRuleActions(p *minderv1.Profile, rt *minderv1.RuleType, pbuild *providers.ProviderBuilder, +func NewRuleActions( + profile *minderv1.Profile, + ruletype *minderv1.RuleType, + provider provinfv1.Provider, ) (*RuleActionsEngine, error) { // Create the remediation engine - remEngine, err := remediate.NewRuleRemediator(rt, pbuild) + remEngine, err := remediate.NewRuleRemediator(ruletype, provider) if err != nil { return nil, fmt.Errorf("cannot create rule remediator: %w", err) } // Create the alert engine - alertEngine, err := alert.NewRuleAlert(rt, pbuild) + alertEngine, err := alert.NewRuleAlert(ruletype, provider) if err != nil { return nil, fmt.Errorf("cannot create rule alerter: %w", err) } @@ -66,8 +69,8 @@ func NewRuleActions(p *minderv1.Profile, rt *minderv1.RuleType, pbuild *provider // The on/off state of the actions is an integral part of the action engine // and should be set upon creation. actionsOnOff: map[engif.ActionType]engif.ActionOpt{ - remEngine.Class(): remEngine.GetOnOffState(p), - alertEngine.Class(): alertEngine.GetOnOffState(p), + remEngine.Class(): remEngine.GetOnOffState(profile), + alertEngine.Class(): alertEngine.GetOnOffState(profile), }, }, nil } diff --git a/internal/engine/actions/alert/alert.go b/internal/engine/actions/alert/alert.go index 78bdf33b9d..8b15d749c9 100644 --- a/internal/engine/actions/alert/alert.go +++ b/internal/engine/actions/alert/alert.go @@ -18,21 +18,25 @@ package alert import ( + "errors" "fmt" "github.com/stacklok/minder/internal/engine/actions/alert/noop" "github.com/stacklok/minder/internal/engine/actions/alert/security_advisory" engif "github.com/stacklok/minder/internal/engine/interfaces" - "github.com/stacklok/minder/internal/providers" pb "github.com/stacklok/minder/pkg/api/protobuf/go/minder/v1" + provinfv1 "github.com/stacklok/minder/pkg/providers/v1" ) // ActionType is the type of the alert engine const ActionType engif.ActionType = "alert" // NewRuleAlert creates a new rule alert engine -func NewRuleAlert(rt *pb.RuleType, pbuild *providers.ProviderBuilder) (engif.Action, error) { - alertCfg := rt.Def.GetAlert() +func NewRuleAlert( + ruletype *pb.RuleType, + provider provinfv1.Provider, +) (engif.Action, error) { + alertCfg := ruletype.Def.GetAlert() if alertCfg == nil { return noop.NewNoopAlert(ActionType) } @@ -43,11 +47,11 @@ func NewRuleAlert(rt *pb.RuleType, pbuild *providers.ProviderBuilder) (engif.Act if alertCfg.GetSecurityAdvisory() == nil { return nil, fmt.Errorf("alert engine missing security-advisory configuration") } - client, err := pbuild.GetGitHub() + client, err := provinfv1.As[provinfv1.GitHub](provider) if err != nil { - return nil, fmt.Errorf("could not instantiate provider: %w", err) + return nil, errors.New("provider does not implement git trait") } - return security_advisory.NewSecurityAdvisoryAlert(ActionType, rt.GetSeverity(), alertCfg.GetSecurityAdvisory(), client) + return security_advisory.NewSecurityAdvisoryAlert(ActionType, ruletype.GetSeverity(), alertCfg.GetSecurityAdvisory(), client) } return nil, fmt.Errorf("unknown alert type: %s", alertCfg.GetType()) diff --git a/internal/engine/actions/remediate/gh_branch_protect/gh_branch_protect_test.go b/internal/engine/actions/remediate/gh_branch_protect/gh_branch_protect_test.go index b9cefc23ad..12368f29be 100644 --- a/internal/engine/actions/remediate/gh_branch_protect/gh_branch_protect_test.go +++ b/internal/engine/actions/remediate/gh_branch_protect/gh_branch_protect_test.go @@ -31,6 +31,7 @@ import ( "github.com/stacklok/minder/internal/providers/credentials" "github.com/stacklok/minder/internal/providers/github/clients" mock_ghclient "github.com/stacklok/minder/internal/providers/github/mock" + "github.com/stacklok/minder/internal/providers/ratecache" "github.com/stacklok/minder/internal/providers/telemetry" pb "github.com/stacklok/minder/pkg/api/protobuf/go/minder/v1" provifv1 "github.com/stacklok/minder/pkg/providers/v1" @@ -55,7 +56,7 @@ func testGithubProvider(baseURL string) (provifv1.GitHub, error) { &pb.GitHubProviderConfig{ Endpoint: baseURL, }, - nil, + &ratecache.NoopRestClientCache{}, credentials.NewGitHubTokenCredential("token"), clients.NewGitHubClientFactory(telemetry.NewNoopMetrics()), "", diff --git a/internal/engine/actions/remediate/pull_request/pull_request_test.go b/internal/engine/actions/remediate/pull_request/pull_request_test.go index efeea4285e..608a29e401 100644 --- a/internal/engine/actions/remediate/pull_request/pull_request_test.go +++ b/internal/engine/actions/remediate/pull_request/pull_request_test.go @@ -43,6 +43,7 @@ import ( "github.com/stacklok/minder/internal/providers/credentials" "github.com/stacklok/minder/internal/providers/github/clients" mockghclient "github.com/stacklok/minder/internal/providers/github/mock" + "github.com/stacklok/minder/internal/providers/ratecache" "github.com/stacklok/minder/internal/providers/telemetry" pb "github.com/stacklok/minder/pkg/api/protobuf/go/minder/v1" provifv1 "github.com/stacklok/minder/pkg/providers/v1" @@ -92,7 +93,7 @@ func testGithubProvider() (provifv1.GitHub, error) { &pb.GitHubProviderConfig{ Endpoint: ghApiUrl + "/", }, - nil, + &ratecache.NoopRestClientCache{}, credentials.NewGitHubTokenCredential("token"), clients.NewGitHubClientFactory(telemetry.NewNoopMetrics()), "", diff --git a/internal/engine/actions/remediate/remediate.go b/internal/engine/actions/remediate/remediate.go index 9b965e2550..efccea79be 100644 --- a/internal/engine/actions/remediate/remediate.go +++ b/internal/engine/actions/remediate/remediate.go @@ -18,6 +18,7 @@ package remediate import ( + "errors" "fmt" "github.com/stacklok/minder/internal/engine/actions/remediate/gh_branch_protect" @@ -25,53 +26,56 @@ import ( "github.com/stacklok/minder/internal/engine/actions/remediate/pull_request" "github.com/stacklok/minder/internal/engine/actions/remediate/rest" engif "github.com/stacklok/minder/internal/engine/interfaces" - "github.com/stacklok/minder/internal/providers" pb "github.com/stacklok/minder/pkg/api/protobuf/go/minder/v1" + provinfv1 "github.com/stacklok/minder/pkg/providers/v1" ) // ActionType is the type of the remediation engine const ActionType engif.ActionType = "remediate" // NewRuleRemediator creates a new rule remediator -func NewRuleRemediator(rt *pb.RuleType, pbuild *providers.ProviderBuilder) (engif.Action, error) { - rem := rt.Def.GetRemediate() - if rem == nil { +func NewRuleRemediator( + rt *pb.RuleType, + provider provinfv1.Provider, +) (engif.Action, error) { + remediate := rt.Def.GetRemediate() + if remediate == nil { return noop.NewNoopRemediate(ActionType) } // nolint:revive // let's keep the switch here, it would be nicer to extend a switch in the future - switch rem.GetType() { + switch remediate.GetType() { case rest.RemediateType: - client, err := pbuild.GetHTTP() + client, err := provinfv1.As[provinfv1.REST](provider) if err != nil { - return nil, fmt.Errorf("could not instantiate provider: %w", err) + return nil, errors.New("provider does not implement rest trait") } - if rem.GetRest() == nil { + if remediate.GetRest() == nil { return nil, fmt.Errorf("remediations engine missing rest configuration") } - return rest.NewRestRemediate(ActionType, rem.GetRest(), client) + return rest.NewRestRemediate(ActionType, remediate.GetRest(), client) case gh_branch_protect.RemediateType: - client, err := pbuild.GetGitHub() + client, err := provinfv1.As[provinfv1.GitHub](provider) if err != nil { - return nil, fmt.Errorf("could not instantiate provider: %w", err) + return nil, errors.New("provider does not implement git trait") } - if rem.GetGhBranchProtection() == nil { + if remediate.GetGhBranchProtection() == nil { return nil, fmt.Errorf("remediations engine missing gh_branch_protection configuration") } - return gh_branch_protect.NewGhBranchProtectRemediator(ActionType, rem.GetGhBranchProtection(), client) + return gh_branch_protect.NewGhBranchProtectRemediator(ActionType, remediate.GetGhBranchProtection(), client) case pull_request.RemediateType: - client, err := pbuild.GetGitHub() + client, err := provinfv1.As[provinfv1.GitHub](provider) if err != nil { - return nil, fmt.Errorf("could not instantiate provider: %w", err) + return nil, errors.New("provider does not implement git trait") } - if rem.GetPullRequest() == nil { + if remediate.GetPullRequest() == nil { return nil, fmt.Errorf("remediations engine missing pull request configuration") } - return pull_request.NewPullRequestRemediate(ActionType, rem.GetPullRequest(), client) + return pull_request.NewPullRequestRemediate(ActionType, remediate.GetPullRequest(), client) } - return nil, fmt.Errorf("unknown remediation type: %s", rem.GetType()) + return nil, fmt.Errorf("unknown remediation type: %s", remediate.GetType()) } diff --git a/internal/engine/actions/remediate/remediate_test.go b/internal/engine/actions/remediate/remediate_test.go index c6aff20aa9..f6b97e5690 100644 --- a/internal/engine/actions/remediate/remediate_test.go +++ b/internal/engine/actions/remediate/remediate_test.go @@ -17,57 +17,36 @@ package remediate_test import ( - "database/sql" - "encoding/json" "testing" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" - serverconfig "github.com/stacklok/minder/internal/config/server" - "github.com/stacklok/minder/internal/db" "github.com/stacklok/minder/internal/engine/actions/remediate" "github.com/stacklok/minder/internal/engine/actions/remediate/noop" "github.com/stacklok/minder/internal/engine/actions/remediate/rest" engif "github.com/stacklok/minder/internal/engine/interfaces" - "github.com/stacklok/minder/internal/providers" "github.com/stacklok/minder/internal/providers/credentials" + "github.com/stacklok/minder/internal/providers/git" + httpclient "github.com/stacklok/minder/internal/providers/http" + "github.com/stacklok/minder/internal/providers/telemetry" pb "github.com/stacklok/minder/pkg/api/protobuf/go/minder/v1" provifv1 "github.com/stacklok/minder/pkg/providers/v1" ) var ( - simpleBodyTemplate = "{\"foo\": \"bar\"}" - validProviderBuilder = providers.NewProviderBuilder( - &db.Provider{ - Name: "github", - Version: provifv1.V1, - Implements: []db.ProviderType{ - db.ProviderTypeRest, - }, - Definition: json.RawMessage(`{ - "rest": { - "base_url": "https://api.github.com/" - } -}`), - }, - sql.NullString{}, - false, - credentials.NewGitHubTokenCredential("token"), - &serverconfig.ProviderConfig{}, - nil, // this is unused here - ) + simpleBodyTemplate = "{\"foo\": \"bar\"}" ) func TestNewRuleRemediator(t *testing.T) { t.Parallel() tests := []struct { - name string - ruleType *pb.RuleType - wantError bool - wantType engif.Action - provBuilder *providers.ProviderBuilder + name string + ruleType *pb.RuleType + wantError bool + wantType engif.Action + provider func() (provifv1.Provider, error) }{ { name: "Test Noop Remediate", @@ -91,9 +70,26 @@ func TestNewRuleRemediator(t *testing.T) { }, }, }, - provBuilder: validProviderBuilder, - wantError: false, // Expecting a NoopRemediate instance (or whichever condition you check for) - wantType: &rest.Remediator{}, + provider: HTTPProvider, + wantError: false, // Expecting a NoopRemediate instance (or whichever condition you check for) + wantType: &rest.Remediator{}, + }, + { + name: "Test REST Remediate with wrong provider type", + ruleType: &pb.RuleType{ + Def: &pb.RuleType_Definition{ + Remediate: &pb.RuleType_Definition_Remediate{ + Type: rest.RemediateType, + Rest: &pb.RestType{ + Method: "POST", + Endpoint: "{{.Profile.endpoint}}", + Body: &simpleBodyTemplate, + }, + }, + }, + }, + provider: GitProvider, + wantError: true, }, { name: "Test Rest Remediate Without Config", @@ -104,8 +100,8 @@ func TestNewRuleRemediator(t *testing.T) { }, }, }, - provBuilder: validProviderBuilder, - wantError: true, + provider: HTTPProvider, + wantError: true, }, { name: "Test made up remediator", @@ -127,7 +123,13 @@ func TestNewRuleRemediator(t *testing.T) { t.Run(tt.name, func(t *testing.T) { t.Parallel() - result, err := remediate.NewRuleRemediator(tt.ruleType, tt.provBuilder) + var err error + var provider provifv1.Provider + if tt.provider != nil { + provider, err = tt.provider() + require.NoError(t, err) + } + result, err := remediate.NewRuleRemediator(tt.ruleType, provider) if tt.wantError { require.Error(t, err) return @@ -138,3 +140,16 @@ func TestNewRuleRemediator(t *testing.T) { }) } } + +func HTTPProvider() (provifv1.Provider, error) { + cfg := pb.RESTProviderConfig{BaseUrl: "https://api.github.com/"} + return httpclient.NewREST( + &cfg, + telemetry.NewNoopMetrics(), + credentials.NewGitHubTokenCredential("token"), + ) +} + +func GitProvider() (provifv1.Provider, error) { + return git.NewGit(credentials.NewEmptyCredential()), nil +} diff --git a/internal/engine/actions/remediate/rest/rest_test.go b/internal/engine/actions/remediate/rest/rest_test.go index 4a4404f07b..8c111f61e0 100644 --- a/internal/engine/actions/remediate/rest/rest_test.go +++ b/internal/engine/actions/remediate/rest/rest_test.go @@ -34,6 +34,7 @@ import ( "github.com/stacklok/minder/internal/providers/credentials" "github.com/stacklok/minder/internal/providers/github/clients" httpclient "github.com/stacklok/minder/internal/providers/http" + "github.com/stacklok/minder/internal/providers/ratecache" "github.com/stacklok/minder/internal/providers/telemetry" pb "github.com/stacklok/minder/pkg/api/protobuf/go/minder/v1" provifv1 "github.com/stacklok/minder/pkg/providers/v1" @@ -55,7 +56,7 @@ func testGithubProvider(baseURL string) (provifv1.REST, error) { &pb.GitHubProviderConfig{ Endpoint: baseURL, }, - nil, + &ratecache.NoopRestClientCache{}, credentials.NewGitHubTokenCredential("token"), clients.NewGitHubClientFactory(telemetry.NewNoopMetrics()), "", diff --git a/internal/engine/eval/eval.go b/internal/engine/eval/eval.go index fbbbd9e44d..05ca911457 100644 --- a/internal/engine/eval/eval.go +++ b/internal/engine/eval/eval.go @@ -19,6 +19,7 @@ package eval import ( "context" + "errors" "fmt" "github.com/stacklok/minder/internal/engine/eval/homoglyphs/application" @@ -27,49 +28,49 @@ import ( "github.com/stacklok/minder/internal/engine/eval/trusty" "github.com/stacklok/minder/internal/engine/eval/vulncheck" engif "github.com/stacklok/minder/internal/engine/interfaces" - "github.com/stacklok/minder/internal/providers" pb "github.com/stacklok/minder/pkg/api/protobuf/go/minder/v1" + provinfv1 "github.com/stacklok/minder/pkg/providers/v1" ) // NewRuleEvaluator creates a new rule data evaluator func NewRuleEvaluator( ctx context.Context, - rt *pb.RuleType, - pbuild *providers.ProviderBuilder, + ruletype *pb.RuleType, + provider provinfv1.Provider, ) (engif.Evaluator, error) { - e := rt.Def.GetEval() + e := ruletype.Def.GetEval() if e == nil { return nil, fmt.Errorf("rule type missing eval configuration") } // TODO: make this more generic and/or use constants - switch rt.Def.Eval.Type { + switch ruletype.Def.Eval.Type { case "jq": - if rt.Def.Eval.GetJq() == nil { + if ruletype.Def.Eval.GetJq() == nil { return nil, fmt.Errorf("rule type engine missing jq configuration") } return jq.NewJQEvaluator(e.GetJq()) case rego.RegoEvalType: return rego.NewRegoEvaluator(e.GetRego()) case vulncheck.VulncheckEvalType: - client, err := pbuild.GetGitHub() + client, err := provinfv1.As[provinfv1.GitHub](provider) if err != nil { - return nil, fmt.Errorf("could not instantiate provider: %w", err) + return nil, errors.New("provider does not implement git trait") } return vulncheck.NewVulncheckEvaluator(client) case trusty.TrustyEvalType: - client, err := pbuild.GetGitHub() + client, err := provinfv1.As[provinfv1.GitHub](provider) if err != nil { - return nil, fmt.Errorf("could not instantiate provider: %w", err) + return nil, errors.New("provider does not implement git trait") } return trusty.NewTrustyEvaluator(ctx, client) case application.HomoglyphsEvalType: - client, err := pbuild.GetGitHub() + client, err := provinfv1.As[provinfv1.GitHub](provider) if err != nil { - return nil, fmt.Errorf("could not instantiate provider: %w", err) + return nil, errors.New("provider does not implement git trait") } return application.NewHomoglyphsEvaluator(e.GetHomoglyphs(), client) default: - return nil, fmt.Errorf("unsupported rule type engine: %s", rt.Def.Eval.Type) + return nil, fmt.Errorf("unsupported rule type engine: %s", ruletype.Def.Eval.Type) } } diff --git a/internal/engine/executor.go b/internal/engine/executor.go index b4b78ab3ba..da058b94b9 100644 --- a/internal/engine/executor.go +++ b/internal/engine/executor.go @@ -21,12 +21,9 @@ import ( "time" "github.com/ThreeDotsLabs/watermill/message" - gogithub "github.com/google/go-github/v61/github" "github.com/google/uuid" "github.com/rs/zerolog" - serverconfig "github.com/stacklok/minder/internal/config/server" - "github.com/stacklok/minder/internal/crypto" "github.com/stacklok/minder/internal/db" "github.com/stacklok/minder/internal/engine/actions/alert" "github.com/stacklok/minder/internal/engine/actions/remediate" @@ -36,11 +33,9 @@ import ( engif "github.com/stacklok/minder/internal/engine/interfaces" "github.com/stacklok/minder/internal/events" minderlogger "github.com/stacklok/minder/internal/logger" - "github.com/stacklok/minder/internal/providers" - "github.com/stacklok/minder/internal/providers/github" - "github.com/stacklok/minder/internal/providers/ratecache" - providertelemetry "github.com/stacklok/minder/internal/providers/telemetry" + "github.com/stacklok/minder/internal/providers/manager" pb "github.com/stacklok/minder/pkg/api/protobuf/go/minder/v1" + provinfv1 "github.com/stacklok/minder/pkg/providers/v1" ) const ( @@ -56,78 +51,35 @@ const ( type Executor struct { querier db.Store evt events.Publisher - crypteng crypto.Engine - provMt providertelemetry.ProviderMetrics - mdws []message.HandlerMiddleware + handlerMiddleware []message.HandlerMiddleware wgEntityEventExecution *sync.WaitGroup // terminationcontext is used to terminate the executor // when the server is shutting down. - terminationcontext context.Context - restClientCache ratecache.RestClientCache - provCfg *serverconfig.ProviderConfig - providerStore providers.ProviderStore - fallbackTokenClient *gogithub.Client -} - -// ExecutorOption is a function that modifies an executor -type ExecutorOption func(*Executor) - -// WithProviderMetrics sets the provider metrics for the executor -func WithProviderMetrics(mt providertelemetry.ProviderMetrics) ExecutorOption { - return func(e *Executor) { - e.provMt = mt - } -} - -// WithMiddleware sets the aggregator middleware for the executor -func WithMiddleware(mdw message.HandlerMiddleware) ExecutorOption { - return func(e *Executor) { - e.mdws = append(e.mdws, mdw) - } + terminationcontext context.Context + providerManager manager.ProviderManager } // NewExecutor creates a new executor func NewExecutor( ctx context.Context, querier db.Store, - authCfg *serverconfig.AuthConfig, - provCfg *serverconfig.ProviderConfig, evt events.Publisher, - providerStore providers.ProviderStore, - restClientCache ratecache.RestClientCache, - opts ...ExecutorOption, -) (*Executor, error) { - crypteng, err := crypto.EngineFromAuthConfig(authCfg) - if err != nil { - return nil, err - } - - fallbackTokenClient := github.NewFallbackTokenClient(*provCfg) - - e := &Executor{ + providerManager manager.ProviderManager, + handlerMiddleware []message.HandlerMiddleware, +) *Executor { + return &Executor{ querier: querier, - crypteng: crypteng, - provMt: providertelemetry.NewNoopMetrics(), evt: evt, wgEntityEventExecution: &sync.WaitGroup{}, terminationcontext: ctx, - mdws: []message.HandlerMiddleware{}, - provCfg: provCfg, - providerStore: providerStore, - fallbackTokenClient: fallbackTokenClient, - restClientCache: restClientCache, - } - - for _, opt := range opts { - opt(e) + handlerMiddleware: handlerMiddleware, + providerManager: providerManager, } - - return e, nil } // Register implements the Consumer interface. func (e *Executor) Register(r events.Registrar) { - r.Register(events.TopicQueueEntityEvaluate, e.HandleEntityEvent, e.mdws...) + r.Register(events.TopicQueueEntityEvaluate, e.HandleEntityEvent, e.handlerMiddleware...) } // Wait waits for all the entity executions to finish. @@ -135,6 +87,10 @@ func (e *Executor) Wait() { e.wgEntityEventExecution.Wait() } +// TODO: We should consider decoupling the event processing from the business +// logic - if there is a failure in the business logic, it can cause the tests +// to hang instead of failing. + // HandleEntityEvent handles events coming from webhooks/signals // as well as the init event. func (e *Executor) HandleEntityEvent(msg *message.Message) error { @@ -169,7 +125,7 @@ func (e *Executor) HandleEntityEvent(msg *message.Message) error { return } - err := e.prepAndEvalEntityEvent(ctx, inf) + err := e.evalEntityEvent(ctx, inf) // record telemetry regardless of error. We explicitly record telemetry // here even though we also record it in the middleware because the evaluation @@ -192,45 +148,20 @@ func (e *Executor) HandleEntityEvent(msg *message.Message) error { return nil } -func (e *Executor) prepAndEvalEntityEvent(ctx context.Context, inf *entities.EntityInfoWrapper) error { - provider, err := e.providerStore.GetByID(ctx, inf.ProviderID) - if err != nil { - return fmt.Errorf("error getting provider: %w", err) - } - - pbOpts := []providers.ProviderBuilderOption{ - providers.WithProviderMetrics(e.provMt), - providers.WithRestClientCache(e.restClientCache), - } - cli, err := providers.GetProviderBuilder(ctx, *provider, e.querier, e.crypteng, e.provCfg, e.fallbackTokenClient, pbOpts...) - if err != nil { - return fmt.Errorf("error building client: %w", err) - } - - ectx := &EntityContext{ - Project: Project{ - ID: inf.ProjectID, - }, - Provider: Provider{ - Name: provider.Name, - }, - } - - return e.evalEntityEvent(ctx, inf, ectx, cli) -} -func (e *Executor) evalEntityEvent( - ctx context.Context, - inf *entities.EntityInfoWrapper, - ectx *EntityContext, - cli *providers.ProviderBuilder, -) error { +func (e *Executor) evalEntityEvent(ctx context.Context, inf *entities.EntityInfoWrapper) error { logger := zerolog.Ctx(ctx).Info(). Str("entity_type", inf.Type.ToString()). Str("execution_id", inf.ExecutionID.String()). Str("provider_id", inf.ProviderID.String()). Str("project_id", inf.ProjectID.String()) logger.Msg("entity evaluation - started") + + provider, err := e.providerManager.InstantiateFromID(ctx, inf.ProviderID) + if err != nil { + return fmt.Errorf("could not instantiate provider: %w", err) + } + // This is a cache, so we can avoid querying the ingester upstream // for every rule. We use a sync.Map because it's safe for concurrent // access. @@ -262,7 +193,7 @@ func (e *Executor) evalEntityEvent( // Let's evaluate all the rules for this profile err = TraverseRules(relevant, func(rule *pb.Profile_Rule) error { // Get the engine evaluator for this rule type - evalParams, rte, err := e.getEvaluator(ctx, inf, ectx, cli, profile, rule, ingestCache) + evalParams, rte, err := e.getEvaluator(ctx, inf, provider, profile, rule, ingestCache) if err != nil { return err } @@ -298,9 +229,7 @@ func (e *Executor) evalEntityEvent( func (e *Executor) getEvaluator( ctx context.Context, inf *entities.EntityInfoWrapper, - ectx *EntityContext, - - cli *providers.ProviderBuilder, + provider provinfv1.Provider, profile *pb.Profile, rule *pb.Profile_Rule, ingestCache ingestcache.Cache, @@ -315,7 +244,7 @@ func (e *Executor) getEvaluator( // TODO(jaosorior): Rule types should be cached in memory so // we don't have to query the database for each rule. dbrt, err := e.querier.GetRuleTypeByName(ctx, db.GetRuleTypeByNameParams{ - ProjectID: ectx.Project.ID, + ProjectID: inf.ProjectID, Name: rule.Type, }) if err != nil { @@ -337,7 +266,7 @@ func (e *Executor) getEvaluator( params.RuleType = rt // Create the rule type engine - rte, err := NewRuleTypeEngine(ctx, profile, rt, cli) + rte, err := NewRuleTypeEngine(ctx, profile, rt, provider) if err != nil { return nil, nil, fmt.Errorf("error creating rule type engine: %w", err) } diff --git a/internal/engine/executor_test.go b/internal/engine/executor_test.go index fd22ded394..be23f49552 100644 --- a/internal/engine/executor_test.go +++ b/internal/engine/executor_test.go @@ -22,6 +22,7 @@ import ( "testing" "time" + "github.com/ThreeDotsLabs/watermill/message" "github.com/google/uuid" "github.com/sqlc-dev/pqtype" "github.com/stretchr/testify/require" @@ -31,6 +32,7 @@ import ( mockdb "github.com/stacklok/minder/database/mock" serverconfig "github.com/stacklok/minder/internal/config/server" + "github.com/stacklok/minder/internal/controlplane/metrics" "github.com/stacklok/minder/internal/crypto" "github.com/stacklok/minder/internal/db" "github.com/stacklok/minder/internal/engine" @@ -40,9 +42,15 @@ import ( "github.com/stacklok/minder/internal/events" "github.com/stacklok/minder/internal/logger" "github.com/stacklok/minder/internal/providers" + "github.com/stacklok/minder/internal/providers/github/clients" + ghmanager "github.com/stacklok/minder/internal/providers/github/manager" + ghService "github.com/stacklok/minder/internal/providers/github/service" + "github.com/stacklok/minder/internal/providers/manager" "github.com/stacklok/minder/internal/providers/ratecache" + "github.com/stacklok/minder/internal/providers/telemetry" "github.com/stacklok/minder/internal/util/testqueue" minderv1 "github.com/stacklok/minder/pkg/api/protobuf/go/minder/v1" + provinfv1 "github.com/stacklok/minder/pkg/providers/v1" ) const ( @@ -108,6 +116,12 @@ func TestExecutor_handleEntityEvent(t *testing.T) { ID: providerID, Name: providerName, ProjectID: projectID, + Class: db.ProviderClassGithub, + Version: provinfv1.V1, + Implements: []db.ProviderType{ + db.ProviderTypeGithub, + }, + Definition: json.RawMessage(`{"github": {}}`), }, nil) // get access token @@ -150,7 +164,7 @@ func TestExecutor_handleEntityEvent(t *testing.T) { Valid: true, }, ContextualRules: pqtype.NullRawMessage{ - RawMessage: json.RawMessage(marshalledCRS), + RawMessage: marshalledCRS, Valid: true, }, }, @@ -188,7 +202,7 @@ default allow = true`, ID: ruleTypeID, Name: passthroughRuleType, ProjectID: projectID, - Definition: json.RawMessage(marshalledRTD), + Definition: marshalledRTD, }, nil) ruleEvalId := uuid.New() @@ -292,13 +306,42 @@ default allow = true`, ctx, cancel := context.WithTimeout(context.Background(), testTimeout) defer cancel() - e, err := engine.NewExecutor(ctx, + // Needed to keep these tests working as-is. + // In future, beef up unit test coverage in the dependencies + // of this code, and refactor these tests to use stubs. + eng, err := crypto.EngineFromAuthConfig(&serverconfig.AuthConfig{TokenKey: tokenKeyPath}) + require.NoError(t, err) + ghProviderService := ghService.NewGithubProviderService( mockStore, - &serverconfig.AuthConfig{TokenKey: tokenKeyPath}, + eng, + metrics.NewNoopMetrics(), + // These nil dependencies do not matter for the current tests + nil, + nil, + clients.NewGitHubClientFactory(telemetry.NewNoopMetrics()), + ) + + githubProviderManager := ghmanager.NewGitHubProviderClassManager( + &ratecache.NoopRestClientCache{}, + clients.NewGitHubClientFactory(telemetry.NewNoopMetrics()), &serverconfig.ProviderConfig{}, + nil, + eng, + nil, + mockStore, + ghProviderService, + ) + + providerStore := providers.NewProviderStore(mockStore) + providerManager, err := manager.NewProviderManager(providerStore, githubProviderManager) + require.NoError(t, err) + + e := engine.NewExecutor( + ctx, + mockStore, evt, - providers.NewProviderStore(mockStore), - &ratecache.NoopRestClientCache{}, + providerManager, + []message.HandlerMiddleware{}, ) require.NoError(t, err, "expected no error") diff --git a/internal/engine/ingester/artifact/artifact_test.go b/internal/engine/ingester/artifact/artifact_test.go index 9e15c3c41e..5ff7fb110b 100644 --- a/internal/engine/ingester/artifact/artifact_test.go +++ b/internal/engine/ingester/artifact/artifact_test.go @@ -29,6 +29,7 @@ import ( "github.com/stacklok/minder/internal/providers/credentials" "github.com/stacklok/minder/internal/providers/github/clients" mockghclient "github.com/stacklok/minder/internal/providers/github/mock" + "github.com/stacklok/minder/internal/providers/ratecache" "github.com/stacklok/minder/internal/providers/telemetry" "github.com/stacklok/minder/internal/verifier/verifyif" mockverify "github.com/stacklok/minder/internal/verifier/verifyif/mock" @@ -47,7 +48,7 @@ func testGithubProvider() (provinfv1.GitHub, error) { &pb.GitHubProviderConfig{ Endpoint: baseURL, }, - nil, + &ratecache.NoopRestClientCache{}, credentials.NewGitHubTokenCredential("token"), clients.NewGitHubClientFactory(telemetry.NewNoopMetrics()), "", diff --git a/internal/engine/ingester/ingester.go b/internal/engine/ingester/ingester.go index 72c40fbf74..617f3d2097 100644 --- a/internal/engine/ingester/ingester.go +++ b/internal/engine/ingester/ingester.go @@ -18,6 +18,7 @@ package ingester import ( + "errors" "fmt" "github.com/stacklok/minder/internal/engine/ingester/artifact" @@ -26,8 +27,8 @@ import ( "github.com/stacklok/minder/internal/engine/ingester/git" "github.com/stacklok/minder/internal/engine/ingester/rest" engif "github.com/stacklok/minder/internal/engine/interfaces" - "github.com/stacklok/minder/internal/providers" pb "github.com/stacklok/minder/pkg/api/protobuf/go/minder/v1" + provinfv1 "github.com/stacklok/minder/pkg/providers/v1" ) // test that the ingester implementations implements the interface @@ -38,7 +39,7 @@ var _ engif.Ingester = (*rest.Ingestor)(nil) // NewRuleDataIngest creates a new rule data ingest based no the given rule // type definition. -func NewRuleDataIngest(rt *pb.RuleType, pbuild *providers.ProviderBuilder) (engif.Ingester, error) { +func NewRuleDataIngest(rt *pb.RuleType, provider provinfv1.Provider) (engif.Ingester, error) { ing := rt.Def.GetIngest() switch ing.GetType() { @@ -46,9 +47,9 @@ func NewRuleDataIngest(rt *pb.RuleType, pbuild *providers.ProviderBuilder) (engi if rt.Def.Ingest.GetRest() == nil { return nil, fmt.Errorf("rule type engine missing rest configuration") } - client, err := pbuild.GetHTTP() + client, err := provinfv1.As[provinfv1.REST](provider) if err != nil { - return nil, fmt.Errorf("could not instantiate provider: %w", err) + return nil, errors.New("provider does not implement rest trait") } return rest.NewRestRuleDataIngest(ing.GetRest(), client) @@ -62,22 +63,22 @@ func NewRuleDataIngest(rt *pb.RuleType, pbuild *providers.ProviderBuilder) (engi if rt.Def.Ingest.GetArtifact() == nil { return nil, fmt.Errorf("rule type engine missing artifact configuration") } - client, err := pbuild.GetGitHub() + client, err := provinfv1.As[provinfv1.GitHub](provider) if err != nil { - return nil, fmt.Errorf("could not instantiate provider: %w", err) + return nil, errors.New("provider does not implement github trait") } return artifact.NewArtifactDataIngest(client) case git.GitRuleDataIngestType: - client, err := pbuild.GetGit() + client, err := provinfv1.As[provinfv1.Git](provider) if err != nil { - return nil, fmt.Errorf("could not instantiate provider: %w", err) + return nil, errors.New("provider does not implement git trait") } return git.NewGitIngester(ing.GetGit(), client) case diff.DiffRuleDataIngestType: - client, err := pbuild.GetGitHub() + client, err := provinfv1.As[provinfv1.GitHub](provider) if err != nil { - return nil, fmt.Errorf("could not instantiate provider: %w", err) + return nil, errors.New("provider does not implement github trait") } return diff.NewDiffIngester(ing.GetDiff(), client) default: diff --git a/internal/engine/ingester/ingester_test.go b/internal/engine/ingester/ingester_test.go index dea1c93d59..b7c215cbe3 100644 --- a/internal/engine/ingester/ingester_test.go +++ b/internal/engine/ingester/ingester_test.go @@ -18,22 +18,19 @@ package ingester import ( - "database/sql" - "encoding/json" "testing" "github.com/stretchr/testify/require" - serverconfig "github.com/stacklok/minder/internal/config/server" - "github.com/stacklok/minder/internal/db" "github.com/stacklok/minder/internal/engine/ingester/artifact" "github.com/stacklok/minder/internal/engine/ingester/builtin" "github.com/stacklok/minder/internal/engine/ingester/git" "github.com/stacklok/minder/internal/engine/ingester/rest" - "github.com/stacklok/minder/internal/providers" "github.com/stacklok/minder/internal/providers/credentials" + "github.com/stacklok/minder/internal/providers/github/clients" + "github.com/stacklok/minder/internal/providers/ratecache" + "github.com/stacklok/minder/internal/providers/telemetry" pb "github.com/stacklok/minder/pkg/api/protobuf/go/minder/v1" - provifv1 "github.com/stacklok/minder/pkg/providers/v1" ) func TestNewRuleDataIngest(t *testing.T) { @@ -161,37 +158,24 @@ func TestNewRuleDataIngest(t *testing.T) { t.Run(tt.name, func(t *testing.T) { t.Parallel() - got, err := NewRuleDataIngest(tt.args.rt, providers.NewProviderBuilder( - &db.Provider{ - Name: "github", - Version: provifv1.V1, - Implements: []db.ProviderType{ - "rest", - "git", - "github", - }, - Definition: json.RawMessage(`{ - "rest": { - "endpoint": "https://api.github.com/repos/Foo/Bar" - }, - "git": {}, - "github": {} -}`), - }, - sql.NullString{}, - false, + client, err := clients.NewRestClient( + &pb.GitHubProviderConfig{}, + &ratecache.NoopRestClientCache{}, credentials.NewGitHubTokenCredential("token"), - &serverconfig.ProviderConfig{}, - nil, // this is unused here - )) + clients.NewGitHubClientFactory(telemetry.NewNoopMetrics()), + "", + ) + require.NoError(t, err) + + ingester, err := NewRuleDataIngest(tt.args.rt, client) if tt.wantErr { require.Error(t, err, "Expected error") - require.Nil(t, got, "Expected nil") + require.Nil(t, ingester, "Expected nil") return } require.NoError(t, err, "Unexpected error") - require.NotNil(t, got, "Expected non-nil") + require.NotNil(t, ingester, "Expected non-nil") }) } } diff --git a/internal/engine/ingester/rest/rest_test.go b/internal/engine/ingester/rest/rest_test.go index 8a479a2d3c..9f444f8f4a 100644 --- a/internal/engine/ingester/rest/rest_test.go +++ b/internal/engine/ingester/rest/rest_test.go @@ -31,6 +31,7 @@ import ( "github.com/stacklok/minder/internal/providers/credentials" "github.com/stacklok/minder/internal/providers/github/clients" httpclient "github.com/stacklok/minder/internal/providers/http" + "github.com/stacklok/minder/internal/providers/ratecache" "github.com/stacklok/minder/internal/providers/telemetry" pb "github.com/stacklok/minder/pkg/api/protobuf/go/minder/v1" provifv1 "github.com/stacklok/minder/pkg/providers/v1" @@ -112,7 +113,7 @@ func testGithubProviderBuilder(baseURL string) (provifv1.REST, error) { &pb.GitHubProviderConfig{ Endpoint: baseURL, }, - nil, + &ratecache.NoopRestClientCache{}, credentials.NewGitHubTokenCredential("token"), clients.NewGitHubClientFactory(telemetry.NewNoopMetrics()), "", diff --git a/internal/engine/rule_type_engine.go b/internal/engine/rule_type_engine.go index 23e3c34fb8..3b1d48a65b 100644 --- a/internal/engine/rule_type_engine.go +++ b/internal/engine/rule_type_engine.go @@ -31,8 +31,8 @@ import ( "github.com/stacklok/minder/internal/engine/ingestcache" "github.com/stacklok/minder/internal/engine/ingester" engif "github.com/stacklok/minder/internal/engine/interfaces" - "github.com/stacklok/minder/internal/providers" minderv1 "github.com/stacklok/minder/pkg/api/protobuf/go/minder/v1" + provinfv1 "github.com/stacklok/minder/pkg/providers/v1" ) // RuleMeta is the metadata for a rule @@ -60,20 +60,20 @@ func (r *RuleMeta) String() string { type RuleTypeEngine struct { Meta RuleMeta - // rdi is the rule data ingest engine - rdi engif.Ingester + // ingester is the rule data ingest engine + ingester engif.Ingester - // reval is the rule evaluator - reval engif.Evaluator + // ruleEvaluator is the rule evaluator + ruleEvaluator engif.Evaluator - // rae is the rule actions engine - rae *actions.RuleActionsEngine + // actionsEngine is the rule actions engine + actionsEngine *actions.RuleActionsEngine - rval *RuleValidator + ruleValidator *RuleValidator - rt *minderv1.RuleType + ruletype *minderv1.RuleType - cli *providers.ProviderBuilder + //provider provinfv1.Provider ingestCache ingestcache.Cache } @@ -81,45 +81,45 @@ type RuleTypeEngine struct { // NewRuleTypeEngine creates a new rule type engine func NewRuleTypeEngine( ctx context.Context, - p *minderv1.Profile, - rt *minderv1.RuleType, - cli *providers.ProviderBuilder, + profile *minderv1.Profile, + ruletype *minderv1.RuleType, + provider provinfv1.Provider, ) (*RuleTypeEngine, error) { - rval, err := NewRuleValidator(rt) + rval, err := NewRuleValidator(ruletype) if err != nil { return nil, fmt.Errorf("cannot create rule validator: %w", err) } - rdi, err := ingester.NewRuleDataIngest(rt, cli) + rdi, err := ingester.NewRuleDataIngest(ruletype, provider) if err != nil { return nil, fmt.Errorf("cannot create rule data ingest: %w", err) } - reval, err := eval.NewRuleEvaluator(ctx, rt, cli) + reval, err := eval.NewRuleEvaluator(ctx, ruletype, provider) if err != nil { return nil, fmt.Errorf("cannot create rule evaluator: %w", err) } - ae, err := actions.NewRuleActions(p, rt, cli) + ae, err := actions.NewRuleActions(profile, ruletype, provider) if err != nil { return nil, fmt.Errorf("cannot create rule actions engine: %w", err) } rte := &RuleTypeEngine{ Meta: RuleMeta{ - Name: rt.Name, + Name: ruletype.Name, }, - rval: rval, - rdi: rdi, - reval: reval, - rae: ae, - rt: rt, - cli: cli, + ruleValidator: rval, + ingester: rdi, + ruleEvaluator: reval, + actionsEngine: ae, + ruletype: ruletype, + //cli: cli, ingestCache: ingestcache.NewNoopCache(), } - if rt.Context.Project != nil && *rt.Context.Project != "" { - prj := strings.Clone(*rt.Context.Project) + if ruletype.Context.Project != nil && *ruletype.Context.Project != "" { + prj := strings.Clone(*ruletype.Context.Project) rte.Meta.Project = &prj } else { return nil, fmt.Errorf("rule type context must have a project") @@ -143,7 +143,7 @@ func (r *RuleTypeEngine) GetID() string { // GetRuleInstanceValidator returns the rule instance validator for this rule type. // By instance we mean a rule that has been instantiated in a profile from a given rule type. func (r *RuleTypeEngine) GetRuleInstanceValidator() *RuleValidator { - return r.rval + return r.ruleValidator } // Eval runs the rule type engine against the given entity @@ -154,17 +154,17 @@ func (r *RuleTypeEngine) Eval(ctx context.Context, inf *entities.EntityInfoWrapp logger.Info().Msg("entity evaluation - ingest started") // Try looking at the ingesting cache first - result, ok := r.ingestCache.Get(r.rdi, inf.Entity, params.GetRule().Params) + result, ok := r.ingestCache.Get(r.ingester, inf.Entity, params.GetRule().Params) if !ok { var err error // Ingest the data needed for the rule evaluation - result, err = r.rdi.Ingest(ctx, inf.Entity, params.GetRule().Params.AsMap()) + result, err = r.ingester.Ingest(ctx, inf.Entity, params.GetRule().Params.AsMap()) if err != nil { // Ingesting failed, so we can't evaluate the rule. // Note that for some types of ingesting the evalErr can already be set from the ingester. return fmt.Errorf("error ingesting data: %w", err) } - r.ingestCache.Set(r.rdi, inf.Entity, params.GetRule().Params, result) + r.ingestCache.Set(r.ingester, inf.Entity, params.GetRule().Params, result) } else { logger.Info().Str("id", r.GetID()).Msg("entity evaluation - ingest using cache") } @@ -173,7 +173,7 @@ func (r *RuleTypeEngine) Eval(ctx context.Context, inf *entities.EntityInfoWrapp // Process evaluation logger.Info().Msg("entity evaluation - evaluation started") - err := r.reval.Eval(ctx, params.GetRule().Def.AsMap(), result) + err := r.ruleEvaluator.Eval(ctx, params.GetRule().Def.AsMap(), result) logger.Info().Msg("entity evaluation - evaluation completed") return err } @@ -185,12 +185,12 @@ func (r *RuleTypeEngine) Actions( params engif.ActionsParams, ) enginerr.ActionsError { // Process actions - return r.rae.DoActions(ctx, inf.Entity, params) + return r.actionsEngine.DoActions(ctx, inf.Entity, params) } // GetActionsOnOff returns the on/off state of the actions func (r *RuleTypeEngine) GetActionsOnOff() map[engif.ActionType]engif.ActionOpt { - return r.rae.GetOnOffState() + return r.actionsEngine.GetOnOffState() } // RuleDefFromDB converts a rule type definition from the database to a protobuf diff --git a/internal/service/service.go b/internal/service/service.go index 440a275085..4116051ada 100644 --- a/internal/service/service.go +++ b/internal/service/service.go @@ -20,6 +20,7 @@ import ( "context" "fmt" + "github.com/ThreeDotsLabs/watermill/message" "golang.org/x/sync/errgroup" "github.com/stacklok/minder/internal/auth" @@ -63,7 +64,7 @@ func AllInOneServerService( idClient auth.Resolver, serverMetrics metrics.Metrics, providerMetrics provtelemetry.ProviderMetrics, - executorOpts []engine.ExecutorOption, + executorMiddleware []message.HandlerMiddleware, ) error { errg, ctx := errgroup.WithContext(ctx) @@ -149,13 +150,15 @@ func AllInOneServerService( evt.ConsumeEvents(aggr) // prepend the aggregator to the executor options - executorOpts = append([]engine.ExecutorOption{engine.WithMiddleware(aggr.AggregateMiddleware)}, - executorOpts...) + executorMiddleware = append([]message.HandlerMiddleware{aggr.AggregateMiddleware}, executorMiddleware...) - exec, err := engine.NewExecutor(ctx, store, &cfg.Auth, &cfg.Provider, evt, providerStore, restClientCache, executorOpts...) - if err != nil { - return fmt.Errorf("unable to create executor: %w", err) - } + exec := engine.NewExecutor( + ctx, + store, + evt, + providerManager, + executorMiddleware, + ) evt.ConsumeEvents(exec)