Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add retries to sendgrid emailer (#618) #6164

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,30 @@ import (
"io/ioutil"
"os"
"strings"
"time"

"github.com/sendgrid/rest"
"github.com/sendgrid/sendgrid-go"
"github.com/sendgrid/sendgrid-go/helpers/mail"

"github.com/flyteorg/flyte/flyteadmin/pkg/async"
"github.com/flyteorg/flyte/flyteadmin/pkg/async/notifications/interfaces"
runtimeInterfaces "github.com/flyteorg/flyte/flyteadmin/pkg/runtime/interfaces"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/flyteorg/flyte/flytestdlib/logger"
"github.com/flyteorg/flyte/flytestdlib/promutils"
)

//go:generate mockery -all -case=underscore -output=../mocks -case=underscore

type SendgridClient interface {
Send(email *mail.SGMailV3) (*rest.Response, error)
}

type SendgridEmailer struct {
client *sendgrid.Client
client SendgridClient
systemMetrics emailMetrics
cfg *runtimeInterfaces.NotificationsConfig
}

func getEmailAddresses(addresses []string) []*mail.Email {
Expand Down Expand Up @@ -63,9 +73,18 @@ func getAPIKey(config runtimeInterfaces.EmailServerConfig) string {
func (s SendgridEmailer) SendEmail(ctx context.Context, email *admin.EmailMessage) error {
m := getSendgridEmail(email)
s.systemMetrics.SendTotal.Inc()
response, err := s.client.Send(m)
var response *rest.Response
var err error
err = async.Retry(s.cfg.ReconnectAttempts, time.Duration(s.cfg.ReconnectDelaySeconds)*time.Second, func() error {
response, err = s.client.Send(m)
if err != nil {
logger.Errorf(ctx, "Sendgrid error sending email: %+v with: %+v", email, err)
return err
}
return nil
})
if err != nil {
logger.Errorf(ctx, "Sendgrid error sending %s", err)
logger.Errorf(ctx, "all attempts to send email %+v via sendgrid failed: %+v", email, err)
s.systemMetrics.SendError.Inc()
return err
}
Expand All @@ -79,5 +98,6 @@ func NewSendGridEmailer(config runtimeInterfaces.NotificationsConfig, scope prom
return &SendgridEmailer{
client: sendgrid.NewSendClient(getAPIKey(config.NotificationsEmailerConfig.EmailerConfig)),
systemMetrics: newEmailMetrics(scope.NewSubScope("sendgrid")),
cfg: &config,
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider storing only required config fields

Consider initializing the cfg field with just the required NotificationsEmailerConfig instead of storing the entire config struct. This would reduce memory usage and coupling.

Code suggestion
Check the AI-generated fix before applying
Suggested change
cfg: &config,
cfg: &config.NotificationsEmailerConfig,

Code Review Run #63207a


Is this a valid issue, or was it incorrectly flagged by the Agent?

  • it was incorrectly flagged

}
}
Original file line number Diff line number Diff line change
@@ -1,27 +1,26 @@
package implementations

import (
"context"
"errors"
"io/ioutil"
"os"
"path"
"strings"
"testing"

"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/sendgrid/rest"
"github.com/stretchr/testify/assert"

"github.com/flyteorg/flyte/flyteadmin/pkg/async/notifications/mocks"
runtimeInterfaces "github.com/flyteorg/flyte/flyteadmin/pkg/runtime/interfaces"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/admin"
"github.com/flyteorg/flyte/flytestdlib/promutils"
)

func TestAddresses(t *testing.T) {
addresses := []string{"[email protected]", "[email protected]"}
sgAddresses := getEmailAddresses(addresses)
assert.Equal(t, sgAddresses[0].Address, "[email protected]")
assert.Equal(t, sgAddresses[1].Address, "[email protected]")
}

func TestGetEmail(t *testing.T) {
emailNotification := &admin.EmailMessage{
var (
emailNotification = &admin.EmailMessage{
Comment on lines +22 to +23
Copy link
Collaborator

@flyte-bot flyte-bot Jan 13, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider test data scope placement

Consider keeping test data within test functions rather than as package-level variables to maintain test isolation and clarity. Each test should be self-contained with its own test data.

Code suggestion
Check the AI-generated fix before applying
 -var (
 -	emailNotification = &admin.EmailMessage{
 -		SubjectLine: "Notice: Execution \"name\" has succeeded in \"domain\".",
 -		SenderEmail: "[email protected]",
 -		RecipientsEmail: []string{
 -			"[email protected]",
 -			"[email protected]",
 -		},
 -		Body: "Execution \"name\" has succeeded in \"domain\". View details at " +
 -			"<a href=\"https://example.com/executions/T/B/D\">" +
 -			"https://example.com/executions/T/B/D</a>.",
 -	}
 -)
 @@ -43,1 +43,14 @@
  func TestGetEmail(t *testing.T) {
 +	emailNotification := &admin.EmailMessage{
 +		SubjectLine: "Notice: Execution \"name\" has succeeded in \"domain\".",
 +		SenderEmail: "[email protected]",
 +		RecipientsEmail: []string{
 +			"[email protected]",
 +			"[email protected]",
 +		},
 +		Body: "Execution \"name\" has succeeded in \"domain\". View details at " +
 +			"<a href=\"https://example.com/executions/T/B/D\">" +
 +			"https://example.com/executions/T/B/D</a>.",
 +	}

Code Review Run #255086


Is this a valid issue, or was it incorrectly flagged by the Agent?

  • it was incorrectly flagged

SubjectLine: "Notice: Execution \"name\" has succeeded in \"domain\".",
SenderEmail: "[email protected]",
RecipientsEmail: []string{
Expand All @@ -32,7 +31,16 @@ func TestGetEmail(t *testing.T) {
"<a href=\"https://example.com/executions/T/B/D\">" +
"https://example.com/executions/T/B/D</a>.",
}
)

func TestAddresses(t *testing.T) {
addresses := []string{"[email protected]", "[email protected]"}
sgAddresses := getEmailAddresses(addresses)
assert.Equal(t, sgAddresses[0].Address, "[email protected]")
assert.Equal(t, sgAddresses[1].Address, "[email protected]")
}

func TestGetEmail(t *testing.T) {
sgEmail := getSendgridEmail(emailNotification)
assert.Equal(t, `Notice: Execution "name" has succeeded in "domain".`, sgEmail.Personalizations[0].Subject)
assert.Equal(t, "[email protected]", sgEmail.Personalizations[0].To[1].Address)
Expand Down Expand Up @@ -98,3 +106,65 @@ func TestNoFile(t *testing.T) {
// shouldn't reach here
t.Errorf("did not panic")
}

func TestSendEmail(t *testing.T) {
ctx := context.TODO()
expectedErr := errors.New("expected")
t.Run("exhaust all retry attempts", func(t *testing.T) {
sendgridClient := &mocks.SendgridClient{}
expectedEmail := getSendgridEmail(emailNotification)
sendgridClient.OnSendMatch(expectedEmail).
Return(nil, expectedErr).Times(3)
sendgridClient.OnSendMatch(expectedEmail).
Return(&rest.Response{Body: "email body"}, nil).Once()
scope := promutils.NewScope("bademailer")
emailerMetrics := newEmailMetrics(scope)

emailer := SendgridEmailer{
client: sendgridClient,
systemMetrics: emailerMetrics,
cfg: &runtimeInterfaces.NotificationsConfig{
ReconnectAttempts: 1,
},
}

err := emailer.SendEmail(ctx, emailNotification)
assert.EqualError(t, err, expectedErr.Error())

assert.NoError(t, testutil.CollectAndCompare(emailerMetrics.SendError, strings.NewReader(`
# HELP bademailer:send_error Number of errors when sending email via Emailer
# TYPE bademailer:send_error counter
bademailer:send_error 1
`)))
})
t.Run("succeed within allowed retry attempts", func(t *testing.T) {
t.Run("exhaust all retry attempts", func(t *testing.T) {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Consider removing duplicate test case description

Consider removing the nested t.Run() with identical description. The outer and inner test cases have the same name 'exhaust all retry attempts'.

Code suggestion
Check the AI-generated fix before applying
 -		t.Run("exhaust all retry attempts", func(t *testing.T) {
 			ctx := context.TODO()
 			sendgridClient := &mocks.SendgridClient{}
 			expectedEmail := getSendgridEmail(emailNotification)
 @@ -165,7 +164,6 @@
 		# TYPE goodemailer:send_error counter
 		goodemailer:send_error 0
 		`)))
 -		})
 	})

Code Review Run #63207a


Is this a valid issue, or was it incorrectly flagged by the Agent?

  • it was incorrectly flagged

ctx := context.TODO()
sendgridClient := &mocks.SendgridClient{}
expectedEmail := getSendgridEmail(emailNotification)
sendgridClient.OnSendMatch(expectedEmail).
Return(nil, expectedErr).Once()
sendgridClient.OnSendMatch(expectedEmail).
Return(&rest.Response{Body: "email body"}, nil).Once()
scope := promutils.NewScope("goodemailer")
emailerMetrics := newEmailMetrics(scope)

emailer := SendgridEmailer{
client: sendgridClient,
systemMetrics: emailerMetrics,
cfg: &runtimeInterfaces.NotificationsConfig{
ReconnectAttempts: 1,
},
}

err := emailer.SendEmail(ctx, emailNotification)
assert.NoError(t, err)

assert.NoError(t, testutil.CollectAndCompare(emailerMetrics.SendError, strings.NewReader(`
# HELP goodemailer:send_error Number of errors when sending email via Emailer
# TYPE goodemailer:send_error counter
goodemailer:send_error 0
`)))
})
})
}
56 changes: 56 additions & 0 deletions flyteadmin/pkg/async/notifications/mocks/sendgrid_client.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading