Skip to content

Commit

Permalink
feat: update subscription flow and refactor plugin receiver
Browse files Browse the repository at this point in the history
  • Loading branch information
mabdh committed Nov 4, 2022
1 parent cd15c56 commit 94f9fb2
Show file tree
Hide file tree
Showing 151 changed files with 5,483 additions and 8,442 deletions.
3 changes: 2 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ NAME="github.com/odpf/siren"
LAST_COMMIT := $(shell git rev-parse --short HEAD)
LAST_TAG := "$(shell git rev-list --tags --max-count=1)"
APP_VERSION := "$(shell git describe --tags ${LAST_TAG})-next"
PROTON_COMMIT := "d6ec8837cf20f75fbaa1e834789caa650ee4378d"
PROTON_COMMIT := "3e995d8f0898b07bd35576b00012718cd7682b2f"

.PHONY: all build test clean dist vet proto install

Expand All @@ -23,6 +23,7 @@ coverage: ## Print code coverage
go test -race -coverprofile coverage.out -covermode=atomic ./... && go tool cover -html=coverage.out

generate: ## run all go generate in the code base (including generating mock files)
find . -type d -name "mocks" | xargs rm -r
go generate ./...

lint: ## lint checker
Expand Down
92 changes: 22 additions & 70 deletions cli/deps.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
package cli

import (
"fmt"

"github.com/odpf/salt/log"
"github.com/odpf/siren/config"
"github.com/odpf/siren/core/alert"
Expand All @@ -15,8 +13,6 @@ import (
"github.com/odpf/siren/core/template"
"github.com/odpf/siren/internal/api"
"github.com/odpf/siren/internal/store/postgres"
"github.com/odpf/siren/pkg/httpclient"
"github.com/odpf/siren/pkg/retry"
"github.com/odpf/siren/pkg/secret"
"github.com/odpf/siren/plugins/providers/cortex"
"github.com/odpf/siren/plugins/receivers/file"
Expand All @@ -25,88 +21,56 @@ import (
"github.com/odpf/siren/plugins/receivers/slack"
)

type ReceiverClient struct {
SlackClient *slack.Client
PagerDutyClient *pagerduty.Client
HTTPReceiverClient *httpreceiver.Client
}

type ProviderClient struct {
CortexClient *cortex.Client
}

func InitAPIDeps(
logger log.Logger,
cfg config.Config,
pgClient *postgres.Client,
encryptor *secret.Crypto,
queue notification.Queuer,
) (*api.Deps, *ReceiverClient, *ProviderClient, map[string]notification.Notifier, error) {
) (*api.Deps, map[string]notification.Notifier, error) {
templateRepository := postgres.NewTemplateRepository(pgClient)
templateService := template.NewService(templateRepository)

alertRepository := postgres.NewAlertRepository(pgClient)
alertHistoryService := alert.NewService(alertRepository)

providerRepository := postgres.NewProviderRepository(pgClient)
providerService := provider.NewService(providerRepository)

namespaceRepository := postgres.NewNamespaceRepository(pgClient)
namespaceService := namespace.NewService(encryptor, namespaceRepository)

cortexClient, err := cortex.NewClient(cortex.Config{Address: cfg.Cortex.Address})
if err != nil {
return nil, nil, nil, nil, fmt.Errorf("failed to init cortex client: %w", err)
}
cortexProviderService := cortex.NewProviderService(cortexClient)
cortexPluginService := cortex.NewPluginService(logger, cfg.Providers.Cortex)

alertHistoryService := alert.NewService(alertRepository, map[string]alert.AlertTransformer{
provider.TypeCortex: cortexPluginService,
})
namespaceService := namespace.NewService(encryptor, namespaceRepository, providerService, map[string]namespace.ConfigSyncer{
provider.TypeCortex: cortexPluginService,
})

ruleRepository := postgres.NewRuleRepository(pgClient)
ruleService := rule.NewService(
ruleRepository,
templateService,
namespaceService,
map[string]rule.RuleUploader{
provider.TypeCortex: cortexProviderService,
provider.TypeCortex: cortexPluginService,
},
)

// plugin receiver services
slackHTTPClient := httpclient.New(cfg.Receivers.Slack.HTTPClient)
slackRetrier := retry.New(cfg.Receivers.Slack.Retry)
slackClient := slack.NewClient(
cfg.Receivers.Slack,
slack.ClientWithHTTPClient(slackHTTPClient),
slack.ClientWithRetrier(slackRetrier),
)
pagerdutyHTTPClient := httpclient.New(cfg.Receivers.Pagerduty.HTTPClient)
pagerdutyRetrier := retry.New(cfg.Receivers.Slack.Retry)
pagerdutyClient := pagerduty.NewClient(
cfg.Receivers.Pagerduty,
pagerduty.ClientWithHTTPClient(pagerdutyHTTPClient),
pagerduty.ClientWithRetrier(pagerdutyRetrier),
)
httpreceiverHTTPClient := httpclient.New(cfg.Receivers.HTTPReceiver.HTTPClient)
httpreceiverRetrier := retry.New(cfg.Receivers.Slack.Retry)
httpreceiverClient := httpreceiver.NewClient(
logger,
cfg.Receivers.HTTPReceiver,
httpreceiver.ClientWithHTTPClient(httpreceiverHTTPClient),
httpreceiver.ClientWithRetrier(httpreceiverRetrier),
)

slackReceiverService := slack.NewReceiverService(slackClient, encryptor)
httpReceiverService := httpreceiver.NewReceiverService()
pagerDutyReceiverService := pagerduty.NewReceiverService()
fileReceiverService := file.NewReceiverService()
slackPluginService := slack.NewPluginService(cfg.Receivers.Slack, encryptor)
pagerDutyPluginService := pagerduty.NewPluginService(cfg.Receivers.Pagerduty)
httpreceiverPluginService := httpreceiver.NewPluginService(logger, cfg.Receivers.HTTPReceiver)
filePluginService := file.NewPluginService()

receiverRepository := postgres.NewReceiverRepository(pgClient)
receiverService := receiver.NewService(
receiverRepository,
map[string]receiver.ConfigResolver{
receiver.TypeSlack: slackReceiverService,
receiver.TypeHTTP: httpReceiverService,
receiver.TypePagerDuty: pagerDutyReceiverService,
receiver.TypeFile: fileReceiverService,
receiver.TypeSlack: slackPluginService,
receiver.TypeHTTP: httpreceiverPluginService,
receiver.TypePagerDuty: pagerDutyPluginService,
receiver.TypeFile: filePluginService,
},
)

Expand All @@ -115,20 +79,14 @@ func InitAPIDeps(
subscriptionRepository,
namespaceService,
receiverService,
subscription.RegisterProviderPlugin(provider.TypeCortex, cortexProviderService),
)

// notification
slackNotificationService := slack.NewNotificationService(slackClient, encryptor)
pagerdutyNotificationService := pagerduty.NewNotificationService(pagerdutyClient)
httpreceiverNotificationService := httpreceiver.NewNotificationService(httpreceiverClient)
fileNotificationService := file.NewNotificationService()

notifierRegistry := map[string]notification.Notifier{
receiver.TypeSlack: slackNotificationService,
receiver.TypePagerDuty: pagerdutyNotificationService,
receiver.TypeHTTP: httpreceiverNotificationService,
receiver.TypeFile: fileNotificationService,
receiver.TypeSlack: slackPluginService,
receiver.TypePagerDuty: pagerDutyPluginService,
receiver.TypeHTTP: httpreceiverPluginService,
receiver.TypeFile: filePluginService,
}

notificationService := notification.NewService(logger, queue, receiverService, subscriptionService, notifierRegistry)
Expand All @@ -142,12 +100,6 @@ func InitAPIDeps(
ReceiverService: receiverService,
SubscriptionService: subscriptionService,
NotificationService: notificationService,
}, &ReceiverClient{
SlackClient: slackClient,
PagerDutyClient: pagerdutyClient,
HTTPReceiverClient: httpreceiverClient,
}, &ProviderClient{
CortexClient: cortexClient,
}, notifierRegistry,
nil
}
3 changes: 2 additions & 1 deletion cli/receiver.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,8 @@ func updateReceiverCmd(cmdxConfig *cmdx.Config) *cobra.Command {
Short: "Edit a receiver",
Long: heredoc.Doc(`
Edit an existing receiver.
Note: receiver type is immutable.
`),
Annotations: map[string]string{
"group:core": "true",
Expand Down Expand Up @@ -283,7 +285,6 @@ func updateReceiverCmd(cmdxConfig *cmdx.Config) *cobra.Command {
_, err = client.UpdateReceiver(ctx, &sirenv1beta1.UpdateReceiverRequest{
Id: id,
Name: receiverConfig.Name,
Type: receiverConfig.Type,
Configurations: grpcConfigurations,
Labels: receiverConfig.Labels,
})
Expand Down
1 change: 1 addition & 0 deletions cli/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ func New() *cobra.Command {
rootCmd.AddCommand(receiversCmd(cmdxConfig))
rootCmd.AddCommand(templatesCmd(cmdxConfig))
rootCmd.AddCommand(rulesCmd(cmdxConfig))
rootCmd.AddCommand(subscriptionsCmd(cmdxConfig))
rootCmd.AddCommand(alertsCmd(cmdxConfig))
rootCmd.AddCommand(jobCmd(cmdxConfig))
rootCmd.AddCommand(workerCmd())
Expand Down
2 changes: 1 addition & 1 deletion cli/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ func StartServer(ctx context.Context, cfg config.Config) error {
dlq = inmemory.New(logger, 10)
}

apiDeps, _, _, notifierRegistry, err := InitAPIDeps(logger, cfg, pgClient, encryptor, queue)
apiDeps, notifierRegistry, err := InitAPIDeps(logger, cfg, pgClient, encryptor, queue)
if err != nil {
return err
}
Expand Down
Loading

0 comments on commit 94f9fb2

Please sign in to comment.