Skip to content

Commit

Permalink
fix: close SMTP submission correctly to handle errors (prometheus#4006)
Browse files Browse the repository at this point in the history
* fix: close SMTP submission correctly to handle errors

Signed-off-by: Danny Kopping <[email protected]>

* lint

Signed-off-by: Danny Kopping <[email protected]>

* comments

Signed-off-by: Danny Kopping <[email protected]>

---------

Signed-off-by: Danny Kopping <[email protected]>
Signed-off-by: Yevhen Sydorenko <[email protected]>
  • Loading branch information
dannykopping authored and mogoll92 committed Aug 30, 2024
1 parent ae07f25 commit c07eec8
Show file tree
Hide file tree
Showing 4 changed files with 182 additions and 17 deletions.
2 changes: 2 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ require (
github.com/cenkalti/backoff/v4 v4.3.0
github.com/cespare/xxhash/v2 v2.3.0
github.com/coder/quartz v0.1.0
github.com/emersion/go-smtp v0.21.3
github.com/go-kit/log v0.2.1
github.com/go-openapi/analysis v0.23.0
github.com/go-openapi/errors v0.22.0
Expand Down Expand Up @@ -62,6 +63,7 @@ require (
github.com/coreos/go-systemd/v22 v22.5.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/docker/go-units v0.5.0 // indirect
github.com/emersion/go-sasl v0.0.0-20200509203442-7bfe0ed36a21 // indirect
github.com/go-logfmt/logfmt v0.5.1 // indirect
github.com/go-logr/logr v1.4.1 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
Expand Down
4 changes: 4 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,10 @@ github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs
github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4=
github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk=
github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
github.com/emersion/go-sasl v0.0.0-20200509203442-7bfe0ed36a21 h1:OJyUGMJTzHTd1XQp98QTaHernxMYzRaOasRir9hUlFQ=
github.com/emersion/go-sasl v0.0.0-20200509203442-7bfe0ed36a21/go.mod h1:iL2twTeMvZnrg54ZoPDNfJaJaqy0xIQFuBdrLsmspwQ=
github.com/emersion/go-smtp v0.21.3 h1:7uVwagE8iPYE48WhNsng3RRpCUpFvNl39JGNSIyGVMY=
github.com/emersion/go-smtp v0.21.3/go.mod h1:qm27SGYgoIPRot6ubfQ/GpiPy/g3PaZAVRxiO/sDUgQ=
github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98=
Expand Down
16 changes: 15 additions & 1 deletion notify/email/email.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"net/textproto"
"os"
"strings"
"sync"
"time"

"github.com/go-kit/log"
Expand Down Expand Up @@ -242,7 +243,15 @@ func (n *Email) Notify(ctx context.Context, as ...*types.Alert) (bool, error) {
if err != nil {
return true, fmt.Errorf("send DATA command: %w", err)
}
defer message.Close()
closeOnce := sync.OnceValue(func() error {
return message.Close()
})
// Close the message when this method exits in order to not leak resources. Even though we're calling this explicitly
// further down, the method may exit before then.
defer func() {
// If we try close an already-closed writer, it'll send a subsequent request to the server which is invalid.
_ = closeOnce()
}()

buffer := &bytes.Buffer{}
for header, t := range n.conf.Headers {
Expand Down Expand Up @@ -331,6 +340,11 @@ func (n *Email) Notify(ctx context.Context, as ...*types.Alert) (bool, error) {
return false, fmt.Errorf("write body buffer: %w", err)
}

// Complete the message and await response.
if err = closeOnce(); err != nil {
return true, fmt.Errorf("delivery failure: %w", err)
}

success = true
return false, nil
}
Expand Down
177 changes: 161 additions & 16 deletions notify/email/email_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,16 +32,22 @@ import (
"context"
"fmt"
"io"
"net"
"net/http"
"net/url"
"os"
"strconv"
"strings"
"testing"
"time"

"github.com/emersion/go-smtp"
"github.com/go-kit/log"
commoncfg "github.com/prometheus/common/config"
"github.com/prometheus/common/model"

// nolint:depguard // require cannot be called outside the main goroutine: https://pkg.go.dev/testing#T.FailNow
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"gopkg.in/yaml.v2"

Expand Down Expand Up @@ -165,29 +171,16 @@ func notifyEmail(cfg *config.EmailConfig, server *mailDev) (*email, bool, error)
// notifyEmailWithContext sends a notification with one firing alert and retrieves the
// email from the SMTP server if the notification has been successfully delivered.
func notifyEmailWithContext(ctx context.Context, cfg *config.EmailConfig, server *mailDev) (*email, bool, error) {
if cfg.RequireTLS == nil {
cfg.RequireTLS = new(bool)
}
if cfg.Headers == nil {
cfg.Headers = make(map[string]string)
}
firingAlert := &types.Alert{
Alert: model.Alert{
Labels: model.LabelSet{},
StartsAt: time.Now(),
EndsAt: time.Now().Add(time.Hour),
},
}
err := server.deleteAllEmails()
tmpl, firingAlert, err := prepare(cfg)
if err != nil {
return nil, false, err
}

tmpl, err := template.FromGlobs([]string{})
err = server.deleteAllEmails()
if err != nil {
return nil, false, err
}
tmpl.ExternalURL, _ = url.Parse("http://am")

email := New(cfg, tmpl, log.NewNopLogger())

retry, err := email.Notify(ctx, firingAlert)
Expand All @@ -204,6 +197,34 @@ func notifyEmailWithContext(ctx context.Context, cfg *config.EmailConfig, server
return e, retry, nil
}

func prepare(cfg *config.EmailConfig) (*template.Template, *types.Alert, error) {
if cfg == nil {
panic("nil config passed")
}

if cfg.RequireTLS == nil {
cfg.RequireTLS = new(bool)
}
if cfg.Headers == nil {
cfg.Headers = make(map[string]string)
}

tmpl, err := template.FromGlobs([]string{})
if err != nil {
return nil, nil, err
}
tmpl.ExternalURL, _ = url.Parse("http://am")

firingAlert := &types.Alert{
Alert: model.Alert{
Labels: model.LabelSet{},
StartsAt: time.Now(),
EndsAt: time.Now().Add(time.Hour),
},
}
return tmpl, firingAlert, nil
}

// TestEmailNotifyWithErrors tries to send emails with buggy inputs.
func TestEmailNotifyWithErrors(t *testing.T) {
cfgFile := os.Getenv(emailNoAuthConfigVar)
Expand Down Expand Up @@ -643,3 +664,127 @@ func TestEmailNoUsernameStillOk(t *testing.T) {
require.NoError(t, err)
require.Nil(t, a)
}

// TestEmailRejected simulates the failure of an otherwise valid message submission which fails at a later point than
// was previously expected by the code.
func TestEmailRejected(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), time.Second*10)
t.Cleanup(cancel)

// Setup mock SMTP server which will reject at the DATA stage.
srv, l, err := mockSMTPServer(t)
require.NoError(t, err)
t.Cleanup(func() {
// We expect that the server has already been closed in the test.
require.ErrorIs(t, srv.Shutdown(ctx), smtp.ErrServerClosed)
})

done := make(chan any, 1)
go func() {
// nolint:testifylint // require cannot be called outside the main goroutine: https://pkg.go.dev/testing#T.FailNow
assert.NoError(t, srv.Serve(l))
close(done)
}()

// Wait for mock SMTP server to become ready.
require.Eventuallyf(t, func() bool {
c, err := smtp.Dial(srv.Addr)
if err != nil {
t.Logf("dial failed to %q: %s", srv.Addr, err)
return false
}

// Ping.
if err = c.Noop(); err != nil {
t.Logf("ping failed to %q: %s", srv.Addr, err)
return false
}

// Ensure we close the connection to not prevent server from shutting down cleanly.
if err = c.Close(); err != nil {
t.Logf("close failed to %q: %s", srv.Addr, err)
return false
}

return true
}, time.Second*10, time.Millisecond*100, "mock SMTP server failed to start")

// Use mock SMTP server and prepare alert to be sent.
require.IsType(t, &net.TCPAddr{}, l.Addr())
addr := l.Addr().(*net.TCPAddr)
cfg := &config.EmailConfig{
Smarthost: config.HostPort{Host: addr.IP.String(), Port: strconv.Itoa(addr.Port)},
Hello: "localhost",
Headers: make(map[string]string),
From: "alertmanager@system",
To: "sre@company",
}
tmpl, firingAlert, err := prepare(cfg)
require.NoError(t, err)

e := New(cfg, tmpl, log.NewNopLogger())

// Send the alert to mock SMTP server.
retry, err := e.Notify(context.Background(), firingAlert)
require.ErrorContains(t, err, "501 5.5.4 Rejected!")
require.True(t, retry)
require.NoError(t, srv.Shutdown(ctx))

require.Eventuallyf(t, func() bool {
<-done
return true
}, time.Second*10, time.Millisecond*100, "mock SMTP server goroutine failed to close in time")
}

func mockSMTPServer(t *testing.T) (*smtp.Server, net.Listener, error) {
t.Helper()

// Listen on the next available high port.
l, err := net.Listen("tcp", "localhost:0")
if err != nil {
return nil, nil, fmt.Errorf("connect: %w", err)
}

addr, ok := l.Addr().(*net.TCPAddr)
if !ok {
return nil, nil, fmt.Errorf("unexpected address type: %T", l.Addr())
}

s := smtp.NewServer(&rejectingBackend{})
s.Addr = addr.String()
s.WriteTimeout = 10 * time.Second
s.ReadTimeout = 10 * time.Second

return s, l, nil
}

// rejectingBackend will reject submission at the DATA stage.
type rejectingBackend struct{}

func (b *rejectingBackend) NewSession(c *smtp.Conn) (smtp.Session, error) {
return &mockSMTPSession{
conn: c,
backend: b,
}, nil
}

type mockSMTPSession struct {
conn *smtp.Conn
backend smtp.Backend
}

func (s *mockSMTPSession) Mail(string, *smtp.MailOptions) error {
return nil
}

func (s *mockSMTPSession) Rcpt(string, *smtp.RcptOptions) error {
return nil
}

func (s *mockSMTPSession) Data(io.Reader) error {
return &smtp.SMTPError{Code: 501, EnhancedCode: smtp.EnhancedCode{5, 5, 4}, Message: "Rejected!"}
}

func (*mockSMTPSession) Reset() {}

func (*mockSMTPSession) Logout() error { return nil }

0 comments on commit c07eec8

Please sign in to comment.